Change metrics delivery from push (pushgateway) to pull (Prometheus exporter)

This commit is contained in:
Alexander Bersenev
2019-09-19 02:27:57 +05:00
parent 781549f37f
commit 5fcd1c0158

View File

@@ -172,9 +172,6 @@ def init_config():
# delay in seconds between stats printing # delay in seconds between stats printing
conf_dict.setdefault("STATS_PRINT_PERIOD", 600) conf_dict.setdefault("STATS_PRINT_PERIOD", 600)
# delay in seconds between metrics sending, if enabled
conf_dict.setdefault("SEND_METRICS_PERIOD", 15)
# delay in seconds between middle proxy info updates # delay in seconds between middle proxy info updates
conf_dict.setdefault("PROXY_INFO_UPDATE_PERIOD", 24*60*60) conf_dict.setdefault("PROXY_INFO_UPDATE_PERIOD", 24*60*60)
@@ -212,14 +209,17 @@ def init_config():
# listen unix socket # listen unix socket
conf_dict.setdefault("LISTEN_UNIX_SOCK", "") conf_dict.setdefault("LISTEN_UNIX_SOCK", "")
# prometheus push gateway addr to send metrics, disabled by default # prometheus exporter listen port, use some random port here
conf_dict.setdefault("METRICS_HOST", None) conf_dict.setdefault("METRICS_PORT", None)
# prometheus push gateway port # prometheus listen addr ipv4
conf_dict.setdefault("METRICS_PORT", 9091) conf_dict.setdefault("METRICS_LISTEN_ADDR_IPV4", "0.0.0.0")
# prometheus push gateway identity string, by default proxy addr and port will be used # prometheus listen addr ipv6
conf_dict.setdefault("METRICS_ID", None) conf_dict.setdefault("METRICS_LISTEN_ADDR_IPV6", None)
# prometheus scrapers whitelist
conf_dict.setdefault("METRICS_WHITELIST", ["127.0.0.1", "::1"])
# allow access to config by attributes # allow access to config by attributes
config = type("config", (dict,), conf_dict)(conf_dict) config = type("config", (dict,), conf_dict)(conf_dict)
@@ -1539,6 +1539,85 @@ async def handle_client_wrapper(reader, writer):
writer.transport.abort() writer.transport.abort()
def make_metrics_pkt(metrics):
    """Render metrics as a Prometheus text-format HTTP/1.1 response.

    metrics is an iterable of [name, type, description, value] entries;
    value is either a plain number or a dict holding a "val" key plus
    label-name -> label-value pairs.  Returns the complete response
    (status line, headers, body) as a str, ready for ``pkt.encode()``.
    """
    pkt_body_list = []
    used_names = set()
    for name, m_type, desc, val in metrics:
        # emit the HELP/TYPE preamble only once per metric name, even when
        # the same metric repeats with different labels (per-user metrics)
        if name not in used_names:
            pkt_body_list.append("# HELP %s %s" % (name, desc))
            pkt_body_list.append("# TYPE %s %s" % (name, m_type))
            used_names.add(name)
        if isinstance(val, dict):
            tags = []
            for tag, tag_val in val.items():
                if tag == "val":
                    continue
                # escape double quotes per the Prometheus exposition format
                tag_val = tag_val.replace('"', r'\"')
                tags.append('%s="%s"' % (tag, tag_val))
            pkt_body_list.append("%s{%s} %s" % (name, ",".join(tags), val["val"]))
        else:
            pkt_body_list.append("%s %s" % (name, val))
    pkt_body = "\n".join(pkt_body_list) + "\n"

    pkt_header_list = []
    pkt_header_list.append("HTTP/1.1 200 OK")
    # Content-Length must be the byte length of the utf-8 encoded body, not
    # the character count: label values (e.g. user names) may be non-ASCII,
    # and the caller transmits pkt.encode()
    pkt_header_list.append("Content-Length: %d" % len(pkt_body.encode()))
    pkt_header_list.append("Content-Type: text/plain; version=0.0.4; charset=utf-8")
    pkt_header_list.append("Date: %s" % time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()))
    pkt_header = "\r\n".join(pkt_header_list)

    pkt = pkt_header + "\r\n\r\n" + pkt_body
    return pkt
