51 Commits
v0.9 ... v1.0.2

Author SHA1 Message Date
Alexander Bersenev
b082d06f9b Merge branch 'master' into stable 2018-11-14 02:46:31 +05:00
Alexander Bersenev
5187725088 Revert "just for history: attempting to pretend Cloudflare service"
This reverts commit dd1d0a6262.
2018-11-13 02:18:13 +05:00
Alexander Bersenev
dd1d0a6262 just for history: attempting to pretend Cloudflare service 2018-11-13 02:18:04 +05:00
Alexander Bersenev
d5daf8bbdf add secure only mode example in config 2018-11-13 01:11:24 +05:00
Alexander Bersenev
780dbc5866 document all advanced options 2018-09-20 04:03:32 +05:00
Alexander Bersenev
298614b1f6 add an ability to specify listen address 2018-09-16 12:50:41 +05:00
Alexander Bersenev
f5c30c6115 secure only mode 2018-08-29 00:04:58 +05:00
Alexander Bersenev
e4473d6374 Merge branch 'master' into stable 2018-08-01 21:32:34 +05:00
Alexander Bersenev
c2278501bf change the ip obtaining service 2018-08-01 21:30:05 +05:00
Alexander Bersenev
534b26cd04 Merge branch 'master' into stable 2018-07-13 12:57:46 +05:00
Alexander Bersenev
c1bef68602 update alpine linux in the image 2018-07-10 19:22:04 +05:00
Alexander Bersenev
8e79dacd26 make the passive protocol detection harder 2018-07-10 15:48:39 +05:00
Alexander Bersenev
520a26aa89 fix typo 2018-07-08 23:52:57 +05:00
Alexander Bersenev
647b6f6edd add connect retrying 2018-07-08 19:05:45 +05:00
Alexander Bersenev
c2ad0de665 increase default buffer limit 2018-07-08 17:48:13 +05:00
Alexander Bersenev
47f7c088af Merge branch 'master' into stable 2018-07-05 16:12:56 +05:00
Alexander Bersenev
6f8bfdb568 add timeout error to errno 2018-07-05 15:45:53 +05:00
Alexander Bersenev
0a7e2d85b8 shrink timeouts, removed annoying message about timeouts 2018-07-04 13:54:27 +05:00
Alexander Bersenev
0caf5f89a8 count msgs 2018-07-02 02:28:43 +05:00
Alexander Bersenev
33fabe7590 ignore no route to host error 2018-07-02 00:47:35 +05:00
Alexander Bersenev
c0ed5e1e38 Merge branch 'master' of github.com:alexbers/mtprotoproxy 2018-07-01 16:45:34 +05:00
Alexander Bersenev
bcac5eb878 add sending timeout 2018-07-01 16:43:54 +05:00
Alexander Bersenev
b38084bf36 add information about Prometheus to readme 2018-07-01 01:40:30 +05:00
Alexander Bersenev
675d5a6aba send buffer size on the direct handshake also 2018-06-30 23:09:43 +05:00
Alexander Bersenev
b31768165c buffers redesign 2018-06-30 22:54:11 +05:00
Alexander Bersenev
372861ac6e support for secure mode 2018-06-29 18:51:47 +05:00
Alexander Bersenev
6a27096618 add secure tag 2018-06-29 17:52:37 +05:00
Alexander Bersenev
93f71f5ec2 Merge branch 'master' into stable 2018-06-29 12:48:58 +05:00
Alexander Bersenev
03f7ca1d4c more reliable logic to check reuseport availability 2018-06-29 02:00:46 +05:00
Alexander Bersenev
3477402c0d use cryptography module in docker file, do not copy pyaes 2018-06-29 01:07:16 +05:00
Alexander Bersenev
532021ab87 support for cryptography module and advise to use it 2018-06-28 20:47:12 +05:00
Alexander Bersenev
6900cdda43 Merge branch 'master' of github.com:alexbers/mtprotoproxy 2018-06-27 20:04:28 +05:00
Alexander Bersenev
ec1c6b4fb6 we need at least one undocumented launching way :) 2018-06-27 20:04:05 +05:00
Alexander Bersenev
63b689e3bf Add a section about advanced usage 2018-06-27 18:25:40 +05:00
Alexander Bersenev
71e3206b19 check if signal exists before placing it. It can absent in some OSes, like Windows 2018-06-27 13:33:51 +05:00
Alexander Bersenev
7eea7d3201 replace infinite loop with timeout with while loop, when the client is bad 2018-06-27 11:13:42 +05:00
Alexander Bersenev
2e86308e90 Revert "Revert "simplify disconnect logic". The idea with task cancelation doesn't work"
This reverts commit 32d3bffc7b.
2018-06-27 11:11:50 +05:00
Alexander Bersenev
d74bb68f03 Revert "Revert "refactor task canceling a bit". The idea with the task cancelation doesn't work"
This reverts commit b74079c433.
2018-06-27 11:11:45 +05:00
Alexander Bersenev
5f35b4ed0a add debugging signal 2018-06-27 01:14:44 +05:00
Alexander Bersenev
b74079c433 Revert "refactor task canceling a bit". The idea with the task cancelation doesn't work
This reverts commit 444a1876b6.
2018-06-27 01:05:08 +05:00
Alexander Bersenev
32d3bffc7b Revert "simplify disconnect logic". The idea with task cancelation doesn't work
This reverts commit a20b1c9929.
2018-06-27 01:04:06 +05:00
Alexander Bersenev
a20b1c9929 simplify disconnect logic 2018-06-26 22:53:46 +05:00
Alexander Bersenev
444a1876b6 refactor task canceling a bit 2018-06-26 20:39:43 +05:00
Alexander Bersenev
ed088d9449 revert the last commit 2018-06-26 20:21:51 +05:00
Alexander Bersenev
accba06b45 count client stats only for successful clients 2018-06-26 20:17:52 +05:00
Alexander Bersenev
bd3d9731d7 if the handshake failed, just consume all the data 2018-06-26 11:48:58 +05:00
Alexander Bersenev
9077ceb471 simplify current connects counting 2018-06-26 03:38:11 +05:00
Alexander Bersenev
d2ff0f61e4 add handshake timeout, refactor client handling a bit 2018-06-26 03:24:45 +05:00
Alexander Bersenev
d56c995ee2 use uvloop if available 2018-06-22 15:26:33 +05:00
Alexander Bersenev
51c40903ab allows to bind on privileged ports 2018-06-21 10:19:38 +05:00
Alexander Bersenev
e1d592cd84 enable port reuse on non-windows platforms 2018-06-19 21:51:02 +05:00
5 changed files with 277 additions and 73 deletions

