30 Commits

Author SHA1 Message Date
Alexander Bersenev
446682521b use newer Ubuntu and Python in Dockerfile 2022-05-09 00:05:24 +05:00
Alexander Bersenev
b26873176a compat with python3.10 2022-05-09 00:00:14 +05:00
Alexander Bersenev
6e8e8b63b2 add check if returned ipv6 address is correct 2021-03-02 17:04:54 +05:00
AlisonTNT
3b4f239cc1 Add Pysocks in Dockerfile (#239)
It's necessary if using upstream SOCKS5 proxy.
2020-09-29 18:29:49 +05:00
Erfan
0283d6264a set the home domain instead of IP (#231)
* set the home domain

set the home domain for the proxy, has an influence only on the log message

* fixed a bug
2020-08-09 13:51:43 +05:00
Erfan
15a8f607ca added ability to load time from local time set on original server (#230)
using this command the local time setting will be pushed to docker as a read-only file so you can use your local time
2020-06-20 15:59:25 +05:00
Alexander Bersenev
6076db9f8c add certificates to Dockerfile 2020-06-17 15:45:47 +05:00
Alexander Bersenev
6560a6c1d2 use new ubuntu 20.04 as base image 2020-04-28 16:20:33 +05:00
Peter Dave Hello
24479e68ab Add --no-install-recommends to apt-get in Dockerfile (#221)
Prevents installing unnecessary recommended packages, making the image lighter.
2020-04-28 15:25:38 +05:00
Peter Dave Hello
6ecf0ec9ac Refactor Dockerfile with less layer and improved layer cache (#220) 2020-04-27 17:22:57 +05:00
Allineer
18a80e52cd metrics: broken pipe fix (#210)
I think this fixes my issue #208.
2020-03-29 22:18:33 +05:00
Alexander Bersenev
ea3b8a44c3 restrict the logs size with 100MB 2020-03-17 03:11:13 +05:00
Alexander Bersenev
37d570f8dc handle unknown ips, e.g. from unix sockets 2020-03-05 13:55:10 +05:00
Alexander Bersenev
8f48e9ef65 fix the missing constant 2020-02-27 19:17:15 +05:00
Alexander Bersenev
76bc2253eb small code style fixes 2020-02-25 20:41:08 +05:00
Alexander Bersenev
07bd9b795a handle bad secrets in configs 2020-02-25 02:41:49 +05:00
Alexander Bersenev
1cad031947 make dataflow functions top level functions for better speed and memory consumption 2020-02-23 03:03:15 +05:00
Alexander Bersenev
923dac842b handle broken pipe error 2020-02-17 12:18:13 +05:00
Alexander Bersenev
1a63fdae11 add an option to ignore time skew 2020-02-15 17:12:15 +05:00
Alexander Bersenev
c7b6dcf3c2 save the utilitary task as a variable to prevent early garbage collecting 2020-02-14 18:58:25 +05:00
Alexander Bersenev
a95b1ec3c1 fix typo 2020-02-13 18:21:09 +05:00
Alexander Bersenev
49bc5d1f3b get rid of "socket.send() raised exception" messages 2020-02-13 18:14:37 +05:00
Alexander Bersenev
c2414c3487 simplify dockerfile 2020-02-13 15:08:01 +05:00
Alexander Bersenev
8b26cc843d catch IncompleteReadError while handling a bad client 2020-02-13 14:22:32 +05:00
Alexander Bersenev
639dea5e8d use debian image by default 2020-02-13 04:13:49 +05:00
Alexander Bersenev
c48cacce83 add statistics about up/down traffic 2020-02-12 16:28:18 +05:00
Alexander Bersenev
2bb0ef0b1f simplify initialization and stats 2020-02-12 15:41:05 +05:00
Alexander Bersenev
f5ee5db86f use asyncio.all_tasks on new pythons 2020-02-11 19:10:43 +05:00
Peter Dave Hello
9c50cab94e Fix file permission in Docker image (#189)
`chown` needs to be done "after" the file copy, otherwise there is no
meaning to do it as /home/tgproxy is default owned by tgproxy already.
2020-02-11 13:50:48 +02:00
Boris Klimenko
199eaeb7c4 Alpine 3.11, Python 3.8 (#185) 2020-01-22 20:04:29 +02:00
3 changed files with 193 additions and 104 deletions

View File

@@ -1,15 +1,14 @@
FROM alpine:3.10
FROM ubuntu:22.04
RUN adduser tgproxy -u 10000 -D
RUN apt-get update && apt-get install --no-install-recommends -y python3 python3-uvloop python3-cryptography python3-socks libcap2-bin ca-certificates && rm -rf /var/lib/apt/lists/*
RUN setcap cap_net_bind_service=+ep /usr/bin/python3.10
RUN apk add --no-cache python3 py3-cryptography ca-certificates libcap
RUN chown -R tgproxy:tgproxy /home/tgproxy
RUN setcap cap_net_bind_service=+ep /usr/bin/python3.7
COPY mtprotoproxy.py config.py /home/tgproxy/
RUN useradd tgproxy -u 10000
USER tgproxy
WORKDIR /home/tgproxy/
COPY --chown=tgproxy mtprotoproxy.py config.py /home/tgproxy/
CMD ["python3", "mtprotoproxy.py"]

View File

@@ -7,4 +7,10 @@ services:
volumes:
- ./config.py:/home/tgproxy/config.py
- ./mtprotoproxy.py:/home/tgproxy/mtprotoproxy.py
- /etc/localtime:/etc/localtime:ro
logging:
driver: "json-file"
options:
max-file: "10"
max-size: "10m"
# mem_limit: 1024m

View File

@@ -93,6 +93,9 @@ last_clients_with_same_handshake = collections.Counter()
proxy_start_time = 0
proxy_links = []
stats = collections.Counter()
user_stats = collections.defaultdict(collections.Counter)
config = {}
@@ -123,6 +126,15 @@ def init_config():
conf_dict.setdefault("USERS", {"tg": "00000000000000000000000000000000"})
conf_dict["AD_TAG"] = bytes.fromhex(conf_dict.get("AD_TAG", ""))
for user, secret in conf_dict["USERS"].items():
if not re.fullmatch("[0-9a-fA-F]{32}", secret):
fixed_secret = re.sub(r"[^0-9a-fA-F]", "", secret).zfill(32)[:32]
print_err("Bad secret for user %s, should be 32 hex chars, got %s. " % (user, secret))
print_err("Changing it to %s" % fixed_secret)
conf_dict["USERS"][user] = fixed_secret
# load advanced settings
# use middle proxy, necessary to show ad
@@ -184,6 +196,9 @@ def init_config():
# the next host to forward bad clients
conf_dict.setdefault("MASK_HOST", conf_dict["TLS_DOMAIN"])
# set the home domain for the proxy, has an influence only on the log message
conf_dict.setdefault("MY_DOMAIN", False)
# the next host's port to forward bad clients
conf_dict.setdefault("MASK_PORT", 443)
@@ -213,6 +228,9 @@ def init_config():
# length of used handshake randoms for active fingerprinting protection, zero to disable
conf_dict.setdefault("REPLAY_CHECK_LEN", 65536)
# accept clients with bad clocks. This reduces the protection against replay attacks
conf_dict.setdefault("IGNORE_TIME_SKEW", False)
# length of last client ip addresses for logging
conf_dict.setdefault("CLIENT_IPS_LEN", 131072)
@@ -380,12 +398,11 @@ def print_err(*params):
print(*params, file=sys.stderr, flush=True)
def init_stats():
global stats
def ensure_users_in_user_stats():
global user_stats
stats = collections.Counter()
user_stats = {user: collections.Counter() for user in config.USERS}
for user in config.USERS:
user_stats[user].update()
def init_proxy_start_time():
@@ -400,9 +417,6 @@ def update_stats(**kw_stats):
def update_user_stats(user, **kw_stats):
global user_stats
if user not in user_stats:
user_stats[user] = collections.Counter()
user_stats[user].update(**kw_stats)
@@ -982,6 +996,24 @@ def gen_x25519_public_key():
return int.to_bytes((n*n) % P, length=32, byteorder="little")
async def connect_reader_to_writer(reader, writer):
BUF_SIZE = 8192
try:
while True:
data = await reader.read(BUF_SIZE)
if not data:
if not writer.transport.is_closing():
writer.write_eof()
await writer.drain()
return
writer.write(data)
await writer.drain()
except (OSError, asyncio.IncompleteReadError) as e:
pass
async def handle_bad_client(reader_clt, writer_clt, handshake):
BUF_SIZE = 8192
CONNECT_TIMEOUT = 5
@@ -1001,22 +1033,6 @@ async def handle_bad_client(reader_clt, writer_clt, handshake):
pass
return
async def connect_reader_to_writer(reader, writer):
try:
while True:
data = await reader.read(BUF_SIZE)
if not data:
if not writer.transport.is_closing():
writer.write_eof()
await writer.drain()
return
writer.write(data)
await writer.drain()
except OSError:
pass
writer_srv = None
try:
host = mask_host_cached_ip or config.MASK_HOST
@@ -1088,14 +1104,14 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer):
tls_extensions = b"\x00\x2e" + b"\x00\x33\x00\x24" + b"\x00\x1d\x00\x20"
tls_extensions += gen_x25519_public_key() + b"\x00\x2b\x00\x02\x03\x04"
digest = handshake[DIGEST_POS: DIGEST_POS + DIGEST_LEN]
digest = handshake[DIGEST_POS:DIGEST_POS+DIGEST_LEN]
if digest[:DIGEST_HALFLEN] in used_handshakes:
last_clients_with_same_handshake[peer[0]] += 1
return False
sess_id_len = handshake[SESSION_ID_LEN_POS]
sess_id = handshake[SESSION_ID_POS: SESSION_ID_POS + sess_id_len]
sess_id = handshake[SESSION_ID_POS:SESSION_ID_POS+sess_id_len]
for user in config.USERS:
secret = bytes.fromhex(config.USERS[user])
@@ -1111,9 +1127,12 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer):
timestamp = int.from_bytes(xored_digest[-4:], "little")
client_time_is_ok = TIME_SKEW_MIN < time.time() - timestamp < TIME_SKEW_MAX
# some clients fail to read unix time and send the time since boot instead
client_time_is_small = timestamp < 60*60*24*1000
if not client_time_is_ok and not is_time_skewed and not client_time_is_small:
accept_bad_time = config.IGNORE_TIME_SKEW or is_time_skewed or client_time_is_small
if not client_time_is_ok and not accept_bad_time:
last_clients_with_time_skew[peer[0]] = (time.time() - timestamp) // 60
continue
@@ -1219,9 +1238,11 @@ async def handle_handshake(reader, writer):
return False
peer = writer.get_extra_info("peername")[:2]
if not peer:
peer = ("unknown ip", 0)
if config.PROXY_PROTOCOL:
ip = peer[0] if peer else "unknown address"
ip = peer[0] if peer else "unknown ip"
peer = await handle_proxy_protocol(reader, peer)
if not peer:
print_err("Client from %s sent bad proxy protocol headers" % ip)
@@ -1540,6 +1561,32 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
return reader_tgt, writer_tgt
async def tg_connect_reader_to_writer(rd, wr, user, rd_buf_size, is_upstream):
try:
while True:
data = await rd.read(rd_buf_size)
if isinstance(data, tuple):
data, extra = data
else:
extra = {}
if not data:
wr.write_eof()
await wr.drain()
return
else:
if is_upstream:
update_user_stats(user, octets_from_client=len(data), msgs_from_client=1)
else:
update_user_stats(user, octets_to_client=len(data), msgs_to_client=1)
wr.write(data, extra)
await wr.drain()
except (OSError, asyncio.IncompleteReadError) as e:
# print_err(e)
pass
async def handle_client(reader_clt, writer_clt):
set_keepalive(writer_clt.get_extra_info("socket"), config.CLIENT_KEEPALIVE, attempts=3)
set_ack_timeout(writer_clt.get_extra_info("socket"), config.CLIENT_ACK_TIMEOUT)
@@ -1602,29 +1649,10 @@ async def handle_client(reader_clt, writer_clt):
else:
return
async def connect_reader_to_writer(rd, wr, user, rd_buf_size):
try:
while True:
data = await rd.read(rd_buf_size)
if isinstance(data, tuple):
data, extra = data
else:
extra = {}
if not data:
wr.write_eof()
await wr.drain()
return
else:
update_user_stats(user, octets=len(data), msgs=1)
wr.write(data, extra)
await wr.drain()
except (OSError, asyncio.IncompleteReadError) as e:
# print_err(e)
pass
tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, get_to_clt_bufsize())
clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, get_to_tg_bufsize())
tg_to_clt = tg_connect_reader_to_writer(reader_tg, writer_clt, user,
get_to_clt_bufsize(), False)
clt_to_tg = tg_connect_reader_to_writer(reader_clt, writer_tg,
user, get_to_tg_bufsize(), True)
task_tg_to_clt = asyncio.ensure_future(tg_to_clt)
task_clt_to_tg = asyncio.ensure_future(clt_to_tg)
@@ -1642,7 +1670,8 @@ async def handle_client(reader_clt, writer_clt):
user_data_quota_hit = (
user in config.USER_DATA_QUOTA and
user_stats[user]["octets"] > config.USER_DATA_QUOTA[user]
(user_stats[user]["octets_to_client"] +
user_stats[user]["octets_from_client"] > config.USER_DATA_QUOTA[user])
)
if (not tcp_limit_hit) and (not user_expired) and (not user_data_quota_hit):
@@ -1663,7 +1692,7 @@ async def handle_client_wrapper(reader, writer):
await handle_client(reader, writer)
except (asyncio.IncompleteReadError, asyncio.CancelledError):
pass
except (ConnectionResetError, TimeoutError):
except (ConnectionResetError, TimeoutError, BrokenPipeError):
pass
except Exception:
traceback.print_exc()
@@ -1696,6 +1725,7 @@ def make_metrics_pkt(metrics):
pkt_header_list = []
pkt_header_list.append("HTTP/1.1 200 OK")
pkt_header_list.append("Connection: close")
pkt_header_list.append("Content-Length: %d" % len(pkt_body))
pkt_header_list.append("Content-Type: text/plain; version=0.0.4; charset=utf-8")
pkt_header_list.append("Date: %s" % time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()))
@@ -1749,13 +1779,25 @@ async def handle_metrics(reader, writer):
user_metrics_desc = [
["user_connects", "counter", "user connects", "connects"],
["user_connects_curr", "gauge", "current user connects", "curr_connects"],
["user_octets", "counter", "octets proxied for user", "octets"],
["user_msgs", "counter", "msgs proxied for user", "msgs"],
["user_octets", "counter", "octets proxied for user",
"octets_from_client+octets_to_client"],
["user_msgs", "counter", "msgs proxied for user",
"msgs_from_client+msgs_to_client"],
["user_octets_from", "counter", "octets proxied from user", "octets_from_client"],
["user_octets_to", "counter", "octets proxied to user", "octets_to_client"],
["user_msgs_from", "counter", "msgs proxied from user", "msgs_from_client"],
["user_msgs_to", "counter", "msgs proxied to user", "msgs_to_client"],
]
for m_name, m_type, m_desc, stat_key in user_metrics_desc:
for user, stat in user_stats.items():
metric = {"user": user, "val": stat[stat_key]}
if "+" in stat_key:
val = 0
for key_part in stat_key.split("+"):
val += stat[key_part]
else:
val = stat[stat_key]
metric = {"user": user, "val": val}
metrics.append([m_name, m_type, m_desc, metric])
pkt = make_metrics_pkt(metrics)
@@ -1781,7 +1823,8 @@ async def stats_printer():
for user, stat in user_stats.items():
print("%s: %d connects (%d current), %.2f MB, %d msgs" % (
user, stat["connects"], stat["curr_connects"],
stat["octets"] / 1000000, stat["msgs"]))
(stat["octets_from_client"] + stat["octets_to_client"]) / 1000000,
stat["msgs_from_client"] + stat["msgs_to_client"]))
print(flush=True)
if last_client_ips:
@@ -2043,6 +2086,10 @@ def init_ip_info():
my_ip_info["ipv4"] = get_ip_from_url(IPV4_URL1) or get_ip_from_url(IPV4_URL2)
my_ip_info["ipv6"] = get_ip_from_url(IPV6_URL1) or get_ip_from_url(IPV6_URL2)
# the server can return ipv4 address instead of ipv6
if my_ip_info["ipv6"] and ":" not in my_ip_info["ipv6"]:
my_ip_info["ipv6"] = None
if my_ip_info["ipv6"] and (config.PREFER_IPV6 or not my_ip_info["ipv4"]):
print_err("IPv6 found, using it for external communication")
@@ -2064,9 +2111,12 @@ def print_tg_info():
print("Since you have TLS only mode enabled the best port is 443", flush=True)
print_default_warning = True
ip_addrs = [ip for ip in my_ip_info.values() if ip]
if not ip_addrs:
ip_addrs = ["YOUR_IP"]
if not config.MY_DOMAIN:
ip_addrs = [ip for ip in my_ip_info.values() if ip]
if not ip_addrs:
ip_addrs = ["YOUR_IP"]
else:
ip_addrs = [config.MY_DOMAIN]
proxy_links = []
@@ -2126,6 +2176,11 @@ def setup_files_limit():
pass
def setup_asyncio():
# get rid of annoying "socket.send() raised exception" log messages
asyncio.constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES = 100
def setup_signals():
if hasattr(signal, 'SIGUSR1'):
def debug_signal(signum, frame):
@@ -2137,6 +2192,7 @@ def setup_signals():
if hasattr(signal, 'SIGUSR2'):
def reload_signal(signum, frame):
init_config()
ensure_users_in_user_stats()
apply_upstream_proxy_settings()
print("Config reloaded", flush=True, file=sys.stderr)
print_tg_info()
@@ -2192,41 +2248,11 @@ def loop_exception_handler(loop, context):
loop.default_exception_handler(context)
def main():
setup_files_limit()
setup_signals()
try_setup_uvloop()
init_stats()
init_proxy_start_time()
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
loop = asyncio.get_event_loop()
loop.set_exception_handler(loop_exception_handler)
stats_printer_task = asyncio.Task(stats_printer())
asyncio.ensure_future(stats_printer_task)
if config.USE_MIDDLE_PROXY:
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
asyncio.ensure_future(middle_proxy_updater_task)
if config.GET_TIME_PERIOD:
time_get_task = asyncio.Task(get_srv_time())
asyncio.ensure_future(time_get_task)
get_cert_len_task = asyncio.Task(get_mask_host_cert_len())
asyncio.ensure_future(get_cert_len_task)
clear_resolving_cache_task = asyncio.Task(clear_ip_resolving_cache())
asyncio.ensure_future(clear_resolving_cache_task)
def create_servers(loop):
servers = []
reuse_port = hasattr(socket, "SO_REUSEPORT")
has_unix = hasattr(socket, "AF_UNIX")
servers = []
if config.LISTEN_ADDR_IPV4:
task = asyncio.start_server(handle_client_wrapper, config.LISTEN_ADDR_IPV4, config.PORT,
@@ -2255,18 +2281,80 @@ def main():
config.METRICS_PORT)
servers.append(loop.run_until_complete(task))
return servers
def create_utilitary_tasks(loop):
tasks = []
stats_printer_task = asyncio.Task(stats_printer(), loop=loop)
tasks.append(stats_printer_task)
if config.USE_MIDDLE_PROXY:
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info(), loop=loop)
tasks.append(middle_proxy_updater_task)
if config.GET_TIME_PERIOD:
time_get_task = asyncio.Task(get_srv_time(), loop=loop)
tasks.append(time_get_task)
get_cert_len_task = asyncio.Task(get_mask_host_cert_len(), loop=loop)
tasks.append(get_cert_len_task)
clear_resolving_cache_task = asyncio.Task(clear_ip_resolving_cache(), loop=loop)
tasks.append(clear_resolving_cache_task)
return tasks
def main():
init_config()
ensure_users_in_user_stats()
apply_upstream_proxy_settings()
init_ip_info()
print_tg_info()
setup_asyncio()
setup_files_limit()
setup_signals()
try_setup_uvloop()
init_proxy_start_time()
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
else:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.set_exception_handler(loop_exception_handler)
utilitary_tasks = create_utilitary_tasks(loop)
for task in utilitary_tasks:
asyncio.ensure_future(task)
servers = create_servers(loop)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
for task in asyncio.Task.all_tasks():
if hasattr(asyncio, "all_tasks"):
tasks = asyncio.all_tasks(loop)
else:
# for compatibility with Python 3.6
tasks = asyncio.Task.all_tasks(loop)
for task in tasks:
task.cancel()
for server in servers:
server.close()
loop.run_until_complete(server.wait_closed())
has_unix = hasattr(socket, "AF_UNIX")
if config.LISTEN_UNIX_SOCK and has_unix:
remove_unix_socket(config.LISTEN_UNIX_SOCK)
@@ -2274,8 +2362,4 @@ def main():
if __name__ == "__main__":
init_config()
apply_upstream_proxy_settings()
init_ip_info()
print_tg_info()
main()