mirror of
https://github.com/alexbers/mtprotoproxy.git
synced 2026-03-17 00:09:44 +00:00
Compare commits
1 Commits
v1.0.0
...
prometheus
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7fcc854e80 |
@@ -24,4 +24,3 @@ The proxy can be launched:
|
||||
- with a custom config: `python3 mtprotoproxy.py [configfile]`
|
||||
- several times, clients will be automaticaly balanced between instances
|
||||
- using *PyPy* interprteter
|
||||
- with runtime statistics exported for [Prometheus](https://prometheus.io/): using [prometheus](https://github.com/alexbers/mtprotoproxy/tree/prometheus) branch
|
||||
|
||||
119
mtprotoproxy.py
119
mtprotoproxy.py
@@ -13,6 +13,7 @@ import sys
|
||||
import re
|
||||
import runpy
|
||||
import signal
|
||||
import http.server
|
||||
|
||||
try:
|
||||
import uvloop
|
||||
@@ -148,12 +149,16 @@ PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
|
||||
# disables tg->client trafic reencryption, faster but less secure
|
||||
FAST_MODE = config.get("FAST_MODE", True)
|
||||
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
|
||||
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 24*60*60)
|
||||
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 60*60*24)
|
||||
TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 8192)
|
||||
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
|
||||
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 10*60)
|
||||
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30)
|
||||
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
|
||||
CLIENT_ACK_TIMEOUT = config.get("CLIENT_ACK_TIMEOUT", 5*60)
|
||||
PROMETHEUS_HOST = config.get("PROMETHEUS_HOST")
|
||||
PROMETHEUS_PORT = config.get("PROMETHEUS_PORT")
|
||||
# PROMETHEUS_SCRAPERS is a safety net in case of missing firewall,
|
||||
# set it to false value to disable.
|
||||
PROMETHEUS_SCRAPERS = config.get("PROMETHEUS_SCRAPERS", {'127.0.0.1', '::1'})
|
||||
|
||||
TG_DATACENTER_PORT = 443
|
||||
|
||||
@@ -224,14 +229,14 @@ def init_stats():
|
||||
stats = {user: collections.Counter() for user in USERS}
|
||||
|
||||
|
||||
def update_stats(user, connects=0, curr_connects=0, octets=0, msgs=0):
|
||||
def update_stats(user, connects=0, curr_connects=0, octets=0):
|
||||
global stats
|
||||
|
||||
if user not in stats:
|
||||
stats[user] = collections.Counter()
|
||||
|
||||
stats[user].update(connects=connects, curr_connects=curr_connects,
|
||||
octets=octets, msgs=msgs)
|
||||
octets=octets)
|
||||
|
||||
|
||||
class LayeredStreamReaderBase:
|
||||
@@ -581,11 +586,6 @@ def set_keepalive(sock, interval=40, attempts=5):
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
|
||||
|
||||
|
||||
def set_ack_timeout(sock, timeout):
|
||||
if hasattr(socket, "TCP_USER_TIMEOUT"):
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, timeout*1000)
|
||||
|
||||
|
||||
def set_bufsizes(sock, recv_buf, send_buf):
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
|
||||
@@ -815,8 +815,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
||||
|
||||
|
||||
async def handle_client(reader_clt, writer_clt):
|
||||
set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE, attempts=3)
|
||||
set_ack_timeout(writer_clt.get_extra_info("socket"), CLIENT_ACK_TIMEOUT)
|
||||
set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE)
|
||||
set_bufsizes(writer_clt.get_extra_info("socket"), TO_TG_BUFSIZE, TO_CLT_BUFSIZE)
|
||||
|
||||
try:
|
||||
@@ -882,7 +881,7 @@ async def handle_client(reader_clt, writer_clt):
|
||||
await wr.drain()
|
||||
return
|
||||
else:
|
||||
update_stats(user, octets=len(data), msgs=1)
|
||||
update_stats(user, octets=len(data))
|
||||
wr.write(data, extra)
|
||||
await wr.drain()
|
||||
except (OSError, asyncio.streams.IncompleteReadError) as e:
|
||||
@@ -902,6 +901,68 @@ async def handle_client(reader_clt, writer_clt):
|
||||
task_clt_to_tg.cancel()
|
||||
|
||||
writer_tg.transport.abort()
|
||||
|
||||
|
||||
async def http_reply(writer, line, body=b"", eof=False):
|
||||
BaseHTTPRequestHandler = http.server.BaseHTTPRequestHandler
|
||||
msg = (
|
||||
"HTTP/1.1 {}\r\n"
|
||||
"Server: mtprotoproxy\r\n"
|
||||
"Date: {}\r\n"
|
||||
"Content-Type: text/plain\r\n"
|
||||
"Content-Length: {:d}\r\n"
|
||||
).format(
|
||||
line,
|
||||
BaseHTTPRequestHandler.date_time_string(BaseHTTPRequestHandler),
|
||||
len(body)
|
||||
).encode("ascii")
|
||||
if eof:
|
||||
msg += b"Connection: close\r\n"
|
||||
msg += b"\r\n" + body
|
||||
writer.write(msg)
|
||||
await writer.drain()
|
||||
if eof:
|
||||
writer.write_eof()
|
||||
writer.close()
|
||||
|
||||
async def handle_promstats(reader, writer):
|
||||
set_keepalive(writer.get_extra_info("socket"), 75) # prometheus should never go away for a long time
|
||||
if PROMETHEUS_SCRAPERS and writer.get_extra_info('peername')[0] not in PROMETHEUS_SCRAPERS:
|
||||
return
|
||||
while True: # Keep-Alive
|
||||
request = await reader.readuntil(b"\r\n\r\n")
|
||||
if request.startswith(b"GET /metrics HTTP/1."):
|
||||
promstat = (
|
||||
"# HELP mtproxy_pump_bytes Number of post-handshake bytes pumped in both directions.\n"
|
||||
"# TYPE mtproxy_pump_bytes counter\n"
|
||||
) + "".join(
|
||||
"mtproxy_pump_bytes{{user=\"{}\"}} {:d}\n".format(u, stats[u]["octets"])
|
||||
for u in stats
|
||||
) + (
|
||||
"# HELP mtproxy_connections Current number of post-handshake client connections.\n"
|
||||
"# TYPE mtproxy_connections gauge\n"
|
||||
) + "".join(
|
||||
"mtproxy_connections{{user=\"{}\"}} {:d}\n".format(u, stats[u]["curr_connects"])
|
||||
for u in stats
|
||||
) + (
|
||||
"# HELP mtproxy_connections_total Total number of post-handshake client connections served.\n"
|
||||
"# TYPE mtproxy_connections_total counter\n"
|
||||
) + "".join(
|
||||
"mtproxy_connections_total{{user=\"{}\"}} {:d}\n".format(u, stats[u]["connects"])
|
||||
for u in stats
|
||||
)
|
||||
await http_reply(writer, "200 OK", promstat.encode("ascii"))
|
||||
else:
|
||||
await http_reply(writer, "400 Bad Request", b"Bad Request.\n", eof=True)
|
||||
return
|
||||
|
||||
async def handle_promstats_wrapper(reader, writer):
|
||||
try:
|
||||
await handle_promstats(reader, writer)
|
||||
except (asyncio.IncompleteReadError, ConnectionResetError, TimeoutError):
|
||||
pass
|
||||
finally:
|
||||
writer.transport.abort()
|
||||
|
||||
|
||||
async def handle_client_wrapper(reader, writer):
|
||||
@@ -920,9 +981,9 @@ async def stats_printer():
|
||||
|
||||
print("Stats for", time.strftime("%d.%m.%Y %H:%M:%S"))
|
||||
for user, stat in stats.items():
|
||||
print("%s: %d connects (%d current), %.2f MB, %d msgs" % (
|
||||
print("%s: %d connects (%d current), %.2f MB" % (
|
||||
user, stat["connects"], stat["curr_connects"],
|
||||
stat["octets"] / 1000000, stat["msgs"]))
|
||||
stat["octets"] / 1000000))
|
||||
print(flush=True)
|
||||
|
||||
|
||||
@@ -1061,24 +1122,15 @@ def loop_exception_handler(loop, context):
|
||||
if exception:
|
||||
if isinstance(exception, TimeoutError):
|
||||
if transport:
|
||||
print_err("Timeout, killing transport")
|
||||
transport.abort()
|
||||
return
|
||||
if isinstance(exception, OSError):
|
||||
IGNORE_ERRNO = {
|
||||
10038, # operation on non-socket on Windows, likely because fd == -1
|
||||
121, # the semaphore timeout period has expired on Windows
|
||||
}
|
||||
|
||||
FORCE_CLOSE_ERRNO = {
|
||||
113, # no route to host
|
||||
|
||||
10038 # operation on non-socket on Windows, likely because fd == -1
|
||||
}
|
||||
if exception.errno in IGNORE_ERRNO:
|
||||
return
|
||||
elif exception.errno in FORCE_CLOSE_ERRNO:
|
||||
if transport:
|
||||
transport.abort()
|
||||
return
|
||||
|
||||
loop.default_exception_handler(context)
|
||||
|
||||
@@ -1100,6 +1152,17 @@ def main():
|
||||
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
|
||||
asyncio.ensure_future(middle_proxy_updater_task)
|
||||
|
||||
if PROMETHEUS_PORT:
|
||||
task_promstats = asyncio.start_server(handle_promstats_wrapper, PROMETHEUS_HOST, PROMETHEUS_PORT,
|
||||
limit=4096, # http request is quite small
|
||||
backlog=8, # there are few prometheus collectors
|
||||
reuse_address=True, # that's still server, TIME_WAIT should not block restart
|
||||
reuse_port=False, # if you reuse statistics port for several instances, you're doing it wrong!
|
||||
loop=loop)
|
||||
server_promstats = loop.run_until_complete(task_promstats)
|
||||
else:
|
||||
server_promstats = None
|
||||
|
||||
reuse_port = hasattr(socket, "SO_REUSEPORT")
|
||||
|
||||
task_v4 = asyncio.start_server(handle_client_wrapper, '0.0.0.0', PORT,
|
||||
@@ -1125,6 +1188,10 @@ def main():
|
||||
server_v6.close()
|
||||
loop.run_until_complete(server_v6.wait_closed())
|
||||
|
||||
if server_promstats is not None:
|
||||
server_promstats.close()
|
||||
loop.run_until_complete(server_promstats.wait_closed())
|
||||
|
||||
loop.close()
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user