move more logic to pooled connection

This commit is contained in:
Alexander Bersenev
2019-11-13 02:56:05 +05:00
parent 4a4d449a34
commit 522b0cfe75

View File

@@ -1345,15 +1345,19 @@ def get_middleproxy_aes_key_and_iv(nonce_srv, nonce_clt, clt_ts, srv_ip, clt_por
return key, iv
async def middleproxy_handshake_after_connect(host, port, reader_tgt, writer_tgt):
""" The first stage of middleproxy handshake """
async def middleproxy_handshake(host, port, reader_tgt, writer_tgt):
""" The most logic of middleproxy handshake, launched in pool """
START_SEQ_NO = -2
NONCE_LEN = 16
RPC_HANDSHAKE = b"\xf5\xee\x82\x76"
RPC_NONCE = b"\xaa\x87\xcb\x7a"
# pass as consts to simplify code
RPC_FLAGS = b"\x00\x00\x00\x00"
CRYPTO_AES = b"\x01\x00\x00\x00"
RPC_NONCE_ANS_LEN = 32
RPC_HANDSHAKE_ANS_LEN = 32
writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO)
key_selector = PROXY_SECRET[:4]
@@ -1379,49 +1383,12 @@ async def middleproxy_handshake_after_connect(host, port, reader_tgt, writer_tgt
if rpc_type != RPC_NONCE or rpc_key_selector != key_selector or rpc_schema != CRYPTO_AES:
raise ConnectionAbortedError("bad rpc answer")
return reader_tgt, writer_tgt, nonce, rpc_nonce, crypto_ts
async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
RPC_HANDSHAKE = b"\xf5\xee\x82\x76"
# pass as consts to simplify code
RPC_FLAGS = b"\x00\x00\x00\x00"
RPC_HANDSHAKE_ANS_LEN = 32
global my_ip_info
global tg_connection_pool
use_ipv6_tg = (my_ip_info["ipv6"] and (config.PREFER_IPV6 or not my_ip_info["ipv4"]))
use_ipv6_clt = (":" in cl_ip)
if use_ipv6_tg:
if dc_idx not in TG_MIDDLE_PROXIES_V6:
return False
addr, port = myrandom.choice(TG_MIDDLE_PROXIES_V6[dc_idx])
else:
if dc_idx not in TG_MIDDLE_PROXIES_V4:
return False
addr, port = myrandom.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
try:
ret = await tg_connection_pool.get_connection(addr, port,
middleproxy_handshake_after_connect)
reader_tgt, writer_tgt, nonce, rpc_nonce, crypto_ts = ret
except ConnectionRefusedError as E:
print_err("The Telegram server %d (%s %s) is refusing connections" % (dc_idx, addr, port))
return False
except ConnectionAbortedError as E:
print_err("The Telegram server connection is bad: %d (%s %s) %s" % (dc_idx, addr, port, E))
return False
except (OSError, asyncio.TimeoutError) as E:
print_err("Unable to connect to the Telegram server %d (%s %s)" % (dc_idx, addr, port))
return False
# get keys
tg_ip, tg_port = writer_tgt.upstream.get_extra_info('peername')[:2]
my_ip, my_port = writer_tgt.upstream.get_extra_info('sockname')[:2]
use_ipv6_tg = (":" in tg_ip)
if not use_ipv6_tg:
if my_ip_info["ipv4"]:
# prefer global ip settings to work behind NAT
@@ -1472,11 +1439,42 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
handshake_ans = await reader_tgt.read(1)
if len(handshake_ans) != RPC_HANDSHAKE_ANS_LEN:
return False
raise ConnectionAbortedError("bad rpc handshake answer length")
handshake_type, handshake_flags, handshake_sender_pid, handshake_peer_pid = (
handshake_ans[:4], handshake_ans[4:8], handshake_ans[8:20], handshake_ans[20:32])
if handshake_type != RPC_HANDSHAKE or handshake_peer_pid != SENDER_PID:
raise ConnectionAbortedError("bad rpc handshake answer")
return reader_tgt, writer_tgt, my_ip, my_port
async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
global my_ip_info
global tg_connection_pool
use_ipv6_tg = (my_ip_info["ipv6"] and (config.PREFER_IPV6 or not my_ip_info["ipv4"]))
if use_ipv6_tg:
if dc_idx not in TG_MIDDLE_PROXIES_V6:
return False
addr, port = myrandom.choice(TG_MIDDLE_PROXIES_V6[dc_idx])
else:
if dc_idx not in TG_MIDDLE_PROXIES_V4:
return False
addr, port = myrandom.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
try:
ret = await tg_connection_pool.get_connection(addr, port, middleproxy_handshake)
reader_tgt, writer_tgt, my_ip, my_port = ret
except ConnectionRefusedError as E:
print_err("The Telegram server %d (%s %s) is refusing connections" % (dc_idx, addr, port))
return False
except ConnectionAbortedError as E:
print_err("The Telegram server connection is bad: %d (%s %s) %s" % (dc_idx, addr, port, E))
return False
except (OSError, asyncio.TimeoutError) as E:
print_err("Unable to connect to the Telegram server %d (%s %s)" % (dc_idx, addr, port))
return False
writer_tgt = ProxyReqStreamWriter(writer_tgt, cl_ip, cl_port, my_ip, my_port, proto_tag)