66 Commits
v0.9 ... v1.0.5

Author SHA1 Message Date
Alexander Bersenev
eba7f9be69 protect from time skewing. The proxy protocol is very sensible to clock skew. If the skew is detected, disable advertising, making the connection directly to tg servers, instead of middle proxies 2019-05-12 01:42:20 +05:00
Alexander Bersenev
af8c102449 disable one fingerprinting protection by default because it causes trouble on some ios clinets 2019-05-09 03:29:53 +05:00
Alexander Bersenev
a01896522d changed the comment 2019-05-09 02:59:06 +05:00
Alexander Bersenev
6f70ff3003 adaptive buffer sizes 2019-05-09 02:51:36 +05:00
Alexander Bersenev
d48c177e36 comment out the message active fingerprinting - there is too many messages 2019-04-23 15:01:34 +05:00
Alexander Bersenev
f55ae68092 even more protect against replay-based fingerprinting 2019-04-20 15:02:13 +05:00
Alexander Bersenev
4cae6290b9 active fingerprinting detection and blocking 2019-04-20 04:44:11 +05:00
Alexander Bersenev
830d55fe77 fix ipv4 resolver url 2019-04-04 16:06:24 +05:00
Alexander Bersenev
66d9c03ff9 set secure mode on by default because most tg clients support this mode and many countries are able to detect proxies in non-secure mode 2019-03-10 23:02:27 +05:00
Alexander Bersenev
73592c4f72 change ip address resovers since the old one doesnt work anymore 2019-02-15 20:11:57 +05:00
Alexander Bersenev
b0cb48f684 ignore errors in setsockopt on old kernels 2018-12-30 14:44:28 +05:00
Alexander Bersenev
cb10355681 more verbose error messages on https failures 2018-12-30 14:25:17 +05:00
Alexander Bersenev
bd8e0f935d add some endlines 2018-11-27 22:25:47 +05:00
Alexander Bersenev
e2435461ca refactoring 2018-11-27 22:15:38 +05:00
Alexander Bersenev
47218748aa more reliable ip detection 2018-11-25 22:25:13 +05:00
Alexander Bersenev
b082d06f9b Merge branch 'master' into stable 2018-11-14 02:46:31 +05:00
Alexander Bersenev
5187725088 Revert "just for history: attempting to pretent cloudfare service"
This reverts commit dd1d0a6262.
2018-11-13 02:18:13 +05:00
Alexander Bersenev
dd1d0a6262 just for history: attempting to pretent cloudfare service 2018-11-13 02:18:04 +05:00
Alexander Bersenev
d5daf8bbdf add secure only mode example in config 2018-11-13 01:11:24 +05:00
Alexander Bersenev
780dbc5866 document all advanced options 2018-09-20 04:03:32 +05:00
Alexander Bersenev
298614b1f6 add an ability to specify listen address 2018-09-16 12:50:41 +05:00
Alexander Bersenev
f5c30c6115 secure only mode 2018-08-29 00:04:58 +05:00
Alexander Bersenev
e4473d6374 Merge branch 'master' into stable 2018-08-01 21:32:34 +05:00
Alexander Bersenev
c2278501bf change the ip obtaining service 2018-08-01 21:30:05 +05:00
Alexander Bersenev
534b26cd04 Merge branch 'master' into stable 2018-07-13 12:57:46 +05:00
Alexander Bersenev
c1bef68602 update alpine linux in the image 2018-07-10 19:22:04 +05:00
Alexander Bersenev
8e79dacd26 make the passive protocol detection harder 2018-07-10 15:48:39 +05:00
Alexander Bersenev
520a26aa89 fix typo 2018-07-08 23:52:57 +05:00
Alexander Bersenev
647b6f6edd add connect retrying 2018-07-08 19:05:45 +05:00
Alexander Bersenev
c2ad0de665 increase default buffer limit 2018-07-08 17:48:13 +05:00
Alexander Bersenev
47f7c088af Merge branch 'master' into stable 2018-07-05 16:12:56 +05:00
Alexander Bersenev
6f8bfdb568 add timeout error to errno 2018-07-05 15:45:53 +05:00
Alexander Bersenev
0a7e2d85b8 shrink timeouts, removed annoying message about timeouts 2018-07-04 13:54:27 +05:00
Alexander Bersenev
0caf5f89a8 count msgs 2018-07-02 02:28:43 +05:00
Alexander Bersenev
33fabe7590 ignore no route to host error 2018-07-02 00:47:35 +05:00
Alexander Bersenev
c0ed5e1e38 Merge branch 'master' of github.com:alexbers/mtprotoproxy 2018-07-01 16:45:34 +05:00
Alexander Bersenev
bcac5eb878 add sending timeout 2018-07-01 16:43:54 +05:00
Alexander Bersenev
b38084bf36 add information about Prometheus to readme 2018-07-01 01:40:30 +05:00
Alexander Bersenev
675d5a6aba send buffer size on the direct handshake also 2018-06-30 23:09:43 +05:00
Alexander Bersenev
b31768165c buffers redesign 2018-06-30 22:54:11 +05:00
Alexander Bersenev
372861ac6e support for secure mode 2018-06-29 18:51:47 +05:00
Alexander Bersenev
6a27096618 add secure tag 2018-06-29 17:52:37 +05:00
Alexander Bersenev
93f71f5ec2 Merge branch 'master' into stable 2018-06-29 12:48:58 +05:00
Alexander Bersenev
03f7ca1d4c more reliable logic to check reuseport availability 2018-06-29 02:00:46 +05:00
Alexander Bersenev
3477402c0d use cryptography module in docker file, do not copy pyaes 2018-06-29 01:07:16 +05:00
Alexander Bersenev
532021ab87 support for cryptography module and advise to use it 2018-06-28 20:47:12 +05:00
Alexander Bersenev
6900cdda43 Merge branch 'master' of github.com:alexbers/mtprotoproxy 2018-06-27 20:04:28 +05:00
Alexander Bersenev
ec1c6b4fb6 we need at least one undocumented launching way :) 2018-06-27 20:04:05 +05:00
Alexander Bersenev
63b689e3bf Add a section about advanced usage 2018-06-27 18:25:40 +05:00
Alexander Bersenev
71e3206b19 check if signal exists before placing it. It can absent in some OSes, like Windows 2018-06-27 13:33:51 +05:00
Alexander Bersenev
7eea7d3201 replace infinite loop with timeout with while loop, when the client is bad 2018-06-27 11:13:42 +05:00
Alexander Bersenev
2e86308e90 Revert "Revert "simplify dissconnect logic". The idea with task cancelation doesn't work"
This reverts commit 32d3bffc7b.
2018-06-27 11:11:50 +05:00
Alexander Bersenev
d74bb68f03 Revert "Revert "refactor task canceling a bit". The idea with the task cancelation doesn't work"
This reverts commit b74079c433.
2018-06-27 11:11:45 +05:00
Alexander Bersenev
5f35b4ed0a add debugging signal 2018-06-27 01:14:44 +05:00
Alexander Bersenev
b74079c433 Revert "refactor task canceling a bit". The idea with the task cancelation doesn't work
This reverts commit 444a1876b6.
2018-06-27 01:05:08 +05:00
Alexander Bersenev
32d3bffc7b Revert "simplify dissconnect logic". The idea with task cancelation doesn't work
This reverts commit a20b1c9929.
2018-06-27 01:04:06 +05:00
Alexander Bersenev
a20b1c9929 simplify dissconnect logic 2018-06-26 22:53:46 +05:00
Alexander Bersenev
444a1876b6 refactor task canceling a bit 2018-06-26 20:39:43 +05:00
Alexander Bersenev
ed088d9449 revert the last commit 2018-06-26 20:21:51 +05:00
Alexander Bersenev
accba06b45 count client stats only for successfull clients 2018-06-26 20:17:52 +05:00
Alexander Bersenev
bd3d9731d7 if the handshake failed, just consume all the data 2018-06-26 11:48:58 +05:00
Alexander Bersenev
9077ceb471 simplify current connects counting 2018-06-26 03:38:11 +05:00
Alexander Bersenev
d2ff0f61e4 add handshake timeout, refactor client handling a bit 2018-06-26 03:24:45 +05:00
Alexander Bersenev
d56c995ee2 use uvloop if available 2018-06-22 15:26:33 +05:00
Alexander Bersenev
51c40903ab allows to bind on privilleged ports 2018-06-21 10:19:38 +05:00
Alexander Bersenev
e1d592cd84 enable port reuse on non-windows platforms 2018-06-19 21:51:02 +05:00
5 changed files with 502 additions and 162 deletions

