mirror of
https://github.com/alexbers/mtprotoproxy.git
synced 2026-03-14 23:32:30 +00:00
Compare commits
44 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e4473d6374 | ||
|
|
c2278501bf | ||
|
|
534b26cd04 | ||
|
|
c1bef68602 | ||
|
|
8e79dacd26 | ||
|
|
520a26aa89 | ||
|
|
647b6f6edd | ||
|
|
c2ad0de665 | ||
|
|
47f7c088af | ||
|
|
6f8bfdb568 | ||
|
|
0a7e2d85b8 | ||
|
|
0caf5f89a8 | ||
|
|
33fabe7590 | ||
|
|
c0ed5e1e38 | ||
|
|
bcac5eb878 | ||
|
|
b38084bf36 | ||
|
|
675d5a6aba | ||
|
|
b31768165c | ||
|
|
372861ac6e | ||
|
|
6a27096618 | ||
|
|
93f71f5ec2 | ||
|
|
03f7ca1d4c | ||
|
|
3477402c0d | ||
|
|
532021ab87 | ||
|
|
6900cdda43 | ||
|
|
ec1c6b4fb6 | ||
|
|
63b689e3bf | ||
|
|
71e3206b19 | ||
|
|
7eea7d3201 | ||
|
|
2e86308e90 | ||
|
|
d74bb68f03 | ||
|
|
5f35b4ed0a | ||
|
|
b74079c433 | ||
|
|
32d3bffc7b | ||
|
|
a20b1c9929 | ||
|
|
444a1876b6 | ||
|
|
ed088d9449 | ||
|
|
accba06b45 | ||
|
|
bd3d9731d7 | ||
|
|
9077ceb471 | ||
|
|
d2ff0f61e4 | ||
|
|
d56c995ee2 | ||
|
|
51c40903ab | ||
|
|
e1d592cd84 |
@@ -1,13 +1,13 @@
|
||||
FROM alpine:3.6
|
||||
FROM alpine:3.8
|
||||
|
||||
RUN adduser tgproxy -u 10000 -D
|
||||
|
||||
RUN apk add --no-cache python3 py3-crypto ca-certificates
|
||||
RUN apk add --no-cache python3 py3-cryptography ca-certificates libcap
|
||||
|
||||
COPY mtprotoproxy.py config.py /home/tgproxy/
|
||||
COPY pyaes/*.py /home/tgproxy/pyaes/
|
||||
|
||||
RUN chown -R tgproxy:tgproxy /home/tgproxy
|
||||
RUN setcap cap_net_bind_service=+ep /usr/bin/python3.6
|
||||
|
||||
USER tgproxy
|
||||
|
||||
|
||||
10
README.md
10
README.md
@@ -16,4 +16,12 @@ To advertise a channel get a tag from **@MTProxybot** and write it to *config.py
|
||||
## Performance ##
|
||||
|
||||
The proxy performance should be enough to comfortably serve about 4 000 simultaneous users on
|
||||
the smallest VDS instance with 1 CPU core and 1024MB RAM.
|
||||
the VDS instance with 1 CPU core and 1024MB RAM.
|
||||
|
||||
## Advanced Usage ##
|
||||
|
||||
The proxy can be launched:
|
||||
- with a custom config: `python3 mtprotoproxy.py [configfile]`
|
||||
- several times, clients will be automaticaly balanced between instances
|
||||
- using *PyPy* interprteter
|
||||
- with runtime statistics exported for [Prometheus](https://prometheus.io/): using [prometheus](https://github.com/alexbers/mtprotoproxy/tree/prometheus) branch
|
||||
|
||||
@@ -3,5 +3,5 @@ services:
|
||||
mtprotoproxy:
|
||||
build: .
|
||||
restart: unless-stopped
|
||||
mem_limit: 1024m
|
||||
network_mode: "host"
|
||||
# mem_limit: 1024m
|
||||
|
||||
307
mtprotoproxy.py
307
mtprotoproxy.py
@@ -12,9 +12,54 @@ import binascii
|
||||
import sys
|
||||
import re
|
||||
import runpy
|
||||
|
||||
import signal
|
||||
|
||||
try:
|
||||
import uvloop
|
||||
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
def try_use_cryptography_module():
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
def create_aes_ctr(key, iv):
|
||||
class EncryptorAdapter:
|
||||
def __init__(self, cipher):
|
||||
self.encryptor = cipher.encryptor()
|
||||
self.decryptor = cipher.decryptor()
|
||||
|
||||
def encrypt(self, data):
|
||||
return self.encryptor.update(data)
|
||||
|
||||
def decrypt(self, data):
|
||||
return self.decryptor.update(data)
|
||||
|
||||
iv_bytes = int.to_bytes(iv, 16, "big")
|
||||
cipher = Cipher(algorithms.AES(key), modes.CTR(iv_bytes), default_backend())
|
||||
return EncryptorAdapter(cipher)
|
||||
|
||||
def create_aes_cbc(key, iv):
|
||||
class EncryptorAdapter:
|
||||
def __init__(self, cipher):
|
||||
self.encryptor = cipher.encryptor()
|
||||
self.decryptor = cipher.decryptor()
|
||||
|
||||
def encrypt(self, data):
|
||||
return self.encryptor.update(data)
|
||||
|
||||
def decrypt(self, data):
|
||||
return self.decryptor.update(data)
|
||||
|
||||
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), default_backend())
|
||||
return EncryptorAdapter(cipher)
|
||||
|
||||
return create_aes_ctr, create_aes_cbc
|
||||
|
||||
|
||||
def try_use_pycrypto_or_pycryptodome_module():
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Util import Counter
|
||||
|
||||
@@ -25,11 +70,16 @@ try:
|
||||
def create_aes_cbc(key, iv):
|
||||
return AES.new(key, AES.MODE_CBC, iv)
|
||||
|
||||
except ImportError:
|
||||
print("Failed to find pycryptodome or pycrypto, using slow AES implementation",
|
||||
flush=True, file=sys.stderr)
|
||||
return create_aes_ctr, create_aes_cbc
|
||||
|
||||
|
||||
def use_slow_bundled_cryptography_module():
|
||||
import pyaes
|
||||
|
||||
msg = "To make the program a *lot* faster, please install cryptography module: "
|
||||
msg += "pip install cryptography\n"
|
||||
print(msg, flush=True, file=sys.stderr)
|
||||
|
||||
def create_aes_ctr(key, iv):
|
||||
ctr = pyaes.Counter(iv)
|
||||
return pyaes.AESModeOfOperationCTR(key, ctr)
|
||||
@@ -49,8 +99,17 @@ except ImportError:
|
||||
|
||||
mode = pyaes.AESModeOfOperationCBC(key, iv)
|
||||
return EncryptorAdapter(mode)
|
||||
return create_aes_ctr, create_aes_cbc
|
||||
|
||||
|
||||
try:
|
||||
create_aes_ctr, create_aes_cbc = try_use_cryptography_module()
|
||||
except ImportError:
|
||||
try:
|
||||
create_aes_ctr, create_aes_cbc = try_use_pycrypto_or_pycryptodome_module()
|
||||
except ImportError:
|
||||
create_aes_ctr, create_aes_cbc = use_slow_bundled_cryptography_module()
|
||||
|
||||
try:
|
||||
import resource
|
||||
soft_fd_limit, hard_fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
|
||||
@@ -60,24 +119,42 @@ except (ValueError, OSError):
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
if len(sys.argv) > 1:
|
||||
if hasattr(signal, 'SIGUSR1'):
|
||||
def debug_signal(signum, frame):
|
||||
import pdb
|
||||
pdb.set_trace()
|
||||
|
||||
signal.signal(signal.SIGUSR1, debug_signal)
|
||||
|
||||
if len(sys.argv) < 2:
|
||||
config = runpy.run_module("config")
|
||||
elif len(sys.argv) == 2:
|
||||
config = runpy.run_path(sys.argv[1])
|
||||
else:
|
||||
config = runpy.run_module("config")
|
||||
# undocumented way of launching
|
||||
config = {}
|
||||
config["PORT"] = int(sys.argv[1])
|
||||
secrets = sys.argv[2].split(",")
|
||||
config["USERS"] = {"user%d" % i: secrets[i].zfill(32) for i in range(len(secrets))}
|
||||
if len(sys.argv) > 3:
|
||||
config["AD_TAG"] = sys.argv[3]
|
||||
|
||||
PORT = config["PORT"]
|
||||
USERS = config["USERS"]
|
||||
AD_TAG = bytes.fromhex(config.get("AD_TAG", ""))
|
||||
|
||||
# load advanced settings
|
||||
PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
|
||||
# disables tg->client trafic reencryption, faster but less secure
|
||||
FAST_MODE = config.get("FAST_MODE", True)
|
||||
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
|
||||
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 60*60*24)
|
||||
READ_BUF_SIZE = config.get("READ_BUF_SIZE", 16384)
|
||||
WRITE_BUF_SIZE = config.get("WRITE_BUF_SIZE", 65536)
|
||||
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30)
|
||||
AD_TAG = bytes.fromhex(config.get("AD_TAG", ""))
|
||||
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 24*60*60)
|
||||
TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 16384)
|
||||
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
|
||||
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 10*60)
|
||||
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
|
||||
CLIENT_ACK_TIMEOUT = config.get("CLIENT_ACK_TIMEOUT", 5*60)
|
||||
TG_CONNECT_TIMEOUT = config.get("TG_CONNECT_TIMEOUT", 10)
|
||||
|
||||
TG_DATACENTER_PORT = 443
|
||||
|
||||
@@ -128,6 +205,7 @@ DC_IDX_POS = 60
|
||||
|
||||
PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef"
|
||||
PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee"
|
||||
PROTO_TAG_SECURE = b"\xdd\xdd\xdd\xdd"
|
||||
|
||||
CBC_PADDING = 16
|
||||
PADDING_FILLER = b"\x04\x00\x00\x00"
|
||||
@@ -147,14 +225,14 @@ def init_stats():
|
||||
stats = {user: collections.Counter() for user in USERS}
|
||||
|
||||
|
||||
def update_stats(user, connects=0, curr_connects_x2=0, octets=0):
|
||||
def update_stats(user, connects=0, curr_connects=0, octets=0, msgs=0):
|
||||
global stats
|
||||
|
||||
if user not in stats:
|
||||
stats[user] = collections.Counter()
|
||||
|
||||
stats[user].update(connects=connects, curr_connects_x2=curr_connects_x2,
|
||||
octets=octets)
|
||||
stats[user].update(connects=connects, curr_connects=curr_connects,
|
||||
octets=octets, msgs=msgs)
|
||||
|
||||
|
||||
class LayeredStreamReaderBase:
|
||||
@@ -353,7 +431,6 @@ class MTProtoIntermediateFrameStreamReader(LayeredStreamReaderBase):
|
||||
msg_len -= 0x80000000
|
||||
|
||||
data = await self.upstream.readexactly(msg_len)
|
||||
|
||||
return data, extra
|
||||
|
||||
|
||||
@@ -365,6 +442,38 @@ class MTProtoIntermediateFrameStreamWriter(LayeredStreamWriterBase):
|
||||
return self.upstream.write(int.to_bytes(len(data), 4, 'little') + data)
|
||||
|
||||
|
||||
class MTProtoSecureIntermediateFrameStreamReader(LayeredStreamReaderBase):
|
||||
async def read(self, buf_size):
|
||||
msg_len_bytes = await self.upstream.readexactly(4)
|
||||
msg_len = int.from_bytes(msg_len_bytes, "little")
|
||||
|
||||
extra = {}
|
||||
if msg_len > 0x80000000:
|
||||
extra["QUICKACK_FLAG"] = True
|
||||
msg_len -= 0x80000000
|
||||
|
||||
data = await self.upstream.readexactly(msg_len)
|
||||
|
||||
if msg_len % 4 != 0:
|
||||
cut_border = msg_len - (msg_len % 4)
|
||||
data = data[:cut_border]
|
||||
|
||||
return data, extra
|
||||
|
||||
|
||||
class MTProtoSecureIntermediateFrameStreamWriter(LayeredStreamWriterBase):
|
||||
def write(self, data, extra={}):
|
||||
MAX_PADDING_LEN = 4
|
||||
if extra.get("SIMPLE_ACK"):
|
||||
# TODO: make this unpredictable
|
||||
return self.upstream.write(data)
|
||||
else:
|
||||
padding_len = random.randrange(MAX_PADDING_LEN)
|
||||
padding = bytearray([random.randrange(256) for i in range(padding_len)])
|
||||
padded_data_len_bytes = int.to_bytes(len(data) + padding_len, 4, 'little')
|
||||
return self.upstream.write(padded_data_len_bytes + data + padding)
|
||||
|
||||
|
||||
class ProxyReqStreamReader(LayeredStreamReaderBase):
|
||||
async def read(self, msg):
|
||||
RPC_PROXY_ANS = b"\x0d\xda\x03\x44"
|
||||
@@ -423,6 +532,7 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase):
|
||||
FLAG_HAS_AD_TAG = 0x8
|
||||
FLAG_MAGIC = 0x1000
|
||||
FLAG_EXTMODE2 = 0x20000
|
||||
FLAG_PAD = 0x8000000
|
||||
FLAG_INTERMEDIATE = 0x20000000
|
||||
FLAG_ABRIDGED = 0x40000000
|
||||
FLAG_QUICKACK = 0x80000000
|
||||
@@ -437,6 +547,8 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase):
|
||||
flags |= FLAG_ABRIDGED
|
||||
elif self.proto_tag == PROTO_TAG_INTERMEDIATE:
|
||||
flags |= FLAG_INTERMEDIATE
|
||||
elif self.proto_tag == PROTO_TAG_SECURE:
|
||||
flags |= FLAG_INTERMEDIATE | FLAG_PAD
|
||||
|
||||
if extra.get("QUICKACK_FLAG"):
|
||||
flags |= FLAG_QUICKACK
|
||||
@@ -473,7 +585,7 @@ async def handle_handshake(reader, writer):
|
||||
decrypted = decryptor.decrypt(handshake)
|
||||
|
||||
proto_tag = decrypted[PROTO_TAG_POS:PROTO_TAG_POS+4]
|
||||
if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE):
|
||||
if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE):
|
||||
continue
|
||||
|
||||
dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True)
|
||||
@@ -481,9 +593,50 @@ async def handle_handshake(reader, writer):
|
||||
reader = CryptoWrappedStreamReader(reader, decryptor)
|
||||
writer = CryptoWrappedStreamWriter(writer, encryptor)
|
||||
return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv
|
||||
|
||||
EMPTY_READ_BUF_SIZE = 4096
|
||||
while await reader.read(EMPTY_READ_BUF_SIZE):
|
||||
# just consume all the data
|
||||
pass
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def set_keepalive(sock, interval=40, attempts=5):
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
||||
if hasattr(socket, "TCP_KEEPIDLE"):
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval)
|
||||
if hasattr(socket, "TCP_KEEPINTVL"):
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
|
||||
if hasattr(socket, "TCP_KEEPCNT"):
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
|
||||
|
||||
|
||||
def set_ack_timeout(sock, timeout):
|
||||
if hasattr(socket, "TCP_USER_TIMEOUT"):
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, timeout*1000)
|
||||
|
||||
|
||||
def set_bufsizes(sock, recv_buf, send_buf):
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
|
||||
|
||||
|
||||
async def open_connection_tryer(addr, port, limit, timeout, max_attempts=3):
|
||||
for attempt in range(max_attempts-1):
|
||||
try:
|
||||
task = asyncio.open_connection(addr, port, limit=limit)
|
||||
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout)
|
||||
return reader_tgt, writer_tgt
|
||||
except (OSError, asyncio.TimeoutError):
|
||||
continue
|
||||
|
||||
# the last attempt
|
||||
task = asyncio.open_connection(addr, port, limit=limit)
|
||||
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout)
|
||||
return reader_tgt, writer_tgt
|
||||
|
||||
|
||||
async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
|
||||
RESERVED_NONCE_FIRST_CHARS = [b"\xef"]
|
||||
RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54",
|
||||
@@ -502,15 +655,18 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
|
||||
dc = TG_DATACENTERS_V4[dc_idx]
|
||||
|
||||
try:
|
||||
reader_tgt, writer_tgt = await asyncio.open_connection(dc, TG_DATACENTER_PORT,
|
||||
limit=READ_BUF_SIZE)
|
||||
reader_tgt, writer_tgt = await open_connection_tryer(
|
||||
dc, TG_DATACENTER_PORT, limit=TO_CLT_BUFSIZE, timeout=TG_CONNECT_TIMEOUT)
|
||||
except ConnectionRefusedError as E:
|
||||
print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT)
|
||||
return False
|
||||
except OSError as E:
|
||||
except (OSError, asyncio.TimeoutError) as E:
|
||||
print_err("Unable to connect to", dc, TG_DATACENTER_PORT)
|
||||
return False
|
||||
|
||||
set_keepalive(writer_tgt.get_extra_info("socket"))
|
||||
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
|
||||
|
||||
while True:
|
||||
rnd = bytearray([random.randrange(0, 256) for i in range(HANDSHAKE_LEN)])
|
||||
if rnd[:1] in RESERVED_NONCE_FIRST_CHARS:
|
||||
@@ -573,21 +729,6 @@ def get_middleproxy_aes_key_and_iv(nonce_srv, nonce_clt, clt_ts, srv_ip, clt_por
|
||||
return key, iv
|
||||
|
||||
|
||||
def set_keepalive(sock, interval=40, attempts=5):
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
||||
if hasattr(socket, "TCP_KEEPIDLE"):
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval)
|
||||
if hasattr(socket, "TCP_KEEPINTVL"):
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
|
||||
if hasattr(socket, "TCP_KEEPCNT"):
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
|
||||
|
||||
|
||||
def set_bufsizes(sock, recv_buf=READ_BUF_SIZE, send_buf=WRITE_BUF_SIZE):
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
|
||||
|
||||
|
||||
async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
||||
START_SEQ_NO = -2
|
||||
NONCE_LEN = 16
|
||||
@@ -615,16 +756,18 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
||||
addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
|
||||
|
||||
try:
|
||||
reader_tgt, writer_tgt = await asyncio.open_connection(addr, port, limit=READ_BUF_SIZE)
|
||||
set_keepalive(writer_tgt.get_extra_info("socket"))
|
||||
set_bufsizes(writer_tgt.get_extra_info("socket"))
|
||||
reader_tgt, writer_tgt = await open_connection_tryer(addr, port, limit=TO_CLT_BUFSIZE,
|
||||
timeout=TG_CONNECT_TIMEOUT)
|
||||
except ConnectionRefusedError as E:
|
||||
print_err("Got connection refused while trying to connect to", addr, port)
|
||||
return False
|
||||
except OSError as E:
|
||||
except (OSError, asyncio.TimeoutError) as E:
|
||||
print_err("Unable to connect to", addr, port)
|
||||
return False
|
||||
|
||||
set_keepalive(writer_tgt.get_extra_info("socket"))
|
||||
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
|
||||
|
||||
writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO)
|
||||
|
||||
key_selector = PROXY_SECRET[:4]
|
||||
@@ -639,7 +782,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
||||
|
||||
old_reader = reader_tgt
|
||||
reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO)
|
||||
ans = await reader_tgt.read(READ_BUF_SIZE)
|
||||
ans = await reader_tgt.read(TO_CLT_BUFSIZE)
|
||||
|
||||
if len(ans) != RPC_NONCE_ANS_LEN:
|
||||
return False
|
||||
@@ -720,12 +863,17 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
||||
|
||||
|
||||
async def handle_client(reader_clt, writer_clt):
|
||||
set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE)
|
||||
set_bufsizes(writer_clt.get_extra_info("socket"))
|
||||
set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE, attempts=3)
|
||||
set_ack_timeout(writer_clt.get_extra_info("socket"), CLIENT_ACK_TIMEOUT)
|
||||
set_bufsizes(writer_clt.get_extra_info("socket"), TO_TG_BUFSIZE, TO_CLT_BUFSIZE)
|
||||
|
||||
try:
|
||||
clt_data = await asyncio.wait_for(handle_handshake(reader_clt, writer_clt),
|
||||
timeout=CLIENT_HANDSHAKE_TIMEOUT)
|
||||
except asyncio.TimeoutError:
|
||||
return
|
||||
|
||||
clt_data = await handle_handshake(reader_clt, writer_clt)
|
||||
if not clt_data:
|
||||
writer_clt.transport.abort()
|
||||
return
|
||||
|
||||
reader_clt, writer_clt, proto_tag, user, dc_idx, enc_key_and_iv = clt_data
|
||||
@@ -742,7 +890,6 @@ async def handle_client(reader_clt, writer_clt):
|
||||
tg_data = await do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port)
|
||||
|
||||
if not tg_data:
|
||||
writer_clt.transport.abort()
|
||||
return
|
||||
|
||||
reader_tg, writer_tg = tg_data
|
||||
@@ -766,14 +913,16 @@ async def handle_client(reader_clt, writer_clt):
|
||||
elif proto_tag == PROTO_TAG_INTERMEDIATE:
|
||||
reader_clt = MTProtoIntermediateFrameStreamReader(reader_clt)
|
||||
writer_clt = MTProtoIntermediateFrameStreamWriter(writer_clt)
|
||||
elif proto_tag == PROTO_TAG_SECURE:
|
||||
reader_clt = MTProtoSecureIntermediateFrameStreamReader(reader_clt)
|
||||
writer_clt = MTProtoSecureIntermediateFrameStreamWriter(writer_clt)
|
||||
else:
|
||||
return
|
||||
|
||||
async def connect_reader_to_writer(rd, wr, user):
|
||||
update_stats(user, curr_connects_x2=1)
|
||||
async def connect_reader_to_writer(rd, wr, user, rd_buf_size):
|
||||
try:
|
||||
while True:
|
||||
data = await rd.read(READ_BUF_SIZE)
|
||||
data = await rd.read(rd_buf_size)
|
||||
if isinstance(data, tuple):
|
||||
data, extra = data
|
||||
else:
|
||||
@@ -782,27 +931,36 @@ async def handle_client(reader_clt, writer_clt):
|
||||
if not data:
|
||||
wr.write_eof()
|
||||
await wr.drain()
|
||||
wr.close()
|
||||
return
|
||||
else:
|
||||
update_stats(user, octets=len(data))
|
||||
update_stats(user, octets=len(data), msgs=1)
|
||||
wr.write(data, extra)
|
||||
await wr.drain()
|
||||
except (OSError, AttributeError, asyncio.streams.IncompleteReadError) as e:
|
||||
except (OSError, asyncio.streams.IncompleteReadError) as e:
|
||||
# print_err(e)
|
||||
pass
|
||||
finally:
|
||||
wr.transport.abort()
|
||||
update_stats(user, curr_connects_x2=-1)
|
||||
|
||||
asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer_clt, user))
|
||||
asyncio.ensure_future(connect_reader_to_writer(reader_clt, writer_tg, user))
|
||||
tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, TO_CLT_BUFSIZE)
|
||||
clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, TO_TG_BUFSIZE)
|
||||
task_tg_to_clt = asyncio.ensure_future(tg_to_clt)
|
||||
task_clt_to_tg = asyncio.ensure_future(clt_to_tg)
|
||||
|
||||
update_stats(user, curr_connects=1)
|
||||
await asyncio.wait([task_tg_to_clt, task_clt_to_tg], return_when=asyncio.FIRST_COMPLETED)
|
||||
update_stats(user, curr_connects=-1)
|
||||
|
||||
task_tg_to_clt.cancel()
|
||||
task_clt_to_tg.cancel()
|
||||
|
||||
writer_tg.transport.abort()
|
||||
|
||||
|
||||
async def handle_client_wrapper(reader, writer):
|
||||
try:
|
||||
await handle_client(reader, writer)
|
||||
except (asyncio.IncompleteReadError, ConnectionResetError, TimeoutError):
|
||||
pass
|
||||
finally:
|
||||
writer.transport.abort()
|
||||
|
||||
|
||||
@@ -813,9 +971,9 @@ async def stats_printer():
|
||||
|
||||
print("Stats for", time.strftime("%d.%m.%Y %H:%M:%S"))
|
||||
for user, stat in stats.items():
|
||||
print("%s: %d connects (%d current), %.2f MB" % (
|
||||
user, stat["connects"], stat["curr_connects_x2"] // 2,
|
||||
stat["octets"] / 1000000))
|
||||
print("%s: %d connects (%d current), %.2f MB, %d msgs" % (
|
||||
user, stat["connects"], stat["curr_connects"],
|
||||
stat["octets"] / 1000000, stat["msgs"]))
|
||||
print(flush=True)
|
||||
|
||||
|
||||
@@ -905,7 +1063,7 @@ def init_ip_info():
|
||||
TIMEOUT = 5
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen('https://v4.ifconfig.co/ip', timeout=TIMEOUT) as f:
|
||||
with urllib.request.urlopen('http://ipv4.myexternalip.com/raw', timeout=TIMEOUT) as f:
|
||||
if f.status != 200:
|
||||
raise Exception("Invalid status code")
|
||||
my_ip_info["ipv4"] = f.read().decode().strip()
|
||||
@@ -914,7 +1072,7 @@ def init_ip_info():
|
||||
|
||||
if PREFER_IPV6:
|
||||
try:
|
||||
with urllib.request.urlopen('https://v6.ifconfig.co/ip', timeout=TIMEOUT) as f:
|
||||
with urllib.request.urlopen('http://ipv6.myexternalip.com/raw', timeout=TIMEOUT) as f:
|
||||
if f.status != 200:
|
||||
raise Exception("Invalid status code")
|
||||
my_ip_info["ipv6"] = f.read().decode().strip()
|
||||
@@ -943,6 +1101,10 @@ def print_tg_info():
|
||||
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
||||
print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True)
|
||||
|
||||
params = {"server": ip, "port": PORT, "secret": "dd" + secret}
|
||||
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
||||
print("{}: tg://proxy?{} (beta)".format(user, params_encodeded), flush=True)
|
||||
|
||||
|
||||
def loop_exception_handler(loop, context):
|
||||
exception = context.get("exception")
|
||||
@@ -950,15 +1112,24 @@ def loop_exception_handler(loop, context):
|
||||
if exception:
|
||||
if isinstance(exception, TimeoutError):
|
||||
if transport:
|
||||
print_err("Timeout, killing transport")
|
||||
transport.abort()
|
||||
return
|
||||
if isinstance(exception, OSError):
|
||||
IGNORE_ERRNO = {
|
||||
10038 # operation on non-socket on Windows, likely because fd == -1
|
||||
10038, # operation on non-socket on Windows, likely because fd == -1
|
||||
121, # the semaphore timeout period has expired on Windows
|
||||
}
|
||||
|
||||
FORCE_CLOSE_ERRNO = {
|
||||
113, # no route to host
|
||||
|
||||
}
|
||||
if exception.errno in IGNORE_ERRNO:
|
||||
return
|
||||
elif exception.errno in FORCE_CLOSE_ERRNO:
|
||||
if transport:
|
||||
transport.abort()
|
||||
return
|
||||
|
||||
loop.default_exception_handler(context)
|
||||
|
||||
@@ -966,7 +1137,7 @@ def loop_exception_handler(loop, context):
|
||||
def main():
|
||||
init_stats()
|
||||
|
||||
if sys.platform == 'win32':
|
||||
if sys.platform == "win32":
|
||||
loop = asyncio.ProactorEventLoop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
@@ -980,13 +1151,15 @@ def main():
|
||||
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
|
||||
asyncio.ensure_future(middle_proxy_updater_task)
|
||||
|
||||
task_v4 = asyncio.start_server(handle_client_wrapper,
|
||||
'0.0.0.0', PORT, limit=READ_BUF_SIZE, loop=loop)
|
||||
reuse_port = hasattr(socket, "SO_REUSEPORT")
|
||||
|
||||
task_v4 = asyncio.start_server(handle_client_wrapper, '0.0.0.0', PORT,
|
||||
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
|
||||
server_v4 = loop.run_until_complete(task_v4)
|
||||
|
||||
if socket.has_ipv6:
|
||||
task_v6 = asyncio.start_server(handle_client_wrapper,
|
||||
'::', PORT, limit=READ_BUF_SIZE, loop=loop)
|
||||
task_v6 = asyncio.start_server(handle_client_wrapper, '::', PORT,
|
||||
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
|
||||
server_v6 = loop.run_until_complete(task_v6)
|
||||
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user