From a6e39a66d160de5055232c35f5fd7fa6e85697be Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Mon, 18 Jun 2018 01:35:01 +0500 Subject: [PATCH] merge using intermediate protocol for middle proxy --- mtprotoproxy.py | 55 ++++++++++++++++++++++++++++++++++++------------- 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index 15f0b37..1bc05fd 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -314,16 +314,12 @@ class MTProtoCompactFrameStreamReader(LayeredStreamReaderBase): class MTProtoCompactFrameStreamWriter(LayeredStreamWriterBase): - def __init__(self, upstream, seq_no=0): - self.upstream = upstream - self.seq_no = seq_no - def write(self, data): SMALL_PKT_BORDER = 0x7f LARGE_PKT_BORGER = 256 ** 3 if len(data) % 4 != 0: - print_err("BUG: MTProtoFrameStreamWriter attempted to send msg with len %d" % len(data)) + print_err("BUG: MTProtoFrameStreamWriter attempted to send msg with len", len(data)) return 0 len_div_four = len(data) // 4 @@ -338,6 +334,24 @@ class MTProtoCompactFrameStreamWriter(LayeredStreamWriterBase): return 0 +class MTProtoIntermediateFrameStreamReader(LayeredStreamReaderBase): + async def read(self, buf_size): + msg_len_bytes = await self.upstream.readexactly(4) + msg_len = int.from_bytes(msg_len_bytes, "little") + + if msg_len > 0x80000000: + msg_len -= 0x80000000 + + data = await self.upstream.readexactly(msg_len) + + return data + + +class MTProtoIntermediateFrameStreamWriter(LayeredStreamWriterBase): + def write(self, data): + return self.upstream.write(int.to_bytes(len(data), 4, 'little') + data) + + class ProxyReqStreamReader(LayeredStreamReaderBase): async def read(self, msg): RPC_PROXY_ANS = b"\x0d\xda\x03\x44" @@ -360,7 +374,7 @@ class ProxyReqStreamReader(LayeredStreamReaderBase): class ProxyReqStreamWriter(LayeredStreamWriterBase): - def __init__(self, upstream, cl_ip, cl_port, my_ip, my_port): + def __init__(self, upstream, cl_ip, cl_port, my_ip, my_port, proto_tag): self.upstream = upstream if ":" not in cl_ip: @@ -378,6 +392,13 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase): self.our_ip_port += int.to_bytes(my_port, 4, "little") self.out_conn_id = bytearray([random.randrange(0, 256) for i in range(8)]) + if proto_tag == PROTO_TAG_ABRIDGED: + self.last_flag_byte = b"\x40" + elif proto_tag == PROTO_TAG_INTERMEDIATE: + self.last_flag_byte = b"\x20" + else: + self.last_flag_byte = b"\x00" + def write(self, msg): RPC_PROXY_REQ = b"\xee\xf1\xce\x36" EXTRA_SIZE = b"\x18\x00\x00\x00" @@ -389,12 +410,12 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase): return 0 if msg.startswith(b"\x00" * 8): - FLAGS = b"\x0a\x10\x02\x40" + flags = b"\x0a\x10\x02" + self.last_flag_byte else: - FLAGS = b"\x08\x10\x02\x40" + flags = b"\x08\x10\x02" + self.last_flag_byte full_msg = bytearray() - full_msg += RPC_PROXY_REQ + FLAGS + self.out_conn_id + full_msg += RPC_PROXY_REQ + flags + self.out_conn_id full_msg += self.remote_ip_port + self.our_ip_port + EXTRA_SIZE + PROXY_TAG full_msg += bytes([len(AD_TAG)]) + AD_TAG + FOUR_BYTES_ALIGNER full_msg += msg @@ -537,7 +558,7 @@ def set_bufsizes(sock, recv_buf=READ_BUF_SIZE, send_buf=WRITE_BUF_SIZE): sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf) -async def do_middleproxy_handshake(dc_idx, cl_ip, cl_port): +async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port): START_SEQ_NO = -2 NONCE_LEN = 16 @@ -662,7 +683,7 @@ async def do_middleproxy_handshake(dc_idx, cl_ip, cl_port): if handshake_type != RPC_HANDSHAKE or handshake_peer_pid != SENDER_PID: return False - writer_tgt = ProxyReqStreamWriter(writer_tgt, cl_ip, cl_port, my_ip, my_port) + writer_tgt = ProxyReqStreamWriter(writer_tgt, cl_ip, cl_port, my_ip, my_port, proto_tag) reader_tgt = ProxyReqStreamReader(reader_tgt) return reader_tgt, writer_tgt @@ -688,7 +709,7 @@ async def handle_client(reader_clt, writer_clt): tg_data = await do_direct_handshake(proto_tag, dc_idx) else: cl_ip, cl_port = writer_clt.upstream.get_extra_info('peername')[:2] - tg_data = await do_middleproxy_handshake(dc_idx, cl_ip, cl_port) + tg_data = await do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port) if not tg_data: writer_clt.transport.abort() @@ -709,8 +730,14 @@ async def handle_client(reader_clt, writer_clt): writer_clt.encryptor = FakeEncryptor() if USE_MIDDLE_PROXY: - reader_clt = MTProtoCompactFrameStreamReader(reader_clt) - writer_clt = MTProtoCompactFrameStreamWriter(writer_clt) + if proto_tag == PROTO_TAG_ABRIDGED: + reader_clt = MTProtoCompactFrameStreamReader(reader_clt) + writer_clt = MTProtoCompactFrameStreamWriter(writer_clt) + elif proto_tag == PROTO_TAG_INTERMEDIATE: + reader_clt = MTProtoIntermediateFrameStreamReader(reader_clt) + writer_clt = MTProtoIntermediateFrameStreamWriter(writer_clt) + else: + return async def connect_reader_to_writer(rd, wr, user): update_stats(user, curr_connects_x2=1)