mirror of
https://github.com/alexbers/mtprotoproxy.git
synced 2026-03-14 07:13:09 +00:00
Compare commits
46 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0614c35020 | ||
|
|
368669a546 | ||
|
|
949ee12ed2 | ||
|
|
1f7ce9e977 | ||
|
|
8920faf650 | ||
|
|
34f743858c | ||
|
|
375fee1535 | ||
|
|
e0ea17978c | ||
|
|
8bb885ada5 | ||
|
|
bc841cff48 | ||
|
|
c89479000f | ||
|
|
74711c4212 | ||
|
|
51b2482dec | ||
|
|
87f4370927 | ||
|
|
a978eae922 | ||
|
|
88c8c57a44 | ||
|
|
446682521b | ||
|
|
b26873176a | ||
|
|
6e8e8b63b2 | ||
|
|
3b4f239cc1 | ||
|
|
0283d6264a | ||
|
|
15a8f607ca | ||
|
|
6076db9f8c | ||
|
|
6560a6c1d2 | ||
|
|
24479e68ab | ||
|
|
6ecf0ec9ac | ||
|
|
18a80e52cd | ||
|
|
ea3b8a44c3 | ||
|
|
37d570f8dc | ||
|
|
8f48e9ef65 | ||
|
|
76bc2253eb | ||
|
|
07bd9b795a | ||
|
|
1cad031947 | ||
|
|
923dac842b | ||
|
|
1a63fdae11 | ||
|
|
c7b6dcf3c2 | ||
|
|
a95b1ec3c1 | ||
|
|
49bc5d1f3b | ||
|
|
c2414c3487 | ||
|
|
8b26cc843d | ||
|
|
639dea5e8d | ||
|
|
c48cacce83 | ||
|
|
2bb0ef0b1f | ||
|
|
f5ee5db86f | ||
|
|
9c50cab94e | ||
|
|
199eaeb7c4 |
15
Dockerfile
15
Dockerfile
@@ -1,15 +1,14 @@
|
||||
FROM alpine:3.10
|
||||
FROM ubuntu:24.04
|
||||
|
||||
RUN adduser tgproxy -u 10000 -D
|
||||
RUN apt-get update && apt-get install --no-install-recommends -y python3 python3-uvloop python3-cryptography python3-socks libcap2-bin ca-certificates && rm -rf /var/lib/apt/lists/*
|
||||
RUN setcap cap_net_bind_service=+ep /usr/bin/python3.12
|
||||
|
||||
RUN apk add --no-cache python3 py3-cryptography ca-certificates libcap
|
||||
|
||||
RUN chown -R tgproxy:tgproxy /home/tgproxy
|
||||
RUN setcap cap_net_bind_service=+ep /usr/bin/python3.7
|
||||
|
||||
COPY mtprotoproxy.py config.py /home/tgproxy/
|
||||
RUN useradd tgproxy -u 10000
|
||||
|
||||
USER tgproxy
|
||||
|
||||
WORKDIR /home/tgproxy/
|
||||
|
||||
COPY --chown=tgproxy mtprotoproxy.py config.py /home/tgproxy/
|
||||
|
||||
CMD ["python3", "mtprotoproxy.py"]
|
||||
|
||||
@@ -7,4 +7,10 @@ services:
|
||||
volumes:
|
||||
- ./config.py:/home/tgproxy/config.py
|
||||
- ./mtprotoproxy.py:/home/tgproxy/mtprotoproxy.py
|
||||
- /etc/localtime:/etc/localtime:ro
|
||||
logging:
|
||||
driver: "json-file"
|
||||
options:
|
||||
max-file: "10"
|
||||
max-size: "10m"
|
||||
# mem_limit: 1024m
|
||||
|
||||
365
mtprotoproxy.py
365
mtprotoproxy.py
@@ -36,10 +36,10 @@ TG_DATACENTERS_V6 = [
|
||||
# This list will be updated in the runtime
|
||||
TG_MIDDLE_PROXIES_V4 = {
|
||||
1: [("149.154.175.50", 8888)], -1: [("149.154.175.50", 8888)],
|
||||
2: [("149.154.162.38", 80)], -2: [("149.154.162.38", 80)],
|
||||
2: [("149.154.161.144", 8888)], -2: [("149.154.161.144", 8888)],
|
||||
3: [("149.154.175.100", 8888)], -3: [("149.154.175.100", 8888)],
|
||||
4: [("91.108.4.136", 8888)], -4: [("149.154.165.109", 8888)],
|
||||
5: [("91.108.56.181", 8888)], -5: [("91.108.56.181", 8888)]
|
||||
5: [("91.108.56.183", 8888)], -5: [("91.108.56.183", 8888)]
|
||||
}
|
||||
|
||||
TG_MIDDLE_PROXIES_V6 = {
|
||||
@@ -47,7 +47,7 @@ TG_MIDDLE_PROXIES_V6 = {
|
||||
2: [("2001:67c:04e8:f002::d", 80)], -2: [("2001:67c:04e8:f002::d", 80)],
|
||||
3: [("2001:b28:f23d:f003::d", 8888)], -3: [("2001:b28:f23d:f003::d", 8888)],
|
||||
4: [("2001:67c:04e8:f004::d", 8888)], -4: [("2001:67c:04e8:f004::d", 8888)],
|
||||
5: [("2001:b28:f23f:f005::d", 8888)], -5: [("2001:67c:04e8:f004::d", 8888)]
|
||||
5: [("2001:b28:f23f:f005::d", 8888)], -5: [("2001:b28:f23f:f005::d", 8888)]
|
||||
}
|
||||
|
||||
PROXY_SECRET = bytes.fromhex(
|
||||
@@ -62,7 +62,6 @@ PREKEY_LEN = 32
|
||||
KEY_LEN = 32
|
||||
IV_LEN = 16
|
||||
HANDSHAKE_LEN = 64
|
||||
TLS_HANDSHAKE_LEN = 1 + 2 + 2 + 512
|
||||
PROTO_TAG_POS = 56
|
||||
DC_IDX_POS = 60
|
||||
|
||||
@@ -93,6 +92,9 @@ last_clients_with_same_handshake = collections.Counter()
|
||||
proxy_start_time = 0
|
||||
proxy_links = []
|
||||
|
||||
stats = collections.Counter()
|
||||
user_stats = collections.defaultdict(collections.Counter)
|
||||
|
||||
config = {}
|
||||
|
||||
|
||||
@@ -123,15 +125,24 @@ def init_config():
|
||||
conf_dict.setdefault("USERS", {"tg": "00000000000000000000000000000000"})
|
||||
conf_dict["AD_TAG"] = bytes.fromhex(conf_dict.get("AD_TAG", ""))
|
||||
|
||||
for user, secret in conf_dict["USERS"].items():
|
||||
if not re.fullmatch("[0-9a-fA-F]{32}", secret):
|
||||
fixed_secret = re.sub(r"[^0-9a-fA-F]", "", secret).zfill(32)[:32]
|
||||
|
||||
print_err("Bad secret for user %s, should be 32 hex chars, got %s. " % (user, secret))
|
||||
print_err("Changing it to %s" % fixed_secret)
|
||||
|
||||
conf_dict["USERS"][user] = fixed_secret
|
||||
|
||||
# load advanced settings
|
||||
|
||||
# use middle proxy, necessary to show ad
|
||||
conf_dict.setdefault("USE_MIDDLE_PROXY", len(conf_dict["AD_TAG"]) == 16)
|
||||
|
||||
# if IPv6 avaliable, use it by default
|
||||
conf_dict.setdefault("PREFER_IPV6", socket.has_ipv6)
|
||||
# if IPv6 available, use it by default, IPv6 with middle proxies is unstable now
|
||||
conf_dict.setdefault("PREFER_IPV6", socket.has_ipv6 and not conf_dict["USE_MIDDLE_PROXY"])
|
||||
|
||||
# disables tg->client trafic reencryption, faster but less secure
|
||||
# disables tg->client traffic reencryption, faster but less secure
|
||||
conf_dict.setdefault("FAST_MODE", True)
|
||||
|
||||
# enables some working modes
|
||||
@@ -184,6 +195,9 @@ def init_config():
|
||||
# the next host to forward bad clients
|
||||
conf_dict.setdefault("MASK_HOST", conf_dict["TLS_DOMAIN"])
|
||||
|
||||
# set the home domain for the proxy, has an influence only on the log message
|
||||
conf_dict.setdefault("MY_DOMAIN", False)
|
||||
|
||||
# the next host's port to forward bad clients
|
||||
conf_dict.setdefault("MASK_PORT", 443)
|
||||
|
||||
@@ -213,6 +227,9 @@ def init_config():
|
||||
# length of used handshake randoms for active fingerprinting protection, zero to disable
|
||||
conf_dict.setdefault("REPLAY_CHECK_LEN", 65536)
|
||||
|
||||
# accept clients with bad clocks. This reduces the protection against replay attacks
|
||||
conf_dict.setdefault("IGNORE_TIME_SKEW", False)
|
||||
|
||||
# length of last client ip addresses for logging
|
||||
conf_dict.setdefault("CLIENT_IPS_LEN", 131072)
|
||||
|
||||
@@ -247,6 +264,9 @@ def init_config():
|
||||
# telegram servers connect timeout in seconds
|
||||
conf_dict.setdefault("TG_CONNECT_TIMEOUT", 10)
|
||||
|
||||
# drop connection if no data from telegram server for this many seconds
|
||||
conf_dict.setdefault("TG_READ_TIMEOUT", 60)
|
||||
|
||||
# listen address for IPv4
|
||||
conf_dict.setdefault("LISTEN_ADDR_IPV4", "0.0.0.0")
|
||||
|
||||
@@ -380,12 +400,11 @@ def print_err(*params):
|
||||
print(*params, file=sys.stderr, flush=True)
|
||||
|
||||
|
||||
def init_stats():
|
||||
global stats
|
||||
def ensure_users_in_user_stats():
|
||||
global user_stats
|
||||
|
||||
stats = collections.Counter()
|
||||
user_stats = {user: collections.Counter() for user in config.USERS}
|
||||
for user in config.USERS:
|
||||
user_stats[user].update()
|
||||
|
||||
|
||||
def init_proxy_start_time():
|
||||
@@ -400,9 +419,6 @@ def update_stats(**kw_stats):
|
||||
|
||||
def update_user_stats(user, **kw_stats):
|
||||
global user_stats
|
||||
|
||||
if user not in user_stats:
|
||||
user_stats[user] = collections.Counter()
|
||||
user_stats[user].update(**kw_stats)
|
||||
|
||||
|
||||
@@ -470,7 +486,7 @@ myrandom = MyRandom()
|
||||
|
||||
|
||||
class TgConnectionPool:
|
||||
MAX_CONNS_IN_POOL = 64
|
||||
MAX_CONNS_IN_POOL = 16
|
||||
|
||||
def __init__(self):
|
||||
self.pools = {}
|
||||
@@ -487,6 +503,16 @@ class TgConnectionPool:
|
||||
timeout=config.TG_CONNECT_TIMEOUT)
|
||||
return reader_tgt, writer_tgt
|
||||
|
||||
def is_conn_dead(self, reader, writer):
|
||||
if writer.transport.is_closing():
|
||||
return True
|
||||
raw_reader = reader
|
||||
while hasattr(raw_reader, 'upstream'):
|
||||
raw_reader = raw_reader.upstream
|
||||
if raw_reader.at_eof():
|
||||
return True
|
||||
return False
|
||||
|
||||
def register_host_port(self, host, port, init_func):
|
||||
if (host, port, init_func) not in self.pools:
|
||||
self.pools[(host, port, init_func)] = []
|
||||
@@ -499,15 +525,16 @@ class TgConnectionPool:
|
||||
self.register_host_port(host, port, init_func)
|
||||
|
||||
ret = None
|
||||
for task in self.pools[(host, port, init_func)][::]:
|
||||
for task in self.pools[(host, port, init_func)][:]:
|
||||
if task.done():
|
||||
if task.exception():
|
||||
self.pools[(host, port, init_func)].remove(task)
|
||||
continue
|
||||
|
||||
reader, writer, *other = task.result()
|
||||
if writer.transport.is_closing():
|
||||
if self.is_conn_dead(reader, writer):
|
||||
self.pools[(host, port, init_func)].remove(task)
|
||||
writer.transport.abort()
|
||||
continue
|
||||
|
||||
if not ret:
|
||||
@@ -644,7 +671,7 @@ class CryptoWrappedStreamReader(LayeredStreamReaderBase):
|
||||
|
||||
needed_till_full_block = -len(data) % self.block_size
|
||||
if needed_till_full_block > 0:
|
||||
data += self.upstream.readexactly(needed_till_full_block)
|
||||
data += await self.upstream.readexactly(needed_till_full_block)
|
||||
return self.decryptor.decrypt(data)
|
||||
|
||||
async def readexactly(self, n):
|
||||
@@ -765,7 +792,7 @@ class MTProtoCompactFrameStreamWriter(LayeredStreamWriterBase):
|
||||
|
||||
def write(self, data, extra={}):
|
||||
SMALL_PKT_BORDER = 0x7f
|
||||
LARGE_PKT_BORGER = 256 ** 3
|
||||
LARGE_PKT_BORDER = 256 ** 3
|
||||
|
||||
if len(data) % 4 != 0:
|
||||
print_err("BUG: MTProtoFrameStreamWriter attempted to send msg with len", len(data))
|
||||
@@ -778,7 +805,7 @@ class MTProtoCompactFrameStreamWriter(LayeredStreamWriterBase):
|
||||
|
||||
if len_div_four < SMALL_PKT_BORDER:
|
||||
return self.upstream.write(bytes([len_div_four]) + data)
|
||||
elif len_div_four < LARGE_PKT_BORGER:
|
||||
elif len_div_four < LARGE_PKT_BORDER:
|
||||
return self.upstream.write(b'\x7f' + int.to_bytes(len_div_four, 3, 'little') + data)
|
||||
else:
|
||||
print_err("Attempted to send too large pkt len =", len(data))
|
||||
@@ -854,6 +881,7 @@ class ProxyReqStreamReader(LayeredStreamReaderBase):
|
||||
RPC_PROXY_ANS = b"\x0d\xda\x03\x44"
|
||||
RPC_CLOSE_EXT = b"\xa2\x34\xb6\x5e"
|
||||
RPC_SIMPLE_ACK = b"\x9b\x40\xac\x3b"
|
||||
RPC_UNKNOWN = b'\xdf\xa2\x30\x57'
|
||||
|
||||
data = await self.upstream.read(1)
|
||||
|
||||
@@ -872,8 +900,11 @@ class ProxyReqStreamReader(LayeredStreamReaderBase):
|
||||
conn_id, confirm = data[4:12], data[12:16]
|
||||
return confirm, {"SIMPLE_ACK": True}
|
||||
|
||||
if ans_type == RPC_UNKNOWN:
|
||||
return b"", {"SKIP_SEND": True}
|
||||
|
||||
print_err("unknown rpc ans type:", ans_type)
|
||||
return b""
|
||||
return b"", {"SKIP_SEND": True}
|
||||
|
||||
|
||||
class ProxyReqStreamWriter(LayeredStreamWriterBase):
|
||||
@@ -982,6 +1013,24 @@ def gen_x25519_public_key():
|
||||
return int.to_bytes((n*n) % P, length=32, byteorder="little")
|
||||
|
||||
|
||||
async def connect_reader_to_writer(reader, writer):
|
||||
BUF_SIZE = 8192
|
||||
try:
|
||||
while True:
|
||||
data = await reader.read(BUF_SIZE)
|
||||
|
||||
if not data:
|
||||
if not writer.transport.is_closing():
|
||||
writer.write_eof()
|
||||
await writer.drain()
|
||||
return
|
||||
|
||||
writer.write(data)
|
||||
await writer.drain()
|
||||
except (OSError, asyncio.IncompleteReadError) as e:
|
||||
pass
|
||||
|
||||
|
||||
async def handle_bad_client(reader_clt, writer_clt, handshake):
|
||||
BUF_SIZE = 8192
|
||||
CONNECT_TIMEOUT = 5
|
||||
@@ -1001,22 +1050,6 @@ async def handle_bad_client(reader_clt, writer_clt, handshake):
|
||||
pass
|
||||
return
|
||||
|
||||
async def connect_reader_to_writer(reader, writer):
|
||||
try:
|
||||
while True:
|
||||
data = await reader.read(BUF_SIZE)
|
||||
|
||||
if not data:
|
||||
if not writer.transport.is_closing():
|
||||
writer.write_eof()
|
||||
await writer.drain()
|
||||
return
|
||||
|
||||
writer.write(data)
|
||||
await writer.drain()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
writer_srv = None
|
||||
try:
|
||||
host = mask_host_cached_ip or config.MASK_HOST
|
||||
@@ -1088,14 +1121,14 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer):
|
||||
tls_extensions = b"\x00\x2e" + b"\x00\x33\x00\x24" + b"\x00\x1d\x00\x20"
|
||||
tls_extensions += gen_x25519_public_key() + b"\x00\x2b\x00\x02\x03\x04"
|
||||
|
||||
digest = handshake[DIGEST_POS: DIGEST_POS + DIGEST_LEN]
|
||||
digest = handshake[DIGEST_POS:DIGEST_POS+DIGEST_LEN]
|
||||
|
||||
if digest[:DIGEST_HALFLEN] in used_handshakes:
|
||||
last_clients_with_same_handshake[peer[0]] += 1
|
||||
return False
|
||||
|
||||
sess_id_len = handshake[SESSION_ID_LEN_POS]
|
||||
sess_id = handshake[SESSION_ID_POS: SESSION_ID_POS + sess_id_len]
|
||||
sess_id = handshake[SESSION_ID_POS:SESSION_ID_POS+sess_id_len]
|
||||
|
||||
for user in config.USERS:
|
||||
secret = bytes.fromhex(config.USERS[user])
|
||||
@@ -1111,9 +1144,12 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer):
|
||||
|
||||
timestamp = int.from_bytes(xored_digest[-4:], "little")
|
||||
client_time_is_ok = TIME_SKEW_MIN < time.time() - timestamp < TIME_SKEW_MAX
|
||||
|
||||
# some clients fail to read unix time and send the time since boot instead
|
||||
client_time_is_small = timestamp < 60*60*24*1000
|
||||
if not client_time_is_ok and not is_time_skewed and not client_time_is_small:
|
||||
accept_bad_time = config.IGNORE_TIME_SKEW or is_time_skewed or client_time_is_small
|
||||
|
||||
if not client_time_is_ok and not accept_bad_time:
|
||||
last_clients_with_time_skew[peer[0]] = (time.time() - timestamp) // 60
|
||||
continue
|
||||
|
||||
@@ -1213,15 +1249,17 @@ async def handle_handshake(reader, writer):
|
||||
global last_client_ips
|
||||
global last_clients_with_same_handshake
|
||||
|
||||
TLS_START_BYTES = b"\x16\x03\x01\x02\x00\x01\x00\x01\xfc\x03\x03"
|
||||
TLS_START_BYTES = b"\x16\x03\x01"
|
||||
|
||||
if writer.transport.is_closing() or writer.get_extra_info("peername") is None:
|
||||
return False
|
||||
|
||||
peer = writer.get_extra_info("peername")[:2]
|
||||
if not peer:
|
||||
peer = ("unknown ip", 0)
|
||||
|
||||
if config.PROXY_PROTOCOL:
|
||||
ip = peer[0] if peer else "unknown address"
|
||||
ip = peer[0] if peer else "unknown ip"
|
||||
peer = await handle_proxy_protocol(reader, peer)
|
||||
if not peer:
|
||||
print_err("Client from %s sent bad proxy protocol headers" % ip)
|
||||
@@ -1237,7 +1275,13 @@ async def handle_handshake(reader, writer):
|
||||
break
|
||||
|
||||
if is_tls_handshake:
|
||||
handshake += await reader.readexactly(TLS_HANDSHAKE_LEN - len(handshake))
|
||||
handshake += await reader.readexactly(2)
|
||||
tls_handshake_len = int.from_bytes(handshake[-2:], "big")
|
||||
if tls_handshake_len < 512:
|
||||
is_tls_handshake = False
|
||||
|
||||
if is_tls_handshake:
|
||||
handshake += await reader.readexactly(tls_handshake_len)
|
||||
tls_handshake_result = await handle_fake_tls_handshake(handshake, reader, writer, peer)
|
||||
|
||||
if not tls_handshake_result:
|
||||
@@ -1334,7 +1378,7 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
|
||||
print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT)
|
||||
return False
|
||||
except ConnectionAbortedError as E:
|
||||
print_err("The Telegram server connection is bad: %d (%s %s) %s" % (dc_idx, addr, port, E))
|
||||
print_err("The Telegram server connection is bad: %d (%s %s) %s" % (dc_idx, dc, TG_DATACENTER_PORT, E))
|
||||
return False
|
||||
except (OSError, asyncio.TimeoutError) as E:
|
||||
print_err("Unable to connect to", dc, TG_DATACENTER_PORT)
|
||||
@@ -1540,6 +1584,39 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
||||
return reader_tgt, writer_tgt
|
||||
|
||||
|
||||
async def tg_connect_reader_to_writer(rd, wr, user, rd_buf_size, is_upstream):
|
||||
try:
|
||||
while True:
|
||||
if not is_upstream:
|
||||
data = await asyncio.wait_for(rd.read(rd_buf_size),
|
||||
timeout=config.TG_READ_TIMEOUT)
|
||||
else:
|
||||
data = await rd.read(rd_buf_size)
|
||||
if isinstance(data, tuple):
|
||||
data, extra = data
|
||||
else:
|
||||
extra = {}
|
||||
|
||||
if extra.get("SKIP_SEND"):
|
||||
continue
|
||||
|
||||
if not data:
|
||||
wr.write_eof()
|
||||
await wr.drain()
|
||||
return
|
||||
else:
|
||||
if is_upstream:
|
||||
update_user_stats(user, octets_from_client=len(data), msgs_from_client=1)
|
||||
else:
|
||||
update_user_stats(user, octets_to_client=len(data), msgs_to_client=1)
|
||||
|
||||
wr.write(data, extra)
|
||||
await wr.drain()
|
||||
except (OSError, asyncio.IncompleteReadError, asyncio.TimeoutError) as e:
|
||||
# print_err(e)
|
||||
pass
|
||||
|
||||
|
||||
async def handle_client(reader_clt, writer_clt):
|
||||
set_keepalive(writer_clt.get_extra_info("socket"), config.CLIENT_KEEPALIVE, attempts=3)
|
||||
set_ack_timeout(writer_clt.get_extra_info("socket"), config.CLIENT_ACK_TIMEOUT)
|
||||
@@ -1602,29 +1679,10 @@ async def handle_client(reader_clt, writer_clt):
|
||||
else:
|
||||
return
|
||||
|
||||
async def connect_reader_to_writer(rd, wr, user, rd_buf_size):
|
||||
try:
|
||||
while True:
|
||||
data = await rd.read(rd_buf_size)
|
||||
if isinstance(data, tuple):
|
||||
data, extra = data
|
||||
else:
|
||||
extra = {}
|
||||
|
||||
if not data:
|
||||
wr.write_eof()
|
||||
await wr.drain()
|
||||
return
|
||||
else:
|
||||
update_user_stats(user, octets=len(data), msgs=1)
|
||||
wr.write(data, extra)
|
||||
await wr.drain()
|
||||
except (OSError, asyncio.IncompleteReadError) as e:
|
||||
# print_err(e)
|
||||
pass
|
||||
|
||||
tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, get_to_clt_bufsize())
|
||||
clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, get_to_tg_bufsize())
|
||||
tg_to_clt = tg_connect_reader_to_writer(reader_tg, writer_clt, user,
|
||||
get_to_clt_bufsize(), False)
|
||||
clt_to_tg = tg_connect_reader_to_writer(reader_clt, writer_tg,
|
||||
user, get_to_tg_bufsize(), True)
|
||||
task_tg_to_clt = asyncio.ensure_future(tg_to_clt)
|
||||
task_clt_to_tg = asyncio.ensure_future(clt_to_tg)
|
||||
|
||||
@@ -1642,7 +1700,8 @@ async def handle_client(reader_clt, writer_clt):
|
||||
|
||||
user_data_quota_hit = (
|
||||
user in config.USER_DATA_QUOTA and
|
||||
user_stats[user]["octets"] > config.USER_DATA_QUOTA[user]
|
||||
(user_stats[user]["octets_to_client"] +
|
||||
user_stats[user]["octets_from_client"] > config.USER_DATA_QUOTA[user])
|
||||
)
|
||||
|
||||
if (not tcp_limit_hit) and (not user_expired) and (not user_data_quota_hit):
|
||||
@@ -1663,7 +1722,7 @@ async def handle_client_wrapper(reader, writer):
|
||||
await handle_client(reader, writer)
|
||||
except (asyncio.IncompleteReadError, asyncio.CancelledError):
|
||||
pass
|
||||
except (ConnectionResetError, TimeoutError):
|
||||
except (ConnectionResetError, TimeoutError, BrokenPipeError):
|
||||
pass
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
@@ -1696,6 +1755,7 @@ def make_metrics_pkt(metrics):
|
||||
|
||||
pkt_header_list = []
|
||||
pkt_header_list.append("HTTP/1.1 200 OK")
|
||||
pkt_header_list.append("Connection: close")
|
||||
pkt_header_list.append("Content-Length: %d" % len(pkt_body))
|
||||
pkt_header_list.append("Content-Type: text/plain; version=0.0.4; charset=utf-8")
|
||||
pkt_header_list.append("Date: %s" % time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()))
|
||||
@@ -1749,13 +1809,25 @@ async def handle_metrics(reader, writer):
|
||||
user_metrics_desc = [
|
||||
["user_connects", "counter", "user connects", "connects"],
|
||||
["user_connects_curr", "gauge", "current user connects", "curr_connects"],
|
||||
["user_octets", "counter", "octets proxied for user", "octets"],
|
||||
["user_msgs", "counter", "msgs proxied for user", "msgs"],
|
||||
["user_octets", "counter", "octets proxied for user",
|
||||
"octets_from_client+octets_to_client"],
|
||||
["user_msgs", "counter", "msgs proxied for user",
|
||||
"msgs_from_client+msgs_to_client"],
|
||||
["user_octets_from", "counter", "octets proxied from user", "octets_from_client"],
|
||||
["user_octets_to", "counter", "octets proxied to user", "octets_to_client"],
|
||||
["user_msgs_from", "counter", "msgs proxied from user", "msgs_from_client"],
|
||||
["user_msgs_to", "counter", "msgs proxied to user", "msgs_to_client"],
|
||||
]
|
||||
|
||||
for m_name, m_type, m_desc, stat_key in user_metrics_desc:
|
||||
for user, stat in user_stats.items():
|
||||
metric = {"user": user, "val": stat[stat_key]}
|
||||
if "+" in stat_key:
|
||||
val = 0
|
||||
for key_part in stat_key.split("+"):
|
||||
val += stat[key_part]
|
||||
else:
|
||||
val = stat[stat_key]
|
||||
metric = {"user": user, "val": val}
|
||||
metrics.append([m_name, m_type, m_desc, metric])
|
||||
|
||||
pkt = make_metrics_pkt(metrics)
|
||||
@@ -1781,7 +1853,8 @@ async def stats_printer():
|
||||
for user, stat in user_stats.items():
|
||||
print("%s: %d connects (%d current), %.2f MB, %d msgs" % (
|
||||
user, stat["connects"], stat["curr_connects"],
|
||||
stat["octets"] / 1000000, stat["msgs"]))
|
||||
(stat["octets_from_client"] + stat["octets_to_client"]) / 1000000,
|
||||
stat["msgs_from_client"] + stat["msgs_to_client"]))
|
||||
print(flush=True)
|
||||
|
||||
if last_client_ips:
|
||||
@@ -1875,6 +1948,16 @@ async def get_encrypted_cert(host, port, server_name):
|
||||
if record3_type != 23:
|
||||
return b""
|
||||
|
||||
if len(record3) < MIN_CERT_LEN:
|
||||
record4_type, record4 = await get_tls_record(reader)
|
||||
if record4_type != 23:
|
||||
return b""
|
||||
msg = ("The MASK_HOST %s sent some TLS record before certificate record, this makes the " +
|
||||
"proxy more detectable") % config.MASK_HOST
|
||||
print_err(msg)
|
||||
|
||||
return record4
|
||||
|
||||
return record3
|
||||
|
||||
|
||||
@@ -1935,7 +2018,8 @@ async def get_srv_time():
|
||||
continue
|
||||
line = line[len("Date: "):].decode()
|
||||
srv_time = datetime.datetime.strptime(line, "%a, %d %b %Y %H:%M:%S %Z")
|
||||
now_time = datetime.datetime.utcnow()
|
||||
srv_time = srv_time.replace(tzinfo=datetime.timezone.utc)
|
||||
now_time = datetime.datetime.now(datetime.timezone.utc)
|
||||
is_time_skewed = (now_time-srv_time).total_seconds() > MAX_TIME_SKEW
|
||||
if is_time_skewed and config.USE_MIDDLE_PROXY and not disable_middle_proxy:
|
||||
print_err("Time skew detected, please set the clock")
|
||||
@@ -2043,6 +2127,10 @@ def init_ip_info():
|
||||
my_ip_info["ipv4"] = get_ip_from_url(IPV4_URL1) or get_ip_from_url(IPV4_URL2)
|
||||
my_ip_info["ipv6"] = get_ip_from_url(IPV6_URL1) or get_ip_from_url(IPV6_URL2)
|
||||
|
||||
# the server can return ipv4 address instead of ipv6
|
||||
if my_ip_info["ipv6"] and ":" not in my_ip_info["ipv6"]:
|
||||
my_ip_info["ipv6"] = None
|
||||
|
||||
if my_ip_info["ipv6"] and (config.PREFER_IPV6 or not my_ip_info["ipv4"]):
|
||||
print_err("IPv6 found, using it for external communication")
|
||||
|
||||
@@ -2064,9 +2152,12 @@ def print_tg_info():
|
||||
print("Since you have TLS only mode enabled the best port is 443", flush=True)
|
||||
print_default_warning = True
|
||||
|
||||
ip_addrs = [ip for ip in my_ip_info.values() if ip]
|
||||
if not ip_addrs:
|
||||
ip_addrs = ["YOUR_IP"]
|
||||
if not config.MY_DOMAIN:
|
||||
ip_addrs = [ip for ip in my_ip_info.values() if ip]
|
||||
if not ip_addrs:
|
||||
ip_addrs = ["YOUR_IP"]
|
||||
else:
|
||||
ip_addrs = [config.MY_DOMAIN]
|
||||
|
||||
proxy_links = []
|
||||
|
||||
@@ -2074,15 +2165,15 @@ def print_tg_info():
|
||||
for ip in ip_addrs:
|
||||
if config.MODES["classic"]:
|
||||
params = {"server": ip, "port": config.PORT, "secret": secret}
|
||||
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
||||
classic_link = "tg://proxy?{}".format(params_encodeded)
|
||||
params_encoded = urllib.parse.urlencode(params, safe=':')
|
||||
classic_link = "tg://proxy?{}".format(params_encoded)
|
||||
proxy_links.append({"user": user, "link": classic_link})
|
||||
print("{}: {}".format(user, classic_link), flush=True)
|
||||
|
||||
if config.MODES["secure"]:
|
||||
params = {"server": ip, "port": config.PORT, "secret": "dd" + secret}
|
||||
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
||||
dd_link = "tg://proxy?{}".format(params_encodeded)
|
||||
params_encoded = urllib.parse.urlencode(params, safe=':')
|
||||
dd_link = "tg://proxy?{}".format(params_encoded)
|
||||
proxy_links.append({"user": user, "link": dd_link})
|
||||
print("{}: {}".format(user, dd_link), flush=True)
|
||||
|
||||
@@ -2092,8 +2183,8 @@ def print_tg_info():
|
||||
# tls_secret = bytes.fromhex("ee" + secret) + config.TLS_DOMAIN.encode()
|
||||
# tls_secret_base64 = base64.urlsafe_b64encode(tls_secret)
|
||||
params = {"server": ip, "port": config.PORT, "secret": tls_secret}
|
||||
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
||||
tls_link = "tg://proxy?{}".format(params_encodeded)
|
||||
params_encoded = urllib.parse.urlencode(params, safe=':')
|
||||
tls_link = "tg://proxy?{}".format(params_encoded)
|
||||
proxy_links.append({"user": user, "link": tls_link})
|
||||
print("{}: {}".format(user, tls_link), flush=True)
|
||||
|
||||
@@ -2126,6 +2217,11 @@ def setup_files_limit():
|
||||
pass
|
||||
|
||||
|
||||
def setup_asyncio():
|
||||
# get rid of annoying "socket.send() raised exception" log messages
|
||||
asyncio.constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES = 100
|
||||
|
||||
|
||||
def setup_signals():
|
||||
if hasattr(signal, 'SIGUSR1'):
|
||||
def debug_signal(signum, frame):
|
||||
@@ -2137,6 +2233,7 @@ def setup_signals():
|
||||
if hasattr(signal, 'SIGUSR2'):
|
||||
def reload_signal(signum, frame):
|
||||
init_config()
|
||||
ensure_users_in_user_stats()
|
||||
apply_upstream_proxy_settings()
|
||||
print("Config reloaded", flush=True, file=sys.stderr)
|
||||
print_tg_info()
|
||||
@@ -2192,41 +2289,11 @@ def loop_exception_handler(loop, context):
|
||||
loop.default_exception_handler(context)
|
||||
|
||||
|
||||
def main():
|
||||
setup_files_limit()
|
||||
setup_signals()
|
||||
try_setup_uvloop()
|
||||
|
||||
init_stats()
|
||||
init_proxy_start_time()
|
||||
|
||||
if sys.platform == "win32":
|
||||
loop = asyncio.ProactorEventLoop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.set_exception_handler(loop_exception_handler)
|
||||
|
||||
stats_printer_task = asyncio.Task(stats_printer())
|
||||
asyncio.ensure_future(stats_printer_task)
|
||||
|
||||
if config.USE_MIDDLE_PROXY:
|
||||
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
|
||||
asyncio.ensure_future(middle_proxy_updater_task)
|
||||
|
||||
if config.GET_TIME_PERIOD:
|
||||
time_get_task = asyncio.Task(get_srv_time())
|
||||
asyncio.ensure_future(time_get_task)
|
||||
|
||||
get_cert_len_task = asyncio.Task(get_mask_host_cert_len())
|
||||
asyncio.ensure_future(get_cert_len_task)
|
||||
|
||||
clear_resolving_cache_task = asyncio.Task(clear_ip_resolving_cache())
|
||||
asyncio.ensure_future(clear_resolving_cache_task)
|
||||
def create_servers(loop):
|
||||
servers = []
|
||||
|
||||
reuse_port = hasattr(socket, "SO_REUSEPORT")
|
||||
has_unix = hasattr(socket, "AF_UNIX")
|
||||
servers = []
|
||||
|
||||
if config.LISTEN_ADDR_IPV4:
|
||||
task = asyncio.start_server(handle_client_wrapper, config.LISTEN_ADDR_IPV4, config.PORT,
|
||||
@@ -2255,18 +2322,80 @@ def main():
|
||||
config.METRICS_PORT)
|
||||
servers.append(loop.run_until_complete(task))
|
||||
|
||||
return servers
|
||||
|
||||
|
||||
def create_utilitary_tasks(loop):
|
||||
tasks = []
|
||||
|
||||
stats_printer_task = asyncio.Task(stats_printer(), loop=loop)
|
||||
tasks.append(stats_printer_task)
|
||||
|
||||
if config.USE_MIDDLE_PROXY:
|
||||
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info(), loop=loop)
|
||||
tasks.append(middle_proxy_updater_task)
|
||||
|
||||
if config.GET_TIME_PERIOD:
|
||||
time_get_task = asyncio.Task(get_srv_time(), loop=loop)
|
||||
tasks.append(time_get_task)
|
||||
|
||||
get_cert_len_task = asyncio.Task(get_mask_host_cert_len(), loop=loop)
|
||||
tasks.append(get_cert_len_task)
|
||||
|
||||
clear_resolving_cache_task = asyncio.Task(clear_ip_resolving_cache(), loop=loop)
|
||||
tasks.append(clear_resolving_cache_task)
|
||||
|
||||
return tasks
|
||||
|
||||
|
||||
def main():
|
||||
init_config()
|
||||
ensure_users_in_user_stats()
|
||||
apply_upstream_proxy_settings()
|
||||
init_ip_info()
|
||||
print_tg_info()
|
||||
|
||||
setup_asyncio()
|
||||
setup_files_limit()
|
||||
setup_signals()
|
||||
try_setup_uvloop()
|
||||
|
||||
init_proxy_start_time()
|
||||
|
||||
if sys.platform == "win32":
|
||||
loop = asyncio.ProactorEventLoop()
|
||||
else:
|
||||
loop = asyncio.new_event_loop()
|
||||
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.set_exception_handler(loop_exception_handler)
|
||||
|
||||
utilitary_tasks = create_utilitary_tasks(loop)
|
||||
for task in utilitary_tasks:
|
||||
asyncio.ensure_future(task)
|
||||
|
||||
servers = create_servers(loop)
|
||||
|
||||
try:
|
||||
loop.run_forever()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
for task in asyncio.Task.all_tasks():
|
||||
if hasattr(asyncio, "all_tasks"):
|
||||
tasks = asyncio.all_tasks(loop)
|
||||
else:
|
||||
# for compatibility with Python 3.6
|
||||
tasks = asyncio.Task.all_tasks(loop)
|
||||
|
||||
for task in tasks:
|
||||
task.cancel()
|
||||
|
||||
for server in servers:
|
||||
server.close()
|
||||
loop.run_until_complete(server.wait_closed())
|
||||
|
||||
has_unix = hasattr(socket, "AF_UNIX")
|
||||
|
||||
if config.LISTEN_UNIX_SOCK and has_unix:
|
||||
remove_unix_socket(config.LISTEN_UNIX_SOCK)
|
||||
|
||||
@@ -2274,8 +2403,4 @@ def main():
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
init_config()
|
||||
apply_upstream_proxy_settings()
|
||||
init_ip_info()
|
||||
print_tg_info()
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user