36 Commits

Author SHA1 Message Date
Alexander Bersenev
37d570f8dc handle unknown ips, e.g. from unix sockets 2020-03-05 13:55:10 +05:00
Alexander Bersenev
8f48e9ef65 fix the missing constant 2020-02-27 19:17:15 +05:00
Alexander Bersenev
76bc2253eb small code style fixes 2020-02-25 20:41:08 +05:00
Alexander Bersenev
07bd9b795a handle bad secrets in configs 2020-02-25 02:41:49 +05:00
Alexander Bersenev
1cad031947 make dataflow functions top level functions for better speed and memory consumption 2020-02-23 03:03:15 +05:00
Alexander Bersenev
923dac842b handle broken pipe error 2020-02-17 12:18:13 +05:00
Alexander Bersenev
1a63fdae11 add an option to ignore time skew 2020-02-15 17:12:15 +05:00
Alexander Bersenev
c7b6dcf3c2 save the utilitary task as a variable to prevent early garbage collecting 2020-02-14 18:58:25 +05:00
Alexander Bersenev
a95b1ec3c1 fix typo 2020-02-13 18:21:09 +05:00
Alexander Bersenev
49bc5d1f3b get rid of "socket.send() raised exception" messages 2020-02-13 18:14:37 +05:00
Alexander Bersenev
c2414c3487 simplify dockerfile 2020-02-13 15:08:01 +05:00
Alexander Bersenev
8b26cc843d catch IncompleteReadError while handling a bad client 2020-02-13 14:22:32 +05:00
Alexander Bersenev
639dea5e8d use debian image by default 2020-02-13 04:13:49 +05:00
Alexander Bersenev
c48cacce83 add statisctics about up/down traffic 2020-02-12 16:28:18 +05:00
Alexander Bersenev
2bb0ef0b1f simplify initialization and stats 2020-02-12 15:41:05 +05:00
Alexander Bersenev
f5ee5db86f use asyncio.all_tasks on new pythons 2020-02-11 19:10:43 +05:00
Peter Dave Hello
9c50cab94e Fix file permission in Docker image (#189)
`chown` needs to be done "after" the file copy, otherwise there is no
meaning to do it as /home/tgproxy is default owned by tgproxy already.
2020-02-11 13:50:48 +02:00
Boris Klimenko
199eaeb7c4 Alpine 3.11, Python 3.8 (#185) 2020-01-22 20:04:29 +02:00
Alexander Bersenev
66ac871a74 remove the hackish logic against replay attacks because it stopped to work with new android clients
see https://github.com/alexbers/mtprotoproxy/issues/183
2020-01-16 19:05:18 +05:00
Alexander Bersenev
c5344df0eb Add wiki links to readme 2019-12-14 14:55:10 +05:00
Alexander Bersenev
93ad268d48 add ip logging 2019-11-29 17:30:10 +05:00
Alexander Bersenev
1c29465b6e change comment in config 2019-11-22 02:11:41 +05:00
Alexander Bersenev
d41b4abf35 MODES option instead of SECURE_ONLY and TLS_ONLY 2019-11-22 02:05:05 +05:00
Alexander Bersenev
7f19b3f78d more secure defaul settings 2019-11-19 05:22:49 +05:00
Alexander Bersenev
0549fd7200 increase the connection pool size 2019-11-19 05:00:53 +05:00
Alexander Bersenev
fd75ca3cf9 remove loop argument from create server for compatibility with Python 3.8 2019-11-14 02:43:10 +05:00
Alexander Bersenev
522b0cfe75 move more logic to pooled connection 2019-11-13 02:56:05 +05:00
Alexander Bersenev
4a4d449a34 init pooled connections to save one more round trip time 2019-11-13 02:31:51 +05:00
Alexander Bersenev
8c15fc8fe0 use socket from pool in the creation order 2019-11-12 04:31:47 +05:00
Alexander Bersenev
e436792992 introduce connection pool to reduce pings 2019-11-12 03:53:10 +05:00
K900
07759f67cb Fix running on Python 3.8 (#161)
3.8 removes the asyncio.streams re-export of `IncompleteReadError`, so just access it directly from `asyncio`, like everywhere else in the code already does.
2019-11-11 16:04:14 +05:00
Alexander Bersenev
f525cc9611 more strict validation of tls domain 2019-10-30 22:27:46 +05:00
dasmfm
c010543889 Prefix for Prometheus metrics (#151)
Added default prefix for Prometheus metrics.
2019-10-09 14:35:02 +05:00
Alexander Bersenev
0a41479054 add copy instruction to docker-compose 2019-10-09 01:56:48 +05:00
Alexander Bersenev
5f206361f2 revert the last commit 2019-10-09 01:56:06 +05:00
Alexander Bersenev
6980bfd3be add copy instruction to docker-compose 2019-10-09 01:52:09 +05:00
4 changed files with 415 additions and 233 deletions

View File

@@ -1,11 +1,14 @@
FROM alpine:3.10
FROM python:3.8-slim-buster
RUN adduser tgproxy -u 10000 -D
RUN apt-get update && apt-get install -y libcap2-bin && rm -rf /var/lib/apt/lists/*
RUN setcap cap_net_bind_service=+ep /usr/local/bin/python3.8
RUN apk add --no-cache python3 py3-cryptography ca-certificates libcap
RUN pip3 --no-cache-dir install cryptography uvloop
COPY mtprotoproxy.py config.py /home/tgproxy/
RUN useradd tgproxy -u 10000
RUN chown -R tgproxy:tgproxy /home/tgproxy
RUN setcap cap_net_bind_service=+ep /usr/bin/python3.7
USER tgproxy

View File

@@ -20,6 +20,11 @@ To advertise a channel get a tag from **@MTProxybot** and put it to *config.py*.
The proxy performance should be enough to comfortably serve about 4 000 simultaneous users on
the VDS instance with 1 CPU core and 1024MB RAM.
## More Instructions ##
- [Running without Docker](https://github.com/alexbers/mtprotoproxy/wiki/Running-Without-Docker)
- [Optimization and fine tuning](https://github.com/alexbers/mtprotoproxy/wiki/Optimization-and-Fine-Tuning)
## Advanced Usage ##
The proxy can be launched:

View File

@@ -1,20 +1,25 @@
PORT = 3256
PORT = 443
# name -> secret (32 hex chars)
USERS = {
"tg": "00000000000000000000000000000000",
"tg2": "0123456789abcdef0123456789abcdef"
"tg": "00000000000000000000000000000001",
# "tg2": "0123456789abcdef0123456789abcdef",
}
# Makes the proxy harder to detect
# Can be incompatible with very old clients
SECURE_ONLY = True
MODES = {
# Classic mode, easy to detect
"classic": False,
# Makes the proxy even more hard to detect
# Compatible only with the recent clients
TLS_ONLY = True
# Makes the proxy harder to detect
# Can be incompatible with very old clients
"secure": False,
# The domain for TLS, bad clients are proxied there
# Makes the proxy even more hard to detect
# Can be incompatible with old clients
"tls": True
}
# The domain for TLS mode, bad clients are proxied there
# Use random existing domain, proxy checks it on start
# TLS_DOMAIN = "www.google.com"

View File

@@ -66,6 +66,8 @@ TLS_HANDSHAKE_LEN = 1 + 2 + 2 + 512
PROTO_TAG_POS = 56
DC_IDX_POS = 60
MIN_CERT_LEN = 1024
PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef"
PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee"
PROTO_TAG_SECURE = b"\xdd\xdd\xdd\xdd"
@@ -80,16 +82,20 @@ STAT_DURATION_BUCKETS = [0.1, 0.5, 1, 2, 5, 15, 60, 300, 600, 1800, 2**31 - 1]
my_ip_info = {"ipv4": None, "ipv6": None}
used_handshakes = collections.OrderedDict()
client_ips = collections.OrderedDict()
last_client_ips = {}
disable_middle_proxy = False
is_time_skewed = False
fake_cert_len = random.randrange(1024, 4096)
mask_host_cached_ip = None
last_clients_with_time_skew = {}
last_clients_with_first_pkt_error = collections.Counter()
last_clients_with_same_handshake = collections.Counter()
proxy_start_time = 0
proxy_links = []
stats = collections.Counter()
user_stats = collections.defaultdict(collections.Counter)
config = {}
@@ -107,12 +113,12 @@ def init_config():
conf_dict["PORT"] = int(sys.argv[1])
secrets = sys.argv[2].split(",")
conf_dict["USERS"] = {"user%d" % i: secrets[i].zfill(32) for i in range(len(secrets))}
conf_dict["MODES"] = {"classic": False, "secure": True, "tls": True}
if len(sys.argv) > 3:
conf_dict["AD_TAG"] = sys.argv[3]
if len(sys.argv) > 4:
conf_dict["TLS_DOMAIN"] = sys.argv[4]
conf_dict["TLS_ONLY"] = True
conf_dict["SECURE_ONLY"] = True
conf_dict["MODES"] = {"classic": False, "secure": False, "tls": True}
conf_dict = {k: v for k, v in conf_dict.items() if k.isupper()}
@@ -120,6 +126,15 @@ def init_config():
conf_dict.setdefault("USERS", {"tg": "00000000000000000000000000000000"})
conf_dict["AD_TAG"] = bytes.fromhex(conf_dict.get("AD_TAG", ""))
for user, secret in conf_dict["USERS"].items():
if not re.fullmatch("[0-9a-fA-F]{32}", secret):
fixed_secret = re.sub(r"[^0-9a-fA-F]", "", secret).zfill(32)[:32]
print_err("Bad secret for user %s, should be 32 hex chars, got %s. " % (user, secret))
print_err("Changing it to %s" % fixed_secret)
conf_dict["USERS"][user] = fixed_secret
# load advanced settings
# use middle proxy, necessary to show ad
@@ -131,11 +146,43 @@ def init_config():
# disables tg->client trafic reencryption, faster but less secure
conf_dict.setdefault("FAST_MODE", True)
# doesn't allow to connect in not-secure mode
conf_dict.setdefault("SECURE_ONLY", False)
# enables some working modes
modes = conf_dict.get("MODES", {})
# allows to connect in tls mode only
conf_dict.setdefault("TLS_ONLY", False)
if "MODES" not in conf_dict:
modes.setdefault("classic", True)
modes.setdefault("secure", True)
modes.setdefault("tls", True)
else:
modes.setdefault("classic", False)
modes.setdefault("secure", False)
modes.setdefault("tls", False)
legacy_warning = False
if "SECURE_ONLY" in conf_dict:
legacy_warning = True
modes["classic"] = not bool(conf_dict["SECURE_ONLY"])
if "TLS_ONLY" in conf_dict:
legacy_warning = True
if conf_dict["TLS_ONLY"]:
modes["classic"] = False
modes["secure"] = False
if not modes["classic"] and not modes["secure"] and not modes["tls"]:
print_err("No known modes enabled, enabling tls-only mode")
modes["tls"] = True
if legacy_warning:
print_err("Legacy options SECURE_ONLY or TLS_ONLY detected")
print_err("Please use MODES in your config instead:")
print_err("MODES = {")
print_err(' "classic": %s,' % modes["classic"])
print_err(' "secure": %s,' % modes["secure"])
print_err(' "tls": %s' % modes["tls"])
print_err("}")
conf_dict["MODES"] = modes
# accept incoming connections only with proxy protocol v1/v2, useful for nginx and haproxy
conf_dict.setdefault("PROXY_PROTOCOL", False)
@@ -178,8 +225,11 @@ def init_config():
# length of used handshake randoms for active fingerprinting protection, zero to disable
conf_dict.setdefault("REPLAY_CHECK_LEN", 65536)
# block bad first packets to even more protect against replay-based fingerprinting
conf_dict.setdefault("BLOCK_IF_FIRST_PKT_BAD", not conf_dict["TLS_ONLY"])
# accept clients with bad clocks. This reduces the protection against replay attacks
conf_dict.setdefault("IGNORE_TIME_SKEW", False)
# length of last client ip addresses for logging
conf_dict.setdefault("CLIENT_IPS_LEN", 131072)
# delay in seconds between stats printing
conf_dict.setdefault("STATS_PRINT_PERIOD", 600)
@@ -236,6 +286,9 @@ def init_config():
# export proxy link to prometheus
conf_dict.setdefault("METRICS_EXPORT_LINKS", False)
# default prefix for metrics
conf_dict.setdefault("METRICS_PREFIX", "mtprotoproxy_")
# allow access to config by attributes
config = type("config", (dict,), conf_dict)(conf_dict)
@@ -342,12 +395,11 @@ def print_err(*params):
print(*params, file=sys.stderr, flush=True)
def init_stats():
global stats
def ensure_users_in_user_stats():
global user_stats
stats = collections.Counter()
user_stats = {user: collections.Counter() for user in config.USERS}
for user in config.USERS:
user_stats[user].update()
def init_proxy_start_time():
@@ -362,9 +414,6 @@ def update_stats(**kw_stats):
def update_user_stats(user, **kw_stats):
global user_stats
if user not in user_stats:
user_stats[user] = collections.Counter()
user_stats[user].update(**kw_stats)
@@ -431,6 +480,60 @@ class MyRandom(random.Random):
myrandom = MyRandom()
class TgConnectionPool:
MAX_CONNS_IN_POOL = 64
def __init__(self):
self.pools = {}
async def open_tg_connection(self, host, port, init_func=None):
task = asyncio.open_connection(host, port, limit=get_to_clt_bufsize())
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=config.TG_CONNECT_TIMEOUT)
set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize())
if init_func:
return await asyncio.wait_for(init_func(host, port, reader_tgt, writer_tgt),
timeout=config.TG_CONNECT_TIMEOUT)
return reader_tgt, writer_tgt
def register_host_port(self, host, port, init_func):
if (host, port, init_func) not in self.pools:
self.pools[(host, port, init_func)] = []
while len(self.pools[(host, port, init_func)]) < TgConnectionPool.MAX_CONNS_IN_POOL:
connect_task = asyncio.ensure_future(self.open_tg_connection(host, port, init_func))
self.pools[(host, port, init_func)].append(connect_task)
async def get_connection(self, host, port, init_func=None):
self.register_host_port(host, port, init_func)
ret = None
for task in self.pools[(host, port, init_func)][::]:
if task.done():
if task.exception():
self.pools[(host, port, init_func)].remove(task)
continue
reader, writer, *other = task.result()
if writer.transport.is_closing():
self.pools[(host, port, init_func)].remove(task)
continue
if not ret:
self.pools[(host, port, init_func)].remove(task)
ret = (reader, writer, *other)
self.register_host_port(host, port, init_func)
if ret:
return ret
return await self.open_tg_connection(host, port, init_func)
tg_connection_pool = TgConnectionPool()
class LayeredStreamReaderBase:
__slots__ = ("upstream", )
@@ -890,6 +993,24 @@ def gen_x25519_public_key():
return int.to_bytes((n*n) % P, length=32, byteorder="little")
async def connect_reader_to_writer(reader, writer):
BUF_SIZE = 8192
try:
while True:
data = await reader.read(BUF_SIZE)
if not data:
if not writer.transport.is_closing():
writer.write_eof()
await writer.drain()
return
writer.write(data)
await writer.drain()
except (OSError, asyncio.IncompleteReadError) as e:
pass
async def handle_bad_client(reader_clt, writer_clt, handshake):
BUF_SIZE = 8192
CONNECT_TIMEOUT = 5
@@ -909,22 +1030,6 @@ async def handle_bad_client(reader_clt, writer_clt, handshake):
pass
return
async def connect_reader_to_writer(reader, writer):
try:
while True:
data = await reader.read(BUF_SIZE)
if not data:
if not writer.transport.is_closing():
writer.write_eof()
await writer.drain()
return
writer.write(data)
await writer.drain()
except OSError:
pass
writer_srv = None
try:
host = mask_host_cached_ip or config.MASK_HOST
@@ -972,6 +1077,8 @@ async def handle_bad_client(reader_clt, writer_clt, handshake):
async def handle_fake_tls_handshake(handshake, reader, writer, peer):
global used_handshakes
global client_ips
global last_client_ips
global last_clients_with_time_skew
global last_clients_with_same_handshake
global fake_cert_len
@@ -994,14 +1101,14 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer):
tls_extensions = b"\x00\x2e" + b"\x00\x33\x00\x24" + b"\x00\x1d\x00\x20"
tls_extensions += gen_x25519_public_key() + b"\x00\x2b\x00\x02\x03\x04"
digest = handshake[DIGEST_POS: DIGEST_POS + DIGEST_LEN]
digest = handshake[DIGEST_POS:DIGEST_POS+DIGEST_LEN]
if digest[:DIGEST_HALFLEN] in used_handshakes:
last_clients_with_same_handshake[peer[0]] += 1
return False
sess_id_len = handshake[SESSION_ID_LEN_POS]
sess_id = handshake[SESSION_ID_POS: SESSION_ID_POS + sess_id_len]
sess_id = handshake[SESSION_ID_POS:SESSION_ID_POS+sess_id_len]
for user in config.USERS:
secret = bytes.fromhex(config.USERS[user])
@@ -1017,9 +1124,12 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer):
timestamp = int.from_bytes(xored_digest[-4:], "little")
client_time_is_ok = TIME_SKEW_MIN < time.time() - timestamp < TIME_SKEW_MAX
# some clients fail to read unix time and send the time since boot instead
client_time_is_small = timestamp < 60*60*24*1000
if not client_time_is_ok and not is_time_skewed and not client_time_is_small:
accept_bad_time = config.IGNORE_TIME_SKEW or is_time_skewed or client_time_is_small
if not client_time_is_ok and not accept_bad_time:
last_clients_with_time_skew[peer[0]] = (time.time() - timestamp) // 60
continue
@@ -1044,6 +1154,13 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer):
used_handshakes.popitem(last=False)
used_handshakes[digest[:DIGEST_HALFLEN]] = True
if config.CLIENT_IPS_LEN > 0:
while len(client_ips) >= config.CLIENT_IPS_LEN:
client_ips.popitem(last=False)
if peer[0] not in client_ips:
client_ips[peer[0]] = True
last_client_ips[peer[0]] = True
reader = FakeTLSStreamReader(reader)
writer = FakeTLSStreamWriter(writer)
return reader, writer
@@ -1108,6 +1225,8 @@ async def handle_proxy_protocol(reader, peer=None):
async def handle_handshake(reader, writer):
global used_handshakes
global client_ips
global last_client_ips
global last_clients_with_same_handshake
TLS_START_BYTES = b"\x16\x03\x01\x02\x00\x01\x00\x01\xfc\x03\x03"
@@ -1116,9 +1235,11 @@ async def handle_handshake(reader, writer):
return False
peer = writer.get_extra_info("peername")[:2]
if not peer:
peer = ("unknown ip", 0)
if config.PROXY_PROTOCOL:
ip = peer[0] if peer else "unknown address"
ip = peer[0] if peer else "unknown ip"
peer = await handle_proxy_protocol(reader, peer)
if not peer:
print_err("Client from %s sent bad proxy protocol headers" % ip)
@@ -1143,7 +1264,7 @@ async def handle_handshake(reader, writer):
reader, writer = tls_handshake_result
handshake = await reader.readexactly(HANDSHAKE_LEN)
else:
if config.TLS_ONLY:
if not config.MODES["classic"] and not config.MODES["secure"]:
await handle_bad_client(reader, writer, handshake)
return False
handshake += await reader.readexactly(HANDSHAKE_LEN - len(handshake))
@@ -1173,8 +1294,14 @@ async def handle_handshake(reader, writer):
if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE):
continue
if config.SECURE_ONLY and proto_tag != PROTO_TAG_SECURE:
continue
if proto_tag == PROTO_TAG_SECURE:
if is_tls_handshake and not config.MODES["tls"]:
continue
if not is_tls_handshake and not config.MODES["secure"]:
continue
else:
if not config.MODES["classic"]:
continue
dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True)
@@ -1183,6 +1310,13 @@ async def handle_handshake(reader, writer):
used_handshakes.popitem(last=False)
used_handshakes[dec_prekey_and_iv] = True
if config.CLIENT_IPS_LEN > 0:
while len(client_ips) >= config.CLIENT_IPS_LEN:
client_ips.popitem(last=False)
if peer[0] not in client_ips:
client_ips[peer[0]] = True
last_client_ips[peer[0]] = True
reader = CryptoWrappedStreamReader(reader, decryptor)
writer = CryptoWrappedStreamWriter(writer, encryptor)
return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv, peer
@@ -1191,21 +1325,6 @@ async def handle_handshake(reader, writer):
return False
async def open_connection_tryer(addr, port, limit, timeout, max_attempts=3):
for attempt in range(max_attempts-1):
try:
task = asyncio.open_connection(addr, port, limit=limit)
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout)
return reader_tgt, writer_tgt
except (OSError, asyncio.TimeoutError):
continue
# the last attempt
task = asyncio.open_connection(addr, port, limit=limit)
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout)
return reader_tgt, writer_tgt
async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
RESERVED_NONCE_FIRST_CHARS = [b"\xef"]
RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54",
@@ -1214,6 +1333,7 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
RESERVED_NONCE_CONTINUES = [b"\x00\x00\x00\x00"]
global my_ip_info
global tg_connection_pool
dc_idx = abs(dc_idx) - 1
@@ -1227,18 +1347,17 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
dc = TG_DATACENTERS_V4[dc_idx]
try:
reader_tgt, writer_tgt = await open_connection_tryer(
dc, TG_DATACENTER_PORT, limit=get_to_clt_bufsize(), timeout=config.TG_CONNECT_TIMEOUT)
reader_tgt, writer_tgt = await tg_connection_pool.get_connection(dc, TG_DATACENTER_PORT)
except ConnectionRefusedError as E:
print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT)
return False
except ConnectionAbortedError as E:
print_err("The Telegram server connection is bad: %d (%s %s) %s" % (dc_idx, addr, port, E))
return False
except (OSError, asyncio.TimeoutError) as E:
print_err("Unable to connect to", dc, TG_DATACENTER_PORT)
return False
set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize())
while True:
rnd = bytearray(myrandom.getrandbytes(HANDSHAKE_LEN))
if rnd[:1] in RESERVED_NONCE_FIRST_CHARS:
@@ -1301,49 +1420,21 @@ def get_middleproxy_aes_key_and_iv(nonce_srv, nonce_clt, clt_ts, srv_ip, clt_por
return key, iv
async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
async def middleproxy_handshake(host, port, reader_tgt, writer_tgt):
""" The most logic of middleproxy handshake, launched in pool """
START_SEQ_NO = -2
NONCE_LEN = 16
RPC_NONCE = b"\xaa\x87\xcb\x7a"
RPC_HANDSHAKE = b"\xf5\xee\x82\x76"
RPC_NONCE = b"\xaa\x87\xcb\x7a"
# pass as consts to simplify code
RPC_FLAGS = b"\x00\x00\x00\x00"
CRYPTO_AES = b"\x01\x00\x00\x00"
RPC_NONCE_ANS_LEN = 32
RPC_HANDSHAKE_ANS_LEN = 32
# pass as consts to simplify code
RPC_FLAGS = b"\x00\x00\x00\x00"
global my_ip_info
use_ipv6_tg = (my_ip_info["ipv6"] and (config.PREFER_IPV6 or not my_ip_info["ipv4"]))
use_ipv6_clt = (":" in cl_ip)
if use_ipv6_tg:
if dc_idx not in TG_MIDDLE_PROXIES_V6:
return False
addr, port = myrandom.choice(TG_MIDDLE_PROXIES_V6[dc_idx])
else:
if dc_idx not in TG_MIDDLE_PROXIES_V4:
return False
addr, port = myrandom.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
try:
reader_tgt, writer_tgt = await open_connection_tryer(addr, port, limit=get_to_clt_bufsize(),
timeout=config.TG_CONNECT_TIMEOUT)
except ConnectionRefusedError as E:
print_err("The Telegram server %d (%s %s) is refusing connections" % (dc_idx, addr, port))
return False
except (OSError, asyncio.TimeoutError) as E:
print_err("Unable to connect to the Telegram server %d (%s %s)" % (dc_idx, addr, port))
return False
set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize())
writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO)
key_selector = PROXY_SECRET[:4]
crypto_ts = int.to_bytes(int(time.time()) % (256**4), 4, "little")
@@ -1354,24 +1445,25 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
writer_tgt.write(msg)
await writer_tgt.drain()
old_reader = reader_tgt
reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO)
ans = await reader_tgt.read(get_to_clt_bufsize())
if len(ans) != RPC_NONCE_ANS_LEN:
return False
raise ConnectionAbortedError("bad rpc answer length")
rpc_type, rpc_key_selector, rpc_schema, rpc_crypto_ts, rpc_nonce = (
ans[:4], ans[4:8], ans[8:12], ans[12:16], ans[16:32]
)
if rpc_type != RPC_NONCE or rpc_key_selector != key_selector or rpc_schema != CRYPTO_AES:
return False
raise ConnectionAbortedError("bad rpc answer")
# get keys
tg_ip, tg_port = writer_tgt.upstream.get_extra_info('peername')[:2]
my_ip, my_port = writer_tgt.upstream.get_extra_info('sockname')[:2]
use_ipv6_tg = (":" in tg_ip)
if not use_ipv6_tg:
if my_ip_info["ipv4"]:
# prefer global ip settings to work behind NAT
@@ -1422,11 +1514,42 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
handshake_ans = await reader_tgt.read(1)
if len(handshake_ans) != RPC_HANDSHAKE_ANS_LEN:
return False
raise ConnectionAbortedError("bad rpc handshake answer length")
handshake_type, handshake_flags, handshake_sender_pid, handshake_peer_pid = (
handshake_ans[:4], handshake_ans[4:8], handshake_ans[8:20], handshake_ans[20:32])
if handshake_type != RPC_HANDSHAKE or handshake_peer_pid != SENDER_PID:
raise ConnectionAbortedError("bad rpc handshake answer")
return reader_tgt, writer_tgt, my_ip, my_port
async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
global my_ip_info
global tg_connection_pool
use_ipv6_tg = (my_ip_info["ipv6"] and (config.PREFER_IPV6 or not my_ip_info["ipv4"]))
if use_ipv6_tg:
if dc_idx not in TG_MIDDLE_PROXIES_V6:
return False
addr, port = myrandom.choice(TG_MIDDLE_PROXIES_V6[dc_idx])
else:
if dc_idx not in TG_MIDDLE_PROXIES_V4:
return False
addr, port = myrandom.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
try:
ret = await tg_connection_pool.get_connection(addr, port, middleproxy_handshake)
reader_tgt, writer_tgt, my_ip, my_port = ret
except ConnectionRefusedError as E:
print_err("The Telegram server %d (%s %s) is refusing connections" % (dc_idx, addr, port))
return False
except ConnectionAbortedError as E:
print_err("The Telegram server connection is bad: %d (%s %s) %s" % (dc_idx, addr, port, E))
return False
except (OSError, asyncio.TimeoutError) as E:
print_err("Unable to connect to the Telegram server %d (%s %s)" % (dc_idx, addr, port))
return False
writer_tgt = ProxyReqStreamWriter(writer_tgt, cl_ip, cl_port, my_ip, my_port, proto_tag)
@@ -1435,6 +1558,32 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
return reader_tgt, writer_tgt
async def tg_connect_reader_to_writer(rd, wr, user, rd_buf_size, is_upstream):
try:
while True:
data = await rd.read(rd_buf_size)
if isinstance(data, tuple):
data, extra = data
else:
extra = {}
if not data:
wr.write_eof()
await wr.drain()
return
else:
if is_upstream:
update_user_stats(user, octets_from_client=len(data), msgs_from_client=1)
else:
update_user_stats(user, octets_to_client=len(data), msgs_to_client=1)
wr.write(data, extra)
await wr.drain()
except (OSError, asyncio.IncompleteReadError) as e:
# print_err(e)
pass
async def handle_client(reader_clt, writer_clt):
set_keepalive(writer_clt.get_extra_info("socket"), config.CLIENT_KEEPALIVE, attempts=3)
set_ack_timeout(writer_clt.get_extra_info("socket"), config.CLIENT_ACK_TIMEOUT)
@@ -1497,44 +1646,10 @@ async def handle_client(reader_clt, writer_clt):
else:
return
async def connect_reader_to_writer(rd, wr, user, rd_buf_size, block_if_first_pkt_bad=False):
global last_clients_with_first_pkt_error
is_first_pkt = True
try:
while True:
data = await rd.read(rd_buf_size)
if isinstance(data, tuple):
data, extra = data
else:
extra = {}
# protection against replay-based fingerprinting
if is_first_pkt:
is_first_pkt = False
ERR_PKT_DATA = b'l\xfe\xff\xff'
if block_if_first_pkt_bad and data == ERR_PKT_DATA:
last_clients_with_first_pkt_error[cl_ip] += 1
wr.write_eof()
await wr.drain()
return
if not data:
wr.write_eof()
await wr.drain()
return
else:
update_user_stats(user, octets=len(data), msgs=1)
wr.write(data, extra)
await wr.drain()
except (OSError, asyncio.streams.IncompleteReadError) as e:
# print_err(e)
pass
tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, get_to_clt_bufsize(),
block_if_first_pkt_bad=config.BLOCK_IF_FIRST_PKT_BAD)
clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, get_to_tg_bufsize())
tg_to_clt = tg_connect_reader_to_writer(reader_tg, writer_clt, user,
get_to_clt_bufsize(), False)
clt_to_tg = tg_connect_reader_to_writer(reader_clt, writer_tg,
user, get_to_tg_bufsize(), True)
task_tg_to_clt = asyncio.ensure_future(tg_to_clt)
task_clt_to_tg = asyncio.ensure_future(clt_to_tg)
@@ -1552,7 +1667,8 @@ async def handle_client(reader_clt, writer_clt):
user_data_quota_hit = (
user in config.USER_DATA_QUOTA and
user_stats[user]["octets"] > config.USER_DATA_QUOTA[user]
(user_stats[user]["octets_to_client"] +
user_stats[user]["octets_from_client"] > config.USER_DATA_QUOTA[user])
)
if (not tcp_limit_hit) and (not user_expired) and (not user_data_quota_hit):
@@ -1573,7 +1689,7 @@ async def handle_client_wrapper(reader, writer):
await handle_client(reader, writer)
except (asyncio.IncompleteReadError, asyncio.CancelledError):
pass
except (ConnectionResetError, TimeoutError):
except (ConnectionResetError, TimeoutError, BrokenPipeError):
pass
except Exception:
traceback.print_exc()
@@ -1586,6 +1702,7 @@ def make_metrics_pkt(metrics):
used_names = set()
for name, m_type, desc, val in metrics:
name = config.METRICS_PREFIX + name
if name not in used_names:
pkt_body_list.append("# HELP %s %s" % (name, desc))
pkt_body_list.append("# TYPE %s %s" % (name, m_type))
@@ -1622,7 +1739,6 @@ async def handle_metrics(reader, writer):
global proxy_start_time
global proxy_links
global last_clients_with_time_skew
global last_clients_with_first_pkt_error
global last_clients_with_same_handshake
client_ip = writer.get_extra_info("peername")[0]
@@ -1659,13 +1775,25 @@ async def handle_metrics(reader, writer):
user_metrics_desc = [
["user_connects", "counter", "user connects", "connects"],
["user_connects_curr", "gauge", "current user connects", "curr_connects"],
["user_octets", "counter", "octets proxied for user", "octets"],
["user_msgs", "counter", "msgs proxied for user", "msgs"],
["user_octets", "counter", "octets proxied for user",
"octets_from_client+octets_to_client"],
["user_msgs", "counter", "msgs proxied for user",
"msgs_from_client+msgs_to_client"],
["user_octets_from", "counter", "octets proxied from user", "octets_from_client"],
["user_octets_to", "counter", "octets proxied to user", "octets_to_client"],
["user_msgs_from", "counter", "msgs proxied from user", "msgs_from_client"],
["user_msgs_to", "counter", "msgs proxied to user", "msgs_to_client"],
]
for m_name, m_type, m_desc, stat_key in user_metrics_desc:
for user, stat in user_stats.items():
metric = {"user": user, "val": stat[stat_key]}
if "+" in stat_key:
val = 0
for key_part in stat_key.split("+"):
val += stat[key_part]
else:
val = stat[stat_key]
metric = {"user": user, "val": val}
metrics.append([m_name, m_type, m_desc, metric])
pkt = make_metrics_pkt(metrics)
@@ -1680,8 +1808,8 @@ async def handle_metrics(reader, writer):
async def stats_printer():
global user_stats
global last_client_ips
global last_clients_with_time_skew
global last_clients_with_first_pkt_error
global last_clients_with_same_handshake
while True:
@@ -1691,21 +1819,23 @@ async def stats_printer():
for user, stat in user_stats.items():
print("%s: %d connects (%d current), %.2f MB, %d msgs" % (
user, stat["connects"], stat["curr_connects"],
stat["octets"] / 1000000, stat["msgs"]))
(stat["octets_from_client"] + stat["octets_to_client"]) / 1000000,
stat["msgs_from_client"] + stat["msgs_to_client"]))
print(flush=True)
if last_client_ips:
print("New IPs:")
for ip in last_client_ips:
print(ip)
print(flush=True)
last_client_ips.clear()
if last_clients_with_time_skew:
print("Clients with time skew (possible replay-attackers):")
for ip, skew_minutes in last_clients_with_time_skew.items():
print("%s, clocks were %d minutes behind" % (ip, skew_minutes))
print(flush=True)
last_clients_with_time_skew.clear()
if last_clients_with_first_pkt_error:
print("Clients with error on the first packet (possible replay-attackers):")
for ip, times in last_clients_with_first_pkt_error.items():
print("%s, %d times" % (ip, times))
print(flush=True)
last_clients_with_first_pkt_error.clear()
if last_clients_with_same_handshake:
print("Clients with duplicate handshake (likely replay-attackers):")
for ip, times in last_clients_with_same_handshake.items():
@@ -1803,7 +1933,11 @@ async def get_mask_host_cert_len():
task = get_encrypted_cert(config.MASK_HOST, config.MASK_PORT, config.TLS_DOMAIN)
cert = await asyncio.wait_for(task, timeout=GET_CERT_TIMEOUT)
if cert:
if len(cert) != fake_cert_len:
if len(cert) < MIN_CERT_LEN:
msg = ("The MASK_HOST %s returned several TLS records, this is not supported" %
config.MASK_HOST)
print_err(msg)
elif len(cert) != fake_cert_len:
fake_cert_len = len(cert)
print_err("Got cert from the MASK_HOST %s, its length is %d" %
(config.MASK_HOST, fake_cert_len))
@@ -1965,7 +2099,7 @@ def print_tg_info():
if config.PORT == 3256:
print("The default port 3256 is used, this is not recommended", flush=True)
if config.TLS_ONLY:
if not config.MODES["classic"] and not config.MODES["secure"]:
print("Since you have TLS only mode enabled the best port is 443", flush=True)
print_default_warning = True
@@ -1977,31 +2111,33 @@ def print_tg_info():
for user, secret in sorted(config.USERS.items(), key=lambda x: x[0]):
for ip in ip_addrs:
if not config.TLS_ONLY:
if not config.SECURE_ONLY:
params = {"server": ip, "port": config.PORT, "secret": secret}
params_encodeded = urllib.parse.urlencode(params, safe=':')
classic_link = "tg://proxy?{}".format(params_encodeded)
proxy_links.append({"user": user, "link": classic_link})
print("{}: {}".format(user, classic_link), flush=True)
if config.MODES["classic"]:
params = {"server": ip, "port": config.PORT, "secret": secret}
params_encodeded = urllib.parse.urlencode(params, safe=':')
classic_link = "tg://proxy?{}".format(params_encodeded)
proxy_links.append({"user": user, "link": classic_link})
print("{}: {}".format(user, classic_link), flush=True)
if config.MODES["secure"]:
params = {"server": ip, "port": config.PORT, "secret": "dd" + secret}
params_encodeded = urllib.parse.urlencode(params, safe=':')
dd_link = "tg://proxy?{}".format(params_encodeded)
proxy_links.append({"user": user, "link": dd_link})
print("{}: {}".format(user, dd_link), flush=True)
tls_secret = "ee" + secret + config.TLS_DOMAIN.encode().hex()
# the base64 links is buggy on ios
# tls_secret = bytes.fromhex("ee" + secret) + config.TLS_DOMAIN.encode()
# tls_secret_base64 = base64.urlsafe_b64encode(tls_secret)
params = {"server": ip, "port": config.PORT, "secret": tls_secret}
params_encodeded = urllib.parse.urlencode(params, safe=':')
tls_link = "tg://proxy?{}".format(params_encodeded)
proxy_links.append({"user": user, "link": tls_link})
print("{}: {}".format(user, tls_link), flush=True)
if config.MODES["tls"]:
tls_secret = "ee" + secret + config.TLS_DOMAIN.encode().hex()
# the base64 links is buggy on ios
# tls_secret = bytes.fromhex("ee" + secret) + config.TLS_DOMAIN.encode()
# tls_secret_base64 = base64.urlsafe_b64encode(tls_secret)
params = {"server": ip, "port": config.PORT, "secret": tls_secret}
params_encodeded = urllib.parse.urlencode(params, safe=':')
tls_link = "tg://proxy?{}".format(params_encodeded)
proxy_links.append({"user": user, "link": tls_link})
print("{}: {}".format(user, tls_link), flush=True)
if secret in ["00000000000000000000000000000000", "0123456789abcdef0123456789abcdef"]:
if secret in ["00000000000000000000000000000000", "0123456789abcdef0123456789abcdef",
"00000000000000000000000000000001"]:
msg = "The default secret {} is used, this is not recommended".format(secret)
print(msg, flush=True)
random_secret = "".join(myrandom.choice("0123456789abcdef") for i in range(32))
@@ -2029,6 +2165,11 @@ def setup_files_limit():
pass
def setup_asyncio():
# get rid of annoying "socket.send() raised exception" log messages
asyncio.constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES = 100
def setup_signals():
if hasattr(signal, 'SIGUSR1'):
def debug_signal(signum, frame):
@@ -2040,6 +2181,7 @@ def setup_signals():
if hasattr(signal, 'SIGUSR2'):
def reload_signal(signum, frame):
init_config()
ensure_users_in_user_stats()
apply_upstream_proxy_settings()
print("Config reloaded", flush=True, file=sys.stderr)
print_tg_info()
@@ -2095,12 +2237,77 @@ def loop_exception_handler(loop, context):
loop.default_exception_handler(context)
def create_servers(loop):
servers = []
reuse_port = hasattr(socket, "SO_REUSEPORT")
has_unix = hasattr(socket, "AF_UNIX")
if config.LISTEN_ADDR_IPV4:
task = asyncio.start_server(handle_client_wrapper, config.LISTEN_ADDR_IPV4, config.PORT,
limit=get_to_tg_bufsize(), reuse_port=reuse_port)
servers.append(loop.run_until_complete(task))
if config.LISTEN_ADDR_IPV6 and socket.has_ipv6:
task = asyncio.start_server(handle_client_wrapper, config.LISTEN_ADDR_IPV6, config.PORT,
limit=get_to_tg_bufsize(), reuse_port=reuse_port)
servers.append(loop.run_until_complete(task))
if config.LISTEN_UNIX_SOCK and has_unix:
remove_unix_socket(config.LISTEN_UNIX_SOCK)
task = asyncio.start_unix_server(handle_client_wrapper, config.LISTEN_UNIX_SOCK,
limit=get_to_tg_bufsize())
servers.append(loop.run_until_complete(task))
os.chmod(config.LISTEN_UNIX_SOCK, 0o666)
if config.METRICS_PORT is not None:
if config.METRICS_LISTEN_ADDR_IPV4:
task = asyncio.start_server(handle_metrics, config.METRICS_LISTEN_ADDR_IPV4,
config.METRICS_PORT)
servers.append(loop.run_until_complete(task))
if config.METRICS_LISTEN_ADDR_IPV6 and socket.has_ipv6:
task = asyncio.start_server(handle_metrics, config.METRICS_LISTEN_ADDR_IPV6,
config.METRICS_PORT)
servers.append(loop.run_until_complete(task))
return servers
def create_utilitary_tasks(loop):
tasks = []
stats_printer_task = asyncio.Task(stats_printer())
tasks.append(stats_printer_task)
if config.USE_MIDDLE_PROXY:
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
tasks.append(middle_proxy_updater_task)
if config.GET_TIME_PERIOD:
time_get_task = asyncio.Task(get_srv_time())
tasks.append(time_get_task)
get_cert_len_task = asyncio.Task(get_mask_host_cert_len())
tasks.append(get_cert_len_task)
clear_resolving_cache_task = asyncio.Task(clear_ip_resolving_cache())
tasks.append(clear_resolving_cache_task)
return tasks
def main():
init_config()
ensure_users_in_user_stats()
apply_upstream_proxy_settings()
init_ip_info()
print_tg_info()
setup_asyncio()
setup_files_limit()
setup_signals()
try_setup_uvloop()
init_stats()
init_proxy_start_time()
if sys.platform == "win32":
@@ -2110,66 +2317,32 @@ def main():
loop = asyncio.get_event_loop()
loop.set_exception_handler(loop_exception_handler)
stats_printer_task = asyncio.Task(stats_printer())
asyncio.ensure_future(stats_printer_task)
utilitary_tasks = create_utilitary_tasks(loop)
for task in utilitary_tasks:
asyncio.ensure_future(task)
if config.USE_MIDDLE_PROXY:
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
asyncio.ensure_future(middle_proxy_updater_task)
if config.GET_TIME_PERIOD:
time_get_task = asyncio.Task(get_srv_time())
asyncio.ensure_future(time_get_task)
get_cert_len_task = asyncio.Task(get_mask_host_cert_len())
asyncio.ensure_future(get_cert_len_task)
clear_resolving_cache_task = asyncio.Task(clear_ip_resolving_cache())
asyncio.ensure_future(clear_resolving_cache_task)
reuse_port = hasattr(socket, "SO_REUSEPORT")
has_unix = hasattr(socket, "AF_UNIX")
servers = []
if config.LISTEN_ADDR_IPV4:
task = asyncio.start_server(handle_client_wrapper, config.LISTEN_ADDR_IPV4, config.PORT,
limit=get_to_tg_bufsize(), reuse_port=reuse_port, loop=loop)
servers.append(loop.run_until_complete(task))
if config.LISTEN_ADDR_IPV6 and socket.has_ipv6:
task = asyncio.start_server(handle_client_wrapper, config.LISTEN_ADDR_IPV6, config.PORT,
limit=get_to_tg_bufsize(), reuse_port=reuse_port, loop=loop)
servers.append(loop.run_until_complete(task))
if config.LISTEN_UNIX_SOCK and has_unix:
remove_unix_socket(config.LISTEN_UNIX_SOCK)
task = asyncio.start_unix_server(handle_client_wrapper, config.LISTEN_UNIX_SOCK,
limit=get_to_tg_bufsize(), loop=loop)
servers.append(loop.run_until_complete(task))
os.chmod(config.LISTEN_UNIX_SOCK, 0o666)
if config.METRICS_PORT is not None:
if config.METRICS_LISTEN_ADDR_IPV4:
task = asyncio.start_server(handle_metrics, config.METRICS_LISTEN_ADDR_IPV4,
config.METRICS_PORT, loop=loop)
servers.append(loop.run_until_complete(task))
if config.METRICS_LISTEN_ADDR_IPV6 and socket.has_ipv6:
task = asyncio.start_server(handle_metrics, config.METRICS_LISTEN_ADDR_IPV6,
config.METRICS_PORT, loop=loop)
servers.append(loop.run_until_complete(task))
servers = create_servers(loop)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
for task in asyncio.Task.all_tasks():
if hasattr(asyncio, "all_tasks"):
tasks = asyncio.all_tasks(loop)
else:
# for compatibility with Python 3.6
tasks = asyncio.Task.all_tasks(loop)
for task in tasks:
task.cancel()
for server in servers:
server.close()
loop.run_until_complete(server.wait_closed())
has_unix = hasattr(socket, "AF_UNIX")
if config.LISTEN_UNIX_SOCK and has_unix:
remove_unix_socket(config.LISTEN_UNIX_SOCK)
@@ -2177,8 +2350,4 @@ def main():
if __name__ == "__main__":
init_config()
apply_upstream_proxy_settings()
init_ip_info()
print_tg_info()
main()