mirror of
https://github.com/alexbers/mtprotoproxy.git
synced 2026-03-18 16:45:50 +00:00
Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b082d06f9b | ||
|
|
5187725088 | ||
|
|
dd1d0a6262 | ||
|
|
d5daf8bbdf | ||
|
|
780dbc5866 | ||
|
|
298614b1f6 | ||
|
|
f5c30c6115 | ||
|
|
e4473d6374 | ||
|
|
c2278501bf | ||
|
|
534b26cd04 | ||
|
|
c1bef68602 | ||
|
|
8e79dacd26 | ||
|
|
520a26aa89 | ||
|
|
647b6f6edd | ||
|
|
c2ad0de665 |
@@ -1,4 +1,4 @@
|
|||||||
FROM alpine:3.6
|
FROM alpine:3.8
|
||||||
|
|
||||||
RUN adduser tgproxy -u 10000 -D
|
RUN adduser tgproxy -u 10000 -D
|
||||||
|
|
||||||
|
|||||||
@@ -8,3 +8,7 @@ USERS = {
|
|||||||
|
|
||||||
# Tag for advertising, obtainable from @MTProxybot
|
# Tag for advertising, obtainable from @MTProxybot
|
||||||
# AD_TAG = "3c09c680b76ee91a4c25ad51f742267d"
|
# AD_TAG = "3c09c680b76ee91a4c25ad51f742267d"
|
||||||
|
|
||||||
|
# Uncommenting this do make a proxy harder to detect
|
||||||
|
# But it can be incompatible with old clients
|
||||||
|
# SECURE_ONLY = True
|
||||||
|
|||||||
120
mtprotoproxy.py
120
mtprotoproxy.py
@@ -144,16 +144,32 @@ USERS = config["USERS"]
|
|||||||
AD_TAG = bytes.fromhex(config.get("AD_TAG", ""))
|
AD_TAG = bytes.fromhex(config.get("AD_TAG", ""))
|
||||||
|
|
||||||
# load advanced settings
|
# load advanced settings
|
||||||
|
# if IPv6 avaliable, use it by default
|
||||||
PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
|
PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
|
||||||
# disables tg->client trafic reencryption, faster but less secure
|
# disables tg->client trafic reencryption, faster but less secure
|
||||||
FAST_MODE = config.get("FAST_MODE", True)
|
FAST_MODE = config.get("FAST_MODE", True)
|
||||||
|
# doesn't allow to connect in not-secure mode
|
||||||
|
SECURE_ONLY = config.get("SECURE_ONLY", False)
|
||||||
|
# delay in seconds between stats printing
|
||||||
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
|
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
|
||||||
|
# delay in seconds between middle proxy info updates
|
||||||
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 24*60*60)
|
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 24*60*60)
|
||||||
TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 8192)
|
# max socket buffer size to the client direction, the more the faster, but more RAM hungry
|
||||||
|
TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 16384)
|
||||||
|
# max socket buffer size to the telegram servers direction
|
||||||
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
|
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
|
||||||
|
# keepalive period for clients in secs
|
||||||
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 10*60)
|
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 10*60)
|
||||||
|
# drop client after this timeout if the handshake fail
|
||||||
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
|
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
|
||||||
|
# if client doesn't confirm data for this number of seconds, it is dropped
|
||||||
CLIENT_ACK_TIMEOUT = config.get("CLIENT_ACK_TIMEOUT", 5*60)
|
CLIENT_ACK_TIMEOUT = config.get("CLIENT_ACK_TIMEOUT", 5*60)
|
||||||
|
# telegram servers connect timeout in seconds
|
||||||
|
TG_CONNECT_TIMEOUT = config.get("TG_CONNECT_TIMEOUT", 10)
|
||||||
|
# listen address for IPv4
|
||||||
|
LISTEN_ADDR_IPV4 = config.get("LISTEN_ADDR_IPV4", "0.0.0.0")
|
||||||
|
# listen address for IPv6
|
||||||
|
LISTEN_ADDR_IPV6 = config.get("LISTEN_ADDR_IPV6", "::")
|
||||||
|
|
||||||
TG_DATACENTER_PORT = 443
|
TG_DATACENTER_PORT = 443
|
||||||
|
|
||||||
@@ -430,11 +446,6 @@ class MTProtoIntermediateFrameStreamReader(LayeredStreamReaderBase):
|
|||||||
msg_len -= 0x80000000
|
msg_len -= 0x80000000
|
||||||
|
|
||||||
data = await self.upstream.readexactly(msg_len)
|
data = await self.upstream.readexactly(msg_len)
|
||||||
|
|
||||||
if msg_len % 4 != 0:
|
|
||||||
cut_border = msg_len - (msg_len % 4)
|
|
||||||
data = data[:cut_border]
|
|
||||||
|
|
||||||
return data, extra
|
return data, extra
|
||||||
|
|
||||||
|
|
||||||
@@ -446,6 +457,38 @@ class MTProtoIntermediateFrameStreamWriter(LayeredStreamWriterBase):
|
|||||||
return self.upstream.write(int.to_bytes(len(data), 4, 'little') + data)
|
return self.upstream.write(int.to_bytes(len(data), 4, 'little') + data)
|
||||||
|
|
||||||
|
|
||||||
|
class MTProtoSecureIntermediateFrameStreamReader(LayeredStreamReaderBase):
|
||||||
|
async def read(self, buf_size):
|
||||||
|
msg_len_bytes = await self.upstream.readexactly(4)
|
||||||
|
msg_len = int.from_bytes(msg_len_bytes, "little")
|
||||||
|
|
||||||
|
extra = {}
|
||||||
|
if msg_len > 0x80000000:
|
||||||
|
extra["QUICKACK_FLAG"] = True
|
||||||
|
msg_len -= 0x80000000
|
||||||
|
|
||||||
|
data = await self.upstream.readexactly(msg_len)
|
||||||
|
|
||||||
|
if msg_len % 4 != 0:
|
||||||
|
cut_border = msg_len - (msg_len % 4)
|
||||||
|
data = data[:cut_border]
|
||||||
|
|
||||||
|
return data, extra
|
||||||
|
|
||||||
|
|
||||||
|
class MTProtoSecureIntermediateFrameStreamWriter(LayeredStreamWriterBase):
|
||||||
|
def write(self, data, extra={}):
|
||||||
|
MAX_PADDING_LEN = 4
|
||||||
|
if extra.get("SIMPLE_ACK"):
|
||||||
|
# TODO: make this unpredictable
|
||||||
|
return self.upstream.write(data)
|
||||||
|
else:
|
||||||
|
padding_len = random.randrange(MAX_PADDING_LEN)
|
||||||
|
padding = bytearray([random.randrange(256) for i in range(padding_len)])
|
||||||
|
padded_data_len_bytes = int.to_bytes(len(data) + padding_len, 4, 'little')
|
||||||
|
return self.upstream.write(padded_data_len_bytes + data + padding)
|
||||||
|
|
||||||
|
|
||||||
class ProxyReqStreamReader(LayeredStreamReaderBase):
|
class ProxyReqStreamReader(LayeredStreamReaderBase):
|
||||||
async def read(self, msg):
|
async def read(self, msg):
|
||||||
RPC_PROXY_ANS = b"\x0d\xda\x03\x44"
|
RPC_PROXY_ANS = b"\x0d\xda\x03\x44"
|
||||||
@@ -504,6 +547,7 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase):
|
|||||||
FLAG_HAS_AD_TAG = 0x8
|
FLAG_HAS_AD_TAG = 0x8
|
||||||
FLAG_MAGIC = 0x1000
|
FLAG_MAGIC = 0x1000
|
||||||
FLAG_EXTMODE2 = 0x20000
|
FLAG_EXTMODE2 = 0x20000
|
||||||
|
FLAG_PAD = 0x8000000
|
||||||
FLAG_INTERMEDIATE = 0x20000000
|
FLAG_INTERMEDIATE = 0x20000000
|
||||||
FLAG_ABRIDGED = 0x40000000
|
FLAG_ABRIDGED = 0x40000000
|
||||||
FLAG_QUICKACK = 0x80000000
|
FLAG_QUICKACK = 0x80000000
|
||||||
@@ -518,6 +562,8 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase):
|
|||||||
flags |= FLAG_ABRIDGED
|
flags |= FLAG_ABRIDGED
|
||||||
elif self.proto_tag == PROTO_TAG_INTERMEDIATE:
|
elif self.proto_tag == PROTO_TAG_INTERMEDIATE:
|
||||||
flags |= FLAG_INTERMEDIATE
|
flags |= FLAG_INTERMEDIATE
|
||||||
|
elif self.proto_tag == PROTO_TAG_SECURE:
|
||||||
|
flags |= FLAG_INTERMEDIATE | FLAG_PAD
|
||||||
|
|
||||||
if extra.get("QUICKACK_FLAG"):
|
if extra.get("QUICKACK_FLAG"):
|
||||||
flags |= FLAG_QUICKACK
|
flags |= FLAG_QUICKACK
|
||||||
@@ -557,6 +603,9 @@ async def handle_handshake(reader, writer):
|
|||||||
if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE):
|
if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
if SECURE_ONLY and proto_tag != PROTO_TAG_SECURE:
|
||||||
|
continue
|
||||||
|
|
||||||
dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True)
|
dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True)
|
||||||
|
|
||||||
reader = CryptoWrappedStreamReader(reader, decryptor)
|
reader = CryptoWrappedStreamReader(reader, decryptor)
|
||||||
@@ -591,6 +640,21 @@ def set_bufsizes(sock, recv_buf, send_buf):
|
|||||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
|
||||||
|
|
||||||
|
|
||||||
|
async def open_connection_tryer(addr, port, limit, timeout, max_attempts=3):
|
||||||
|
for attempt in range(max_attempts-1):
|
||||||
|
try:
|
||||||
|
task = asyncio.open_connection(addr, port, limit=limit)
|
||||||
|
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout)
|
||||||
|
return reader_tgt, writer_tgt
|
||||||
|
except (OSError, asyncio.TimeoutError):
|
||||||
|
continue
|
||||||
|
|
||||||
|
# the last attempt
|
||||||
|
task = asyncio.open_connection(addr, port, limit=limit)
|
||||||
|
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout)
|
||||||
|
return reader_tgt, writer_tgt
|
||||||
|
|
||||||
|
|
||||||
async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
|
async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
|
||||||
RESERVED_NONCE_FIRST_CHARS = [b"\xef"]
|
RESERVED_NONCE_FIRST_CHARS = [b"\xef"]
|
||||||
RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54",
|
RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54",
|
||||||
@@ -609,18 +673,18 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
|
|||||||
dc = TG_DATACENTERS_V4[dc_idx]
|
dc = TG_DATACENTERS_V4[dc_idx]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
reader_tgt, writer_tgt = await asyncio.open_connection(dc, TG_DATACENTER_PORT,
|
reader_tgt, writer_tgt = await open_connection_tryer(
|
||||||
limit=TO_CLT_BUFSIZE)
|
dc, TG_DATACENTER_PORT, limit=TO_CLT_BUFSIZE, timeout=TG_CONNECT_TIMEOUT)
|
||||||
set_keepalive(writer_tgt.get_extra_info("socket"))
|
|
||||||
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
|
|
||||||
|
|
||||||
except ConnectionRefusedError as E:
|
except ConnectionRefusedError as E:
|
||||||
print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT)
|
print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT)
|
||||||
return False
|
return False
|
||||||
except OSError as E:
|
except (OSError, asyncio.TimeoutError) as E:
|
||||||
print_err("Unable to connect to", dc, TG_DATACENTER_PORT)
|
print_err("Unable to connect to", dc, TG_DATACENTER_PORT)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
set_keepalive(writer_tgt.get_extra_info("socket"))
|
||||||
|
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
rnd = bytearray([random.randrange(0, 256) for i in range(HANDSHAKE_LEN)])
|
rnd = bytearray([random.randrange(0, 256) for i in range(HANDSHAKE_LEN)])
|
||||||
if rnd[:1] in RESERVED_NONCE_FIRST_CHARS:
|
if rnd[:1] in RESERVED_NONCE_FIRST_CHARS:
|
||||||
@@ -710,16 +774,18 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
|||||||
addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
|
addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
|
||||||
|
|
||||||
try:
|
try:
|
||||||
reader_tgt, writer_tgt = await asyncio.open_connection(addr, port, limit=TO_CLT_BUFSIZE)
|
reader_tgt, writer_tgt = await open_connection_tryer(addr, port, limit=TO_CLT_BUFSIZE,
|
||||||
set_keepalive(writer_tgt.get_extra_info("socket"))
|
timeout=TG_CONNECT_TIMEOUT)
|
||||||
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
|
|
||||||
except ConnectionRefusedError as E:
|
except ConnectionRefusedError as E:
|
||||||
print_err("Got connection refused while trying to connect to", addr, port)
|
print_err("Got connection refused while trying to connect to", addr, port)
|
||||||
return False
|
return False
|
||||||
except OSError as E:
|
except (OSError, asyncio.TimeoutError) as E:
|
||||||
print_err("Unable to connect to", addr, port)
|
print_err("Unable to connect to", addr, port)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
set_keepalive(writer_tgt.get_extra_info("socket"))
|
||||||
|
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
|
||||||
|
|
||||||
writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO)
|
writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO)
|
||||||
|
|
||||||
key_selector = PROXY_SECRET[:4]
|
key_selector = PROXY_SECRET[:4]
|
||||||
@@ -862,9 +928,12 @@ async def handle_client(reader_clt, writer_clt):
|
|||||||
if proto_tag == PROTO_TAG_ABRIDGED:
|
if proto_tag == PROTO_TAG_ABRIDGED:
|
||||||
reader_clt = MTProtoCompactFrameStreamReader(reader_clt)
|
reader_clt = MTProtoCompactFrameStreamReader(reader_clt)
|
||||||
writer_clt = MTProtoCompactFrameStreamWriter(writer_clt)
|
writer_clt = MTProtoCompactFrameStreamWriter(writer_clt)
|
||||||
elif proto_tag in (PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE):
|
elif proto_tag == PROTO_TAG_INTERMEDIATE:
|
||||||
reader_clt = MTProtoIntermediateFrameStreamReader(reader_clt)
|
reader_clt = MTProtoIntermediateFrameStreamReader(reader_clt)
|
||||||
writer_clt = MTProtoIntermediateFrameStreamWriter(writer_clt)
|
writer_clt = MTProtoIntermediateFrameStreamWriter(writer_clt)
|
||||||
|
elif proto_tag == PROTO_TAG_SECURE:
|
||||||
|
reader_clt = MTProtoSecureIntermediateFrameStreamReader(reader_clt)
|
||||||
|
writer_clt = MTProtoSecureIntermediateFrameStreamWriter(writer_clt)
|
||||||
else:
|
else:
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -1012,7 +1081,7 @@ def init_ip_info():
|
|||||||
TIMEOUT = 5
|
TIMEOUT = 5
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with urllib.request.urlopen('https://v4.ifconfig.co/ip', timeout=TIMEOUT) as f:
|
with urllib.request.urlopen('http://ipv4.myexternalip.com/raw', timeout=TIMEOUT) as f:
|
||||||
if f.status != 200:
|
if f.status != 200:
|
||||||
raise Exception("Invalid status code")
|
raise Exception("Invalid status code")
|
||||||
my_ip_info["ipv4"] = f.read().decode().strip()
|
my_ip_info["ipv4"] = f.read().decode().strip()
|
||||||
@@ -1021,7 +1090,7 @@ def init_ip_info():
|
|||||||
|
|
||||||
if PREFER_IPV6:
|
if PREFER_IPV6:
|
||||||
try:
|
try:
|
||||||
with urllib.request.urlopen('https://v6.ifconfig.co/ip', timeout=TIMEOUT) as f:
|
with urllib.request.urlopen('http://ipv6.myexternalip.com/raw', timeout=TIMEOUT) as f:
|
||||||
if f.status != 200:
|
if f.status != 200:
|
||||||
raise Exception("Invalid status code")
|
raise Exception("Invalid status code")
|
||||||
my_ip_info["ipv6"] = f.read().decode().strip()
|
my_ip_info["ipv6"] = f.read().decode().strip()
|
||||||
@@ -1046,13 +1115,14 @@ def print_tg_info():
|
|||||||
|
|
||||||
for user, secret in sorted(USERS.items(), key=lambda x: x[0]):
|
for user, secret in sorted(USERS.items(), key=lambda x: x[0]):
|
||||||
for ip in ip_addrs:
|
for ip in ip_addrs:
|
||||||
params = {"server": ip, "port": PORT, "secret": secret}
|
if not SECURE_ONLY:
|
||||||
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
params = {"server": ip, "port": PORT, "secret": secret}
|
||||||
print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True)
|
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
||||||
|
print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True)
|
||||||
|
|
||||||
params = {"server": ip, "port": PORT, "secret": "dd" + secret}
|
params = {"server": ip, "port": PORT, "secret": "dd" + secret}
|
||||||
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
||||||
print("{}: tg://proxy?{} (beta)".format(user, params_encodeded), flush=True)
|
print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True)
|
||||||
|
|
||||||
|
|
||||||
def loop_exception_handler(loop, context):
|
def loop_exception_handler(loop, context):
|
||||||
@@ -1102,12 +1172,12 @@ def main():
|
|||||||
|
|
||||||
reuse_port = hasattr(socket, "SO_REUSEPORT")
|
reuse_port = hasattr(socket, "SO_REUSEPORT")
|
||||||
|
|
||||||
task_v4 = asyncio.start_server(handle_client_wrapper, '0.0.0.0', PORT,
|
task_v4 = asyncio.start_server(handle_client_wrapper, LISTEN_ADDR_IPV4, PORT,
|
||||||
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
|
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
|
||||||
server_v4 = loop.run_until_complete(task_v4)
|
server_v4 = loop.run_until_complete(task_v4)
|
||||||
|
|
||||||
if socket.has_ipv6:
|
if socket.has_ipv6:
|
||||||
task_v6 = asyncio.start_server(handle_client_wrapper, '::', PORT,
|
task_v6 = asyncio.start_server(handle_client_wrapper, LISTEN_ADDR_IPV6, PORT,
|
||||||
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
|
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
|
||||||
server_v6 = loop.run_until_complete(task_v6)
|
server_v6 = loop.run_until_complete(task_v6)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user