diff --git a/README.md b/README.md index e46ca87..9afe521 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,16 @@ -# Async mtproto proxy # +# Async MTProto Proxy # Fast and simple to setup mtproto proxy. **This is pre-alpha. Don't recommended for production use yet** -## Starting up ## +## Starting Up ## 1. `git clone https://github.com/alexbers/mtprotoproxy.git; cd mtprotoproxy` -2. *(optional, recommended)* edit *config.py*, set **PORT** and **USERS** +2. *(optional, recommended)* edit *config.py*, set **PORT**, **USERS** and **AD_TAG** 3. `docker-compose up --build -d` (or just `python3 mtprotoproxy.py` if you don't like docker) 4. *(optional, shows telegram link to set the proxy)* `docker-compose logs` + +## Channel Advertising "" + +To advertise a channel get a tag from **@MTProxybot** and write it to *config.py*. diff --git a/config.py b/config.py index 4f1bb1a..23297f2 100644 --- a/config.py +++ b/config.py @@ -5,3 +5,6 @@ USERS = { "tg": "00000000000000000000000000000000", "tg2": "0123456789abcdef0123456789abcdef" } + +# Tag for advertising, obtainable from @MTProxybot +# AD_TAG = "3c09c680b76ee91a4c25ad51f742267d" diff --git a/mtprotoproxy.py b/mtprotoproxy.py index 4ff1dfa..3f3ecb7 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -8,23 +8,43 @@ import collections import time import hashlib import random +import binascii try: from Crypto.Cipher import AES from Crypto.Util import Counter - def create_aes(key, iv): + def create_aes_ctr(key, iv): ctr = Counter.new(128, initial_value=iv) return AES.new(key, AES.MODE_CTR, counter=ctr) + def create_aes_cbc(key, iv): + return AES.new(key, AES.MODE_CBC, iv) + except ImportError: print("Failed to find pycrypto, using slow AES version", flush=True) import pyaes - def create_aes(key, iv): + def create_aes_ctr(key, iv): ctr = pyaes.Counter(iv) return pyaes.AESModeOfOperationCTR(key, ctr) + def create_aes_cbc(key, iv): + class EncryptorAdapter: + def __init__(self, mode): + self.mode = mode + + def encrypt(self, data): + encrypter = pyaes.Encrypter(self.mode, pyaes.PADDING_NONE) + return encrypter.feed(data) + encrypter.feed() + + def decrypt(self, data): + decrypter = pyaes.Decrypter(self.mode, pyaes.PADDING_NONE) + return decrypter.feed(data) + decrypter.feed() + + mode = pyaes.AESModeOfOperationCBC(key, iv) + return EncryptorAdapter(mode) + import config PORT = getattr(config, "PORT") @@ -36,6 +56,7 @@ PREFER_IPV6 = getattr(config, "PREFER_IPV6", False) FAST_MODE = getattr(config, "FAST_MODE", True) STATS_PRINT_PERIOD = getattr(config, "STATS_PRINT_PERIOD", 600) READ_BUF_SIZE = getattr(config, "READ_BUF_SIZE", 4096) +AD_TAG = bytes.fromhex(getattr(config, "AD_TAG", "")) TG_DATACENTER_PORT = 443 @@ -49,7 +70,20 @@ TG_DATACENTERS_V6 = [ "2001:67c:04e8:f004::a", "2001:b28:f23f:f005::a" ] -USE_MIDDLE_PROXY = False +TG_MIDDLE_PROXIES_V4 = [ + ("149.154.175.50", 8888), ("149.154.162.38", 80), ("149.154.175.100", 8888), + ("91.108.4.136", 8888), ("91.108.56.181", 8888) +] + + +USE_MIDDLE_PROXY = (len(AD_TAG) == 16) + +PROXY_SECRET = bytes.fromhex( + "c4f9faca9678e6bb48ad6c7e2ce5c0d24430645d554addeb55419e034da62721" + + "d046eaab6e52ab14a95a443ecfb3463e79a05a66612adf9caeda8be9a80da698" + + "6fb0a6ff387af84d88ef3a6413713e5c3377f6e1a3d47d99f5e0c56eece8f05c" + + "54c490b079e31bef82ff0ee8f2b0a32756d249c5f21269816cb7061b265db212" +) SKIP_LEN = 8 PREKEY_LEN = 32 @@ -60,6 +94,14 @@ MAGIC_VAL_POS = 56 MAGIC_VAL_TO_CHECK = b'\xef\xef\xef\xef' +CBC_PADDING = 16 +PADDING_FILLER = b"\x04\x00\x00\x00" + +MIN_MSG_LEN = 12 +MAX_MSG_LEN = 2 ** 24 + +global_my_ip = None + def init_stats(): global stats @@ -77,27 +119,233 @@ def update_stats(user, connects=0, curr_connects_x2=0, octets=0): class CryptoWrappedStreamReader: - def __init__(self, stream, decryptor): + def __init__(self, stream, decryptor, block_size=1): self.stream = stream self.decryptor = decryptor + self.block_size = block_size + self.buf = bytearray() def __getattr__(self, attr): - return self.stream.__getattribute__(attr) + return getattr(self.stream, attr) async def read(self, n): - return self.decryptor.decrypt(await self.stream.read(n)) + if self.buf: + ret = self.buf + self.buf.clear() + return ret + else: + readed = await self.stream.read(n) + + needed_till_full_block = -len(readed) % self.block_size + if needed_till_full_block > 0: + readed += self.stream.readexactly(needed_till_full_block) + return self.decryptor.decrypt(readed) + + async def readexactly(self, n): + if n > len(self.buf): + to_read = n - len(self.buf) + needed_till_full_block = -to_read % self.block_size + + to_read_block_aligned = to_read + needed_till_full_block + data = await self.stream.readexactly(to_read_block_aligned) + self.buf += self.decryptor.decrypt(data) + + ret = bytes(self.buf[:n]) + self.buf = self.buf[n:] + return ret class CryptoWrappedStreamWriter: - def __init__(self, stream, encryptor): + def __init__(self, stream, encryptor, block_size=1): self.stream = stream self.encryptor = encryptor + self.block_size = block_size def __getattr__(self, attr): - return self.stream.__getattribute__(attr) + return getattr(self.stream, attr) def write(self, data): - return self.stream.write(self.encryptor.encrypt(data)) + if len(data) % self.block_size != 0: + print("BUG: writing %d bytes not aligned to block size %d" % ( + len(data), self.block_size)) + return 0 + q = self.encryptor.encrypt(data) + return self.stream.write(q) + + +class MTProtoFrameStreamReader: + def __init__(self, stream, seq_no=0): + self.stream = stream + self.seq_no = seq_no + + def __getattr__(self, attr): + return getattr(self.stream, attr) + + async def read(self, buf_size): + msg_len_bytes = await self.stream.readexactly(4) + msg_len = int.from_bytes(msg_len_bytes, "little") + # skip paddings + while msg_len == 4: + msg_len_bytes = await self.stream.readexactly(4) + msg_len = int.from_bytes(msg_len_bytes, "little") + + len_is_impossible = (msg_len % len(PADDING_FILLER) != 0) + if not MIN_MSG_LEN <= msg_len <= MAX_MSG_LEN or len_is_impossible: + print("msg_len is bad, closing connection", msg_len) + self.stream.feed_eof() + return b"" + + msg_seq_bytes = await self.stream.readexactly(4) + msg_seq = int.from_bytes(msg_seq_bytes, "little", signed=True) + if msg_seq != self.seq_no: + print("unexpected seq_no") + self.stream.feed_eof() + return b"" + + self.seq_no += 1 + + data = await self.stream.readexactly(msg_len - 4 - 4 - 4) + checksum_bytes = await self.stream.readexactly(4) + checksum = int.from_bytes(checksum_bytes, "little") + + computed_checksum = binascii.crc32(msg_len_bytes + msg_seq_bytes + data) + if computed_checksum != checksum: + self.stream.feed_eof() + return b"" + return data + + +class MTProtoCompactFrameStreamReader: + def __init__(self, stream): + self.stream = stream + + def __getattr__(self, attr): + return getattr(self.stream, attr) + + async def read(self, buf_size): + msg_len_bytes = await self.stream.readexactly(1) + msg_len = int.from_bytes(msg_len_bytes, "little") + + if msg_len >= 0x80: + msg_len -= 0x80 + + if msg_len == 0x7f: + msg_len_bytes = await self.stream.readexactly(3) + msg_len = int.from_bytes(msg_len_bytes, "little") + + msg_len *= 4 + + data = await self.stream.readexactly(msg_len) + + return data + + +class MTProtoCompactFrameStreamWriter: + def __init__(self, stream, seq_no=0): + self.stream = stream + self.seq_no = seq_no + + def __getattr__(self, attr): + return getattr(self.stream, attr) + + def write(self, data): + SMALL_PKT_BORDER = 0x7f + LARGE_PKT_BORGER = 256 ** 3 + + if len(data) % 4 != 0: + print("BUG: MTProtoFrameStreamWriter attempted to send msg with len %d" % len(msg)) + return 0 + + len_div_four = len(data) // 4 + + if len_div_four < SMALL_PKT_BORDER: + return self.stream.write(bytes([len_div_four]) + data) + elif len_div_four < LARGE_PKT_BORGER: + return self.stream.write(b'\x7f' + bytes(int.to_bytes(len_div_four, 3, 'little')) + + data) + else: + print("Attempted to send too large pkt len =", len(data)) + return 0 + + +class MTProtoFrameStreamWriter: + def __init__(self, stream, seq_no=0): + self.stream = stream + self.seq_no = seq_no + + def __getattr__(self, attr): + return getattr(self.stream, attr) + + def write(self, msg): + len_bytes = int.to_bytes(len(msg) + 4 + 4 + 4, 4, "little") + seq_bytes = int.to_bytes(self.seq_no, 4, "little", signed=True) + self.seq_no += 1 + + msg_without_checksum = len_bytes + seq_bytes + msg + checksum = int.to_bytes(binascii.crc32(msg_without_checksum), 4, "little") + + full_msg = msg_without_checksum + checksum + padding = PADDING_FILLER * ((-len(full_msg) % CBC_PADDING) // len(PADDING_FILLER)) + + return self.stream.write(full_msg + padding) + + +class ProxyReqStreamReader: + def __init__(self, stream): + self.stream = stream + + def __getattr__(self, attr): + return getattr(self.stream, attr) + + async def read(self, msg): + RPC_PROXY_ANS = b"\x0d\xda\x03\x44" + RPC_CLOSE_EXT = b"\xa2\x34\xb6\x5e" + + data = await self.stream.read(1) + + if len(data) < 4: + return b"" + + ans_type, ans_flags, conn_id, conn_data = data[:4], data[4:8], data[8:16], data[16:] + if ans_type == RPC_CLOSE_EXT: + self.feed_eof() + return b"" + + if ans_type != RPC_PROXY_ANS: + print("ans_type != RPC_PROXY_ANS", ans_type) + return b"" + + return conn_data + + +class ProxyReqStreamWriter: + def __init__(self, stream): + self.stream = stream + + def __getattr__(self, attr): + return getattr(self.stream, attr) + + def write(self, msg): + RPC_PROXY_REQ = b"\xee\xf1\xce\x36" + FLAGS = b"\x08\x10\x02\x40" + OUT_CONN_ID = bytearray([random.randrange(0, 256) for i in range(8)]) + REMOTE_IP_PORT = b"A" * 20 + OUR_IP_PORT = b"B" * 20 + EXTRA_SIZE = b"\x18\x00\x00\x00" + PROXY_TAG = b"\xae\x26\x1e\xdb" + FOUR_BYTES_ALIGNER = b"\x00\x00\x00" + + if len(msg) % 4 != 0: + print("BUG: attempted to send msg with len %d" % len(msg)) + return 0 + + full_msg = bytearray() + full_msg += RPC_PROXY_REQ + FLAGS + OUT_CONN_ID + REMOTE_IP_PORT + full_msg += OUR_IP_PORT + EXTRA_SIZE + PROXY_TAG + full_msg += bytes([len(AD_TAG)]) + AD_TAG + FOUR_BYTES_ALIGNER + full_msg += msg + + return self.stream.write(full_msg) async def handle_handshake(reader, writer): @@ -109,12 +357,12 @@ async def handle_handshake(reader, writer): dec_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN] dec_prekey, dec_iv = dec_prekey_and_iv[:PREKEY_LEN], dec_prekey_and_iv[PREKEY_LEN:] dec_key = hashlib.sha256(dec_prekey + secret).digest() - decryptor = create_aes(key=dec_key, iv=int.from_bytes(dec_iv, "big")) + decryptor = create_aes_ctr(key=dec_key, iv=int.from_bytes(dec_iv, "big")) enc_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN][::-1] enc_prekey, enc_iv = enc_prekey_and_iv[:PREKEY_LEN], enc_prekey_and_iv[PREKEY_LEN:] enc_key = hashlib.sha256(enc_prekey + secret).digest() - encryptor = create_aes(key=enc_key, iv=int.from_bytes(enc_iv, "big")) + encryptor = create_aes_ctr(key=enc_key, iv=int.from_bytes(enc_iv, "big")) decrypted = decryptor.decrypt(handshake) @@ -123,6 +371,8 @@ async def handle_handshake(reader, writer): continue dc_idx = abs(int.from_bytes(decrypted[60:62], "little", signed=True)) - 1 + if dc_idx == 0: + continue reader = CryptoWrappedStreamReader(reader, decryptor) writer = CryptoWrappedStreamWriter(writer, encryptor) @@ -171,11 +421,11 @@ async def do_direct_handshake(dc_idx, dec_key_and_iv=None): dec_key_and_iv = rnd[SKIP_LEN:SKIP_LEN+KEY_LEN+IV_LEN][::-1] dec_key, dec_iv = dec_key_and_iv[:KEY_LEN], dec_key_and_iv[KEY_LEN:] - decryptor = create_aes(key=dec_key, iv=int.from_bytes(dec_iv, "big")) + decryptor = create_aes_ctr(key=dec_key, iv=int.from_bytes(dec_iv, "big")) enc_key_and_iv = rnd[SKIP_LEN:SKIP_LEN+KEY_LEN+IV_LEN] enc_key, enc_iv = enc_key_and_iv[:KEY_LEN], enc_key_and_iv[KEY_LEN:] - encryptor = create_aes(key=enc_key, iv=int.from_bytes(enc_iv, "big")) + encryptor = create_aes_ctr(key=enc_key, iv=int.from_bytes(enc_iv, "big")) rnd_enc = rnd[:MAGIC_VAL_POS] + encryptor.encrypt(rnd)[MAGIC_VAL_POS:] @@ -188,6 +438,128 @@ async def do_direct_handshake(dc_idx, dec_key_and_iv=None): return reader_tgt, writer_tgt +def get_middleproxy_aes_key_and_iv(nonce_srv, nonce_clt, clt_ts, srv_ip, clt_port, purpose, + clt_ip, srv_port, middleproxy_secret, clt_ipv6=None, + srv_ipv6=None): + + s = bytearray() + s += nonce_srv + nonce_clt + clt_ts + srv_ip + clt_port + purpose + clt_ip + srv_port + s += middleproxy_secret + nonce_srv + + if clt_ipv6 and srv_ipv6: + s += clt_ipv6 + srv_ipv6 + + s += nonce_clt + + md5_sum = hashlib.md5(s[1:]).digest() + sha1_sum = hashlib.sha1(s).digest() + + key = md5_sum[:12] + sha1_sum + iv = hashlib.md5(s[2:]).digest() + return key, iv + + +async def do_middleproxy_handshake(dc_idx): + START_SEQ_NO = -2 + NONCE_LEN = 16 + + RPC_NONCE = b"\xaa\x87\xcb\x7a" + RPC_HANDSHAKE = b"\xf5\xee\x82\x76" + CRYPTO_AES = b"\x01\x00\x00\x00" + + RPC_NONCE_ANS_LEN = 32 + RPC_HANDSHAKE_ANS_LEN = 32 + + # pass as consts to simplify code + RPC_FLAGS = b"\x00\x00\x00\x00" + SENDER_PID = b"IPIPPRPDTIME" + PEER_PID = b"IPIPPRPDTIME" + + if not 0 <= dc_idx < len(TG_MIDDLE_PROXIES_V4): + return False + addr, port = TG_MIDDLE_PROXIES_V4[dc_idx] + + try: + reader_tgt, writer_tgt = await asyncio.open_connection(addr, port) + except ConnectionRefusedError as E: + return False + except OSError as E: + return False + + writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO) + + key_selector = PROXY_SECRET[:4] + crypto_ts = int.to_bytes(int(time.time()) % (256**4), 4, "little") + + nonce = bytes([random.randrange(0, 256) for i in range(NONCE_LEN)]) + + msg = RPC_NONCE + key_selector + CRYPTO_AES + crypto_ts + nonce + + writer_tgt.write(msg) + await writer_tgt.drain() + + old_reader = reader_tgt + reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO) + ans = await reader_tgt.read(READ_BUF_SIZE) + + if len(ans) != RPC_NONCE_ANS_LEN: + return False + + rpc_type, rpc_key_selector, rpc_schema, rpc_crypto_ts, rpc_nonce = ( + ans[:4], ans[4:8], ans[8:12], ans[12:16], ans[16:32] + ) + + if rpc_type != RPC_NONCE or rpc_key_selector != key_selector or rpc_schema != CRYPTO_AES: + return False + + # get keys + tg_ip, tg_port = writer_tgt.stream.get_extra_info('peername') + my_ip, my_port = writer_tgt.stream.get_extra_info('sockname') + + tg_ip_bytes = socket.inet_pton(socket.AF_INET, tg_ip)[::-1] + my_ip_bytes = socket.inet_pton(socket.AF_INET, global_my_ip)[::-1] + + tg_port_bytes = int.to_bytes(tg_port, 2, "little") + my_port_bytes = int.to_bytes(my_port, 2, "little") + + # TODO: IPv6 support + enc_key, enc_iv = get_middleproxy_aes_key_and_iv( + nonce_srv=rpc_nonce, nonce_clt=nonce, clt_ts=crypto_ts, srv_ip=tg_ip_bytes, + clt_port=my_port_bytes, purpose=b"CLIENT", clt_ip=my_ip_bytes, + srv_port=tg_port_bytes, middleproxy_secret=PROXY_SECRET, clt_ipv6=None, srv_ipv6=None) + + dec_key, dec_iv = get_middleproxy_aes_key_and_iv( + nonce_srv=rpc_nonce, nonce_clt=nonce, clt_ts=crypto_ts, srv_ip=tg_ip_bytes, + clt_port=my_port_bytes, purpose=b"SERVER", clt_ip=my_ip_bytes, + srv_port=tg_port_bytes, middleproxy_secret=PROXY_SECRET, clt_ipv6=None, srv_ipv6=None) + + encryptor = create_aes_cbc(key=enc_key, iv=enc_iv) + decryptor = create_aes_cbc(key=dec_key, iv=dec_iv) + + # TODO: pass client ip and port here for statistics + handshake = RPC_HANDSHAKE + RPC_FLAGS + SENDER_PID + PEER_PID + + writer_tgt.stream = CryptoWrappedStreamWriter(writer_tgt.stream, encryptor, block_size=16) + writer_tgt.write(handshake) + await writer_tgt.drain() + + reader_tgt.stream = CryptoWrappedStreamReader(reader_tgt.stream, decryptor, block_size=16) + + handshake_ans = await reader_tgt.read(1) + if len(handshake_ans) != RPC_HANDSHAKE_ANS_LEN: + return False + + handshake_type, handshake_flags, handshake_sender_pid, handshake_peer_pid = ( + handshake_ans[:4], handshake_ans[4:8], handshake_ans[8:20], handshake_ans[20:32]) + if handshake_type != RPC_HANDSHAKE or handshake_peer_pid != SENDER_PID: + return False + + writer_tgt = ProxyReqStreamWriter(writer_tgt) + reader_tgt = ProxyReqStreamReader(reader_tgt) + + return reader_tgt, writer_tgt + + async def handle_client(reader_clt, writer_clt): clt_data = await handle_handshake(reader_clt, writer_clt) if not clt_data: @@ -195,7 +567,7 @@ async def handle_client(reader_clt, writer_clt): return reader_clt, writer_clt, user, dc_idx, enc_key_and_iv = clt_data - + update_stats(user, connects=1) if not USE_MIDDLE_PROXY: @@ -224,6 +596,10 @@ async def handle_client(reader_clt, writer_clt): reader_tg.decryptor = FakeDecryptor() writer_clt.encryptor = FakeEncryptor() + if USE_MIDDLE_PROXY: + reader_clt = MTProtoCompactFrameStreamReader(reader_clt) + writer_clt = MTProtoCompactFrameStreamWriter(writer_clt) + async def connect_reader_to_writer(rd, wr, user): update_stats(user, curr_connects_x2=1) try: @@ -239,7 +615,7 @@ async def handle_client(reader_clt, writer_clt): wr.write(data) await wr.drain() except (ConnectionResetError, BrokenPipeError, OSError, - AttributeError) as e: + AttributeError, asyncio.streams.IncompleteReadError) as e: wr.close() # print(e) finally: @@ -270,13 +646,20 @@ async def stats_printer(): def print_tg_info(): + global USE_MIDDLE_PROXY + try: with urllib.request.urlopen('https://ifconfig.co/ip') as f: if f.status != 200: raise Exception("Invalid status code") my_ip = f.read().decode().strip() + global global_my_ip + global_my_ip = my_ip except Exception: my_ip = 'YOUR_IP' + if USE_MIDDLE_PROXY: + print("Failed to determine your ip, advertising disabled", flush=True) + USE_MIDDLE_PROXY = False for user, secret in sorted(USERS.items(), key=lambda x: x[0]): params = {