From d2ff0f61e4b99cc1c9b38c87944782f77cd1a15c Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Tue, 26 Jun 2018 03:24:45 +0500 Subject: [PATCH 01/29] add handshake timeout, refactor client handling a bit --- mtprotoproxy.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index b3a229b..3d16f8f 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -72,6 +72,7 @@ else: PORT = config["PORT"] USERS = config["USERS"] +AD_TAG = bytes.fromhex(config.get("AD_TAG", "")) # load advanced settings PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6) @@ -82,7 +83,7 @@ PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 60*60*24) READ_BUF_SIZE = config.get("READ_BUF_SIZE", 16384) WRITE_BUF_SIZE = config.get("WRITE_BUF_SIZE", 65536) CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30) -AD_TAG = bytes.fromhex(config.get("AD_TAG", "")) +CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10) TG_DATACENTER_PORT = 443 @@ -728,9 +729,13 @@ async def handle_client(reader_clt, writer_clt): set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE) set_bufsizes(writer_clt.get_extra_info("socket")) - clt_data = await handle_handshake(reader_clt, writer_clt) + try: + clt_data = await asyncio.wait_for(handle_handshake(reader_clt, writer_clt), + timeout=CLIENT_HANDSHAKE_TIMEOUT) + except asyncio.TimeoutError: + return + if not clt_data: - writer_clt.transport.abort() return reader_clt, writer_clt, proto_tag, user, dc_idx, enc_key_and_iv = clt_data @@ -747,7 +752,6 @@ async def handle_client(reader_clt, writer_clt): tg_data = await do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port) if not tg_data: - writer_clt.transport.abort() return reader_tg, writer_tg = tg_data @@ -800,14 +804,19 @@ async def handle_client(reader_clt, writer_clt): wr.transport.abort() update_stats(user, curr_connects_x2=-1) - asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer_clt, user)) - asyncio.ensure_future(connect_reader_to_writer(reader_clt, writer_tg, user)) + task_tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user) + task_clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user) + + await asyncio.wait([task_tg_to_clt, task_clt_to_tg], return_when=asyncio.FIRST_COMPLETED) + writer_tg.transport.abort() async def handle_client_wrapper(reader, writer): try: await handle_client(reader, writer) except (asyncio.IncompleteReadError, ConnectionResetError, TimeoutError): + pass + finally: writer.transport.abort() From 9077ceb471b7ff94646db97a1f640d771ddc4221 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Tue, 26 Jun 2018 03:38:11 +0500 Subject: [PATCH 02/29] simplify current connects counting --- mtprotoproxy.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index 3d16f8f..c0528ee 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -153,13 +153,13 @@ def init_stats(): stats = {user: collections.Counter() for user in USERS} -def update_stats(user, connects=0, curr_connects_x2=0, octets=0): +def update_stats(user, connects=0, curr_connects=0, octets=0): global stats if user not in stats: stats[user] = collections.Counter() - stats[user].update(connects=connects, curr_connects_x2=curr_connects_x2, + stats[user].update(connects=connects, curr_connects=curr_connects, octets=octets) @@ -779,7 +779,6 @@ async def handle_client(reader_clt, writer_clt): return async def connect_reader_to_writer(rd, wr, user): - update_stats(user, curr_connects_x2=1) try: while True: data = await rd.read(READ_BUF_SIZE) @@ -802,12 +801,13 @@ async def handle_client(reader_clt, writer_clt): pass finally: wr.transport.abort() - update_stats(user, curr_connects_x2=-1) task_tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user) task_clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user) + update_stats(user, curr_connects=1) await asyncio.wait([task_tg_to_clt, task_clt_to_tg], return_when=asyncio.FIRST_COMPLETED) + update_stats(user, curr_connects=-1) writer_tg.transport.abort() @@ -828,7 +828,7 @@ async def stats_printer(): print("Stats for", time.strftime("%d.%m.%Y %H:%M:%S")) for user, stat in stats.items(): print("%s: %d connects (%d current), %.2f MB" % ( - user, stat["connects"], stat["curr_connects_x2"] // 2, + user, stat["connects"], stat["curr_connects"], stat["octets"] / 1000000)) print(flush=True) From bd3d9731d730ba411d9d567ca220b76832ad8778 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Tue, 26 Jun 2018 11:48:58 +0500 Subject: [PATCH 03/29] if the handshake failed, just consume all the data --- mtprotoproxy.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index c0528ee..ba9cef8 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -487,6 +487,11 @@ async def handle_handshake(reader, writer): reader = CryptoWrappedStreamReader(reader, decryptor) writer = CryptoWrappedStreamWriter(writer, encryptor) return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv + + while True: + # just consume all the data + await reader.read(READ_BUF_SIZE) + return False From accba06b45fe365053eea3d3dbb2122d578d3001 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Tue, 26 Jun 2018 20:17:52 +0500 Subject: [PATCH 04/29] count client stats only for successfull clients --- mtprotoproxy.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index ba9cef8..e4e6f40 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -745,8 +745,6 @@ async def handle_client(reader_clt, writer_clt): reader_clt, writer_clt, proto_tag, user, dc_idx, enc_key_and_iv = clt_data - update_stats(user, connects=1) - if not USE_MIDDLE_PROXY: if FAST_MODE: tg_data = await do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=enc_key_and_iv) @@ -759,6 +757,8 @@ async def handle_client(reader_clt, writer_clt): if not tg_data: return + update_stats(user, connects=1) + reader_tg, writer_tg = tg_data if not USE_MIDDLE_PROXY and FAST_MODE: From ed088d94499bbb6097c5e9470d29b7b4cc8867ec Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Tue, 26 Jun 2018 20:21:51 +0500 Subject: [PATCH 05/29] revert the last commit --- mtprotoproxy.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index e4e6f40..ba9cef8 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -745,6 +745,8 @@ async def handle_client(reader_clt, writer_clt): reader_clt, writer_clt, proto_tag, user, dc_idx, enc_key_and_iv = clt_data + update_stats(user, connects=1) + if not USE_MIDDLE_PROXY: if FAST_MODE: tg_data = await do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=enc_key_and_iv) @@ -757,8 +759,6 @@ async def handle_client(reader_clt, writer_clt): if not tg_data: return - update_stats(user, connects=1) - reader_tg, writer_tg = tg_data if not USE_MIDDLE_PROXY and FAST_MODE: From 444a1876b6072613fee77e2492c87d196e0a4b3c Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Tue, 26 Jun 2018 20:39:43 +0500 Subject: [PATCH 06/29] refactor task canceling a bit --- mtprotoproxy.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index ba9cef8..86a4860 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -801,18 +801,22 @@ async def handle_client(reader_clt, writer_clt): update_stats(user, octets=len(data)) wr.write(data, extra) await wr.drain() - except (OSError, AttributeError, asyncio.streams.IncompleteReadError) as e: + except (OSError, asyncio.streams.IncompleteReadError) as e: # print_err(e) pass finally: wr.transport.abort() - task_tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user) - task_clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user) + task_tg_to_clt = asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer_clt, user)) + task_clt_to_tg = asyncio.ensure_future(connect_reader_to_writer(reader_clt, writer_tg, user)) update_stats(user, curr_connects=1) await asyncio.wait([task_tg_to_clt, task_clt_to_tg], return_when=asyncio.FIRST_COMPLETED) update_stats(user, curr_connects=-1) + + task_tg_to_clt.cancel() + task_clt_to_tg.cancel() + writer_tg.transport.abort() From a20b1c99293b67ff3aa0c44a1687691d6a7bf92d Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Tue, 26 Jun 2018 22:53:46 +0500 Subject: [PATCH 07/29] simplify dissconnect logic --- mtprotoproxy.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index 86a4860..2c28dcb 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -795,7 +795,6 @@ async def handle_client(reader_clt, writer_clt): if not data: wr.write_eof() await wr.drain() - wr.close() return else: update_stats(user, octets=len(data)) @@ -804,8 +803,6 @@ async def handle_client(reader_clt, writer_clt): except (OSError, asyncio.streams.IncompleteReadError) as e: # print_err(e) pass - finally: - wr.transport.abort() task_tg_to_clt = asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer_clt, user)) task_clt_to_tg = asyncio.ensure_future(connect_reader_to_writer(reader_clt, writer_tg, user)) From 32d3bffc7b8d440849de8e76e03a64225893c00c Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Wed, 27 Jun 2018 01:04:06 +0500 Subject: [PATCH 08/29] Revert "simplify dissconnect logic". The idea with task cancelation doesn't work This reverts commit a20b1c99293b67ff3aa0c44a1687691d6a7bf92d. --- mtprotoproxy.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index 2c28dcb..86a4860 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -795,6 +795,7 @@ async def handle_client(reader_clt, writer_clt): if not data: wr.write_eof() await wr.drain() + wr.close() return else: update_stats(user, octets=len(data)) @@ -803,6 +804,8 @@ async def handle_client(reader_clt, writer_clt): except (OSError, asyncio.streams.IncompleteReadError) as e: # print_err(e) pass + finally: + wr.transport.abort() task_tg_to_clt = asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer_clt, user)) task_clt_to_tg = asyncio.ensure_future(connect_reader_to_writer(reader_clt, writer_tg, user)) From b74079c4337939b695dc68c2a6979112eb368297 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Wed, 27 Jun 2018 01:05:08 +0500 Subject: [PATCH 09/29] Revert "refactor task canceling a bit". The idea with the task cancelation doesn't work This reverts commit 444a1876b6072613fee77e2492c87d196e0a4b3c. --- mtprotoproxy.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index 86a4860..ba9cef8 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -801,22 +801,18 @@ async def handle_client(reader_clt, writer_clt): update_stats(user, octets=len(data)) wr.write(data, extra) await wr.drain() - except (OSError, asyncio.streams.IncompleteReadError) as e: + except (OSError, AttributeError, asyncio.streams.IncompleteReadError) as e: # print_err(e) pass finally: wr.transport.abort() - task_tg_to_clt = asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer_clt, user)) - task_clt_to_tg = asyncio.ensure_future(connect_reader_to_writer(reader_clt, writer_tg, user)) + task_tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user) + task_clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user) update_stats(user, curr_connects=1) await asyncio.wait([task_tg_to_clt, task_clt_to_tg], return_when=asyncio.FIRST_COMPLETED) update_stats(user, curr_connects=-1) - - task_tg_to_clt.cancel() - task_clt_to_tg.cancel() - writer_tg.transport.abort() From 5f35b4ed0a259429cc9a50e5c56e7729de40f9c9 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Wed, 27 Jun 2018 01:14:44 +0500 Subject: [PATCH 10/29] add debugging signal --- mtprotoproxy.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index ba9cef8..bc83a28 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -12,6 +12,7 @@ import binascii import sys import re import runpy +import signal try: import uvloop @@ -65,6 +66,13 @@ except (ValueError, OSError): except ImportError: pass + +def debug_signal(signum, frame): + import pdb + pdb.set_trace() + +signal.signal(signal.SIGUSR1, debug_signal) + if len(sys.argv) > 1: config = runpy.run_path(sys.argv[1]) else: From d74bb68f031b955474a9800e4e72a2f93d85bbd8 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Wed, 27 Jun 2018 11:11:45 +0500 Subject: [PATCH 11/29] Revert "Revert "refactor task canceling a bit". The idea with the task cancelation doesn't work" This reverts commit b74079c4337939b695dc68c2a6979112eb368297. --- mtprotoproxy.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index bc83a28..a470108 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -809,18 +809,22 @@ async def handle_client(reader_clt, writer_clt): update_stats(user, octets=len(data)) wr.write(data, extra) await wr.drain() - except (OSError, AttributeError, asyncio.streams.IncompleteReadError) as e: + except (OSError, asyncio.streams.IncompleteReadError) as e: # print_err(e) pass finally: wr.transport.abort() - task_tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user) - task_clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user) + task_tg_to_clt = asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer_clt, user)) + task_clt_to_tg = asyncio.ensure_future(connect_reader_to_writer(reader_clt, writer_tg, user)) update_stats(user, curr_connects=1) await asyncio.wait([task_tg_to_clt, task_clt_to_tg], return_when=asyncio.FIRST_COMPLETED) update_stats(user, curr_connects=-1) + + task_tg_to_clt.cancel() + task_clt_to_tg.cancel() + writer_tg.transport.abort() From 2e86308e90c9febbd418600dee306b684e9f8e55 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Wed, 27 Jun 2018 11:11:50 +0500 Subject: [PATCH 12/29] Revert "Revert "simplify dissconnect logic". The idea with task cancelation doesn't work" This reverts commit 32d3bffc7b8d440849de8e76e03a64225893c00c. --- mtprotoproxy.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index a470108..0363482 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -803,7 +803,6 @@ async def handle_client(reader_clt, writer_clt): if not data: wr.write_eof() await wr.drain() - wr.close() return else: update_stats(user, octets=len(data)) @@ -812,8 +811,6 @@ async def handle_client(reader_clt, writer_clt): except (OSError, asyncio.streams.IncompleteReadError) as e: # print_err(e) pass - finally: - wr.transport.abort() task_tg_to_clt = asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer_clt, user)) task_clt_to_tg = asyncio.ensure_future(connect_reader_to_writer(reader_clt, writer_tg, user)) From 7eea7d3201f6c9e89f63058ab5fed59ed2ee0f64 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Wed, 27 Jun 2018 11:13:42 +0500 Subject: [PATCH 13/29] replace infinite loop with timeout with while loop, when the client is bad --- mtprotoproxy.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index 0363482..39d7afe 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -496,9 +496,9 @@ async def handle_handshake(reader, writer): writer = CryptoWrappedStreamWriter(writer, encryptor) return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv - while True: + while await reader.read(READ_BUF_SIZE): # just consume all the data - await reader.read(READ_BUF_SIZE) + pass return False From 71e3206b1916889bd59c7554dd4feb737a374ad3 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Wed, 27 Jun 2018 13:33:51 +0500 Subject: [PATCH 14/29] check if signal exists before placing it. It can absent in some OSes, like Windows --- mtprotoproxy.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index 39d7afe..cc2e608 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -66,12 +66,12 @@ except (ValueError, OSError): except ImportError: pass +if hasattr(signal, 'SIGUSR1'): + def debug_signal(signum, frame): + import pdb + pdb.set_trace() -def debug_signal(signum, frame): - import pdb - pdb.set_trace() - -signal.signal(signal.SIGUSR1, debug_signal) + signal.signal(signal.SIGUSR1, debug_signal) if len(sys.argv) > 1: config = runpy.run_path(sys.argv[1]) From 63b689e3bf61cd701a4cdfb4a670cc913f1906b8 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Wed, 27 Jun 2018 18:25:40 +0500 Subject: [PATCH 15/29] Add a section about advanced usage --- README.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 4cd1689..e6be07c 100644 --- a/README.md +++ b/README.md @@ -16,4 +16,11 @@ To advertise a channel get a tag from **@MTProxybot** and write it to *config.py ## Performance ## The proxy performance should be enough to comfortably serve about 4 000 simultaneous users on -the smallest VDS instance with 1 CPU core and 1024MB RAM. +the VDS instance with 1 CPU core and 1024MB RAM. + +## Advanced Usage ## + +The proxy can be launched: +- with a custom config: `python3 mtprotoproxy.py [configfile]` +- several times, clients will be automaticaly balanced between instances +- using *PyPy* interprteter From ec1c6b4fb6e838f97657dd9c96147dc690237c11 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Wed, 27 Jun 2018 20:04:05 +0500 Subject: [PATCH 16/29] we need at least one undocumented launching way :) --- mtprotoproxy.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index cc2e608..022eb22 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -73,10 +73,18 @@ if hasattr(signal, 'SIGUSR1'): signal.signal(signal.SIGUSR1, debug_signal) -if len(sys.argv) > 1: +if len(sys.argv) < 2: + config = runpy.run_module("config") +elif len(sys.argv) == 2: config = runpy.run_path(sys.argv[1]) else: - config = runpy.run_module("config") + # undocumented way of launching + config = {} + config["PORT"] = int(sys.argv[1]) + secrets = sys.argv[2].split(",") + config["USERS"] = {"user%d" % i: secrets[i].zfill(32) for i in range(len(secrets))} + if len(sys.argv) > 3: + config["AD_TAG"] = sys.argv[3] PORT = config["PORT"] USERS = config["USERS"] From 532021ab8790285d72b25c98278d844c9199cba1 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Thu, 28 Jun 2018 20:47:12 +0500 Subject: [PATCH 17/29] support for cryptography module and advise to use it --- mtprotoproxy.py | 61 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 57 insertions(+), 4 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index 022eb22..128d7fd 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -20,7 +20,46 @@ try: except ImportError: pass -try: + +def try_use_cryptography_module(): + from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + from cryptography.hazmat.backends import default_backend + + def create_aes_ctr(key, iv): + class EncryptorAdapter: + def __init__(self, cipher): + self.encryptor = cipher.encryptor() + self.decryptor = cipher.decryptor() + + def encrypt(self, data): + return self.encryptor.update(data) + + def decrypt(self, data): + return self.decryptor.update(data) + + iv_bytes = int.to_bytes(iv, 16, "big") + cipher = Cipher(algorithms.AES(key), modes.CTR(iv_bytes), default_backend()) + return EncryptorAdapter(cipher) + + def create_aes_cbc(key, iv): + class EncryptorAdapter: + def __init__(self, cipher): + self.encryptor = cipher.encryptor() + self.decryptor = cipher.decryptor() + + def encrypt(self, data): + return self.encryptor.update(data) + + def decrypt(self, data): + return self.decryptor.update(data) + + cipher = Cipher(algorithms.AES(key), modes.CBC(iv), default_backend()) + return EncryptorAdapter(cipher) + + return create_aes_ctr, create_aes_cbc + + +def try_use_pycrypto_or_pycryptodome_module(): from Crypto.Cipher import AES from Crypto.Util import Counter @@ -31,11 +70,16 @@ try: def create_aes_cbc(key, iv): return AES.new(key, AES.MODE_CBC, iv) -except ImportError: - print("Failed to find pycryptodome or pycrypto, using slow AES implementation", - flush=True, file=sys.stderr) + return create_aes_ctr, create_aes_cbc + + +def use_slow_bundled_cryptography_module(): import pyaes + msg = "To make the program a *lot* faster, please install cryptography module: " + msg += "pip install cryptography\n" + print(msg, flush=True, file=sys.stderr) + def create_aes_ctr(key, iv): ctr = pyaes.Counter(iv) return pyaes.AESModeOfOperationCTR(key, ctr) @@ -55,8 +99,17 @@ except ImportError: mode = pyaes.AESModeOfOperationCBC(key, iv) return EncryptorAdapter(mode) + return create_aes_ctr, create_aes_cbc +try: + create_aes_ctr, create_aes_cbc = try_use_cryptography_module() +except ImportError: + try: + create_aes_ctr, create_aes_cbc = try_use_pycrypto_or_pycryptodome_module() + except ImportError: + create_aes_ctr, create_aes_cbc = use_slow_bundled_cryptography_module() + try: import resource soft_fd_limit, hard_fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE) From 3477402c0d781bc536e5201ed2a79d2fa2fc0180 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Fri, 29 Jun 2018 01:07:16 +0500 Subject: [PATCH 18/29] use cryptography module in docker file, do not copy pyaes --- Dockerfile | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 3545fb8..3622790 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,10 +2,9 @@ FROM alpine:3.6 RUN adduser tgproxy -u 10000 -D -RUN apk add --no-cache python3 py3-crypto ca-certificates libcap +RUN apk add --no-cache python3 py3-cryptography ca-certificates libcap COPY mtprotoproxy.py config.py /home/tgproxy/ -COPY pyaes/*.py /home/tgproxy/pyaes/ RUN chown -R tgproxy:tgproxy /home/tgproxy RUN setcap cap_net_bind_service=+ep /usr/bin/python3.6 From 03f7ca1d4c6ea0d39d34482ca6f3eabe4d31f34f Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Fri, 29 Jun 2018 02:00:46 +0500 Subject: [PATCH 19/29] more reliable logic to check reuseport availability --- mtprotoproxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index 128d7fd..ee5af3e 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -1069,7 +1069,7 @@ def main(): middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info()) asyncio.ensure_future(middle_proxy_updater_task) - reuse_port = (sys.platform != "win32") + reuse_port = hasattr(socket, "SO_REUSEPORT") task_v4 = asyncio.start_server(handle_client_wrapper, '0.0.0.0', PORT, limit=READ_BUF_SIZE, reuse_port=reuse_port, loop=loop) From 6a270966181c40c1437058ea2e5400e48572aa15 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Fri, 29 Jun 2018 17:52:37 +0500 Subject: [PATCH 20/29] add secure tag --- mtprotoproxy.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index ee5af3e..9a50ed3 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -203,6 +203,7 @@ DC_IDX_POS = 60 PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef" PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee" +PROTO_TAG_SECURE = b"\xdd\xdd\xdd\xdd" CBC_PADDING = 16 PADDING_FILLER = b"\x04\x00\x00\x00" @@ -548,7 +549,7 @@ async def handle_handshake(reader, writer): decrypted = decryptor.decrypt(handshake) proto_tag = decrypted[PROTO_TAG_POS:PROTO_TAG_POS+4] - if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE): + if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE): continue dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True) From 372861ac6e3dd3d1d4996282f0905c36c5163fba Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Fri, 29 Jun 2018 18:51:47 +0500 Subject: [PATCH 21/29] support for secure mode --- mtprotoproxy.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index 9a50ed3..a381f48 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -430,6 +430,10 @@ class MTProtoIntermediateFrameStreamReader(LayeredStreamReaderBase): data = await self.upstream.readexactly(msg_len) + if msg_len % 4 != 0: + cut_border = msg_len - (msg_len % 4) + data = data[:cut_border] + return data, extra @@ -847,7 +851,7 @@ async def handle_client(reader_clt, writer_clt): if proto_tag == PROTO_TAG_ABRIDGED: reader_clt = MTProtoCompactFrameStreamReader(reader_clt) writer_clt = MTProtoCompactFrameStreamWriter(writer_clt) - elif proto_tag == PROTO_TAG_INTERMEDIATE: + elif proto_tag in (PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE): reader_clt = MTProtoIntermediateFrameStreamReader(reader_clt) writer_clt = MTProtoIntermediateFrameStreamWriter(writer_clt) else: @@ -1033,6 +1037,10 @@ def print_tg_info(): params_encodeded = urllib.parse.urlencode(params, safe=':') print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True) + params = {"server": ip, "port": PORT, "secret": "dd" + secret} + params_encodeded = urllib.parse.urlencode(params, safe=':') + print("{}: tg://proxy?{} (beta)".format(user, params_encodeded), flush=True) + def loop_exception_handler(loop, context): exception = context.get("exception") From b31768165c3691b6216104d25fb8f0ba0c32e008 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Sat, 30 Jun 2018 22:54:11 +0500 Subject: [PATCH 22/29] buffers redesign --- mtprotoproxy.py | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index a381f48..ad6e115 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -149,8 +149,8 @@ PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6) FAST_MODE = config.get("FAST_MODE", True) STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600) PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 60*60*24) -READ_BUF_SIZE = config.get("READ_BUF_SIZE", 16384) -WRITE_BUF_SIZE = config.get("WRITE_BUF_SIZE", 65536) +TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 8192) +TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536) CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30) CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10) @@ -562,7 +562,8 @@ async def handle_handshake(reader, writer): writer = CryptoWrappedStreamWriter(writer, encryptor) return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv - while await reader.read(READ_BUF_SIZE): + EMPTY_READ_BUF_SIZE = 4096 + while await reader.read(EMPTY_READ_BUF_SIZE): # just consume all the data pass @@ -588,7 +589,7 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None): try: reader_tgt, writer_tgt = await asyncio.open_connection(dc, TG_DATACENTER_PORT, - limit=READ_BUF_SIZE) + limit=TO_CLT_BUFSIZE) except ConnectionRefusedError as E: print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT) return False @@ -668,7 +669,7 @@ def set_keepalive(sock, interval=40, attempts=5): sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts) -def set_bufsizes(sock, recv_buf=READ_BUF_SIZE, send_buf=WRITE_BUF_SIZE): +def set_bufsizes(sock, recv_buf, send_buf): sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf) sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf) @@ -700,9 +701,9 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port): addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx]) try: - reader_tgt, writer_tgt = await asyncio.open_connection(addr, port, limit=READ_BUF_SIZE) + reader_tgt, writer_tgt = await asyncio.open_connection(addr, port, limit=TO_CLT_BUFSIZE) set_keepalive(writer_tgt.get_extra_info("socket")) - set_bufsizes(writer_tgt.get_extra_info("socket")) + set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE) except ConnectionRefusedError as E: print_err("Got connection refused while trying to connect to", addr, port) return False @@ -724,7 +725,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port): old_reader = reader_tgt reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO) - ans = await reader_tgt.read(READ_BUF_SIZE) + ans = await reader_tgt.read(TO_CLT_BUFSIZE) if len(ans) != RPC_NONCE_ANS_LEN: return False @@ -806,7 +807,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port): async def handle_client(reader_clt, writer_clt): set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE) - set_bufsizes(writer_clt.get_extra_info("socket")) + set_bufsizes(writer_clt.get_extra_info("socket"), TO_TG_BUFSIZE, TO_CLT_BUFSIZE) try: clt_data = await asyncio.wait_for(handle_handshake(reader_clt, writer_clt), @@ -857,10 +858,10 @@ async def handle_client(reader_clt, writer_clt): else: return - async def connect_reader_to_writer(rd, wr, user): + async def connect_reader_to_writer(rd, wr, user, rd_buf_size): try: while True: - data = await rd.read(READ_BUF_SIZE) + data = await rd.read(rd_buf_size) if isinstance(data, tuple): data, extra = data else: @@ -878,8 +879,10 @@ async def handle_client(reader_clt, writer_clt): # print_err(e) pass - task_tg_to_clt = asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer_clt, user)) - task_clt_to_tg = asyncio.ensure_future(connect_reader_to_writer(reader_clt, writer_tg, user)) + tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, TO_CLT_BUFSIZE) + clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, TO_TG_BUFSIZE) + task_tg_to_clt = asyncio.ensure_future(tg_to_clt) + task_clt_to_tg = asyncio.ensure_future(clt_to_tg) update_stats(user, curr_connects=1) await asyncio.wait([task_tg_to_clt, task_clt_to_tg], return_when=asyncio.FIRST_COMPLETED) @@ -1081,12 +1084,12 @@ def main(): reuse_port = hasattr(socket, "SO_REUSEPORT") task_v4 = asyncio.start_server(handle_client_wrapper, '0.0.0.0', PORT, - limit=READ_BUF_SIZE, reuse_port=reuse_port, loop=loop) + limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop) server_v4 = loop.run_until_complete(task_v4) if socket.has_ipv6: task_v6 = asyncio.start_server(handle_client_wrapper, '::', PORT, - limit=READ_BUF_SIZE, reuse_port=reuse_port, loop=loop) + limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop) server_v6 = loop.run_until_complete(task_v6) try: From 675d5a6aba17f6571eb6909c70e3f69b58a03957 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Sat, 30 Jun 2018 23:09:43 +0500 Subject: [PATCH 23/29] send buffer size on the direct handshake also --- mtprotoproxy.py | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index ad6e115..d9b6c42 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -570,6 +570,21 @@ async def handle_handshake(reader, writer): return False +def set_keepalive(sock, interval=40, attempts=5): + sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + if hasattr(socket, "TCP_KEEPIDLE"): + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval) + if hasattr(socket, "TCP_KEEPINTVL"): + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval) + if hasattr(socket, "TCP_KEEPCNT"): + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts) + + +def set_bufsizes(sock, recv_buf, send_buf): + sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf) + + async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None): RESERVED_NONCE_FIRST_CHARS = [b"\xef"] RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54", @@ -590,6 +605,9 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None): try: reader_tgt, writer_tgt = await asyncio.open_connection(dc, TG_DATACENTER_PORT, limit=TO_CLT_BUFSIZE) + set_keepalive(writer_tgt.get_extra_info("socket")) + set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE) + except ConnectionRefusedError as E: print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT) return False @@ -659,21 +677,6 @@ def get_middleproxy_aes_key_and_iv(nonce_srv, nonce_clt, clt_ts, srv_ip, clt_por return key, iv -def set_keepalive(sock, interval=40, attempts=5): - sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - if hasattr(socket, "TCP_KEEPIDLE"): - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval) - if hasattr(socket, "TCP_KEEPINTVL"): - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval) - if hasattr(socket, "TCP_KEEPCNT"): - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts) - - -def set_bufsizes(sock, recv_buf, send_buf): - sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf) - - async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port): START_SEQ_NO = -2 NONCE_LEN = 16 From b38084bf361363ab5b79613364974441d5a4dc44 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Sun, 1 Jul 2018 01:40:30 +0500 Subject: [PATCH 24/29] add information about Prometheus to readme --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index e6be07c..b2fed90 100644 --- a/README.md +++ b/README.md @@ -24,3 +24,4 @@ The proxy can be launched: - with a custom config: `python3 mtprotoproxy.py [configfile]` - several times, clients will be automaticaly balanced between instances - using *PyPy* interprteter +- with runtime statistics exported for [Prometheus](https://prometheus.io/): using [prometheus](https://github.com/alexbers/mtprotoproxy/tree/prometheus) branch From bcac5eb878a57930227b0ad96eaaa156eb24789c Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Sun, 1 Jul 2018 16:43:54 +0500 Subject: [PATCH 25/29] add sending timeout --- mtprotoproxy.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index d9b6c42..d6ce3ee 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -153,6 +153,7 @@ TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 8192) TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536) CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30) CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10) +CLIENT_ACK_TIMEOUT = config.get("CLIENT_ACK_TIMEOUT", 5 * 60) TG_DATACENTER_PORT = 443 @@ -580,6 +581,11 @@ def set_keepalive(sock, interval=40, attempts=5): sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts) +def set_ack_timeout(sock, timeout): + if hasattr(socket, "TCP_USER_TIMEOUT"): + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, timeout*1000) + + def set_bufsizes(sock, recv_buf, send_buf): sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf) sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf) @@ -810,6 +816,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port): async def handle_client(reader_clt, writer_clt): set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE) + set_ack_timeout(writer_clt.get_extra_info("socket"), CLIENT_ACK_TIMEOUT) set_bufsizes(writer_clt.get_extra_info("socket"), TO_TG_BUFSIZE, TO_CLT_BUFSIZE) try: From 33fabe7590b7655d21dd58ada4dbcbfc106a093e Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Mon, 2 Jul 2018 00:47:35 +0500 Subject: [PATCH 26/29] ignore no route to host error --- mtprotoproxy.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index d6ce3ee..c569aa7 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -1066,7 +1066,8 @@ def loop_exception_handler(loop, context): return if isinstance(exception, OSError): IGNORE_ERRNO = { - 10038 # operation on non-socket on Windows, likely because fd == -1 + 10038, # operation on non-socket on Windows, likely because fd == -1 + 113 # no route to host } if exception.errno in IGNORE_ERRNO: return From 0caf5f89a831b6e650b0120037bafcf88a2ac447 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Mon, 2 Jul 2018 02:28:43 +0500 Subject: [PATCH 27/29] count msgs --- mtprotoproxy.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index c569aa7..4c101b1 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -224,14 +224,14 @@ def init_stats(): stats = {user: collections.Counter() for user in USERS} -def update_stats(user, connects=0, curr_connects=0, octets=0): +def update_stats(user, connects=0, curr_connects=0, octets=0, msgs=0): global stats if user not in stats: stats[user] = collections.Counter() stats[user].update(connects=connects, curr_connects=curr_connects, - octets=octets) + octets=octets, msgs=msgs) class LayeredStreamReaderBase: @@ -882,7 +882,7 @@ async def handle_client(reader_clt, writer_clt): await wr.drain() return else: - update_stats(user, octets=len(data)) + update_stats(user, octets=len(data), msgs=1) wr.write(data, extra) await wr.drain() except (OSError, asyncio.streams.IncompleteReadError) as e: @@ -920,9 +920,9 @@ async def stats_printer(): print("Stats for", time.strftime("%d.%m.%Y %H:%M:%S")) for user, stat in stats.items(): - print("%s: %d connects (%d current), %.2f MB" % ( + print("%s: %d connects (%d current), %.2f MB, %d msgs" % ( user, stat["connects"], stat["curr_connects"], - stat["octets"] / 1000000)) + stat["octets"] / 1000000, stat["msgs"])) print(flush=True) From 0a7e2d85b8bb072f689ac1bf97020725d5e5dcf6 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Wed, 4 Jul 2018 13:54:27 +0500 Subject: [PATCH 28/29] shrink timeouts, removed annoying message about timeouts --- mtprotoproxy.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index 4c101b1..474eb23 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -148,12 +148,12 @@ PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6) # disables tg->client trafic reencryption, faster but less secure FAST_MODE = config.get("FAST_MODE", True) STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600) -PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 60*60*24) +PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 24*60*60) TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 8192) TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536) -CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30) +CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 10*60) CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10) -CLIENT_ACK_TIMEOUT = config.get("CLIENT_ACK_TIMEOUT", 5 * 60) +CLIENT_ACK_TIMEOUT = config.get("CLIENT_ACK_TIMEOUT", 5*60) TG_DATACENTER_PORT = 443 @@ -815,7 +815,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port): async def handle_client(reader_clt, writer_clt): - set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE) + set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE, attempts=3) set_ack_timeout(writer_clt.get_extra_info("socket"), CLIENT_ACK_TIMEOUT) set_bufsizes(writer_clt.get_extra_info("socket"), TO_TG_BUFSIZE, TO_CLT_BUFSIZE) @@ -1061,16 +1061,23 @@ def loop_exception_handler(loop, context): if exception: if isinstance(exception, TimeoutError): if transport: - print_err("Timeout, killing transport") transport.abort() return if isinstance(exception, OSError): IGNORE_ERRNO = { 10038, # operation on non-socket on Windows, likely because fd == -1 - 113 # no route to host + } + + FORCE_CLOSE_ERRNO = { + 113, # no route to host + } if exception.errno in IGNORE_ERRNO: return + elif exception.errno in FORCE_CLOSE_ERRNO: + if transport: + transport.abort() + return loop.default_exception_handler(context) From 6f8bfdb5685ecff55c6c7dba84fb6e69191864d6 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Thu, 5 Jul 2018 15:45:53 +0500 Subject: [PATCH 29/29] add timeout error to errno --- mtprotoproxy.py | 1 + 1 file changed, 1 insertion(+) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index 474eb23..80716b1 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -1066,6 +1066,7 @@ def loop_exception_handler(loop, context): if isinstance(exception, OSError): IGNORE_ERRNO = { 10038, # operation on non-socket on Windows, likely because fd == -1 + 121, # the semaphore timeout period has expired on Windows } FORCE_CLOSE_ERRNO = {