5 Commits

Author SHA1 Message Date
Alexander Bersenev
7fcc854e80 add Prometheus metrics exporter 2018-07-01 01:35:50 +05:00
Alexander Bersenev
675d5a6aba send buffer size on the direct handshake also 2018-06-30 23:09:43 +05:00
Alexander Bersenev
b31768165c buffers redesign 2018-06-30 22:54:11 +05:00
Alexander Bersenev
372861ac6e support for secure mode 2018-06-29 18:51:47 +05:00
Alexander Bersenev
6a27096618 add secure tag 2018-06-29 17:52:37 +05:00

View File

@@ -13,6 +13,7 @@ import sys
import re
import runpy
import signal
import http.server
try:
import uvloop
@@ -149,10 +150,15 @@ PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
FAST_MODE = config.get("FAST_MODE", True)
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 60*60*24)
READ_BUF_SIZE = config.get("READ_BUF_SIZE", 16384)
WRITE_BUF_SIZE = config.get("WRITE_BUF_SIZE", 65536)
TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 8192)
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30)
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
PROMETHEUS_HOST = config.get("PROMETHEUS_HOST")
PROMETHEUS_PORT = config.get("PROMETHEUS_PORT")
# PROMETHEUS_SCRAPERS is a safety net in case of missing firewall,
# set it to false value to disable.
PROMETHEUS_SCRAPERS = config.get("PROMETHEUS_SCRAPERS", {'127.0.0.1', '::1'})
TG_DATACENTER_PORT = 443
@@ -203,6 +209,7 @@ DC_IDX_POS = 60
PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef"
PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee"
PROTO_TAG_SECURE = b"\xdd\xdd\xdd\xdd"
CBC_PADDING = 16
PADDING_FILLER = b"\x04\x00\x00\x00"
@@ -429,6 +436,10 @@ class MTProtoIntermediateFrameStreamReader(LayeredStreamReaderBase):
data = await self.upstream.readexactly(msg_len)
if msg_len % 4 != 0:
cut_border = msg_len - (msg_len % 4)
data = data[:cut_border]
return data, extra
@@ -548,7 +559,7 @@ async def handle_handshake(reader, writer):
decrypted = decryptor.decrypt(handshake)
proto_tag = decrypted[PROTO_TAG_POS:PROTO_TAG_POS+4]
if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE):
if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE):
continue
dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True)
@@ -557,13 +568,29 @@ async def handle_handshake(reader, writer):
writer = CryptoWrappedStreamWriter(writer, encryptor)
return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv
while await reader.read(READ_BUF_SIZE):
EMPTY_READ_BUF_SIZE = 4096
while await reader.read(EMPTY_READ_BUF_SIZE):
# just consume all the data
pass
return False
def set_keepalive(sock, interval=40, attempts=5):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if hasattr(socket, "TCP_KEEPIDLE"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval)
if hasattr(socket, "TCP_KEEPINTVL"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
if hasattr(socket, "TCP_KEEPCNT"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
def set_bufsizes(sock, recv_buf, send_buf):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
RESERVED_NONCE_FIRST_CHARS = [b"\xef"]
RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54",
@@ -583,7 +610,10 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
try:
reader_tgt, writer_tgt = await asyncio.open_connection(dc, TG_DATACENTER_PORT,
limit=READ_BUF_SIZE)
limit=TO_CLT_BUFSIZE)
set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
except ConnectionRefusedError as E:
print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT)
return False
@@ -653,21 +683,6 @@ def get_middleproxy_aes_key_and_iv(nonce_srv, nonce_clt, clt_ts, srv_ip, clt_por
return key, iv
def set_keepalive(sock, interval=40, attempts=5):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if hasattr(socket, "TCP_KEEPIDLE"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval)
if hasattr(socket, "TCP_KEEPINTVL"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
if hasattr(socket, "TCP_KEEPCNT"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
def set_bufsizes(sock, recv_buf=READ_BUF_SIZE, send_buf=WRITE_BUF_SIZE):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
START_SEQ_NO = -2
NONCE_LEN = 16
@@ -695,9 +710,9 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
try:
reader_tgt, writer_tgt = await asyncio.open_connection(addr, port, limit=READ_BUF_SIZE)
reader_tgt, writer_tgt = await asyncio.open_connection(addr, port, limit=TO_CLT_BUFSIZE)
set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
except ConnectionRefusedError as E:
print_err("Got connection refused while trying to connect to", addr, port)
return False
@@ -719,7 +734,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
old_reader = reader_tgt
reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO)
ans = await reader_tgt.read(READ_BUF_SIZE)
ans = await reader_tgt.read(TO_CLT_BUFSIZE)
if len(ans) != RPC_NONCE_ANS_LEN:
return False
@@ -801,7 +816,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
async def handle_client(reader_clt, writer_clt):
set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE)
set_bufsizes(writer_clt.get_extra_info("socket"))
set_bufsizes(writer_clt.get_extra_info("socket"), TO_TG_BUFSIZE, TO_CLT_BUFSIZE)
try:
clt_data = await asyncio.wait_for(handle_handshake(reader_clt, writer_clt),
@@ -846,16 +861,16 @@ async def handle_client(reader_clt, writer_clt):
if proto_tag == PROTO_TAG_ABRIDGED:
reader_clt = MTProtoCompactFrameStreamReader(reader_clt)
writer_clt = MTProtoCompactFrameStreamWriter(writer_clt)
elif proto_tag == PROTO_TAG_INTERMEDIATE:
elif proto_tag in (PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE):
reader_clt = MTProtoIntermediateFrameStreamReader(reader_clt)
writer_clt = MTProtoIntermediateFrameStreamWriter(writer_clt)
else:
return
async def connect_reader_to_writer(rd, wr, user):
async def connect_reader_to_writer(rd, wr, user, rd_buf_size):
try:
while True:
data = await rd.read(READ_BUF_SIZE)
data = await rd.read(rd_buf_size)
if isinstance(data, tuple):
data, extra = data
else:
@@ -873,8 +888,10 @@ async def handle_client(reader_clt, writer_clt):
# print_err(e)
pass
task_tg_to_clt = asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer_clt, user))
task_clt_to_tg = asyncio.ensure_future(connect_reader_to_writer(reader_clt, writer_tg, user))
tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, TO_CLT_BUFSIZE)
clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, TO_TG_BUFSIZE)
task_tg_to_clt = asyncio.ensure_future(tg_to_clt)
task_clt_to_tg = asyncio.ensure_future(clt_to_tg)
update_stats(user, curr_connects=1)
await asyncio.wait([task_tg_to_clt, task_clt_to_tg], return_when=asyncio.FIRST_COMPLETED)
@@ -884,6 +901,68 @@ async def handle_client(reader_clt, writer_clt):
task_clt_to_tg.cancel()
writer_tg.transport.abort()
async def http_reply(writer, line, body=b"", eof=False):
BaseHTTPRequestHandler = http.server.BaseHTTPRequestHandler
msg = (
"HTTP/1.1 {}\r\n"
"Server: mtprotoproxy\r\n"
"Date: {}\r\n"
"Content-Type: text/plain\r\n"
"Content-Length: {:d}\r\n"
).format(
line,
BaseHTTPRequestHandler.date_time_string(BaseHTTPRequestHandler),
len(body)
).encode("ascii")
if eof:
msg += b"Connection: close\r\n"
msg += b"\r\n" + body
writer.write(msg)
await writer.drain()
if eof:
writer.write_eof()
writer.close()
async def handle_promstats(reader, writer):
set_keepalive(writer.get_extra_info("socket"), 75) # prometheus should never go away for a long time
if PROMETHEUS_SCRAPERS and writer.get_extra_info('peername')[0] not in PROMETHEUS_SCRAPERS:
return
while True: # Keep-Alive
request = await reader.readuntil(b"\r\n\r\n")
if request.startswith(b"GET /metrics HTTP/1."):
promstat = (
"# HELP mtproxy_pump_bytes Number of post-handshake bytes pumped in both directions.\n"
"# TYPE mtproxy_pump_bytes counter\n"
) + "".join(
"mtproxy_pump_bytes{{user=\"{}\"}} {:d}\n".format(u, stats[u]["octets"])
for u in stats
) + (
"# HELP mtproxy_connections Current number of post-handshake client connections.\n"
"# TYPE mtproxy_connections gauge\n"
) + "".join(
"mtproxy_connections{{user=\"{}\"}} {:d}\n".format(u, stats[u]["curr_connects"])
for u in stats
) + (
"# HELP mtproxy_connections_total Total number of post-handshake client connections served.\n"
"# TYPE mtproxy_connections_total counter\n"
) + "".join(
"mtproxy_connections_total{{user=\"{}\"}} {:d}\n".format(u, stats[u]["connects"])
for u in stats
)
await http_reply(writer, "200 OK", promstat.encode("ascii"))
else:
await http_reply(writer, "400 Bad Request", b"Bad Request.\n", eof=True)
return
async def handle_promstats_wrapper(reader, writer):
try:
await handle_promstats(reader, writer)
except (asyncio.IncompleteReadError, ConnectionResetError, TimeoutError):
pass
finally:
writer.transport.abort()
async def handle_client_wrapper(reader, writer):
@@ -1032,6 +1111,10 @@ def print_tg_info():
params_encodeded = urllib.parse.urlencode(params, safe=':')
print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True)
params = {"server": ip, "port": PORT, "secret": "dd" + secret}
params_encodeded = urllib.parse.urlencode(params, safe=':')
print("{}: tg://proxy?{} (beta)".format(user, params_encodeded), flush=True)
def loop_exception_handler(loop, context):
exception = context.get("exception")
@@ -1069,15 +1152,26 @@ def main():
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
asyncio.ensure_future(middle_proxy_updater_task)
if PROMETHEUS_PORT:
task_promstats = asyncio.start_server(handle_promstats_wrapper, PROMETHEUS_HOST, PROMETHEUS_PORT,
limit=4096, # http request is quite small
backlog=8, # there are few prometheus collectors
reuse_address=True, # that's still server, TIME_WAIT should not block restart
reuse_port=False, # if you reuse statistics port for several instances, you're doing it wrong!
loop=loop)
server_promstats = loop.run_until_complete(task_promstats)
else:
server_promstats = None
reuse_port = hasattr(socket, "SO_REUSEPORT")
task_v4 = asyncio.start_server(handle_client_wrapper, '0.0.0.0', PORT,
limit=READ_BUF_SIZE, reuse_port=reuse_port, loop=loop)
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
server_v4 = loop.run_until_complete(task_v4)
if socket.has_ipv6:
task_v6 = asyncio.start_server(handle_client_wrapper, '::', PORT,
limit=READ_BUF_SIZE, reuse_port=reuse_port, loop=loop)
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
server_v6 = loop.run_until_complete(task_v6)
try:
@@ -1094,6 +1188,10 @@ def main():
server_v6.close()
loop.run_until_complete(server_v6.wait_closed())
if server_promstats is not None:
server_promstats.close()
loop.run_until_complete(server_promstats.wait_closed())
loop.close()