refactoring for advertising support

This commit is contained in:
Alexander Bersenev
2018-06-02 16:40:55 +05:00
parent dc982cacfa
commit 6da40e47d2

View File

@@ -49,6 +49,8 @@ TG_DATACENTERS_V6 = [
"2001:67c:04e8:f004::a", "2001:b28:f23f:f005::a"
]
USE_MIDDLE_PROXY = False
SKIP_LEN = 8
PREKEY_LEN = 32
KEY_LEN = 32
@@ -74,6 +76,30 @@ def update_stats(user, connects=0, curr_connects_x2=0, octets=0):
octets=octets)
class CryptoWrappedStreamReader:
def __init__(self, stream, decryptor):
self.stream = stream
self.decryptor = decryptor
def __getattr__(self, attr):
return self.stream.__getattribute__(attr)
async def read(self, n):
return self.decryptor.decrypt(await self.stream.read(n))
class CryptoWrappedStreamWriter:
def __init__(self, stream, encryptor):
self.stream = stream
self.encryptor = encryptor
def __getattr__(self, attr):
return self.stream.__getattribute__(attr)
def write(self, data):
return self.stream.write(self.encryptor.encrypt(data))
async def handle_handshake(reader, writer):
handshake = await reader.readexactly(HANDSHAKE_LEN)
@@ -98,25 +124,27 @@ async def handle_handshake(reader, writer):
dc_idx = abs(int.from_bytes(decrypted[60:62], "little", signed=True)) - 1
if PREFER_IPV6:
if not 0 <= dc_idx < len(TG_DATACENTERS_V6):
continue
dc = TG_DATACENTERS_V6[dc_idx]
else:
if not 0 <= dc_idx < len(TG_DATACENTERS_V4):
continue
dc = TG_DATACENTERS_V4[dc_idx]
return encryptor, decryptor, user, dc, enc_key + enc_iv
reader = CryptoWrappedStreamReader(reader, decryptor)
writer = CryptoWrappedStreamWriter(writer, encryptor)
return reader, writer, user, dc_idx, enc_key + enc_iv
return False
async def do_handshake(dc, dec_key_and_iv=None):
async def do_direct_handshake(dc_idx, dec_key_and_iv=None):
RESERVED_NONCE_FIRST_CHARS = [b"\xef"]
RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54",
b"\x47\x45\x54\x20", b"\xee\xee\xee\xee"]
RESERVED_NONCE_CONTINUES = [b"\x00\x00\x00\x00"]
if PREFER_IPV6:
if not 0 <= dc_idx < len(TG_DATACENTERS_V6):
return False
dc = TG_DATACENTERS_V6[dc_idx]
else:
if not 0 <= dc_idx < len(TG_DATACENTERS_V4):
return False
dc = TG_DATACENTERS_V4[dc_idx]
try:
reader_tgt, writer_tgt = await asyncio.open_connection(dc, TG_DATACENTER_PORT)
except ConnectionRefusedError as E:
@@ -154,30 +182,49 @@ async def do_handshake(dc, dec_key_and_iv=None):
writer_tgt.write(rnd_enc)
await writer_tgt.drain()
return encryptor, decryptor, reader_tgt, writer_tgt
reader_tgt = CryptoWrappedStreamReader(reader_tgt, decryptor)
writer_tgt = CryptoWrappedStreamWriter(writer_tgt, encryptor)
return reader_tgt, writer_tgt
async def handle_client(reader, writer):
clt_data = await handle_handshake(reader, writer)
async def handle_client(reader_clt, writer_clt):
clt_data = await handle_handshake(reader_clt, writer_clt)
if not clt_data:
writer.close()
writer_clt.close()
return
clt_enc, clt_dec, user, dc, enc_key_and_iv = clt_data
reader_clt, writer_clt, user, dc_idx, enc_key_and_iv = clt_data
update_stats(user, connects=1)
if FAST_MODE:
tg_data = await do_handshake(dc, dec_key_and_iv=enc_key_and_iv)
if not USE_MIDDLE_PROXY:
if FAST_MODE:
tg_data = await do_direct_handshake(dc_idx, dec_key_and_iv=enc_key_and_iv)
else:
tg_data = await do_direct_handshake(dc_idx)
else:
tg_data = await do_handshake(dc)
tg_data = await do_middleproxy_handshake(dc_idx)
if not tg_data:
writer.close()
writer_clt.close()
return
tg_enc, tg_dec, reader_tg, writer_tg = tg_data
reader_tg, writer_tg = tg_data
async def connect_reader_to_writer(rd, wr, rd_dec, wr_enc, user, fast=False):
if not USE_MIDDLE_PROXY and FAST_MODE:
class FakeEncryptor:
def encrypt(self, data):
return data
class FakeDecryptor:
def decrypt(self, data):
return data
reader_tg.decryptor = FakeDecryptor()
writer_clt.encryptor = FakeEncryptor()
async def connect_reader_to_writer(rd, wr, user):
update_stats(user, curr_connects_x2=1)
try:
while True:
@@ -189,12 +236,6 @@ async def handle_client(reader, writer):
return
else:
update_stats(user, octets=len(data))
before_data = data
if not fast:
dec_data = rd_dec.decrypt(data)
data = wr_enc.encrypt(dec_data)
wr.write(data)
await wr.drain()
except (ConnectionResetError, BrokenPipeError, OSError,
@@ -204,9 +245,8 @@ async def handle_client(reader, writer):
finally:
update_stats(user, curr_connects_x2=-1)
asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer, tg_dec, clt_enc, user,
fast=FAST_MODE))
asyncio.ensure_future(connect_reader_to_writer(reader, writer_tg, clt_dec, tg_enc, user))
asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer_clt, user))
asyncio.ensure_future(connect_reader_to_writer(reader_clt, writer_tg, user))
async def handle_client_wrapper(reader, writer):