From b31768165c3691b6216104d25fb8f0ba0c32e008 Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Sat, 30 Jun 2018 22:54:11 +0500 Subject: [PATCH] buffers redesign --- mtprotoproxy.py | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index a381f48..ad6e115 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -149,8 +149,8 @@ PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6) FAST_MODE = config.get("FAST_MODE", True) STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600) PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 60*60*24) -READ_BUF_SIZE = config.get("READ_BUF_SIZE", 16384) -WRITE_BUF_SIZE = config.get("WRITE_BUF_SIZE", 65536) +TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 8192) +TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536) CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30) CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10) @@ -562,7 +562,8 @@ async def handle_handshake(reader, writer): writer = CryptoWrappedStreamWriter(writer, encryptor) return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv - while await reader.read(READ_BUF_SIZE): + EMPTY_READ_BUF_SIZE = 4096 + while await reader.read(EMPTY_READ_BUF_SIZE): # just consume all the data pass @@ -588,7 +589,7 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None): try: reader_tgt, writer_tgt = await asyncio.open_connection(dc, TG_DATACENTER_PORT, - limit=READ_BUF_SIZE) + limit=TO_CLT_BUFSIZE) except ConnectionRefusedError as E: print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT) return False @@ -668,7 +669,7 @@ def set_keepalive(sock, interval=40, attempts=5): sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts) -def set_bufsizes(sock, recv_buf=READ_BUF_SIZE, send_buf=WRITE_BUF_SIZE): +def set_bufsizes(sock, recv_buf, send_buf): sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf) sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf) @@ -700,9 +701,9 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port): addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx]) try: - reader_tgt, writer_tgt = await asyncio.open_connection(addr, port, limit=READ_BUF_SIZE) + reader_tgt, writer_tgt = await asyncio.open_connection(addr, port, limit=TO_CLT_BUFSIZE) set_keepalive(writer_tgt.get_extra_info("socket")) - set_bufsizes(writer_tgt.get_extra_info("socket")) + set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE) except ConnectionRefusedError as E: print_err("Got connection refused while trying to connect to", addr, port) return False @@ -724,7 +725,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port): old_reader = reader_tgt reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO) - ans = await reader_tgt.read(READ_BUF_SIZE) + ans = await reader_tgt.read(TO_CLT_BUFSIZE) if len(ans) != RPC_NONCE_ANS_LEN: return False @@ -806,7 +807,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port): async def handle_client(reader_clt, writer_clt): set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE) - set_bufsizes(writer_clt.get_extra_info("socket")) + set_bufsizes(writer_clt.get_extra_info("socket"), TO_TG_BUFSIZE, TO_CLT_BUFSIZE) try: clt_data = await asyncio.wait_for(handle_handshake(reader_clt, writer_clt), @@ -857,10 +858,10 @@ async def handle_client(reader_clt, writer_clt): else: return - async def connect_reader_to_writer(rd, wr, user): + async def connect_reader_to_writer(rd, wr, user, rd_buf_size): try: while True: - data = await rd.read(READ_BUF_SIZE) + data = await rd.read(rd_buf_size) if isinstance(data, tuple): data, extra = data else: @@ -878,8 +879,10 @@ async def handle_client(reader_clt, writer_clt): # print_err(e) pass - task_tg_to_clt = asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer_clt, user)) - task_clt_to_tg = asyncio.ensure_future(connect_reader_to_writer(reader_clt, writer_tg, user)) + tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, TO_CLT_BUFSIZE) + clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, TO_TG_BUFSIZE) + task_tg_to_clt = asyncio.ensure_future(tg_to_clt) + task_clt_to_tg = asyncio.ensure_future(clt_to_tg) update_stats(user, curr_connects=1) await asyncio.wait([task_tg_to_clt, task_clt_to_tg], return_when=asyncio.FIRST_COMPLETED) @@ -1081,12 +1084,12 @@ def main(): reuse_port = hasattr(socket, "SO_REUSEPORT") task_v4 = asyncio.start_server(handle_client_wrapper, '0.0.0.0', PORT, - limit=READ_BUF_SIZE, reuse_port=reuse_port, loop=loop) + limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop) server_v4 = loop.run_until_complete(task_v4) if socket.has_ipv6: task_v6 = asyncio.start_server(handle_client_wrapper, '::', PORT, - limit=READ_BUF_SIZE, reuse_port=reuse_port, loop=loop) + limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop) server_v6 = loop.run_until_complete(task_v6) try: