diff --git a/mtprotoproxy.py b/mtprotoproxy.py index ebfa636..29f420c 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -6,6 +6,9 @@ import urllib.parse import urllib.request import collections import time +import datetime +import hmac +import base64 import hashlib import random import binascii @@ -13,48 +16,231 @@ import sys import re import runpy import signal +import os +import stat +import traceback -try: - import uvloop - asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) -except ImportError: - pass + +TG_DATACENTER_PORT = 443 + +TG_DATACENTERS_V4 = [ + "149.154.175.50", "149.154.167.51", "149.154.175.100", + "149.154.167.91", "149.154.171.5" +] + +TG_DATACENTERS_V6 = [ + "2001:b28:f23d:f001::a", "2001:67c:04e8:f002::a", "2001:b28:f23d:f003::a", + "2001:67c:04e8:f004::a", "2001:b28:f23f:f005::a" +] + +# This list will be updated in the runtime +TG_MIDDLE_PROXIES_V4 = { + 1: [("149.154.175.50", 8888)], -1: [("149.154.175.50", 8888)], + 2: [("149.154.162.38", 80)], -2: [("149.154.162.38", 80)], + 3: [("149.154.175.100", 8888)], -3: [("149.154.175.100", 8888)], + 4: [("91.108.4.136", 8888)], -4: [("149.154.165.109", 8888)], + 5: [("91.108.56.181", 8888)], -5: [("91.108.56.181", 8888)] +} + +TG_MIDDLE_PROXIES_V6 = { + 1: [("2001:b28:f23d:f001::d", 8888)], -1: [("2001:b28:f23d:f001::d", 8888)], + 2: [("2001:67c:04e8:f002::d", 80)], -2: [("2001:67c:04e8:f002::d", 80)], + 3: [("2001:b28:f23d:f003::d", 8888)], -3: [("2001:b28:f23d:f003::d", 8888)], + 4: [("2001:67c:04e8:f004::d", 8888)], -4: [("2001:67c:04e8:f004::d", 8888)], + 5: [("2001:b28:f23f:f005::d", 8888)], -5: [("2001:67c:04e8:f004::d", 8888)] +} + +PROXY_SECRET = bytes.fromhex( + "c4f9faca9678e6bb48ad6c7e2ce5c0d24430645d554addeb55419e034da62721" + + "d046eaab6e52ab14a95a443ecfb3463e79a05a66612adf9caeda8be9a80da698" + + "6fb0a6ff387af84d88ef3a6413713e5c3377f6e1a3d47d99f5e0c56eece8f05c" + + "54c490b079e31bef82ff0ee8f2b0a32756d249c5f21269816cb7061b265db212" +) + +SKIP_LEN = 8 +PREKEY_LEN = 32 +KEY_LEN = 32 +IV_LEN = 16 +HANDSHAKE_LEN = 64 +TLS_HANDSHAKE_LEN = 1 + 2 + 2 + 512 +PROTO_TAG_POS = 56 +DC_IDX_POS = 60 + +PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef" +PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee" +PROTO_TAG_SECURE = b"\xdd\xdd\xdd\xdd" + +CBC_PADDING = 16 +PADDING_FILLER = b"\x04\x00\x00\x00" + +MIN_MSG_LEN = 12 +MAX_MSG_LEN = 2 ** 24 + + +my_ip_info = {"ipv4": None, "ipv6": None} +used_handshakes = collections.OrderedDict() +disable_middle_proxy = False +is_time_skewed = False +fake_cert_len = random.randrange(1024, 4096) +mask_host_cached_ip = None + +config = {} + +if len(sys.argv) < 3: + random_secret = "".join(random.choice("0123456789abcdef") for i in range(32)) + ad_tag = "3c09c680b76ee91a4c25ad51f742267d" + print("Usage: mtprotoproxy [ad_tag] [tls_domain]") + print("Example: mtprotoproxy 443 %s %s google.com" % (random_secret, ad_tag)) + sys.exit(0) + + +def init_config(): + global config + # we use conf_dict to protect the original config from exceptions when reloading + if len(sys.argv) < 2: + conf_dict = runpy.run_module("config") + elif len(sys.argv) == 2: + # launch with own config + conf_dict = runpy.run_path(sys.argv[1]) + else: + # undocumented way of launching + conf_dict = {} + conf_dict["PORT"] = int(sys.argv[1]) + secrets = sys.argv[2].split(",") + conf_dict["USERS"] = {"user%d" % i: secrets[i].zfill(32) for i in range(len(secrets))} + if len(sys.argv) > 3: + conf_dict["AD_TAG"] = sys.argv[3] + if len(sys.argv) > 4: + conf_dict["TLS_DOMAIN"] = sys.argv[4] + conf_dict["TLS_ONLY"] = True + conf_dict["SECURE_ONLY"] = True + + conf_dict = {k: v for k, v in conf_dict.items() if k.isupper()} + + conf_dict.setdefault("PORT", 3256) + conf_dict.setdefault("USERS", {"tg": "00000000000000000000000000000000"}) + conf_dict["AD_TAG"] = bytes.fromhex(conf_dict.get("AD_TAG", "")) + + # load advanced settings + + # use middle proxy, necessary to show ad + conf_dict.setdefault("USE_MIDDLE_PROXY", len(conf_dict["AD_TAG"]) == 16) + + # if IPv6 avaliable, use it by default + conf_dict.setdefault("PREFER_IPV6", socket.has_ipv6) + + # disables tg->client trafic reencryption, faster but less secure + conf_dict.setdefault("FAST_MODE", True) + + # doesn't allow to connect in not-secure mode + conf_dict.setdefault("SECURE_ONLY", False) + + # allows to connect in tls mode only + conf_dict.setdefault("TLS_ONLY", False) + + # accept incoming connections only with proxy protocol v1/v2, useful for nginx and haproxy + conf_dict.setdefault("PROXY_PROTOCOL", False) + + # set the tls domain for the proxy, has an influence only on starting message + conf_dict.setdefault("TLS_DOMAIN", "www.google.com") + + # enable proxying bad clients to some host + conf_dict.setdefault("MASK", True) + + # the next host to forward bad clients + conf_dict.setdefault("MASK_HOST", conf_dict["TLS_DOMAIN"]) + + # the next host's port to forward bad clients + conf_dict.setdefault("MASK_PORT", 443) + + # user tcp connection limits, the mapping from name to the integer limit + # one client can create many tcp connections, up to 8 + conf_dict.setdefault("USER_MAX_TCP_CONNS", {}) + + # expiration date for users in format of day/month/year + conf_dict.setdefault("USER_EXPIRATIONS", {}) + for user in conf_dict["USER_EXPIRATIONS"]: + expiration = datetime.datetime.strptime(conf_dict["USER_EXPIRATIONS"][user], "%d/%m/%Y") + conf_dict["USER_EXPIRATIONS"][user] = expiration + + # the data quota for user + conf_dict.setdefault("USER_DATA_QUOTA", {}) + + # length of used handshake randoms for active fingerprinting protection, zero to disable + conf_dict.setdefault("REPLAY_CHECK_LEN", 32768) + + # block bad first packets to even more protect against replay-based fingerprinting + conf_dict.setdefault("BLOCK_IF_FIRST_PKT_BAD", True) + + # delay in seconds between stats printing + conf_dict.setdefault("STATS_PRINT_PERIOD", 600) + + # delay in seconds between middle proxy info updates + conf_dict.setdefault("PROXY_INFO_UPDATE_PERIOD", 24*60*60) + + # delay in seconds between time getting, zero means disabled + conf_dict.setdefault("GET_TIME_PERIOD", 10*60) + + # delay in seconds between getting the length of certificate on the mask host + conf_dict.setdefault("GET_CERT_LEN_PERIOD", random.randrange(4*60*60, 6*60*60)) + + # max socket buffer size to the client direction, the more the faster, but more RAM hungry + # can be the tuple (low, users_margin, high) for the adaptive case. If no much users, use high + conf_dict.setdefault("TO_CLT_BUFSIZE", (16384, 100, 131072)) + + # max socket buffer size to the telegram servers direction, also can be the tuple + conf_dict.setdefault("TO_TG_BUFSIZE", 65536) + + # keepalive period for clients in secs + conf_dict.setdefault("CLIENT_KEEPALIVE", 10*60) + + # drop client after this timeout if the handshake fail + conf_dict.setdefault("CLIENT_HANDSHAKE_TIMEOUT", random.randrange(5, 15)) + + # if client doesn't confirm data for this number of seconds, it is dropped + conf_dict.setdefault("CLIENT_ACK_TIMEOUT", 5*60) + + # telegram servers connect timeout in seconds + conf_dict.setdefault("TG_CONNECT_TIMEOUT", 10) + + # listen address for IPv4 + conf_dict.setdefault("LISTEN_ADDR_IPV4", "0.0.0.0") + + # listen address for IPv6 + conf_dict.setdefault("LISTEN_ADDR_IPV6", "::") + + # listen unix socket + conf_dict.setdefault("LISTEN_UNIX_SOCK", "") + + # allow access to config by attributes + config = type("config", (dict,), conf_dict)(conf_dict) def try_use_cryptography_module(): from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend + class CryptographyEncryptorAdapter: + __slots__ = ('encryptor', 'decryptor') + + def __init__(self, cipher): + self.encryptor = cipher.encryptor() + self.decryptor = cipher.decryptor() + + def encrypt(self, data): + return self.encryptor.update(data) + + def decrypt(self, data): + return self.decryptor.update(data) + def create_aes_ctr(key, iv): - class EncryptorAdapter: - def __init__(self, cipher): - self.encryptor = cipher.encryptor() - self.decryptor = cipher.decryptor() - - def encrypt(self, data): - return self.encryptor.update(data) - - def decrypt(self, data): - return self.decryptor.update(data) - iv_bytes = int.to_bytes(iv, 16, "big") cipher = Cipher(algorithms.AES(key), modes.CTR(iv_bytes), default_backend()) - return EncryptorAdapter(cipher) + return CryptographyEncryptorAdapter(cipher) def create_aes_cbc(key, iv): - class EncryptorAdapter: - def __init__(self, cipher): - self.encryptor = cipher.encryptor() - self.decryptor = cipher.decryptor() - - def encrypt(self, data): - return self.encryptor.update(data) - - def decrypt(self, data): - return self.decryptor.update(data) - cipher = Cipher(algorithms.AES(key), modes.CBC(iv), default_backend()) - return EncryptorAdapter(cipher) + return CryptographyEncryptorAdapter(cipher) return create_aes_ctr, create_aes_cbc @@ -80,25 +266,27 @@ def use_slow_bundled_cryptography_module(): msg += "pip install cryptography\n" print(msg, flush=True, file=sys.stderr) + class BundledEncryptorAdapter: + __slots__ = ('mode', ) + + def __init__(self, mode): + self.mode = mode + + def encrypt(self, data): + encrypter = pyaes.Encrypter(self.mode, pyaes.PADDING_NONE) + return encrypter.feed(data) + encrypter.feed() + + def decrypt(self, data): + decrypter = pyaes.Decrypter(self.mode, pyaes.PADDING_NONE) + return decrypter.feed(data) + decrypter.feed() + def create_aes_ctr(key, iv): ctr = pyaes.Counter(iv) return pyaes.AESModeOfOperationCTR(key, ctr) def create_aes_cbc(key, iv): - class EncryptorAdapter: - def __init__(self, mode): - self.mode = mode - - def encrypt(self, data): - encrypter = pyaes.Encrypter(self.mode, pyaes.PADDING_NONE) - return encrypter.feed(data) + encrypter.feed() - - def decrypt(self, data): - decrypter = pyaes.Decrypter(self.mode, pyaes.PADDING_NONE) - return decrypter.feed(data) + decrypter.feed() - mode = pyaes.AESModeOfOperationCBC(key, iv) - return EncryptorAdapter(mode) + return BundledEncryptorAdapter(mode) return create_aes_ctr, create_aes_cbc @@ -110,115 +298,6 @@ except ImportError: except ImportError: create_aes_ctr, create_aes_cbc = use_slow_bundled_cryptography_module() -try: - import resource - soft_fd_limit, hard_fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE) - resource.setrlimit(resource.RLIMIT_NOFILE, (hard_fd_limit, hard_fd_limit)) -except (ValueError, OSError): - print("Failed to increase the limit of opened files", flush=True, file=sys.stderr) -except ImportError: - pass - -if hasattr(signal, 'SIGUSR1'): - def debug_signal(signum, frame): - import pdb - pdb.set_trace() - - signal.signal(signal.SIGUSR1, debug_signal) - -if len(sys.argv) < 3: - print("Usage: mtprotoproxy [ad_tag]") - print("Example: mtprotoproxy 3256 0123456789abcdef0123456789abcdef 3c09c680b76ee91a4c25ad51f742267d") - sys.exit(0) - -if len(sys.argv) < 2: - config = runpy.run_module("config") -elif len(sys.argv) == 2: - config = runpy.run_path(sys.argv[1]) -else: - # undocumented way of launching - config = {} - config["PORT"] = int(sys.argv[1]) - secrets = sys.argv[2].split(",") - config["USERS"] = {"user%d" % i: secrets[i].zfill(32) for i in range(len(secrets))} - if len(sys.argv) > 3: - config["AD_TAG"] = sys.argv[3] - -PORT = config["PORT"] -USERS = config["USERS"] -AD_TAG = bytes.fromhex(config.get("AD_TAG", "")) - -# load advanced settings -PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6) -# disables tg->client trafic reencryption, faster but less secure -FAST_MODE = config.get("FAST_MODE", True) -STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600) -PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 24*60*60) -TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 8192) -TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536) -CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 10*60) -CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10) -CLIENT_ACK_TIMEOUT = config.get("CLIENT_ACK_TIMEOUT", 5*60) - -TG_DATACENTER_PORT = 443 - -TG_DATACENTERS_V4 = [ - "149.154.175.50", "149.154.167.51", "149.154.175.100", - "149.154.167.91", "149.154.171.5" -] - -TG_DATACENTERS_V6 = [ - "2001:b28:f23d:f001::a", "2001:67c:04e8:f002::a", "2001:b28:f23d:f003::a", - "2001:67c:04e8:f004::a", "2001:b28:f23f:f005::a" -] - -# This list will be updated in the runtime -TG_MIDDLE_PROXIES_V4 = { - 1: [("149.154.175.50", 8888)], -1: [("149.154.175.50", 8888)], - 2: [("149.154.162.38", 80)], -2: [("149.154.162.38", 80)], - 3: [("149.154.175.100", 8888)], -3: [("149.154.175.100", 8888)], - 4: [("91.108.4.136", 8888)], -4: [("91.108.4.136", 8888)], - 5: [("91.108.56.181", 8888)], -5: [("91.108.56.181", 8888)] -} - -TG_MIDDLE_PROXIES_V6 = { - 1: [("2001:b28:f23d:f001::d", 8888)], -1: [("2001:b28:f23d:f001::d", 8888)], - 2: [("2001:67c:04e8:f002::d", 80)], -2: [("2001:67c:04e8:f002::d", 80)], - 3: [("2001:b28:f23d:f003::d", 8888)], -3: [("2001:b28:f23d:f003::d", 8888)], - 4: [("2001:67c:04e8:f004::d", 8888)], -4: [("2001:67c:04e8:f004::d", 8888)], - 5: [("2001:b28:f23f:f005::d", 8888)], -5: [("2001:67c:04e8:f004::d", 8888)] -} - - -USE_MIDDLE_PROXY = (len(AD_TAG) == 16) - -PROXY_SECRET = bytes.fromhex( - "c4f9faca9678e6bb48ad6c7e2ce5c0d24430645d554addeb55419e034da62721" + - "d046eaab6e52ab14a95a443ecfb3463e79a05a66612adf9caeda8be9a80da698" + - "6fb0a6ff387af84d88ef3a6413713e5c3377f6e1a3d47d99f5e0c56eece8f05c" + - "54c490b079e31bef82ff0ee8f2b0a32756d249c5f21269816cb7061b265db212" -) - -SKIP_LEN = 8 -PREKEY_LEN = 32 -KEY_LEN = 32 -IV_LEN = 16 -HANDSHAKE_LEN = 64 -PROTO_TAG_POS = 56 -DC_IDX_POS = 60 - -PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef" -PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee" -PROTO_TAG_SECURE = b"\xdd\xdd\xdd\xdd" - -CBC_PADDING = 16 -PADDING_FILLER = b"\x04\x00\x00\x00" - -MIN_MSG_LEN = 12 -MAX_MSG_LEN = 2 ** 24 - -my_ip_info = {"ipv4": None, "ipv6": None} - def print_err(*params): print(*params, file=sys.stderr, flush=True) @@ -226,7 +305,7 @@ def print_err(*params): def init_stats(): global stats - stats = {user: collections.Counter() for user in USERS} + stats = {user: collections.Counter() for user in config.USERS} def update_stats(user, connects=0, curr_connects=0, octets=0, msgs=0): @@ -239,7 +318,62 @@ def update_stats(user, connects=0, curr_connects=0, octets=0, msgs=0): octets=octets, msgs=msgs) +def get_curr_connects_count(): + global stats + + all_connects = 0 + for user, stat in stats.items(): + all_connects += stat["curr_connects"] + return all_connects + + +def get_to_tg_bufsize(): + if isinstance(config.TO_TG_BUFSIZE, int): + return config.TO_TG_BUFSIZE + + low, margin, high = config.TO_TG_BUFSIZE + return high if get_curr_connects_count() < margin else low + + +def get_to_clt_bufsize(): + if isinstance(config.TO_CLT_BUFSIZE, int): + return config.TO_CLT_BUFSIZE + + low, margin, high = config.TO_CLT_BUFSIZE + return high if get_curr_connects_count() < margin else low + + +class MyRandom(random.Random): + def __init__(self): + super().__init__() + key = bytes([random.randrange(256) for i in range(32)]) + iv = random.randrange(256**16) + + self.encryptor = create_aes_ctr(key, iv) + self.buffer = bytearray() + + def getrandbits(self, k): + numbytes = (k + 7) // 8 + return int.from_bytes(self.getrandbytes(numbytes), 'big') >> (numbytes * 8 - k) + + def getrandbytes(self, n): + CHUNK_SIZE = 512 + + while n > len(self.buffer): + data = int.to_bytes(super().getrandbits(CHUNK_SIZE*8), CHUNK_SIZE, "big") + self.buffer += self.encryptor.encrypt(data) + + result = self.buffer[:n] + self.buffer = self.buffer[n:] + return bytes(result) + + +myrandom = MyRandom() + + class LayeredStreamReaderBase: + __slots__ = ("upstream", ) + def __init__(self, upstream): self.upstream = upstream @@ -251,6 +385,8 @@ class LayeredStreamReaderBase: class LayeredStreamWriterBase: + __slots__ = ("upstream", ) + def __init__(self, upstream): self.upstream = upstream @@ -269,12 +405,75 @@ class LayeredStreamWriterBase: def abort(self): return self.upstream.transport.abort() + def get_extra_info(self, name): + return self.upstream.get_extra_info(name) + @property def transport(self): return self.upstream.transport +class FakeTLSStreamReader(LayeredStreamReaderBase): + __slots__ = ('buf', ) + + def __init__(self, upstream): + self.upstream = upstream + self.buf = bytearray() + + async def read(self, n, ignore_buf=False): + if self.buf and not ignore_buf: + data = self.buf + self.buf = bytearray() + return bytes(data) + + while True: + tls_rec_type = await self.upstream.readexactly(1) + if not tls_rec_type: + return b"" + + if tls_rec_type not in [b"\x14", b"\x17"]: + print_err("BUG: bad tls type %s in FakeTLSStreamReader" % tls_rec_type) + return b"" + + version = await self.upstream.readexactly(2) + if version != b"\x03\x03": + print_err("BUG: unknown version %s in FakeTLSStreamReader" % version) + return b"" + + data_len = int.from_bytes(await self.upstream.readexactly(2), "big") + data = await self.upstream.readexactly(data_len) + if tls_rec_type == b"\x14": + continue + return data + + async def readexactly(self, n): + while len(self.buf) < n: + tls_data = await self.read(1, ignore_buf=True) + if not tls_data: + return b"" + self.buf += tls_data + data, self.buf = self.buf[:n], self.buf[n:] + return bytes(data) + + +class FakeTLSStreamWriter(LayeredStreamWriterBase): + __slots__ = () + + def __init__(self, upstream): + self.upstream = upstream + + def write(self, data, extra={}): + MAX_CHUNK_SIZE = 16384 + 24 + for start in range(0, len(data), MAX_CHUNK_SIZE): + end = min(start+MAX_CHUNK_SIZE, len(data)) + self.upstream.write(b"\x17\x03\x03" + int.to_bytes(end-start, 2, "big")) + self.upstream.write(data[start: end]) + return len(data) + + class CryptoWrappedStreamReader(LayeredStreamReaderBase): + __slots__ = ('decryptor', 'block_size', 'buf') + def __init__(self, upstream, decryptor, block_size=1): self.upstream = upstream self.decryptor = decryptor @@ -311,6 +510,8 @@ class CryptoWrappedStreamReader(LayeredStreamReaderBase): class CryptoWrappedStreamWriter(LayeredStreamWriterBase): + __slots__ = ('encryptor', 'block_size') + def __init__(self, upstream, encryptor, block_size=1): self.upstream = upstream self.encryptor = encryptor @@ -326,6 +527,8 @@ class CryptoWrappedStreamWriter(LayeredStreamWriterBase): class MTProtoFrameStreamReader(LayeredStreamReaderBase): + __slots__ = ('seq_no', ) + def __init__(self, upstream, seq_no=0): self.upstream = upstream self.seq_no = seq_no @@ -362,6 +565,8 @@ class MTProtoFrameStreamReader(LayeredStreamReaderBase): class MTProtoFrameStreamWriter(LayeredStreamWriterBase): + __slots__ = ('seq_no', ) + def __init__(self, upstream, seq_no=0): self.upstream = upstream self.seq_no = seq_no @@ -381,6 +586,8 @@ class MTProtoFrameStreamWriter(LayeredStreamWriterBase): class MTProtoCompactFrameStreamReader(LayeredStreamReaderBase): + __slots__ = () + async def read(self, buf_size): msg_len_bytes = await self.upstream.readexactly(1) msg_len = int.from_bytes(msg_len_bytes, "little") @@ -402,6 +609,8 @@ class MTProtoCompactFrameStreamReader(LayeredStreamReaderBase): class MTProtoCompactFrameStreamWriter(LayeredStreamWriterBase): + __slots__ = () + def write(self, data, extra={}): SMALL_PKT_BORDER = 0x7f LARGE_PKT_BORGER = 256 ** 3 @@ -425,6 +634,34 @@ class MTProtoCompactFrameStreamWriter(LayeredStreamWriterBase): class MTProtoIntermediateFrameStreamReader(LayeredStreamReaderBase): + __slots__ = () + + async def read(self, buf_size): + msg_len_bytes = await self.upstream.readexactly(4) + msg_len = int.from_bytes(msg_len_bytes, "little") + + extra = {} + if msg_len > 0x80000000: + extra["QUICKACK_FLAG"] = True + msg_len -= 0x80000000 + + data = await self.upstream.readexactly(msg_len) + return data, extra + + +class MTProtoIntermediateFrameStreamWriter(LayeredStreamWriterBase): + __slots__ = () + + def write(self, data, extra={}): + if extra.get("SIMPLE_ACK"): + return self.upstream.write(data) + else: + return self.upstream.write(int.to_bytes(len(data), 4, 'little') + data) + + +class MTProtoSecureIntermediateFrameStreamReader(LayeredStreamReaderBase): + __slots__ = () + async def read(self, buf_size): msg_len_bytes = await self.upstream.readexactly(4) msg_len = int.from_bytes(msg_len_bytes, "little") @@ -443,15 +680,24 @@ class MTProtoIntermediateFrameStreamReader(LayeredStreamReaderBase): return data, extra -class MTProtoIntermediateFrameStreamWriter(LayeredStreamWriterBase): +class MTProtoSecureIntermediateFrameStreamWriter(LayeredStreamWriterBase): + __slots__ = () + def write(self, data, extra={}): + MAX_PADDING_LEN = 4 if extra.get("SIMPLE_ACK"): + # TODO: make this unpredictable return self.upstream.write(data) else: - return self.upstream.write(int.to_bytes(len(data), 4, 'little') + data) + padding_len = myrandom.randrange(MAX_PADDING_LEN) + padding = myrandom.getrandbytes(padding_len) + padded_data_len_bytes = int.to_bytes(len(data) + padding_len, 4, 'little') + return self.upstream.write(padded_data_len_bytes + data + padding) class ProxyReqStreamReader(LayeredStreamReaderBase): + __slots__ = () + async def read(self, msg): RPC_PROXY_ANS = b"\x0d\xda\x03\x44" RPC_CLOSE_EXT = b"\xa2\x34\xb6\x5e" @@ -479,6 +725,8 @@ class ProxyReqStreamReader(LayeredStreamReaderBase): class ProxyReqStreamWriter(LayeredStreamWriterBase): + __slots__ = ('remote_ip_port', 'our_ip_port', 'out_conn_id', 'proto_tag') + def __init__(self, upstream, cl_ip, cl_port, my_ip, my_port, proto_tag): self.upstream = upstream @@ -495,7 +743,7 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase): else: self.our_ip_port = socket.inet_pton(socket.AF_INET6, my_ip) self.our_ip_port += int.to_bytes(my_port, 4, "little") - self.out_conn_id = bytearray([random.randrange(0, 256) for i in range(8)]) + self.out_conn_id = myrandom.getrandbytes(8) self.proto_tag = proto_tag @@ -509,6 +757,7 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase): FLAG_HAS_AD_TAG = 0x8 FLAG_MAGIC = 0x1000 FLAG_EXTMODE2 = 0x20000 + FLAG_PAD = 0x8000000 FLAG_INTERMEDIATE = 0x20000000 FLAG_ABRIDGED = 0x40000000 FLAG_QUICKACK = 0x80000000 @@ -523,6 +772,8 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase): flags |= FLAG_ABRIDGED elif self.proto_tag == PROTO_TAG_INTERMEDIATE: flags |= FLAG_INTERMEDIATE + elif self.proto_tag == PROTO_TAG_SECURE: + flags |= FLAG_INTERMEDIATE | FLAG_PAD if extra.get("QUICKACK_FLAG"): flags |= FLAG_QUICKACK @@ -533,26 +784,321 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase): full_msg = bytearray() full_msg += RPC_PROXY_REQ + int.to_bytes(flags, 4, "little") + self.out_conn_id full_msg += self.remote_ip_port + self.our_ip_port + EXTRA_SIZE + PROXY_TAG - full_msg += bytes([len(AD_TAG)]) + AD_TAG + FOUR_BYTES_ALIGNER + full_msg += bytes([len(config.AD_TAG)]) + config.AD_TAG + FOUR_BYTES_ALIGNER full_msg += msg - self.first_flag_byte = b"\x08" return self.upstream.write(full_msg) +def try_setsockopt(sock, level, option, value): + try: + sock.setsockopt(level, option, value) + except OSError as E: + pass + + +def set_keepalive(sock, interval=40, attempts=5): + sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + if hasattr(socket, "TCP_KEEPIDLE"): + try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval) + if hasattr(socket, "TCP_KEEPINTVL"): + try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval) + if hasattr(socket, "TCP_KEEPCNT"): + try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts) + + +def set_ack_timeout(sock, timeout): + if hasattr(socket, "TCP_USER_TIMEOUT"): + try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, timeout*1000) + + +def set_bufsizes(sock, recv_buf, send_buf): + try_setsockopt(sock, socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf) + try_setsockopt(sock, socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf) + + +def set_instant_rst(sock): + INSTANT_RST = b"\x01\x00\x00\x00\x00\x00\x00\x00" + if hasattr(socket, "SO_LINGER"): + try_setsockopt(sock, socket.SOL_SOCKET, socket.SO_LINGER, INSTANT_RST) + + +def gen_x25519_public_key(): + # generates some number which has square root by modulo P + P = 2**255 - 19 + n = myrandom.randrange(P) + return int.to_bytes((n*n) % P, length=32, byteorder="little") + + +async def handle_bad_client(reader_clt, writer_clt, handshake): + BUF_SIZE = 8192 + CONNECT_TIMEOUT = 5 + + global mask_host_cached_ip + + if writer_clt.transport.is_closing(): + return + + set_bufsizes(writer_clt.get_extra_info("socket"), BUF_SIZE, BUF_SIZE) + + if not config.MASK or handshake is None: + while await reader_clt.read(BUF_SIZE): + # just consume all the data + pass + return + + async def connect_reader_to_writer(reader, writer): + try: + while True: + data = await reader.read(BUF_SIZE) + + if not data: + if not writer.transport.is_closing(): + writer.write_eof() + await writer.drain() + return + + writer.write(data) + await writer.drain() + except OSError: + pass + + writer_srv = None + try: + host = mask_host_cached_ip or config.MASK_HOST + task = asyncio.open_connection(host, config.MASK_PORT, limit=BUF_SIZE) + reader_srv, writer_srv = await asyncio.wait_for(task, timeout=CONNECT_TIMEOUT) + if not mask_host_cached_ip: + mask_host_cached_ip = writer_srv.get_extra_info("peername")[0] + writer_srv.write(handshake) + await writer_srv.drain() + + srv_to_clt = connect_reader_to_writer(reader_srv, writer_clt) + clt_to_srv = connect_reader_to_writer(reader_clt, writer_srv) + task_srv_to_clt = asyncio.ensure_future(srv_to_clt) + task_clt_to_srv = asyncio.ensure_future(clt_to_srv) + + await asyncio.wait([task_srv_to_clt, task_clt_to_srv], return_when=asyncio.FIRST_COMPLETED) + + task_srv_to_clt.cancel() + task_clt_to_srv.cancel() + + if writer_clt.transport.is_closing(): + return + + # if the server closed the connection with RST or FIN-RST, copy them to the client + if not writer_srv.transport.is_closing(): + # workaround for uvloop, it doesn't fire exceptions on write_eof + sock = writer_srv.get_extra_info('socket') + raw_sock = socket.socket(sock.family, sock.type, sock.proto, sock.fileno()) + try: + raw_sock.shutdown(socket.SHUT_WR) + except OSError as E: + set_instant_rst(writer_clt.get_extra_info("socket")) + finally: + raw_sock.detach() + else: + set_instant_rst(writer_clt.get_extra_info("socket")) + except ConnectionRefusedError as E: + return + except (OSError, asyncio.TimeoutError) as E: + return + finally: + if writer_srv is not None: + writer_srv.transport.abort() + + +async def handle_fake_tls_handshake(handshake, reader, writer, peer): + global used_handshakes + global fake_cert_len + + TIME_SKEW_MIN = -20 * 60 + TIME_SKEW_MAX = 10 * 60 + + TLS_VERS = b"\x03\x03" + TLS_CIPHERSUITE = b"\x13\x01" + TLS_CHANGE_CIPHER = b"\x14" + TLS_VERS + b"\x00\x01\x01" + TLS_APP_HTTP2_HDR = b"\x17" + TLS_VERS + + DIGEST_LEN = 32 + DIGEST_POS = 11 + + SESSION_ID_LEN_POS = DIGEST_POS + DIGEST_LEN + SESSION_ID_POS = SESSION_ID_LEN_POS + 1 + + tls_extensions = b"\x00\x2e" + b"\x00\x33\x00\x24" + b"\x00\x1d\x00\x20" + tls_extensions += gen_x25519_public_key() + b"\x00\x2b\x00\x02\x03\x04" + + digest = handshake[DIGEST_POS: DIGEST_POS + DIGEST_LEN] + + if digest in used_handshakes: + print_err("Active TLS fingerprinting detected from %s, handling it" % peer[0]) + return False + + sess_id_len = handshake[SESSION_ID_LEN_POS] + sess_id = handshake[SESSION_ID_POS: SESSION_ID_POS + sess_id_len] + + for user in config.USERS: + secret = bytes.fromhex(config.USERS[user]) + + msg = handshake[:DIGEST_POS] + b"\x00"*DIGEST_LEN + handshake[DIGEST_POS+DIGEST_LEN:] + computed_digest = hmac.new(secret, msg, digestmod=hashlib.sha256).digest() + + xored_digest = bytes(digest[i] ^ computed_digest[i] for i in range(DIGEST_LEN)) + digest_good = xored_digest.startswith(b"\x00" * (DIGEST_LEN-4)) + + if not digest_good: + continue + + timestamp = int.from_bytes(xored_digest[-4:], "little") + client_time_is_ok = TIME_SKEW_MIN < time.time() - timestamp < TIME_SKEW_MAX + # some clients fail to read unix time and send the time since boot instead + client_time_is_small = timestamp < 60*60*24*1000 + if not client_time_is_ok and not is_time_skewed and not client_time_is_small: + print_err("Client with time skew detected from %s, can be a replay-attack" % peer[0]) + print_err("The clocks were %d minutes behind" % ((time.time() - timestamp) // 60)) + continue + + http_data = myrandom.getrandbytes(fake_cert_len) + + srv_hello = TLS_VERS + b"\x00"*DIGEST_LEN + bytes([sess_id_len]) + sess_id + srv_hello += TLS_CIPHERSUITE + b"\x00" + tls_extensions + + hello_pkt = b"\x16" + TLS_VERS + int.to_bytes(len(srv_hello) + 4, 2, "big") + hello_pkt += b"\x02" + int.to_bytes(len(srv_hello), 3, "big") + srv_hello + hello_pkt += TLS_CHANGE_CIPHER + TLS_APP_HTTP2_HDR + hello_pkt += int.to_bytes(len(http_data), 2, "big") + http_data + + computed_digest = hmac.new(secret, msg=digest+hello_pkt, digestmod=hashlib.sha256).digest() + hello_pkt = hello_pkt[:DIGEST_POS] + computed_digest + hello_pkt[DIGEST_POS+DIGEST_LEN:] + + writer.write(hello_pkt) + await writer.drain() + + if config.REPLAY_CHECK_LEN > 0: + while len(used_handshakes) >= config.REPLAY_CHECK_LEN: + used_handshakes.popitem(last=False) + used_handshakes[digest] = True + + reader = FakeTLSStreamReader(reader) + writer = FakeTLSStreamWriter(writer) + return reader, writer + + return False + + +async def handle_proxy_protocol(reader, peer=None): + PROXY_SIGNATURE = b"PROXY " + PROXY_MIN_LEN = 6 + PROXY_TCP4 = b"TCP4" + PROXY_TCP6 = b"TCP6" + PROXY_UNKNOWN = b"UNKNOWN" + + PROXY2_SIGNATURE = b"\x0d\x0a\x0d\x0a\x00\x0d\x0a\x51\x55\x49\x54\x0a" + PROXY2_MIN_LEN = 16 + PROXY2_AF_UNSPEC = 0x0 + PROXY2_AF_INET = 0x1 + PROXY2_AF_INET6 = 0x2 + + header = await reader.readexactly(PROXY_MIN_LEN) + if header.startswith(PROXY_SIGNATURE): + # proxy header v1 + header += await reader.readuntil(b"\r\n") + _, proxy_fam, *proxy_addr = header[:-2].split(b" ") + if proxy_fam in (PROXY_TCP4, PROXY_TCP6): + if len(proxy_addr) == 4: + src_addr = proxy_addr[0].decode('ascii') + src_port = proxy_addr[2].decode('ascii') + return (src_addr, src_port) + elif proxy_fam == PROXY_UNKNOWN: + return peer + return False + + header += await reader.readexactly(PROXY2_MIN_LEN - PROXY_MIN_LEN) + if header.startswith(PROXY2_SIGNATURE): + # proxy header v2 + proxy_ver = header[12] + if proxy_ver & 0xf0 != 0x20: + return False + proxy_len = int.from_bytes(header[14:16], "big") + proxy_addr = await reader.readexactly(proxy_len) + if proxy_ver == 0x21: + proxy_fam = header[13] >> 4 + if proxy_fam == PROXY2_AF_INET: + if proxy_len >= (4 + 2)*2: + src_addr = socket.inet_ntop(socket.AF_INET, proxy_addr[:4]) + src_port = int.from_bytes(proxy_addr[8:10], "big") + return (src_addr, src_port) + elif proxy_fam == PROXY2_AF_INET6: + if proxy_len >= (16 + 2)*2: + src_addr = socket.inet_ntop(socket.AF_INET6, proxy_addr[:16]) + src_port = int.from_bytes(proxy_addr[32:34], "big") + return (src_addr, src_port) + elif proxy_fam == PROXY2_AF_UNSPEC: + return peer + elif proxy_ver == 0x20: + return peer + + return False + + async def handle_handshake(reader, writer): - handshake = await reader.readexactly(HANDSHAKE_LEN) + global used_handshakes - for user in USERS: - secret = bytes.fromhex(USERS[user]) + TLS_START_BYTES = b"\x16\x03\x01\x02\x00\x01\x00\x01\xfc\x03\x03" + + if writer.transport.is_closing() or writer.get_extra_info("peername") is None: + return False + + peer = writer.get_extra_info("peername")[:2] + + if config.PROXY_PROTOCOL: + ip = peer[0] if peer else "unknown address" + peer = await handle_proxy_protocol(reader, peer) + if not peer: + print_err("Client from %s sent bad proxy protocol headers" % ip) + await handle_bad_client(reader, writer, None) + return False + + is_tls_handshake = True + handshake = b"" + for expected_byte in TLS_START_BYTES: + handshake += await reader.readexactly(1) + if handshake[-1] != expected_byte: + is_tls_handshake = False + break + + if is_tls_handshake: + handshake += await reader.readexactly(TLS_HANDSHAKE_LEN - len(handshake)) + tls_handshake_result = await handle_fake_tls_handshake(handshake, reader, writer, peer) + + if not tls_handshake_result: + await handle_bad_client(reader, writer, handshake) + return False + reader, writer = tls_handshake_result + handshake = await reader.readexactly(HANDSHAKE_LEN) + else: + if config.TLS_ONLY: + await handle_bad_client(reader, writer, handshake) + return False + handshake += await reader.readexactly(HANDSHAKE_LEN - len(handshake)) + + dec_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN] + dec_prekey, dec_iv = dec_prekey_and_iv[:PREKEY_LEN], dec_prekey_and_iv[PREKEY_LEN:] + enc_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN][::-1] + enc_prekey, enc_iv = enc_prekey_and_iv[:PREKEY_LEN], enc_prekey_and_iv[PREKEY_LEN:] + + if dec_prekey_and_iv in used_handshakes: + print_err("Active fingerprinting detected from %s, handling it" % peer[0]) + await handle_bad_client(reader, writer, handshake) + return False + + for user in config.USERS: + secret = bytes.fromhex(config.USERS[user]) - dec_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN] - dec_prekey, dec_iv = dec_prekey_and_iv[:PREKEY_LEN], dec_prekey_and_iv[PREKEY_LEN:] dec_key = hashlib.sha256(dec_prekey + secret).digest() decryptor = create_aes_ctr(key=dec_key, iv=int.from_bytes(dec_iv, "big")) - enc_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN][::-1] - enc_prekey, enc_iv = enc_prekey_and_iv[:PREKEY_LEN], enc_prekey_and_iv[PREKEY_LEN:] enc_key = hashlib.sha256(enc_prekey + secret).digest() encryptor = create_aes_ctr(key=enc_key, iv=int.from_bytes(enc_iv, "big")) @@ -562,49 +1108,51 @@ async def handle_handshake(reader, writer): if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE): continue + if config.SECURE_ONLY and proto_tag != PROTO_TAG_SECURE: + continue + dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True) + if config.REPLAY_CHECK_LEN > 0: + while len(used_handshakes) >= config.REPLAY_CHECK_LEN: + used_handshakes.popitem(last=False) + used_handshakes[dec_prekey_and_iv] = True + reader = CryptoWrappedStreamReader(reader, decryptor) writer = CryptoWrappedStreamWriter(writer, encryptor) - return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv - - EMPTY_READ_BUF_SIZE = 4096 - while await reader.read(EMPTY_READ_BUF_SIZE): - # just consume all the data - pass + return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv, peer + await handle_bad_client(reader, writer, handshake) return False -def set_keepalive(sock, interval=40, attempts=5): - sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - if hasattr(socket, "TCP_KEEPIDLE"): - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval) - if hasattr(socket, "TCP_KEEPINTVL"): - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval) - if hasattr(socket, "TCP_KEEPCNT"): - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts) +async def open_connection_tryer(addr, port, limit, timeout, max_attempts=3): + for attempt in range(max_attempts-1): + try: + task = asyncio.open_connection(addr, port, limit=limit) + reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout) + return reader_tgt, writer_tgt + except (OSError, asyncio.TimeoutError): + continue - -def set_ack_timeout(sock, timeout): - if hasattr(socket, "TCP_USER_TIMEOUT"): - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, timeout*1000) - - -def set_bufsizes(sock, recv_buf, send_buf): - sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf) + # the last attempt + task = asyncio.open_connection(addr, port, limit=limit) + reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout) + return reader_tgt, writer_tgt async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None): RESERVED_NONCE_FIRST_CHARS = [b"\xef"] RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54", - b"\x47\x45\x54\x20", b"\xee\xee\xee\xee"] + b"\x47\x45\x54\x20", b"\xee\xee\xee\xee", + b"\xdd\xdd\xdd\xdd", b"\x16\x03\x01\x02"] RESERVED_NONCE_CONTINUES = [b"\x00\x00\x00\x00"] + global my_ip_info + dc_idx = abs(dc_idx) - 1 - if PREFER_IPV6: + if my_ip_info["ipv6"] and (config.PREFER_IPV6 or not my_ip_info["ipv4"]): if not 0 <= dc_idx < len(TG_DATACENTERS_V6): return False dc = TG_DATACENTERS_V6[dc_idx] @@ -614,20 +1162,20 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None): dc = TG_DATACENTERS_V4[dc_idx] try: - reader_tgt, writer_tgt = await asyncio.open_connection(dc, TG_DATACENTER_PORT, - limit=TO_CLT_BUFSIZE) - set_keepalive(writer_tgt.get_extra_info("socket")) - set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE) - + reader_tgt, writer_tgt = await open_connection_tryer( + dc, TG_DATACENTER_PORT, limit=get_to_clt_bufsize(), timeout=config.TG_CONNECT_TIMEOUT) except ConnectionRefusedError as E: print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT) return False - except OSError as E: + except (OSError, asyncio.TimeoutError) as E: print_err("Unable to connect to", dc, TG_DATACENTER_PORT) return False + set_keepalive(writer_tgt.get_extra_info("socket")) + set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize()) + while True: - rnd = bytearray([random.randrange(0, 256) for i in range(HANDSHAKE_LEN)]) + rnd = bytearray(myrandom.getrandbytes(HANDSHAKE_LEN)) if rnd[:1] in RESERVED_NONCE_FIRST_CHARS: continue if rnd[:4] in RESERVED_NONCE_BEGININGS: @@ -702,35 +1250,39 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port): # pass as consts to simplify code RPC_FLAGS = b"\x00\x00\x00\x00" - use_ipv6_tg = PREFER_IPV6 + global my_ip_info + + use_ipv6_tg = (my_ip_info["ipv6"] and (config.PREFER_IPV6 or not my_ip_info["ipv4"])) use_ipv6_clt = (":" in cl_ip) if use_ipv6_tg: if dc_idx not in TG_MIDDLE_PROXIES_V6: return False - addr, port = random.choice(TG_MIDDLE_PROXIES_V6[dc_idx]) + addr, port = myrandom.choice(TG_MIDDLE_PROXIES_V6[dc_idx]) else: if dc_idx not in TG_MIDDLE_PROXIES_V4: return False - addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx]) + addr, port = myrandom.choice(TG_MIDDLE_PROXIES_V4[dc_idx]) try: - reader_tgt, writer_tgt = await asyncio.open_connection(addr, port, limit=TO_CLT_BUFSIZE) - set_keepalive(writer_tgt.get_extra_info("socket")) - set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE) + reader_tgt, writer_tgt = await open_connection_tryer(addr, port, limit=get_to_clt_bufsize(), + timeout=config.TG_CONNECT_TIMEOUT) except ConnectionRefusedError as E: - print_err("Got connection refused while trying to connect to", addr, port) + print_err("The Telegram server %d (%s %s) is refusing connections" % (dc_idx, addr, port)) return False - except OSError as E: - print_err("Unable to connect to", addr, port) + except (OSError, asyncio.TimeoutError) as E: + print_err("Unable to connect to the Telegram server %d (%s %s)" % (dc_idx, addr, port)) return False + set_keepalive(writer_tgt.get_extra_info("socket")) + set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize()) + writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO) key_selector = PROXY_SECRET[:4] crypto_ts = int.to_bytes(int(time.time()) % (256**4), 4, "little") - nonce = bytes([random.randrange(0, 256) for i in range(NONCE_LEN)]) + nonce = myrandom.getrandbytes(NONCE_LEN) msg = RPC_NONCE + key_selector + CRYPTO_AES + crypto_ts + nonce @@ -739,7 +1291,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port): old_reader = reader_tgt reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO) - ans = await reader_tgt.read(TO_CLT_BUFSIZE) + ans = await reader_tgt.read(get_to_clt_bufsize()) if len(ans) != RPC_NONCE_ANS_LEN: return False @@ -755,7 +1307,6 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port): tg_ip, tg_port = writer_tgt.upstream.get_extra_info('peername')[:2] my_ip, my_port = writer_tgt.upstream.get_extra_info('sockname')[:2] - global my_ip_info if not use_ipv6_tg: if my_ip_info["ipv4"]: # prefer global ip settings to work behind NAT @@ -820,30 +1371,32 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port): async def handle_client(reader_clt, writer_clt): - set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE, attempts=3) - set_ack_timeout(writer_clt.get_extra_info("socket"), CLIENT_ACK_TIMEOUT) - set_bufsizes(writer_clt.get_extra_info("socket"), TO_TG_BUFSIZE, TO_CLT_BUFSIZE) + set_keepalive(writer_clt.get_extra_info("socket"), config.CLIENT_KEEPALIVE, attempts=3) + set_ack_timeout(writer_clt.get_extra_info("socket"), config.CLIENT_ACK_TIMEOUT) + set_bufsizes(writer_clt.get_extra_info("socket"), get_to_tg_bufsize(), get_to_clt_bufsize()) try: clt_data = await asyncio.wait_for(handle_handshake(reader_clt, writer_clt), - timeout=CLIENT_HANDSHAKE_TIMEOUT) + timeout=config.CLIENT_HANDSHAKE_TIMEOUT) except asyncio.TimeoutError: return if not clt_data: return - reader_clt, writer_clt, proto_tag, user, dc_idx, enc_key_and_iv = clt_data + reader_clt, writer_clt, proto_tag, user, dc_idx, enc_key_and_iv, peer = clt_data + cl_ip, cl_port = peer update_stats(user, connects=1) - if not USE_MIDDLE_PROXY: - if FAST_MODE: + connect_directly = (not config.USE_MIDDLE_PROXY or disable_middle_proxy) + + if connect_directly: + if config.FAST_MODE: tg_data = await do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=enc_key_and_iv) else: tg_data = await do_direct_handshake(proto_tag, dc_idx) else: - cl_ip, cl_port = writer_clt.upstream.get_extra_info('peername')[:2] tg_data = await do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port) if not tg_data: @@ -851,7 +1404,7 @@ async def handle_client(reader_clt, writer_clt): reader_tg, writer_tg = tg_data - if not USE_MIDDLE_PROXY and FAST_MODE: + if connect_directly and config.FAST_MODE: class FakeEncryptor: def encrypt(self, data): return data @@ -863,17 +1416,21 @@ async def handle_client(reader_clt, writer_clt): reader_tg.decryptor = FakeDecryptor() writer_clt.encryptor = FakeEncryptor() - if USE_MIDDLE_PROXY: + if not connect_directly: if proto_tag == PROTO_TAG_ABRIDGED: reader_clt = MTProtoCompactFrameStreamReader(reader_clt) writer_clt = MTProtoCompactFrameStreamWriter(writer_clt) - elif proto_tag in (PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE): + elif proto_tag == PROTO_TAG_INTERMEDIATE: reader_clt = MTProtoIntermediateFrameStreamReader(reader_clt) writer_clt = MTProtoIntermediateFrameStreamWriter(writer_clt) + elif proto_tag == PROTO_TAG_SECURE: + reader_clt = MTProtoSecureIntermediateFrameStreamReader(reader_clt) + writer_clt = MTProtoSecureIntermediateFrameStreamWriter(writer_clt) else: return - async def connect_reader_to_writer(rd, wr, user, rd_buf_size): + async def connect_reader_to_writer(rd, wr, user, rd_buf_size, block_if_first_pkt_bad=False): + is_first_pkt = True try: while True: data = await rd.read(rd_buf_size) @@ -882,6 +1439,18 @@ async def handle_client(reader_clt, writer_clt): else: extra = {} + # protection against replay-based fingerprinting + if is_first_pkt: + is_first_pkt = False + + ERR_PKT_DATA = b'l\xfe\xff\xff' + if block_if_first_pkt_bad and data == ERR_PKT_DATA: + print_err("Active fingerprinting detected from %s, dropping it" % cl_ip) + + wr.write_eof() + await wr.drain() + return + if not data: wr.write_eof() await wr.drain() @@ -894,13 +1463,32 @@ async def handle_client(reader_clt, writer_clt): # print_err(e) pass - tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, TO_CLT_BUFSIZE) - clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, TO_TG_BUFSIZE) + tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, get_to_clt_bufsize(), + block_if_first_pkt_bad=config.BLOCK_IF_FIRST_PKT_BAD) + clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, get_to_tg_bufsize()) task_tg_to_clt = asyncio.ensure_future(tg_to_clt) task_clt_to_tg = asyncio.ensure_future(clt_to_tg) update_stats(user, curr_connects=1) - await asyncio.wait([task_tg_to_clt, task_clt_to_tg], return_when=asyncio.FIRST_COMPLETED) + + tcp_limit_hit = ( + user in config.USER_MAX_TCP_CONNS and + stats[user]["curr_connects"] > config.USER_MAX_TCP_CONNS[user] + ) + + user_expired = ( + user in config.USER_EXPIRATIONS and + datetime.datetime.now() > config.USER_EXPIRATIONS[user] + ) + + user_data_quota_hit = ( + user in config.USER_DATA_QUOTA and + stats[user]["octets"] > config.USER_DATA_QUOTA[user] + ) + + if (not tcp_limit_hit) and (not user_expired) and (not user_data_quota_hit): + await asyncio.wait([task_tg_to_clt, task_clt_to_tg], return_when=asyncio.FIRST_COMPLETED) + update_stats(user, curr_connects=-1) task_tg_to_clt.cancel() @@ -914,6 +1502,8 @@ async def handle_client_wrapper(reader, writer): await handle_client(reader, writer) except (asyncio.IncompleteReadError, ConnectionResetError, TimeoutError): pass + except Exception: + traceback.print_exc() finally: writer.transport.abort() @@ -921,7 +1511,7 @@ async def handle_client_wrapper(reader, writer): async def stats_printer(): global stats while True: - await asyncio.sleep(STATS_PRINT_PERIOD) + await asyncio.sleep(config.STATS_PRINT_PERIOD) print("Stats for", time.strftime("%d.%m.%Y %H:%M:%S")) for user, stat in stats.items(): @@ -931,34 +1521,164 @@ async def stats_printer(): print(flush=True) -async def update_middle_proxy_info(): - async def make_https_req(url): - # returns resp body - SSL_PORT = 443 - url_data = urllib.parse.urlparse(url) +async def make_https_req(url, host="core.telegram.org"): + """ Make request, return resp body and headers. """ + SSL_PORT = 443 + url_data = urllib.parse.urlparse(url) - HTTP_REQ_TEMPLATE = "\r\n".join(["GET %s HTTP/1.1", "Host: core.telegram.org", - "Connection: close"]) + "\r\n\r\n" + HTTP_REQ_TEMPLATE = "\r\n".join(["GET %s HTTP/1.1", "Host: %s", + "Connection: close"]) + "\r\n\r\n" + reader, writer = await asyncio.open_connection(url_data.netloc, SSL_PORT, ssl=True) + req = HTTP_REQ_TEMPLATE % (urllib.parse.quote(url_data.path), host) + writer.write(req.encode("utf8")) + data = await reader.read() + writer.close() + + headers, body = data.split(b"\r\n\r\n", 1) + return headers, body + + +def gen_tls_client_hello_msg(server_name): + msg = bytearray() + msg += b"\x16\x03\x01\x02\x00\x01\x00\x01\xfc\x03\x03" + myrandom.getrandbytes(32) + msg += b"\x20" + myrandom.getrandbytes(32) + msg += b"\x00\x22\x4a\x4a\x13\x01\x13\x02\x13\x03\xc0\x2b\xc0\x2f\xc0\x2c\xc0\x30\xcc\xa9" + msg += b"\xcc\xa8\xc0\x13\xc0\x14\x00\x9c\x00\x9d\x00\x2f\x00\x35\x00\x0a\x01\x00\x01\x91" + msg += b"\xda\xda\x00\x00\x00\x00" + msg += int.to_bytes(len(server_name) + 5, 2, "big") + msg += int.to_bytes(len(server_name) + 3, 2, "big") + b"\x00" + msg += int.to_bytes(len(server_name), 2, "big") + server_name.encode("ascii") + msg += b"\x00\x17\x00\x00\xff\x01\x00\x01\x00\x00\x0a\x00\x0a\x00\x08\xaa\xaa\x00\x1d\x00" + msg += b"\x17\x00\x18\x00\x0b\x00\x02\x01\x00\x00\x23\x00\x00\x00\x10\x00\x0e\x00\x0c\x02" + msg += b"\x68\x32\x08\x68\x74\x74\x70\x2f\x31\x2e\x31\x00\x05\x00\x05\x01\x00\x00\x00\x00" + msg += b"\x00\x0d\x00\x14\x00\x12\x04\x03\x08\x04\x04\x01\x05\x03\x08\x05\x05\x01\x08\x06" + msg += b"\x06\x01\x02\x01\x00\x12\x00\x00\x00\x33\x00\x2b\x00\x29\xaa\xaa\x00\x01\x00\x00" + msg += b"\x1d\x00\x20" + gen_x25519_public_key() + msg += b"\x00\x2d\x00\x02\x01\x01\x00\x2b\x00\x0b\x0a\xba\xba\x03\x04\x03\x03\x03\x02\x03" + msg += b"\x01\x00\x1b\x00\x03\x02\x00\x02\x3a\x3a\x00\x01\x00\x00\x15" + msg += int.to_bytes(517 - len(msg) - 2, 2, "big") + msg += b"\x00" * (517 - len(msg)) + return bytes(msg) + + +async def get_encrypted_cert(host, port, server_name): + async def get_tls_record(reader): try: - reader, writer = await asyncio.open_connection(url_data.netloc, SSL_PORT, ssl=True) - req = HTTP_REQ_TEMPLATE % urllib.parse.quote(url_data.path) - writer.write(req.encode("utf8")) - data = await reader.read() - writer.close() + record_type = (await reader.readexactly(1))[0] + tls_version = await reader.readexactly(2) + if tls_version != b"\x03\x03": + return 0, b"" + record_len = int.from_bytes(await reader.readexactly(2), "big") + record = await reader.readexactly(record_len) - headers, body = data.split(b"\r\n\r\n", 1) - return body - except Exception: - return b"" + return record_type, record + except asyncio.IncompleteReadError: + return 0, b"" + reader, writer = await asyncio.open_connection(host, port) + writer.write(gen_tls_client_hello_msg(server_name)) + await writer.drain() + + record1_type, record1 = await get_tls_record(reader) + if record1_type != 22: + return b"" + + record2_type, record2 = await get_tls_record(reader) + if record2_type != 20: + return b"" + + record3_type, record3 = await get_tls_record(reader) + if record3_type != 23: + return b"" + + return record3 + + +async def get_mask_host_cert_len(): + global fake_cert_len + + GET_CERT_TIMEOUT = 10 + MASK_ENABLING_CHECK_PERIOD = 60 + + while True: + try: + if not config.MASK: + # do nothing + await asyncio.sleep(MASK_ENABLING_CHECK_PERIOD) + continue + + task = get_encrypted_cert(config.MASK_HOST, config.MASK_PORT, config.TLS_DOMAIN) + cert = await asyncio.wait_for(task, timeout=GET_CERT_TIMEOUT) + if cert: + if len(cert) != fake_cert_len: + fake_cert_len = len(cert) + else: + print_err("The MASK_HOST %s is not TLS 1.3 host, this is not recommended" % + config.MASK_HOST) + except ConnectionRefusedError: + print_err("The MASK_HOST %s is refusing connections, this is not recommended" % + config.MASK_HOST) + except (TimeoutError, asyncio.TimeoutError): + print_err("Got timeout while getting TLS handshake from MASK_HOST %s" % + config.MASK_HOST) + except Exception as E: + print_err("Failed to connect to MASK_HOST %s: %s" % ( + config.MASK_HOST, E)) + + await asyncio.sleep(config.GET_CERT_LEN_PERIOD) + + +async def get_srv_time(): + TIME_SYNC_ADDR = "https://core.telegram.org/getProxySecret" + MAX_TIME_SKEW = 30 + + global disable_middle_proxy + global is_time_skewed + + want_to_reenable_advertising = False + while True: + try: + headers, secret = await make_https_req(TIME_SYNC_ADDR) + + for line in headers.split(b"\r\n"): + if not line.startswith(b"Date: "): + continue + line = line[len("Date: "):].decode() + srv_time = datetime.datetime.strptime(line, "%a, %d %b %Y %H:%M:%S %Z") + now_time = datetime.datetime.utcnow() + is_time_skewed = (now_time-srv_time).total_seconds() > MAX_TIME_SKEW + if is_time_skewed and config.USE_MIDDLE_PROXY and not disable_middle_proxy: + print_err("Time skew detected, please set the clock") + print_err("Server time:", srv_time, "your time:", now_time) + print_err("Disabling advertising to continue serving") + print_err("Putting down the shields against replay attacks") + + disable_middle_proxy = True + want_to_reenable_advertising = True + elif not is_time_skewed and want_to_reenable_advertising: + print_err("Time is ok, reenabling advertising") + disable_middle_proxy = False + want_to_reenable_advertising = False + except Exception as E: + print_err("Error getting server time", E) + + await asyncio.sleep(config.GET_TIME_PERIOD) + + +async def clear_ip_resolving_cache(): + global mask_host_cached_ip + min_sleep = myrandom.randrange(60 - 10, 60 + 10) + max_sleep = myrandom.randrange(120 - 10, 120 + 10) + while True: + mask_host_cached_ip = None + await asyncio.sleep(myrandom.randrange(min_sleep, max_sleep)) + + +async def update_middle_proxy_info(): async def get_new_proxies(url): PROXY_REGEXP = re.compile(r"proxy_for\s+(-?\d+)\s+(.+):(\d+)\s*;") - ans = {} - try: - body = await make_https_req(url) - except Exception: - return ans + headers, body = await make_https_req(url) fields = PROXY_REGEXP.findall(body.decode("utf8")) if fields: @@ -986,78 +1706,156 @@ async def update_middle_proxy_info(): if not v4_proxies: raise Exception("no proxy data") TG_MIDDLE_PROXIES_V4 = v4_proxies - except Exception: - print_err("Error updating middle proxy list") + except Exception as E: + print_err("Error updating middle proxy list:", E) try: v6_proxies = await get_new_proxies(PROXY_INFO_ADDR_V6) if not v6_proxies: raise Exception("no proxy data (ipv6)") TG_MIDDLE_PROXIES_V6 = v6_proxies - except Exception: - print_err("Error updating middle proxy list for IPv6") + except Exception as E: + print_err("Error updating middle proxy list for IPv6:", E) try: - secret = await make_https_req(PROXY_SECRET_ADDR) + headers, secret = await make_https_req(PROXY_SECRET_ADDR) if not secret: raise Exception("no secret") if secret != PROXY_SECRET: PROXY_SECRET = secret print_err("Middle proxy secret updated") - except Exception: - print_err("Error updating middle proxy secret, using old") + except Exception as E: + print_err("Error updating middle proxy secret, using old", E) - await asyncio.sleep(PROXY_INFO_UPDATE_PERIOD) + await asyncio.sleep(config.PROXY_INFO_UPDATE_PERIOD) def init_ip_info(): - global USE_MIDDLE_PROXY - global PREFER_IPV6 global my_ip_info - TIMEOUT = 5 + global disable_middle_proxy - try: - with urllib.request.urlopen('https://v4.ifconfig.co/ip', timeout=TIMEOUT) as f: - if f.status != 200: - raise Exception("Invalid status code") - my_ip_info["ipv4"] = f.read().decode().strip() - except Exception: - pass - - if PREFER_IPV6: + def get_ip_from_url(url): + TIMEOUT = 5 try: - with urllib.request.urlopen('https://v6.ifconfig.co/ip', timeout=TIMEOUT) as f: + with urllib.request.urlopen(url, timeout=TIMEOUT) as f: if f.status != 200: raise Exception("Invalid status code") - my_ip_info["ipv6"] = f.read().decode().strip() + return f.read().decode().strip() except Exception: - PREFER_IPV6 = False - else: - print_err("IPv6 found, using it for external communication") + return None - if USE_MIDDLE_PROXY: - if ((not PREFER_IPV6 and not my_ip_info["ipv4"]) or - (PREFER_IPV6 and not my_ip_info["ipv6"])): + IPV4_URL1 = "http://v4.ident.me/" + IPV4_URL2 = "http://ipv4.icanhazip.com/" + + IPV6_URL1 = "http://v6.ident.me/" + IPV6_URL2 = "http://ipv6.icanhazip.com/" + + my_ip_info["ipv4"] = get_ip_from_url(IPV4_URL1) or get_ip_from_url(IPV4_URL2) + my_ip_info["ipv6"] = get_ip_from_url(IPV6_URL1) or get_ip_from_url(IPV6_URL2) + + if my_ip_info["ipv6"] and (config.PREFER_IPV6 or not my_ip_info["ipv4"]): + print_err("IPv6 found, using it for external communication") + + if config.USE_MIDDLE_PROXY: + if not my_ip_info["ipv4"] and not my_ip_info["ipv6"]: print_err("Failed to determine your ip, advertising disabled") - USE_MIDDLE_PROXY = False + disable_middle_proxy = True def print_tg_info(): global my_ip_info + print_default_warning = False + + if config.PORT == 3256: + print("The default port 3256 is used, this is not recommended", flush=True) + if config.TLS_ONLY: + print("Since you have TLS only mode enabled the best port is 443", flush=True) + print_default_warning = True + ip_addrs = [ip for ip in my_ip_info.values() if ip] if not ip_addrs: ip_addrs = ["YOUR_IP"] - for user, secret in sorted(USERS.items(), key=lambda x: x[0]): + for user, secret in sorted(config.USERS.items(), key=lambda x: x[0]): for ip in ip_addrs: - params = {"server": ip, "port": PORT, "secret": secret} - params_encodeded = urllib.parse.urlencode(params, safe=':') - print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True) + if not config.TLS_ONLY: + if not config.SECURE_ONLY: + params = {"server": ip, "port": config.PORT, "secret": secret} + params_encodeded = urllib.parse.urlencode(params, safe=':') + print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True) - params = {"server": ip, "port": PORT, "secret": "dd" + secret} + params = {"server": ip, "port": config.PORT, "secret": "dd" + secret} + params_encodeded = urllib.parse.urlencode(params, safe=':') + print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True) + + tls_secret = "ee" + secret + config.TLS_DOMAIN.encode().hex() + # the base64 links is buggy on ios + # tls_secret = bytes.fromhex("ee" + secret) + config.TLS_DOMAIN.encode() + # tls_secret_base64 = base64.urlsafe_b64encode(tls_secret) + params = {"server": ip, "port": config.PORT, "secret": tls_secret} params_encodeded = urllib.parse.urlencode(params, safe=':') - print("{}: tg://proxy?{} (beta)".format(user, params_encodeded), flush=True) + print("{}: tg://proxy?{} (new)".format(user, params_encodeded), flush=True) + + if secret in ["00000000000000000000000000000000", "0123456789abcdef0123456789abcdef"]: + msg = "The default secret {} is used, this is not recommended".format(secret) + print(msg, flush=True) + random_secret = "".join(myrandom.choice("0123456789abcdef") for i in range(32)) + print("You can change it to this random secret:", random_secret, flush=True) + print_default_warning = True + + if config.TLS_DOMAIN == "www.google.com": + print("The default TLS_DOMAIN www.google.com is used, this is not recommended", flush=True) + msg = "You should use random existing domain instead, bad clients are proxied there" + print(msg, flush=True) + print_default_warning = True + + if print_default_warning: + print_err("Warning: one or more default settings detected") + + +def setup_files_limit(): + try: + import resource + soft_fd_limit, hard_fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE) + resource.setrlimit(resource.RLIMIT_NOFILE, (hard_fd_limit, hard_fd_limit)) + except (ValueError, OSError): + print("Failed to increase the limit of opened files", flush=True, file=sys.stderr) + except ImportError: + pass + + +def setup_signals(): + if hasattr(signal, 'SIGUSR1'): + def debug_signal(signum, frame): + import pdb + pdb.set_trace() + + signal.signal(signal.SIGUSR1, debug_signal) + + if hasattr(signal, 'SIGUSR2'): + def reload_signal(signum, frame): + init_config() + print("Config reloaded", flush=True, file=sys.stderr) + print_tg_info() + + signal.signal(signal.SIGUSR2, reload_signal) + + +def try_setup_uvloop(): + try: + import uvloop + asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) + except ImportError: + pass + + +def remove_unix_socket(path): + try: + if stat.S_ISSOCK(os.stat(path).st_mode): + os.unlink(path) + except (FileNotFoundError, NotADirectoryError): + pass def loop_exception_handler(loop, context): @@ -1089,6 +1887,10 @@ def loop_exception_handler(loop, context): def main(): + setup_files_limit() + setup_signals() + try_setup_uvloop() + init_stats() if sys.platform == "win32": @@ -1101,39 +1903,61 @@ def main(): stats_printer_task = asyncio.Task(stats_printer()) asyncio.ensure_future(stats_printer_task) - if USE_MIDDLE_PROXY: + if config.USE_MIDDLE_PROXY: middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info()) asyncio.ensure_future(middle_proxy_updater_task) + if config.GET_TIME_PERIOD: + time_get_task = asyncio.Task(get_srv_time()) + asyncio.ensure_future(time_get_task) + + get_cert_len_task = asyncio.Task(get_mask_host_cert_len()) + asyncio.ensure_future(get_cert_len_task) + + clear_resolving_cache_task = asyncio.Task(clear_ip_resolving_cache()) + asyncio.ensure_future(clear_resolving_cache_task) + reuse_port = hasattr(socket, "SO_REUSEPORT") + has_unix = hasattr(socket, "AF_UNIX") + servers = [] - task_v4 = asyncio.start_server(handle_client_wrapper, '0.0.0.0', PORT, - limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop) - server_v4 = loop.run_until_complete(task_v4) + if config.LISTEN_ADDR_IPV4: + task = asyncio.start_server(handle_client_wrapper, config.LISTEN_ADDR_IPV4, config.PORT, + limit=get_to_tg_bufsize(), reuse_port=reuse_port, loop=loop) + servers.append(loop.run_until_complete(task)) - if socket.has_ipv6: - task_v6 = asyncio.start_server(handle_client_wrapper, '::', PORT, - limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop) - server_v6 = loop.run_until_complete(task_v6) + if config.LISTEN_ADDR_IPV6 and socket.has_ipv6: + task = asyncio.start_server(handle_client_wrapper, config.LISTEN_ADDR_IPV6, config.PORT, + limit=get_to_tg_bufsize(), reuse_port=reuse_port, loop=loop) + servers.append(loop.run_until_complete(task)) + + if config.LISTEN_UNIX_SOCK and has_unix: + remove_unix_socket(config.LISTEN_UNIX_SOCK) + task = asyncio.start_unix_server(handle_client_wrapper, config.LISTEN_UNIX_SOCK, + limit=get_to_tg_bufsize(), loop=loop) + servers.append(loop.run_until_complete(task)) + os.chmod(config.LISTEN_UNIX_SOCK, 0o666) try: loop.run_forever() except KeyboardInterrupt: pass - stats_printer_task.cancel() + for task in asyncio.Task.all_tasks(): + task.cancel() - server_v4.close() - loop.run_until_complete(server_v4.wait_closed()) + for server in servers: + server.close() + loop.run_until_complete(server.wait_closed()) - if socket.has_ipv6: - server_v6.close() - loop.run_until_complete(server_v6.wait_closed()) + if config.LISTEN_UNIX_SOCK and has_unix: + remove_unix_socket(config.LISTEN_UNIX_SOCK) loop.close() if __name__ == "__main__": + init_config() init_ip_info() print_tg_info() main()