mirror of
https://github.com/alexbers/mtprotoproxy.git
synced 2026-03-14 07:13:09 +00:00
Merge branch 'master' into stable
Synchronize branches
This commit is contained in:
165
mtprotoproxy.py
165
mtprotoproxy.py
@@ -11,6 +11,7 @@ import random
|
|||||||
import binascii
|
import binascii
|
||||||
import sys
|
import sys
|
||||||
import re
|
import re
|
||||||
|
import runpy
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -59,21 +60,24 @@ except (ValueError, OSError):
|
|||||||
except ImportError:
|
except ImportError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
if len(sys.argv) > 1:
|
||||||
|
config = runpy.run_path(sys.argv[1])
|
||||||
|
else:
|
||||||
|
config = runpy.run_module("config")
|
||||||
|
|
||||||
import config
|
PORT = config["PORT"]
|
||||||
PORT = getattr(config, "PORT")
|
USERS = config["USERS"]
|
||||||
USERS = getattr(config, "USERS")
|
|
||||||
|
|
||||||
# load advanced settings
|
# load advanced settings
|
||||||
PREFER_IPV6 = getattr(config, "PREFER_IPV6", socket.has_ipv6)
|
PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
|
||||||
# disables tg->client trafic reencryption, faster but less secure
|
# disables tg->client trafic reencryption, faster but less secure
|
||||||
FAST_MODE = getattr(config, "FAST_MODE", True)
|
FAST_MODE = config.get("FAST_MODE", True)
|
||||||
STATS_PRINT_PERIOD = getattr(config, "STATS_PRINT_PERIOD", 600)
|
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
|
||||||
PROXY_INFO_UPDATE_PERIOD = getattr(config, "PROXY_INFO_UPDATE_PERIOD", 60*60*24)
|
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 60*60*24)
|
||||||
READ_BUF_SIZE = getattr(config, "READ_BUF_SIZE", 16384)
|
READ_BUF_SIZE = config.get("READ_BUF_SIZE", 16384)
|
||||||
WRITE_BUF_SIZE = getattr(config, "WRITE_BUF_SIZE", 65536)
|
WRITE_BUF_SIZE = config.get("WRITE_BUF_SIZE", 65536)
|
||||||
CLIENT_KEEPALIVE = getattr(config, "CLIENT_KEEPALIVE", 60*30)
|
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30)
|
||||||
AD_TAG = bytes.fromhex(getattr(config, "AD_TAG", ""))
|
AD_TAG = bytes.fromhex(config.get("AD_TAG", ""))
|
||||||
|
|
||||||
TG_DATACENTER_PORT = 443
|
TG_DATACENTER_PORT = 443
|
||||||
|
|
||||||
@@ -119,10 +123,11 @@ PREKEY_LEN = 32
|
|||||||
KEY_LEN = 32
|
KEY_LEN = 32
|
||||||
IV_LEN = 16
|
IV_LEN = 16
|
||||||
HANDSHAKE_LEN = 64
|
HANDSHAKE_LEN = 64
|
||||||
MAGIC_VAL_POS = 56
|
PROTO_TAG_POS = 56
|
||||||
DC_IDX_POS = 60
|
DC_IDX_POS = 60
|
||||||
|
|
||||||
MAGIC_VAL_TO_CHECK = b'\xef\xef\xef\xef'
|
PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef"
|
||||||
|
PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee"
|
||||||
|
|
||||||
CBC_PADDING = 16
|
CBC_PADDING = 16
|
||||||
PADDING_FILLER = b"\x04\x00\x00\x00"
|
PADDING_FILLER = b"\x04\x00\x00\x00"
|
||||||
@@ -167,7 +172,7 @@ class LayeredStreamWriterBase:
|
|||||||
def __init__(self, upstream):
|
def __init__(self, upstream):
|
||||||
self.upstream = upstream
|
self.upstream = upstream
|
||||||
|
|
||||||
def write(self, data):
|
def write(self, data, extra={}):
|
||||||
return self.upstream.write(data)
|
return self.upstream.write(data)
|
||||||
|
|
||||||
def write_eof(self):
|
def write_eof(self):
|
||||||
@@ -229,7 +234,7 @@ class CryptoWrappedStreamWriter(LayeredStreamWriterBase):
|
|||||||
self.encryptor = encryptor
|
self.encryptor = encryptor
|
||||||
self.block_size = block_size
|
self.block_size = block_size
|
||||||
|
|
||||||
def write(self, data):
|
def write(self, data, extra={}):
|
||||||
if len(data) % self.block_size != 0:
|
if len(data) % self.block_size != 0:
|
||||||
print_err("BUG: writing %d bytes not aligned to block size %d" % (
|
print_err("BUG: writing %d bytes not aligned to block size %d" % (
|
||||||
len(data), self.block_size))
|
len(data), self.block_size))
|
||||||
@@ -279,7 +284,7 @@ class MTProtoFrameStreamWriter(LayeredStreamWriterBase):
|
|||||||
self.upstream = upstream
|
self.upstream = upstream
|
||||||
self.seq_no = seq_no
|
self.seq_no = seq_no
|
||||||
|
|
||||||
def write(self, msg):
|
def write(self, msg, extra={}):
|
||||||
len_bytes = int.to_bytes(len(msg) + 4 + 4 + 4, 4, "little")
|
len_bytes = int.to_bytes(len(msg) + 4 + 4 + 4, 4, "little")
|
||||||
seq_bytes = int.to_bytes(self.seq_no, 4, "little", signed=True)
|
seq_bytes = int.to_bytes(self.seq_no, 4, "little", signed=True)
|
||||||
self.seq_no += 1
|
self.seq_no += 1
|
||||||
@@ -298,7 +303,9 @@ class MTProtoCompactFrameStreamReader(LayeredStreamReaderBase):
|
|||||||
msg_len_bytes = await self.upstream.readexactly(1)
|
msg_len_bytes = await self.upstream.readexactly(1)
|
||||||
msg_len = int.from_bytes(msg_len_bytes, "little")
|
msg_len = int.from_bytes(msg_len_bytes, "little")
|
||||||
|
|
||||||
|
extra = {"QUICKACK_FLAG": False}
|
||||||
if msg_len >= 0x80:
|
if msg_len >= 0x80:
|
||||||
|
extra["QUICKACK_FLAG"] = True
|
||||||
msg_len -= 0x80
|
msg_len -= 0x80
|
||||||
|
|
||||||
if msg_len == 0x7f:
|
if msg_len == 0x7f:
|
||||||
@@ -309,57 +316,84 @@ class MTProtoCompactFrameStreamReader(LayeredStreamReaderBase):
|
|||||||
|
|
||||||
data = await self.upstream.readexactly(msg_len)
|
data = await self.upstream.readexactly(msg_len)
|
||||||
|
|
||||||
return data
|
return data, extra
|
||||||
|
|
||||||
|
|
||||||
class MTProtoCompactFrameStreamWriter(LayeredStreamWriterBase):
|
class MTProtoCompactFrameStreamWriter(LayeredStreamWriterBase):
|
||||||
def __init__(self, upstream, seq_no=0):
|
def write(self, data, extra={}):
|
||||||
self.upstream = upstream
|
|
||||||
self.seq_no = seq_no
|
|
||||||
|
|
||||||
def write(self, data):
|
|
||||||
SMALL_PKT_BORDER = 0x7f
|
SMALL_PKT_BORDER = 0x7f
|
||||||
LARGE_PKT_BORGER = 256 ** 3
|
LARGE_PKT_BORGER = 256 ** 3
|
||||||
|
|
||||||
if len(data) % 4 != 0:
|
if len(data) % 4 != 0:
|
||||||
print_err("BUG: MTProtoFrameStreamWriter attempted to send msg with len %d" % len(msg))
|
print_err("BUG: MTProtoFrameStreamWriter attempted to send msg with len", len(data))
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
if extra.get("SIMPLE_ACK"):
|
||||||
|
return self.upstream.write(data[::-1])
|
||||||
|
|
||||||
len_div_four = len(data) // 4
|
len_div_four = len(data) // 4
|
||||||
|
|
||||||
if len_div_four < SMALL_PKT_BORDER:
|
if len_div_four < SMALL_PKT_BORDER:
|
||||||
return self.upstream.write(bytes([len_div_four]) + data)
|
return self.upstream.write(bytes([len_div_four]) + data)
|
||||||
elif len_div_four < LARGE_PKT_BORGER:
|
elif len_div_four < LARGE_PKT_BORGER:
|
||||||
return self.upstream.write(b'\x7f' + bytes(int.to_bytes(len_div_four, 3, 'little')) +
|
return self.upstream.write(b'\x7f' + int.to_bytes(len_div_four, 3, 'little') + data)
|
||||||
data)
|
|
||||||
else:
|
else:
|
||||||
print_err("Attempted to send too large pkt len =", len(data))
|
print_err("Attempted to send too large pkt len =", len(data))
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
class MTProtoIntermediateFrameStreamReader(LayeredStreamReaderBase):
|
||||||
|
async def read(self, buf_size):
|
||||||
|
msg_len_bytes = await self.upstream.readexactly(4)
|
||||||
|
msg_len = int.from_bytes(msg_len_bytes, "little")
|
||||||
|
|
||||||
|
extra = {}
|
||||||
|
if msg_len > 0x80000000:
|
||||||
|
extra["QUICKACK_FLAG"] = True
|
||||||
|
msg_len -= 0x80000000
|
||||||
|
|
||||||
|
data = await self.upstream.readexactly(msg_len)
|
||||||
|
|
||||||
|
return data, extra
|
||||||
|
|
||||||
|
|
||||||
|
class MTProtoIntermediateFrameStreamWriter(LayeredStreamWriterBase):
|
||||||
|
def write(self, data, extra={}):
|
||||||
|
if extra.get("SIMPLE_ACK"):
|
||||||
|
return self.upstream.write(data)
|
||||||
|
else:
|
||||||
|
return self.upstream.write(int.to_bytes(len(data), 4, 'little') + data)
|
||||||
|
|
||||||
|
|
||||||
class ProxyReqStreamReader(LayeredStreamReaderBase):
|
class ProxyReqStreamReader(LayeredStreamReaderBase):
|
||||||
async def read(self, msg):
|
async def read(self, msg):
|
||||||
RPC_PROXY_ANS = b"\x0d\xda\x03\x44"
|
RPC_PROXY_ANS = b"\x0d\xda\x03\x44"
|
||||||
RPC_CLOSE_EXT = b"\xa2\x34\xb6\x5e"
|
RPC_CLOSE_EXT = b"\xa2\x34\xb6\x5e"
|
||||||
|
RPC_SIMPLE_ACK = b"\x9b\x40\xac\x3b"
|
||||||
|
|
||||||
data = await self.upstream.read(1)
|
data = await self.upstream.read(1)
|
||||||
|
|
||||||
if len(data) < 4:
|
if len(data) < 4:
|
||||||
return b""
|
return b""
|
||||||
|
|
||||||
ans_type, ans_flags, conn_id, conn_data = data[:4], data[4:8], data[8:16], data[16:]
|
ans_type = data[:4]
|
||||||
if ans_type == RPC_CLOSE_EXT:
|
if ans_type == RPC_CLOSE_EXT:
|
||||||
return b""
|
return b""
|
||||||
|
|
||||||
if ans_type != RPC_PROXY_ANS:
|
if ans_type == RPC_PROXY_ANS:
|
||||||
print_err("ans_type != RPC_PROXY_ANS", ans_type)
|
ans_flags, conn_id, conn_data = data[4:8], data[8:16], data[16:]
|
||||||
return b""
|
return conn_data
|
||||||
|
|
||||||
return conn_data
|
if ans_type == RPC_SIMPLE_ACK:
|
||||||
|
conn_id, confirm = data[4:12], data[12:16]
|
||||||
|
return confirm, {"SIMPLE_ACK": True}
|
||||||
|
|
||||||
|
print_err("unknown rpc ans type:", ans_type)
|
||||||
|
return b""
|
||||||
|
|
||||||
|
|
||||||
class ProxyReqStreamWriter(LayeredStreamWriterBase):
|
class ProxyReqStreamWriter(LayeredStreamWriterBase):
|
||||||
def __init__(self, upstream, cl_ip, cl_port, my_ip, my_port):
|
def __init__(self, upstream, cl_ip, cl_port, my_ip, my_port, proto_tag):
|
||||||
self.upstream = upstream
|
self.upstream = upstream
|
||||||
|
|
||||||
if ":" not in cl_ip:
|
if ":" not in cl_ip:
|
||||||
@@ -377,23 +411,41 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase):
|
|||||||
self.our_ip_port += int.to_bytes(my_port, 4, "little")
|
self.our_ip_port += int.to_bytes(my_port, 4, "little")
|
||||||
self.out_conn_id = bytearray([random.randrange(0, 256) for i in range(8)])
|
self.out_conn_id = bytearray([random.randrange(0, 256) for i in range(8)])
|
||||||
|
|
||||||
def write(self, msg):
|
self.proto_tag = proto_tag
|
||||||
|
|
||||||
|
def write(self, msg, extra={}):
|
||||||
RPC_PROXY_REQ = b"\xee\xf1\xce\x36"
|
RPC_PROXY_REQ = b"\xee\xf1\xce\x36"
|
||||||
EXTRA_SIZE = b"\x18\x00\x00\x00"
|
EXTRA_SIZE = b"\x18\x00\x00\x00"
|
||||||
PROXY_TAG = b"\xae\x26\x1e\xdb"
|
PROXY_TAG = b"\xae\x26\x1e\xdb"
|
||||||
FOUR_BYTES_ALIGNER = b"\x00\x00\x00"
|
FOUR_BYTES_ALIGNER = b"\x00\x00\x00"
|
||||||
|
|
||||||
|
FLAG_NOT_ENCRYPTED = 0x2
|
||||||
|
FLAG_HAS_AD_TAG = 0x8
|
||||||
|
FLAG_MAGIC = 0x1000
|
||||||
|
FLAG_EXTMODE2 = 0x20000
|
||||||
|
FLAG_INTERMEDIATE = 0x20000000
|
||||||
|
FLAG_ABRIDGED = 0x40000000
|
||||||
|
FLAG_QUICKACK = 0x80000000
|
||||||
|
|
||||||
if len(msg) % 4 != 0:
|
if len(msg) % 4 != 0:
|
||||||
print_err("BUG: attempted to send msg with len %d" % len(msg))
|
print_err("BUG: attempted to send msg with len %d" % len(msg))
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
flags = FLAG_HAS_AD_TAG | FLAG_MAGIC | FLAG_EXTMODE2
|
||||||
|
|
||||||
|
if self.proto_tag == PROTO_TAG_ABRIDGED:
|
||||||
|
flags |= FLAG_ABRIDGED
|
||||||
|
elif self.proto_tag == PROTO_TAG_INTERMEDIATE:
|
||||||
|
flags |= FLAG_INTERMEDIATE
|
||||||
|
|
||||||
|
if extra.get("QUICKACK_FLAG"):
|
||||||
|
flags |= FLAG_QUICKACK
|
||||||
|
|
||||||
if msg.startswith(b"\x00" * 8):
|
if msg.startswith(b"\x00" * 8):
|
||||||
FLAGS = b"\x0a\x10\x02\x40"
|
flags |= FLAG_NOT_ENCRYPTED
|
||||||
else:
|
|
||||||
FLAGS = b"\x08\x10\x02\x40"
|
|
||||||
|
|
||||||
full_msg = bytearray()
|
full_msg = bytearray()
|
||||||
full_msg += RPC_PROXY_REQ + FLAGS + self.out_conn_id
|
full_msg += RPC_PROXY_REQ + int.to_bytes(flags, 4, "little") + self.out_conn_id
|
||||||
full_msg += self.remote_ip_port + self.our_ip_port + EXTRA_SIZE + PROXY_TAG
|
full_msg += self.remote_ip_port + self.our_ip_port + EXTRA_SIZE + PROXY_TAG
|
||||||
full_msg += bytes([len(AD_TAG)]) + AD_TAG + FOUR_BYTES_ALIGNER
|
full_msg += bytes([len(AD_TAG)]) + AD_TAG + FOUR_BYTES_ALIGNER
|
||||||
full_msg += msg
|
full_msg += msg
|
||||||
@@ -420,19 +472,19 @@ async def handle_handshake(reader, writer):
|
|||||||
|
|
||||||
decrypted = decryptor.decrypt(handshake)
|
decrypted = decryptor.decrypt(handshake)
|
||||||
|
|
||||||
check_val = decrypted[MAGIC_VAL_POS:MAGIC_VAL_POS+4]
|
proto_tag = decrypted[PROTO_TAG_POS:PROTO_TAG_POS+4]
|
||||||
if check_val != MAGIC_VAL_TO_CHECK:
|
if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True)
|
dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True)
|
||||||
|
|
||||||
reader = CryptoWrappedStreamReader(reader, decryptor)
|
reader = CryptoWrappedStreamReader(reader, decryptor)
|
||||||
writer = CryptoWrappedStreamWriter(writer, encryptor)
|
writer = CryptoWrappedStreamWriter(writer, encryptor)
|
||||||
return reader, writer, user, dc_idx, enc_key + enc_iv
|
return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
async def do_direct_handshake(dc_idx, dec_key_and_iv=None):
|
async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
|
||||||
RESERVED_NONCE_FIRST_CHARS = [b"\xef"]
|
RESERVED_NONCE_FIRST_CHARS = [b"\xef"]
|
||||||
RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54",
|
RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54",
|
||||||
b"\x47\x45\x54\x20", b"\xee\xee\xee\xee"]
|
b"\x47\x45\x54\x20", b"\xee\xee\xee\xee"]
|
||||||
@@ -469,7 +521,7 @@ async def do_direct_handshake(dc_idx, dec_key_and_iv=None):
|
|||||||
continue
|
continue
|
||||||
break
|
break
|
||||||
|
|
||||||
rnd[MAGIC_VAL_POS:MAGIC_VAL_POS+4] = MAGIC_VAL_TO_CHECK
|
rnd[PROTO_TAG_POS:PROTO_TAG_POS+4] = proto_tag
|
||||||
|
|
||||||
if dec_key_and_iv:
|
if dec_key_and_iv:
|
||||||
rnd[SKIP_LEN:SKIP_LEN+KEY_LEN+IV_LEN] = dec_key_and_iv[::-1]
|
rnd[SKIP_LEN:SKIP_LEN+KEY_LEN+IV_LEN] = dec_key_and_iv[::-1]
|
||||||
@@ -484,7 +536,7 @@ async def do_direct_handshake(dc_idx, dec_key_and_iv=None):
|
|||||||
enc_key, enc_iv = enc_key_and_iv[:KEY_LEN], enc_key_and_iv[KEY_LEN:]
|
enc_key, enc_iv = enc_key_and_iv[:KEY_LEN], enc_key_and_iv[KEY_LEN:]
|
||||||
encryptor = create_aes_ctr(key=enc_key, iv=int.from_bytes(enc_iv, "big"))
|
encryptor = create_aes_ctr(key=enc_key, iv=int.from_bytes(enc_iv, "big"))
|
||||||
|
|
||||||
rnd_enc = rnd[:MAGIC_VAL_POS] + encryptor.encrypt(rnd)[MAGIC_VAL_POS:]
|
rnd_enc = rnd[:PROTO_TAG_POS] + encryptor.encrypt(rnd)[PROTO_TAG_POS:]
|
||||||
|
|
||||||
writer_tgt.write(rnd_enc)
|
writer_tgt.write(rnd_enc)
|
||||||
await writer_tgt.drain()
|
await writer_tgt.drain()
|
||||||
@@ -536,7 +588,7 @@ def set_bufsizes(sock, recv_buf=READ_BUF_SIZE, send_buf=WRITE_BUF_SIZE):
|
|||||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
|
||||||
|
|
||||||
|
|
||||||
async def do_middleproxy_handshake(dc_idx, cl_ip, cl_port):
|
async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
||||||
START_SEQ_NO = -2
|
START_SEQ_NO = -2
|
||||||
NONCE_LEN = 16
|
NONCE_LEN = 16
|
||||||
|
|
||||||
@@ -661,7 +713,7 @@ async def do_middleproxy_handshake(dc_idx, cl_ip, cl_port):
|
|||||||
if handshake_type != RPC_HANDSHAKE or handshake_peer_pid != SENDER_PID:
|
if handshake_type != RPC_HANDSHAKE or handshake_peer_pid != SENDER_PID:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
writer_tgt = ProxyReqStreamWriter(writer_tgt, cl_ip, cl_port, my_ip, my_port)
|
writer_tgt = ProxyReqStreamWriter(writer_tgt, cl_ip, cl_port, my_ip, my_port, proto_tag)
|
||||||
reader_tgt = ProxyReqStreamReader(reader_tgt)
|
reader_tgt = ProxyReqStreamReader(reader_tgt)
|
||||||
|
|
||||||
return reader_tgt, writer_tgt
|
return reader_tgt, writer_tgt
|
||||||
@@ -676,18 +728,18 @@ async def handle_client(reader_clt, writer_clt):
|
|||||||
writer_clt.transport.abort()
|
writer_clt.transport.abort()
|
||||||
return
|
return
|
||||||
|
|
||||||
reader_clt, writer_clt, user, dc_idx, enc_key_and_iv = clt_data
|
reader_clt, writer_clt, proto_tag, user, dc_idx, enc_key_and_iv = clt_data
|
||||||
|
|
||||||
update_stats(user, connects=1)
|
update_stats(user, connects=1)
|
||||||
|
|
||||||
if not USE_MIDDLE_PROXY:
|
if not USE_MIDDLE_PROXY:
|
||||||
if FAST_MODE:
|
if FAST_MODE:
|
||||||
tg_data = await do_direct_handshake(dc_idx, dec_key_and_iv=enc_key_and_iv)
|
tg_data = await do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=enc_key_and_iv)
|
||||||
else:
|
else:
|
||||||
tg_data = await do_direct_handshake(dc_idx)
|
tg_data = await do_direct_handshake(proto_tag, dc_idx)
|
||||||
else:
|
else:
|
||||||
cl_ip, cl_port = writer_clt.upstream.get_extra_info('peername')[:2]
|
cl_ip, cl_port = writer_clt.upstream.get_extra_info('peername')[:2]
|
||||||
tg_data = await do_middleproxy_handshake(dc_idx, cl_ip, cl_port)
|
tg_data = await do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port)
|
||||||
|
|
||||||
if not tg_data:
|
if not tg_data:
|
||||||
writer_clt.transport.abort()
|
writer_clt.transport.abort()
|
||||||
@@ -708,14 +760,25 @@ async def handle_client(reader_clt, writer_clt):
|
|||||||
writer_clt.encryptor = FakeEncryptor()
|
writer_clt.encryptor = FakeEncryptor()
|
||||||
|
|
||||||
if USE_MIDDLE_PROXY:
|
if USE_MIDDLE_PROXY:
|
||||||
reader_clt = MTProtoCompactFrameStreamReader(reader_clt)
|
if proto_tag == PROTO_TAG_ABRIDGED:
|
||||||
writer_clt = MTProtoCompactFrameStreamWriter(writer_clt)
|
reader_clt = MTProtoCompactFrameStreamReader(reader_clt)
|
||||||
|
writer_clt = MTProtoCompactFrameStreamWriter(writer_clt)
|
||||||
|
elif proto_tag == PROTO_TAG_INTERMEDIATE:
|
||||||
|
reader_clt = MTProtoIntermediateFrameStreamReader(reader_clt)
|
||||||
|
writer_clt = MTProtoIntermediateFrameStreamWriter(writer_clt)
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
|
||||||
async def connect_reader_to_writer(rd, wr, user):
|
async def connect_reader_to_writer(rd, wr, user):
|
||||||
update_stats(user, curr_connects_x2=1)
|
update_stats(user, curr_connects_x2=1)
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
data = await rd.read(READ_BUF_SIZE)
|
data = await rd.read(READ_BUF_SIZE)
|
||||||
|
if isinstance(data, tuple):
|
||||||
|
data, extra = data
|
||||||
|
else:
|
||||||
|
extra = {}
|
||||||
|
|
||||||
if not data:
|
if not data:
|
||||||
wr.write_eof()
|
wr.write_eof()
|
||||||
await wr.drain()
|
await wr.drain()
|
||||||
@@ -723,7 +786,7 @@ async def handle_client(reader_clt, writer_clt):
|
|||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
update_stats(user, octets=len(data))
|
update_stats(user, octets=len(data))
|
||||||
wr.write(data)
|
wr.write(data, extra)
|
||||||
await wr.drain()
|
await wr.drain()
|
||||||
except (OSError, AttributeError, asyncio.streams.IncompleteReadError) as e:
|
except (OSError, AttributeError, asyncio.streams.IncompleteReadError) as e:
|
||||||
# print_err(e)
|
# print_err(e)
|
||||||
|
|||||||
Reference in New Issue
Block a user