mirror of
https://github.com/alexbers/mtprotoproxy.git
synced 2026-03-18 00:29:47 +00:00
Compare commits
12 Commits
prometheus
...
v1.0.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
47f7c088af | ||
|
|
6f8bfdb568 | ||
|
|
0a7e2d85b8 | ||
|
|
0caf5f89a8 | ||
|
|
33fabe7590 | ||
|
|
c0ed5e1e38 | ||
|
|
bcac5eb878 | ||
|
|
b38084bf36 | ||
|
|
93f71f5ec2 | ||
|
|
4a47d79ea4 | ||
|
|
92204e492b | ||
|
|
97ae711223 |
@@ -24,3 +24,4 @@ The proxy can be launched:
|
|||||||
- with a custom config: `python3 mtprotoproxy.py [configfile]`
|
- with a custom config: `python3 mtprotoproxy.py [configfile]`
|
||||||
- several times, clients will be automaticaly balanced between instances
|
- several times, clients will be automaticaly balanced between instances
|
||||||
- using *PyPy* interprteter
|
- using *PyPy* interprteter
|
||||||
|
- with runtime statistics exported for [Prometheus](https://prometheus.io/): using [prometheus](https://github.com/alexbers/mtprotoproxy/tree/prometheus) branch
|
||||||
|
|||||||
@@ -148,11 +148,12 @@ PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
|
|||||||
# disables tg->client trafic reencryption, faster but less secure
|
# disables tg->client trafic reencryption, faster but less secure
|
||||||
FAST_MODE = config.get("FAST_MODE", True)
|
FAST_MODE = config.get("FAST_MODE", True)
|
||||||
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
|
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
|
||||||
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 60*60*24)
|
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 24*60*60)
|
||||||
TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 8192)
|
TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 8192)
|
||||||
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
|
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
|
||||||
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30)
|
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 10*60)
|
||||||
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
|
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
|
||||||
|
CLIENT_ACK_TIMEOUT = config.get("CLIENT_ACK_TIMEOUT", 5*60)
|
||||||
|
|
||||||
TG_DATACENTER_PORT = 443
|
TG_DATACENTER_PORT = 443
|
||||||
|
|
||||||
@@ -223,14 +224,14 @@ def init_stats():
|
|||||||
stats = {user: collections.Counter() for user in USERS}
|
stats = {user: collections.Counter() for user in USERS}
|
||||||
|
|
||||||
|
|
||||||
def update_stats(user, connects=0, curr_connects=0, octets=0):
|
def update_stats(user, connects=0, curr_connects=0, octets=0, msgs=0):
|
||||||
global stats
|
global stats
|
||||||
|
|
||||||
if user not in stats:
|
if user not in stats:
|
||||||
stats[user] = collections.Counter()
|
stats[user] = collections.Counter()
|
||||||
|
|
||||||
stats[user].update(connects=connects, curr_connects=curr_connects,
|
stats[user].update(connects=connects, curr_connects=curr_connects,
|
||||||
octets=octets)
|
octets=octets, msgs=msgs)
|
||||||
|
|
||||||
|
|
||||||
class LayeredStreamReaderBase:
|
class LayeredStreamReaderBase:
|
||||||
@@ -580,6 +581,11 @@ def set_keepalive(sock, interval=40, attempts=5):
|
|||||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
|
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
|
||||||
|
|
||||||
|
|
||||||
|
def set_ack_timeout(sock, timeout):
|
||||||
|
if hasattr(socket, "TCP_USER_TIMEOUT"):
|
||||||
|
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, timeout*1000)
|
||||||
|
|
||||||
|
|
||||||
def set_bufsizes(sock, recv_buf, send_buf):
|
def set_bufsizes(sock, recv_buf, send_buf):
|
||||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
|
||||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
|
||||||
@@ -809,7 +815,8 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
|||||||
|
|
||||||
|
|
||||||
async def handle_client(reader_clt, writer_clt):
|
async def handle_client(reader_clt, writer_clt):
|
||||||
set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE)
|
set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE, attempts=3)
|
||||||
|
set_ack_timeout(writer_clt.get_extra_info("socket"), CLIENT_ACK_TIMEOUT)
|
||||||
set_bufsizes(writer_clt.get_extra_info("socket"), TO_TG_BUFSIZE, TO_CLT_BUFSIZE)
|
set_bufsizes(writer_clt.get_extra_info("socket"), TO_TG_BUFSIZE, TO_CLT_BUFSIZE)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -875,7 +882,7 @@ async def handle_client(reader_clt, writer_clt):
|
|||||||
await wr.drain()
|
await wr.drain()
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
update_stats(user, octets=len(data))
|
update_stats(user, octets=len(data), msgs=1)
|
||||||
wr.write(data, extra)
|
wr.write(data, extra)
|
||||||
await wr.drain()
|
await wr.drain()
|
||||||
except (OSError, asyncio.streams.IncompleteReadError) as e:
|
except (OSError, asyncio.streams.IncompleteReadError) as e:
|
||||||
@@ -913,9 +920,9 @@ async def stats_printer():
|
|||||||
|
|
||||||
print("Stats for", time.strftime("%d.%m.%Y %H:%M:%S"))
|
print("Stats for", time.strftime("%d.%m.%Y %H:%M:%S"))
|
||||||
for user, stat in stats.items():
|
for user, stat in stats.items():
|
||||||
print("%s: %d connects (%d current), %.2f MB" % (
|
print("%s: %d connects (%d current), %.2f MB, %d msgs" % (
|
||||||
user, stat["connects"], stat["curr_connects"],
|
user, stat["connects"], stat["curr_connects"],
|
||||||
stat["octets"] / 1000000))
|
stat["octets"] / 1000000, stat["msgs"]))
|
||||||
print(flush=True)
|
print(flush=True)
|
||||||
|
|
||||||
|
|
||||||
@@ -1054,15 +1061,24 @@ def loop_exception_handler(loop, context):
|
|||||||
if exception:
|
if exception:
|
||||||
if isinstance(exception, TimeoutError):
|
if isinstance(exception, TimeoutError):
|
||||||
if transport:
|
if transport:
|
||||||
print_err("Timeout, killing transport")
|
|
||||||
transport.abort()
|
transport.abort()
|
||||||
return
|
return
|
||||||
if isinstance(exception, OSError):
|
if isinstance(exception, OSError):
|
||||||
IGNORE_ERRNO = {
|
IGNORE_ERRNO = {
|
||||||
10038 # operation on non-socket on Windows, likely because fd == -1
|
10038, # operation on non-socket on Windows, likely because fd == -1
|
||||||
|
121, # the semaphore timeout period has expired on Windows
|
||||||
|
}
|
||||||
|
|
||||||
|
FORCE_CLOSE_ERRNO = {
|
||||||
|
113, # no route to host
|
||||||
|
|
||||||
}
|
}
|
||||||
if exception.errno in IGNORE_ERRNO:
|
if exception.errno in IGNORE_ERRNO:
|
||||||
return
|
return
|
||||||
|
elif exception.errno in FORCE_CLOSE_ERRNO:
|
||||||
|
if transport:
|
||||||
|
transport.abort()
|
||||||
|
return
|
||||||
|
|
||||||
loop.default_exception_handler(context)
|
loop.default_exception_handler(context)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user