|
|
|
@@ -12,9 +12,55 @@ import binascii
|
|
|
|
import sys
|
|
|
|
import sys
|
|
|
|
import re
|
|
|
|
import re
|
|
|
|
import runpy
|
|
|
|
import runpy
|
|
|
|
|
|
|
|
import signal
|
|
|
|
|
|
|
|
import http.server
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
|
|
|
|
import uvloop
|
|
|
|
|
|
|
|
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
|
|
|
|
|
|
|
|
except ImportError:
|
|
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def try_use_cryptography_module():
|
|
|
|
|
|
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
|
|
|
|
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_aes_ctr(key, iv):
|
|
|
|
|
|
|
|
class EncryptorAdapter:
|
|
|
|
|
|
|
|
def __init__(self, cipher):
|
|
|
|
|
|
|
|
self.encryptor = cipher.encryptor()
|
|
|
|
|
|
|
|
self.decryptor = cipher.decryptor()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def encrypt(self, data):
|
|
|
|
|
|
|
|
return self.encryptor.update(data)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def decrypt(self, data):
|
|
|
|
|
|
|
|
return self.decryptor.update(data)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
iv_bytes = int.to_bytes(iv, 16, "big")
|
|
|
|
|
|
|
|
cipher = Cipher(algorithms.AES(key), modes.CTR(iv_bytes), default_backend())
|
|
|
|
|
|
|
|
return EncryptorAdapter(cipher)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_aes_cbc(key, iv):
|
|
|
|
|
|
|
|
class EncryptorAdapter:
|
|
|
|
|
|
|
|
def __init__(self, cipher):
|
|
|
|
|
|
|
|
self.encryptor = cipher.encryptor()
|
|
|
|
|
|
|
|
self.decryptor = cipher.decryptor()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def encrypt(self, data):
|
|
|
|
|
|
|
|
return self.encryptor.update(data)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def decrypt(self, data):
|
|
|
|
|
|
|
|
return self.decryptor.update(data)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), default_backend())
|
|
|
|
|
|
|
|
return EncryptorAdapter(cipher)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return create_aes_ctr, create_aes_cbc
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def try_use_pycrypto_or_pycryptodome_module():
|
|
|
|
from Crypto.Cipher import AES
|
|
|
|
from Crypto.Cipher import AES
|
|
|
|
from Crypto.Util import Counter
|
|
|
|
from Crypto.Util import Counter
|
|
|
|
|
|
|
|
|
|
|
|
@@ -25,11 +71,16 @@ try:
|
|
|
|
def create_aes_cbc(key, iv):
|
|
|
|
def create_aes_cbc(key, iv):
|
|
|
|
return AES.new(key, AES.MODE_CBC, iv)
|
|
|
|
return AES.new(key, AES.MODE_CBC, iv)
|
|
|
|
|
|
|
|
|
|
|
|
except ImportError:
|
|
|
|
return create_aes_ctr, create_aes_cbc
|
|
|
|
print("Failed to find pycryptodome or pycrypto, using slow AES implementation",
|
|
|
|
|
|
|
|
flush=True, file=sys.stderr)
|
|
|
|
|
|
|
|
|
|
|
|
def use_slow_bundled_cryptography_module():
|
|
|
|
import pyaes
|
|
|
|
import pyaes
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
msg = "To make the program a *lot* faster, please install cryptography module: "
|
|
|
|
|
|
|
|
msg += "pip install cryptography\n"
|
|
|
|
|
|
|
|
print(msg, flush=True, file=sys.stderr)
|
|
|
|
|
|
|
|
|
|
|
|
def create_aes_ctr(key, iv):
|
|
|
|
def create_aes_ctr(key, iv):
|
|
|
|
ctr = pyaes.Counter(iv)
|
|
|
|
ctr = pyaes.Counter(iv)
|
|
|
|
return pyaes.AESModeOfOperationCTR(key, ctr)
|
|
|
|
return pyaes.AESModeOfOperationCTR(key, ctr)
|
|
|
|
@@ -49,8 +100,17 @@ except ImportError:
|
|
|
|
|
|
|
|
|
|
|
|
mode = pyaes.AESModeOfOperationCBC(key, iv)
|
|
|
|
mode = pyaes.AESModeOfOperationCBC(key, iv)
|
|
|
|
return EncryptorAdapter(mode)
|
|
|
|
return EncryptorAdapter(mode)
|
|
|
|
|
|
|
|
return create_aes_ctr, create_aes_cbc
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
create_aes_ctr, create_aes_cbc = try_use_cryptography_module()
|
|
|
|
|
|
|
|
except ImportError:
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
create_aes_ctr, create_aes_cbc = try_use_pycrypto_or_pycryptodome_module()
|
|
|
|
|
|
|
|
except ImportError:
|
|
|
|
|
|
|
|
create_aes_ctr, create_aes_cbc = use_slow_bundled_cryptography_module()
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
import resource
|
|
|
|
import resource
|
|
|
|
soft_fd_limit, hard_fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
|
|
|
|
soft_fd_limit, hard_fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
|
|
|
|
@@ -60,13 +120,29 @@ except (ValueError, OSError):
|
|
|
|
except ImportError:
|
|
|
|
except ImportError:
|
|
|
|
pass
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
if len(sys.argv) > 1:
|
|
|
|
if hasattr(signal, 'SIGUSR1'):
|
|
|
|
|
|
|
|
def debug_signal(signum, frame):
|
|
|
|
|
|
|
|
import pdb
|
|
|
|
|
|
|
|
pdb.set_trace()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
signal.signal(signal.SIGUSR1, debug_signal)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if len(sys.argv) < 2:
|
|
|
|
|
|
|
|
config = runpy.run_module("config")
|
|
|
|
|
|
|
|
elif len(sys.argv) == 2:
|
|
|
|
config = runpy.run_path(sys.argv[1])
|
|
|
|
config = runpy.run_path(sys.argv[1])
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
config = runpy.run_module("config")
|
|
|
|
# undocumented way of launching
|
|
|
|
|
|
|
|
config = {}
|
|
|
|
|
|
|
|
config["PORT"] = int(sys.argv[1])
|
|
|
|
|
|
|
|
secrets = sys.argv[2].split(",")
|
|
|
|
|
|
|
|
config["USERS"] = {"user%d" % i: secrets[i].zfill(32) for i in range(len(secrets))}
|
|
|
|
|
|
|
|
if len(sys.argv) > 3:
|
|
|
|
|
|
|
|
config["AD_TAG"] = sys.argv[3]
|
|
|
|
|
|
|
|
|
|
|
|
PORT = config["PORT"]
|
|
|
|
PORT = config["PORT"]
|
|
|
|
USERS = config["USERS"]
|
|
|
|
USERS = config["USERS"]
|
|
|
|
|
|
|
|
AD_TAG = bytes.fromhex(config.get("AD_TAG", ""))
|
|
|
|
|
|
|
|
|
|
|
|
# load advanced settings
|
|
|
|
# load advanced settings
|
|
|
|
PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
|
|
|
|
PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
|
|
|
|
@@ -74,10 +150,15 @@ PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
|
|
|
|
FAST_MODE = config.get("FAST_MODE", True)
|
|
|
|
FAST_MODE = config.get("FAST_MODE", True)
|
|
|
|
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
|
|
|
|
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
|
|
|
|
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 60*60*24)
|
|
|
|
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 60*60*24)
|
|
|
|
READ_BUF_SIZE = config.get("READ_BUF_SIZE", 16384)
|
|
|
|
TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 8192)
|
|
|
|
WRITE_BUF_SIZE = config.get("WRITE_BUF_SIZE", 65536)
|
|
|
|
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
|
|
|
|
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30)
|
|
|
|
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30)
|
|
|
|
AD_TAG = bytes.fromhex(config.get("AD_TAG", ""))
|
|
|
|
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
|
|
|
|
|
|
|
|
PROMETHEUS_HOST = config.get("PROMETHEUS_HOST")
|
|
|
|
|
|
|
|
PROMETHEUS_PORT = config.get("PROMETHEUS_PORT")
|
|
|
|
|
|
|
|
# PROMETHEUS_SCRAPERS is a safety net in case of missing firewall,
|
|
|
|
|
|
|
|
# set it to false value to disable.
|
|
|
|
|
|
|
|
PROMETHEUS_SCRAPERS = config.get("PROMETHEUS_SCRAPERS", {'127.0.0.1', '::1'})
|
|
|
|
|
|
|
|
|
|
|
|
TG_DATACENTER_PORT = 443
|
|
|
|
TG_DATACENTER_PORT = 443
|
|
|
|
|
|
|
|
|
|
|
|
@@ -128,6 +209,7 @@ DC_IDX_POS = 60
|
|
|
|
|
|
|
|
|
|
|
|
PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef"
|
|
|
|
PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef"
|
|
|
|
PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee"
|
|
|
|
PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee"
|
|
|
|
|
|
|
|
PROTO_TAG_SECURE = b"\xdd\xdd\xdd\xdd"
|
|
|
|
|
|
|
|
|
|
|
|
CBC_PADDING = 16
|
|
|
|
CBC_PADDING = 16
|
|
|
|
PADDING_FILLER = b"\x04\x00\x00\x00"
|
|
|
|
PADDING_FILLER = b"\x04\x00\x00\x00"
|
|
|
|
@@ -147,13 +229,13 @@ def init_stats():
|
|
|
|
stats = {user: collections.Counter() for user in USERS}
|
|
|
|
stats = {user: collections.Counter() for user in USERS}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def update_stats(user, connects=0, curr_connects_x2=0, octets=0):
|
|
|
|
def update_stats(user, connects=0, curr_connects=0, octets=0):
|
|
|
|
global stats
|
|
|
|
global stats
|
|
|
|
|
|
|
|
|
|
|
|
if user not in stats:
|
|
|
|
if user not in stats:
|
|
|
|
stats[user] = collections.Counter()
|
|
|
|
stats[user] = collections.Counter()
|
|
|
|
|
|
|
|
|
|
|
|
stats[user].update(connects=connects, curr_connects_x2=curr_connects_x2,
|
|
|
|
stats[user].update(connects=connects, curr_connects=curr_connects,
|
|
|
|
octets=octets)
|
|
|
|
octets=octets)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -354,6 +436,10 @@ class MTProtoIntermediateFrameStreamReader(LayeredStreamReaderBase):
|
|
|
|
|
|
|
|
|
|
|
|
data = await self.upstream.readexactly(msg_len)
|
|
|
|
data = await self.upstream.readexactly(msg_len)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if msg_len % 4 != 0:
|
|
|
|
|
|
|
|
cut_border = msg_len - (msg_len % 4)
|
|
|
|
|
|
|
|
data = data[:cut_border]
|
|
|
|
|
|
|
|
|
|
|
|
return data, extra
|
|
|
|
return data, extra
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -473,7 +559,7 @@ async def handle_handshake(reader, writer):
|
|
|
|
decrypted = decryptor.decrypt(handshake)
|
|
|
|
decrypted = decryptor.decrypt(handshake)
|
|
|
|
|
|
|
|
|
|
|
|
proto_tag = decrypted[PROTO_TAG_POS:PROTO_TAG_POS+4]
|
|
|
|
proto_tag = decrypted[PROTO_TAG_POS:PROTO_TAG_POS+4]
|
|
|
|
if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE):
|
|
|
|
if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE):
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True)
|
|
|
|
dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True)
|
|
|
|
@@ -481,9 +567,30 @@ async def handle_handshake(reader, writer):
|
|
|
|
reader = CryptoWrappedStreamReader(reader, decryptor)
|
|
|
|
reader = CryptoWrappedStreamReader(reader, decryptor)
|
|
|
|
writer = CryptoWrappedStreamWriter(writer, encryptor)
|
|
|
|
writer = CryptoWrappedStreamWriter(writer, encryptor)
|
|
|
|
return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv
|
|
|
|
return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
EMPTY_READ_BUF_SIZE = 4096
|
|
|
|
|
|
|
|
while await reader.read(EMPTY_READ_BUF_SIZE):
|
|
|
|
|
|
|
|
# just consume all the data
|
|
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def set_keepalive(sock, interval=40, attempts=5):
|
|
|
|
|
|
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
|
|
|
|
|
|
|
if hasattr(socket, "TCP_KEEPIDLE"):
|
|
|
|
|
|
|
|
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval)
|
|
|
|
|
|
|
|
if hasattr(socket, "TCP_KEEPINTVL"):
|
|
|
|
|
|
|
|
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
|
|
|
|
|
|
|
|
if hasattr(socket, "TCP_KEEPCNT"):
|
|
|
|
|
|
|
|
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def set_bufsizes(sock, recv_buf, send_buf):
|
|
|
|
|
|
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
|
|
|
|
|
|
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
|
|
|
|
async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
|
|
|
|
RESERVED_NONCE_FIRST_CHARS = [b"\xef"]
|
|
|
|
RESERVED_NONCE_FIRST_CHARS = [b"\xef"]
|
|
|
|
RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54",
|
|
|
|
RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54",
|
|
|
|
@@ -503,7 +610,10 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
reader_tgt, writer_tgt = await asyncio.open_connection(dc, TG_DATACENTER_PORT,
|
|
|
|
reader_tgt, writer_tgt = await asyncio.open_connection(dc, TG_DATACENTER_PORT,
|
|
|
|
limit=READ_BUF_SIZE)
|
|
|
|
limit=TO_CLT_BUFSIZE)
|
|
|
|
|
|
|
|
set_keepalive(writer_tgt.get_extra_info("socket"))
|
|
|
|
|
|
|
|
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
|
|
|
|
|
|
|
|
|
|
|
|
except ConnectionRefusedError as E:
|
|
|
|
except ConnectionRefusedError as E:
|
|
|
|
print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT)
|
|
|
|
print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT)
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
@@ -573,21 +683,6 @@ def get_middleproxy_aes_key_and_iv(nonce_srv, nonce_clt, clt_ts, srv_ip, clt_por
|
|
|
|
return key, iv
|
|
|
|
return key, iv
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def set_keepalive(sock, interval=40, attempts=5):
|
|
|
|
|
|
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
|
|
|
|
|
|
|
if hasattr(socket, "TCP_KEEPIDLE"):
|
|
|
|
|
|
|
|
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval)
|
|
|
|
|
|
|
|
if hasattr(socket, "TCP_KEEPINTVL"):
|
|
|
|
|
|
|
|
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
|
|
|
|
|
|
|
|
if hasattr(socket, "TCP_KEEPCNT"):
|
|
|
|
|
|
|
|
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def set_bufsizes(sock, recv_buf=READ_BUF_SIZE, send_buf=WRITE_BUF_SIZE):
|
|
|
|
|
|
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
|
|
|
|
|
|
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
|
|
|
async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
|
|
|
START_SEQ_NO = -2
|
|
|
|
START_SEQ_NO = -2
|
|
|
|
NONCE_LEN = 16
|
|
|
|
NONCE_LEN = 16
|
|
|
|
@@ -615,9 +710,9 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
|
|
|
addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
|
|
|
|
addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
reader_tgt, writer_tgt = await asyncio.open_connection(addr, port, limit=READ_BUF_SIZE)
|
|
|
|
reader_tgt, writer_tgt = await asyncio.open_connection(addr, port, limit=TO_CLT_BUFSIZE)
|
|
|
|
set_keepalive(writer_tgt.get_extra_info("socket"))
|
|
|
|
set_keepalive(writer_tgt.get_extra_info("socket"))
|
|
|
|
set_bufsizes(writer_tgt.get_extra_info("socket"))
|
|
|
|
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
|
|
|
|
except ConnectionRefusedError as E:
|
|
|
|
except ConnectionRefusedError as E:
|
|
|
|
print_err("Got connection refused while trying to connect to", addr, port)
|
|
|
|
print_err("Got connection refused while trying to connect to", addr, port)
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
@@ -639,7 +734,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
|
|
|
|
|
|
|
|
|
|
|
old_reader = reader_tgt
|
|
|
|
old_reader = reader_tgt
|
|
|
|
reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO)
|
|
|
|
reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO)
|
|
|
|
ans = await reader_tgt.read(READ_BUF_SIZE)
|
|
|
|
ans = await reader_tgt.read(TO_CLT_BUFSIZE)
|
|
|
|
|
|
|
|
|
|
|
|
if len(ans) != RPC_NONCE_ANS_LEN:
|
|
|
|
if len(ans) != RPC_NONCE_ANS_LEN:
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
@@ -721,11 +816,15 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
|
|
|
|
|
|
|
|
|
|
|
async def handle_client(reader_clt, writer_clt):
|
|
|
|
async def handle_client(reader_clt, writer_clt):
|
|
|
|
set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE)
|
|
|
|
set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE)
|
|
|
|
set_bufsizes(writer_clt.get_extra_info("socket"))
|
|
|
|
set_bufsizes(writer_clt.get_extra_info("socket"), TO_TG_BUFSIZE, TO_CLT_BUFSIZE)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
clt_data = await asyncio.wait_for(handle_handshake(reader_clt, writer_clt),
|
|
|
|
|
|
|
|
timeout=CLIENT_HANDSHAKE_TIMEOUT)
|
|
|
|
|
|
|
|
except asyncio.TimeoutError:
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
clt_data = await handle_handshake(reader_clt, writer_clt)
|
|
|
|
|
|
|
|
if not clt_data:
|
|
|
|
if not clt_data:
|
|
|
|
writer_clt.transport.abort()
|
|
|
|
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
reader_clt, writer_clt, proto_tag, user, dc_idx, enc_key_and_iv = clt_data
|
|
|
|
reader_clt, writer_clt, proto_tag, user, dc_idx, enc_key_and_iv = clt_data
|
|
|
|
@@ -742,7 +841,6 @@ async def handle_client(reader_clt, writer_clt):
|
|
|
|
tg_data = await do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port)
|
|
|
|
tg_data = await do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port)
|
|
|
|
|
|
|
|
|
|
|
|
if not tg_data:
|
|
|
|
if not tg_data:
|
|
|
|
writer_clt.transport.abort()
|
|
|
|
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
reader_tg, writer_tg = tg_data
|
|
|
|
reader_tg, writer_tg = tg_data
|
|
|
|
@@ -763,17 +861,16 @@ async def handle_client(reader_clt, writer_clt):
|
|
|
|
if proto_tag == PROTO_TAG_ABRIDGED:
|
|
|
|
if proto_tag == PROTO_TAG_ABRIDGED:
|
|
|
|
reader_clt = MTProtoCompactFrameStreamReader(reader_clt)
|
|
|
|
reader_clt = MTProtoCompactFrameStreamReader(reader_clt)
|
|
|
|
writer_clt = MTProtoCompactFrameStreamWriter(writer_clt)
|
|
|
|
writer_clt = MTProtoCompactFrameStreamWriter(writer_clt)
|
|
|
|
elif proto_tag == PROTO_TAG_INTERMEDIATE:
|
|
|
|
elif proto_tag in (PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE):
|
|
|
|
reader_clt = MTProtoIntermediateFrameStreamReader(reader_clt)
|
|
|
|
reader_clt = MTProtoIntermediateFrameStreamReader(reader_clt)
|
|
|
|
writer_clt = MTProtoIntermediateFrameStreamWriter(writer_clt)
|
|
|
|
writer_clt = MTProtoIntermediateFrameStreamWriter(writer_clt)
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
async def connect_reader_to_writer(rd, wr, user):
|
|
|
|
async def connect_reader_to_writer(rd, wr, user, rd_buf_size):
|
|
|
|
update_stats(user, curr_connects_x2=1)
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
while True:
|
|
|
|
while True:
|
|
|
|
data = await rd.read(READ_BUF_SIZE)
|
|
|
|
data = await rd.read(rd_buf_size)
|
|
|
|
if isinstance(data, tuple):
|
|
|
|
if isinstance(data, tuple):
|
|
|
|
data, extra = data
|
|
|
|
data, extra = data
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
@@ -782,27 +879,98 @@ async def handle_client(reader_clt, writer_clt):
|
|
|
|
if not data:
|
|
|
|
if not data:
|
|
|
|
wr.write_eof()
|
|
|
|
wr.write_eof()
|
|
|
|
await wr.drain()
|
|
|
|
await wr.drain()
|
|
|
|
wr.close()
|
|
|
|
|
|
|
|
return
|
|
|
|
return
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
update_stats(user, octets=len(data))
|
|
|
|
update_stats(user, octets=len(data))
|
|
|
|
wr.write(data, extra)
|
|
|
|
wr.write(data, extra)
|
|
|
|
await wr.drain()
|
|
|
|
await wr.drain()
|
|
|
|
except (OSError, AttributeError, asyncio.streams.IncompleteReadError) as e:
|
|
|
|
except (OSError, asyncio.streams.IncompleteReadError) as e:
|
|
|
|
# print_err(e)
|
|
|
|
# print_err(e)
|
|
|
|
pass
|
|
|
|
pass
|
|
|
|
finally:
|
|
|
|
|
|
|
|
wr.transport.abort()
|
|
|
|
|
|
|
|
update_stats(user, curr_connects_x2=-1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer_clt, user))
|
|
|
|
tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, TO_CLT_BUFSIZE)
|
|
|
|
asyncio.ensure_future(connect_reader_to_writer(reader_clt, writer_tg, user))
|
|
|
|
clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, TO_TG_BUFSIZE)
|
|
|
|
|
|
|
|
task_tg_to_clt = asyncio.ensure_future(tg_to_clt)
|
|
|
|
|
|
|
|
task_clt_to_tg = asyncio.ensure_future(clt_to_tg)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
update_stats(user, curr_connects=1)
|
|
|
|
|
|
|
|
await asyncio.wait([task_tg_to_clt, task_clt_to_tg], return_when=asyncio.FIRST_COMPLETED)
|
|
|
|
|
|
|
|
update_stats(user, curr_connects=-1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
task_tg_to_clt.cancel()
|
|
|
|
|
|
|
|
task_clt_to_tg.cancel()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
writer_tg.transport.abort()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def http_reply(writer, line, body=b"", eof=False):
|
|
|
|
|
|
|
|
BaseHTTPRequestHandler = http.server.BaseHTTPRequestHandler
|
|
|
|
|
|
|
|
msg = (
|
|
|
|
|
|
|
|
"HTTP/1.1 {}\r\n"
|
|
|
|
|
|
|
|
"Server: mtprotoproxy\r\n"
|
|
|
|
|
|
|
|
"Date: {}\r\n"
|
|
|
|
|
|
|
|
"Content-Type: text/plain\r\n"
|
|
|
|
|
|
|
|
"Content-Length: {:d}\r\n"
|
|
|
|
|
|
|
|
).format(
|
|
|
|
|
|
|
|
line,
|
|
|
|
|
|
|
|
BaseHTTPRequestHandler.date_time_string(BaseHTTPRequestHandler),
|
|
|
|
|
|
|
|
len(body)
|
|
|
|
|
|
|
|
).encode("ascii")
|
|
|
|
|
|
|
|
if eof:
|
|
|
|
|
|
|
|
msg += b"Connection: close\r\n"
|
|
|
|
|
|
|
|
msg += b"\r\n" + body
|
|
|
|
|
|
|
|
writer.write(msg)
|
|
|
|
|
|
|
|
await writer.drain()
|
|
|
|
|
|
|
|
if eof:
|
|
|
|
|
|
|
|
writer.write_eof()
|
|
|
|
|
|
|
|
writer.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def handle_promstats(reader, writer):
|
|
|
|
|
|
|
|
set_keepalive(writer.get_extra_info("socket"), 75) # prometheus should never go away for a long time
|
|
|
|
|
|
|
|
if PROMETHEUS_SCRAPERS and writer.get_extra_info('peername')[0] not in PROMETHEUS_SCRAPERS:
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
while True: # Keep-Alive
|
|
|
|
|
|
|
|
request = await reader.readuntil(b"\r\n\r\n")
|
|
|
|
|
|
|
|
if request.startswith(b"GET /metrics HTTP/1."):
|
|
|
|
|
|
|
|
promstat = (
|
|
|
|
|
|
|
|
"# HELP mtproxy_pump_bytes Number of post-handshake bytes pumped in both directions.\n"
|
|
|
|
|
|
|
|
"# TYPE mtproxy_pump_bytes counter\n"
|
|
|
|
|
|
|
|
) + "".join(
|
|
|
|
|
|
|
|
"mtproxy_pump_bytes{{user=\"{}\"}} {:d}\n".format(u, stats[u]["octets"])
|
|
|
|
|
|
|
|
for u in stats
|
|
|
|
|
|
|
|
) + (
|
|
|
|
|
|
|
|
"# HELP mtproxy_connections Current number of post-handshake client connections.\n"
|
|
|
|
|
|
|
|
"# TYPE mtproxy_connections gauge\n"
|
|
|
|
|
|
|
|
) + "".join(
|
|
|
|
|
|
|
|
"mtproxy_connections{{user=\"{}\"}} {:d}\n".format(u, stats[u]["curr_connects"])
|
|
|
|
|
|
|
|
for u in stats
|
|
|
|
|
|
|
|
) + (
|
|
|
|
|
|
|
|
"# HELP mtproxy_connections_total Total number of post-handshake client connections served.\n"
|
|
|
|
|
|
|
|
"# TYPE mtproxy_connections_total counter\n"
|
|
|
|
|
|
|
|
) + "".join(
|
|
|
|
|
|
|
|
"mtproxy_connections_total{{user=\"{}\"}} {:d}\n".format(u, stats[u]["connects"])
|
|
|
|
|
|
|
|
for u in stats
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
await http_reply(writer, "200 OK", promstat.encode("ascii"))
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
await http_reply(writer, "400 Bad Request", b"Bad Request.\n", eof=True)
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def handle_promstats_wrapper(reader, writer):
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
await handle_promstats(reader, writer)
|
|
|
|
|
|
|
|
except (asyncio.IncompleteReadError, ConnectionResetError, TimeoutError):
|
|
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
finally:
|
|
|
|
|
|
|
|
writer.transport.abort()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def handle_client_wrapper(reader, writer):
|
|
|
|
async def handle_client_wrapper(reader, writer):
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
await handle_client(reader, writer)
|
|
|
|
await handle_client(reader, writer)
|
|
|
|
except (asyncio.IncompleteReadError, ConnectionResetError, TimeoutError):
|
|
|
|
except (asyncio.IncompleteReadError, ConnectionResetError, TimeoutError):
|
|
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
finally:
|
|
|
|
writer.transport.abort()
|
|
|
|
writer.transport.abort()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -814,7 +982,7 @@ async def stats_printer():
|
|
|
|
print("Stats for", time.strftime("%d.%m.%Y %H:%M:%S"))
|
|
|
|
print("Stats for", time.strftime("%d.%m.%Y %H:%M:%S"))
|
|
|
|
for user, stat in stats.items():
|
|
|
|
for user, stat in stats.items():
|
|
|
|
print("%s: %d connects (%d current), %.2f MB" % (
|
|
|
|
print("%s: %d connects (%d current), %.2f MB" % (
|
|
|
|
user, stat["connects"], stat["curr_connects_x2"] // 2,
|
|
|
|
user, stat["connects"], stat["curr_connects"],
|
|
|
|
stat["octets"] / 1000000))
|
|
|
|
stat["octets"] / 1000000))
|
|
|
|
print(flush=True)
|
|
|
|
print(flush=True)
|
|
|
|
|
|
|
|
|
|
|
|
@@ -943,6 +1111,10 @@ def print_tg_info():
|
|
|
|
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
|
|
|
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
|
|
|
print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True)
|
|
|
|
print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
params = {"server": ip, "port": PORT, "secret": "dd" + secret}
|
|
|
|
|
|
|
|
params_encodeded = urllib.parse.urlencode(params, safe=':')
|
|
|
|
|
|
|
|
print("{}: tg://proxy?{} (beta)".format(user, params_encodeded), flush=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def loop_exception_handler(loop, context):
|
|
|
|
def loop_exception_handler(loop, context):
|
|
|
|
exception = context.get("exception")
|
|
|
|
exception = context.get("exception")
|
|
|
|
@@ -966,7 +1138,7 @@ def loop_exception_handler(loop, context):
|
|
|
|
def main():
|
|
|
|
def main():
|
|
|
|
init_stats()
|
|
|
|
init_stats()
|
|
|
|
|
|
|
|
|
|
|
|
if sys.platform == 'win32':
|
|
|
|
if sys.platform == "win32":
|
|
|
|
loop = asyncio.ProactorEventLoop()
|
|
|
|
loop = asyncio.ProactorEventLoop()
|
|
|
|
asyncio.set_event_loop(loop)
|
|
|
|
asyncio.set_event_loop(loop)
|
|
|
|
|
|
|
|
|
|
|
|
@@ -980,13 +1152,26 @@ def main():
|
|
|
|
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
|
|
|
|
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
|
|
|
|
asyncio.ensure_future(middle_proxy_updater_task)
|
|
|
|
asyncio.ensure_future(middle_proxy_updater_task)
|
|
|
|
|
|
|
|
|
|
|
|
task_v4 = asyncio.start_server(handle_client_wrapper,
|
|
|
|
if PROMETHEUS_PORT:
|
|
|
|
'0.0.0.0', PORT, limit=READ_BUF_SIZE, loop=loop)
|
|
|
|
task_promstats = asyncio.start_server(handle_promstats_wrapper, PROMETHEUS_HOST, PROMETHEUS_PORT,
|
|
|
|
|
|
|
|
limit=4096, # http request is quite small
|
|
|
|
|
|
|
|
backlog=8, # there are few prometheus collectors
|
|
|
|
|
|
|
|
reuse_address=True, # that's still server, TIME_WAIT should not block restart
|
|
|
|
|
|
|
|
reuse_port=False, # if you reuse statistics port for several instances, you're doing it wrong!
|
|
|
|
|
|
|
|
loop=loop)
|
|
|
|
|
|
|
|
server_promstats = loop.run_until_complete(task_promstats)
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
server_promstats = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
reuse_port = hasattr(socket, "SO_REUSEPORT")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
task_v4 = asyncio.start_server(handle_client_wrapper, '0.0.0.0', PORT,
|
|
|
|
|
|
|
|
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
|
|
|
|
server_v4 = loop.run_until_complete(task_v4)
|
|
|
|
server_v4 = loop.run_until_complete(task_v4)
|
|
|
|
|
|
|
|
|
|
|
|
if socket.has_ipv6:
|
|
|
|
if socket.has_ipv6:
|
|
|
|
task_v6 = asyncio.start_server(handle_client_wrapper,
|
|
|
|
task_v6 = asyncio.start_server(handle_client_wrapper, '::', PORT,
|
|
|
|
'::', PORT, limit=READ_BUF_SIZE, loop=loop)
|
|
|
|
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
|
|
|
|
server_v6 = loop.run_until_complete(task_v6)
|
|
|
|
server_v6 = loop.run_until_complete(task_v6)
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
@@ -1003,6 +1188,10 @@ def main():
|
|
|
|
server_v6.close()
|
|
|
|
server_v6.close()
|
|
|
|
loop.run_until_complete(server_v6.wait_closed())
|
|
|
|
loop.run_until_complete(server_v6.wait_closed())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if server_promstats is not None:
|
|
|
|
|
|
|
|
server_promstats.close()
|
|
|
|
|
|
|
|
loop.run_until_complete(server_promstats.wait_closed())
|
|
|
|
|
|
|
|
|
|
|
|
loop.close()
|
|
|
|
loop.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|