From 6da40e47d27840b608d373855bde1f663368fa5b Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Sat, 2 Jun 2018 16:40:55 +0500 Subject: [PATCH] refactoring for advertising support --- mtprotoproxy.py | 102 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 71 insertions(+), 31 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index bc658b0..4ff1dfa 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -49,6 +49,8 @@ TG_DATACENTERS_V6 = [ "2001:67c:04e8:f004::a", "2001:b28:f23f:f005::a" ] +USE_MIDDLE_PROXY = False + SKIP_LEN = 8 PREKEY_LEN = 32 KEY_LEN = 32 @@ -74,6 +76,30 @@ def update_stats(user, connects=0, curr_connects_x2=0, octets=0): octets=octets) +class CryptoWrappedStreamReader: + def __init__(self, stream, decryptor): + self.stream = stream + self.decryptor = decryptor + + def __getattr__(self, attr): + return self.stream.__getattribute__(attr) + + async def read(self, n): + return self.decryptor.decrypt(await self.stream.read(n)) + + +class CryptoWrappedStreamWriter: + def __init__(self, stream, encryptor): + self.stream = stream + self.encryptor = encryptor + + def __getattr__(self, attr): + return self.stream.__getattribute__(attr) + + def write(self, data): + return self.stream.write(self.encryptor.encrypt(data)) + + async def handle_handshake(reader, writer): handshake = await reader.readexactly(HANDSHAKE_LEN) @@ -98,25 +124,27 @@ async def handle_handshake(reader, writer): dc_idx = abs(int.from_bytes(decrypted[60:62], "little", signed=True)) - 1 - if PREFER_IPV6: - if not 0 <= dc_idx < len(TG_DATACENTERS_V6): - continue - dc = TG_DATACENTERS_V6[dc_idx] - else: - if not 0 <= dc_idx < len(TG_DATACENTERS_V4): - continue - dc = TG_DATACENTERS_V4[dc_idx] - - return encryptor, decryptor, user, dc, enc_key + enc_iv + reader = CryptoWrappedStreamReader(reader, decryptor) + writer = CryptoWrappedStreamWriter(writer, encryptor) + return reader, writer, user, dc_idx, enc_key + enc_iv return False -async def do_handshake(dc, dec_key_and_iv=None): +async def do_direct_handshake(dc_idx, dec_key_and_iv=None): RESERVED_NONCE_FIRST_CHARS = [b"\xef"] RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54", b"\x47\x45\x54\x20", b"\xee\xee\xee\xee"] RESERVED_NONCE_CONTINUES = [b"\x00\x00\x00\x00"] + if PREFER_IPV6: + if not 0 <= dc_idx < len(TG_DATACENTERS_V6): + return False + dc = TG_DATACENTERS_V6[dc_idx] + else: + if not 0 <= dc_idx < len(TG_DATACENTERS_V4): + return False + dc = TG_DATACENTERS_V4[dc_idx] + try: reader_tgt, writer_tgt = await asyncio.open_connection(dc, TG_DATACENTER_PORT) except ConnectionRefusedError as E: @@ -154,30 +182,49 @@ async def do_handshake(dc, dec_key_and_iv=None): writer_tgt.write(rnd_enc) await writer_tgt.drain() - return encryptor, decryptor, reader_tgt, writer_tgt + reader_tgt = CryptoWrappedStreamReader(reader_tgt, decryptor) + writer_tgt = CryptoWrappedStreamWriter(writer_tgt, encryptor) + + return reader_tgt, writer_tgt -async def handle_client(reader, writer): - clt_data = await handle_handshake(reader, writer) +async def handle_client(reader_clt, writer_clt): + clt_data = await handle_handshake(reader_clt, writer_clt) if not clt_data: - writer.close() + writer_clt.close() return - clt_enc, clt_dec, user, dc, enc_key_and_iv = clt_data + reader_clt, writer_clt, user, dc_idx, enc_key_and_iv = clt_data update_stats(user, connects=1) - if FAST_MODE: - tg_data = await do_handshake(dc, dec_key_and_iv=enc_key_and_iv) + if not USE_MIDDLE_PROXY: + if FAST_MODE: + tg_data = await do_direct_handshake(dc_idx, dec_key_and_iv=enc_key_and_iv) + else: + tg_data = await do_direct_handshake(dc_idx) else: - tg_data = await do_handshake(dc) + tg_data = await do_middleproxy_handshake(dc_idx) + if not tg_data: - writer.close() + writer_clt.close() return - tg_enc, tg_dec, reader_tg, writer_tg = tg_data + reader_tg, writer_tg = tg_data - async def connect_reader_to_writer(rd, wr, rd_dec, wr_enc, user, fast=False): + if not USE_MIDDLE_PROXY and FAST_MODE: + class FakeEncryptor: + def encrypt(self, data): + return data + + class FakeDecryptor: + def decrypt(self, data): + return data + + reader_tg.decryptor = FakeDecryptor() + writer_clt.encryptor = FakeEncryptor() + + async def connect_reader_to_writer(rd, wr, user): update_stats(user, curr_connects_x2=1) try: while True: @@ -189,12 +236,6 @@ async def handle_client(reader, writer): return else: update_stats(user, octets=len(data)) - - before_data = data - if not fast: - dec_data = rd_dec.decrypt(data) - data = wr_enc.encrypt(dec_data) - wr.write(data) await wr.drain() except (ConnectionResetError, BrokenPipeError, OSError, @@ -204,9 +245,8 @@ async def handle_client(reader, writer): finally: update_stats(user, curr_connects_x2=-1) - asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer, tg_dec, clt_enc, user, - fast=FAST_MODE)) - asyncio.ensure_future(connect_reader_to_writer(reader, writer_tg, clt_dec, tg_enc, user)) + asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer_clt, user)) + asyncio.ensure_future(connect_reader_to_writer(reader_clt, writer_tg, user)) async def handle_client_wrapper(reader, writer):