View File

@@ -1,13 +1,13 @@
FROM alpine:3.6
FROM alpine:3.8
RUN adduser tgproxy -u 10000 -D
RUN apk add --no-cache python3 py3-crypto ca-certificates
RUN apk add --no-cache python3 py3-cryptography ca-certificates libcap
COPY mtprotoproxy.py config.py /home/tgproxy/
COPY pyaes/*.py /home/tgproxy/pyaes/
RUN chown -R tgproxy:tgproxy /home/tgproxy
RUN setcap cap_net_bind_service=+ep /usr/bin/python3.6
USER tgproxy

View File

@@ -16,4 +16,12 @@ To advertise a channel get a tag from **@MTProxybot** and write it to *config.py
## Performance ##
The proxy performance should be enough to comfortably serve about 4 000 simultaneous users on
the smallest VDS instance with 1 CPU core and 1024MB RAM.
the VDS instance with 1 CPU core and 1024MB RAM.
## Advanced Usage ##
The proxy can be launched:
- with a custom config: `python3 mtprotoproxy.py [configfile]`
- several times, clients will be automaticaly balanced between instances
- using *PyPy* interprteter
- with runtime statistics exported for [Prometheus](https://prometheus.io/): using [prometheus](https://github.com/alexbers/mtprotoproxy/tree/prometheus) branch

View File

@@ -6,5 +6,9 @@ USERS = {
"tg2": "0123456789abcdef0123456789abcdef"
}
# Makes the proxy harder to detect
# Can be incompatible with very old clients
SECURE_ONLY = True
# Tag for advertising, obtainable from @MTProxybot
# AD_TAG = "3c09c680b76ee91a4c25ad51f742267d"

View File

@@ -3,5 +3,5 @@ services:
mtprotoproxy:
build: .
restart: unless-stopped
mem_limit: 1024m
network_mode: "host"
# mem_limit: 1024m

View File

@@ -6,78 +6,84 @@ import urllib.parse
import urllib.request
import collections
import time
import datetime
import hashlib
import random
import binascii
import sys
import re
import runpy
import signal
try:
from Crypto.Cipher import AES
from Crypto.Util import Counter
def create_aes_ctr(key, iv):
ctr = Counter.new(128, initial_value=iv)
return AES.new(key, AES.MODE_CTR, counter=ctr)
def create_aes_cbc(key, iv):
return AES.new(key, AES.MODE_CBC, iv)
except ImportError:
print("Failed to find pycryptodome or pycrypto, using slow AES implementation",
flush=True, file=sys.stderr)
import pyaes
def create_aes_ctr(key, iv):
ctr = pyaes.Counter(iv)
return pyaes.AESModeOfOperationCTR(key, ctr)
def create_aes_cbc(key, iv):
class EncryptorAdapter:
def __init__(self, mode):
self.mode = mode
def encrypt(self, data):
encrypter = pyaes.Encrypter(self.mode, pyaes.PADDING_NONE)
return encrypter.feed(data) + encrypter.feed()
def decrypt(self, data):
decrypter = pyaes.Decrypter(self.mode, pyaes.PADDING_NONE)
return decrypter.feed(data) + decrypter.feed()
mode = pyaes.AESModeOfOperationCBC(key, iv)
return EncryptorAdapter(mode)
try:
import resource
soft_fd_limit, hard_fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard_fd_limit, hard_fd_limit))
except (ValueError, OSError):
print("Failed to increase the limit of opened files", flush=True, file=sys.stderr)
except ImportError:
pass
if len(sys.argv) > 1:
if len(sys.argv) < 2:
config = runpy.run_module("config")
elif len(sys.argv) == 2:
# launch with own config
config = runpy.run_path(sys.argv[1])
else:
config = runpy.run_module("config")
# undocumented way of launching
config = {}
config["PORT"] = int(sys.argv[1])
secrets = sys.argv[2].split(",")
config["USERS"] = {"user%d" % i: secrets[i].zfill(32) for i in range(len(secrets))}
if len(sys.argv) > 3:
config["AD_TAG"] = sys.argv[3]
PORT = config["PORT"]
USERS = config["USERS"]
AD_TAG = bytes.fromhex(config.get("AD_TAG", ""))
# load advanced settings
# if IPv6 avaliable, use it by default
PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
# disables tg->client trafic reencryption, faster but less secure
FAST_MODE = config.get("FAST_MODE", True)
# doesn't allow to connect in not-secure mode
SECURE_ONLY = config.get("SECURE_ONLY", False)
# length of used handshake randoms for active fingerprinting protection
REPLAY_CHECK_LEN = config.get("REPLAY_CHECK_LEN", 32768)
# block short first packets to even more protect against replay-based fingerprinting
BLOCK_SHORT_FIRST_PKT = config.get("BLOCK_SHORT_FIRST_PKT", False)
# delay in seconds between stats printing
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 60*60*24)
READ_BUF_SIZE = config.get("READ_BUF_SIZE", 16384)
WRITE_BUF_SIZE = config.get("WRITE_BUF_SIZE", 65536)
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 60*30)
AD_TAG = bytes.fromhex(config.get("AD_TAG", ""))
# delay in seconds between middle proxy info updates
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 24*60*60)
# delay in seconds between time getting, zero means disabled
GET_TIME_PERIOD = config.get("GET_TIME_PERIOD", 10*60)
# max socket buffer size to the client direction, the more the faster, but more RAM hungry
# can be the tuple (low, users_margin, high) for the adaptive case. If no much users, use high
TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", (16384, 100, 131072))
# max socket buffer size to the telegram servers direction, also can be the tuple
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
# keepalive period for clients in secs
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 10*60)
# drop client after this timeout if the handshake fail
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
# if client doesn't confirm data for this number of seconds, it is dropped
CLIENT_ACK_TIMEOUT = config.get("CLIENT_ACK_TIMEOUT", 5*60)
# telegram servers connect timeout in seconds
TG_CONNECT_TIMEOUT = config.get("TG_CONNECT_TIMEOUT", 10)
# listen address for IPv4
LISTEN_ADDR_IPV4 = config.get("LISTEN_ADDR_IPV4", "0.0.0.0")
# listen address for IPv6
LISTEN_ADDR_IPV6 = config.get("LISTEN_ADDR_IPV6", "::")
TG_DATACENTER_PORT = 443
@@ -108,7 +114,6 @@ TG_MIDDLE_PROXIES_V6 = {
5: [("2001:b28:f23f:f005::d", 8888)], -5: [("2001:67c:04e8:f004::d", 8888)]
}
USE_MIDDLE_PROXY = (len(AD_TAG) == 16)
PROXY_SECRET = bytes.fromhex(
@@ -128,6 +133,7 @@ DC_IDX_POS = 60
PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef"
PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee"
PROTO_TAG_SECURE = b"\xdd\xdd\xdd\xdd"
CBC_PADDING = 16
PADDING_FILLER = b"\x04\x00\x00\x00"
@@ -136,6 +142,125 @@ MIN_MSG_LEN = 12
MAX_MSG_LEN = 2 ** 24
my_ip_info = {"ipv4": None, "ipv6": None}
used_handshakes = collections.OrderedDict()
def setup_files_limit():
try:
import resource
soft_fd_limit, hard_fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard_fd_limit, hard_fd_limit))
except (ValueError, OSError):
print("Failed to increase the limit of opened files", flush=True, file=sys.stderr)
except ImportError:
pass
def setup_debug():
if hasattr(signal, 'SIGUSR1'):
def debug_signal(signum, frame):
import pdb
pdb.set_trace()
signal.signal(signal.SIGUSR1, debug_signal)
def try_setup_uvloop():
try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
pass
def try_use_cryptography_module():
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
def create_aes_ctr(key, iv):
class EncryptorAdapter:
def __init__(self, cipher):
self.encryptor = cipher.encryptor()
self.decryptor = cipher.decryptor()
def encrypt(self, data):
return self.encryptor.update(data)
def decrypt(self, data):
return self.decryptor.update(data)
iv_bytes = int.to_bytes(iv, 16, "big")
cipher = Cipher(algorithms.AES(key), modes.CTR(iv_bytes), default_backend())
return EncryptorAdapter(cipher)
def create_aes_cbc(key, iv):
class EncryptorAdapter:
def __init__(self, cipher):
self.encryptor = cipher.encryptor()
self.decryptor = cipher.decryptor()
def encrypt(self, data):
return self.encryptor.update(data)
def decrypt(self, data):
return self.decryptor.update(data)
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), default_backend())
return EncryptorAdapter(cipher)
return create_aes_ctr, create_aes_cbc
def try_use_pycrypto_or_pycryptodome_module():
from Crypto.Cipher import AES
from Crypto.Util import Counter
def create_aes_ctr(key, iv):
ctr = Counter.new(128, initial_value=iv)
return AES.new(key, AES.MODE_CTR, counter=ctr)
def create_aes_cbc(key, iv):
return AES.new(key, AES.MODE_CBC, iv)
return create_aes_ctr, create_aes_cbc
def use_slow_bundled_cryptography_module():
import pyaes
msg = "To make the program a *lot* faster, please install cryptography module: "
msg += "pip install cryptography\n"
print(msg, flush=True, file=sys.stderr)
def create_aes_ctr(key, iv):
ctr = pyaes.Counter(iv)
return pyaes.AESModeOfOperationCTR(key, ctr)
def create_aes_cbc(key, iv):
class EncryptorAdapter:
def __init__(self, mode):
self.mode = mode
def encrypt(self, data):
encrypter = pyaes.Encrypter(self.mode, pyaes.PADDING_NONE)
return encrypter.feed(data) + encrypter.feed()
def decrypt(self, data):
decrypter = pyaes.Decrypter(self.mode, pyaes.PADDING_NONE)
return decrypter.feed(data) + decrypter.feed()
mode = pyaes.AESModeOfOperationCBC(key, iv)
return EncryptorAdapter(mode)
return create_aes_ctr, create_aes_cbc
try:
create_aes_ctr, create_aes_cbc = try_use_cryptography_module()
except ImportError:
try:
create_aes_ctr, create_aes_cbc = try_use_pycrypto_or_pycryptodome_module()
except ImportError:
create_aes_ctr, create_aes_cbc = use_slow_bundled_cryptography_module()
def print_err(*params):
@@ -147,14 +272,39 @@ def init_stats():
stats = {user: collections.Counter() for user in USERS}
def update_stats(user, connects=0, curr_connects_x2=0, octets=0):
def update_stats(user, connects=0, curr_connects=0, octets=0, msgs=0):
global stats
if user not in stats:
stats[user] = collections.Counter()
stats[user].update(connects=connects, curr_connects_x2=curr_connects_x2,
octets=octets)
stats[user].update(connects=connects, curr_connects=curr_connects,
octets=octets, msgs=msgs)
def get_curr_connects_count():
global stats
all_connects = 0
for user, stat in stats.items():
all_connects += stat["curr_connects"]
return all_connects
def get_to_tg_bufsize():
if isinstance(TO_TG_BUFSIZE, int):
return TO_TG_BUFSIZE
low, margin, high = TO_TG_BUFSIZE
return high if get_curr_connects_count() < margin else low
def get_to_clt_bufsize():
if isinstance(TO_CLT_BUFSIZE, int):
return TO_CLT_BUFSIZE
low, margin, high = TO_CLT_BUFSIZE
return high if get_curr_connects_count() < margin else low
class LayeredStreamReaderBase:
@@ -353,7 +503,6 @@ class MTProtoIntermediateFrameStreamReader(LayeredStreamReaderBase):
msg_len -= 0x80000000
data = await self.upstream.readexactly(msg_len)
return data, extra
@@ -365,6 +514,38 @@ class MTProtoIntermediateFrameStreamWriter(LayeredStreamWriterBase):
return self.upstream.write(int.to_bytes(len(data), 4, 'little') + data)
class MTProtoSecureIntermediateFrameStreamReader(LayeredStreamReaderBase):
async def read(self, buf_size):
msg_len_bytes = await self.upstream.readexactly(4)
msg_len = int.from_bytes(msg_len_bytes, "little")
extra = {}
if msg_len > 0x80000000:
extra["QUICKACK_FLAG"] = True
msg_len -= 0x80000000
data = await self.upstream.readexactly(msg_len)
if msg_len % 4 != 0:
cut_border = msg_len - (msg_len % 4)
data = data[:cut_border]
return data, extra
class MTProtoSecureIntermediateFrameStreamWriter(LayeredStreamWriterBase):
def write(self, data, extra={}):
MAX_PADDING_LEN = 4
if extra.get("SIMPLE_ACK"):
# TODO: make this unpredictable
return self.upstream.write(data)
else:
padding_len = random.randrange(MAX_PADDING_LEN)
padding = bytearray([random.randrange(256) for i in range(padding_len)])
padded_data_len_bytes = int.to_bytes(len(data) + padding_len, 4, 'little')
return self.upstream.write(padded_data_len_bytes + data + padding)
class ProxyReqStreamReader(LayeredStreamReaderBase):
async def read(self, msg):
RPC_PROXY_ANS = b"\x0d\xda\x03\x44"
@@ -423,6 +604,7 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase):
FLAG_HAS_AD_TAG = 0x8
FLAG_MAGIC = 0x1000
FLAG_EXTMODE2 = 0x20000
FLAG_PAD = 0x8000000
FLAG_INTERMEDIATE = 0x20000000
FLAG_ABRIDGED = 0x40000000
FLAG_QUICKACK = 0x80000000
@@ -437,6 +619,8 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase):
flags |= FLAG_ABRIDGED
elif self.proto_tag == PROTO_TAG_INTERMEDIATE:
flags |= FLAG_INTERMEDIATE
elif self.proto_tag == PROTO_TAG_SECURE:
flags |= FLAG_INTERMEDIATE | FLAG_PAD
if extra.get("QUICKACK_FLAG"):
flags |= FLAG_QUICKACK
@@ -455,35 +639,101 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase):
async def handle_handshake(reader, writer):
global used_handshakes
EMPTY_READ_BUF_SIZE = 4096
handshake = await reader.readexactly(HANDSHAKE_LEN)
dec_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN]
dec_prekey, dec_iv = dec_prekey_and_iv[:PREKEY_LEN], dec_prekey_and_iv[PREKEY_LEN:]
enc_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN][::-1]
enc_prekey, enc_iv = enc_prekey_and_iv[:PREKEY_LEN], enc_prekey_and_iv[PREKEY_LEN:]
if dec_prekey_and_iv in used_handshakes:
ip = writer.get_extra_info('peername')[0]
print_err("Active fingerprinting detected from %s, freezing it" % ip)
while await reader.read(EMPTY_READ_BUF_SIZE):
# just consume all the data
pass
return False
for user in USERS:
secret = bytes.fromhex(USERS[user])
dec_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN]
dec_prekey, dec_iv = dec_prekey_and_iv[:PREKEY_LEN], dec_prekey_and_iv[PREKEY_LEN:]
dec_key = hashlib.sha256(dec_prekey + secret).digest()
decryptor = create_aes_ctr(key=dec_key, iv=int.from_bytes(dec_iv, "big"))
enc_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN][::-1]
enc_prekey, enc_iv = enc_prekey_and_iv[:PREKEY_LEN], enc_prekey_and_iv[PREKEY_LEN:]
enc_key = hashlib.sha256(enc_prekey + secret).digest()
encryptor = create_aes_ctr(key=enc_key, iv=int.from_bytes(enc_iv, "big"))
decrypted = decryptor.decrypt(handshake)
proto_tag = decrypted[PROTO_TAG_POS:PROTO_TAG_POS+4]
if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE):
if proto_tag not in (PROTO_TAG_ABRIDGED, PROTO_TAG_INTERMEDIATE, PROTO_TAG_SECURE):
continue
if SECURE_ONLY and proto_tag != PROTO_TAG_SECURE:
continue
dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True)
while len(used_handshakes) >= REPLAY_CHECK_LEN:
used_handshakes.popitem(last=False)
used_handshakes[dec_prekey_and_iv] = True
reader = CryptoWrappedStreamReader(reader, decryptor)
writer = CryptoWrappedStreamWriter(writer, encryptor)
return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv
while await reader.read(EMPTY_READ_BUF_SIZE):
# just consume all the data
pass
return False
def try_setsockopt(sock, level, option, value):
try:
sock.setsockopt(level, option, value)
except OSError as E:
pass
def set_keepalive(sock, interval=40, attempts=5):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if hasattr(socket, "TCP_KEEPIDLE"):
try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval)
if hasattr(socket, "TCP_KEEPINTVL"):
try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
if hasattr(socket, "TCP_KEEPCNT"):
try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
def set_ack_timeout(sock, timeout):
if hasattr(socket, "TCP_USER_TIMEOUT"):
try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, timeout*1000)
def set_bufsizes(sock, recv_buf, send_buf):
try_setsockopt(sock, socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
try_setsockopt(sock, socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
async def open_connection_tryer(addr, port, limit, timeout, max_attempts=3):
for attempt in range(max_attempts-1):
try:
task = asyncio.open_connection(addr, port, limit=limit)
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout)
return reader_tgt, writer_tgt
except (OSError, asyncio.TimeoutError):
continue
# the last attempt
task = asyncio.open_connection(addr, port, limit=limit)
reader_tgt, writer_tgt = await asyncio.wait_for(task, timeout=timeout)
return reader_tgt, writer_tgt
async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
RESERVED_NONCE_FIRST_CHARS = [b"\xef"]
RESERVED_NONCE_BEGININGS = [b"\x48\x45\x41\x44", b"\x50\x4F\x53\x54",
@@ -502,15 +752,18 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
dc = TG_DATACENTERS_V4[dc_idx]
try:
reader_tgt, writer_tgt = await asyncio.open_connection(dc, TG_DATACENTER_PORT,
limit=READ_BUF_SIZE)
reader_tgt, writer_tgt = await open_connection_tryer(
dc, TG_DATACENTER_PORT, limit=get_to_clt_bufsize(), timeout=TG_CONNECT_TIMEOUT)
except ConnectionRefusedError as E:
print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT)
return False
except OSError as E:
except (OSError, asyncio.TimeoutError) as E:
print_err("Unable to connect to", dc, TG_DATACENTER_PORT)
return False
set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize())
while True:
rnd = bytearray([random.randrange(0, 256) for i in range(HANDSHAKE_LEN)])
if rnd[:1] in RESERVED_NONCE_FIRST_CHARS:
@@ -573,21 +826,6 @@ def get_middleproxy_aes_key_and_iv(nonce_srv, nonce_clt, clt_ts, srv_ip, clt_por
return key, iv
def set_keepalive(sock, interval=40, attempts=5):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if hasattr(socket, "TCP_KEEPIDLE"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval)
if hasattr(socket, "TCP_KEEPINTVL"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
if hasattr(socket, "TCP_KEEPCNT"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
def set_bufsizes(sock, recv_buf=READ_BUF_SIZE, send_buf=WRITE_BUF_SIZE):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
START_SEQ_NO = -2
NONCE_LEN = 16
@@ -615,16 +853,18 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
try:
reader_tgt, writer_tgt = await asyncio.open_connection(addr, port, limit=READ_BUF_SIZE)
set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"))
reader_tgt, writer_tgt = await open_connection_tryer(addr, port, limit=get_to_clt_bufsize(),
timeout=TG_CONNECT_TIMEOUT)
except ConnectionRefusedError as E:
print_err("Got connection refused while trying to connect to", addr, port)
return False
except OSError as E:
except (OSError, asyncio.TimeoutError) as E:
print_err("Unable to connect to", addr, port)
return False
set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize())
writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO)
key_selector = PROXY_SECRET[:4]
@@ -639,7 +879,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
old_reader = reader_tgt
reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO)
ans = await reader_tgt.read(READ_BUF_SIZE)
ans = await reader_tgt.read(get_to_clt_bufsize())
if len(ans) != RPC_NONCE_ANS_LEN:
return False
@@ -720,12 +960,18 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
async def handle_client(reader_clt, writer_clt):
set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE)
set_bufsizes(writer_clt.get_extra_info("socket"))
set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE, attempts=3)
set_ack_timeout(writer_clt.get_extra_info("socket"), CLIENT_ACK_TIMEOUT)
set_bufsizes(writer_clt.get_extra_info("socket"), get_to_tg_bufsize(), get_to_clt_bufsize())
cl_ip, cl_port = writer_clt.get_extra_info('peername')[:2]
try:
clt_data = await asyncio.wait_for(handle_handshake(reader_clt, writer_clt),
timeout=CLIENT_HANDSHAKE_TIMEOUT)
except asyncio.TimeoutError:
return
clt_data = await handle_handshake(reader_clt, writer_clt)
if not clt_data:
writer_clt.transport.abort()
return
reader_clt, writer_clt, proto_tag, user, dc_idx, enc_key_and_iv = clt_data
@@ -738,11 +984,9 @@ async def handle_client(reader_clt, writer_clt):
else:
tg_data = await do_direct_handshake(proto_tag, dc_idx)
else:
cl_ip, cl_port = writer_clt.upstream.get_extra_info('peername')[:2]
tg_data = await do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port)
if not tg_data:
writer_clt.transport.abort()
return
reader_tg, writer_tg = tg_data
@@ -766,43 +1010,69 @@ async def handle_client(reader_clt, writer_clt):
elif proto_tag == PROTO_TAG_INTERMEDIATE:
reader_clt = MTProtoIntermediateFrameStreamReader(reader_clt)
writer_clt = MTProtoIntermediateFrameStreamWriter(writer_clt)
elif proto_tag == PROTO_TAG_SECURE:
reader_clt = MTProtoSecureIntermediateFrameStreamReader(reader_clt)
writer_clt = MTProtoSecureIntermediateFrameStreamWriter(writer_clt)
else:
return
async def connect_reader_to_writer(rd, wr, user):
update_stats(user, curr_connects_x2=1)
async def connect_reader_to_writer(rd, wr, user, rd_buf_size, block_short_first_pkt=False):
is_first_pkt = True
try:
while True:
data = await rd.read(READ_BUF_SIZE)
data = await rd.read(rd_buf_size)
if isinstance(data, tuple):
data, extra = data
else:
extra = {}
if is_first_pkt:
# protection against replay-based fingerprinting
MIN_FIRST_PKT_SIZE = 12
if block_short_first_pkt and 0 < len(data) < MIN_FIRST_PKT_SIZE:
# print_err("Active fingerprinting detected from %s, dropping it" % cl_ip)
# print_err("If this causes problems set BLOCK_SHORT_FIRST_PKT = False "
# "in the config")
wr.write_eof()
await wr.drain()
return
is_first_pkt = False
if not data:
wr.write_eof()
await wr.drain()
wr.close()
return
else:
update_stats(user, octets=len(data))
update_stats(user, octets=len(data), msgs=1)
wr.write(data, extra)
await wr.drain()
except (OSError, AttributeError, asyncio.streams.IncompleteReadError) as e:
except (OSError, asyncio.streams.IncompleteReadError) as e:
# print_err(e)
pass
finally:
wr.transport.abort()
update_stats(user, curr_connects_x2=-1)
asyncio.ensure_future(connect_reader_to_writer(reader_tg, writer_clt, user))
asyncio.ensure_future(connect_reader_to_writer(reader_clt, writer_tg, user))
tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, get_to_clt_bufsize(),
block_short_first_pkt=BLOCK_SHORT_FIRST_PKT)
clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, get_to_tg_bufsize())
task_tg_to_clt = asyncio.ensure_future(tg_to_clt)
task_clt_to_tg = asyncio.ensure_future(clt_to_tg)
update_stats(user, curr_connects=1)
await asyncio.wait([task_tg_to_clt, task_clt_to_tg], return_when=asyncio.FIRST_COMPLETED)
update_stats(user, curr_connects=-1)
task_tg_to_clt.cancel()
task_clt_to_tg.cancel()
writer_tg.transport.abort()
async def handle_client_wrapper(reader, writer):
try:
await handle_client(reader, writer)
except (asyncio.IncompleteReadError, ConnectionResetError, TimeoutError):
pass
finally:
writer.transport.abort()
@@ -813,40 +1083,69 @@ async def stats_printer():
print("Stats for", time.strftime("%d.%m.%Y %H:%M:%S"))
for user, stat in stats.items():
print("%s: %d connects (%d current), %.2f MB" % (
user, stat["connects"], stat["curr_connects_x2"] // 2,
stat["octets"] / 1000000))
print("%s: %d connects (%d current), %.2f MB, %d msgs" % (
user, stat["connects"], stat["curr_connects"],
stat["octets"] / 1000000, stat["msgs"]))
print(flush=True)
async def update_middle_proxy_info():
async def make_https_req(url):
# returns resp body
SSL_PORT = 443
url_data = urllib.parse.urlparse(url)
async def make_https_req(url, host="core.telegram.org"):
""" Make request, return resp body and headers. """
SSL_PORT = 443
url_data = urllib.parse.urlparse(url)
HTTP_REQ_TEMPLATE = "\r\n".join(["GET %s HTTP/1.1", "Host: core.telegram.org",
"Connection: close"]) + "\r\n\r\n"
HTTP_REQ_TEMPLATE = "\r\n".join(["GET %s HTTP/1.1", "Host: %s",
"Connection: close"]) + "\r\n\r\n"
reader, writer = await asyncio.open_connection(url_data.netloc, SSL_PORT, ssl=True)
req = HTTP_REQ_TEMPLATE % (urllib.parse.quote(url_data.path), host)
writer.write(req.encode("utf8"))
data = await reader.read()
writer.close()
headers, body = data.split(b"\r\n\r\n", 1)
return headers, body
async def get_srv_time():
global USE_MIDDLE_PROXY
TIME_SYNC_ADDR = "https://core.telegram.org/getProxySecret"
MAX_TIME_SKEW = 30
want_to_reenable_advertising = False
while True:
try:
reader, writer = await asyncio.open_connection(url_data.netloc, SSL_PORT, ssl=True)
req = HTTP_REQ_TEMPLATE % urllib.parse.quote(url_data.path)
writer.write(req.encode("utf8"))
data = await reader.read()
writer.close()
headers, secret = await make_https_req(TIME_SYNC_ADDR)
headers, body = data.split(b"\r\n\r\n", 1)
return body
except Exception:
return b""
for line in headers.split(b"\r\n"):
if not line.startswith(b"Date: "):
continue
line = line[len("Date: "):].decode()
srv_time = datetime.datetime.strptime(line, "%a, %d %b %Y %H:%M:%S %Z")
now_time = datetime.datetime.utcnow()
time_diff = (now_time-srv_time).total_seconds()
if USE_MIDDLE_PROXY and abs(time_diff) > MAX_TIME_SKEW:
print_err("Time skew detected, please set the clock")
print_err("Server time:", srv_time, "your time:", now_time)
print_err("Disabling advertising to continue serving")
USE_MIDDLE_PROXY = False
want_to_reenable_advertising = True
elif want_to_reenable_advertising and abs(time_diff) <= MAX_TIME_SKEW:
print_err("Time is ok, reenabling advertising")
USE_MIDDLE_PROXY = True
want_to_reenable_advertising = False
except Exception as E:
print_err("Error getting server time", E)
await asyncio.sleep(GET_TIME_PERIOD)
async def update_middle_proxy_info():
async def get_new_proxies(url):
PROXY_REGEXP = re.compile(r"proxy_for\s+(-?\d+)\s+(.+):(\d+)\s*;")
ans = {}
try:
body = await make_https_req(url)
except Exception:
return ans
headers, body = await make_https_req(url)
fields = PROXY_REGEXP.findall(body.decode("utf8"))
if fields:
@@ -874,26 +1173,26 @@ async def update_middle_proxy_info():
if not v4_proxies:
raise Exception("no proxy data")
TG_MIDDLE_PROXIES_V4 = v4_proxies
except Exception:
print_err("Error updating middle proxy list")
except Exception as E:
print_err("Error updating middle proxy list:", E)
try:
v6_proxies = await get_new_proxies(PROXY_INFO_ADDR_V6)
if not v6_proxies:
raise Exception("no proxy data (ipv6)")
TG_MIDDLE_PROXIES_V6 = v6_proxies
except Exception:
print_err("Error updating middle proxy list for IPv6")
except Exception as E:
print_err("Error updating middle proxy list for IPv6:", E)
try:
secret = await make_https_req(PROXY_SECRET_ADDR)
headers, secret = await make_https_req(PROXY_SECRET_ADDR)
if not secret:
raise Exception("no secret")
if secret != PROXY_SECRET:
PROXY_SECRET = secret
print_err("Middle proxy secret updated")
except Exception:
print_err("Error updating middle proxy secret, using old")
except Exception as E:
print_err("Error updating middle proxy secret, using old", E)
await asyncio.sleep(PROXY_INFO_UPDATE_PERIOD)
@@ -902,26 +1201,31 @@ def init_ip_info():
global USE_MIDDLE_PROXY
global PREFER_IPV6
global my_ip_info
TIMEOUT = 5
try:
with urllib.request.urlopen('https://v4.ifconfig.co/ip', timeout=TIMEOUT) as f:
if f.status != 200:
raise Exception("Invalid status code")
my_ip_info["ipv4"] = f.read().decode().strip()
except Exception:
pass
if PREFER_IPV6:
def get_ip_from_url(url):
TIMEOUT = 5
try:
with urllib.request.urlopen('https://v6.ifconfig.co/ip', timeout=TIMEOUT) as f:
with urllib.request.urlopen(url, timeout=TIMEOUT) as f:
if f.status != 200:
raise Exception("Invalid status code")
my_ip_info["ipv6"] = f.read().decode().strip()
return f.read().decode().strip()
except Exception:
PREFER_IPV6 = False
else:
return None
IPV4_URL1 = "http://v4.ident.me/"
IPV4_URL2 = "http://ipv4.icanhazip.com/"
IPV6_URL1 = "http://v6.ident.me/"
IPV6_URL2 = "http://ipv6.icanhazip.com/"
my_ip_info["ipv4"] = get_ip_from_url(IPV4_URL1) or get_ip_from_url(IPV4_URL2)
my_ip_info["ipv6"] = get_ip_from_url(IPV6_URL1) or get_ip_from_url(IPV6_URL2)
if PREFER_IPV6:
if my_ip_info["ipv6"]:
print_err("IPv6 found, using it for external communication")
else:
PREFER_IPV6 = False
if USE_MIDDLE_PROXY:
if ((not PREFER_IPV6 and not my_ip_info["ipv4"]) or
@@ -939,7 +1243,12 @@ def print_tg_info():
for user, secret in sorted(USERS.items(), key=lambda x: x[0]):
for ip in ip_addrs:
params = {"server": ip, "port": PORT, "secret": secret}
if not SECURE_ONLY:
params = {"server": ip, "port": PORT, "secret": secret}
params_encodeded = urllib.parse.urlencode(params, safe=':')
print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True)
params = {"server": ip, "port": PORT, "secret": "dd" + secret}
params_encodeded = urllib.parse.urlencode(params, safe=':')
print("{}: tg://proxy?{}".format(user, params_encodeded), flush=True)
@@ -950,23 +1259,36 @@ def loop_exception_handler(loop, context):
if exception:
if isinstance(exception, TimeoutError):
if transport:
print_err("Timeout, killing transport")
transport.abort()
return
if isinstance(exception, OSError):
IGNORE_ERRNO = {
10038 # operation on non-socket on Windows, likely because fd == -1
10038, # operation on non-socket on Windows, likely because fd == -1
121, # the semaphore timeout period has expired on Windows
}
FORCE_CLOSE_ERRNO = {
113, # no route to host
}
if exception.errno in IGNORE_ERRNO:
return
elif exception.errno in FORCE_CLOSE_ERRNO:
if transport:
transport.abort()
return
loop.default_exception_handler(context)
def main():
setup_files_limit()
setup_debug()
try_setup_uvloop()
init_stats()
if sys.platform == 'win32':
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
@@ -980,13 +1302,19 @@ def main():
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
asyncio.ensure_future(middle_proxy_updater_task)
task_v4 = asyncio.start_server(handle_client_wrapper,
'0.0.0.0', PORT, limit=READ_BUF_SIZE, loop=loop)
if GET_TIME_PERIOD:
time_get_task = asyncio.Task(get_srv_time())
asyncio.ensure_future(time_get_task)
reuse_port = hasattr(socket, "SO_REUSEPORT")
task_v4 = asyncio.start_server(handle_client_wrapper, LISTEN_ADDR_IPV4, PORT,
limit=get_to_tg_bufsize(), reuse_port=reuse_port, loop=loop)
server_v4 = loop.run_until_complete(task_v4)
if socket.has_ipv6:
task_v6 = asyncio.start_server(handle_client_wrapper,
'::', PORT, limit=READ_BUF_SIZE, loop=loop)
task_v6 = asyncio.start_server(handle_client_wrapper, LISTEN_ADDR_IPV6, PORT,
limit=get_to_tg_bufsize(), reuse_port=reuse_port, loop=loop)
server_v6 = loop.run_until_complete(task_v6)
try: