1 Commits

Author SHA1 Message Date
Alexander Bersenev
7fcc854e80 add Prometheus metrics exporter 2018-07-01 01:35:50 +05:00
4 changed files with 264 additions and 300 deletions

View File

@@ -1,4 +1,4 @@
FROM alpine:3.8
FROM alpine:3.6
RUN adduser tgproxy -u 10000 -D

View File

@@ -24,4 +24,3 @@ The proxy can be launched:
- with a custom config: `python3 mtprotoproxy.py [configfile]`
- several times, clients will be automaticaly balanced between instances
- using *PyPy* interprteter
- with runtime statistics exported for [Prometheus](https://prometheus.io/): using [prometheus](https://github.com/alexbers/mtprotoproxy/tree/prometheus) branch

View File

@@ -8,7 +8,3 @@ USERS = {
# Tag for advertising, obtainable from @MTProxybot
# AD_TAG = "3c09c680b76ee91a4c25ad51f742267d"
# Uncommenting this do make a proxy harder to detect
# But it can be incompatible with old clients
# SECURE_ONLY = True

View File

@@ -13,151 +13,13 @@ import sys
import re
import runpy
import signal
import http.server
if len(sys.argv) < 2:
config = runpy.run_module("config")
elif len(sys.argv) == 2:
# launch with own config
config = runpy.run_path(sys.argv[1])
else:
# undocumented way of launching
config = {}
config["PORT"] = int(sys.argv[1])
secrets = sys.argv[2].split(",")
config["USERS"] = {"user%d" % i: secrets[i].zfill(32) for i in range(len(secrets))}
if len(sys.argv) > 3:
config["AD_TAG"] = sys.argv[3]
PORT = config["PORT"]
USERS = config["USERS"]
AD_TAG = bytes.fromhex(config.get("AD_TAG", ""))
# load advanced settings
# if IPv6 avaliable, use it by default
PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
# disables tg->client trafic reencryption, faster but less secure
FAST_MODE = config.get("FAST_MODE", True)
# doesn't allow to connect in not-secure mode
SECURE_ONLY = config.get("SECURE_ONLY", False)
# delay in seconds between stats printing
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
# delay in seconds between middle proxy info updates
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 24*60*60)
# max socket buffer size to the client direction, the more the faster, but more RAM hungry
TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 16384)
# max socket buffer size to the telegram servers direction
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
# keepalive period for clients in secs
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 10*60)
# drop client after this timeout if the handshake fail
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
# if client doesn't confirm data for this number of seconds, it is dropped
CLIENT_ACK_TIMEOUT = config.get("CLIENT_ACK_TIMEOUT", 5*60)
# telegram servers connect timeout in seconds
TG_CONNECT_TIMEOUT = config.get("TG_CONNECT_TIMEOUT", 10)
# listen address for IPv4
LISTEN_ADDR_IPV4 = config.get("LISTEN_ADDR_IPV4", "0.0.0.0")
# listen address for IPv6
LISTEN_ADDR_IPV6 = config.get("LISTEN_ADDR_IPV6", "::")
TG_DATACENTER_PORT = 443
TG_DATACENTERS_V4 = [
"149.154.175.50", "149.154.167.51", "149.154.175.100",
"149.154.167.91", "149.154.171.5"
]
TG_DATACENTERS_V6 = [
"2001:b28:f23d:f001::a", "2001:67c:04e8:f002::a", "2001:b28:f23d:f003::a",
"2001:67c:04e8:f004::a", "2001:b28:f23f:f005::a"
]
# This list will be updated in the runtime
TG_MIDDLE_PROXIES_V4 = {
1: [("149.154.175.50", 8888)], -1: [("149.154.175.50", 8888)],
2: [("149.154.162.38", 80)], -2: [("149.154.162.38", 80)],
3: [("149.154.175.100", 8888)], -3: [("149.154.175.100", 8888)],
4: [("91.108.4.136", 8888)], -4: [("91.108.4.136", 8888)],
5: [("91.108.56.181", 8888)], -5: [("91.108.56.181", 8888)]
}
TG_MIDDLE_PROXIES_V6 = {
1: [("2001:b28:f23d:f001::d", 8888)], -1: [("2001:b28:f23d:f001::d", 8888)],
2: [("2001:67c:04e8:f002::d", 80)], -2: [("2001:67c:04e8:f002::d", 80)],
3: [("2001:b28:f23d:f003::d", 8888)], -3: [("2001:b28:f23d:f003::d", 8888)],
4: [("2001:67c:04e8:f004::d", 8888)], -4: [("2001:67c:04e8:f004::d", 8888)],
5: [("2001:b28:f23f:f005::d", 8888)], -5: [("2001:67c:04e8:f004::d", 8888)]
}
USE_MIDDLE_PROXY = (len(AD_TAG) == 16)
PROXY_SECRET = bytes.fromhex(
"c4f9faca9678e6bb48ad6c7e2ce5c0d24430645d554addeb55419e034da62721" +
"d046eaab6e52ab14a95a443ecfb3463e79a05a66612adf9caeda8be9a80da698" +
"6fb0a6ff387af84d88ef3a6413713e5c3377f6e1a3d47d99f5e0c56eece8f05c" +
"54c490b079e31bef82ff0ee8f2b0a32756d249c5f21269816cb7061b265db212"
)
SKIP_LEN = 8
PREKEY_LEN = 32
KEY_LEN = 32
IV_LEN = 16
HANDSHAKE_LEN = 64
PROTO_TAG_POS = 56
DC_IDX_POS = 60
PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef"
PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee"
PROTO_TAG_SECURE = b"\xdd\xdd\xdd\xdd"
CBC_PADDING = 16
PADDING_FILLER = b"\x04\x00\x00\x00"
MIN_MSG_LEN = 12
MAX_MSG_LEN = 2 ** 24
my_ip_info = {"ipv4": None, "ipv6": None}
def setup_files_limit():
try:
import resource
soft_fd_limit, hard_fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard_fd_limit, hard_fd_limit))
except (ValueError, OSError):
print("Failed to increase the limit of opened files", flush=True, file=sys.stderr)
except ImportError:
pass
def setup_debug():
if hasattr(signal, 'SIGUSR1'):
def debug_signal(signum, frame):
import pdb
pdb.set_trace()
signal.signal(signal.SIGUSR1, debug_signal)
def try_setup_uvloop():
try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
pass
try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
pass
def try_use_cryptography_module():
@@ -249,6 +111,114 @@ except ImportError:
except ImportError:
create_aes_ctr, create_aes_cbc = use_slow_bundled_cryptography_module()
try:
import resource
soft_fd_limit, hard_fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard_fd_limit, hard_fd_limit))
except (ValueError, OSError):
print("Failed to increase the limit of opened files", flush=True, file=sys.stderr)
except ImportError:
pass
if hasattr(signal, 'SIGUSR1'):
def debug_signal(signum, frame):
import pdb
pdb.set_trace()
signal.signal(signal.SIGUSR1, debug_signal)
if len(sys.argv) < 2:
config = runpy.run_module("config")
elif len(sys.argv) == 2:
config = runpy.run_path(sys.argv[1])
else:
# undocumented way of launching
config = {}
config["PORT"] = int(sys.argv[1])
secrets = sys.argv[2].split(",")
config["USERS"] = {"user%d" % i: secrets[i].zfill(32) for i in range(len(secrets))}
if len(sys.argv) > 3:
config["AD_TAG"] = sys.argv[3]
PORT = config["PORT"]
USERS = config["USERS"]
AD_TAG = bytes.fromhex(config.get("AD_TAG", ""))
# load advanced settings
PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
# disables tg->client trafic reencryption, faster but less secure
FAST_MODE = config.get("FAST_MODE", True)
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 60*60*24)
TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 8192)
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30)
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
PROMETHEUS_HOST = config.get("PROMETHEUS_HOST")
PROMETHEUS_PORT = config.get("PROMETHEUS_PORT")
# PROMETHEUS_SCRAPERS is a safety net in case of missing firewall,
# set it to false value to disable.
PROMETHEUS_SCRAPERS = config.get("PROMETHEUS_SCRAPERS", {'127.0.0.1', '::1'})
TG_DATACENTER_PORT = 443
TG_DATACENTERS_V4 = [
"149.154.175.50", "149.154.167.51", "149.154.175.100",
"149.154.167.91", "149.154.171.5"
]
TG_DATACENTERS_V6 = [
"2001:b28:f23d:f001::a", "2001:67c:04e8:f002::a", "2001:b28:f23d:f003::a",
"2001:67c:04e8:f004::a", "2001:b28:f23f:f005::a"
]
# This list will be updated in the runtime
TG_MIDDLE_PROXIES_V4 = {
1: [("149.154.175.50", 8888)], -1: [("149.154.175.50", 8888)],
2: [("149.154.162.38", 80)], -2: [("149.154.162.38", 80)],
3: [("149.154.175.100", 8888)], -3: [("149.154.175.100", 8888)],
4: [("91.108.4.136", 8888)], -4: [("91.108.4.136", 8888)],
5: [("91.108.56.181", 8888)], -5: [("91.108.56.181", 8888)]
}
TG_MIDDLE_PROXIES_V6 = {
1: [("2001:b28:f23d:f001::d", 8888)], -1: [("2001:b28:f23d:f001::d", 8888)],
2: [("2001:67c:04e8:f002::d", 80)], -2: [("2001:67c:04e8:f002::d", 80)],
3: [("2001:b28:f23d:f003::d", 8888)], -3: [("2001:b28:f23d:f003::d", 8888)],
4: [("2001:67c:04e8:f004::d", 8888)], -4: [("2001:67c:04e8:f004::d", 8888)],
5: [("2001:b28:f23f:f005::d", 8888)], -5: [("2001:67c:04e8:f004::d", 8888)]
}
USE_MIDDLE_PROXY = (len(AD_TAG) == 16)
PROXY_SECRET = bytes.fromhex(
"c4f9faca9678e6bb48ad6c7e2ce5c0d24430645d554addeb55419e034da62721" +
"d046eaab6e52ab14a95a443ecfb3463e79a05a66612adf9caeda8be9a80da698" +
"6fb0a6ff387af84d88ef3a6413713e5c3377f6e1a3d47d99f5e0c56eece8f05c" +
"54c490b079e31bef82ff0ee8f2b0a32756d249c5f21269816cb7061b265db212"
)
SKIP_LEN = 8
PREKEY_LEN = 32
KEY_LEN = 32
IV_LEN = 16
HANDSHAKE_LEN = 64
PROTO_TAG_POS = 56
DC_IDX_POS = 60
PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef"
PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee"
PROTO_TAG_SECURE = b"\xdd\xdd\xdd\xdd"
CBC_PADDING = 16
PADDING_FILLER = b"\x04\x00\x00\x00"
MIN_MSG_LEN = 12
MAX_MSG_LEN = 2 ** 24
my_ip_info = {"ipv4": None, "ipv6": None}
def print_err(*params):
print(*params, file=sys.stderr, flush=True)
@@ -259,14 +229,14 @@ def init_stats():
stats = {user: collections.Counter() for user in USERS}
def update_stats(user, connects=0, curr_connects=0, octets=0, msgs=0):
def update_stats(user, connects=0, curr_connects=0, octets=0):
global stats
if user not in stats:
stats[user] = collections.Counter()
stats[user].update(connects=connects, curr_connects=curr_connects,
octets=octets, msgs=msgs)
octets=octets)
class LayeredStreamReaderBase:
@@ -465,6 +435,11 @@ class MTProtoIntermediateFrameStreamReader(LayeredStreamReaderBase):
msg_len -= 0x80000000
data = await self.upstream.readexactly(msg_len)
if msg_len % 4 != 0:
cut_border = msg_len - (msg_len % 4)
data = data[:cut_border]
return data, extra
@@ -476,38 +451,6 @@ class MTProtoIntermediateFrameStreamWriter(LayeredStreamWriterBase):
return self.upstream.write(int.to_bytes(len(data), 4, 'little') + data)
class MTProtoSecureIntermediateFrameStreamReader(LayeredStreamReaderBase):
async def read(self, buf_size):
msg_len_bytes = await self.upstream.readexactly(4)
msg_len = int.from_bytes(msg_len_bytes, "little")
extra = {}
if msg_len > 0x80000000:
extra["QUICKACK_FLAG"] = True
msg_len -= 0x80000000
data = await self.upstream.readexactly(msg_len)
if msg_len % 4 != 0:
cut_border = msg_len - (msg_len % 4)
data = data[:cut_border]
return data, extra
class MTProtoSecureIntermediateFrameStreamWriter(LayeredStreamWriterBase):
def write(self, data, extra={}):
MAX_PADDING_LEN = 4
if extra.get("SIMPLE_ACK"):
# TODO: make this unpredictable
return self.upstream.write(data)
else:
padding_len = random.randrange(MAX_PADDING_LEN)
padding = bytearray([random.randrange(256) for i in range(padding_len)])
padded_data_len_bytes = int.to_bytes(len(data) + padding_len, 4, 'little')
return self.upstream.write(padded_data_len_bytes + data + padding)
class ProxyReqStreamReader(LayeredStreamReaderBase):
async def read(self, msg):
RPC_PROXY_ANS = b"\x0d\xda\x03\x44"
@@ -566,7 +509,6 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase):
FLAG_HAS_AD_TAG = 0x8
FLAG_MAGIC = 0x1000
FLAG_EXTMODE2 = 0x20000
FLAG_PAD = 0x8000000
FLAG_INTERMEDIATE = 0x20000000
FLAG_ABRIDGED = 0x40000000
FLAG_QUICKACK = 0x80000000
@@ -581,8 +523,6 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase):
flags |= FLAG_ABRIDGED
elif self.proto_tag == PROTO_TAG_INTERMEDIATE:
flags |= FLAG_INTERMEDIATE
elif self.proto_tag == PROTO_TAG_SECURE:
flags |= FLAG_INTERMEDIATE | FLAG_PAD
if extra.get("QUICKACK_FLAG"):
flags |= FLAG_QUICKACK
@@ -622,9 +562,6 @@ async def handle_handshake(reader, writer):
if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE):
continue
if SECURE_ONLY and proto_tag != PROTO_TAG_SECURE:
continue
dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True)
reader = CryptoWrappedStreamReader(reader, decryptor)
@@ -639,46 +576,19 @@ async def handle_handshake(reader, writer):
return False
def try_setsockopt(sock, level, option, value):
try:
sock.setsockopt(level, option, value)
except OSError as E:
pass
def set_keepalive(sock, interval=40, attempts=5):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if hasattr(socket, "TCP_KEEPIDLE"):
try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval)
if hasattr(socket, "TCP_KEEPINTVL"):
try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
if hasattr(socket, "TCP_KEEPCNT"):
try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
def set_ack_timeout(sock, timeout):
if hasattr(socket, "TCP_USER_TIMEOUT"):
try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, timeout*1000)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
def set_bufsizes(sock, recv_buf, send_buf):
try_setsockopt(sock, socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
try_setsockopt(sock, socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
async def open_connection_tryer(addr, port, limit, timeout, max_attempts=3):
for attempt in range(max_attempts-1):
try:
task = asyncio.open_connection(addr, port, limit=limit)
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout)
return reader_tgt, writer_tgt
except (OSError, asyncio.TimeoutError):
continue
# the last attempt
task = asyncio.open_connection(addr, port, limit=limit)
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout)
return reader_tgt, writer_tgt
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
@@ -699,18 +609,18 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
dc = TG_DATACENTERS_V4[dc_idx]
try:
reader_tgt, writer_tgt = await open_connection_tryer(
dc, TG_DATACENTER_PORT, limit=TO_CLT_BUFSIZE, timeout=TG_CONNECT_TIMEOUT)
reader_tgt, writer_tgt = await asyncio.open_connection(dc, TG_DATACENTER_PORT,
limit=TO_CLT_BUFSIZE)
set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
except ConnectionRefusedError as E:
print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT)
return False
except (OSError, asyncio.TimeoutError) as E:
except OSError as E:
print_err("Unable to connect to", dc, TG_DATACENTER_PORT)
return False
set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
while True:
rnd = bytearray([random.randrange(0, 256) for i in range(HANDSHAKE_LEN)])
if rnd[:1] in RESERVED_NONCE_FIRST_CHARS:
@@ -800,18 +710,16 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
try:
reader_tgt, writer_tgt = await open_connection_tryer(addr, port, limit=TO_CLT_BUFSIZE,
timeout=TG_CONNECT_TIMEOUT)
reader_tgt, writer_tgt = await asyncio.open_connection(addr, port, limit=TO_CLT_BUFSIZE)
set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
except ConnectionRefusedError as E:
print_err("Got connection refused while trying to connect to", addr, port)
return False
except (OSError, asyncio.TimeoutError) as E:
except OSError as E:
print_err("Unable to connect to", addr, port)
return False
set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO)
key_selector = PROXY_SECRET[:4]
@@ -907,8 +815,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
async def handle_client(reader_clt, writer_clt):
set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE, attempts=3)
set_ack_timeout(writer_clt.get_extra_info("socket"), CLIENT_ACK_TIMEOUT)
set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE)
set_bufsizes(writer_clt.get_extra_info("socket"), TO_TG_BUFSIZE, TO_CLT_BUFSIZE)
try:
@@ -954,12 +861,9 @@ async def handle_client(reader_clt, writer_clt):
if proto_tag == PROTO_TAG_ABRIDGED:
reader_clt = MTProtoCompactFrameStreamReader(reader_clt)
writer_clt = MTProtoCompactFrameStreamWriter(writer_clt)
elif proto_tag == PROTO_TAG_INTERMEDIATE:
elif proto_tag in (PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE):
reader_clt = MTProtoIntermediateFrameStreamReader(reader_clt)
writer_clt = MTProtoIntermediateFrameStreamWriter(writer_clt)
elif proto_tag == PROTO_TAG_SECURE:
reader_clt = MTProtoSecureIntermediateFrameStreamReader(reader_clt)
writer_clt = MTProtoSecureIntermediateFrameStreamWriter(writer_clt)
else:
return
@@ -977,7 +881,7 @@ async def handle_client(reader_clt, writer_clt):
await wr.drain()
return
else:
update_stats(user, octets=len(data), msgs=1)
update_stats(user, octets=len(data))
wr.write(data, extra)
await wr.drain()
except (OSError, asyncio.streams.IncompleteReadError) as e:
@@ -997,6 +901,68 @@ async def handle_client(reader_clt, writer_clt):
task_clt_to_tg.cancel()
writer_tg.transport.abort()
async def http_reply(writer, line, body=b"", eof=False):
BaseHTTPRequestHandler = http.server.BaseHTTPRequestHandler
msg = (
"HTTP/1.1 {}\r\n"
"Server: mtprotoproxy\r\n"
"Date: {}\r\n"
"Content-Type: text/plain\r\n"
"Content-Length: {:d}\r\n"
).format(
line,
BaseHTTPRequestHandler.date_time_string(BaseHTTPRequestHandler),
len(body)
).encode("ascii")
if eof:
msg += b"Connection: close\r\n"
msg += b"\r\n" + body
writer.write(msg)
await writer.drain()
if eof:
writer.write_eof()
writer.close()
async def handle_promstats(reader, writer):
set_keepalive(writer.get_extra_info("socket"), 75) # prometheus should never go away for a long time
if PROMETHEUS_SCRAPERS and writer.get_extra_info('peername')[0] not in PROMETHEUS_SCRAPERS:
return
while True: # Keep-Alive
request = await reader.readuntil(b"\r\n\r\n")
if request.startswith(b"GET /metrics HTTP/1."):
promstat = (
"# HELP mtproxy_pump_bytes Number of post-handshake bytes pumped in both directions.\n"
"# TYPE mtproxy_pump_bytes counter\n"
) + "".join(
"mtproxy_pump_bytes{{user=\"{}\"}} {:d}\n".format(u, stats[u]["octets"])
for u in stats
) + (
"# HELP mtproxy_connections Current number of post-handshake client connections.\n"
"# TYPE mtproxy_connections gauge\n"
) + "".join(
"mtproxy_connections{{user=\"{}\"}} {:d}\n".format(u, stats[u]["curr_connects"])
for u in stats
) + (
"# HELP mtproxy_connections_total Total number of post-handshake client connections served.\n"
"# TYPE mtproxy_connections_total counter\n"
) + "".join(
"mtproxy_connections_total{{user=\"{}\"}} {:d}\n".format(u, stats[u]["connects"])
for u in stats
)
await http_reply(writer, "200 OK", promstat.encode("ascii"))
else:
await http_reply(writer, "400 Bad Request", b"Bad Request.\n", eof=True)
return
async def handle_promstats_wrapper(reader, writer):
try:
await handle_promstats(reader, writer)
except (asyncio.IncompleteReadError, ConnectionResetError, TimeoutError):
pass
finally:
writer.transport.abort()
async def handle_client_wrapper(reader, writer):
@@ -1015,9 +981,9 @@ async def stats_printer():
print("Stats for", time.strftime("%d.%m.%Y %H:%M:%S"))
for user, stat in stats.items():
print("%s: %d connects (%d current), %.2f MB, %d msgs" % (
print("%s: %d connects (%d current), %.2f MB" % (
user, stat["connects"], stat["curr_connects"],
stat["octets"] / 1000000, stat["msgs"]))
stat["octets"] / 1000000))
print(flush=True)
@@ -1029,19 +995,26 @@ async def update_middle_proxy_info():
HTTP_REQ_TEMPLATE = "\r\n".join(["GET %s HTTP/1.1", "Host: core.telegram.org",
"Connection: close"]) + "\r\n\r\n"
reader, writer = await asyncio.open_connection(url_data.netloc, SSL_PORT, ssl=True)
req = HTTP_REQ_TEMPLATE % urllib.parse.quote(url_data.path)
writer.write(req.encode("utf8"))
data = await reader.read()
writer.close()
try:
reader, writer = await asyncio.open_connection(url_data.netloc, SSL_PORT, ssl=True)
req = HTTP_REQ_TEMPLATE % urllib.parse.quote(url_data.path)
writer.write(req.encode("utf8"))
data = await reader.read()
writer.close()
headers, body = data.split(b"\r\n\r\n", 1)
return body
headers, body = data.split(b"\r\n\r\n", 1)
return body
except Exception:
return b""
async def get_new_proxies(url):
PROXY_REGEXP = re.compile(r"proxy_for\s+(-?\d+)\s+(.+):(\d+)\s*;")
ans = {}
body = await make_https_req(url)
try:
body = await make_https_req(url)
except Exception:
return ans
fields = PROXY_REGEXP.findall(body.decode("utf8"))
if fields:
@@ -1069,16 +1042,16 @@ async def update_middle_proxy_info():
if not v4_proxies:
raise Exception("no proxy data")
TG_MIDDLE_PROXIES_V4 = v4_proxies
except Exception as E:
print_err("Error updating middle proxy list:", E)
except Exception:
print_err("Error updating middle proxy list")
try:
v6_proxies = await get_new_proxies(PROXY_INFO_ADDR_V6)
if not v6_proxies:
raise Exception("no proxy data (ipv6)")
TG_MIDDLE_PROXIES_V6 = v6_proxies
except Exception as E:
print_err("Error updating middle proxy list for IPv6:", E)
except Exception:
print_err("Error updating middle proxy list for IPv6")
try:
secret = await make_https_req(PROXY_SECRET_ADDR)
@@ -1087,8 +1060,8 @@ async def update_middle_proxy_info():
if secret != PROXY_SECRET:
PROXY_SECRET = secret
print_err("Middle proxy secret updated")
except Exception as E:
print_err("Error updating middle proxy secret, using old", E)
except Exception:
print_err("Error updating middle proxy secret, using old")
await asyncio.sleep(PROXY_INFO_UPDATE_PERIOD)
@@ -1097,31 +1070,26 @@ def init_ip_info():
global USE_MIDDLE_PROXY
global PREFER_IPV6
global my_ip_info
TIMEOUT = 5
def get_ip_from_url(url):
TIMEOUT = 5
try:
with urllib.request.urlopen(url, timeout=TIMEOUT) as f:
if f.status != 200:
raise Exception("Invalid status code")
return f.read().decode().strip()
except Exception:
return None
IPV4_URL1 = "http://v4.ident.me/)"
IPV4_URL2 = "http://ipv4.icanhazip.com/"
IPV6_URL1 = "http://v6.ident.me/"
IPV6_URL2 = "http://ipv6.icanhazip.com/"
my_ip_info["ipv4"] = get_ip_from_url(IPV4_URL1) or get_ip_from_url(IPV4_URL2)
my_ip_info["ipv6"] = get_ip_from_url(IPV6_URL1) or get_ip_from_url(IPV6_URL2)
try:
with urllib.request.urlopen('https://v4.ifconfig.co/ip', timeout=TIMEOUT) as f:
if f.status != 200:
raise Exception("Invalid status code")
my_ip_info["ipv4"] = f.read().decode().strip()
except Exception:
pass
if PREFER_IPV6:
if my_ip_info["ipv6"]:
print_err("IPv6 found, using it for external communication")
else:
try:
with urllib.request.urlopen('https://v6.ifconfig.co/ip', timeout=TIMEOUT) as f:
if f.status != 200:
raise Exception("Invalid status code")
my_ip_info["ipv6"] = f.read().decode().strip()
except Exception:
PREFER_IPV6 = False
else:
print_err("IPv6 found, using it for external communication")
if USE_MIDDLE_PROXY:
if ((not PREFER_IPV6 and not my_ip_info["ipv4"]) or
@@ -1139,14 +1107,13 @@ def print_tg_info():
for user, secret in sorted(USERS.items(), key=lambda x: x[0]):
for ip in ip_addrs:
if not SECURE_ONLY:
params = {"server": ip, "port": PORT, "secret": secret}
params_encodeded = urllib.parse.urlencode(params, safe=':')
print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True)
params = {"server": ip, "port": PORT, "secret": secret}
params_encodeded = urllib.parse.urlencode(params, safe=':')
print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True)
params = {"server": ip, "port": PORT, "secret": "dd" + secret}
params_encodeded = urllib.parse.urlencode(params, safe=':')
print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True)
print("{}: tg://proxy?{} (beta)".format(user, params_encodeded), flush=True)
def loop_exception_handler(loop, context):
@@ -1155,33 +1122,20 @@ def loop_exception_handler(loop, context):
if exception:
if isinstance(exception, TimeoutError):
if transport:
print_err("Timeout, killing transport")
transport.abort()
return
if isinstance(exception, OSError):
IGNORE_ERRNO = {
10038, # operation on non-socket on Windows, likely because fd == -1
121, # the semaphore timeout period has expired on Windows
}
FORCE_CLOSE_ERRNO = {
113, # no route to host
10038 # operation on non-socket on Windows, likely because fd == -1
}
if exception.errno in IGNORE_ERRNO:
return
elif exception.errno in FORCE_CLOSE_ERRNO:
if transport:
transport.abort()
return
loop.default_exception_handler(context)
def main():
setup_files_limit()
setup_debug()
try_setup_uvloop()
init_stats()
if sys.platform == "win32":
@@ -1198,14 +1152,25 @@ def main():
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
asyncio.ensure_future(middle_proxy_updater_task)
if PROMETHEUS_PORT:
task_promstats = asyncio.start_server(handle_promstats_wrapper, PROMETHEUS_HOST, PROMETHEUS_PORT,
limit=4096, # http request is quite small
backlog=8, # there are few prometheus collectors
reuse_address=True, # that's still server, TIME_WAIT should not block restart
reuse_port=False, # if you reuse statistics port for several instances, you're doing it wrong!
loop=loop)
server_promstats = loop.run_until_complete(task_promstats)
else:
server_promstats = None
reuse_port = hasattr(socket, "SO_REUSEPORT")
task_v4 = asyncio.start_server(handle_client_wrapper, LISTEN_ADDR_IPV4, PORT,
task_v4 = asyncio.start_server(handle_client_wrapper, '0.0.0.0', PORT,
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
server_v4 = loop.run_until_complete(task_v4)
if socket.has_ipv6:
task_v6 = asyncio.start_server(handle_client_wrapper, LISTEN_ADDR_IPV6, PORT,
task_v6 = asyncio.start_server(handle_client_wrapper, '::', PORT,
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
server_v6 = loop.run_until_complete(task_v6)
@@ -1223,6 +1188,10 @@ def main():
server_v6.close()
loop.run_until_complete(server_v6.wait_closed())
if server_promstats is not None:
server_promstats.close()
loop.run_until_complete(server_promstats.wait_closed())
loop.close()