mirror of
https://github.com/alexbers/mtprotoproxy.git
synced 2026-03-14 07:13:09 +00:00
Compare commits
1 Commits
v1.0.5
...
prometheus
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7fcc854e80 |
@@ -13,6 +13,7 @@ import sys
|
||||
import re
|
||||
import runpy
|
||||
import signal
|
||||
import http.server
|
||||
|
||||
try:
|
||||
import uvloop
|
||||
@@ -153,6 +154,11 @@ TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 8192)
|
||||
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
|
||||
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30)
|
||||
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
|
||||
PROMETHEUS_HOST = config.get("PROMETHEUS_HOST")
|
||||
PROMETHEUS_PORT = config.get("PROMETHEUS_PORT")
|
||||
# PROMETHEUS_SCRAPERS is a safety net in case of missing firewall,
|
||||
# set it to false value to disable.
|
||||
PROMETHEUS_SCRAPERS = config.get("PROMETHEUS_SCRAPERS", {'127.0.0.1', '::1'})
|
||||
|
||||
TG_DATACENTER_PORT = 443
|
||||
|
||||
@@ -895,6 +901,68 @@ async def handle_client(reader_clt, writer_clt):
|
||||
task_clt_to_tg.cancel()
|
||||
|
||||
writer_tg.transport.abort()
|
||||
|
||||
|
||||
async def http_reply(writer, line, body=b"", eof=False):
|
||||
BaseHTTPRequestHandler = http.server.BaseHTTPRequestHandler
|
||||
msg = (
|
||||
"HTTP/1.1 {}\r\n"
|
||||
"Server: mtprotoproxy\r\n"
|
||||
"Date: {}\r\n"
|
||||
"Content-Type: text/plain\r\n"
|
||||
"Content-Length: {:d}\r\n"
|
||||
).format(
|
||||
line,
|
||||
BaseHTTPRequestHandler.date_time_string(BaseHTTPRequestHandler),
|
||||
len(body)
|
||||
).encode("ascii")
|
||||
if eof:
|
||||
msg += b"Connection: close\r\n"
|
||||
msg += b"\r\n" + body
|
||||
writer.write(msg)
|
||||
await writer.drain()
|
||||
if eof:
|
||||
writer.write_eof()
|
||||
writer.close()
|
||||
|
||||
async def handle_promstats(reader, writer):
|
||||
set_keepalive(writer.get_extra_info("socket"), 75) # prometheus should never go away for a long time
|
||||
if PROMETHEUS_SCRAPERS and writer.get_extra_info('peername')[0] not in PROMETHEUS_SCRAPERS:
|
||||
return
|
||||
while True: # Keep-Alive
|
||||
request = await reader.readuntil(b"\r\n\r\n")
|
||||
if request.startswith(b"GET /metrics HTTP/1."):
|
||||
promstat = (
|
||||
"# HELP mtproxy_pump_bytes Number of post-handshake bytes pumped in both directions.\n"
|
||||
"# TYPE mtproxy_pump_bytes counter\n"
|
||||
) + "".join(
|
||||
"mtproxy_pump_bytes{{user=\"{}\"}} {:d}\n".format(u, stats[u]["octets"])
|
||||
for u in stats
|
||||
) + (
|
||||
"# HELP mtproxy_connections Current number of post-handshake client connections.\n"
|
||||
"# TYPE mtproxy_connections gauge\n"
|
||||
) + "".join(
|
||||
"mtproxy_connections{{user=\"{}\"}} {:d}\n".format(u, stats[u]["curr_connects"])
|
||||
for u in stats
|
||||
) + (
|
||||
"# HELP mtproxy_connections_total Total number of post-handshake client connections served.\n"
|
||||
"# TYPE mtproxy_connections_total counter\n"
|
||||
) + "".join(
|
||||
"mtproxy_connections_total{{user=\"{}\"}} {:d}\n".format(u, stats[u]["connects"])
|
||||
for u in stats
|
||||
)
|
||||
await http_reply(writer, "200 OK", promstat.encode("ascii"))
|
||||
else:
|
||||
await http_reply(writer, "400 Bad Request", b"Bad Request.\n", eof=True)
|
||||
return
|
||||
|
||||
async def handle_promstats_wrapper(reader, writer):
|
||||
try:
|
||||
await handle_promstats(reader, writer)
|
||||
except (asyncio.IncompleteReadError, ConnectionResetError, TimeoutError):
|
||||
pass
|
||||
finally:
|
||||
writer.transport.abort()
|
||||
|
||||
|
||||
async def handle_client_wrapper(reader, writer):
|
||||
@@ -1084,6 +1152,17 @@ def main():
|
||||
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
|
||||
asyncio.ensure_future(middle_proxy_updater_task)
|
||||
|
||||
if PROMETHEUS_PORT:
|
||||
task_promstats = asyncio.start_server(handle_promstats_wrapper, PROMETHEUS_HOST, PROMETHEUS_PORT,
|
||||
limit=4096, # http request is quite small
|
||||
backlog=8, # there are few prometheus collectors
|
||||
reuse_address=True, # that's still server, TIME_WAIT should not block restart
|
||||
reuse_port=False, # if you reuse statistics port for several instances, you're doing it wrong!
|
||||
loop=loop)
|
||||
server_promstats = loop.run_until_complete(task_promstats)
|
||||
else:
|
||||
server_promstats = None
|
||||
|
||||
reuse_port = hasattr(socket, "SO_REUSEPORT")
|
||||
|
||||
task_v4 = asyncio.start_server(handle_client_wrapper, '0.0.0.0', PORT,
|
||||
@@ -1109,6 +1188,10 @@ def main():
|
||||
server_v6.close()
|
||||
loop.run_until_complete(server_v6.wait_closed())
|
||||
|
||||
if server_promstats is not None:
|
||||
server_promstats.close()
|
||||
loop.run_until_complete(server_promstats.wait_closed())
|
||||
|
||||
loop.close()
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user