diff --git a/mtprotoproxy.py b/mtprotoproxy.py index 5305d4f..5df0ebb 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -172,9 +172,6 @@ def init_config(): # delay in seconds between stats printing conf_dict.setdefault("STATS_PRINT_PERIOD", 600) - # delay in seconds between metrics sending, if enabled - conf_dict.setdefault("SEND_METRICS_PERIOD", 15) - # delay in seconds between middle proxy info updates conf_dict.setdefault("PROXY_INFO_UPDATE_PERIOD", 24*60*60) @@ -212,14 +209,17 @@ def init_config(): # listen unix socket conf_dict.setdefault("LISTEN_UNIX_SOCK", "") - # prometheus push gateway addr to send metrics, disabled by default - conf_dict.setdefault("METRICS_HOST", None) + # prometheus exporter listen port, use some random port here + conf_dict.setdefault("METRICS_PORT", None) - # prometheus push gateway port - conf_dict.setdefault("METRICS_PORT", 9091) + # prometheus listen addr ipv4 + conf_dict.setdefault("METRICS_LISTEN_ADDR_IPV4", "0.0.0.0") - # prometheus push gateway identity string, by default proxy addr and port will be used - conf_dict.setdefault("METRICS_ID", None) + # prometheus listen addr ipv6 + conf_dict.setdefault("METRICS_LISTEN_ADDR_IPV6", None) + + # prometheus scrapers whitelist + conf_dict.setdefault("METRICS_WHITELIST", ["127.0.0.1", "::1"]) # allow access to config by attributes config = type("config", (dict,), conf_dict)(conf_dict) @@ -1539,6 +1539,85 @@ async def handle_client_wrapper(reader, writer): writer.transport.abort() +def make_metrics_pkt(metrics): + pkt_body_list = [] + used_names = set() + + for name, m_type, desc, val in metrics: + if name not in used_names: + pkt_body_list.append("# HELP %s %s" % (name, desc)) + pkt_body_list.append("# TYPE %s %s" % (name, m_type)) + used_names.add(name) + + if isinstance(val, dict): + tags = [] + for tag, tag_val in val.items(): + if tag == "val": + continue + tag_val = tag_val.replace('"', r'\"') + tags.append('%s="%s"' % (tag, tag_val)) + pkt_body_list.append("%s{%s} %s" % (name, ",".join(tags), val["val"])) + else: + pkt_body_list.append("%s %s" % (name, val)) + pkt_body = "\n".join(pkt_body_list) + "\n" + + pkt_header_list = [] + pkt_header_list.append("HTTP/1.1 200 OK") + pkt_header_list.append("Content-Length: %d" % len(pkt_body)) + pkt_header_list.append("Content-Type: text/plain; version=0.0.4; charset=utf-8") + pkt_header_list.append("Date: %s" % time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())) + + pkt_header = "\r\n".join(pkt_header_list) + + pkt = pkt_header + "\r\n\r\n" + pkt_body + return pkt + + +async def handle_metrics(reader, writer): + global stats + global user_stats + global my_ip_info + global proxy_start_time + global last_clients_with_time_skew + global last_clients_with_first_pkt_error + global last_clients_with_same_handshake + + client_ip = writer.get_extra_info("peername")[0] + if client_ip not in config.METRICS_WHITELIST: + writer.close() + return + + try: + metrics = [] + metrics.append(["uptime", "counter", "proxy uptime", time.time() - proxy_start_time]) + metrics.append(["connects_bad", "counter", "connects with bad secret", + stats["connects_bad"]]) + metrics.append(["connects_all", "counter", "incoming connects", stats["connects_all"]]) + metrics.append(["handshake_timeouts", "counter", "number of timed out handshakes", + stats["handshake_timeouts"]]) + + user_metrics_desc = [ + ["user_connects", "counter", "user connects", "connects"], + ["user_connects_curr", "gauge", "current user connects", "curr_connects"], + ["user_octets", "counter", "octets proxied for user", "octets"], + ["user_msgs", "counter", "msgs proxied for user", "msgs"], + ] + + for m_name, m_type, m_desc, stat_key in user_metrics_desc: + for user, stat in user_stats.items(): + metric = {"user": user, "val": stat[stat_key]} + metrics.append([m_name, m_type, m_desc, metric]) + + pkt = make_metrics_pkt(metrics) + writer.write(pkt.encode()) + await writer.drain() + + except Exception: + traceback.print_exc() + finally: + writer.close() + + async def stats_printer(): global user_stats global last_clients_with_time_skew @@ -1575,127 +1654,6 @@ async def stats_printer(): last_clients_with_same_handshake.clear() -def make_metrics_pkt(host, instance_name, metrics): - pkt_body_list = [] - used_names = set() - - for name, m_type, desc, val in metrics: - if name not in used_names: - pkt_body_list.append("# HELP %s %s" % (name, desc)) - pkt_body_list.append("# TYPE %s %s" % (name, m_type)) - used_names.add(name) - - if isinstance(val, dict): - tags = [] - for tag, tag_val in val.items(): - if tag == "val": - continue - tag_val = tag_val.replace('"', r'\"') - tags.append('%s="%s"' % (tag, tag_val)) - pkt_body_list.append("%s{%s} %s" % (name, ",".join(tags), val["val"])) - else: - pkt_body_list.append("%s %s" % (name, val)) - pkt_body = "\n".join(pkt_body_list) + "\n" - - instance_name = urllib.parse.quote_plus(instance_name) - - pkt_header_list = [] - pkt_header_list.append("PUT /metrics/job/mtprotoproxy/instance/%s HTTP/1.1" % instance_name) - pkt_header_list.append("Accept-Encoding: identity") - pkt_header_list.append("Content-Length: %d" % len(pkt_body)) - pkt_header_list.append("Host: %s" % host) - pkt_header_list.append("User-Agent: Python-urllib/3.7") - pkt_header_list.append("Content-Type: text/plain; version=0.0.4; charset=utf-8") - pkt_header_list.append("Connection: close") - - pkt_header = "\r\n".join(pkt_header_list) - - pkt = pkt_header + "\r\n\r\n" + pkt_body - return pkt - - -async def send_metrics(host, port): - global stats - global user_stats - global my_ip_info - global proxy_start_time - global last_clients_with_time_skew - global last_clients_with_first_pkt_error - global last_clients_with_same_handshake - - instance_name = config.METRICS_ID - if not instance_name: - if my_ip_info.get("ipv4"): - instance_name = "%s:%d" % (my_ip_info["ipv4"], config.PORT) - elif my_ip_info.get("ipv6"): - instance_name = "%s:%d" % (my_ip_info["ipv6"], config.PORT) - else: - instance_name = "%s:%d" % ("unknown_ip", config.PORT) - - metrics = [] - metrics.append(["uptime", "counter", "proxy uptime", time.time() - proxy_start_time]) - metrics.append(["connects_bad", "counter", "connects with bad secret", stats["connects_bad"]]) - metrics.append(["connects_all", "counter", "incoming connects", stats["connects_all"]]) - metrics.append(["handshake_timeouts", "counter", "number of timed out handshakes", - stats["handshake_timeouts"]]) - - user_metrics_desc = [ - ["user_connects", "counter", "user connects", "connects"], - ["user_connects_curr", "gauge", "current user connects", "curr_connects"], - ["user_octets", "counter", "octets proxied for user", "octets"], - ["user_msgs", "counter", "msgs proxied for user", "msgs"], - ] - - for m_name, m_type, m_desc, stat_key in user_metrics_desc: - for user, stat in user_stats.items(): - metric = {"user": user, "val": stat[stat_key]} - metrics.append([m_name, m_type, m_desc, metric]) - - metrics_host_hdr = "%s:%s" % (host, port) - pkt = make_metrics_pkt(metrics_host_hdr, instance_name, metrics) - - reader, writer = await asyncio.open_connection(host, port) - writer.write(pkt.encode()) - await writer.drain() - - http_vers = (await reader.readuntil(b" "))[:-1] - http_statuscode = (await reader.readuntil(b" "))[:-1] - writer.close() - - return http_vers == b"HTTP/1.1" and http_statuscode == b"202" - - -async def metrics_sender(): - SEND_METRICS_ENABLED_CHECK_PERIOD = 60 - SEND_METRICS_TIMEOUT = 10 - - last_error_msg = "" - - while True: - if not config.METRICS_HOST: - await asyncio.sleep(config.SEND_METRICS_ENABLED_CHECK_PERIOD) - continue - await asyncio.sleep(config.SEND_METRICS_PERIOD) - - error_msg = "" - try: - task = send_metrics(config.METRICS_HOST, config.METRICS_PORT) - sent = await asyncio.wait_for(task, timeout=SEND_METRICS_TIMEOUT) - if not sent: - error_msg = "The METRICS_HOST %s refused metrics" % config.METRICS_HOST - except ConnectionRefusedError: - error_msg = "The METRICS_HOST %s is refusing connections" % config.METRICS_HOST - except (TimeoutError, asyncio.TimeoutError): - error_msg = "Got timeout while sending metrics to METRICS_HOST %s" % config.METRICS_HOST - except Exception as E: - error_msg = ("Got exception while sending metrics to METRICS_HOST %s: %s" % - (config.METRICS_HOST, E)) - - if error_msg and error_msg != last_error_msg: - print_err(error_msg) - last_error_msg = error_msg - - async def make_https_req(url, host="core.telegram.org"): """ Make request, return resp body and headers. """ SSL_PORT = 443 @@ -2082,9 +2040,6 @@ def main(): stats_printer_task = asyncio.Task(stats_printer()) asyncio.ensure_future(stats_printer_task) - metrics_sender_task = asyncio.Task(metrics_sender()) - asyncio.ensure_future(metrics_sender_task) - if config.USE_MIDDLE_PROXY: middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info()) asyncio.ensure_future(middle_proxy_updater_task) @@ -2120,6 +2075,16 @@ def main(): servers.append(loop.run_until_complete(task)) os.chmod(config.LISTEN_UNIX_SOCK, 0o666) + if config.METRICS_PORT is not None: + if config.METRICS_LISTEN_ADDR_IPV4: + task = asyncio.start_server(handle_metrics, config.METRICS_LISTEN_ADDR_IPV4, + config.METRICS_PORT, loop=loop) + servers.append(loop.run_until_complete(task)) + if config.METRICS_LISTEN_ADDR_IPV6 and socket.has_ipv6: + task = asyncio.start_server(handle_metrics, config.METRICS_LISTEN_ADDR_IPV6, + config.METRICS_PORT, loop=loop) + servers.append(loop.run_until_complete(task)) + try: loop.run_forever() except KeyboardInterrupt: