mirror of
https://github.com/alexbers/mtprotoproxy.git
synced 2026-03-14 07:13:09 +00:00
Compare commits
23 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
37d570f8dc | ||
|
|
8f48e9ef65 | ||
|
|
76bc2253eb | ||
|
|
07bd9b795a | ||
|
|
1cad031947 | ||
|
|
923dac842b | ||
|
|
1a63fdae11 | ||
|
|
c7b6dcf3c2 | ||
|
|
a95b1ec3c1 | ||
|
|
49bc5d1f3b | ||
|
|
c2414c3487 | ||
|
|
8b26cc843d | ||
|
|
639dea5e8d | ||
|
|
c48cacce83 | ||
|
|
2bb0ef0b1f | ||
|
|
f5ee5db86f | ||
|
|
9c50cab94e | ||
|
|
199eaeb7c4 | ||
|
|
66ac871a74 | ||
|
|
c5344df0eb | ||
|
|
93ad268d48 | ||
|
|
1c29465b6e | ||
|
|
d41b4abf35 |
13
Dockerfile
13
Dockerfile
@@ -1,14 +1,15 @@
|
||||
FROM alpine:3.10
|
||||
FROM python:3.8-slim-buster
|
||||
|
||||
RUN adduser tgproxy -u 10000 -D
|
||||
RUN apt-get update && apt-get install -y libcap2-bin && rm -rf /var/lib/apt/lists/*
|
||||
RUN setcap cap_net_bind_service=+ep /usr/local/bin/python3.8
|
||||
|
||||
RUN apk add --no-cache python3 py3-cryptography ca-certificates libcap
|
||||
|
||||
RUN chown -R tgproxy:tgproxy /home/tgproxy
|
||||
RUN setcap cap_net_bind_service=+ep /usr/bin/python3.7
|
||||
RUN pip3 --no-cache-dir install cryptography uvloop
|
||||
|
||||
COPY mtprotoproxy.py config.py /home/tgproxy/
|
||||
|
||||
RUN useradd tgproxy -u 10000
|
||||
RUN chown -R tgproxy:tgproxy /home/tgproxy
|
||||
|
||||
USER tgproxy
|
||||
|
||||
WORKDIR /home/tgproxy/
|
||||
|
||||
@@ -20,6 +20,11 @@ To advertise a channel get a tag from **@MTProxybot** and put it to *config.py*.
|
||||
The proxy performance should be enough to comfortably serve about 4 000 simultaneous users on
|
||||
the VDS instance with 1 CPU core and 1024MB RAM.
|
||||
|
||||
## More Instructions ##
|
||||
|
||||
- [Running without Docker](https://github.com/alexbers/mtprotoproxy/wiki/Running-Without-Docker)
|
||||
- [Optimization and fine tuning](https://github.com/alexbers/mtprotoproxy/wiki/Optimization-and-Fine-Tuning)
|
||||
|
||||
## Advanced Usage ##
|
||||
|
||||
The proxy can be launched:
|
||||
|
||||
21
config.py
21
config.py
@@ -3,18 +3,23 @@ PORT = 443
|
||||
# name -> secret (32 hex chars)
|
||||
USERS = {
|
||||
"tg": "00000000000000000000000000000001",
|
||||
# "tg2": "0123456789abcdef0123456789abcdef",
|
||||
# "tg2": "0123456789abcdef0123456789abcdef",
|
||||
}
|
||||
|
||||
# Makes the proxy harder to detect
|
||||
# Can be incompatible with very old clients
|
||||
SECURE_ONLY = True
|
||||
MODES = {
|
||||
# Classic mode, easy to detect
|
||||
"classic": False,
|
||||
|
||||
# Makes the proxy even more hard to detect
|
||||
# Compatible only with the recent clients
|
||||
TLS_ONLY = True
|
||||
# Makes the proxy harder to detect
|
||||
# Can be incompatible with very old clients
|
||||
"secure": False,
|
||||
|
||||
# The domain for TLS, bad clients are proxied there
|
||||
# Makes the proxy even more hard to detect
|
||||
# Can be incompatible with old clients
|
||||
"tls": True
|
||||
}
|
||||
|
||||
# The domain for TLS mode, bad clients are proxied there
|
||||
# Use random existing domain, proxy checks it on start
|
||||
# TLS_DOMAIN = "www.google.com"
|
||||
|
||||
|
||||
405
mtprotoproxy.py
405
mtprotoproxy.py
@@ -82,16 +82,20 @@ STAT_DURATION_BUCKETS = [0.1, 0.5, 1, 2, 5, 15, 60, 300, 600, 1800, 2**31 - 1]
|
||||
|
||||
my_ip_info = {"ipv4": None, "ipv6": None}
|
||||
used_handshakes = collections.OrderedDict()
|
||||
client_ips = collections.OrderedDict()
|
||||
last_client_ips = {}
|
||||
disable_middle_proxy = False
|
||||
is_time_skewed = False
|
||||
fake_cert_len = random.randrange(1024, 4096)
|
||||
mask_host_cached_ip = None
|
||||
last_clients_with_time_skew = {}
|
||||
last_clients_with_first_pkt_error = collections.Counter()
|
||||
last_clients_with_same_handshake = collections.Counter()
|
||||
proxy_start_time = 0
|
||||
proxy_links = []
|
||||
|
||||
stats = collections.Counter()
|
||||
user_stats = collections.defaultdict(collections.Counter)
|
||||
|
||||
config = {}
|
||||
|
||||
|
||||
@@ -109,12 +113,12 @@ def init_config():
|
||||
conf_dict["PORT"] = int(sys.argv[1])
|
||||
secrets = sys.argv[2].split(",")
|
||||
conf_dict["USERS"] = {"user%d" % i: secrets[i].zfill(32) for i in range(len(secrets))}
|
||||
conf_dict["MODES"] = {"classic": False, "secure": True, "tls": True}
|
||||
if len(sys.argv) > 3:
|
||||
conf_dict["AD_TAG"] = sys.argv[3]
|
||||
if len(sys.argv) > 4:
|
||||
conf_dict["TLS_DOMAIN"] = sys.argv[4]
|
||||
conf_dict["TLS_ONLY"] = True
|
||||
conf_dict["SECURE_ONLY"] = True
|
||||
conf_dict["MODES"] = {"classic": False, "secure": False, "tls": True}
|
||||
|
||||
conf_dict = {k: v for k, v in conf_dict.items() if k.isupper()}
|
||||
|
||||
@@ -122,6 +126,15 @@ def init_config():
|
||||
conf_dict.setdefault("USERS", {"tg": "00000000000000000000000000000000"})
|
||||
conf_dict["AD_TAG"] = bytes.fromhex(conf_dict.get("AD_TAG", ""))
|
||||
|
||||
for user, secret in conf_dict["USERS"].items():
|
||||
if not re.fullmatch("[0-9a-fA-F]{32}", secret):
|
||||
fixed_secret = re.sub(r"[^0-9a-fA-F]", "", secret).zfill(32)[:32]
|
||||
|
||||
print_err("Bad secret for user %s, should be 32 hex chars, got %s. " % (user, secret))
|
||||
print_err("Changing it to %s" % fixed_secret)
|
||||
|
||||
conf_dict["USERS"][user] = fixed_secret
|
||||
|
||||
# load advanced settings
|
||||
|
||||
# use middle proxy, necessary to show ad
|
||||
@@ -133,11 +146,43 @@ def init_config():
|
||||
# disables tg->client trafic reencryption, faster but less secure
|
||||
conf_dict.setdefault("FAST_MODE", True)
|
||||
|
||||
# doesn't allow to connect in not-secure mode
|
||||
conf_dict.setdefault("SECURE_ONLY", False)
|
||||
# enables some working modes
|
||||
modes = conf_dict.get("MODES", {})
|
||||
|
||||
# allows to connect in tls mode only
|
||||
conf_dict.setdefault("TLS_ONLY", False)
|
||||
if "MODES" not in conf_dict:
|
||||
modes.setdefault("classic", True)
|
||||
modes.setdefault("secure", True)
|
||||
modes.setdefault("tls", True)
|
||||
else:
|
||||
modes.setdefault("classic", False)
|
||||
modes.setdefault("secure", False)
|
||||
modes.setdefault("tls", False)
|
||||
|
||||
legacy_warning = False
|
||||
if "SECURE_ONLY" in conf_dict:
|
||||
legacy_warning = True
|
||||
modes["classic"] = not bool(conf_dict["SECURE_ONLY"])
|
||||
|
||||
if "TLS_ONLY" in conf_dict:
|
||||
legacy_warning = True
|
||||
if conf_dict["TLS_ONLY"]:
|
||||
modes["classic"] = False
|
||||
modes["secure"] = False
|
||||
|
||||
if not modes["classic"] and not modes["secure"] and not modes["tls"]:
|
||||
print_err("No known modes enabled, enabling tls-only mode")
|
||||
modes["tls"] = True
|
||||
|
||||
if legacy_warning:
|
||||
print_err("Legacy options SECURE_ONLY or TLS_ONLY detected")
|
||||
print_err("Please use MODES in your config instead:")
|
||||
print_err("MODES = {")
|
||||
print_err(' "classic": %s,' % modes["classic"])
|
||||
print_err(' "secure": %s,' % modes["secure"])
|
||||
print_err(' "tls": %s' % modes["tls"])
|
||||
print_err("}")
|
||||
|
||||
conf_dict["MODES"] = modes
|
||||
|
||||
# accept incoming connections only with proxy protocol v1/v2, useful for nginx and haproxy
|
||||
conf_dict.setdefault("PROXY_PROTOCOL", False)
|
||||
@@ -180,8 +225,11 @@ def init_config():
|
||||
# length of used handshake randoms for active fingerprinting protection, zero to disable
|
||||
conf_dict.setdefault("REPLAY_CHECK_LEN", 65536)
|
||||
|
||||
# block bad first packets to even more protect against replay-based fingerprinting
|
||||
conf_dict.setdefault("BLOCK_IF_FIRST_PKT_BAD", not conf_dict["TLS_ONLY"])
|
||||
# accept clients with bad clocks. This reduces the protection against replay attacks
|
||||
conf_dict.setdefault("IGNORE_TIME_SKEW", False)
|
||||
|
||||
# length of last client ip addresses for logging
|
||||
conf_dict.setdefault("CLIENT_IPS_LEN", 131072)
|
||||
|
||||
# delay in seconds between stats printing
|
||||
conf_dict.setdefault("STATS_PRINT_PERIOD", 600)
|
||||
@@ -347,12 +395,11 @@ def print_err(*params):
|
||||
print(*params, file=sys.stderr, flush=True)
|
||||
|
||||
|
||||
def init_stats():
|
||||
global stats
|
||||
def ensure_users_in_user_stats():
|
||||
global user_stats
|
||||
|
||||
stats = collections.Counter()
|
||||
user_stats = {user: collections.Counter() for user in config.USERS}
|
||||
for user in config.USERS:
|
||||
user_stats[user].update()
|
||||
|
||||
|
||||
def init_proxy_start_time():
|
||||
@@ -367,9 +414,6 @@ def update_stats(**kw_stats):
|
||||
|
||||
def update_user_stats(user, **kw_stats):
|
||||
global user_stats
|
||||
|
||||
if user not in user_stats:
|
||||
user_stats[user] = collections.Counter()
|
||||
user_stats[user].update(**kw_stats)
|
||||
|
||||
|
||||
@@ -949,6 +993,24 @@ def gen_x25519_public_key():
|
||||
return int.to_bytes((n*n) % P, length=32, byteorder="little")
|
||||
|
||||
|
||||
async def connect_reader_to_writer(reader, writer):
|
||||
BUF_SIZE = 8192
|
||||
try:
|
||||
while True:
|
||||
data = await reader.read(BUF_SIZE)
|
||||
|
||||
if not data:
|
||||
if not writer.transport.is_closing():
|
||||
writer.write_eof()
|
||||
await writer.drain()
|
||||
return
|
||||
|
||||
writer.write(data)
|
||||
await writer.drain()
|
||||
except (OSError, asyncio.IncompleteReadError) as e:
|
||||
pass
|
||||
|
||||
|
||||
async def handle_bad_client(reader_clt, writer_clt, handshake):
|
||||
BUF_SIZE = 8192
|
||||
CONNECT_TIMEOUT = 5
|
||||
@@ -968,22 +1030,6 @@ async def handle_bad_client(reader_clt, writer_clt, handshake):
|
||||
pass
|
||||
return
|
||||
|
||||
async def connect_reader_to_writer(reader, writer):
|
||||
try:
|
||||
while True:
|
||||
data = await reader.read(BUF_SIZE)
|
||||
|
||||
if not data:
|
||||
if not writer.transport.is_closing():
|
||||
writer.write_eof()
|
||||
await writer.drain()
|
||||
return
|
||||
|
||||
writer.write(data)
|
||||
await writer.drain()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
writer_srv = None
|
||||
try:
|
||||
host = mask_host_cached_ip or config.MASK_HOST
|
||||
@@ -1031,6 +1077,8 @@ async def handle_bad_client(reader_clt, writer_clt, handshake):
|
||||
|
||||
async def handle_fake_tls_handshake(handshake, reader, writer, peer):
|
||||
global used_handshakes
|
||||
global client_ips
|
||||
global last_client_ips
|
||||
global last_clients_with_time_skew
|
||||
global last_clients_with_same_handshake
|
||||
global fake_cert_len
|
||||
@@ -1053,14 +1101,14 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer):
|
||||
tls_extensions = b"\x00\x2e" + b"\x00\x33\x00\x24" + b"\x00\x1d\x00\x20"
|
||||
tls_extensions += gen_x25519_public_key() + b"\x00\x2b\x00\x02\x03\x04"
|
||||
|
||||
digest = handshake[DIGEST_POS: DIGEST_POS + DIGEST_LEN]
|
||||
digest = handshake[DIGEST_POS:DIGEST_POS+DIGEST_LEN]
|
||||
|
||||
if digest[:DIGEST_HALFLEN] in used_handshakes:
|
||||
last_clients_with_same_handshake[peer[0]] += 1
|
||||
return False
|
||||
|
||||
sess_id_len = handshake[SESSION_ID_LEN_POS]
|
||||
sess_id = handshake[SESSION_ID_POS: SESSION_ID_POS + sess_id_len]
|
||||
sess_id = handshake[SESSION_ID_POS:SESSION_ID_POS+sess_id_len]
|
||||
|
||||
for user in config.USERS:
|
||||
secret = bytes.fromhex(config.USERS[user])
|
||||
@@ -1076,9 +1124,12 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer):
|
||||
|
||||
timestamp = int.from_bytes(xored_digest[-4:], "little")
|
||||
client_time_is_ok = TIME_SKEW_MIN < time.time() - timestamp < TIME_SKEW_MAX
|
||||
|
||||
# some clients fail to read unix time and send the time since boot instead
|
||||
client_time_is_small = timestamp < 60*60*24*1000
|
||||
if not client_time_is_ok and not is_time_skewed and not client_time_is_small:
|
||||
accept_bad_time = config.IGNORE_TIME_SKEW or is_time_skewed or client_time_is_small
|
||||
|
||||
if not client_time_is_ok and not accept_bad_time:
|
||||
last_clients_with_time_skew[peer[0]] = (time.time() - timestamp) // 60
|
||||
continue
|
||||
|
||||
@@ -1103,6 +1154,13 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer):
|
||||
used_handshakes.popitem(last=False)
|
||||
used_handshakes[digest[:DIGEST_HALFLEN]] = True
|
||||
|
||||
if config.CLIENT_IPS_LEN > 0:
|
||||
while len(client_ips) >= config.CLIENT_IPS_LEN:
|
||||
client_ips.popitem(last=False)
|
||||
if peer[0] not in client_ips:
|
||||
client_ips[peer[0]] = True
|
||||
last_client_ips[peer[0]] = True
|
||||
|
||||
reader = FakeTLSStreamReader(reader)
|
||||
writer = FakeTLSStreamWriter(writer)
|
||||
return reader, writer
|
||||
@@ -1167,6 +1225,8 @@ async def handle_proxy_protocol(reader, peer=None):
|
||||
|
||||
async def handle_handshake(reader, writer):
|
||||
global used_handshakes
|
||||
global client_ips
|
||||
global last_client_ips
|
||||
global last_clients_with_same_handshake
|
||||
|
||||
TLS_START_BYTES = b"\x16\x03\x01\x02\x00\x01\x00\x01\xfc\x03\x03"
|
||||
@@ -1175,9 +1235,11 @@ async def handle_handshake(reader, writer):
|
||||
return False
|
||||
|
||||
peer = writer.get_extra_info("peername")[:2]
|
||||
if not peer:
|
||||
peer = ("unknown ip", 0)
|
||||
|
||||
if config.PROXY_PROTOCOL:
|
||||
ip = peer[0] if peer else "unknown address"
|
||||
ip = peer[0] if peer else "unknown ip"
|
||||
peer = await handle_proxy_protocol(reader, peer)
|
||||
if not peer:
|
||||
print_err("Client from %s sent bad proxy protocol headers" % ip)
|
||||
@@ -1202,7 +1264,7 @@ async def handle_handshake(reader, writer):
|
||||
reader, writer = tls_handshake_result
|
||||
handshake = await reader.readexactly(HANDSHAKE_LEN)
|
||||
else:
|
||||
if config.TLS_ONLY:
|
||||
if not config.MODES["classic"] and not config.MODES["secure"]:
|
||||
await handle_bad_client(reader, writer, handshake)
|
||||
return False
|
||||
handshake += await reader.readexactly(HANDSHAKE_LEN - len(handshake))
|
||||
@@ -1232,8 +1294,14 @@ async def handle_handshake(reader, writer):
|
||||
if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE):
|
||||
continue
|
||||
|
||||
if config.SECURE_ONLY and proto_tag != PROTO_TAG_SECURE:
|
||||
continue
|
||||
if proto_tag == PROTO_TAG_SECURE:
|
||||
if is_tls_handshake and not config.MODES["tls"]:
|
||||
continue
|
||||
if not is_tls_handshake and not config.MODES["secure"]:
|
||||
continue
|
||||
else:
|
||||
if not config.MODES["classic"]:
|
||||
continue
|
||||
|
||||
dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True)
|
||||
|
||||
@@ -1242,6 +1310,13 @@ async def handle_handshake(reader, writer):
|
||||
used_handshakes.popitem(last=False)
|
||||
used_handshakes[dec_prekey_and_iv] = True
|
||||
|
||||
if config.CLIENT_IPS_LEN > 0:
|
||||
while len(client_ips) >= config.CLIENT_IPS_LEN:
|
||||
client_ips.popitem(last=False)
|
||||
if peer[0] not in client_ips:
|
||||
client_ips[peer[0]] = True
|
||||
last_client_ips[peer[0]] = True
|
||||
|
||||
reader = CryptoWrappedStreamReader(reader, decryptor)
|
||||
writer = CryptoWrappedStreamWriter(writer, encryptor)
|
||||
return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv, peer
|
||||
@@ -1483,6 +1558,32 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
||||
return reader_tgt, writer_tgt
|
||||
|
||||
|
||||
async def tg_connect_reader_to_writer(rd, wr, user, rd_buf_size, is_upstream):
|
||||
try:
|
||||
while True:
|
||||
data = await rd.read(rd_buf_size)
|
||||
if isinstance(data, tuple):
|
||||
data, extra = data
|
||||
else:
|
||||
extra = {}
|
||||
|
||||
if not data:
|
||||
wr.write_eof()
|
||||
await wr.drain()
|
||||
return
|
||||
else:
|
||||
if is_upstream:
|
||||
update_user_stats(user, octets_from_client=len(data), msgs_from_client=1)
|
||||
else:
|
||||
update_user_stats(user, octets_to_client=len(data), msgs_to_client=1)
|
||||
|
||||
wr.write(data, extra)
|
||||
await wr.drain()
|
||||
except (OSError, asyncio.IncompleteReadError) as e:
|
||||
# print_err(e)
|
||||
pass
|
||||
|
||||
|
||||
async def handle_client(reader_clt, writer_clt):
|
||||
set_keepalive(writer_clt.get_extra_info("socket"), config.CLIENT_KEEPALIVE, attempts=3)
|
||||
set_ack_timeout(writer_clt.get_extra_info("socket"), config.CLIENT_ACK_TIMEOUT)
|
||||
@@ -1545,44 +1646,10 @@ async def handle_client(reader_clt, writer_clt):
|
||||
else:
|
||||
return
|
||||
|
||||
async def connect_reader_to_writer(rd, wr, user, rd_buf_size, block_if_first_pkt_bad=False):
|
||||
global last_clients_with_first_pkt_error
|
||||
is_first_pkt = True
|
||||
try:
|
||||
while True:
|
||||
data = await rd.read(rd_buf_size)
|
||||
if isinstance(data, tuple):
|
||||
data, extra = data
|
||||
else:
|
||||
extra = {}
|
||||
|
||||
# protection against replay-based fingerprinting
|
||||
if is_first_pkt:
|
||||
is_first_pkt = False
|
||||
|
||||
ERR_PKT_DATA = b'l\xfe\xff\xff'
|
||||
if block_if_first_pkt_bad and data == ERR_PKT_DATA:
|
||||
last_clients_with_first_pkt_error[cl_ip] += 1
|
||||
|
||||
wr.write_eof()
|
||||
await wr.drain()
|
||||
return
|
||||
|
||||
if not data:
|
||||
wr.write_eof()
|
||||
await wr.drain()
|
||||
return
|
||||
else:
|
||||
update_user_stats(user, octets=len(data), msgs=1)
|
||||
wr.write(data, extra)
|
||||
await wr.drain()
|
||||
except (OSError, asyncio.IncompleteReadError) as e:
|
||||
# print_err(e)
|
||||
pass
|
||||
|
||||
tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, get_to_clt_bufsize(),
|
||||
block_if_first_pkt_bad=config.BLOCK_IF_FIRST_PKT_BAD)
|
||||
clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, get_to_tg_bufsize())
|
||||
tg_to_clt = tg_connect_reader_to_writer(reader_tg, writer_clt, user,
|
||||
get_to_clt_bufsize(), False)
|
||||
clt_to_tg = tg_connect_reader_to_writer(reader_clt, writer_tg,
|
||||
user, get_to_tg_bufsize(), True)
|
||||
task_tg_to_clt = asyncio.ensure_future(tg_to_clt)
|
||||
task_clt_to_tg = asyncio.ensure_future(clt_to_tg)
|
||||
|
||||
@@ -1600,7 +1667,8 @@ async def handle_client(reader_clt, writer_clt):
|
||||
|
||||
user_data_quota_hit = (
|
||||
user in config.USER_DATA_QUOTA and
|
||||
user_stats[user]["octets"] > config.USER_DATA_QUOTA[user]
|
||||
(user_stats[user]["octets_to_client"] +
|
||||
user_stats[user]["octets_from_client"] > config.USER_DATA_QUOTA[user])
|
||||
)
|
||||
|
||||
if (not tcp_limit_hit) and (not user_expired) and (not user_data_quota_hit):
|
||||
@@ -1621,7 +1689,7 @@ async def handle_client_wrapper(reader, writer):
|
||||
await handle_client(reader, writer)
|
||||
except (asyncio.IncompleteReadError, asyncio.CancelledError):
|
||||
pass
|
||||
except (ConnectionResetError, TimeoutError):
|
||||
except (ConnectionResetError, TimeoutError, BrokenPipeError):
|
||||
pass
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
@@ -1671,7 +1739,6 @@ async def handle_metrics(reader, writer):
|
||||
global proxy_start_time
|
||||
global proxy_links
|
||||
global last_clients_with_time_skew
|
||||
global last_clients_with_first_pkt_error
|
||||
global last_clients_with_same_handshake
|
||||
|
||||
client_ip = writer.get_extra_info("peername")[0]
|
||||
@@ -1708,13 +1775,25 @@ async def handle_metrics(reader, writer):
|
||||
user_metrics_desc = [
|
||||
["user_connects", "counter", "user connects", "connects"],
|
||||
["user_connects_curr", "gauge", "current user connects", "curr_connects"],
|
||||
["user_octets", "counter", "octets proxied for user", "octets"],
|
||||
["user_msgs", "counter", "msgs proxied for user", "msgs"],
|
||||
["user_octets", "counter", "octets proxied for user",
|
||||
"octets_from_client+octets_to_client"],
|
||||
["user_msgs", "counter", "msgs proxied for user",
|
||||
"msgs_from_client+msgs_to_client"],
|
||||
["user_octets_from", "counter", "octets proxied from user", "octets_from_client"],
|
||||
["user_octets_to", "counter", "octets proxied to user", "octets_to_client"],
|
||||
["user_msgs_from", "counter", "msgs proxied from user", "msgs_from_client"],
|
||||
["user_msgs_to", "counter", "msgs proxied to user", "msgs_to_client"],
|
||||
]
|
||||
|
||||
for m_name, m_type, m_desc, stat_key in user_metrics_desc:
|
||||
for user, stat in user_stats.items():
|
||||
metric = {"user": user, "val": stat[stat_key]}
|
||||
if "+" in stat_key:
|
||||
val = 0
|
||||
for key_part in stat_key.split("+"):
|
||||
val += stat[key_part]
|
||||
else:
|
||||
val = stat[stat_key]
|
||||
metric = {"user": user, "val": val}
|
||||
metrics.append([m_name, m_type, m_desc, metric])
|
||||
|
||||
pkt = make_metrics_pkt(metrics)
|
||||
@@ -1729,8 +1808,8 @@ async def handle_metrics(reader, writer):
|
||||
|
||||
async def stats_printer():
|
||||
global user_stats
|
||||
global last_client_ips
|
||||
global last_clients_with_time_skew
|
||||
global last_clients_with_first_pkt_error
|
||||
global last_clients_with_same_handshake
|
||||
|
||||
while True:
|
||||
@@ -1740,21 +1819,23 @@ async def stats_printer():
|
||||
for user, stat in user_stats.items():
|
||||
print("%s: %d connects (%d current), %.2f MB, %d msgs" % (
|
||||
user, stat["connects"], stat["curr_connects"],
|
||||
stat["octets"] / 1000000, stat["msgs"]))
|
||||
(stat["octets_from_client"] + stat["octets_to_client"]) / 1000000,
|
||||
stat["msgs_from_client"] + stat["msgs_to_client"]))
|
||||
print(flush=True)
|
||||
|
||||
if last_client_ips:
|
||||
print("New IPs:")
|
||||
for ip in last_client_ips:
|
||||
print(ip)
|
||||
print(flush=True)
|
||||
last_client_ips.clear()
|
||||
|
||||
if last_clients_with_time_skew:
|
||||
print("Clients with time skew (possible replay-attackers):")
|
||||
for ip, skew_minutes in last_clients_with_time_skew.items():
|
||||
print("%s, clocks were %d minutes behind" % (ip, skew_minutes))
|
||||
print(flush=True)
|
||||
last_clients_with_time_skew.clear()
|
||||
if last_clients_with_first_pkt_error:
|
||||
print("Clients with error on the first packet (possible replay-attackers):")
|
||||
for ip, times in last_clients_with_first_pkt_error.items():
|
||||
print("%s, %d times" % (ip, times))
|
||||
print(flush=True)
|
||||
last_clients_with_first_pkt_error.clear()
|
||||
if last_clients_with_same_handshake:
|
||||
print("Clients with duplicate handshake (likely replay-attackers):")
|
||||
for ip, times in last_clients_with_same_handshake.items():
|
||||
@@ -2018,7 +2099,7 @@ def print_tg_info():
|
||||
|
||||
if config.PORT == 3256:
|
||||
print("The default port 3256 is used, this is not recommended", flush=True)
|
||||
if config.TLS_ONLY:
|
||||
if not config.MODES["classic"] and not config.MODES["secure"]:
|
||||
print("Since you have TLS only mode enabled the best port is 443", flush=True)
|
||||
print_default_warning = True
|
||||
|
||||
@@ -2030,29 +2111,30 @@ def print_tg_info():
|
||||
|
||||
for user, secret in sorted(config.USERS.items(), key=lambda x: x[0]):
|
||||
for ip in ip_addrs:
|
||||
if not config.TLS_ONLY:
|
||||
if not config.SECURE_ONLY:
|
||||
params = {"server": ip, "port": config.PORT, "secret": secret}
|
||||
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
||||
classic_link = "tg://proxy?{}".format(params_encodeded)
|
||||
proxy_links.append({"user": user, "link": classic_link})
|
||||
print("{}: {}".format(user, classic_link), flush=True)
|
||||
if config.MODES["classic"]:
|
||||
params = {"server": ip, "port": config.PORT, "secret": secret}
|
||||
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
||||
classic_link = "tg://proxy?{}".format(params_encodeded)
|
||||
proxy_links.append({"user": user, "link": classic_link})
|
||||
print("{}: {}".format(user, classic_link), flush=True)
|
||||
|
||||
if config.MODES["secure"]:
|
||||
params = {"server": ip, "port": config.PORT, "secret": "dd" + secret}
|
||||
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
||||
dd_link = "tg://proxy?{}".format(params_encodeded)
|
||||
proxy_links.append({"user": user, "link": dd_link})
|
||||
print("{}: {}".format(user, dd_link), flush=True)
|
||||
|
||||
tls_secret = "ee" + secret + config.TLS_DOMAIN.encode().hex()
|
||||
# the base64 links is buggy on ios
|
||||
# tls_secret = bytes.fromhex("ee" + secret) + config.TLS_DOMAIN.encode()
|
||||
# tls_secret_base64 = base64.urlsafe_b64encode(tls_secret)
|
||||
params = {"server": ip, "port": config.PORT, "secret": tls_secret}
|
||||
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
||||
tls_link = "tg://proxy?{}".format(params_encodeded)
|
||||
proxy_links.append({"user": user, "link": tls_link})
|
||||
print("{}: {}".format(user, tls_link), flush=True)
|
||||
if config.MODES["tls"]:
|
||||
tls_secret = "ee" + secret + config.TLS_DOMAIN.encode().hex()
|
||||
# the base64 links is buggy on ios
|
||||
# tls_secret = bytes.fromhex("ee" + secret) + config.TLS_DOMAIN.encode()
|
||||
# tls_secret_base64 = base64.urlsafe_b64encode(tls_secret)
|
||||
params = {"server": ip, "port": config.PORT, "secret": tls_secret}
|
||||
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
||||
tls_link = "tg://proxy?{}".format(params_encodeded)
|
||||
proxy_links.append({"user": user, "link": tls_link})
|
||||
print("{}: {}".format(user, tls_link), flush=True)
|
||||
|
||||
if secret in ["00000000000000000000000000000000", "0123456789abcdef0123456789abcdef",
|
||||
"00000000000000000000000000000001"]:
|
||||
@@ -2083,6 +2165,11 @@ def setup_files_limit():
|
||||
pass
|
||||
|
||||
|
||||
def setup_asyncio():
|
||||
# get rid of annoying "socket.send() raised exception" log messages
|
||||
asyncio.constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES = 100
|
||||
|
||||
|
||||
def setup_signals():
|
||||
if hasattr(signal, 'SIGUSR1'):
|
||||
def debug_signal(signum, frame):
|
||||
@@ -2094,6 +2181,7 @@ def setup_signals():
|
||||
if hasattr(signal, 'SIGUSR2'):
|
||||
def reload_signal(signum, frame):
|
||||
init_config()
|
||||
ensure_users_in_user_stats()
|
||||
apply_upstream_proxy_settings()
|
||||
print("Config reloaded", flush=True, file=sys.stderr)
|
||||
print_tg_info()
|
||||
@@ -2149,41 +2237,11 @@ def loop_exception_handler(loop, context):
|
||||
loop.default_exception_handler(context)
|
||||
|
||||
|
||||
def main():
|
||||
setup_files_limit()
|
||||
setup_signals()
|
||||
try_setup_uvloop()
|
||||
|
||||
init_stats()
|
||||
init_proxy_start_time()
|
||||
|
||||
if sys.platform == "win32":
|
||||
loop = asyncio.ProactorEventLoop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.set_exception_handler(loop_exception_handler)
|
||||
|
||||
stats_printer_task = asyncio.Task(stats_printer())
|
||||
asyncio.ensure_future(stats_printer_task)
|
||||
|
||||
if config.USE_MIDDLE_PROXY:
|
||||
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
|
||||
asyncio.ensure_future(middle_proxy_updater_task)
|
||||
|
||||
if config.GET_TIME_PERIOD:
|
||||
time_get_task = asyncio.Task(get_srv_time())
|
||||
asyncio.ensure_future(time_get_task)
|
||||
|
||||
get_cert_len_task = asyncio.Task(get_mask_host_cert_len())
|
||||
asyncio.ensure_future(get_cert_len_task)
|
||||
|
||||
clear_resolving_cache_task = asyncio.Task(clear_ip_resolving_cache())
|
||||
asyncio.ensure_future(clear_resolving_cache_task)
|
||||
def create_servers(loop):
|
||||
servers = []
|
||||
|
||||
reuse_port = hasattr(socket, "SO_REUSEPORT")
|
||||
has_unix = hasattr(socket, "AF_UNIX")
|
||||
servers = []
|
||||
|
||||
if config.LISTEN_ADDR_IPV4:
|
||||
task = asyncio.start_server(handle_client_wrapper, config.LISTEN_ADDR_IPV4, config.PORT,
|
||||
@@ -2212,18 +2270,79 @@ def main():
|
||||
config.METRICS_PORT)
|
||||
servers.append(loop.run_until_complete(task))
|
||||
|
||||
return servers
|
||||
|
||||
|
||||
def create_utilitary_tasks(loop):
|
||||
tasks = []
|
||||
|
||||
stats_printer_task = asyncio.Task(stats_printer())
|
||||
tasks.append(stats_printer_task)
|
||||
|
||||
if config.USE_MIDDLE_PROXY:
|
||||
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
|
||||
tasks.append(middle_proxy_updater_task)
|
||||
|
||||
if config.GET_TIME_PERIOD:
|
||||
time_get_task = asyncio.Task(get_srv_time())
|
||||
tasks.append(time_get_task)
|
||||
|
||||
get_cert_len_task = asyncio.Task(get_mask_host_cert_len())
|
||||
tasks.append(get_cert_len_task)
|
||||
|
||||
clear_resolving_cache_task = asyncio.Task(clear_ip_resolving_cache())
|
||||
tasks.append(clear_resolving_cache_task)
|
||||
|
||||
return tasks
|
||||
|
||||
|
||||
def main():
|
||||
init_config()
|
||||
ensure_users_in_user_stats()
|
||||
apply_upstream_proxy_settings()
|
||||
init_ip_info()
|
||||
print_tg_info()
|
||||
|
||||
setup_asyncio()
|
||||
setup_files_limit()
|
||||
setup_signals()
|
||||
try_setup_uvloop()
|
||||
|
||||
init_proxy_start_time()
|
||||
|
||||
if sys.platform == "win32":
|
||||
loop = asyncio.ProactorEventLoop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.set_exception_handler(loop_exception_handler)
|
||||
|
||||
utilitary_tasks = create_utilitary_tasks(loop)
|
||||
for task in utilitary_tasks:
|
||||
asyncio.ensure_future(task)
|
||||
|
||||
servers = create_servers(loop)
|
||||
|
||||
try:
|
||||
loop.run_forever()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
for task in asyncio.Task.all_tasks():
|
||||
if hasattr(asyncio, "all_tasks"):
|
||||
tasks = asyncio.all_tasks(loop)
|
||||
else:
|
||||
# for compatibility with Python 3.6
|
||||
tasks = asyncio.Task.all_tasks(loop)
|
||||
|
||||
for task in tasks:
|
||||
task.cancel()
|
||||
|
||||
for server in servers:
|
||||
server.close()
|
||||
loop.run_until_complete(server.wait_closed())
|
||||
|
||||
has_unix = hasattr(socket, "AF_UNIX")
|
||||
|
||||
if config.LISTEN_UNIX_SOCK and has_unix:
|
||||
remove_unix_socket(config.LISTEN_UNIX_SOCK)
|
||||
|
||||
@@ -2231,8 +2350,4 @@ def main():
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
init_config()
|
||||
apply_upstream_proxy_settings()
|
||||
init_ip_info()
|
||||
print_tg_info()
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user