async def handle_metrics(reader, writer):
    """Serve one Prometheus scrape: reply with the current proxy metrics.

    Connections from addresses outside config.METRICS_WHITELIST are
    dropped without a response.
    """
    peer_ip = writer.get_extra_info("peername")[0]
    if peer_ip not in config.METRICS_WHITELIST:
        writer.close()
        return

    try:
        collected = [
            ["uptime", "counter", "proxy uptime", time.time() - proxy_start_time],
            ["connects_bad", "counter", "connects with bad secret",
             stats["connects_bad"]],
            ["connects_all", "counter", "incoming connects", stats["connects_all"]],
            ["handshake_timeouts", "counter", "number of timed out handshakes",
             stats["handshake_timeouts"]],
        ]

        # per-user metrics: (metric name, type, description, user_stats key)
        per_user_desc = (
            ("user_connects", "counter", "user connects", "connects"),
            ("user_connects_curr", "gauge", "current user connects", "curr_connects"),
            ("user_octets", "counter", "octets proxied for user", "octets"),
            ("user_msgs", "counter", "msgs proxied for user", "msgs"),
        )
        for metric_name, metric_type, metric_desc, stat_key in per_user_desc:
            for user_name, user_stat in user_stats.items():
                collected.append([metric_name, metric_type, metric_desc,
                                  {"user": user_name, "val": user_stat[stat_key]}])

        writer.write(make_metrics_pkt(collected).encode())
        await writer.drain()
    except Exception:
        traceback.print_exc()
    finally:
        writer.close()
async def stats_printer(): async def stats_printer():
global user_stats global user_stats
global last_clients_with_time_skew global last_clients_with_time_skew
@@ -1575,127 +1654,6 @@ async def stats_printer():
last_clients_with_same_handshake.clear() last_clients_with_same_handshake.clear()
def make_metrics_pkt(host, instance_name, metrics):
    """Build a pushgateway PUT request carrying metrics in text format.

    host goes into the Host header, instance_name (url-quoted) into the
    request path.  Returns the full HTTP request as a str.
    """
    body_lines = []
    seen_names = set()
    for metric_name, metric_type, metric_desc, value in metrics:
        # HELP/TYPE preamble appears once per metric name
        if metric_name not in seen_names:
            seen_names.add(metric_name)
            body_lines.append("# HELP %s %s" % (metric_name, metric_desc))
            body_lines.append("# TYPE %s %s" % (metric_name, metric_type))
        if not isinstance(value, dict):
            body_lines.append("%s %s" % (metric_name, value))
            continue
        # dict value: every key except "val" becomes a label
        labels = ",".join('%s="%s"' % (label, label_val.replace('"', r'\"'))
                          for label, label_val in value.items() if label != "val")
        body_lines.append("%s{%s} %s" % (metric_name, labels, value["val"]))
    body = "\n".join(body_lines) + "\n"

    headers = (
        "PUT /metrics/job/mtprotoproxy/instance/%s HTTP/1.1" %
        urllib.parse.quote_plus(instance_name),
        "Accept-Encoding: identity",
        "Content-Length: %d" % len(body),
        "Host: %s" % host,
        "User-Agent: Python-urllib/3.7",
        "Content-Type: text/plain; version=0.0.4; charset=utf-8",
        "Connection: close",
    )
    return "\r\n".join(headers) + "\r\n\r\n" + body
async def send_metrics(host, port):
    """Push the current proxy metrics to a Prometheus pushgateway at host:port.

    Returns True when the gateway answered with HTTP/1.1 202, False otherwise.
    Connection errors propagate to the caller (metrics_sender handles them).
    """
    global stats
    global user_stats
    global my_ip_info
    global proxy_start_time
    global last_clients_with_time_skew
    global last_clients_with_first_pkt_error
    global last_clients_with_same_handshake

    # pushgateway instance label: an explicit METRICS_ID wins, otherwise fall
    # back to the detected public address and listen port ("unknown_ip" if
    # no address was detected)
    instance_name = config.METRICS_ID
    if not instance_name:
        if my_ip_info.get("ipv4"):
            instance_name = "%s:%d" % (my_ip_info["ipv4"], config.PORT)
        elif my_ip_info.get("ipv6"):
            instance_name = "%s:%d" % (my_ip_info["ipv6"], config.PORT)
        else:
            instance_name = "%s:%d" % ("unknown_ip", config.PORT)

    metrics = []
    metrics.append(["uptime", "counter", "proxy uptime", time.time() - proxy_start_time])
    metrics.append(["connects_bad", "counter", "connects with bad secret", stats["connects_bad"]])
    metrics.append(["connects_all", "counter", "incoming connects", stats["connects_all"]])
    metrics.append(["handshake_timeouts", "counter", "number of timed out handshakes",
                    stats["handshake_timeouts"]])

    # per-user metrics: [metric name, type, description, key into user_stats]
    user_metrics_desc = [
        ["user_connects", "counter", "user connects", "connects"],
        ["user_connects_curr", "gauge", "current user connects", "curr_connects"],
        ["user_octets", "counter", "octets proxied for user", "octets"],
        ["user_msgs", "counter", "msgs proxied for user", "msgs"],
    ]
    for m_name, m_type, m_desc, stat_key in user_metrics_desc:
        for user, stat in user_stats.items():
            metric = {"user": user, "val": stat[stat_key]}
            metrics.append([m_name, m_type, m_desc, metric])

    metrics_host_hdr = "%s:%s" % (host, port)
    pkt = make_metrics_pkt(metrics_host_hdr, instance_name, metrics)

    reader, writer = await asyncio.open_connection(host, port)
    writer.write(pkt.encode())
    await writer.drain()

    # only the status line matters: "HTTP/1.1 202 ..." means the gateway
    # accepted the push; the rest of the response is ignored
    http_vers = (await reader.readuntil(b" "))[:-1]
    http_statuscode = (await reader.readuntil(b" "))[:-1]
    writer.close()

    return http_vers == b"HTTP/1.1" and http_statuscode == b"202"
