buffers redesign

This commit is contained in:
Alexander Bersenev
2018-06-30 22:54:11 +05:00
parent 372861ac6e
commit b31768165c

View File

@@ -149,8 +149,8 @@ PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
FAST_MODE = config.get("FAST_MODE", True)
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 60*60*24)
READ_BUF_SIZE = config.get("READ_BUF_SIZE", 16384)
WRITE_BUF_SIZE = config.get("WRITE_BUF_SIZE", 65536)
TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 8192)
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30)
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
@@ -562,7 +562,8 @@ async def handle_handshake(reader, writer):
writer = CryptoWrappedStreamWriter(writer, encryptor)
return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv
while await reader.read(READ_BUF_SIZE):
EMPTY_READ_BUF_SIZE = 4096
while await reader.read(EMPTY_READ_BUF_SIZE):
# just consume all the data
pass
@@ -588,7 +589,7 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
try:
reader_tgt, writer_tgt = await asyncio.open_connection(dc, TG_DATACENTER_PORT,
limit=READ_BUF_SIZE)
limit=TO_CLT_BUFSIZE)
except ConnectionRefusedError as E:
print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT)
return False
@@ -668,7 +669,7 @@ def set_keepalive(sock, interval=40, attempts=5):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
def set_bufsizes(sock, recv_buf=READ_BUF_SIZE, send_buf=WRITE_BUF_SIZE):
def set_bufsizes(sock, recv_buf, send_buf):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
@@ -700,9 +701,9 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
try:
reader_tgt, writer_tgt = await asyncio.open_connection(addr, port, limit=READ_BUF_SIZE)
reader_tgt, writer_tgt = await asyncio.open_connection(addr, port, limit=TO_CLT_BUFSIZE)
set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
except ConnectionRefusedError as E:
print_err("Got connection refused while trying to connect to", addr, port)
return False
@@ -724,7 +725,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
old_reader = reader_tgt
reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO)
ans = await reader_tgt.read(READ_BUF_SIZE)
ans = await reader_tgt.read(TO_CLT_BUFSIZE)
if len(ans) != RPC_NONCE_ANS_LEN:
return False
@@ -806,7 +807,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
async def handle_client(reader_clt, writer_clt):
set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE)
set_bufsizes(writer_clt.get_extra_info("socket"))
set_bufsizes(writer_clt.get_extra_info("socket"), TO_TG_BUFSIZE, TO_CLT_BUFSIZE)
try:
clt_data = await asyncio.wait_for(handle_handshake(reader_clt, writer_clt),
@@ -857,10 +858,10 @@ async def handle_client(reader_clt, writer_clt):
else:
return
async def connect_reader_to_writer(rd, wr, user):
async def connect_reader_to_writer(rd, wr, user, rd_buf_size):
try:
while True:
data = await rd.read(READ_BUF_SIZE)
data = await rd.read(rd_buf_size)
if isinstance(data, tuple):
data, extra = data
else:
@@ -878,8 +879,10 @@ async def handle_client(reader_clt, writer_clt):
# print_err(e)
pass
task_tg_to_clt = asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer_clt, user))
task_clt_to_tg = asyncio.ensure_future(connect_reader_to_writer(reader_clt, writer_tg, user))
tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, TO_CLT_BUFSIZE)
clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, TO_TG_BUFSIZE)
task_tg_to_clt = asyncio.ensure_future(tg_to_clt)
task_clt_to_tg = asyncio.ensure_future(clt_to_tg)
update_stats(user, curr_connects=1)
await asyncio.wait([task_tg_to_clt, task_clt_to_tg], return_when=asyncio.FIRST_COMPLETED)
@@ -1081,12 +1084,12 @@ def main():
reuse_port = hasattr(socket, "SO_REUSEPORT")
task_v4 = asyncio.start_server(handle_client_wrapper, '0.0.0.0', PORT,
limit=READ_BUF_SIZE, reuse_port=reuse_port, loop=loop)
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
server_v4 = loop.run_until_complete(task_v4)
if socket.has_ipv6:
task_v6 = asyncio.start_server(handle_client_wrapper, '::', PORT,
limit=READ_BUF_SIZE, reuse_port=reuse_port, loop=loop)
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
server_v6 = loop.run_until_complete(task_v6)
try: