mirror of
https://github.com/alexbers/mtprotoproxy.git
synced 2026-03-14 07:13:09 +00:00
Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
66ac871a74 | ||
|
|
c5344df0eb | ||
|
|
93ad268d48 | ||
|
|
1c29465b6e | ||
|
|
d41b4abf35 | ||
|
|
7f19b3f78d | ||
|
|
0549fd7200 | ||
|
|
fd75ca3cf9 | ||
|
|
522b0cfe75 | ||
|
|
4a4d449a34 | ||
|
|
8c15fc8fe0 | ||
|
|
e436792992 | ||
|
|
07759f67cb | ||
|
|
f525cc9611 | ||
|
|
c010543889 | ||
|
|
0a41479054 | ||
|
|
5f206361f2 | ||
|
|
6980bfd3be |
@@ -7,6 +7,8 @@ RUN apk add --no-cache python3 py3-cryptography ca-certificates libcap
|
||||
RUN chown -R tgproxy:tgproxy /home/tgproxy
|
||||
RUN setcap cap_net_bind_service=+ep /usr/bin/python3.7
|
||||
|
||||
COPY mtprotoproxy.py config.py /home/tgproxy/
|
||||
|
||||
USER tgproxy
|
||||
|
||||
WORKDIR /home/tgproxy/
|
||||
|
||||
@@ -20,6 +20,11 @@ To advertise a channel get a tag from **@MTProxybot** and put it to *config.py*.
|
||||
The proxy performance should be enough to comfortably serve about 4 000 simultaneous users on
|
||||
the VDS instance with 1 CPU core and 1024MB RAM.
|
||||
|
||||
## More Instructions ##
|
||||
|
||||
- [Running without Docker](https://github.com/alexbers/mtprotoproxy/wiki/Running-Without-Docker)
|
||||
- [Optimization and fine tuning](https://github.com/alexbers/mtprotoproxy/wiki/Optimization-and-Fine-Tuning)
|
||||
|
||||
## Advanced Usage ##
|
||||
|
||||
The proxy can be launched:
|
||||
|
||||
25
config.py
25
config.py
@@ -1,20 +1,25 @@
|
||||
PORT = 3256
|
||||
PORT = 443
|
||||
|
||||
# name -> secret (32 hex chars)
|
||||
USERS = {
|
||||
"tg": "00000000000000000000000000000000",
|
||||
"tg2": "0123456789abcdef0123456789abcdef"
|
||||
"tg": "00000000000000000000000000000001",
|
||||
# "tg2": "0123456789abcdef0123456789abcdef",
|
||||
}
|
||||
|
||||
# Makes the proxy harder to detect
|
||||
# Can be incompatible with very old clients
|
||||
SECURE_ONLY = True
|
||||
MODES = {
|
||||
# Classic mode, easy to detect
|
||||
"classic": False,
|
||||
|
||||
# Makes the proxy even more hard to detect
|
||||
# Compatible only with the recent clients
|
||||
TLS_ONLY = True
|
||||
# Makes the proxy harder to detect
|
||||
# Can be incompatible with very old clients
|
||||
"secure": False,
|
||||
|
||||
# The domain for TLS, bad clients are proxied there
|
||||
# Makes the proxy even more hard to detect
|
||||
# Can be incompatible with old clients
|
||||
"tls": True
|
||||
}
|
||||
|
||||
# The domain for TLS mode, bad clients are proxied there
|
||||
# Use random existing domain, proxy checks it on start
|
||||
# TLS_DOMAIN = "www.google.com"
|
||||
|
||||
|
||||
335
mtprotoproxy.py
335
mtprotoproxy.py
@@ -66,6 +66,8 @@ TLS_HANDSHAKE_LEN = 1 + 2 + 2 + 512
|
||||
PROTO_TAG_POS = 56
|
||||
DC_IDX_POS = 60
|
||||
|
||||
MIN_CERT_LEN = 1024
|
||||
|
||||
PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef"
|
||||
PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee"
|
||||
PROTO_TAG_SECURE = b"\xdd\xdd\xdd\xdd"
|
||||
@@ -80,12 +82,13 @@ STAT_DURATION_BUCKETS = [0.1, 0.5, 1, 2, 5, 15, 60, 300, 600, 1800, 2**31 - 1]
|
||||
|
||||
my_ip_info = {"ipv4": None, "ipv6": None}
|
||||
used_handshakes = collections.OrderedDict()
|
||||
client_ips = collections.OrderedDict()
|
||||
last_client_ips = {}
|
||||
disable_middle_proxy = False
|
||||
is_time_skewed = False
|
||||
fake_cert_len = random.randrange(1024, 4096)
|
||||
mask_host_cached_ip = None
|
||||
last_clients_with_time_skew = {}
|
||||
last_clients_with_first_pkt_error = collections.Counter()
|
||||
last_clients_with_same_handshake = collections.Counter()
|
||||
proxy_start_time = 0
|
||||
proxy_links = []
|
||||
@@ -107,12 +110,12 @@ def init_config():
|
||||
conf_dict["PORT"] = int(sys.argv[1])
|
||||
secrets = sys.argv[2].split(",")
|
||||
conf_dict["USERS"] = {"user%d" % i: secrets[i].zfill(32) for i in range(len(secrets))}
|
||||
conf_dict["MODES"] = {"classic": False, "secure": True, "tls": True}
|
||||
if len(sys.argv) > 3:
|
||||
conf_dict["AD_TAG"] = sys.argv[3]
|
||||
if len(sys.argv) > 4:
|
||||
conf_dict["TLS_DOMAIN"] = sys.argv[4]
|
||||
conf_dict["TLS_ONLY"] = True
|
||||
conf_dict["SECURE_ONLY"] = True
|
||||
conf_dict["MODES"] = {"classic": False, "secure": False, "tls": True}
|
||||
|
||||
conf_dict = {k: v for k, v in conf_dict.items() if k.isupper()}
|
||||
|
||||
@@ -131,11 +134,43 @@ def init_config():
|
||||
# disables tg->client trafic reencryption, faster but less secure
|
||||
conf_dict.setdefault("FAST_MODE", True)
|
||||
|
||||
# doesn't allow to connect in not-secure mode
|
||||
conf_dict.setdefault("SECURE_ONLY", False)
|
||||
# enables some working modes
|
||||
modes = conf_dict.get("MODES", {})
|
||||
|
||||
# allows to connect in tls mode only
|
||||
conf_dict.setdefault("TLS_ONLY", False)
|
||||
if "MODES" not in conf_dict:
|
||||
modes.setdefault("classic", True)
|
||||
modes.setdefault("secure", True)
|
||||
modes.setdefault("tls", True)
|
||||
else:
|
||||
modes.setdefault("classic", False)
|
||||
modes.setdefault("secure", False)
|
||||
modes.setdefault("tls", False)
|
||||
|
||||
legacy_warning = False
|
||||
if "SECURE_ONLY" in conf_dict:
|
||||
legacy_warning = True
|
||||
modes["classic"] = not bool(conf_dict["SECURE_ONLY"])
|
||||
|
||||
if "TLS_ONLY" in conf_dict:
|
||||
legacy_warning = True
|
||||
if conf_dict["TLS_ONLY"]:
|
||||
modes["classic"] = False
|
||||
modes["secure"] = False
|
||||
|
||||
if not modes["classic"] and not modes["secure"] and not modes["tls"]:
|
||||
print_err("No known modes enabled, enabling tls-only mode")
|
||||
modes["tls"] = True
|
||||
|
||||
if legacy_warning:
|
||||
print_err("Legacy options SECURE_ONLY or TLS_ONLY detected")
|
||||
print_err("Please use MODES in your config instead:")
|
||||
print_err("MODES = {")
|
||||
print_err(' "classic": %s,' % modes["classic"])
|
||||
print_err(' "secure": %s,' % modes["secure"])
|
||||
print_err(' "tls": %s' % modes["tls"])
|
||||
print_err("}")
|
||||
|
||||
conf_dict["MODES"] = modes
|
||||
|
||||
# accept incoming connections only with proxy protocol v1/v2, useful for nginx and haproxy
|
||||
conf_dict.setdefault("PROXY_PROTOCOL", False)
|
||||
@@ -178,8 +213,8 @@ def init_config():
|
||||
# length of used handshake randoms for active fingerprinting protection, zero to disable
|
||||
conf_dict.setdefault("REPLAY_CHECK_LEN", 65536)
|
||||
|
||||
# block bad first packets to even more protect against replay-based fingerprinting
|
||||
conf_dict.setdefault("BLOCK_IF_FIRST_PKT_BAD", not conf_dict["TLS_ONLY"])
|
||||
# length of last client ip addresses for logging
|
||||
conf_dict.setdefault("CLIENT_IPS_LEN", 131072)
|
||||
|
||||
# delay in seconds between stats printing
|
||||
conf_dict.setdefault("STATS_PRINT_PERIOD", 600)
|
||||
@@ -236,6 +271,9 @@ def init_config():
|
||||
# export proxy link to prometheus
|
||||
conf_dict.setdefault("METRICS_EXPORT_LINKS", False)
|
||||
|
||||
# default prefix for metrics
|
||||
conf_dict.setdefault("METRICS_PREFIX", "mtprotoproxy_")
|
||||
|
||||
# allow access to config by attributes
|
||||
config = type("config", (dict,), conf_dict)(conf_dict)
|
||||
|
||||
@@ -431,6 +469,60 @@ class MyRandom(random.Random):
|
||||
myrandom = MyRandom()
|
||||
|
||||
|
||||
class TgConnectionPool:
|
||||
MAX_CONNS_IN_POOL = 64
|
||||
|
||||
def __init__(self):
|
||||
self.pools = {}
|
||||
|
||||
async def open_tg_connection(self, host, port, init_func=None):
|
||||
task = asyncio.open_connection(host, port, limit=get_to_clt_bufsize())
|
||||
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=config.TG_CONNECT_TIMEOUT)
|
||||
|
||||
set_keepalive(writer_tgt.get_extra_info("socket"))
|
||||
set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize())
|
||||
|
||||
if init_func:
|
||||
return await asyncio.wait_for(init_func(host, port, reader_tgt, writer_tgt),
|
||||
timeout=config.TG_CONNECT_TIMEOUT)
|
||||
return reader_tgt, writer_tgt
|
||||
|
||||
def register_host_port(self, host, port, init_func):
|
||||
if (host, port, init_func) not in self.pools:
|
||||
self.pools[(host, port, init_func)] = []
|
||||
|
||||
while len(self.pools[(host, port, init_func)]) < TgConnectionPool.MAX_CONNS_IN_POOL:
|
||||
connect_task = asyncio.ensure_future(self.open_tg_connection(host, port, init_func))
|
||||
self.pools[(host, port, init_func)].append(connect_task)
|
||||
|
||||
async def get_connection(self, host, port, init_func=None):
|
||||
self.register_host_port(host, port, init_func)
|
||||
|
||||
ret = None
|
||||
for task in self.pools[(host, port, init_func)][::]:
|
||||
if task.done():
|
||||
if task.exception():
|
||||
self.pools[(host, port, init_func)].remove(task)
|
||||
continue
|
||||
|
||||
reader, writer, *other = task.result()
|
||||
if writer.transport.is_closing():
|
||||
self.pools[(host, port, init_func)].remove(task)
|
||||
continue
|
||||
|
||||
if not ret:
|
||||
self.pools[(host, port, init_func)].remove(task)
|
||||
ret = (reader, writer, *other)
|
||||
|
||||
self.register_host_port(host, port, init_func)
|
||||
if ret:
|
||||
return ret
|
||||
return await self.open_tg_connection(host, port, init_func)
|
||||
|
||||
|
||||
tg_connection_pool = TgConnectionPool()
|
||||
|
||||
|
||||
class LayeredStreamReaderBase:
|
||||
__slots__ = ("upstream", )
|
||||
|
||||
@@ -972,6 +1064,8 @@ async def handle_bad_client(reader_clt, writer_clt, handshake):
|
||||
|
||||
async def handle_fake_tls_handshake(handshake, reader, writer, peer):
|
||||
global used_handshakes
|
||||
global client_ips
|
||||
global last_client_ips
|
||||
global last_clients_with_time_skew
|
||||
global last_clients_with_same_handshake
|
||||
global fake_cert_len
|
||||
@@ -1044,6 +1138,13 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer):
|
||||
used_handshakes.popitem(last=False)
|
||||
used_handshakes[digest[:DIGEST_HALFLEN]] = True
|
||||
|
||||
if config.CLIENT_IPS_LEN > 0:
|
||||
while len(client_ips) >= config.CLIENT_IPS_LEN:
|
||||
client_ips.popitem(last=False)
|
||||
if peer[0] not in client_ips:
|
||||
client_ips[peer[0]] = True
|
||||
last_client_ips[peer[0]] = True
|
||||
|
||||
reader = FakeTLSStreamReader(reader)
|
||||
writer = FakeTLSStreamWriter(writer)
|
||||
return reader, writer
|
||||
@@ -1108,6 +1209,8 @@ async def handle_proxy_protocol(reader, peer=None):
|
||||
|
||||
async def handle_handshake(reader, writer):
|
||||
global used_handshakes
|
||||
global client_ips
|
||||
global last_client_ips
|
||||
global last_clients_with_same_handshake
|
||||
|
||||
TLS_START_BYTES = b"\x16\x03\x01\x02\x00\x01\x00\x01\xfc\x03\x03"
|
||||
@@ -1143,7 +1246,7 @@ async def handle_handshake(reader, writer):
|
||||
reader, writer = tls_handshake_result
|
||||
handshake = await reader.readexactly(HANDSHAKE_LEN)
|
||||
else:
|
||||
if config.TLS_ONLY:
|
||||
if not config.MODES["classic"] and not config.MODES["secure"]:
|
||||
await handle_bad_client(reader, writer, handshake)
|
||||
return False
|
||||
handshake += await reader.readexactly(HANDSHAKE_LEN - len(handshake))
|
||||
@@ -1173,8 +1276,14 @@ async def handle_handshake(reader, writer):
|
||||
if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE):
|
||||
continue
|
||||
|
||||
if config.SECURE_ONLY and proto_tag != PROTO_TAG_SECURE:
|
||||
continue
|
||||
if proto_tag == PROTO_TAG_SECURE:
|
||||
if is_tls_handshake and not config.MODES["tls"]:
|
||||
continue
|
||||
if not is_tls_handshake and not config.MODES["secure"]:
|
||||
continue
|
||||
else:
|
||||
if not config.MODES["classic"]:
|
||||
continue
|
||||
|
||||
dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True)
|
||||
|
||||
@@ -1183,6 +1292,13 @@ async def handle_handshake(reader, writer):
|
||||
used_handshakes.popitem(last=False)
|
||||
used_handshakes[dec_prekey_and_iv] = True
|
||||
|
||||
if config.CLIENT_IPS_LEN > 0:
|
||||
while len(client_ips) >= config.CLIENT_IPS_LEN:
|
||||
client_ips.popitem(last=False)
|
||||
if peer[0] not in client_ips:
|
||||
client_ips[peer[0]] = True
|
||||
last_client_ips[peer[0]] = True
|
||||
|
||||
reader = CryptoWrappedStreamReader(reader, decryptor)
|
||||
writer = CryptoWrappedStreamWriter(writer, encryptor)
|
||||
return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv, peer
|
||||
@@ -1191,21 +1307,6 @@ async def handle_handshake(reader, writer):
|
||||
return False
|
||||
|
||||
|
||||
async def open_connection_tryer(addr, port, limit, timeout, max_attempts=3):
|
||||
for attempt in range(max_attempts-1):
|
||||
try:
|
||||
task = asyncio.open_connection(addr, port, limit=limit)
|
||||
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout)
|
||||
return reader_tgt, writer_tgt
|
||||
except (OSError, asyncio.TimeoutError):
|
||||
continue
|
||||
|
||||
# the last attempt
|
||||
task = asyncio.open_connection(addr, port, limit=limit)
|
||||
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout)
|
||||
return reader_tgt, writer_tgt
|
||||
|
||||
|
||||
async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
|
||||
RESERVED_NONCE_FIRST_CHARS = [b"\xef"]
|
||||
RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54",
|
||||
@@ -1214,6 +1315,7 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
|
||||
RESERVED_NONCE_CONTINUES = [b"\x00\x00\x00\x00"]
|
||||
|
||||
global my_ip_info
|
||||
global tg_connection_pool
|
||||
|
||||
dc_idx = abs(dc_idx) - 1
|
||||
|
||||
@@ -1227,18 +1329,17 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
|
||||
dc = TG_DATACENTERS_V4[dc_idx]
|
||||
|
||||
try:
|
||||
reader_tgt, writer_tgt = await open_connection_tryer(
|
||||
dc, TG_DATACENTER_PORT, limit=get_to_clt_bufsize(), timeout=config.TG_CONNECT_TIMEOUT)
|
||||
reader_tgt, writer_tgt = await tg_connection_pool.get_connection(dc, TG_DATACENTER_PORT)
|
||||
except ConnectionRefusedError as E:
|
||||
print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT)
|
||||
return False
|
||||
except ConnectionAbortedError as E:
|
||||
print_err("The Telegram server connection is bad: %d (%s %s) %s" % (dc_idx, addr, port, E))
|
||||
return False
|
||||
except (OSError, asyncio.TimeoutError) as E:
|
||||
print_err("Unable to connect to", dc, TG_DATACENTER_PORT)
|
||||
return False
|
||||
|
||||
set_keepalive(writer_tgt.get_extra_info("socket"))
|
||||
set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize())
|
||||
|
||||
while True:
|
||||
rnd = bytearray(myrandom.getrandbytes(HANDSHAKE_LEN))
|
||||
if rnd[:1] in RESERVED_NONCE_FIRST_CHARS:
|
||||
@@ -1301,49 +1402,21 @@ def get_middleproxy_aes_key_and_iv(nonce_srv, nonce_clt, clt_ts, srv_ip, clt_por
|
||||
return key, iv
|
||||
|
||||
|
||||
async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
||||
async def middleproxy_handshake(host, port, reader_tgt, writer_tgt):
|
||||
""" The most logic of middleproxy handshake, launched in pool """
|
||||
START_SEQ_NO = -2
|
||||
NONCE_LEN = 16
|
||||
|
||||
RPC_NONCE = b"\xaa\x87\xcb\x7a"
|
||||
RPC_HANDSHAKE = b"\xf5\xee\x82\x76"
|
||||
RPC_NONCE = b"\xaa\x87\xcb\x7a"
|
||||
# pass as consts to simplify code
|
||||
RPC_FLAGS = b"\x00\x00\x00\x00"
|
||||
CRYPTO_AES = b"\x01\x00\x00\x00"
|
||||
|
||||
RPC_NONCE_ANS_LEN = 32
|
||||
RPC_HANDSHAKE_ANS_LEN = 32
|
||||
|
||||
# pass as consts to simplify code
|
||||
RPC_FLAGS = b"\x00\x00\x00\x00"
|
||||
|
||||
global my_ip_info
|
||||
|
||||
use_ipv6_tg = (my_ip_info["ipv6"] and (config.PREFER_IPV6 or not my_ip_info["ipv4"]))
|
||||
use_ipv6_clt = (":" in cl_ip)
|
||||
|
||||
if use_ipv6_tg:
|
||||
if dc_idx not in TG_MIDDLE_PROXIES_V6:
|
||||
return False
|
||||
addr, port = myrandom.choice(TG_MIDDLE_PROXIES_V6[dc_idx])
|
||||
else:
|
||||
if dc_idx not in TG_MIDDLE_PROXIES_V4:
|
||||
return False
|
||||
addr, port = myrandom.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
|
||||
|
||||
try:
|
||||
reader_tgt, writer_tgt = await open_connection_tryer(addr, port, limit=get_to_clt_bufsize(),
|
||||
timeout=config.TG_CONNECT_TIMEOUT)
|
||||
except ConnectionRefusedError as E:
|
||||
print_err("The Telegram server %d (%s %s) is refusing connections" % (dc_idx, addr, port))
|
||||
return False
|
||||
except (OSError, asyncio.TimeoutError) as E:
|
||||
print_err("Unable to connect to the Telegram server %d (%s %s)" % (dc_idx, addr, port))
|
||||
return False
|
||||
|
||||
set_keepalive(writer_tgt.get_extra_info("socket"))
|
||||
set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize())
|
||||
|
||||
writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO)
|
||||
|
||||
key_selector = PROXY_SECRET[:4]
|
||||
crypto_ts = int.to_bytes(int(time.time()) % (256**4), 4, "little")
|
||||
|
||||
@@ -1354,24 +1427,25 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
||||
writer_tgt.write(msg)
|
||||
await writer_tgt.drain()
|
||||
|
||||
old_reader = reader_tgt
|
||||
reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO)
|
||||
ans = await reader_tgt.read(get_to_clt_bufsize())
|
||||
|
||||
if len(ans) != RPC_NONCE_ANS_LEN:
|
||||
return False
|
||||
raise ConnectionAbortedError("bad rpc answer length")
|
||||
|
||||
rpc_type, rpc_key_selector, rpc_schema, rpc_crypto_ts, rpc_nonce = (
|
||||
ans[:4], ans[4:8], ans[8:12], ans[12:16], ans[16:32]
|
||||
)
|
||||
|
||||
if rpc_type != RPC_NONCE or rpc_key_selector != key_selector or rpc_schema != CRYPTO_AES:
|
||||
return False
|
||||
raise ConnectionAbortedError("bad rpc answer")
|
||||
|
||||
# get keys
|
||||
tg_ip, tg_port = writer_tgt.upstream.get_extra_info('peername')[:2]
|
||||
my_ip, my_port = writer_tgt.upstream.get_extra_info('sockname')[:2]
|
||||
|
||||
use_ipv6_tg = (":" in tg_ip)
|
||||
|
||||
if not use_ipv6_tg:
|
||||
if my_ip_info["ipv4"]:
|
||||
# prefer global ip settings to work behind NAT
|
||||
@@ -1422,11 +1496,42 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
||||
|
||||
handshake_ans = await reader_tgt.read(1)
|
||||
if len(handshake_ans) != RPC_HANDSHAKE_ANS_LEN:
|
||||
return False
|
||||
raise ConnectionAbortedError("bad rpc handshake answer length")
|
||||
|
||||
handshake_type, handshake_flags, handshake_sender_pid, handshake_peer_pid = (
|
||||
handshake_ans[:4], handshake_ans[4:8], handshake_ans[8:20], handshake_ans[20:32])
|
||||
if handshake_type != RPC_HANDSHAKE or handshake_peer_pid != SENDER_PID:
|
||||
raise ConnectionAbortedError("bad rpc handshake answer")
|
||||
|
||||
return reader_tgt, writer_tgt, my_ip, my_port
|
||||
|
||||
|
||||
async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
||||
global my_ip_info
|
||||
global tg_connection_pool
|
||||
|
||||
use_ipv6_tg = (my_ip_info["ipv6"] and (config.PREFER_IPV6 or not my_ip_info["ipv4"]))
|
||||
|
||||
if use_ipv6_tg:
|
||||
if dc_idx not in TG_MIDDLE_PROXIES_V6:
|
||||
return False
|
||||
addr, port = myrandom.choice(TG_MIDDLE_PROXIES_V6[dc_idx])
|
||||
else:
|
||||
if dc_idx not in TG_MIDDLE_PROXIES_V4:
|
||||
return False
|
||||
addr, port = myrandom.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
|
||||
|
||||
try:
|
||||
ret = await tg_connection_pool.get_connection(addr, port, middleproxy_handshake)
|
||||
reader_tgt, writer_tgt, my_ip, my_port = ret
|
||||
except ConnectionRefusedError as E:
|
||||
print_err("The Telegram server %d (%s %s) is refusing connections" % (dc_idx, addr, port))
|
||||
return False
|
||||
except ConnectionAbortedError as E:
|
||||
print_err("The Telegram server connection is bad: %d (%s %s) %s" % (dc_idx, addr, port, E))
|
||||
return False
|
||||
except (OSError, asyncio.TimeoutError) as E:
|
||||
print_err("Unable to connect to the Telegram server %d (%s %s)" % (dc_idx, addr, port))
|
||||
return False
|
||||
|
||||
writer_tgt = ProxyReqStreamWriter(writer_tgt, cl_ip, cl_port, my_ip, my_port, proto_tag)
|
||||
@@ -1497,9 +1602,7 @@ async def handle_client(reader_clt, writer_clt):
|
||||
else:
|
||||
return
|
||||
|
||||
async def connect_reader_to_writer(rd, wr, user, rd_buf_size, block_if_first_pkt_bad=False):
|
||||
global last_clients_with_first_pkt_error
|
||||
is_first_pkt = True
|
||||
async def connect_reader_to_writer(rd, wr, user, rd_buf_size):
|
||||
try:
|
||||
while True:
|
||||
data = await rd.read(rd_buf_size)
|
||||
@@ -1508,18 +1611,6 @@ async def handle_client(reader_clt, writer_clt):
|
||||
else:
|
||||
extra = {}
|
||||
|
||||
# protection against replay-based fingerprinting
|
||||
if is_first_pkt:
|
||||
is_first_pkt = False
|
||||
|
||||
ERR_PKT_DATA = b'l\xfe\xff\xff'
|
||||
if block_if_first_pkt_bad and data == ERR_PKT_DATA:
|
||||
last_clients_with_first_pkt_error[cl_ip] += 1
|
||||
|
||||
wr.write_eof()
|
||||
await wr.drain()
|
||||
return
|
||||
|
||||
if not data:
|
||||
wr.write_eof()
|
||||
await wr.drain()
|
||||
@@ -1528,12 +1619,11 @@ async def handle_client(reader_clt, writer_clt):
|
||||
update_user_stats(user, octets=len(data), msgs=1)
|
||||
wr.write(data, extra)
|
||||
await wr.drain()
|
||||
except (OSError, asyncio.streams.IncompleteReadError) as e:
|
||||
except (OSError, asyncio.IncompleteReadError) as e:
|
||||
# print_err(e)
|
||||
pass
|
||||
|
||||
tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, get_to_clt_bufsize(),
|
||||
block_if_first_pkt_bad=config.BLOCK_IF_FIRST_PKT_BAD)
|
||||
tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, get_to_clt_bufsize())
|
||||
clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, get_to_tg_bufsize())
|
||||
task_tg_to_clt = asyncio.ensure_future(tg_to_clt)
|
||||
task_clt_to_tg = asyncio.ensure_future(clt_to_tg)
|
||||
@@ -1586,6 +1676,7 @@ def make_metrics_pkt(metrics):
|
||||
used_names = set()
|
||||
|
||||
for name, m_type, desc, val in metrics:
|
||||
name = config.METRICS_PREFIX + name
|
||||
if name not in used_names:
|
||||
pkt_body_list.append("# HELP %s %s" % (name, desc))
|
||||
pkt_body_list.append("# TYPE %s %s" % (name, m_type))
|
||||
@@ -1622,7 +1713,6 @@ async def handle_metrics(reader, writer):
|
||||
global proxy_start_time
|
||||
global proxy_links
|
||||
global last_clients_with_time_skew
|
||||
global last_clients_with_first_pkt_error
|
||||
global last_clients_with_same_handshake
|
||||
|
||||
client_ip = writer.get_extra_info("peername")[0]
|
||||
@@ -1680,8 +1770,8 @@ async def handle_metrics(reader, writer):
|
||||
|
||||
async def stats_printer():
|
||||
global user_stats
|
||||
global last_client_ips
|
||||
global last_clients_with_time_skew
|
||||
global last_clients_with_first_pkt_error
|
||||
global last_clients_with_same_handshake
|
||||
|
||||
while True:
|
||||
@@ -1694,18 +1784,19 @@ async def stats_printer():
|
||||
stat["octets"] / 1000000, stat["msgs"]))
|
||||
print(flush=True)
|
||||
|
||||
if last_client_ips:
|
||||
print("New IPs:")
|
||||
for ip in last_client_ips:
|
||||
print(ip)
|
||||
print(flush=True)
|
||||
last_client_ips.clear()
|
||||
|
||||
if last_clients_with_time_skew:
|
||||
print("Clients with time skew (possible replay-attackers):")
|
||||
for ip, skew_minutes in last_clients_with_time_skew.items():
|
||||
print("%s, clocks were %d minutes behind" % (ip, skew_minutes))
|
||||
print(flush=True)
|
||||
last_clients_with_time_skew.clear()
|
||||
if last_clients_with_first_pkt_error:
|
||||
print("Clients with error on the first packet (possible replay-attackers):")
|
||||
for ip, times in last_clients_with_first_pkt_error.items():
|
||||
print("%s, %d times" % (ip, times))
|
||||
print(flush=True)
|
||||
last_clients_with_first_pkt_error.clear()
|
||||
if last_clients_with_same_handshake:
|
||||
print("Clients with duplicate handshake (likely replay-attackers):")
|
||||
for ip, times in last_clients_with_same_handshake.items():
|
||||
@@ -1803,7 +1894,11 @@ async def get_mask_host_cert_len():
|
||||
task = get_encrypted_cert(config.MASK_HOST, config.MASK_PORT, config.TLS_DOMAIN)
|
||||
cert = await asyncio.wait_for(task, timeout=GET_CERT_TIMEOUT)
|
||||
if cert:
|
||||
if len(cert) != fake_cert_len:
|
||||
if len(cert) < MIN_CERT_LEN:
|
||||
msg = ("The MASK_HOST %s returned several TLS records, this is not supported" %
|
||||
config.MASK_HOST)
|
||||
print_err(msg)
|
||||
elif len(cert) != fake_cert_len:
|
||||
fake_cert_len = len(cert)
|
||||
print_err("Got cert from the MASK_HOST %s, its length is %d" %
|
||||
(config.MASK_HOST, fake_cert_len))
|
||||
@@ -1965,7 +2060,7 @@ def print_tg_info():
|
||||
|
||||
if config.PORT == 3256:
|
||||
print("The default port 3256 is used, this is not recommended", flush=True)
|
||||
if config.TLS_ONLY:
|
||||
if not config.MODES["classic"] and not config.MODES["secure"]:
|
||||
print("Since you have TLS only mode enabled the best port is 443", flush=True)
|
||||
print_default_warning = True
|
||||
|
||||
@@ -1977,31 +2072,33 @@ def print_tg_info():
|
||||
|
||||
for user, secret in sorted(config.USERS.items(), key=lambda x: x[0]):
|
||||
for ip in ip_addrs:
|
||||
if not config.TLS_ONLY:
|
||||
if not config.SECURE_ONLY:
|
||||
params = {"server": ip, "port": config.PORT, "secret": secret}
|
||||
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
||||
classic_link = "tg://proxy?{}".format(params_encodeded)
|
||||
proxy_links.append({"user": user, "link": classic_link})
|
||||
print("{}: {}".format(user, classic_link), flush=True)
|
||||
if config.MODES["classic"]:
|
||||
params = {"server": ip, "port": config.PORT, "secret": secret}
|
||||
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
||||
classic_link = "tg://proxy?{}".format(params_encodeded)
|
||||
proxy_links.append({"user": user, "link": classic_link})
|
||||
print("{}: {}".format(user, classic_link), flush=True)
|
||||
|
||||
if config.MODES["secure"]:
|
||||
params = {"server": ip, "port": config.PORT, "secret": "dd" + secret}
|
||||
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
||||
dd_link = "tg://proxy?{}".format(params_encodeded)
|
||||
proxy_links.append({"user": user, "link": dd_link})
|
||||
print("{}: {}".format(user, dd_link), flush=True)
|
||||
|
||||
tls_secret = "ee" + secret + config.TLS_DOMAIN.encode().hex()
|
||||
# the base64 links is buggy on ios
|
||||
# tls_secret = bytes.fromhex("ee" + secret) + config.TLS_DOMAIN.encode()
|
||||
# tls_secret_base64 = base64.urlsafe_b64encode(tls_secret)
|
||||
params = {"server": ip, "port": config.PORT, "secret": tls_secret}
|
||||
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
||||
tls_link = "tg://proxy?{}".format(params_encodeded)
|
||||
proxy_links.append({"user": user, "link": tls_link})
|
||||
print("{}: {}".format(user, tls_link), flush=True)
|
||||
if config.MODES["tls"]:
|
||||
tls_secret = "ee" + secret + config.TLS_DOMAIN.encode().hex()
|
||||
# the base64 links is buggy on ios
|
||||
# tls_secret = bytes.fromhex("ee" + secret) + config.TLS_DOMAIN.encode()
|
||||
# tls_secret_base64 = base64.urlsafe_b64encode(tls_secret)
|
||||
params = {"server": ip, "port": config.PORT, "secret": tls_secret}
|
||||
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
||||
tls_link = "tg://proxy?{}".format(params_encodeded)
|
||||
proxy_links.append({"user": user, "link": tls_link})
|
||||
print("{}: {}".format(user, tls_link), flush=True)
|
||||
|
||||
if secret in ["00000000000000000000000000000000", "0123456789abcdef0123456789abcdef"]:
|
||||
if secret in ["00000000000000000000000000000000", "0123456789abcdef0123456789abcdef",
|
||||
"00000000000000000000000000000001"]:
|
||||
msg = "The default secret {} is used, this is not recommended".format(secret)
|
||||
print(msg, flush=True)
|
||||
random_secret = "".join(myrandom.choice("0123456789abcdef") for i in range(32))
|
||||
@@ -2133,29 +2230,29 @@ def main():
|
||||
|
||||
if config.LISTEN_ADDR_IPV4:
|
||||
task = asyncio.start_server(handle_client_wrapper, config.LISTEN_ADDR_IPV4, config.PORT,
|
||||
limit=get_to_tg_bufsize(), reuse_port=reuse_port, loop=loop)
|
||||
limit=get_to_tg_bufsize(), reuse_port=reuse_port)
|
||||
servers.append(loop.run_until_complete(task))
|
||||
|
||||
if config.LISTEN_ADDR_IPV6 and socket.has_ipv6:
|
||||
task = asyncio.start_server(handle_client_wrapper, config.LISTEN_ADDR_IPV6, config.PORT,
|
||||
limit=get_to_tg_bufsize(), reuse_port=reuse_port, loop=loop)
|
||||
limit=get_to_tg_bufsize(), reuse_port=reuse_port)
|
||||
servers.append(loop.run_until_complete(task))
|
||||
|
||||
if config.LISTEN_UNIX_SOCK and has_unix:
|
||||
remove_unix_socket(config.LISTEN_UNIX_SOCK)
|
||||
task = asyncio.start_unix_server(handle_client_wrapper, config.LISTEN_UNIX_SOCK,
|
||||
limit=get_to_tg_bufsize(), loop=loop)
|
||||
limit=get_to_tg_bufsize())
|
||||
servers.append(loop.run_until_complete(task))
|
||||
os.chmod(config.LISTEN_UNIX_SOCK, 0o666)
|
||||
|
||||
if config.METRICS_PORT is not None:
|
||||
if config.METRICS_LISTEN_ADDR_IPV4:
|
||||
task = asyncio.start_server(handle_metrics, config.METRICS_LISTEN_ADDR_IPV4,
|
||||
config.METRICS_PORT, loop=loop)
|
||||
config.METRICS_PORT)
|
||||
servers.append(loop.run_until_complete(task))
|
||||
if config.METRICS_LISTEN_ADDR_IPV6 and socket.has_ipv6:
|
||||
task = asyncio.start_server(handle_metrics, config.METRICS_LISTEN_ADDR_IPV6,
|
||||
config.METRICS_PORT, loop=loop)
|
||||
config.METRICS_PORT)
|
||||
servers.append(loop.run_until_complete(task))
|
||||
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user