async def metrics_sender():
    """Background task: periodically push metrics to the configured
    pushgateway (config.METRICS_HOST / config.METRICS_PORT).

    Each distinct error message is printed only once in a row to avoid
    flooding the log with repeated failures.
    """
    SEND_METRICS_ENABLED_CHECK_PERIOD = 60
    SEND_METRICS_TIMEOUT = 10

    last_error_msg = ""
    while True:
        if not config.METRICS_HOST:
            # metrics disabled: re-check the config periodically using the
            # LOCAL constant; the config object has no
            # SEND_METRICS_ENABLED_CHECK_PERIOD key, so the previous
            # config.SEND_METRICS_ENABLED_CHECK_PERIOD access raised
            # AttributeError on every pass
            await asyncio.sleep(SEND_METRICS_ENABLED_CHECK_PERIOD)
            continue
        await asyncio.sleep(config.SEND_METRICS_PERIOD)

        error_msg = ""
        try:
            task = send_metrics(config.METRICS_HOST, config.METRICS_PORT)
            sent = await asyncio.wait_for(task, timeout=SEND_METRICS_TIMEOUT)
            if not sent:
                error_msg = "The METRICS_HOST %s refused metrics" % config.METRICS_HOST
        except ConnectionRefusedError:
            error_msg = "The METRICS_HOST %s is refusing connections" % config.METRICS_HOST
        except (TimeoutError, asyncio.TimeoutError):
            error_msg = "Got timeout while sending metrics to METRICS_HOST %s" % config.METRICS_HOST
        except Exception as E:
            error_msg = ("Got exception while sending metrics to METRICS_HOST %s: %s" %
                         (config.METRICS_HOST, E))

        if error_msg and error_msg != last_error_msg:
            print_err(error_msg)
            last_error_msg = error_msg
async def make_https_req(url, host="core.telegram.org"): async def make_https_req(url, host="core.telegram.org"):
""" Make request, return resp body and headers. """ """ Make request, return resp body and headers. """
SSL_PORT = 443 SSL_PORT = 443
@@ -2082,9 +2040,6 @@ def main():
stats_printer_task = asyncio.Task(stats_printer()) stats_printer_task = asyncio.Task(stats_printer())
asyncio.ensure_future(stats_printer_task) asyncio.ensure_future(stats_printer_task)
metrics_sender_task = asyncio.Task(metrics_sender())
asyncio.ensure_future(metrics_sender_task)
if config.USE_MIDDLE_PROXY: if config.USE_MIDDLE_PROXY:
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info()) middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
asyncio.ensure_future(middle_proxy_updater_task) asyncio.ensure_future(middle_proxy_updater_task)
@@ -2120,6 +2075,16 @@ def main():
servers.append(loop.run_until_complete(task)) servers.append(loop.run_until_complete(task))
os.chmod(config.LISTEN_UNIX_SOCK, 0o666) os.chmod(config.LISTEN_UNIX_SOCK, 0o666)
if config.METRICS_PORT is not None:
if config.METRICS_LISTEN_ADDR_IPV4:
task = asyncio.start_server(handle_metrics, config.METRICS_LISTEN_ADDR_IPV4,
config.METRICS_PORT, loop=loop)
servers.append(loop.run_until_complete(task))
if config.METRICS_LISTEN_ADDR_IPV6 and socket.has_ipv6:
task = asyncio.start_server(handle_metrics, config.METRICS_LISTEN_ADDR_IPV6,
config.METRICS_PORT, loop=loop)
servers.append(loop.run_until_complete(task))
try: try:
loop.run_forever() loop.run_forever()
except KeyboardInterrupt: except KeyboardInterrupt: