mirror of
https://github.com/alexbers/mtprotoproxy.git
synced 2026-03-22 18:45:49 +00:00
merge using intermediate protocol for middle proxy
This commit is contained in:
@@ -314,16 +314,12 @@ class MTProtoCompactFrameStreamReader(LayeredStreamReaderBase):
|
|||||||
|
|
||||||
|
|
||||||
class MTProtoCompactFrameStreamWriter(LayeredStreamWriterBase):
|
class MTProtoCompactFrameStreamWriter(LayeredStreamWriterBase):
|
||||||
def __init__(self, upstream, seq_no=0):
|
|
||||||
self.upstream = upstream
|
|
||||||
self.seq_no = seq_no
|
|
||||||
|
|
||||||
def write(self, data):
|
def write(self, data):
|
||||||
SMALL_PKT_BORDER = 0x7f
|
SMALL_PKT_BORDER = 0x7f
|
||||||
LARGE_PKT_BORGER = 256 ** 3
|
LARGE_PKT_BORGER = 256 ** 3
|
||||||
|
|
||||||
if len(data) % 4 != 0:
|
if len(data) % 4 != 0:
|
||||||
print_err("BUG: MTProtoFrameStreamWriter attempted to send msg with len %d" % len(data))
|
print_err("BUG: MTProtoFrameStreamWriter attempted to send msg with len", len(data))
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
len_div_four = len(data) // 4
|
len_div_four = len(data) // 4
|
||||||
@@ -338,6 +334,24 @@ class MTProtoCompactFrameStreamWriter(LayeredStreamWriterBase):
|
|||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
class MTProtoIntermediateFrameStreamReader(LayeredStreamReaderBase):
|
||||||
|
async def read(self, buf_size):
|
||||||
|
msg_len_bytes = await self.upstream.readexactly(4)
|
||||||
|
msg_len = int.from_bytes(msg_len_bytes, "little")
|
||||||
|
|
||||||
|
if msg_len > 0x80000000:
|
||||||
|
msg_len -= 0x80000000
|
||||||
|
|
||||||
|
data = await self.upstream.readexactly(msg_len)
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
class MTProtoIntermediateFrameStreamWriter(LayeredStreamWriterBase):
|
||||||
|
def write(self, data):
|
||||||
|
return self.upstream.write(int.to_bytes(len(data), 4, 'little') + data)
|
||||||
|
|
||||||
|
|
||||||
class ProxyReqStreamReader(LayeredStreamReaderBase):
|
class ProxyReqStreamReader(LayeredStreamReaderBase):
|
||||||
async def read(self, msg):
|
async def read(self, msg):
|
||||||
RPC_PROXY_ANS = b"\x0d\xda\x03\x44"
|
RPC_PROXY_ANS = b"\x0d\xda\x03\x44"
|
||||||
@@ -360,7 +374,7 @@ class ProxyReqStreamReader(LayeredStreamReaderBase):
|
|||||||
|
|
||||||
|
|
||||||
class ProxyReqStreamWriter(LayeredStreamWriterBase):
|
class ProxyReqStreamWriter(LayeredStreamWriterBase):
|
||||||
def __init__(self, upstream, cl_ip, cl_port, my_ip, my_port):
|
def __init__(self, upstream, cl_ip, cl_port, my_ip, my_port, proto_tag):
|
||||||
self.upstream = upstream
|
self.upstream = upstream
|
||||||
|
|
||||||
if ":" not in cl_ip:
|
if ":" not in cl_ip:
|
||||||
@@ -378,6 +392,13 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase):
|
|||||||
self.our_ip_port += int.to_bytes(my_port, 4, "little")
|
self.our_ip_port += int.to_bytes(my_port, 4, "little")
|
||||||
self.out_conn_id = bytearray([random.randrange(0, 256) for i in range(8)])
|
self.out_conn_id = bytearray([random.randrange(0, 256) for i in range(8)])
|
||||||
|
|
||||||
|
if proto_tag == PROTO_TAG_ABRIDGED:
|
||||||
|
self.last_flag_byte = b"\x40"
|
||||||
|
elif proto_tag == PROTO_TAG_INTERMEDIATE:
|
||||||
|
self.last_flag_byte = b"\x20"
|
||||||
|
else:
|
||||||
|
self.last_flag_byte = b"\x00"
|
||||||
|
|
||||||
def write(self, msg):
|
def write(self, msg):
|
||||||
RPC_PROXY_REQ = b"\xee\xf1\xce\x36"
|
RPC_PROXY_REQ = b"\xee\xf1\xce\x36"
|
||||||
EXTRA_SIZE = b"\x18\x00\x00\x00"
|
EXTRA_SIZE = b"\x18\x00\x00\x00"
|
||||||
@@ -389,12 +410,12 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase):
|
|||||||
return 0
|
return 0
|
||||||
|
|
||||||
if msg.startswith(b"\x00" * 8):
|
if msg.startswith(b"\x00" * 8):
|
||||||
FLAGS = b"\x0a\x10\x02\x40"
|
flags = b"\x0a\x10\x02" + self.last_flag_byte
|
||||||
else:
|
else:
|
||||||
FLAGS = b"\x08\x10\x02\x40"
|
flags = b"\x08\x10\x02" + self.last_flag_byte
|
||||||
|
|
||||||
full_msg = bytearray()
|
full_msg = bytearray()
|
||||||
full_msg += RPC_PROXY_REQ + FLAGS + self.out_conn_id
|
full_msg += RPC_PROXY_REQ + flags + self.out_conn_id
|
||||||
full_msg += self.remote_ip_port + self.our_ip_port + EXTRA_SIZE + PROXY_TAG
|
full_msg += self.remote_ip_port + self.our_ip_port + EXTRA_SIZE + PROXY_TAG
|
||||||
full_msg += bytes([len(AD_TAG)]) + AD_TAG + FOUR_BYTES_ALIGNER
|
full_msg += bytes([len(AD_TAG)]) + AD_TAG + FOUR_BYTES_ALIGNER
|
||||||
full_msg += msg
|
full_msg += msg
|
||||||
@@ -537,7 +558,7 @@ def set_bufsizes(sock, recv_buf=READ_BUF_SIZE, send_buf=WRITE_BUF_SIZE):
|
|||||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
|
||||||
|
|
||||||
|
|
||||||
async def do_middleproxy_handshake(dc_idx, cl_ip, cl_port):
|
async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
||||||
START_SEQ_NO = -2
|
START_SEQ_NO = -2
|
||||||
NONCE_LEN = 16
|
NONCE_LEN = 16
|
||||||
|
|
||||||
@@ -662,7 +683,7 @@ async def do_middleproxy_handshake(dc_idx, cl_ip, cl_port):
|
|||||||
if handshake_type != RPC_HANDSHAKE or handshake_peer_pid != SENDER_PID:
|
if handshake_type != RPC_HANDSHAKE or handshake_peer_pid != SENDER_PID:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
writer_tgt = ProxyReqStreamWriter(writer_tgt, cl_ip, cl_port, my_ip, my_port)
|
writer_tgt = ProxyReqStreamWriter(writer_tgt, cl_ip, cl_port, my_ip, my_port, proto_tag)
|
||||||
reader_tgt = ProxyReqStreamReader(reader_tgt)
|
reader_tgt = ProxyReqStreamReader(reader_tgt)
|
||||||
|
|
||||||
return reader_tgt, writer_tgt
|
return reader_tgt, writer_tgt
|
||||||
@@ -688,7 +709,7 @@ async def handle_client(reader_clt, writer_clt):
|
|||||||
tg_data = await do_direct_handshake(proto_tag, dc_idx)
|
tg_data = await do_direct_handshake(proto_tag, dc_idx)
|
||||||
else:
|
else:
|
||||||
cl_ip, cl_port = writer_clt.upstream.get_extra_info('peername')[:2]
|
cl_ip, cl_port = writer_clt.upstream.get_extra_info('peername')[:2]
|
||||||
tg_data = await do_middleproxy_handshake(dc_idx, cl_ip, cl_port)
|
tg_data = await do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port)
|
||||||
|
|
||||||
if not tg_data:
|
if not tg_data:
|
||||||
writer_clt.transport.abort()
|
writer_clt.transport.abort()
|
||||||
@@ -709,8 +730,14 @@ async def handle_client(reader_clt, writer_clt):
|
|||||||
writer_clt.encryptor = FakeEncryptor()
|
writer_clt.encryptor = FakeEncryptor()
|
||||||
|
|
||||||
if USE_MIDDLE_PROXY:
|
if USE_MIDDLE_PROXY:
|
||||||
reader_clt = MTProtoCompactFrameStreamReader(reader_clt)
|
if proto_tag == PROTO_TAG_ABRIDGED:
|
||||||
writer_clt = MTProtoCompactFrameStreamWriter(writer_clt)
|
reader_clt = MTProtoCompactFrameStreamReader(reader_clt)
|
||||||
|
writer_clt = MTProtoCompactFrameStreamWriter(writer_clt)
|
||||||
|
elif proto_tag == PROTO_TAG_INTERMEDIATE:
|
||||||
|
reader_clt = MTProtoIntermediateFrameStreamReader(reader_clt)
|
||||||
|
writer_clt = MTProtoIntermediateFrameStreamWriter(writer_clt)
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
|
||||||
async def connect_reader_to_writer(rd, wr, user):
|
async def connect_reader_to_writer(rd, wr, user):
|
||||||
update_stats(user, curr_connects_x2=1)
|
update_stats(user, curr_connects_x2=1)
|
||||||
|
|||||||
Reference in New Issue
Block a user