From c48cacce83682f3738c9cd400e4be02b3e1f6adc Mon Sep 17 00:00:00 2001 From: Alexander Bersenev Date: Wed, 12 Feb 2020 16:28:18 +0500 Subject: [PATCH] add statisctics about up/down traffic --- mtprotoproxy.py | 36 +++++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/mtprotoproxy.py b/mtprotoproxy.py index 90327d2..874708b 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -1601,7 +1601,7 @@ async def handle_client(reader_clt, writer_clt): else: return - async def connect_reader_to_writer(rd, wr, user, rd_buf_size): + async def connect_reader_to_writer(rd, wr, user, rd_buf_size, is_upstream): try: while True: data = await rd.read(rd_buf_size) @@ -1615,15 +1615,19 @@ async def handle_client(reader_clt, writer_clt): await wr.drain() return else: - update_user_stats(user, octets=len(data), msgs=1) + if is_upstream: + update_user_stats(user, octets_from_client=len(data), msgs_from_client=1) + else: + update_user_stats(user, octets_to_client=len(data), msgs_to_client=1) + wr.write(data, extra) await wr.drain() except (OSError, asyncio.IncompleteReadError) as e: # print_err(e) pass - tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, get_to_clt_bufsize()) - clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, get_to_tg_bufsize()) + tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, get_to_clt_bufsize(), False) + clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, get_to_tg_bufsize(), True) task_tg_to_clt = asyncio.ensure_future(tg_to_clt) task_clt_to_tg = asyncio.ensure_future(clt_to_tg) @@ -1641,7 +1645,8 @@ async def handle_client(reader_clt, writer_clt): user_data_quota_hit = ( user in config.USER_DATA_QUOTA and - user_stats[user]["octets"] > config.USER_DATA_QUOTA[user] + (user_stats[user]["octets_to_client"] + + user_stats[user]["octets_from_client"] > config.USER_DATA_QUOTA[user]) ) if (not tcp_limit_hit) and (not user_expired) and (not user_data_quota_hit): @@ -1748,13 +1753,25 @@ async def handle_metrics(reader, writer): user_metrics_desc = [ ["user_connects", "counter", "user connects", "connects"], ["user_connects_curr", "gauge", "current user connects", "curr_connects"], - ["user_octets", "counter", "octets proxied for user", "octets"], - ["user_msgs", "counter", "msgs proxied for user", "msgs"], + ["user_octets", "counter", "octets proxied for user", + "octets_from_client+octets_to_client"], + ["user_msgs", "counter", "msgs proxied for user", + "msgs_from_client+msgs_to_client"], + ["user_octets_from", "counter", "octets proxied from user", "octets_from_client"], + ["user_octets_to", "counter", "octets proxied to user", "octets_to_client"], + ["user_msgs_from", "counter", "msgs proxied from user", "msgs_from_client"], + ["user_msgs_to", "counter", "msgs proxied from user", "msgs_to_client"], ] for m_name, m_type, m_desc, stat_key in user_metrics_desc: for user, stat in user_stats.items(): - metric = {"user": user, "val": stat[stat_key]} + if "+" in stat_key: + val = 0 + for key_part in stat_key.split("+"): + val += stat[key_part] + else: + val = stat[stat_key] + metric = {"user": user, "val": val} metrics.append([m_name, m_type, m_desc, metric]) pkt = make_metrics_pkt(metrics) @@ -1780,7 +1797,8 @@ async def stats_printer(): for user, stat in user_stats.items(): print("%s: %d connects (%d current), %.2f MB, %d msgs" % ( user, stat["connects"], stat["curr_connects"], - stat["octets"] / 1000000, stat["msgs"])) + (stat["octets_from_client"] + stat["octets_to_client"]) / 1000000, + stat["msgs_from_client"] + stat["msgs_to_client"])) print(flush=True) if last_client_ips: