15 Commits

Author SHA1 Message Date
Alexander Bersenev
eba7f9be69 protect from time skewing. The proxy protocol is very sensible to clock skew. If the skew is detected, disable advertising, making the connection directly to tg servers, instead of middle proxies 2019-05-12 01:42:20 +05:00
Alexander Bersenev
af8c102449 disable one fingerprinting protection by default because it causes trouble on some ios clinets 2019-05-09 03:29:53 +05:00
Alexander Bersenev
a01896522d changed the comment 2019-05-09 02:59:06 +05:00
Alexander Bersenev
6f70ff3003 adaptive buffer sizes 2019-05-09 02:51:36 +05:00
Alexander Bersenev
d48c177e36 comment out the message active fingerprinting - there is too many messages 2019-04-23 15:01:34 +05:00
Alexander Bersenev
f55ae68092 even more protect against replay-based fingerprinting 2019-04-20 15:02:13 +05:00
Alexander Bersenev
4cae6290b9 active fingerprinting detection and blocking 2019-04-20 04:44:11 +05:00
Alexander Bersenev
830d55fe77 fix ipv4 resolver url 2019-04-04 16:06:24 +05:00
Alexander Bersenev
66d9c03ff9 set secure mode on by default because most tg clients support this mode and many countries are able to detect proxies in non-secure mode 2019-03-10 23:02:27 +05:00
Alexander Bersenev
73592c4f72 change ip address resovers since the old one doesnt work anymore 2019-02-15 20:11:57 +05:00
Alexander Bersenev
b0cb48f684 ignore errors in setsockopt on old kernels 2018-12-30 14:44:28 +05:00
Alexander Bersenev
cb10355681 more verbose error messages on https failures 2018-12-30 14:25:17 +05:00
Alexander Bersenev
bd8e0f935d add some endlines 2018-11-27 22:25:47 +05:00
Alexander Bersenev
e2435461ca refactoring 2018-11-27 22:15:38 +05:00
Alexander Bersenev
47218748aa more reliable ip detection 2018-11-25 22:25:13 +05:00
2 changed files with 330 additions and 194 deletions

View File

@@ -6,9 +6,9 @@ USERS = {
"tg2": "0123456789abcdef0123456789abcdef" "tg2": "0123456789abcdef0123456789abcdef"
} }
# Makes the proxy harder to detect
# Can be incompatible with very old clients
SECURE_ONLY = True
# Tag for advertising, obtainable from @MTProxybot # Tag for advertising, obtainable from @MTProxybot
# AD_TAG = "3c09c680b76ee91a4c25ad51f742267d" # AD_TAG = "3c09c680b76ee91a4c25ad51f742267d"
# Uncommenting this do make a proxy harder to detect
# But it can be incompatible with old clients
# SECURE_ONLY = True

View File

@@ -6,6 +6,7 @@ import urllib.parse
import urllib.request import urllib.request
import collections import collections
import time import time
import datetime
import hashlib import hashlib
import random import random
import binascii import binascii
@@ -14,11 +15,162 @@ import re
import runpy import runpy
import signal import signal
try: if len(sys.argv) < 2:
import uvloop config = runpy.run_module("config")
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) elif len(sys.argv) == 2:
except ImportError: # launch with own config
pass config = runpy.run_path(sys.argv[1])
else:
# undocumented way of launching
config = {}
config["PORT"] = int(sys.argv[1])
secrets = sys.argv[2].split(",")
config["USERS"] = {"user%d" % i: secrets[i].zfill(32) for i in range(len(secrets))}
if len(sys.argv) > 3:
config["AD_TAG"] = sys.argv[3]
PORT = config["PORT"]
USERS = config["USERS"]
AD_TAG = bytes.fromhex(config.get("AD_TAG", ""))
# load advanced settings
# if IPv6 avaliable, use it by default
PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
# disables tg->client trafic reencryption, faster but less secure
FAST_MODE = config.get("FAST_MODE", True)
# doesn't allow to connect in not-secure mode
SECURE_ONLY = config.get("SECURE_ONLY", False)
# length of used handshake randoms for active fingerprinting protection
REPLAY_CHECK_LEN = config.get("REPLAY_CHECK_LEN", 32768)
# block short first packets to even more protect against replay-based fingerprinting
BLOCK_SHORT_FIRST_PKT = config.get("BLOCK_SHORT_FIRST_PKT", False)
# delay in seconds between stats printing
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
# delay in seconds between middle proxy info updates
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 24*60*60)
# delay in seconds between time getting, zero means disabled
GET_TIME_PERIOD = config.get("GET_TIME_PERIOD", 10*60)
# max socket buffer size to the client direction, the more the faster, but more RAM hungry
# can be the tuple (low, users_margin, high) for the adaptive case. If no much users, use high
TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", (16384, 100, 131072))
# max socket buffer size to the telegram servers direction, also can be the tuple
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
# keepalive period for clients in secs
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 10*60)
# drop client after this timeout if the handshake fail
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
# if client doesn't confirm data for this number of seconds, it is dropped
CLIENT_ACK_TIMEOUT = config.get("CLIENT_ACK_TIMEOUT", 5*60)
# telegram servers connect timeout in seconds
TG_CONNECT_TIMEOUT = config.get("TG_CONNECT_TIMEOUT", 10)
# listen address for IPv4
LISTEN_ADDR_IPV4 = config.get("LISTEN_ADDR_IPV4", "0.0.0.0")
# listen address for IPv6
LISTEN_ADDR_IPV6 = config.get("LISTEN_ADDR_IPV6", "::")
TG_DATACENTER_PORT = 443
TG_DATACENTERS_V4 = [
"149.154.175.50", "149.154.167.51", "149.154.175.100",
"149.154.167.91", "149.154.171.5"
]
TG_DATACENTERS_V6 = [
"2001:b28:f23d:f001::a", "2001:67c:04e8:f002::a", "2001:b28:f23d:f003::a",
"2001:67c:04e8:f004::a", "2001:b28:f23f:f005::a"
]
# This list will be updated in the runtime
TG_MIDDLE_PROXIES_V4 = {
1: [("149.154.175.50", 8888)], -1: [("149.154.175.50", 8888)],
2: [("149.154.162.38", 80)], -2: [("149.154.162.38", 80)],
3: [("149.154.175.100", 8888)], -3: [("149.154.175.100", 8888)],
4: [("91.108.4.136", 8888)], -4: [("91.108.4.136", 8888)],
5: [("91.108.56.181", 8888)], -5: [("91.108.56.181", 8888)]
}
TG_MIDDLE_PROXIES_V6 = {
1: [("2001:b28:f23d:f001::d", 8888)], -1: [("2001:b28:f23d:f001::d", 8888)],
2: [("2001:67c:04e8:f002::d", 80)], -2: [("2001:67c:04e8:f002::d", 80)],
3: [("2001:b28:f23d:f003::d", 8888)], -3: [("2001:b28:f23d:f003::d", 8888)],
4: [("2001:67c:04e8:f004::d", 8888)], -4: [("2001:67c:04e8:f004::d", 8888)],
5: [("2001:b28:f23f:f005::d", 8888)], -5: [("2001:67c:04e8:f004::d", 8888)]
}
USE_MIDDLE_PROXY = (len(AD_TAG) == 16)
PROXY_SECRET = bytes.fromhex(
"c4f9faca9678e6bb48ad6c7e2ce5c0d24430645d554addeb55419e034da62721" +
"d046eaab6e52ab14a95a443ecfb3463e79a05a66612adf9caeda8be9a80da698" +
"6fb0a6ff387af84d88ef3a6413713e5c3377f6e1a3d47d99f5e0c56eece8f05c" +
"54c490b079e31bef82ff0ee8f2b0a32756d249c5f21269816cb7061b265db212"
)
SKIP_LEN = 8
PREKEY_LEN = 32
KEY_LEN = 32
IV_LEN = 16
HANDSHAKE_LEN = 64
PROTO_TAG_POS = 56
DC_IDX_POS = 60
PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef"
PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee"
PROTO_TAG_SECURE = b"\xdd\xdd\xdd\xdd"
CBC_PADDING = 16
PADDING_FILLER = b"\x04\x00\x00\x00"
MIN_MSG_LEN = 12
MAX_MSG_LEN = 2 ** 24
my_ip_info = {"ipv4": None, "ipv6": None}
used_handshakes = collections.OrderedDict()
def setup_files_limit():
try:
import resource
soft_fd_limit, hard_fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard_fd_limit, hard_fd_limit))
except (ValueError, OSError):
print("Failed to increase the limit of opened files", flush=True, file=sys.stderr)
except ImportError:
pass
def setup_debug():
if hasattr(signal, 'SIGUSR1'):
def debug_signal(signum, frame):
import pdb
pdb.set_trace()
signal.signal(signal.SIGUSR1, debug_signal)
def try_setup_uvloop():
try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
pass
def try_use_cryptography_module(): def try_use_cryptography_module():
@@ -110,126 +262,6 @@ except ImportError:
except ImportError: except ImportError:
create_aes_ctr, create_aes_cbc = use_slow_bundled_cryptography_module() create_aes_ctr, create_aes_cbc = use_slow_bundled_cryptography_module()
try:
import resource
soft_fd_limit, hard_fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard_fd_limit, hard_fd_limit))
except (ValueError, OSError):
print("Failed to increase the limit of opened files", flush=True, file=sys.stderr)
except ImportError:
pass
if hasattr(signal, 'SIGUSR1'):
def debug_signal(signum, frame):
import pdb
pdb.set_trace()
signal.signal(signal.SIGUSR1, debug_signal)
if len(sys.argv) < 2:
config = runpy.run_module("config")
elif len(sys.argv) == 2:
config = runpy.run_path(sys.argv[1])
else:
# undocumented way of launching
config = {}
config["PORT"] = int(sys.argv[1])
secrets = sys.argv[2].split(",")
config["USERS"] = {"user%d" % i: secrets[i].zfill(32) for i in range(len(secrets))}
if len(sys.argv) > 3:
config["AD_TAG"] = sys.argv[3]
PORT = config["PORT"]
USERS = config["USERS"]
AD_TAG = bytes.fromhex(config.get("AD_TAG", ""))
# load advanced settings
# if IPv6 avaliable, use it by default
PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
# disables tg->client trafic reencryption, faster but less secure
FAST_MODE = config.get("FAST_MODE", True)
# doesn't allow to connect in not-secure mode
SECURE_ONLY = config.get("SECURE_ONLY", False)
# delay in seconds between stats printing
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
# delay in seconds between middle proxy info updates
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 24*60*60)
# max socket buffer size to the client direction, the more the faster, but more RAM hungry
TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 16384)
# max socket buffer size to the telegram servers direction
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
# keepalive period for clients in secs
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 10*60)
# drop client after this timeout if the handshake fail
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
# if client doesn't confirm data for this number of seconds, it is dropped
CLIENT_ACK_TIMEOUT = config.get("CLIENT_ACK_TIMEOUT", 5*60)
# telegram servers connect timeout in seconds
TG_CONNECT_TIMEOUT = config.get("TG_CONNECT_TIMEOUT", 10)
# listen address for IPv4
LISTEN_ADDR_IPV4 = config.get("LISTEN_ADDR_IPV4", "0.0.0.0")
# listen address for IPv6
LISTEN_ADDR_IPV6 = config.get("LISTEN_ADDR_IPV6", "::")
TG_DATACENTER_PORT = 443
TG_DATACENTERS_V4 = [
"149.154.175.50", "149.154.167.51", "149.154.175.100",
"149.154.167.91", "149.154.171.5"
]
TG_DATACENTERS_V6 = [
"2001:b28:f23d:f001::a", "2001:67c:04e8:f002::a", "2001:b28:f23d:f003::a",
"2001:67c:04e8:f004::a", "2001:b28:f23f:f005::a"
]
# This list will be updated in the runtime
TG_MIDDLE_PROXIES_V4 = {
1: [("149.154.175.50", 8888)], -1: [("149.154.175.50", 8888)],
2: [("149.154.162.38", 80)], -2: [("149.154.162.38", 80)],
3: [("149.154.175.100", 8888)], -3: [("149.154.175.100", 8888)],
4: [("91.108.4.136", 8888)], -4: [("91.108.4.136", 8888)],
5: [("91.108.56.181", 8888)], -5: [("91.108.56.181", 8888)]
}
TG_MIDDLE_PROXIES_V6 = {
1: [("2001:b28:f23d:f001::d", 8888)], -1: [("2001:b28:f23d:f001::d", 8888)],
2: [("2001:67c:04e8:f002::d", 80)], -2: [("2001:67c:04e8:f002::d", 80)],
3: [("2001:b28:f23d:f003::d", 8888)], -3: [("2001:b28:f23d:f003::d", 8888)],
4: [("2001:67c:04e8:f004::d", 8888)], -4: [("2001:67c:04e8:f004::d", 8888)],
5: [("2001:b28:f23f:f005::d", 8888)], -5: [("2001:67c:04e8:f004::d", 8888)]
}
USE_MIDDLE_PROXY = (len(AD_TAG) == 16)
PROXY_SECRET = bytes.fromhex(
"c4f9faca9678e6bb48ad6c7e2ce5c0d24430645d554addeb55419e034da62721" +
"d046eaab6e52ab14a95a443ecfb3463e79a05a66612adf9caeda8be9a80da698" +
"6fb0a6ff387af84d88ef3a6413713e5c3377f6e1a3d47d99f5e0c56eece8f05c" +
"54c490b079e31bef82ff0ee8f2b0a32756d249c5f21269816cb7061b265db212"
)
SKIP_LEN = 8
PREKEY_LEN = 32
KEY_LEN = 32
IV_LEN = 16
HANDSHAKE_LEN = 64
PROTO_TAG_POS = 56
DC_IDX_POS = 60
PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef"
PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee"
PROTO_TAG_SECURE = b"\xdd\xdd\xdd\xdd"
CBC_PADDING = 16
PADDING_FILLER = b"\x04\x00\x00\x00"
MIN_MSG_LEN = 12
MAX_MSG_LEN = 2 ** 24
my_ip_info = {"ipv4": None, "ipv6": None}
def print_err(*params): def print_err(*params):
print(*params, file=sys.stderr, flush=True) print(*params, file=sys.stderr, flush=True)
@@ -250,6 +282,31 @@ def update_stats(user, connects=0, curr_connects=0, octets=0, msgs=0):
octets=octets, msgs=msgs) octets=octets, msgs=msgs)
def get_curr_connects_count():
global stats
all_connects = 0
for user, stat in stats.items():
all_connects += stat["curr_connects"]
return all_connects
def get_to_tg_bufsize():
if isinstance(TO_TG_BUFSIZE, int):
return TO_TG_BUFSIZE
low, margin, high = TO_TG_BUFSIZE
return high if get_curr_connects_count() < margin else low
def get_to_clt_bufsize():
if isinstance(TO_CLT_BUFSIZE, int):
return TO_CLT_BUFSIZE
low, margin, high = TO_CLT_BUFSIZE
return high if get_curr_connects_count() < margin else low
class LayeredStreamReaderBase: class LayeredStreamReaderBase:
def __init__(self, upstream): def __init__(self, upstream):
self.upstream = upstream self.upstream = upstream
@@ -582,18 +639,30 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase):
async def handle_handshake(reader, writer): async def handle_handshake(reader, writer):
global used_handshakes
EMPTY_READ_BUF_SIZE = 4096
handshake = await reader.readexactly(HANDSHAKE_LEN) handshake = await reader.readexactly(HANDSHAKE_LEN)
dec_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN]
dec_prekey, dec_iv = dec_prekey_and_iv[:PREKEY_LEN], dec_prekey_and_iv[PREKEY_LEN:]
enc_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN][::-1]
enc_prekey, enc_iv = enc_prekey_and_iv[:PREKEY_LEN], enc_prekey_and_iv[PREKEY_LEN:]
if dec_prekey_and_iv in used_handshakes:
ip = writer.get_extra_info('peername')[0]
print_err("Active fingerprinting detected from %s, freezing it" % ip)
while await reader.read(EMPTY_READ_BUF_SIZE):
# just consume all the data
pass
return False
for user in USERS: for user in USERS:
secret = bytes.fromhex(USERS[user]) secret = bytes.fromhex(USERS[user])
dec_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN]
dec_prekey, dec_iv = dec_prekey_and_iv[:PREKEY_LEN], dec_prekey_and_iv[PREKEY_LEN:]
dec_key = hashlib.sha256(dec_prekey + secret).digest() dec_key = hashlib.sha256(dec_prekey + secret).digest()
decryptor = create_aes_ctr(key=dec_key, iv=int.from_bytes(dec_iv, "big")) decryptor = create_aes_ctr(key=dec_key, iv=int.from_bytes(dec_iv, "big"))
enc_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN][::-1]
enc_prekey, enc_iv = enc_prekey_and_iv[:PREKEY_LEN], enc_prekey_and_iv[PREKEY_LEN:]
enc_key = hashlib.sha256(enc_prekey + secret).digest() enc_key = hashlib.sha256(enc_prekey + secret).digest()
encryptor = create_aes_ctr(key=enc_key, iv=int.from_bytes(enc_iv, "big")) encryptor = create_aes_ctr(key=enc_key, iv=int.from_bytes(enc_iv, "big"))
@@ -608,11 +677,14 @@ async def handle_handshake(reader, writer):
dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True) dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True)
while len(used_handshakes) >= REPLAY_CHECK_LEN:
used_handshakes.popitem(last=False)
used_handshakes[dec_prekey_and_iv] = True
reader = CryptoWrappedStreamReader(reader, decryptor) reader = CryptoWrappedStreamReader(reader, decryptor)
writer = CryptoWrappedStreamWriter(writer, encryptor) writer = CryptoWrappedStreamWriter(writer, encryptor)
return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv
EMPTY_READ_BUF_SIZE = 4096
while await reader.read(EMPTY_READ_BUF_SIZE): while await reader.read(EMPTY_READ_BUF_SIZE):
# just consume all the data # just consume all the data
pass pass
@@ -620,24 +692,31 @@ async def handle_handshake(reader, writer):
return False return False
def try_setsockopt(sock, level, option, value):
try:
sock.setsockopt(level, option, value)
except OSError as E:
pass
def set_keepalive(sock, interval=40, attempts=5): def set_keepalive(sock, interval=40, attempts=5):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if hasattr(socket, "TCP_KEEPIDLE"): if hasattr(socket, "TCP_KEEPIDLE"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval) try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval)
if hasattr(socket, "TCP_KEEPINTVL"): if hasattr(socket, "TCP_KEEPINTVL"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval) try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
if hasattr(socket, "TCP_KEEPCNT"): if hasattr(socket, "TCP_KEEPCNT"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts) try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
def set_ack_timeout(sock, timeout): def set_ack_timeout(sock, timeout):
if hasattr(socket, "TCP_USER_TIMEOUT"): if hasattr(socket, "TCP_USER_TIMEOUT"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, timeout*1000) try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, timeout*1000)
def set_bufsizes(sock, recv_buf, send_buf): def set_bufsizes(sock, recv_buf, send_buf):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf) try_setsockopt(sock, socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf) try_setsockopt(sock, socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
async def open_connection_tryer(addr, port, limit, timeout, max_attempts=3): async def open_connection_tryer(addr, port, limit, timeout, max_attempts=3):
@@ -674,7 +753,7 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
try: try:
reader_tgt, writer_tgt = await open_connection_tryer( reader_tgt, writer_tgt = await open_connection_tryer(
dc, TG_DATACENTER_PORT, limit=TO_CLT_BUFSIZE, timeout=TG_CONNECT_TIMEOUT) dc, TG_DATACENTER_PORT, limit=get_to_clt_bufsize(), timeout=TG_CONNECT_TIMEOUT)
except ConnectionRefusedError as E: except ConnectionRefusedError as E:
print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT) print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT)
return False return False
@@ -683,7 +762,7 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
return False return False
set_keepalive(writer_tgt.get_extra_info("socket")) set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE) set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize())
while True: while True:
rnd = bytearray([random.randrange(0, 256) for i in range(HANDSHAKE_LEN)]) rnd = bytearray([random.randrange(0, 256) for i in range(HANDSHAKE_LEN)])
@@ -774,7 +853,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx]) addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
try: try:
reader_tgt, writer_tgt = await open_connection_tryer(addr, port, limit=TO_CLT_BUFSIZE, reader_tgt, writer_tgt = await open_connection_tryer(addr, port, limit=get_to_clt_bufsize(),
timeout=TG_CONNECT_TIMEOUT) timeout=TG_CONNECT_TIMEOUT)
except ConnectionRefusedError as E: except ConnectionRefusedError as E:
print_err("Got connection refused while trying to connect to", addr, port) print_err("Got connection refused while trying to connect to", addr, port)
@@ -784,7 +863,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
return False return False
set_keepalive(writer_tgt.get_extra_info("socket")) set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE) set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize())
writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO) writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO)
@@ -800,7 +879,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
old_reader = reader_tgt old_reader = reader_tgt
reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO) reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO)
ans = await reader_tgt.read(TO_CLT_BUFSIZE) ans = await reader_tgt.read(get_to_clt_bufsize())
if len(ans) != RPC_NONCE_ANS_LEN: if len(ans) != RPC_NONCE_ANS_LEN:
return False return False
@@ -883,8 +962,9 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
async def handle_client(reader_clt, writer_clt): async def handle_client(reader_clt, writer_clt):
set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE, attempts=3) set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE, attempts=3)
set_ack_timeout(writer_clt.get_extra_info("socket"), CLIENT_ACK_TIMEOUT) set_ack_timeout(writer_clt.get_extra_info("socket"), CLIENT_ACK_TIMEOUT)
set_bufsizes(writer_clt.get_extra_info("socket"), TO_TG_BUFSIZE, TO_CLT_BUFSIZE) set_bufsizes(writer_clt.get_extra_info("socket"), get_to_tg_bufsize(), get_to_clt_bufsize())
cl_ip, cl_port = writer_clt.get_extra_info('peername')[:2]
try: try:
clt_data = await asyncio.wait_for(handle_handshake(reader_clt, writer_clt), clt_data = await asyncio.wait_for(handle_handshake(reader_clt, writer_clt),
timeout=CLIENT_HANDSHAKE_TIMEOUT) timeout=CLIENT_HANDSHAKE_TIMEOUT)
@@ -904,7 +984,6 @@ async def handle_client(reader_clt, writer_clt):
else: else:
tg_data = await do_direct_handshake(proto_tag, dc_idx) tg_data = await do_direct_handshake(proto_tag, dc_idx)
else: else:
cl_ip, cl_port = writer_clt.upstream.get_extra_info('peername')[:2]
tg_data = await do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port) tg_data = await do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port)
if not tg_data: if not tg_data:
@@ -937,7 +1016,8 @@ async def handle_client(reader_clt, writer_clt):
else: else:
return return
async def connect_reader_to_writer(rd, wr, user, rd_buf_size): async def connect_reader_to_writer(rd, wr, user, rd_buf_size, block_short_first_pkt=False):
is_first_pkt = True
try: try:
while True: while True:
data = await rd.read(rd_buf_size) data = await rd.read(rd_buf_size)
@@ -946,6 +1026,19 @@ async def handle_client(reader_clt, writer_clt):
else: else:
extra = {} extra = {}
if is_first_pkt:
# protection against replay-based fingerprinting
MIN_FIRST_PKT_SIZE = 12
if block_short_first_pkt and 0 < len(data) < MIN_FIRST_PKT_SIZE:
# print_err("Active fingerprinting detected from %s, dropping it" % cl_ip)
# print_err("If this causes problems set BLOCK_SHORT_FIRST_PKT = False "
# "in the config")
wr.write_eof()
await wr.drain()
return
is_first_pkt = False
if not data: if not data:
wr.write_eof() wr.write_eof()
await wr.drain() await wr.drain()
@@ -958,8 +1051,9 @@ async def handle_client(reader_clt, writer_clt):
# print_err(e) # print_err(e)
pass pass
tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, TO_CLT_BUFSIZE) tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, get_to_clt_bufsize(),
clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, TO_TG_BUFSIZE) block_short_first_pkt=BLOCK_SHORT_FIRST_PKT)
clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, get_to_tg_bufsize())
task_tg_to_clt = asyncio.ensure_future(tg_to_clt) task_tg_to_clt = asyncio.ensure_future(tg_to_clt)
task_clt_to_tg = asyncio.ensure_future(clt_to_tg) task_clt_to_tg = asyncio.ensure_future(clt_to_tg)
@@ -995,34 +1089,63 @@ async def stats_printer():
print(flush=True) print(flush=True)
async def update_middle_proxy_info(): async def make_https_req(url, host="core.telegram.org"):
async def make_https_req(url): """ Make request, return resp body and headers. """
# returns resp body SSL_PORT = 443
SSL_PORT = 443 url_data = urllib.parse.urlparse(url)
url_data = urllib.parse.urlparse(url)
HTTP_REQ_TEMPLATE = "\r\n".join(["GET %s HTTP/1.1", "Host: core.telegram.org", HTTP_REQ_TEMPLATE = "\r\n".join(["GET %s HTTP/1.1", "Host: %s",
"Connection: close"]) + "\r\n\r\n" "Connection: close"]) + "\r\n\r\n"
reader, writer = await asyncio.open_connection(url_data.netloc, SSL_PORT, ssl=True)
req = HTTP_REQ_TEMPLATE % (urllib.parse.quote(url_data.path), host)
writer.write(req.encode("utf8"))
data = await reader.read()
writer.close()
headers, body = data.split(b"\r\n\r\n", 1)
return headers, body
async def get_srv_time():
global USE_MIDDLE_PROXY
TIME_SYNC_ADDR = "https://core.telegram.org/getProxySecret"
MAX_TIME_SKEW = 30
want_to_reenable_advertising = False
while True:
try: try:
reader, writer = await asyncio.open_connection(url_data.netloc, SSL_PORT, ssl=True) headers, secret = await make_https_req(TIME_SYNC_ADDR)
req = HTTP_REQ_TEMPLATE % urllib.parse.quote(url_data.path)
writer.write(req.encode("utf8"))
data = await reader.read()
writer.close()
headers, body = data.split(b"\r\n\r\n", 1) for line in headers.split(b"\r\n"):
return body if not line.startswith(b"Date: "):
except Exception: continue
return b"" line = line[len("Date: "):].decode()
srv_time = datetime.datetime.strptime(line, "%a, %d %b %Y %H:%M:%S %Z")
now_time = datetime.datetime.utcnow()
time_diff = (now_time-srv_time).total_seconds()
if USE_MIDDLE_PROXY and abs(time_diff) > MAX_TIME_SKEW:
print_err("Time skew detected, please set the clock")
print_err("Server time:", srv_time, "your time:", now_time)
print_err("Disabling advertising to continue serving")
USE_MIDDLE_PROXY = False
want_to_reenable_advertising = True
elif want_to_reenable_advertising and abs(time_diff) <= MAX_TIME_SKEW:
print_err("Time is ok, reenabling advertising")
USE_MIDDLE_PROXY = True
want_to_reenable_advertising = False
except Exception as E:
print_err("Error getting server time", E)
await asyncio.sleep(GET_TIME_PERIOD)
async def update_middle_proxy_info():
async def get_new_proxies(url): async def get_new_proxies(url):
PROXY_REGEXP = re.compile(r"proxy_for\s+(-?\d+)\s+(.+):(\d+)\s*;") PROXY_REGEXP = re.compile(r"proxy_for\s+(-?\d+)\s+(.+):(\d+)\s*;")
ans = {} ans = {}
try: headers, body = await make_https_req(url)
body = await make_https_req(url)
except Exception:
return ans
fields = PROXY_REGEXP.findall(body.decode("utf8")) fields = PROXY_REGEXP.findall(body.decode("utf8"))
if fields: if fields:
@@ -1050,26 +1173,26 @@ async def update_middle_proxy_info():
if not v4_proxies: if not v4_proxies:
raise Exception("no proxy data") raise Exception("no proxy data")
TG_MIDDLE_PROXIES_V4 = v4_proxies TG_MIDDLE_PROXIES_V4 = v4_proxies
except Exception: except Exception as E:
print_err("Error updating middle proxy list") print_err("Error updating middle proxy list:", E)
try: try:
v6_proxies = await get_new_proxies(PROXY_INFO_ADDR_V6) v6_proxies = await get_new_proxies(PROXY_INFO_ADDR_V6)
if not v6_proxies: if not v6_proxies:
raise Exception("no proxy data (ipv6)") raise Exception("no proxy data (ipv6)")
TG_MIDDLE_PROXIES_V6 = v6_proxies TG_MIDDLE_PROXIES_V6 = v6_proxies
except Exception: except Exception as E:
print_err("Error updating middle proxy list for IPv6") print_err("Error updating middle proxy list for IPv6:", E)
try: try:
secret = await make_https_req(PROXY_SECRET_ADDR) headers, secret = await make_https_req(PROXY_SECRET_ADDR)
if not secret: if not secret:
raise Exception("no secret") raise Exception("no secret")
if secret != PROXY_SECRET: if secret != PROXY_SECRET:
PROXY_SECRET = secret PROXY_SECRET = secret
print_err("Middle proxy secret updated") print_err("Middle proxy secret updated")
except Exception: except Exception as E:
print_err("Error updating middle proxy secret, using old") print_err("Error updating middle proxy secret, using old", E)
await asyncio.sleep(PROXY_INFO_UPDATE_PERIOD) await asyncio.sleep(PROXY_INFO_UPDATE_PERIOD)
@@ -1078,26 +1201,31 @@ def init_ip_info():
global USE_MIDDLE_PROXY global USE_MIDDLE_PROXY
global PREFER_IPV6 global PREFER_IPV6
global my_ip_info global my_ip_info
TIMEOUT = 5
try: def get_ip_from_url(url):
with urllib.request.urlopen('http://ipv4.myexternalip.com/raw', timeout=TIMEOUT) as f: TIMEOUT = 5
if f.status != 200:
raise Exception("Invalid status code")
my_ip_info["ipv4"] = f.read().decode().strip()
except Exception:
pass
if PREFER_IPV6:
try: try:
with urllib.request.urlopen('http://ipv6.myexternalip.com/raw', timeout=TIMEOUT) as f: with urllib.request.urlopen(url, timeout=TIMEOUT) as f:
if f.status != 200: if f.status != 200:
raise Exception("Invalid status code") raise Exception("Invalid status code")
my_ip_info["ipv6"] = f.read().decode().strip() return f.read().decode().strip()
except Exception: except Exception:
PREFER_IPV6 = False return None
else:
IPV4_URL1 = "http://v4.ident.me/"
IPV4_URL2 = "http://ipv4.icanhazip.com/"
IPV6_URL1 = "http://v6.ident.me/"
IPV6_URL2 = "http://ipv6.icanhazip.com/"
my_ip_info["ipv4"] = get_ip_from_url(IPV4_URL1) or get_ip_from_url(IPV4_URL2)
my_ip_info["ipv6"] = get_ip_from_url(IPV6_URL1) or get_ip_from_url(IPV6_URL2)
if PREFER_IPV6:
if my_ip_info["ipv6"]:
print_err("IPv6 found, using it for external communication") print_err("IPv6 found, using it for external communication")
else:
PREFER_IPV6 = False
if USE_MIDDLE_PROXY: if USE_MIDDLE_PROXY:
if ((not PREFER_IPV6 and not my_ip_info["ipv4"]) or if ((not PREFER_IPV6 and not my_ip_info["ipv4"]) or
@@ -1154,6 +1282,10 @@ def loop_exception_handler(loop, context):
def main(): def main():
setup_files_limit()
setup_debug()
try_setup_uvloop()
init_stats() init_stats()
if sys.platform == "win32": if sys.platform == "win32":
@@ -1170,15 +1302,19 @@ def main():
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info()) middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
asyncio.ensure_future(middle_proxy_updater_task) asyncio.ensure_future(middle_proxy_updater_task)
if GET_TIME_PERIOD:
time_get_task = asyncio.Task(get_srv_time())
asyncio.ensure_future(time_get_task)
reuse_port = hasattr(socket, "SO_REUSEPORT") reuse_port = hasattr(socket, "SO_REUSEPORT")
task_v4 = asyncio.start_server(handle_client_wrapper, LISTEN_ADDR_IPV4, PORT, task_v4 = asyncio.start_server(handle_client_wrapper, LISTEN_ADDR_IPV4, PORT,
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop) limit=get_to_tg_bufsize(), reuse_port=reuse_port, loop=loop)
server_v4 = loop.run_until_complete(task_v4) server_v4 = loop.run_until_complete(task_v4)
if socket.has_ipv6: if socket.has_ipv6:
task_v6 = asyncio.start_server(handle_client_wrapper, LISTEN_ADDR_IPV6, PORT, task_v6 = asyncio.start_server(handle_client_wrapper, LISTEN_ADDR_IPV6, PORT,
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop) limit=get_to_tg_bufsize(), reuse_port=reuse_port, loop=loop)
server_v6 = loop.run_until_complete(task_v6) server_v6 = loop.run_until_complete(task_v6)
try: try: