|
|
|
|
@@ -66,6 +66,8 @@ TLS_HANDSHAKE_LEN = 1 + 2 + 2 + 512
|
|
|
|
|
PROTO_TAG_POS = 56
|
|
|
|
|
DC_IDX_POS = 60
|
|
|
|
|
|
|
|
|
|
MIN_CERT_LEN = 1024
|
|
|
|
|
|
|
|
|
|
PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef"
|
|
|
|
|
PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee"
|
|
|
|
|
PROTO_TAG_SECURE = b"\xdd\xdd\xdd\xdd"
|
|
|
|
|
@@ -76,6 +78,7 @@ PADDING_FILLER = b"\x04\x00\x00\x00"
|
|
|
|
|
MIN_MSG_LEN = 12
|
|
|
|
|
MAX_MSG_LEN = 2 ** 24
|
|
|
|
|
|
|
|
|
|
STAT_DURATION_BUCKETS = [0.1, 0.5, 1, 2, 5, 15, 60, 300, 600, 1800, 2**31 - 1]
|
|
|
|
|
|
|
|
|
|
my_ip_info = {"ipv4": None, "ipv6": None}
|
|
|
|
|
used_handshakes = collections.OrderedDict()
|
|
|
|
|
@@ -83,6 +86,11 @@ disable_middle_proxy = False
|
|
|
|
|
is_time_skewed = False
|
|
|
|
|
fake_cert_len = random.randrange(1024, 4096)
|
|
|
|
|
mask_host_cached_ip = None
|
|
|
|
|
last_clients_with_time_skew = {}
|
|
|
|
|
last_clients_with_first_pkt_error = collections.Counter()
|
|
|
|
|
last_clients_with_same_handshake = collections.Counter()
|
|
|
|
|
proxy_start_time = 0
|
|
|
|
|
proxy_links = []
|
|
|
|
|
|
|
|
|
|
config = {}
|
|
|
|
|
|
|
|
|
|
@@ -103,6 +111,10 @@ def init_config():
|
|
|
|
|
conf_dict["USERS"] = {"user%d" % i: secrets[i].zfill(32) for i in range(len(secrets))}
|
|
|
|
|
if len(sys.argv) > 3:
|
|
|
|
|
conf_dict["AD_TAG"] = sys.argv[3]
|
|
|
|
|
if len(sys.argv) > 4:
|
|
|
|
|
conf_dict["TLS_DOMAIN"] = sys.argv[4]
|
|
|
|
|
conf_dict["TLS_ONLY"] = True
|
|
|
|
|
conf_dict["SECURE_ONLY"] = True
|
|
|
|
|
|
|
|
|
|
conf_dict = {k: v for k, v in conf_dict.items() if k.isupper()}
|
|
|
|
|
|
|
|
|
|
@@ -142,6 +154,16 @@ def init_config():
|
|
|
|
|
# the next host's port to forward bad clients
|
|
|
|
|
conf_dict.setdefault("MASK_PORT", 443)
|
|
|
|
|
|
|
|
|
|
# use upstream SOCKS5 proxy
|
|
|
|
|
conf_dict.setdefault("SOCKS5_HOST", None)
|
|
|
|
|
conf_dict.setdefault("SOCKS5_PORT", None)
|
|
|
|
|
conf_dict.setdefault("SOCKS5_USER", None)
|
|
|
|
|
conf_dict.setdefault("SOCKS5_PASS", None)
|
|
|
|
|
|
|
|
|
|
if conf_dict["SOCKS5_HOST"] and conf_dict["SOCKS5_PORT"]:
|
|
|
|
|
# Disable the middle proxy if using socks, they are not compatible
|
|
|
|
|
conf_dict["USE_MIDDLE_PROXY"] = False
|
|
|
|
|
|
|
|
|
|
# user tcp connection limits, the mapping from name to the integer limit
|
|
|
|
|
# one client can create many tcp connections, up to 8
|
|
|
|
|
conf_dict.setdefault("USER_MAX_TCP_CONNS", {})
|
|
|
|
|
@@ -156,10 +178,10 @@ def init_config():
|
|
|
|
|
conf_dict.setdefault("USER_DATA_QUOTA", {})
|
|
|
|
|
|
|
|
|
|
# length of used handshake randoms for active fingerprinting protection, zero to disable
|
|
|
|
|
conf_dict.setdefault("REPLAY_CHECK_LEN", 32768)
|
|
|
|
|
conf_dict.setdefault("REPLAY_CHECK_LEN", 65536)
|
|
|
|
|
|
|
|
|
|
# block bad first packets to even more protect against replay-based fingerprinting
|
|
|
|
|
conf_dict.setdefault("BLOCK_IF_FIRST_PKT_BAD", True)
|
|
|
|
|
conf_dict.setdefault("BLOCK_IF_FIRST_PKT_BAD", not conf_dict["TLS_ONLY"])
|
|
|
|
|
|
|
|
|
|
# delay in seconds between stats printing
|
|
|
|
|
conf_dict.setdefault("STATS_PRINT_PERIOD", 600)
|
|
|
|
|
@@ -201,10 +223,43 @@ def init_config():
|
|
|
|
|
# listen unix socket
|
|
|
|
|
conf_dict.setdefault("LISTEN_UNIX_SOCK", "")
|
|
|
|
|
|
|
|
|
|
# prometheus exporter listen port, use some random port here
|
|
|
|
|
conf_dict.setdefault("METRICS_PORT", None)
|
|
|
|
|
|
|
|
|
|
# prometheus listen addr ipv4
|
|
|
|
|
conf_dict.setdefault("METRICS_LISTEN_ADDR_IPV4", "0.0.0.0")
|
|
|
|
|
|
|
|
|
|
# prometheus listen addr ipv6
|
|
|
|
|
conf_dict.setdefault("METRICS_LISTEN_ADDR_IPV6", None)
|
|
|
|
|
|
|
|
|
|
# prometheus scrapers whitelist
|
|
|
|
|
conf_dict.setdefault("METRICS_WHITELIST", ["127.0.0.1", "::1"])
|
|
|
|
|
|
|
|
|
|
# export proxy link to prometheus
|
|
|
|
|
conf_dict.setdefault("METRICS_EXPORT_LINKS", False)
|
|
|
|
|
|
|
|
|
|
# default prefix for metrics
|
|
|
|
|
conf_dict.setdefault("METRICS_PREFIX", "mtprotoproxy_")
|
|
|
|
|
|
|
|
|
|
# allow access to config by attributes
|
|
|
|
|
config = type("config", (dict,), conf_dict)(conf_dict)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def apply_upstream_proxy_settings():
|
|
|
|
|
# apply socks settings in place
|
|
|
|
|
if config.SOCKS5_HOST and config.SOCKS5_PORT:
|
|
|
|
|
import socks
|
|
|
|
|
print_err("Socket-proxy mode activated, it is incompatible with advertising and uvloop")
|
|
|
|
|
socks.set_default_proxy(socks.PROXY_TYPE_SOCKS5, config.SOCKS5_HOST, config.SOCKS5_PORT,
|
|
|
|
|
username=config.SOCKS5_USER, password=config.SOCKS5_PASS)
|
|
|
|
|
if not hasattr(socket, "origsocket"):
|
|
|
|
|
socket.origsocket = socket.socket
|
|
|
|
|
socket.socket = socks.socksocket
|
|
|
|
|
elif hasattr(socket, "origsocket"):
|
|
|
|
|
socket.socket = socket.origsocket
|
|
|
|
|
del socket.origsocket
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def try_use_cryptography_module():
|
|
|
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
|
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
|
@@ -294,24 +349,45 @@ def print_err(*params):
|
|
|
|
|
|
|
|
|
|
def init_stats():
|
|
|
|
|
global stats
|
|
|
|
|
stats = {user: collections.Counter() for user in config.USERS}
|
|
|
|
|
global user_stats
|
|
|
|
|
|
|
|
|
|
stats = collections.Counter()
|
|
|
|
|
user_stats = {user: collections.Counter() for user in config.USERS}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def update_stats(user, connects=0, curr_connects=0, octets=0, msgs=0):
|
|
|
|
|
def init_proxy_start_time():
|
|
|
|
|
global proxy_start_time
|
|
|
|
|
proxy_start_time = time.time()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def update_stats(**kw_stats):
|
|
|
|
|
global stats
|
|
|
|
|
stats.update(**kw_stats)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def update_user_stats(user, **kw_stats):
|
|
|
|
|
global user_stats
|
|
|
|
|
|
|
|
|
|
if user not in user_stats:
|
|
|
|
|
user_stats[user] = collections.Counter()
|
|
|
|
|
user_stats[user].update(**kw_stats)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def update_durations(duration):
|
|
|
|
|
global stats
|
|
|
|
|
|
|
|
|
|
if user not in stats:
|
|
|
|
|
stats[user] = collections.Counter()
|
|
|
|
|
for bucket in STAT_DURATION_BUCKETS:
|
|
|
|
|
if duration <= bucket:
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
stats[user].update(connects=connects, curr_connects=curr_connects,
|
|
|
|
|
octets=octets, msgs=msgs)
|
|
|
|
|
update_stats(**{"connects_with_duration_le_%s" % str(bucket): 1})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_curr_connects_count():
|
|
|
|
|
global stats
|
|
|
|
|
global user_stats
|
|
|
|
|
|
|
|
|
|
all_connects = 0
|
|
|
|
|
for user, stat in stats.items():
|
|
|
|
|
for user, stat in user_stats.items():
|
|
|
|
|
all_connects += stat["curr_connects"]
|
|
|
|
|
return all_connects
|
|
|
|
|
|
|
|
|
|
@@ -360,6 +436,60 @@ class MyRandom(random.Random):
|
|
|
|
|
myrandom = MyRandom()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TgConnectionPool:
|
|
|
|
|
MAX_CONNS_IN_POOL = 64
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self.pools = {}
|
|
|
|
|
|
|
|
|
|
async def open_tg_connection(self, host, port, init_func=None):
|
|
|
|
|
task = asyncio.open_connection(host, port, limit=get_to_clt_bufsize())
|
|
|
|
|
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=config.TG_CONNECT_TIMEOUT)
|
|
|
|
|
|
|
|
|
|
set_keepalive(writer_tgt.get_extra_info("socket"))
|
|
|
|
|
set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize())
|
|
|
|
|
|
|
|
|
|
if init_func:
|
|
|
|
|
return await asyncio.wait_for(init_func(host, port, reader_tgt, writer_tgt),
|
|
|
|
|
timeout=config.TG_CONNECT_TIMEOUT)
|
|
|
|
|
return reader_tgt, writer_tgt
|
|
|
|
|
|
|
|
|
|
def register_host_port(self, host, port, init_func):
|
|
|
|
|
if (host, port, init_func) not in self.pools:
|
|
|
|
|
self.pools[(host, port, init_func)] = []
|
|
|
|
|
|
|
|
|
|
while len(self.pools[(host, port, init_func)]) < TgConnectionPool.MAX_CONNS_IN_POOL:
|
|
|
|
|
connect_task = asyncio.ensure_future(self.open_tg_connection(host, port, init_func))
|
|
|
|
|
self.pools[(host, port, init_func)].append(connect_task)
|
|
|
|
|
|
|
|
|
|
async def get_connection(self, host, port, init_func=None):
|
|
|
|
|
self.register_host_port(host, port, init_func)
|
|
|
|
|
|
|
|
|
|
ret = None
|
|
|
|
|
for task in self.pools[(host, port, init_func)][::]:
|
|
|
|
|
if task.done():
|
|
|
|
|
if task.exception():
|
|
|
|
|
self.pools[(host, port, init_func)].remove(task)
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
reader, writer, *other = task.result()
|
|
|
|
|
if writer.transport.is_closing():
|
|
|
|
|
self.pools[(host, port, init_func)].remove(task)
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
if not ret:
|
|
|
|
|
self.pools[(host, port, init_func)].remove(task)
|
|
|
|
|
ret = (reader, writer, *other)
|
|
|
|
|
|
|
|
|
|
self.register_host_port(host, port, init_func)
|
|
|
|
|
if ret:
|
|
|
|
|
return ret
|
|
|
|
|
return await self.open_tg_connection(host, port, init_func)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tg_connection_pool = TgConnectionPool()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class LayeredStreamReaderBase:
|
|
|
|
|
__slots__ = ("upstream", )
|
|
|
|
|
|
|
|
|
|
@@ -825,6 +955,8 @@ async def handle_bad_client(reader_clt, writer_clt, handshake):
|
|
|
|
|
|
|
|
|
|
global mask_host_cached_ip
|
|
|
|
|
|
|
|
|
|
update_stats(connects_bad=1)
|
|
|
|
|
|
|
|
|
|
if writer_clt.transport.is_closing():
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
@@ -899,6 +1031,8 @@ async def handle_bad_client(reader_clt, writer_clt, handshake):
|
|
|
|
|
|
|
|
|
|
async def handle_fake_tls_handshake(handshake, reader, writer, peer):
|
|
|
|
|
global used_handshakes
|
|
|
|
|
global last_clients_with_time_skew
|
|
|
|
|
global last_clients_with_same_handshake
|
|
|
|
|
global fake_cert_len
|
|
|
|
|
|
|
|
|
|
TIME_SKEW_MIN = -20 * 60
|
|
|
|
|
@@ -910,6 +1044,7 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer):
|
|
|
|
|
TLS_APP_HTTP2_HDR = b"\x17" + TLS_VERS
|
|
|
|
|
|
|
|
|
|
DIGEST_LEN = 32
|
|
|
|
|
DIGEST_HALFLEN = 16
|
|
|
|
|
DIGEST_POS = 11
|
|
|
|
|
|
|
|
|
|
SESSION_ID_LEN_POS = DIGEST_POS + DIGEST_LEN
|
|
|
|
|
@@ -920,8 +1055,8 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer):
|
|
|
|
|
|
|
|
|
|
digest = handshake[DIGEST_POS: DIGEST_POS + DIGEST_LEN]
|
|
|
|
|
|
|
|
|
|
if digest in used_handshakes:
|
|
|
|
|
print_err("Active TLS fingerprinting detected from %s, handling it" % peer[0])
|
|
|
|
|
if digest[:DIGEST_HALFLEN] in used_handshakes:
|
|
|
|
|
last_clients_with_same_handshake[peer[0]] += 1
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
sess_id_len = handshake[SESSION_ID_LEN_POS]
|
|
|
|
|
@@ -944,8 +1079,7 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer):
|
|
|
|
|
# some clients fail to read unix time and send the time since boot instead
|
|
|
|
|
client_time_is_small = timestamp < 60*60*24*1000
|
|
|
|
|
if not client_time_is_ok and not is_time_skewed and not client_time_is_small:
|
|
|
|
|
print_err("Client with time skew detected from %s, can be a replay-attack" % peer[0])
|
|
|
|
|
print_err("The clocks were %d minutes behind" % ((time.time() - timestamp) // 60))
|
|
|
|
|
last_clients_with_time_skew[peer[0]] = (time.time() - timestamp) // 60
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
http_data = myrandom.getrandbytes(fake_cert_len)
|
|
|
|
|
@@ -967,7 +1101,7 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer):
|
|
|
|
|
if config.REPLAY_CHECK_LEN > 0:
|
|
|
|
|
while len(used_handshakes) >= config.REPLAY_CHECK_LEN:
|
|
|
|
|
used_handshakes.popitem(last=False)
|
|
|
|
|
used_handshakes[digest] = True
|
|
|
|
|
used_handshakes[digest[:DIGEST_HALFLEN]] = True
|
|
|
|
|
|
|
|
|
|
reader = FakeTLSStreamReader(reader)
|
|
|
|
|
writer = FakeTLSStreamWriter(writer)
|
|
|
|
|
@@ -997,7 +1131,7 @@ async def handle_proxy_protocol(reader, peer=None):
|
|
|
|
|
if proxy_fam in (PROXY_TCP4, PROXY_TCP6):
|
|
|
|
|
if len(proxy_addr) == 4:
|
|
|
|
|
src_addr = proxy_addr[0].decode('ascii')
|
|
|
|
|
src_port = proxy_addr[2].decode('ascii')
|
|
|
|
|
src_port = int(proxy_addr[2].decode('ascii'))
|
|
|
|
|
return (src_addr, src_port)
|
|
|
|
|
elif proxy_fam == PROXY_UNKNOWN:
|
|
|
|
|
return peer
|
|
|
|
|
@@ -1033,6 +1167,7 @@ async def handle_proxy_protocol(reader, peer=None):
|
|
|
|
|
|
|
|
|
|
async def handle_handshake(reader, writer):
|
|
|
|
|
global used_handshakes
|
|
|
|
|
global last_clients_with_same_handshake
|
|
|
|
|
|
|
|
|
|
TLS_START_BYTES = b"\x16\x03\x01\x02\x00\x01\x00\x01\xfc\x03\x03"
|
|
|
|
|
|
|
|
|
|
@@ -1078,7 +1213,7 @@ async def handle_handshake(reader, writer):
|
|
|
|
|
enc_prekey, enc_iv = enc_prekey_and_iv[:PREKEY_LEN], enc_prekey_and_iv[PREKEY_LEN:]
|
|
|
|
|
|
|
|
|
|
if dec_prekey_and_iv in used_handshakes:
|
|
|
|
|
print_err("Active fingerprinting detected from %s, handling it" % peer[0])
|
|
|
|
|
last_clients_with_same_handshake[peer[0]] += 1
|
|
|
|
|
await handle_bad_client(reader, writer, handshake)
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
@@ -1115,21 +1250,6 @@ async def handle_handshake(reader, writer):
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def open_connection_tryer(addr, port, limit, timeout, max_attempts=3):
|
|
|
|
|
for attempt in range(max_attempts-1):
|
|
|
|
|
try:
|
|
|
|
|
task = asyncio.open_connection(addr, port, limit=limit)
|
|
|
|
|
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout)
|
|
|
|
|
return reader_tgt, writer_tgt
|
|
|
|
|
except (OSError, asyncio.TimeoutError):
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# the last attempt
|
|
|
|
|
task = asyncio.open_connection(addr, port, limit=limit)
|
|
|
|
|
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout)
|
|
|
|
|
return reader_tgt, writer_tgt
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
|
|
|
|
|
RESERVED_NONCE_FIRST_CHARS = [b"\xef"]
|
|
|
|
|
RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54",
|
|
|
|
|
@@ -1138,6 +1258,7 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
|
|
|
|
|
RESERVED_NONCE_CONTINUES = [b"\x00\x00\x00\x00"]
|
|
|
|
|
|
|
|
|
|
global my_ip_info
|
|
|
|
|
global tg_connection_pool
|
|
|
|
|
|
|
|
|
|
dc_idx = abs(dc_idx) - 1
|
|
|
|
|
|
|
|
|
|
@@ -1151,18 +1272,17 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
|
|
|
|
|
dc = TG_DATACENTERS_V4[dc_idx]
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
reader_tgt, writer_tgt = await open_connection_tryer(
|
|
|
|
|
dc, TG_DATACENTER_PORT, limit=get_to_clt_bufsize(), timeout=config.TG_CONNECT_TIMEOUT)
|
|
|
|
|
reader_tgt, writer_tgt = await tg_connection_pool.get_connection(dc, TG_DATACENTER_PORT)
|
|
|
|
|
except ConnectionRefusedError as E:
|
|
|
|
|
print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT)
|
|
|
|
|
return False
|
|
|
|
|
except ConnectionAbortedError as E:
|
|
|
|
|
print_err("The Telegram server connection is bad: %d (%s %s) %s" % (dc_idx, addr, port, E))
|
|
|
|
|
return False
|
|
|
|
|
except (OSError, asyncio.TimeoutError) as E:
|
|
|
|
|
print_err("Unable to connect to", dc, TG_DATACENTER_PORT)
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
set_keepalive(writer_tgt.get_extra_info("socket"))
|
|
|
|
|
set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize())
|
|
|
|
|
|
|
|
|
|
while True:
|
|
|
|
|
rnd = bytearray(myrandom.getrandbytes(HANDSHAKE_LEN))
|
|
|
|
|
if rnd[:1] in RESERVED_NONCE_FIRST_CHARS:
|
|
|
|
|
@@ -1225,49 +1345,21 @@ def get_middleproxy_aes_key_and_iv(nonce_srv, nonce_clt, clt_ts, srv_ip, clt_por
|
|
|
|
|
return key, iv
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
|
|
|
|
async def middleproxy_handshake(host, port, reader_tgt, writer_tgt):
|
|
|
|
|
""" The most logic of middleproxy handshake, launched in pool """
|
|
|
|
|
START_SEQ_NO = -2
|
|
|
|
|
NONCE_LEN = 16
|
|
|
|
|
|
|
|
|
|
RPC_NONCE = b"\xaa\x87\xcb\x7a"
|
|
|
|
|
RPC_HANDSHAKE = b"\xf5\xee\x82\x76"
|
|
|
|
|
RPC_NONCE = b"\xaa\x87\xcb\x7a"
|
|
|
|
|
# pass as consts to simplify code
|
|
|
|
|
RPC_FLAGS = b"\x00\x00\x00\x00"
|
|
|
|
|
CRYPTO_AES = b"\x01\x00\x00\x00"
|
|
|
|
|
|
|
|
|
|
RPC_NONCE_ANS_LEN = 32
|
|
|
|
|
RPC_HANDSHAKE_ANS_LEN = 32
|
|
|
|
|
|
|
|
|
|
# pass as consts to simplify code
|
|
|
|
|
RPC_FLAGS = b"\x00\x00\x00\x00"
|
|
|
|
|
|
|
|
|
|
global my_ip_info
|
|
|
|
|
|
|
|
|
|
use_ipv6_tg = (my_ip_info["ipv6"] and (config.PREFER_IPV6 or not my_ip_info["ipv4"]))
|
|
|
|
|
use_ipv6_clt = (":" in cl_ip)
|
|
|
|
|
|
|
|
|
|
if use_ipv6_tg:
|
|
|
|
|
if dc_idx not in TG_MIDDLE_PROXIES_V6:
|
|
|
|
|
return False
|
|
|
|
|
addr, port = myrandom.choice(TG_MIDDLE_PROXIES_V6[dc_idx])
|
|
|
|
|
else:
|
|
|
|
|
if dc_idx not in TG_MIDDLE_PROXIES_V4:
|
|
|
|
|
return False
|
|
|
|
|
addr, port = myrandom.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
reader_tgt, writer_tgt = await open_connection_tryer(addr, port, limit=get_to_clt_bufsize(),
|
|
|
|
|
timeout=config.TG_CONNECT_TIMEOUT)
|
|
|
|
|
except ConnectionRefusedError as E:
|
|
|
|
|
print_err("The Telegram server %d (%s %s) is refusing connections" % (dc_idx, addr, port))
|
|
|
|
|
return False
|
|
|
|
|
except (OSError, asyncio.TimeoutError) as E:
|
|
|
|
|
print_err("Unable to connect to the Telegram server %d (%s %s)" % (dc_idx, addr, port))
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
set_keepalive(writer_tgt.get_extra_info("socket"))
|
|
|
|
|
set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize())
|
|
|
|
|
|
|
|
|
|
writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO)
|
|
|
|
|
|
|
|
|
|
key_selector = PROXY_SECRET[:4]
|
|
|
|
|
crypto_ts = int.to_bytes(int(time.time()) % (256**4), 4, "little")
|
|
|
|
|
|
|
|
|
|
@@ -1278,24 +1370,25 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
|
|
|
|
writer_tgt.write(msg)
|
|
|
|
|
await writer_tgt.drain()
|
|
|
|
|
|
|
|
|
|
old_reader = reader_tgt
|
|
|
|
|
reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO)
|
|
|
|
|
ans = await reader_tgt.read(get_to_clt_bufsize())
|
|
|
|
|
|
|
|
|
|
if len(ans) != RPC_NONCE_ANS_LEN:
|
|
|
|
|
return False
|
|
|
|
|
raise ConnectionAbortedError("bad rpc answer length")
|
|
|
|
|
|
|
|
|
|
rpc_type, rpc_key_selector, rpc_schema, rpc_crypto_ts, rpc_nonce = (
|
|
|
|
|
ans[:4], ans[4:8], ans[8:12], ans[12:16], ans[16:32]
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if rpc_type != RPC_NONCE or rpc_key_selector != key_selector or rpc_schema != CRYPTO_AES:
|
|
|
|
|
return False
|
|
|
|
|
raise ConnectionAbortedError("bad rpc answer")
|
|
|
|
|
|
|
|
|
|
# get keys
|
|
|
|
|
tg_ip, tg_port = writer_tgt.upstream.get_extra_info('peername')[:2]
|
|
|
|
|
my_ip, my_port = writer_tgt.upstream.get_extra_info('sockname')[:2]
|
|
|
|
|
|
|
|
|
|
use_ipv6_tg = (":" in tg_ip)
|
|
|
|
|
|
|
|
|
|
if not use_ipv6_tg:
|
|
|
|
|
if my_ip_info["ipv4"]:
|
|
|
|
|
# prefer global ip settings to work behind NAT
|
|
|
|
|
@@ -1346,11 +1439,42 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
|
|
|
|
|
|
|
|
|
handshake_ans = await reader_tgt.read(1)
|
|
|
|
|
if len(handshake_ans) != RPC_HANDSHAKE_ANS_LEN:
|
|
|
|
|
return False
|
|
|
|
|
raise ConnectionAbortedError("bad rpc handshake answer length")
|
|
|
|
|
|
|
|
|
|
handshake_type, handshake_flags, handshake_sender_pid, handshake_peer_pid = (
|
|
|
|
|
handshake_ans[:4], handshake_ans[4:8], handshake_ans[8:20], handshake_ans[20:32])
|
|
|
|
|
if handshake_type != RPC_HANDSHAKE or handshake_peer_pid != SENDER_PID:
|
|
|
|
|
raise ConnectionAbortedError("bad rpc handshake answer")
|
|
|
|
|
|
|
|
|
|
return reader_tgt, writer_tgt, my_ip, my_port
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
|
|
|
|
global my_ip_info
|
|
|
|
|
global tg_connection_pool
|
|
|
|
|
|
|
|
|
|
use_ipv6_tg = (my_ip_info["ipv6"] and (config.PREFER_IPV6 or not my_ip_info["ipv4"]))
|
|
|
|
|
|
|
|
|
|
if use_ipv6_tg:
|
|
|
|
|
if dc_idx not in TG_MIDDLE_PROXIES_V6:
|
|
|
|
|
return False
|
|
|
|
|
addr, port = myrandom.choice(TG_MIDDLE_PROXIES_V6[dc_idx])
|
|
|
|
|
else:
|
|
|
|
|
if dc_idx not in TG_MIDDLE_PROXIES_V4:
|
|
|
|
|
return False
|
|
|
|
|
addr, port = myrandom.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
ret = await tg_connection_pool.get_connection(addr, port, middleproxy_handshake)
|
|
|
|
|
reader_tgt, writer_tgt, my_ip, my_port = ret
|
|
|
|
|
except ConnectionRefusedError as E:
|
|
|
|
|
print_err("The Telegram server %d (%s %s) is refusing connections" % (dc_idx, addr, port))
|
|
|
|
|
return False
|
|
|
|
|
except ConnectionAbortedError as E:
|
|
|
|
|
print_err("The Telegram server connection is bad: %d (%s %s) %s" % (dc_idx, addr, port, E))
|
|
|
|
|
return False
|
|
|
|
|
except (OSError, asyncio.TimeoutError) as E:
|
|
|
|
|
print_err("Unable to connect to the Telegram server %d (%s %s)" % (dc_idx, addr, port))
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
writer_tgt = ProxyReqStreamWriter(writer_tgt, cl_ip, cl_port, my_ip, my_port, proto_tag)
|
|
|
|
|
@@ -1364,10 +1488,13 @@ async def handle_client(reader_clt, writer_clt):
|
|
|
|
|
set_ack_timeout(writer_clt.get_extra_info("socket"), config.CLIENT_ACK_TIMEOUT)
|
|
|
|
|
set_bufsizes(writer_clt.get_extra_info("socket"), get_to_tg_bufsize(), get_to_clt_bufsize())
|
|
|
|
|
|
|
|
|
|
update_stats(connects_all=1)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
clt_data = await asyncio.wait_for(handle_handshake(reader_clt, writer_clt),
|
|
|
|
|
timeout=config.CLIENT_HANDSHAKE_TIMEOUT)
|
|
|
|
|
except asyncio.TimeoutError:
|
|
|
|
|
update_stats(handshake_timeouts=1)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if not clt_data:
|
|
|
|
|
@@ -1376,7 +1503,7 @@ async def handle_client(reader_clt, writer_clt):
|
|
|
|
|
reader_clt, writer_clt, proto_tag, user, dc_idx, enc_key_and_iv, peer = clt_data
|
|
|
|
|
cl_ip, cl_port = peer
|
|
|
|
|
|
|
|
|
|
update_stats(user, connects=1)
|
|
|
|
|
update_user_stats(user, connects=1)
|
|
|
|
|
|
|
|
|
|
connect_directly = (not config.USE_MIDDLE_PROXY or disable_middle_proxy)
|
|
|
|
|
|
|
|
|
|
@@ -1419,6 +1546,7 @@ async def handle_client(reader_clt, writer_clt):
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
async def connect_reader_to_writer(rd, wr, user, rd_buf_size, block_if_first_pkt_bad=False):
|
|
|
|
|
global last_clients_with_first_pkt_error
|
|
|
|
|
is_first_pkt = True
|
|
|
|
|
try:
|
|
|
|
|
while True:
|
|
|
|
|
@@ -1434,7 +1562,7 @@ async def handle_client(reader_clt, writer_clt):
|
|
|
|
|
|
|
|
|
|
ERR_PKT_DATA = b'l\xfe\xff\xff'
|
|
|
|
|
if block_if_first_pkt_bad and data == ERR_PKT_DATA:
|
|
|
|
|
print_err("Active fingerprinting detected from %s, dropping it" % cl_ip)
|
|
|
|
|
last_clients_with_first_pkt_error[cl_ip] += 1
|
|
|
|
|
|
|
|
|
|
wr.write_eof()
|
|
|
|
|
await wr.drain()
|
|
|
|
|
@@ -1445,10 +1573,10 @@ async def handle_client(reader_clt, writer_clt):
|
|
|
|
|
await wr.drain()
|
|
|
|
|
return
|
|
|
|
|
else:
|
|
|
|
|
update_stats(user, octets=len(data), msgs=1)
|
|
|
|
|
update_user_stats(user, octets=len(data), msgs=1)
|
|
|
|
|
wr.write(data, extra)
|
|
|
|
|
await wr.drain()
|
|
|
|
|
except (OSError, asyncio.streams.IncompleteReadError) as e:
|
|
|
|
|
except (OSError, asyncio.IncompleteReadError) as e:
|
|
|
|
|
# print_err(e)
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
@@ -1458,11 +1586,11 @@ async def handle_client(reader_clt, writer_clt):
|
|
|
|
|
task_tg_to_clt = asyncio.ensure_future(tg_to_clt)
|
|
|
|
|
task_clt_to_tg = asyncio.ensure_future(clt_to_tg)
|
|
|
|
|
|
|
|
|
|
update_stats(user, curr_connects=1)
|
|
|
|
|
update_user_stats(user, curr_connects=1)
|
|
|
|
|
|
|
|
|
|
tcp_limit_hit = (
|
|
|
|
|
user in config.USER_MAX_TCP_CONNS and
|
|
|
|
|
stats[user]["curr_connects"] > config.USER_MAX_TCP_CONNS[user]
|
|
|
|
|
user_stats[user]["curr_connects"] > config.USER_MAX_TCP_CONNS[user]
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
user_expired = (
|
|
|
|
|
@@ -1472,13 +1600,15 @@ async def handle_client(reader_clt, writer_clt):
|
|
|
|
|
|
|
|
|
|
user_data_quota_hit = (
|
|
|
|
|
user in config.USER_DATA_QUOTA and
|
|
|
|
|
stats[user]["octets"] > config.USER_DATA_QUOTA[user]
|
|
|
|
|
user_stats[user]["octets"] > config.USER_DATA_QUOTA[user]
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if (not tcp_limit_hit) and (not user_expired) and (not user_data_quota_hit):
|
|
|
|
|
start = time.time()
|
|
|
|
|
await asyncio.wait([task_tg_to_clt, task_clt_to_tg], return_when=asyncio.FIRST_COMPLETED)
|
|
|
|
|
update_durations(time.time() - start)
|
|
|
|
|
|
|
|
|
|
update_stats(user, curr_connects=-1)
|
|
|
|
|
update_user_stats(user, curr_connects=-1)
|
|
|
|
|
|
|
|
|
|
task_tg_to_clt.cancel()
|
|
|
|
|
task_clt_to_tg.cancel()
|
|
|
|
|
@@ -1489,7 +1619,9 @@ async def handle_client(reader_clt, writer_clt):
|
|
|
|
|
async def handle_client_wrapper(reader, writer):
|
|
|
|
|
try:
|
|
|
|
|
await handle_client(reader, writer)
|
|
|
|
|
except (asyncio.IncompleteReadError, ConnectionResetError, TimeoutError):
|
|
|
|
|
except (asyncio.IncompleteReadError, asyncio.CancelledError):
|
|
|
|
|
pass
|
|
|
|
|
except (ConnectionResetError, TimeoutError):
|
|
|
|
|
pass
|
|
|
|
|
except Exception:
|
|
|
|
|
traceback.print_exc()
|
|
|
|
|
@@ -1497,18 +1629,139 @@ async def handle_client_wrapper(reader, writer):
|
|
|
|
|
writer.transport.abort()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def stats_printer():
|
|
|
|
|
def make_metrics_pkt(metrics):
|
|
|
|
|
pkt_body_list = []
|
|
|
|
|
used_names = set()
|
|
|
|
|
|
|
|
|
|
for name, m_type, desc, val in metrics:
|
|
|
|
|
name = config.METRICS_PREFIX + name
|
|
|
|
|
if name not in used_names:
|
|
|
|
|
pkt_body_list.append("# HELP %s %s" % (name, desc))
|
|
|
|
|
pkt_body_list.append("# TYPE %s %s" % (name, m_type))
|
|
|
|
|
used_names.add(name)
|
|
|
|
|
|
|
|
|
|
if isinstance(val, dict):
|
|
|
|
|
tags = []
|
|
|
|
|
for tag, tag_val in val.items():
|
|
|
|
|
if tag == "val":
|
|
|
|
|
continue
|
|
|
|
|
tag_val = tag_val.replace('"', r'\"')
|
|
|
|
|
tags.append('%s="%s"' % (tag, tag_val))
|
|
|
|
|
pkt_body_list.append("%s{%s} %s" % (name, ",".join(tags), val["val"]))
|
|
|
|
|
else:
|
|
|
|
|
pkt_body_list.append("%s %s" % (name, val))
|
|
|
|
|
pkt_body = "\n".join(pkt_body_list) + "\n"
|
|
|
|
|
|
|
|
|
|
pkt_header_list = []
|
|
|
|
|
pkt_header_list.append("HTTP/1.1 200 OK")
|
|
|
|
|
pkt_header_list.append("Content-Length: %d" % len(pkt_body))
|
|
|
|
|
pkt_header_list.append("Content-Type: text/plain; version=0.0.4; charset=utf-8")
|
|
|
|
|
pkt_header_list.append("Date: %s" % time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()))
|
|
|
|
|
|
|
|
|
|
pkt_header = "\r\n".join(pkt_header_list)
|
|
|
|
|
|
|
|
|
|
pkt = pkt_header + "\r\n\r\n" + pkt_body
|
|
|
|
|
return pkt
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def handle_metrics(reader, writer):
|
|
|
|
|
global stats
|
|
|
|
|
global user_stats
|
|
|
|
|
global my_ip_info
|
|
|
|
|
global proxy_start_time
|
|
|
|
|
global proxy_links
|
|
|
|
|
global last_clients_with_time_skew
|
|
|
|
|
global last_clients_with_first_pkt_error
|
|
|
|
|
global last_clients_with_same_handshake
|
|
|
|
|
|
|
|
|
|
client_ip = writer.get_extra_info("peername")[0]
|
|
|
|
|
if client_ip not in config.METRICS_WHITELIST:
|
|
|
|
|
writer.close()
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
metrics = []
|
|
|
|
|
metrics.append(["uptime", "counter", "proxy uptime", time.time() - proxy_start_time])
|
|
|
|
|
metrics.append(["connects_bad", "counter", "connects with bad secret",
|
|
|
|
|
stats["connects_bad"]])
|
|
|
|
|
metrics.append(["connects_all", "counter", "incoming connects", stats["connects_all"]])
|
|
|
|
|
metrics.append(["handshake_timeouts", "counter", "number of timed out handshakes",
|
|
|
|
|
stats["handshake_timeouts"]])
|
|
|
|
|
|
|
|
|
|
if config.METRICS_EXPORT_LINKS:
|
|
|
|
|
for link in proxy_links:
|
|
|
|
|
link_as_metric = link.copy()
|
|
|
|
|
link_as_metric["val"] = 1
|
|
|
|
|
metrics.append(["proxy_link_info", "counter",
|
|
|
|
|
"the proxy link info", link_as_metric])
|
|
|
|
|
|
|
|
|
|
bucket_start = 0
|
|
|
|
|
for bucket in STAT_DURATION_BUCKETS:
|
|
|
|
|
bucket_end = bucket if bucket != STAT_DURATION_BUCKETS[-1] else "+Inf"
|
|
|
|
|
metric = {
|
|
|
|
|
"bucket": "%s-%s" % (bucket_start, bucket_end),
|
|
|
|
|
"val": stats["connects_with_duration_le_%s" % str(bucket)]
|
|
|
|
|
}
|
|
|
|
|
metrics.append(["connects_by_duration", "counter", "connects by duration", metric])
|
|
|
|
|
bucket_start = bucket_end
|
|
|
|
|
|
|
|
|
|
user_metrics_desc = [
|
|
|
|
|
["user_connects", "counter", "user connects", "connects"],
|
|
|
|
|
["user_connects_curr", "gauge", "current user connects", "curr_connects"],
|
|
|
|
|
["user_octets", "counter", "octets proxied for user", "octets"],
|
|
|
|
|
["user_msgs", "counter", "msgs proxied for user", "msgs"],
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
for m_name, m_type, m_desc, stat_key in user_metrics_desc:
|
|
|
|
|
for user, stat in user_stats.items():
|
|
|
|
|
metric = {"user": user, "val": stat[stat_key]}
|
|
|
|
|
metrics.append([m_name, m_type, m_desc, metric])
|
|
|
|
|
|
|
|
|
|
pkt = make_metrics_pkt(metrics)
|
|
|
|
|
writer.write(pkt.encode())
|
|
|
|
|
await writer.drain()
|
|
|
|
|
|
|
|
|
|
except Exception:
|
|
|
|
|
traceback.print_exc()
|
|
|
|
|
finally:
|
|
|
|
|
writer.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def stats_printer():
|
|
|
|
|
global user_stats
|
|
|
|
|
global last_clients_with_time_skew
|
|
|
|
|
global last_clients_with_first_pkt_error
|
|
|
|
|
global last_clients_with_same_handshake
|
|
|
|
|
|
|
|
|
|
while True:
|
|
|
|
|
await asyncio.sleep(config.STATS_PRINT_PERIOD)
|
|
|
|
|
|
|
|
|
|
print("Stats for", time.strftime("%d.%m.%Y %H:%M:%S"))
|
|
|
|
|
for user, stat in stats.items():
|
|
|
|
|
for user, stat in user_stats.items():
|
|
|
|
|
print("%s: %d connects (%d current), %.2f MB, %d msgs" % (
|
|
|
|
|
user, stat["connects"], stat["curr_connects"],
|
|
|
|
|
stat["octets"] / 1000000, stat["msgs"]))
|
|
|
|
|
print(flush=True)
|
|
|
|
|
|
|
|
|
|
if last_clients_with_time_skew:
|
|
|
|
|
print("Clients with time skew (possible replay-attackers):")
|
|
|
|
|
for ip, skew_minutes in last_clients_with_time_skew.items():
|
|
|
|
|
print("%s, clocks were %d minutes behind" % (ip, skew_minutes))
|
|
|
|
|
print(flush=True)
|
|
|
|
|
last_clients_with_time_skew.clear()
|
|
|
|
|
if last_clients_with_first_pkt_error:
|
|
|
|
|
print("Clients with error on the first packet (possible replay-attackers):")
|
|
|
|
|
for ip, times in last_clients_with_first_pkt_error.items():
|
|
|
|
|
print("%s, %d times" % (ip, times))
|
|
|
|
|
print(flush=True)
|
|
|
|
|
last_clients_with_first_pkt_error.clear()
|
|
|
|
|
if last_clients_with_same_handshake:
|
|
|
|
|
print("Clients with duplicate handshake (likely replay-attackers):")
|
|
|
|
|
for ip, times in last_clients_with_same_handshake.items():
|
|
|
|
|
print("%s, %d times" % (ip, times))
|
|
|
|
|
print(flush=True)
|
|
|
|
|
last_clients_with_same_handshake.clear()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def make_https_req(url, host="core.telegram.org"):
|
|
|
|
|
""" Make request, return resp body and headers. """
|
|
|
|
|
@@ -1599,8 +1852,14 @@ async def get_mask_host_cert_len():
|
|
|
|
|
task = get_encrypted_cert(config.MASK_HOST, config.MASK_PORT, config.TLS_DOMAIN)
|
|
|
|
|
cert = await asyncio.wait_for(task, timeout=GET_CERT_TIMEOUT)
|
|
|
|
|
if cert:
|
|
|
|
|
if len(cert) != fake_cert_len:
|
|
|
|
|
if len(cert) < MIN_CERT_LEN:
|
|
|
|
|
msg = ("The MASK_HOST %s returned several TLS records, this is not supported" %
|
|
|
|
|
config.MASK_HOST)
|
|
|
|
|
print_err(msg)
|
|
|
|
|
elif len(cert) != fake_cert_len:
|
|
|
|
|
fake_cert_len = len(cert)
|
|
|
|
|
print_err("Got cert from the MASK_HOST %s, its length is %d" %
|
|
|
|
|
(config.MASK_HOST, fake_cert_len))
|
|
|
|
|
else:
|
|
|
|
|
print_err("The MASK_HOST %s is not TLS 1.3 host, this is not recommended" %
|
|
|
|
|
config.MASK_HOST)
|
|
|
|
|
@@ -1753,6 +2012,7 @@ def init_ip_info():
|
|
|
|
|
|
|
|
|
|
def print_tg_info():
|
|
|
|
|
global my_ip_info
|
|
|
|
|
global proxy_links
|
|
|
|
|
|
|
|
|
|
print_default_warning = False
|
|
|
|
|
|
|
|
|
|
@@ -1766,17 +2026,23 @@ def print_tg_info():
|
|
|
|
|
if not ip_addrs:
|
|
|
|
|
ip_addrs = ["YOUR_IP"]
|
|
|
|
|
|
|
|
|
|
proxy_links = []
|
|
|
|
|
|
|
|
|
|
for user, secret in sorted(config.USERS.items(), key=lambda x: x[0]):
|
|
|
|
|
for ip in ip_addrs:
|
|
|
|
|
if not config.TLS_ONLY:
|
|
|
|
|
if not config.SECURE_ONLY:
|
|
|
|
|
params = {"server": ip, "port": config.PORT, "secret": secret}
|
|
|
|
|
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
|
|
|
|
print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True)
|
|
|
|
|
classic_link = "tg://proxy?{}".format(params_encodeded)
|
|
|
|
|
proxy_links.append({"user": user, "link": classic_link})
|
|
|
|
|
print("{}: {}".format(user, classic_link), flush=True)
|
|
|
|
|
|
|
|
|
|
params = {"server": ip, "port": config.PORT, "secret": "dd" + secret}
|
|
|
|
|
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
|
|
|
|
print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True)
|
|
|
|
|
dd_link = "tg://proxy?{}".format(params_encodeded)
|
|
|
|
|
proxy_links.append({"user": user, "link": dd_link})
|
|
|
|
|
print("{}: {}".format(user, dd_link), flush=True)
|
|
|
|
|
|
|
|
|
|
tls_secret = "ee" + secret + config.TLS_DOMAIN.encode().hex()
|
|
|
|
|
# the base64 links is buggy on ios
|
|
|
|
|
@@ -1784,9 +2050,12 @@ def print_tg_info():
|
|
|
|
|
# tls_secret_base64 = base64.urlsafe_b64encode(tls_secret)
|
|
|
|
|
params = {"server": ip, "port": config.PORT, "secret": tls_secret}
|
|
|
|
|
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
|
|
|
|
print("{}: tg://proxy?{} (new)".format(user, params_encodeded), flush=True)
|
|
|
|
|
tls_link = "tg://proxy?{}".format(params_encodeded)
|
|
|
|
|
proxy_links.append({"user": user, "link": tls_link})
|
|
|
|
|
print("{}: {}".format(user, tls_link), flush=True)
|
|
|
|
|
|
|
|
|
|
if secret in ["00000000000000000000000000000000", "0123456789abcdef0123456789abcdef"]:
|
|
|
|
|
if secret in ["00000000000000000000000000000000", "0123456789abcdef0123456789abcdef",
|
|
|
|
|
"00000000000000000000000000000001"]:
|
|
|
|
|
msg = "The default secret {} is used, this is not recommended".format(secret)
|
|
|
|
|
print(msg, flush=True)
|
|
|
|
|
random_secret = "".join(myrandom.choice("0123456789abcdef") for i in range(32))
|
|
|
|
|
@@ -1825,6 +2094,7 @@ def setup_signals():
|
|
|
|
|
if hasattr(signal, 'SIGUSR2'):
|
|
|
|
|
def reload_signal(signum, frame):
|
|
|
|
|
init_config()
|
|
|
|
|
apply_upstream_proxy_settings()
|
|
|
|
|
print("Config reloaded", flush=True, file=sys.stderr)
|
|
|
|
|
print_tg_info()
|
|
|
|
|
|
|
|
|
|
@@ -1832,9 +2102,13 @@ def setup_signals():
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def try_setup_uvloop():
|
|
|
|
|
if config.SOCKS5_HOST and config.SOCKS5_PORT:
|
|
|
|
|
# socks mode is not compatible with uvloop
|
|
|
|
|
return
|
|
|
|
|
try:
|
|
|
|
|
import uvloop
|
|
|
|
|
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
|
|
|
|
|
print_err("Found uvloop, using it for optimal performance")
|
|
|
|
|
except ImportError:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
@@ -1881,6 +2155,7 @@ def main():
|
|
|
|
|
try_setup_uvloop()
|
|
|
|
|
|
|
|
|
|
init_stats()
|
|
|
|
|
init_proxy_start_time()
|
|
|
|
|
|
|
|
|
|
if sys.platform == "win32":
|
|
|
|
|
loop = asyncio.ProactorEventLoop()
|
|
|
|
|
@@ -1912,21 +2187,31 @@ def main():
|
|
|
|
|
|
|
|
|
|
if config.LISTEN_ADDR_IPV4:
|
|
|
|
|
task = asyncio.start_server(handle_client_wrapper, config.LISTEN_ADDR_IPV4, config.PORT,
|
|
|
|
|
limit=get_to_tg_bufsize(), reuse_port=reuse_port, loop=loop)
|
|
|
|
|
limit=get_to_tg_bufsize(), reuse_port=reuse_port)
|
|
|
|
|
servers.append(loop.run_until_complete(task))
|
|
|
|
|
|
|
|
|
|
if config.LISTEN_ADDR_IPV6 and socket.has_ipv6:
|
|
|
|
|
task = asyncio.start_server(handle_client_wrapper, config.LISTEN_ADDR_IPV6, config.PORT,
|
|
|
|
|
limit=get_to_tg_bufsize(), reuse_port=reuse_port, loop=loop)
|
|
|
|
|
limit=get_to_tg_bufsize(), reuse_port=reuse_port)
|
|
|
|
|
servers.append(loop.run_until_complete(task))
|
|
|
|
|
|
|
|
|
|
if config.LISTEN_UNIX_SOCK and has_unix:
|
|
|
|
|
remove_unix_socket(config.LISTEN_UNIX_SOCK)
|
|
|
|
|
task = asyncio.start_unix_server(handle_client_wrapper, config.LISTEN_UNIX_SOCK,
|
|
|
|
|
limit=get_to_tg_bufsize(), loop=loop)
|
|
|
|
|
limit=get_to_tg_bufsize())
|
|
|
|
|
servers.append(loop.run_until_complete(task))
|
|
|
|
|
os.chmod(config.LISTEN_UNIX_SOCK, 0o666)
|
|
|
|
|
|
|
|
|
|
if config.METRICS_PORT is not None:
|
|
|
|
|
if config.METRICS_LISTEN_ADDR_IPV4:
|
|
|
|
|
task = asyncio.start_server(handle_metrics, config.METRICS_LISTEN_ADDR_IPV4,
|
|
|
|
|
config.METRICS_PORT)
|
|
|
|
|
servers.append(loop.run_until_complete(task))
|
|
|
|
|
if config.METRICS_LISTEN_ADDR_IPV6 and socket.has_ipv6:
|
|
|
|
|
task = asyncio.start_server(handle_metrics, config.METRICS_LISTEN_ADDR_IPV6,
|
|
|
|
|
config.METRICS_PORT)
|
|
|
|
|
servers.append(loop.run_until_complete(task))
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
loop.run_forever()
|
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
|
@@ -1947,6 +2232,7 @@ def main():
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
init_config()
|
|
|
|
|
apply_upstream_proxy_settings()
|
|
|
|
|
init_ip_info()
|
|
|
|
|
print_tg_info()
|
|
|
|
|
main()
|
|
|
|
|
|