add Prometheus metrics exporter

This commit is contained in:
Alexander Bersenev
2018-07-01 01:35:50 +05:00
parent 675d5a6aba
commit 7fcc854e80

View File

@@ -13,6 +13,7 @@ import sys
import re
import runpy
import signal
import http.server
try:
import uvloop
@@ -153,6 +154,11 @@ TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 8192)
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30)
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
PROMETHEUS_HOST = config.get("PROMETHEUS_HOST")
PROMETHEUS_PORT = config.get("PROMETHEUS_PORT")
# PROMETHEUS_SCRAPERS is a safety net in case of missing firewall,
# set it to false value to disable.
PROMETHEUS_SCRAPERS = config.get("PROMETHEUS_SCRAPERS", {'127.0.0.1', '::1'})
TG_DATACENTER_PORT = 443
@@ -897,6 +903,68 @@ async def handle_client(reader_clt, writer_clt):
writer_tg.transport.abort()
async def http_reply(writer, line, body=b"", eof=False):
BaseHTTPRequestHandler = http.server.BaseHTTPRequestHandler
msg = (
"HTTP/1.1 {}\r\n"
"Server: mtprotoproxy\r\n"
"Date: {}\r\n"
"Content-Type: text/plain\r\n"
"Content-Length: {:d}\r\n"
).format(
line,
BaseHTTPRequestHandler.date_time_string(BaseHTTPRequestHandler),
len(body)
).encode("ascii")
if eof:
msg += b"Connection: close\r\n"
msg += b"\r\n" + body
writer.write(msg)
await writer.drain()
if eof:
writer.write_eof()
writer.close()
async def handle_promstats(reader, writer):
set_keepalive(writer.get_extra_info("socket"), 75) # prometheus should never go away for a long time
if PROMETHEUS_SCRAPERS and writer.get_extra_info('peername')[0] not in PROMETHEUS_SCRAPERS:
return
while True: # Keep-Alive
request = await reader.readuntil(b"\r\n\r\n")
if request.startswith(b"GET /metrics HTTP/1."):
promstat = (
"# HELP mtproxy_pump_bytes Number of post-handshake bytes pumped in both directions.\n"
"# TYPE mtproxy_pump_bytes counter\n"
) + "".join(
"mtproxy_pump_bytes{{user=\"{}\"}} {:d}\n".format(u, stats[u]["octets"])
for u in stats
) + (
"# HELP mtproxy_connections Current number of post-handshake client connections.\n"
"# TYPE mtproxy_connections gauge\n"
) + "".join(
"mtproxy_connections{{user=\"{}\"}} {:d}\n".format(u, stats[u]["curr_connects"])
for u in stats
) + (
"# HELP mtproxy_connections_total Total number of post-handshake client connections served.\n"
"# TYPE mtproxy_connections_total counter\n"
) + "".join(
"mtproxy_connections_total{{user=\"{}\"}} {:d}\n".format(u, stats[u]["connects"])
for u in stats
)
await http_reply(writer, "200 OK", promstat.encode("ascii"))
else:
await http_reply(writer, "400 Bad Request", b"Bad Request.\n", eof=True)
return
async def handle_promstats_wrapper(reader, writer):
try:
await handle_promstats(reader, writer)
except (asyncio.IncompleteReadError, ConnectionResetError, TimeoutError):
pass
finally:
writer.transport.abort()
async def handle_client_wrapper(reader, writer):
try:
await handle_client(reader, writer)
@@ -1084,6 +1152,17 @@ def main():
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
asyncio.ensure_future(middle_proxy_updater_task)
if PROMETHEUS_PORT:
task_promstats = asyncio.start_server(handle_promstats_wrapper, PROMETHEUS_HOST, PROMETHEUS_PORT,
limit=4096, # http request is quite small
backlog=8, # there are few prometheus collectors
reuse_address=True, # that's still server, TIME_WAIT should not block restart
reuse_port=False, # if you reuse statistics port for several instances, you're doing it wrong!
loop=loop)
server_promstats = loop.run_until_complete(task_promstats)
else:
server_promstats = None
reuse_port = hasattr(socket, "SO_REUSEPORT")
task_v4 = asyncio.start_server(handle_client_wrapper, '0.0.0.0', PORT,
@@ -1109,6 +1188,10 @@ def main():
server_v6.close()
loop.run_until_complete(server_v6.wait_closed())
if server_promstats is not None:
server_promstats.close()
loop.run_until_complete(server_promstats.wait_closed())
loop.close()