View File

@@ -1,13 +1,13 @@
FROM alpine:3.6 FROM alpine:3.8
RUN adduser tgproxy -u 10000 -D RUN adduser tgproxy -u 10000 -D
RUN apk add --no-cache python3 py3-crypto ca-certificates RUN apk add --no-cache python3 py3-cryptography ca-certificates libcap
COPY mtprotoproxy.py config.py /home/tgproxy/ COPY mtprotoproxy.py config.py /home/tgproxy/
COPY pyaes/*.py /home/tgproxy/pyaes/
RUN chown -R tgproxy:tgproxy /home/tgproxy RUN chown -R tgproxy:tgproxy /home/tgproxy
RUN setcap cap_net_bind_service=+ep /usr/bin/python3.6
USER tgproxy USER tgproxy

View File

@@ -16,4 +16,12 @@ To advertise a channel get a tag from **@MTProxybot** and write it to *config.py
## Performance ## ## Performance ##
The proxy performance should be enough to comfortably serve about 4 000 simultaneous users on The proxy performance should be enough to comfortably serve about 4 000 simultaneous users on
the smallest VDS instance with 1 CPU core and 1024MB RAM. the VDS instance with 1 CPU core and 1024MB RAM.
## Advanced Usage ##
The proxy can be launched:
- with a custom config: `python3 mtprotoproxy.py [configfile]`
- several times, clients will be automatically balanced between instances
- using *PyPy* interpreter
- with runtime statistics exported for [Prometheus](https://prometheus.io/): using [prometheus](https://github.com/alexbers/mtprotoproxy/tree/prometheus) branch

View File

@@ -8,3 +8,7 @@ USERS = {
# Tag for advertising, obtainable from @MTProxybot # Tag for advertising, obtainable from @MTProxybot
# AD_TAG = "3c09c680b76ee91a4c25ad51f742267d" # AD_TAG = "3c09c680b76ee91a4c25ad51f742267d"
# Uncommenting this makes a proxy harder to detect
# But it can be incompatible with old clients
# SECURE_ONLY = True

View File

@@ -3,5 +3,5 @@ services:
mtprotoproxy: mtprotoproxy:
build: . build: .
restart: unless-stopped restart: unless-stopped
mem_limit: 1024m
network_mode: "host" network_mode: "host"
# mem_limit: 1024m

View File

@@ -12,9 +12,54 @@ import binascii
import sys import sys
import re import re
import runpy import runpy
import signal
try: try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
pass
def try_use_cryptography_module():
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
def create_aes_ctr(key, iv):
class EncryptorAdapter:
def __init__(self, cipher):
self.encryptor = cipher.encryptor()
self.decryptor = cipher.decryptor()
def encrypt(self, data):
return self.encryptor.update(data)
def decrypt(self, data):
return self.decryptor.update(data)
iv_bytes = int.to_bytes(iv, 16, "big")
cipher = Cipher(algorithms.AES(key), modes.CTR(iv_bytes), default_backend())
return EncryptorAdapter(cipher)
def create_aes_cbc(key, iv):
class EncryptorAdapter:
def __init__(self, cipher):
self.encryptor = cipher.encryptor()
self.decryptor = cipher.decryptor()
def encrypt(self, data):
return self.encryptor.update(data)
def decrypt(self, data):
return self.decryptor.update(data)
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), default_backend())
return EncryptorAdapter(cipher)
return create_aes_ctr, create_aes_cbc
def try_use_pycrypto_or_pycryptodome_module():
from Crypto.Cipher import AES from Crypto.Cipher import AES
from Crypto.Util import Counter from Crypto.Util import Counter
@@ -25,11 +70,16 @@ try:
def create_aes_cbc(key, iv): def create_aes_cbc(key, iv):
return AES.new(key, AES.MODE_CBC, iv) return AES.new(key, AES.MODE_CBC, iv)
except ImportError: return create_aes_ctr, create_aes_cbc
print("Failed to find pycryptodome or pycrypto, using slow AES implementation",
flush=True, file=sys.stderr)
def use_slow_bundled_cryptography_module():
import pyaes import pyaes
msg = "To make the program a *lot* faster, please install cryptography module: "
msg += "pip install cryptography\n"
print(msg, flush=True, file=sys.stderr)
def create_aes_ctr(key, iv): def create_aes_ctr(key, iv):
ctr = pyaes.Counter(iv) ctr = pyaes.Counter(iv)
return pyaes.AESModeOfOperationCTR(key, ctr) return pyaes.AESModeOfOperationCTR(key, ctr)
@@ -49,8 +99,17 @@ except ImportError:
mode = pyaes.AESModeOfOperationCBC(key, iv) mode = pyaes.AESModeOfOperationCBC(key, iv)
return EncryptorAdapter(mode) return EncryptorAdapter(mode)
return create_aes_ctr, create_aes_cbc
try:
create_aes_ctr, create_aes_cbc = try_use_cryptography_module()
except ImportError:
try:
create_aes_ctr, create_aes_cbc = try_use_pycrypto_or_pycryptodome_module()
except ImportError:
create_aes_ctr, create_aes_cbc = use_slow_bundled_cryptography_module()
try: try:
import resource import resource
soft_fd_limit, hard_fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE) soft_fd_limit, hard_fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
@@ -60,24 +119,57 @@ except (ValueError, OSError):
except ImportError: except ImportError:
pass pass
if len(sys.argv) > 1: if hasattr(signal, 'SIGUSR1'):
def debug_signal(signum, frame):
import pdb
pdb.set_trace()
signal.signal(signal.SIGUSR1, debug_signal)
if len(sys.argv) < 2:
config = runpy.run_module("config")
elif len(sys.argv) == 2:
config = runpy.run_path(sys.argv[1]) config = runpy.run_path(sys.argv[1])
else: else:
config = runpy.run_module("config") # undocumented way of launching
config = {}
config["PORT"] = int(sys.argv[1])
secrets = sys.argv[2].split(",")
config["USERS"] = {"user%d" % i: secrets[i].zfill(32) for i in range(len(secrets))}
if len(sys.argv) > 3:
config["AD_TAG"] = sys.argv[3]
PORT = config["PORT"] PORT = config["PORT"]
USERS = config["USERS"] USERS = config["USERS"]
AD_TAG = bytes.fromhex(config.get("AD_TAG", ""))
# load advanced settings # load advanced settings
# if IPv6 available, use it by default
PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6) PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
# disables tg->client traffic reencryption, faster but less secure # disables tg->client traffic reencryption, faster but less secure
FAST_MODE = config.get("FAST_MODE", True) FAST_MODE = config.get("FAST_MODE", True)
# doesn't allow to connect in not-secure mode
SECURE_ONLY = config.get("SECURE_ONLY", False)
# delay in seconds between stats printing
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600) STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 60*60*24) # delay in seconds between middle proxy info updates
READ_BUF_SIZE = config.get("READ_BUF_SIZE", 16384) PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 24*60*60)
WRITE_BUF_SIZE = config.get("WRITE_BUF_SIZE", 65536) # max socket buffer size to the client direction, the more the faster, but more RAM hungry
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30) TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 16384)
AD_TAG = bytes.fromhex(config.get("AD_TAG", "")) # max socket buffer size to the telegram servers direction
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
# keepalive period for clients in secs
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 10*60)
# drop client after this timeout if the handshake fail
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
# if client doesn't confirm data for this number of seconds, it is dropped
CLIENT_ACK_TIMEOUT = config.get("CLIENT_ACK_TIMEOUT", 5*60)
# telegram servers connect timeout in seconds
TG_CONNECT_TIMEOUT = config.get("TG_CONNECT_TIMEOUT", 10)
# listen address for IPv4
LISTEN_ADDR_IPV4 = config.get("LISTEN_ADDR_IPV4", "0.0.0.0")
# listen address for IPv6
LISTEN_ADDR_IPV6 = config.get("LISTEN_ADDR_IPV6", "::")
TG_DATACENTER_PORT = 443 TG_DATACENTER_PORT = 443
@@ -128,6 +220,7 @@ DC_IDX_POS = 60
PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef" PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef"
PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee" PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee"
PROTO_TAG_SECURE = b"\xdd\xdd\xdd\xdd"
CBC_PADDING = 16 CBC_PADDING = 16
PADDING_FILLER = b"\x04\x00\x00\x00" PADDING_FILLER = b"\x04\x00\x00\x00"
@@ -147,14 +240,14 @@ def init_stats():
stats = {user: collections.Counter() for user in USERS} stats = {user: collections.Counter() for user in USERS}
def update_stats(user, connects=0, curr_connects_x2=0, octets=0): def update_stats(user, connects=0, curr_connects=0, octets=0, msgs=0):
global stats global stats
if user not in stats: if user not in stats:
stats[user] = collections.Counter() stats[user] = collections.Counter()
stats[user].update(connects=connects, curr_connects_x2=curr_connects_x2, stats[user].update(connects=connects, curr_connects=curr_connects,
octets=octets) octets=octets, msgs=msgs)
class LayeredStreamReaderBase: class LayeredStreamReaderBase:
@@ -353,7 +446,6 @@ class MTProtoIntermediateFrameStreamReader(LayeredStreamReaderBase):
msg_len -= 0x80000000 msg_len -= 0x80000000
data = await self.upstream.readexactly(msg_len) data = await self.upstream.readexactly(msg_len)
return data, extra return data, extra
@@ -365,6 +457,38 @@ class MTProtoIntermediateFrameStreamWriter(LayeredStreamWriterBase):
return self.upstream.write(int.to_bytes(len(data), 4, 'little') + data) return self.upstream.write(int.to_bytes(len(data), 4, 'little') + data)
class MTProtoSecureIntermediateFrameStreamReader(LayeredStreamReaderBase):
async def read(self, buf_size):
msg_len_bytes = await self.upstream.readexactly(4)
msg_len = int.from_bytes(msg_len_bytes, "little")
extra = {}
if msg_len > 0x80000000:
extra["QUICKACK_FLAG"] = True
msg_len -= 0x80000000
data = await self.upstream.readexactly(msg_len)
if msg_len % 4 != 0:
cut_border = msg_len - (msg_len % 4)
data = data[:cut_border]
return data, extra
class MTProtoSecureIntermediateFrameStreamWriter(LayeredStreamWriterBase):
def write(self, data, extra={}):
MAX_PADDING_LEN = 4
if extra.get("SIMPLE_ACK"):
# TODO: make this unpredictable
return self.upstream.write(data)
else:
padding_len = random.randrange(MAX_PADDING_LEN)
padding = bytearray([random.randrange(256) for i in range(padding_len)])
padded_data_len_bytes = int.to_bytes(len(data) + padding_len, 4, 'little')
return self.upstream.write(padded_data_len_bytes + data + padding)
class ProxyReqStreamReader(LayeredStreamReaderBase): class ProxyReqStreamReader(LayeredStreamReaderBase):
async def read(self, msg): async def read(self, msg):
RPC_PROXY_ANS = b"\x0d\xda\x03\x44" RPC_PROXY_ANS = b"\x0d\xda\x03\x44"
@@ -423,6 +547,7 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase):
FLAG_HAS_AD_TAG = 0x8 FLAG_HAS_AD_TAG = 0x8
FLAG_MAGIC = 0x1000 FLAG_MAGIC = 0x1000
FLAG_EXTMODE2 = 0x20000 FLAG_EXTMODE2 = 0x20000
FLAG_PAD = 0x8000000
FLAG_INTERMEDIATE = 0x20000000 FLAG_INTERMEDIATE = 0x20000000
FLAG_ABRIDGED = 0x40000000 FLAG_ABRIDGED = 0x40000000
FLAG_QUICKACK = 0x80000000 FLAG_QUICKACK = 0x80000000
@@ -437,6 +562,8 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase):
flags |= FLAG_ABRIDGED flags |= FLAG_ABRIDGED
elif self.proto_tag == PROTO_TAG_INTERMEDIATE: elif self.proto_tag == PROTO_TAG_INTERMEDIATE:
flags |= FLAG_INTERMEDIATE flags |= FLAG_INTERMEDIATE
elif self.proto_tag == PROTO_TAG_SECURE:
flags |= FLAG_INTERMEDIATE | FLAG_PAD
if extra.get("QUICKACK_FLAG"): if extra.get("QUICKACK_FLAG"):
flags |= FLAG_QUICKACK flags |= FLAG_QUICKACK
@@ -473,7 +600,10 @@ async def handle_handshake(reader, writer):
decrypted = decryptor.decrypt(handshake) decrypted = decryptor.decrypt(handshake)
proto_tag = decrypted[PROTO_TAG_POS:PROTO_TAG_POS+4] proto_tag = decrypted[PROTO_TAG_POS:PROTO_TAG_POS+4]
if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE): if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE):
continue
if SECURE_ONLY and proto_tag != PROTO_TAG_SECURE:
continue continue
dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True) dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True)
@@ -481,9 +611,50 @@ async def handle_handshake(reader, writer):
reader = CryptoWrappedStreamReader(reader, decryptor) reader = CryptoWrappedStreamReader(reader, decryptor)
writer = CryptoWrappedStreamWriter(writer, encryptor) writer = CryptoWrappedStreamWriter(writer, encryptor)
return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv
EMPTY_READ_BUF_SIZE = 4096
while await reader.read(EMPTY_READ_BUF_SIZE):
# just consume all the data
pass
return False return False
def set_keepalive(sock, interval=40, attempts=5):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if hasattr(socket, "TCP_KEEPIDLE"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval)
if hasattr(socket, "TCP_KEEPINTVL"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
if hasattr(socket, "TCP_KEEPCNT"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
def set_ack_timeout(sock, timeout):
if hasattr(socket, "TCP_USER_TIMEOUT"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, timeout*1000)
def set_bufsizes(sock, recv_buf, send_buf):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
async def open_connection_tryer(addr, port, limit, timeout, max_attempts=3):
for attempt in range(max_attempts-1):
try:
task = asyncio.open_connection(addr, port, limit=limit)
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout)
return reader_tgt, writer_tgt
except (OSError, asyncio.TimeoutError):
continue
# the last attempt
task = asyncio.open_connection(addr, port, limit=limit)
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout)
return reader_tgt, writer_tgt
async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None): async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
RESERVED_NONCE_FIRST_CHARS = [b"\xef"] RESERVED_NONCE_FIRST_CHARS = [b"\xef"]
RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54", RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54",
@@ -502,15 +673,18 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
dc = TG_DATACENTERS_V4[dc_idx] dc = TG_DATACENTERS_V4[dc_idx]
try: try:
reader_tgt, writer_tgt = await asyncio.open_connection(dc, TG_DATACENTER_PORT, reader_tgt, writer_tgt = await open_connection_tryer(
limit=READ_BUF_SIZE) dc, TG_DATACENTER_PORT, limit=TO_CLT_BUFSIZE, timeout=TG_CONNECT_TIMEOUT)
except ConnectionRefusedError as E: except ConnectionRefusedError as E:
print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT) print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT)
return False return False
except OSError as E: except (OSError, asyncio.TimeoutError) as E:
print_err("Unable to connect to", dc, TG_DATACENTER_PORT) print_err("Unable to connect to", dc, TG_DATACENTER_PORT)
return False return False
set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
while True: while True:
rnd = bytearray([random.randrange(0, 256) for i in range(HANDSHAKE_LEN)]) rnd = bytearray([random.randrange(0, 256) for i in range(HANDSHAKE_LEN)])
if rnd[:1] in RESERVED_NONCE_FIRST_CHARS: if rnd[:1] in RESERVED_NONCE_FIRST_CHARS:
@@ -573,21 +747,6 @@ def get_middleproxy_aes_key_and_iv(nonce_srv, nonce_clt, clt_ts, srv_ip, clt_por
return key, iv return key, iv
def set_keepalive(sock, interval=40, attempts=5):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if hasattr(socket, "TCP_KEEPIDLE"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval)
if hasattr(socket, "TCP_KEEPINTVL"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
if hasattr(socket, "TCP_KEEPCNT"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
def set_bufsizes(sock, recv_buf=READ_BUF_SIZE, send_buf=WRITE_BUF_SIZE):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port): async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
START_SEQ_NO = -2 START_SEQ_NO = -2
NONCE_LEN = 16 NONCE_LEN = 16
@@ -615,16 +774,18 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx]) addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
try: try:
reader_tgt, writer_tgt = await asyncio.open_connection(addr, port, limit=READ_BUF_SIZE) reader_tgt, writer_tgt = await open_connection_tryer(addr, port, limit=TO_CLT_BUFSIZE,
set_keepalive(writer_tgt.get_extra_info("socket")) timeout=TG_CONNECT_TIMEOUT)
set_bufsizes(writer_tgt.get_extra_info("socket"))
except ConnectionRefusedError as E: except ConnectionRefusedError as E:
print_err("Got connection refused while trying to connect to", addr, port) print_err("Got connection refused while trying to connect to", addr, port)
return False return False
except OSError as E: except (OSError, asyncio.TimeoutError) as E:
print_err("Unable to connect to", addr, port) print_err("Unable to connect to", addr, port)
return False return False
set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO) writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO)
key_selector = PROXY_SECRET[:4] key_selector = PROXY_SECRET[:4]
@@ -639,7 +800,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
old_reader = reader_tgt old_reader = reader_tgt
reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO) reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO)
ans = await reader_tgt.read(READ_BUF_SIZE) ans = await reader_tgt.read(TO_CLT_BUFSIZE)
if len(ans) != RPC_NONCE_ANS_LEN: if len(ans) != RPC_NONCE_ANS_LEN:
return False return False
@@ -720,12 +881,17 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
async def handle_client(reader_clt, writer_clt): async def handle_client(reader_clt, writer_clt):
set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE) set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE, attempts=3)
set_bufsizes(writer_clt.get_extra_info("socket")) set_ack_timeout(writer_clt.get_extra_info("socket"), CLIENT_ACK_TIMEOUT)
set_bufsizes(writer_clt.get_extra_info("socket"), TO_TG_BUFSIZE, TO_CLT_BUFSIZE)
try:
clt_data = await asyncio.wait_for(handle_handshake(reader_clt, writer_clt),
timeout=CLIENT_HANDSHAKE_TIMEOUT)
except asyncio.TimeoutError:
return
clt_data = await handle_handshake(reader_clt, writer_clt)
if not clt_data: if not clt_data:
writer_clt.transport.abort()
return return
reader_clt, writer_clt, proto_tag, user, dc_idx, enc_key_and_iv = clt_data reader_clt, writer_clt, proto_tag, user, dc_idx, enc_key_and_iv = clt_data
@@ -742,7 +908,6 @@ async def handle_client(reader_clt, writer_clt):
tg_data = await do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port) tg_data = await do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port)
if not tg_data: if not tg_data:
writer_clt.transport.abort()
return return
reader_tg, writer_tg = tg_data reader_tg, writer_tg = tg_data
@@ -766,14 +931,16 @@ async def handle_client(reader_clt, writer_clt):
elif proto_tag == PROTO_TAG_INTERMEDIATE: elif proto_tag == PROTO_TAG_INTERMEDIATE:
reader_clt = MTProtoIntermediateFrameStreamReader(reader_clt) reader_clt = MTProtoIntermediateFrameStreamReader(reader_clt)
writer_clt = MTProtoIntermediateFrameStreamWriter(writer_clt) writer_clt = MTProtoIntermediateFrameStreamWriter(writer_clt)
elif proto_tag == PROTO_TAG_SECURE:
reader_clt = MTProtoSecureIntermediateFrameStreamReader(reader_clt)
writer_clt = MTProtoSecureIntermediateFrameStreamWriter(writer_clt)
else: else:
return return
async def connect_reader_to_writer(rd, wr, user): async def connect_reader_to_writer(rd, wr, user, rd_buf_size):
update_stats(user, curr_connects_x2=1)
try: try:
while True: while True:
data = await rd.read(READ_BUF_SIZE) data = await rd.read(rd_buf_size)
if isinstance(data, tuple): if isinstance(data, tuple):
data, extra = data data, extra = data
else: else:
@@ -782,27 +949,36 @@ async def handle_client(reader_clt, writer_clt):
if not data: if not data:
wr.write_eof() wr.write_eof()
await wr.drain() await wr.drain()
wr.close()
return return
else: else:
update_stats(user, octets=len(data)) update_stats(user, octets=len(data), msgs=1)
wr.write(data, extra) wr.write(data, extra)
await wr.drain() await wr.drain()
except (OSError, AttributeError, asyncio.streams.IncompleteReadError) as e: except (OSError, asyncio.streams.IncompleteReadError) as e:
# print_err(e) # print_err(e)
pass pass
finally:
wr.transport.abort()
update_stats(user, curr_connects_x2=-1)
asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer_clt, user)) tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, TO_CLT_BUFSIZE)
asyncio.ensure_future(connect_reader_to_writer(reader_clt, writer_tg, user)) clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, TO_TG_BUFSIZE)
task_tg_to_clt = asyncio.ensure_future(tg_to_clt)
task_clt_to_tg = asyncio.ensure_future(clt_to_tg)
update_stats(user, curr_connects=1)
await asyncio.wait([task_tg_to_clt, task_clt_to_tg], return_when=asyncio.FIRST_COMPLETED)
update_stats(user, curr_connects=-1)
task_tg_to_clt.cancel()
task_clt_to_tg.cancel()
writer_tg.transport.abort()
async def handle_client_wrapper(reader, writer): async def handle_client_wrapper(reader, writer):
try: try:
await handle_client(reader, writer) await handle_client(reader, writer)
except (asyncio.IncompleteReadError, ConnectionResetError, TimeoutError): except (asyncio.IncompleteReadError, ConnectionResetError, TimeoutError):
pass
finally:
writer.transport.abort() writer.transport.abort()
@@ -813,9 +989,9 @@ async def stats_printer():
print("Stats for", time.strftime("%d.%m.%Y %H:%M:%S")) print("Stats for", time.strftime("%d.%m.%Y %H:%M:%S"))
for user, stat in stats.items(): for user, stat in stats.items():
print("%s: %d connects (%d current), %.2f MB" % ( print("%s: %d connects (%d current), %.2f MB, %d msgs" % (
user, stat["connects"], stat["curr_connects_x2"] // 2, user, stat["connects"], stat["curr_connects"],
stat["octets"] / 1000000)) stat["octets"] / 1000000, stat["msgs"]))
print(flush=True) print(flush=True)
@@ -905,7 +1081,7 @@ def init_ip_info():
TIMEOUT = 5 TIMEOUT = 5
try: try:
with urllib.request.urlopen('https://v4.ifconfig.co/ip', timeout=TIMEOUT) as f: with urllib.request.urlopen('http://ipv4.myexternalip.com/raw', timeout=TIMEOUT) as f:
if f.status != 200: if f.status != 200:
raise Exception("Invalid status code") raise Exception("Invalid status code")
my_ip_info["ipv4"] = f.read().decode().strip() my_ip_info["ipv4"] = f.read().decode().strip()
@@ -914,7 +1090,7 @@ def init_ip_info():
if PREFER_IPV6: if PREFER_IPV6:
try: try:
with urllib.request.urlopen('https://v6.ifconfig.co/ip', timeout=TIMEOUT) as f: with urllib.request.urlopen('http://ipv6.myexternalip.com/raw', timeout=TIMEOUT) as f:
if f.status != 200: if f.status != 200:
raise Exception("Invalid status code") raise Exception("Invalid status code")
my_ip_info["ipv6"] = f.read().decode().strip() my_ip_info["ipv6"] = f.read().decode().strip()
@@ -939,7 +1115,12 @@ def print_tg_info():
for user, secret in sorted(USERS.items(), key=lambda x: x[0]): for user, secret in sorted(USERS.items(), key=lambda x: x[0]):
for ip in ip_addrs: for ip in ip_addrs:
params = {"server": ip, "port": PORT, "secret": secret} if not SECURE_ONLY:
params = {"server": ip, "port": PORT, "secret": secret}
params_encodeded = urllib.parse.urlencode(params, safe=':')
print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True)
params = {"server": ip, "port": PORT, "secret": "dd" + secret}
params_encodeded = urllib.parse.urlencode(params, safe=':') params_encodeded = urllib.parse.urlencode(params, safe=':')
print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True) print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True)
@@ -950,15 +1131,24 @@ def loop_exception_handler(loop, context):
if exception: if exception:
if isinstance(exception, TimeoutError): if isinstance(exception, TimeoutError):
if transport: if transport:
print_err("Timeout, killing transport")
transport.abort() transport.abort()
return return
if isinstance(exception, OSError): if isinstance(exception, OSError):
IGNORE_ERRNO = { IGNORE_ERRNO = {
10038 # operation on non-socket on Windows, likely because fd == -1 10038, # operation on non-socket on Windows, likely because fd == -1
121, # the semaphore timeout period has expired on Windows
}
FORCE_CLOSE_ERRNO = {
113, # no route to host
} }
if exception.errno in IGNORE_ERRNO: if exception.errno in IGNORE_ERRNO:
return return
elif exception.errno in FORCE_CLOSE_ERRNO:
if transport:
transport.abort()
return
loop.default_exception_handler(context) loop.default_exception_handler(context)
@@ -966,7 +1156,7 @@ def loop_exception_handler(loop, context):
def main(): def main():
init_stats() init_stats()
if sys.platform == 'win32': if sys.platform == "win32":
loop = asyncio.ProactorEventLoop() loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
@@ -980,13 +1170,15 @@ def main():
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info()) middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
asyncio.ensure_future(middle_proxy_updater_task) asyncio.ensure_future(middle_proxy_updater_task)
task_v4 = asyncio.start_server(handle_client_wrapper, reuse_port = hasattr(socket, "SO_REUSEPORT")
'0.0.0.0', PORT, limit=READ_BUF_SIZE, loop=loop)
task_v4 = asyncio.start_server(handle_client_wrapper, LISTEN_ADDR_IPV4, PORT,
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
server_v4 = loop.run_until_complete(task_v4) server_v4 = loop.run_until_complete(task_v4)
if socket.has_ipv6: if socket.has_ipv6:
task_v6 = asyncio.start_server(handle_client_wrapper, task_v6 = asyncio.start_server(handle_client_wrapper, LISTEN_ADDR_IPV6, PORT,
'::', PORT, limit=READ_BUF_SIZE, loop=loop) limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
server_v6 = loop.run_until_complete(task_v6) server_v6 = loop.run_until_complete(task_v6)
try: try: