diff --git a/mtprotoproxy.py b/mtprotoproxy.py index d9b6c42..d68c922 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -13,6 +13,7 @@ import sys import re import runpy import signal +import http.server try: import uvloop @@ -153,6 +154,11 @@ TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 8192) TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536) CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30) CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10) +PROMETHEUS_HOST = config.get("PROMETHEUS_HOST") +PROMETHEUS_PORT = config.get("PROMETHEUS_PORT") +# PROMETHEUS_SCRAPERS is a safety net in case of missing firewall, +# set it to false value to disable. +PROMETHEUS_SCRAPERS = config.get("PROMETHEUS_SCRAPERS", {'127.0.0.1', '::1'}) TG_DATACENTER_PORT = 443 @@ -895,6 +901,68 @@ async def handle_client(reader_clt, writer_clt): task_clt_to_tg.cancel() writer_tg.transport.abort() + + +async def http_reply(writer, line, body=b"", eof=False): + BaseHTTPRequestHandler = http.server.BaseHTTPRequestHandler + msg = ( + "HTTP/1.1 {}\r\n" + "Server: mtprotoproxy\r\n" + "Date: {}\r\n" + "Content-Type: text/plain\r\n" + "Content-Length: {:d}\r\n" + ).format( + line, + BaseHTTPRequestHandler.date_time_string(BaseHTTPRequestHandler), + len(body) + ).encode("ascii") + if eof: + msg += b"Connection: close\r\n" + msg += b"\r\n" + body + writer.write(msg) + await writer.drain() + if eof: + writer.write_eof() + writer.close() + +async def handle_promstats(reader, writer): + set_keepalive(writer.get_extra_info("socket"), 75) # prometheus should never go away for a long time + if PROMETHEUS_SCRAPERS and writer.get_extra_info('peername')[0] not in PROMETHEUS_SCRAPERS: + return + while True: # Keep-Alive + request = await reader.readuntil(b"\r\n\r\n") + if request.startswith(b"GET /metrics HTTP/1."): + promstat = ( + "# HELP mtproxy_pump_bytes Number of post-handshake bytes pumped in both directions.\n" + "# TYPE mtproxy_pump_bytes counter\n" + ) + "".join( + "mtproxy_pump_bytes{{user=\"{}\"}} {:d}\n".format(u, stats[u]["octets"]) + for u in stats + ) + ( + "# HELP mtproxy_connections Current number of post-handshake client connections.\n" + "# TYPE mtproxy_connections gauge\n" + ) + "".join( + "mtproxy_connections{{user=\"{}\"}} {:d}\n".format(u, stats[u]["curr_connects"]) + for u in stats + ) + ( + "# HELP mtproxy_connections_total Total number of post-handshake client connections served.\n" + "# TYPE mtproxy_connections_total counter\n" + ) + "".join( + "mtproxy_connections_total{{user=\"{}\"}} {:d}\n".format(u, stats[u]["connects"]) + for u in stats + ) + await http_reply(writer, "200 OK", promstat.encode("ascii")) + else: + await http_reply(writer, "400 Bad Request", b"Bad Request.\n", eof=True) + return + +async def handle_promstats_wrapper(reader, writer): + try: + await handle_promstats(reader, writer) + except (asyncio.IncompleteReadError, ConnectionResetError, TimeoutError): + pass + finally: + writer.transport.abort() async def handle_client_wrapper(reader, writer): @@ -1084,6 +1152,17 @@ def main(): middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info()) asyncio.ensure_future(middle_proxy_updater_task) + if PROMETHEUS_PORT: + task_promstats = asyncio.start_server(handle_promstats_wrapper, PROMETHEUS_HOST, PROMETHEUS_PORT, + limit=4096, # http request is quite small + backlog=8, # there are few prometheus collectors + reuse_address=True, # that's still server, TIME_WAIT should not block restart + reuse_port=False, # if you reuse statistics port for several instances, you're doing it wrong! + loop=loop) + server_promstats = loop.run_until_complete(task_promstats) + else: + server_promstats = None + reuse_port = hasattr(socket, "SO_REUSEPORT") task_v4 = asyncio.start_server(handle_client_wrapper, '0.0.0.0', PORT, @@ -1109,6 +1188,10 @@ def main(): server_v6.close() loop.run_until_complete(server_v6.wait_closed()) + if server_promstats is not None: + server_promstats.close() + loop.run_until_complete(server_promstats.wait_closed()) + loop.close()