diff --git a/mtprotoproxy.py b/mtprotoproxy.py index e8d2275..cbaaec1 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -154,6 +154,7 @@ TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536) CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 10*60) CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10) CLIENT_ACK_TIMEOUT = config.get("CLIENT_ACK_TIMEOUT", 5*60) +TG_CONNECT_TIMEOUT = config.get("TG_CONNECT_TIMEOUT", 10) TG_DATACENTER_PORT = 443 @@ -591,6 +592,21 @@ def set_bufsizes(sock, recv_buf, send_buf): sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf) +async def open_connection_tryer(addr, port, limit, timeout, max_attempts=3): + for attempt in range(max_attempts-1): + try: + task = asyncio.open_connection(addr, port, limit=limit) + reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout) + return reader_tgt, writer_tgt + except (OSError, asyncio.TimeoutError): + continue + + # the last attempt + task = asyncio.open_connection(addr, port, limit=limit) + reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout) + return reader_tgt, writer_tgts + + async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None): RESERVED_NONCE_FIRST_CHARS = [b"\xef"] RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54", @@ -609,18 +625,18 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None): dc = TG_DATACENTERS_V4[dc_idx] try: - reader_tgt, writer_tgt = await asyncio.open_connection(dc, TG_DATACENTER_PORT, - limit=TO_CLT_BUFSIZE) - set_keepalive(writer_tgt.get_extra_info("socket")) - set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE) - + reader_tgt, writer_tgt = await open_connection_tryer( + dc, TG_DATACENTER_PORT, limit=TO_CLT_BUFSIZE, timeout=TG_CONNECT_TIMEOUT) except ConnectionRefusedError as E: print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT) return False - except OSError as E: + except (OSError, asyncio.TimeoutError) as E: print_err("Unable to connect to", dc, TG_DATACENTER_PORT) return False + set_keepalive(writer_tgt.get_extra_info("socket")) + set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE) + while True: rnd = bytearray([random.randrange(0, 256) for i in range(HANDSHAKE_LEN)]) if rnd[:1] in RESERVED_NONCE_FIRST_CHARS: @@ -710,16 +726,18 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port): addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx]) try: - reader_tgt, writer_tgt = await asyncio.open_connection(addr, port, limit=TO_CLT_BUFSIZE) - set_keepalive(writer_tgt.get_extra_info("socket")) - set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE) + reader_tgt, writer_tgt = await open_connection_tryer(addr, port, limit=TO_CLT_BUFSIZE, + timeout=TG_CONNECT_TIMEOUT) except ConnectionRefusedError as E: print_err("Got connection refused while trying to connect to", addr, port) return False - except OSError as E: + except (OSError, asyncio.TimeoutError) as E: print_err("Unable to connect to", addr, port) return False + set_keepalive(writer_tgt.get_extra_info("socket")) + set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE) + writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO) key_selector = PROXY_SECRET[:4]