mirror of
https://github.com/alexbers/mtprotoproxy.git
synced 2026-03-14 07:13:09 +00:00
Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
eba7f9be69 | ||
|
|
af8c102449 | ||
|
|
a01896522d | ||
|
|
6f70ff3003 | ||
|
|
d48c177e36 | ||
|
|
f55ae68092 | ||
|
|
4cae6290b9 | ||
|
|
830d55fe77 | ||
|
|
66d9c03ff9 | ||
|
|
73592c4f72 | ||
|
|
b0cb48f684 | ||
|
|
cb10355681 | ||
|
|
bd8e0f935d | ||
|
|
e2435461ca | ||
|
|
47218748aa |
@@ -6,9 +6,9 @@ USERS = {
|
||||
"tg2": "0123456789abcdef0123456789abcdef"
|
||||
}
|
||||
|
||||
# Makes the proxy harder to detect
|
||||
# Can be incompatible with very old clients
|
||||
SECURE_ONLY = True
|
||||
|
||||
# Tag for advertising, obtainable from @MTProxybot
|
||||
# AD_TAG = "3c09c680b76ee91a4c25ad51f742267d"
|
||||
|
||||
# Uncommenting this do make a proxy harder to detect
|
||||
# But it can be incompatible with old clients
|
||||
# SECURE_ONLY = True
|
||||
|
||||
516
mtprotoproxy.py
516
mtprotoproxy.py
@@ -6,6 +6,7 @@ import urllib.parse
|
||||
import urllib.request
|
||||
import collections
|
||||
import time
|
||||
import datetime
|
||||
import hashlib
|
||||
import random
|
||||
import binascii
|
||||
@@ -14,11 +15,162 @@ import re
|
||||
import runpy
|
||||
import signal
|
||||
|
||||
try:
|
||||
import uvloop
|
||||
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
|
||||
except ImportError:
|
||||
pass
|
||||
if len(sys.argv) < 2:
|
||||
config = runpy.run_module("config")
|
||||
elif len(sys.argv) == 2:
|
||||
# launch with own config
|
||||
config = runpy.run_path(sys.argv[1])
|
||||
else:
|
||||
# undocumented way of launching
|
||||
config = {}
|
||||
config["PORT"] = int(sys.argv[1])
|
||||
secrets = sys.argv[2].split(",")
|
||||
config["USERS"] = {"user%d" % i: secrets[i].zfill(32) for i in range(len(secrets))}
|
||||
if len(sys.argv) > 3:
|
||||
config["AD_TAG"] = sys.argv[3]
|
||||
|
||||
PORT = config["PORT"]
|
||||
USERS = config["USERS"]
|
||||
AD_TAG = bytes.fromhex(config.get("AD_TAG", ""))
|
||||
|
||||
# load advanced settings
|
||||
|
||||
# if IPv6 avaliable, use it by default
|
||||
PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
|
||||
|
||||
# disables tg->client trafic reencryption, faster but less secure
|
||||
FAST_MODE = config.get("FAST_MODE", True)
|
||||
|
||||
# doesn't allow to connect in not-secure mode
|
||||
SECURE_ONLY = config.get("SECURE_ONLY", False)
|
||||
|
||||
# length of used handshake randoms for active fingerprinting protection
|
||||
REPLAY_CHECK_LEN = config.get("REPLAY_CHECK_LEN", 32768)
|
||||
|
||||
# block short first packets to even more protect against replay-based fingerprinting
|
||||
BLOCK_SHORT_FIRST_PKT = config.get("BLOCK_SHORT_FIRST_PKT", False)
|
||||
|
||||
# delay in seconds between stats printing
|
||||
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
|
||||
|
||||
# delay in seconds between middle proxy info updates
|
||||
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 24*60*60)
|
||||
|
||||
# delay in seconds between time getting, zero means disabled
|
||||
GET_TIME_PERIOD = config.get("GET_TIME_PERIOD", 10*60)
|
||||
|
||||
# max socket buffer size to the client direction, the more the faster, but more RAM hungry
|
||||
# can be the tuple (low, users_margin, high) for the adaptive case. If no much users, use high
|
||||
TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", (16384, 100, 131072))
|
||||
|
||||
# max socket buffer size to the telegram servers direction, also can be the tuple
|
||||
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
|
||||
|
||||
# keepalive period for clients in secs
|
||||
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 10*60)
|
||||
|
||||
# drop client after this timeout if the handshake fail
|
||||
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
|
||||
|
||||
# if client doesn't confirm data for this number of seconds, it is dropped
|
||||
CLIENT_ACK_TIMEOUT = config.get("CLIENT_ACK_TIMEOUT", 5*60)
|
||||
|
||||
# telegram servers connect timeout in seconds
|
||||
TG_CONNECT_TIMEOUT = config.get("TG_CONNECT_TIMEOUT", 10)
|
||||
|
||||
# listen address for IPv4
|
||||
LISTEN_ADDR_IPV4 = config.get("LISTEN_ADDR_IPV4", "0.0.0.0")
|
||||
|
||||
# listen address for IPv6
|
||||
LISTEN_ADDR_IPV6 = config.get("LISTEN_ADDR_IPV6", "::")
|
||||
|
||||
|
||||
TG_DATACENTER_PORT = 443
|
||||
|
||||
TG_DATACENTERS_V4 = [
|
||||
"149.154.175.50", "149.154.167.51", "149.154.175.100",
|
||||
"149.154.167.91", "149.154.171.5"
|
||||
]
|
||||
|
||||
TG_DATACENTERS_V6 = [
|
||||
"2001:b28:f23d:f001::a", "2001:67c:04e8:f002::a", "2001:b28:f23d:f003::a",
|
||||
"2001:67c:04e8:f004::a", "2001:b28:f23f:f005::a"
|
||||
]
|
||||
|
||||
# This list will be updated in the runtime
|
||||
TG_MIDDLE_PROXIES_V4 = {
|
||||
1: [("149.154.175.50", 8888)], -1: [("149.154.175.50", 8888)],
|
||||
2: [("149.154.162.38", 80)], -2: [("149.154.162.38", 80)],
|
||||
3: [("149.154.175.100", 8888)], -3: [("149.154.175.100", 8888)],
|
||||
4: [("91.108.4.136", 8888)], -4: [("91.108.4.136", 8888)],
|
||||
5: [("91.108.56.181", 8888)], -5: [("91.108.56.181", 8888)]
|
||||
}
|
||||
|
||||
TG_MIDDLE_PROXIES_V6 = {
|
||||
1: [("2001:b28:f23d:f001::d", 8888)], -1: [("2001:b28:f23d:f001::d", 8888)],
|
||||
2: [("2001:67c:04e8:f002::d", 80)], -2: [("2001:67c:04e8:f002::d", 80)],
|
||||
3: [("2001:b28:f23d:f003::d", 8888)], -3: [("2001:b28:f23d:f003::d", 8888)],
|
||||
4: [("2001:67c:04e8:f004::d", 8888)], -4: [("2001:67c:04e8:f004::d", 8888)],
|
||||
5: [("2001:b28:f23f:f005::d", 8888)], -5: [("2001:67c:04e8:f004::d", 8888)]
|
||||
}
|
||||
|
||||
USE_MIDDLE_PROXY = (len(AD_TAG) == 16)
|
||||
|
||||
PROXY_SECRET = bytes.fromhex(
|
||||
"c4f9faca9678e6bb48ad6c7e2ce5c0d24430645d554addeb55419e034da62721" +
|
||||
"d046eaab6e52ab14a95a443ecfb3463e79a05a66612adf9caeda8be9a80da698" +
|
||||
"6fb0a6ff387af84d88ef3a6413713e5c3377f6e1a3d47d99f5e0c56eece8f05c" +
|
||||
"54c490b079e31bef82ff0ee8f2b0a32756d249c5f21269816cb7061b265db212"
|
||||
)
|
||||
|
||||
SKIP_LEN = 8
|
||||
PREKEY_LEN = 32
|
||||
KEY_LEN = 32
|
||||
IV_LEN = 16
|
||||
HANDSHAKE_LEN = 64
|
||||
PROTO_TAG_POS = 56
|
||||
DC_IDX_POS = 60
|
||||
|
||||
PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef"
|
||||
PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee"
|
||||
PROTO_TAG_SECURE = b"\xdd\xdd\xdd\xdd"
|
||||
|
||||
CBC_PADDING = 16
|
||||
PADDING_FILLER = b"\x04\x00\x00\x00"
|
||||
|
||||
MIN_MSG_LEN = 12
|
||||
MAX_MSG_LEN = 2 ** 24
|
||||
|
||||
my_ip_info = {"ipv4": None, "ipv6": None}
|
||||
used_handshakes = collections.OrderedDict()
|
||||
|
||||
|
||||
def setup_files_limit():
|
||||
try:
|
||||
import resource
|
||||
soft_fd_limit, hard_fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
|
||||
resource.setrlimit(resource.RLIMIT_NOFILE, (hard_fd_limit, hard_fd_limit))
|
||||
except (ValueError, OSError):
|
||||
print("Failed to increase the limit of opened files", flush=True, file=sys.stderr)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
def setup_debug():
|
||||
if hasattr(signal, 'SIGUSR1'):
|
||||
def debug_signal(signum, frame):
|
||||
import pdb
|
||||
pdb.set_trace()
|
||||
|
||||
signal.signal(signal.SIGUSR1, debug_signal)
|
||||
|
||||
|
||||
def try_setup_uvloop():
|
||||
try:
|
||||
import uvloop
|
||||
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
def try_use_cryptography_module():
|
||||
@@ -110,126 +262,6 @@ except ImportError:
|
||||
except ImportError:
|
||||
create_aes_ctr, create_aes_cbc = use_slow_bundled_cryptography_module()
|
||||
|
||||
try:
|
||||
import resource
|
||||
soft_fd_limit, hard_fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
|
||||
resource.setrlimit(resource.RLIMIT_NOFILE, (hard_fd_limit, hard_fd_limit))
|
||||
except (ValueError, OSError):
|
||||
print("Failed to increase the limit of opened files", flush=True, file=sys.stderr)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
if hasattr(signal, 'SIGUSR1'):
|
||||
def debug_signal(signum, frame):
|
||||
import pdb
|
||||
pdb.set_trace()
|
||||
|
||||
signal.signal(signal.SIGUSR1, debug_signal)
|
||||
|
||||
if len(sys.argv) < 2:
|
||||
config = runpy.run_module("config")
|
||||
elif len(sys.argv) == 2:
|
||||
config = runpy.run_path(sys.argv[1])
|
||||
else:
|
||||
# undocumented way of launching
|
||||
config = {}
|
||||
config["PORT"] = int(sys.argv[1])
|
||||
secrets = sys.argv[2].split(",")
|
||||
config["USERS"] = {"user%d" % i: secrets[i].zfill(32) for i in range(len(secrets))}
|
||||
if len(sys.argv) > 3:
|
||||
config["AD_TAG"] = sys.argv[3]
|
||||
|
||||
PORT = config["PORT"]
|
||||
USERS = config["USERS"]
|
||||
AD_TAG = bytes.fromhex(config.get("AD_TAG", ""))
|
||||
|
||||
# load advanced settings
|
||||
# if IPv6 avaliable, use it by default
|
||||
PREFER_IPV6 = config.get("PREFER_IPV6", socket.has_ipv6)
|
||||
# disables tg->client trafic reencryption, faster but less secure
|
||||
FAST_MODE = config.get("FAST_MODE", True)
|
||||
# doesn't allow to connect in not-secure mode
|
||||
SECURE_ONLY = config.get("SECURE_ONLY", False)
|
||||
# delay in seconds between stats printing
|
||||
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
|
||||
# delay in seconds between middle proxy info updates
|
||||
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 24*60*60)
|
||||
# max socket buffer size to the client direction, the more the faster, but more RAM hungry
|
||||
TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 16384)
|
||||
# max socket buffer size to the telegram servers direction
|
||||
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
|
||||
# keepalive period for clients in secs
|
||||
CLIENT_KEEPALIVE = config.get("CLIENT_KEEPALIVE", 10*60)
|
||||
# drop client after this timeout if the handshake fail
|
||||
CLIENT_HANDSHAKE_TIMEOUT = config.get("CLIENT_HANDSHAKE_TIMEOUT", 10)
|
||||
# if client doesn't confirm data for this number of seconds, it is dropped
|
||||
CLIENT_ACK_TIMEOUT = config.get("CLIENT_ACK_TIMEOUT", 5*60)
|
||||
# telegram servers connect timeout in seconds
|
||||
TG_CONNECT_TIMEOUT = config.get("TG_CONNECT_TIMEOUT", 10)
|
||||
# listen address for IPv4
|
||||
LISTEN_ADDR_IPV4 = config.get("LISTEN_ADDR_IPV4", "0.0.0.0")
|
||||
# listen address for IPv6
|
||||
LISTEN_ADDR_IPV6 = config.get("LISTEN_ADDR_IPV6", "::")
|
||||
|
||||
TG_DATACENTER_PORT = 443
|
||||
|
||||
TG_DATACENTERS_V4 = [
|
||||
"149.154.175.50", "149.154.167.51", "149.154.175.100",
|
||||
"149.154.167.91", "149.154.171.5"
|
||||
]
|
||||
|
||||
TG_DATACENTERS_V6 = [
|
||||
"2001:b28:f23d:f001::a", "2001:67c:04e8:f002::a", "2001:b28:f23d:f003::a",
|
||||
"2001:67c:04e8:f004::a", "2001:b28:f23f:f005::a"
|
||||
]
|
||||
|
||||
# This list will be updated in the runtime
|
||||
TG_MIDDLE_PROXIES_V4 = {
|
||||
1: [("149.154.175.50", 8888)], -1: [("149.154.175.50", 8888)],
|
||||
2: [("149.154.162.38", 80)], -2: [("149.154.162.38", 80)],
|
||||
3: [("149.154.175.100", 8888)], -3: [("149.154.175.100", 8888)],
|
||||
4: [("91.108.4.136", 8888)], -4: [("91.108.4.136", 8888)],
|
||||
5: [("91.108.56.181", 8888)], -5: [("91.108.56.181", 8888)]
|
||||
}
|
||||
|
||||
TG_MIDDLE_PROXIES_V6 = {
|
||||
1: [("2001:b28:f23d:f001::d", 8888)], -1: [("2001:b28:f23d:f001::d", 8888)],
|
||||
2: [("2001:67c:04e8:f002::d", 80)], -2: [("2001:67c:04e8:f002::d", 80)],
|
||||
3: [("2001:b28:f23d:f003::d", 8888)], -3: [("2001:b28:f23d:f003::d", 8888)],
|
||||
4: [("2001:67c:04e8:f004::d", 8888)], -4: [("2001:67c:04e8:f004::d", 8888)],
|
||||
5: [("2001:b28:f23f:f005::d", 8888)], -5: [("2001:67c:04e8:f004::d", 8888)]
|
||||
}
|
||||
|
||||
|
||||
USE_MIDDLE_PROXY = (len(AD_TAG) == 16)
|
||||
|
||||
PROXY_SECRET = bytes.fromhex(
|
||||
"c4f9faca9678e6bb48ad6c7e2ce5c0d24430645d554addeb55419e034da62721" +
|
||||
"d046eaab6e52ab14a95a443ecfb3463e79a05a66612adf9caeda8be9a80da698" +
|
||||
"6fb0a6ff387af84d88ef3a6413713e5c3377f6e1a3d47d99f5e0c56eece8f05c" +
|
||||
"54c490b079e31bef82ff0ee8f2b0a32756d249c5f21269816cb7061b265db212"
|
||||
)
|
||||
|
||||
SKIP_LEN = 8
|
||||
PREKEY_LEN = 32
|
||||
KEY_LEN = 32
|
||||
IV_LEN = 16
|
||||
HANDSHAKE_LEN = 64
|
||||
PROTO_TAG_POS = 56
|
||||
DC_IDX_POS = 60
|
||||
|
||||
PROTO_TAG_ABRIDGED = b"\xef\xef\xef\xef"
|
||||
PROTO_TAG_INTERMEDIATE = b"\xee\xee\xee\xee"
|
||||
PROTO_TAG_SECURE = b"\xdd\xdd\xdd\xdd"
|
||||
|
||||
CBC_PADDING = 16
|
||||
PADDING_FILLER = b"\x04\x00\x00\x00"
|
||||
|
||||
MIN_MSG_LEN = 12
|
||||
MAX_MSG_LEN = 2 ** 24
|
||||
|
||||
my_ip_info = {"ipv4": None, "ipv6": None}
|
||||
|
||||
|
||||
def print_err(*params):
|
||||
print(*params, file=sys.stderr, flush=True)
|
||||
@@ -250,6 +282,31 @@ def update_stats(user, connects=0, curr_connects=0, octets=0, msgs=0):
|
||||
octets=octets, msgs=msgs)
|
||||
|
||||
|
||||
def get_curr_connects_count():
|
||||
global stats
|
||||
|
||||
all_connects = 0
|
||||
for user, stat in stats.items():
|
||||
all_connects += stat["curr_connects"]
|
||||
return all_connects
|
||||
|
||||
|
||||
def get_to_tg_bufsize():
|
||||
if isinstance(TO_TG_BUFSIZE, int):
|
||||
return TO_TG_BUFSIZE
|
||||
|
||||
low, margin, high = TO_TG_BUFSIZE
|
||||
return high if get_curr_connects_count() < margin else low
|
||||
|
||||
|
||||
def get_to_clt_bufsize():
|
||||
if isinstance(TO_CLT_BUFSIZE, int):
|
||||
return TO_CLT_BUFSIZE
|
||||
|
||||
low, margin, high = TO_CLT_BUFSIZE
|
||||
return high if get_curr_connects_count() < margin else low
|
||||
|
||||
|
||||
class LayeredStreamReaderBase:
|
||||
def __init__(self, upstream):
|
||||
self.upstream = upstream
|
||||
@@ -582,18 +639,30 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase):
|
||||
|
||||
|
||||
async def handle_handshake(reader, writer):
|
||||
global used_handshakes
|
||||
EMPTY_READ_BUF_SIZE = 4096
|
||||
|
||||
handshake = await reader.readexactly(HANDSHAKE_LEN)
|
||||
dec_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN]
|
||||
dec_prekey, dec_iv = dec_prekey_and_iv[:PREKEY_LEN], dec_prekey_and_iv[PREKEY_LEN:]
|
||||
enc_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN][::-1]
|
||||
enc_prekey, enc_iv = enc_prekey_and_iv[:PREKEY_LEN], enc_prekey_and_iv[PREKEY_LEN:]
|
||||
|
||||
if dec_prekey_and_iv in used_handshakes:
|
||||
ip = writer.get_extra_info('peername')[0]
|
||||
print_err("Active fingerprinting detected from %s, freezing it" % ip)
|
||||
while await reader.read(EMPTY_READ_BUF_SIZE):
|
||||
# just consume all the data
|
||||
pass
|
||||
|
||||
return False
|
||||
|
||||
for user in USERS:
|
||||
secret = bytes.fromhex(USERS[user])
|
||||
|
||||
dec_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN]
|
||||
dec_prekey, dec_iv = dec_prekey_and_iv[:PREKEY_LEN], dec_prekey_and_iv[PREKEY_LEN:]
|
||||
dec_key = hashlib.sha256(dec_prekey + secret).digest()
|
||||
decryptor = create_aes_ctr(key=dec_key, iv=int.from_bytes(dec_iv, "big"))
|
||||
|
||||
enc_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN][::-1]
|
||||
enc_prekey, enc_iv = enc_prekey_and_iv[:PREKEY_LEN], enc_prekey_and_iv[PREKEY_LEN:]
|
||||
enc_key = hashlib.sha256(enc_prekey + secret).digest()
|
||||
encryptor = create_aes_ctr(key=enc_key, iv=int.from_bytes(enc_iv, "big"))
|
||||
|
||||
@@ -608,11 +677,14 @@ async def handle_handshake(reader, writer):
|
||||
|
||||
dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True)
|
||||
|
||||
while len(used_handshakes) >= REPLAY_CHECK_LEN:
|
||||
used_handshakes.popitem(last=False)
|
||||
used_handshakes[dec_prekey_and_iv] = True
|
||||
|
||||
reader = CryptoWrappedStreamReader(reader, decryptor)
|
||||
writer = CryptoWrappedStreamWriter(writer, encryptor)
|
||||
return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv
|
||||
|
||||
EMPTY_READ_BUF_SIZE = 4096
|
||||
while await reader.read(EMPTY_READ_BUF_SIZE):
|
||||
# just consume all the data
|
||||
pass
|
||||
@@ -620,24 +692,31 @@ async def handle_handshake(reader, writer):
|
||||
return False
|
||||
|
||||
|
||||
def try_setsockopt(sock, level, option, value):
|
||||
try:
|
||||
sock.setsockopt(level, option, value)
|
||||
except OSError as E:
|
||||
pass
|
||||
|
||||
|
||||
def set_keepalive(sock, interval=40, attempts=5):
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
||||
if hasattr(socket, "TCP_KEEPIDLE"):
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval)
|
||||
try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, interval)
|
||||
if hasattr(socket, "TCP_KEEPINTVL"):
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
|
||||
try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
|
||||
if hasattr(socket, "TCP_KEEPCNT"):
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
|
||||
try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_KEEPCNT, attempts)
|
||||
|
||||
|
||||
def set_ack_timeout(sock, timeout):
|
||||
if hasattr(socket, "TCP_USER_TIMEOUT"):
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, timeout*1000)
|
||||
try_setsockopt(sock, socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, timeout*1000)
|
||||
|
||||
|
||||
def set_bufsizes(sock, recv_buf, send_buf):
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
|
||||
try_setsockopt(sock, socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
|
||||
try_setsockopt(sock, socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf)
|
||||
|
||||
|
||||
async def open_connection_tryer(addr, port, limit, timeout, max_attempts=3):
|
||||
@@ -674,7 +753,7 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
|
||||
|
||||
try:
|
||||
reader_tgt, writer_tgt = await open_connection_tryer(
|
||||
dc, TG_DATACENTER_PORT, limit=TO_CLT_BUFSIZE, timeout=TG_CONNECT_TIMEOUT)
|
||||
dc, TG_DATACENTER_PORT, limit=get_to_clt_bufsize(), timeout=TG_CONNECT_TIMEOUT)
|
||||
except ConnectionRefusedError as E:
|
||||
print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT)
|
||||
return False
|
||||
@@ -683,7 +762,7 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
|
||||
return False
|
||||
|
||||
set_keepalive(writer_tgt.get_extra_info("socket"))
|
||||
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
|
||||
set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize())
|
||||
|
||||
while True:
|
||||
rnd = bytearray([random.randrange(0, 256) for i in range(HANDSHAKE_LEN)])
|
||||
@@ -774,7 +853,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
||||
addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
|
||||
|
||||
try:
|
||||
reader_tgt, writer_tgt = await open_connection_tryer(addr, port, limit=TO_CLT_BUFSIZE,
|
||||
reader_tgt, writer_tgt = await open_connection_tryer(addr, port, limit=get_to_clt_bufsize(),
|
||||
timeout=TG_CONNECT_TIMEOUT)
|
||||
except ConnectionRefusedError as E:
|
||||
print_err("Got connection refused while trying to connect to", addr, port)
|
||||
@@ -784,7 +863,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
||||
return False
|
||||
|
||||
set_keepalive(writer_tgt.get_extra_info("socket"))
|
||||
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
|
||||
set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize())
|
||||
|
||||
writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO)
|
||||
|
||||
@@ -800,7 +879,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
||||
|
||||
old_reader = reader_tgt
|
||||
reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO)
|
||||
ans = await reader_tgt.read(TO_CLT_BUFSIZE)
|
||||
ans = await reader_tgt.read(get_to_clt_bufsize())
|
||||
|
||||
if len(ans) != RPC_NONCE_ANS_LEN:
|
||||
return False
|
||||
@@ -883,8 +962,9 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
|
||||
async def handle_client(reader_clt, writer_clt):
|
||||
set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE, attempts=3)
|
||||
set_ack_timeout(writer_clt.get_extra_info("socket"), CLIENT_ACK_TIMEOUT)
|
||||
set_bufsizes(writer_clt.get_extra_info("socket"), TO_TG_BUFSIZE, TO_CLT_BUFSIZE)
|
||||
set_bufsizes(writer_clt.get_extra_info("socket"), get_to_tg_bufsize(), get_to_clt_bufsize())
|
||||
|
||||
cl_ip, cl_port = writer_clt.get_extra_info('peername')[:2]
|
||||
try:
|
||||
clt_data = await asyncio.wait_for(handle_handshake(reader_clt, writer_clt),
|
||||
timeout=CLIENT_HANDSHAKE_TIMEOUT)
|
||||
@@ -904,7 +984,6 @@ async def handle_client(reader_clt, writer_clt):
|
||||
else:
|
||||
tg_data = await do_direct_handshake(proto_tag, dc_idx)
|
||||
else:
|
||||
cl_ip, cl_port = writer_clt.upstream.get_extra_info('peername')[:2]
|
||||
tg_data = await do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port)
|
||||
|
||||
if not tg_data:
|
||||
@@ -937,7 +1016,8 @@ async def handle_client(reader_clt, writer_clt):
|
||||
else:
|
||||
return
|
||||
|
||||
async def connect_reader_to_writer(rd, wr, user, rd_buf_size):
|
||||
async def connect_reader_to_writer(rd, wr, user, rd_buf_size, block_short_first_pkt=False):
|
||||
is_first_pkt = True
|
||||
try:
|
||||
while True:
|
||||
data = await rd.read(rd_buf_size)
|
||||
@@ -946,6 +1026,19 @@ async def handle_client(reader_clt, writer_clt):
|
||||
else:
|
||||
extra = {}
|
||||
|
||||
if is_first_pkt:
|
||||
# protection against replay-based fingerprinting
|
||||
MIN_FIRST_PKT_SIZE = 12
|
||||
if block_short_first_pkt and 0 < len(data) < MIN_FIRST_PKT_SIZE:
|
||||
# print_err("Active fingerprinting detected from %s, dropping it" % cl_ip)
|
||||
# print_err("If this causes problems set BLOCK_SHORT_FIRST_PKT = False "
|
||||
# "in the config")
|
||||
|
||||
wr.write_eof()
|
||||
await wr.drain()
|
||||
return
|
||||
is_first_pkt = False
|
||||
|
||||
if not data:
|
||||
wr.write_eof()
|
||||
await wr.drain()
|
||||
@@ -958,8 +1051,9 @@ async def handle_client(reader_clt, writer_clt):
|
||||
# print_err(e)
|
||||
pass
|
||||
|
||||
tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, TO_CLT_BUFSIZE)
|
||||
clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, TO_TG_BUFSIZE)
|
||||
tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, get_to_clt_bufsize(),
|
||||
block_short_first_pkt=BLOCK_SHORT_FIRST_PKT)
|
||||
clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, get_to_tg_bufsize())
|
||||
task_tg_to_clt = asyncio.ensure_future(tg_to_clt)
|
||||
task_clt_to_tg = asyncio.ensure_future(clt_to_tg)
|
||||
|
||||
@@ -995,34 +1089,63 @@ async def stats_printer():
|
||||
print(flush=True)
|
||||
|
||||
|
||||
async def update_middle_proxy_info():
|
||||
async def make_https_req(url):
|
||||
# returns resp body
|
||||
SSL_PORT = 443
|
||||
url_data = urllib.parse.urlparse(url)
|
||||
async def make_https_req(url, host="core.telegram.org"):
|
||||
""" Make request, return resp body and headers. """
|
||||
SSL_PORT = 443
|
||||
url_data = urllib.parse.urlparse(url)
|
||||
|
||||
HTTP_REQ_TEMPLATE = "\r\n".join(["GET %s HTTP/1.1", "Host: core.telegram.org",
|
||||
"Connection: close"]) + "\r\n\r\n"
|
||||
HTTP_REQ_TEMPLATE = "\r\n".join(["GET %s HTTP/1.1", "Host: %s",
|
||||
"Connection: close"]) + "\r\n\r\n"
|
||||
reader, writer = await asyncio.open_connection(url_data.netloc, SSL_PORT, ssl=True)
|
||||
req = HTTP_REQ_TEMPLATE % (urllib.parse.quote(url_data.path), host)
|
||||
writer.write(req.encode("utf8"))
|
||||
data = await reader.read()
|
||||
writer.close()
|
||||
|
||||
headers, body = data.split(b"\r\n\r\n", 1)
|
||||
return headers, body
|
||||
|
||||
|
||||
async def get_srv_time():
|
||||
global USE_MIDDLE_PROXY
|
||||
TIME_SYNC_ADDR = "https://core.telegram.org/getProxySecret"
|
||||
MAX_TIME_SKEW = 30
|
||||
|
||||
want_to_reenable_advertising = False
|
||||
while True:
|
||||
try:
|
||||
reader, writer = await asyncio.open_connection(url_data.netloc, SSL_PORT, ssl=True)
|
||||
req = HTTP_REQ_TEMPLATE % urllib.parse.quote(url_data.path)
|
||||
writer.write(req.encode("utf8"))
|
||||
data = await reader.read()
|
||||
writer.close()
|
||||
headers, secret = await make_https_req(TIME_SYNC_ADDR)
|
||||
|
||||
headers, body = data.split(b"\r\n\r\n", 1)
|
||||
return body
|
||||
except Exception:
|
||||
return b""
|
||||
for line in headers.split(b"\r\n"):
|
||||
if not line.startswith(b"Date: "):
|
||||
continue
|
||||
line = line[len("Date: "):].decode()
|
||||
srv_time = datetime.datetime.strptime(line, "%a, %d %b %Y %H:%M:%S %Z")
|
||||
now_time = datetime.datetime.utcnow()
|
||||
time_diff = (now_time-srv_time).total_seconds()
|
||||
if USE_MIDDLE_PROXY and abs(time_diff) > MAX_TIME_SKEW:
|
||||
print_err("Time skew detected, please set the clock")
|
||||
print_err("Server time:", srv_time, "your time:", now_time)
|
||||
print_err("Disabling advertising to continue serving")
|
||||
|
||||
USE_MIDDLE_PROXY = False
|
||||
want_to_reenable_advertising = True
|
||||
elif want_to_reenable_advertising and abs(time_diff) <= MAX_TIME_SKEW:
|
||||
print_err("Time is ok, reenabling advertising")
|
||||
USE_MIDDLE_PROXY = True
|
||||
want_to_reenable_advertising = False
|
||||
|
||||
except Exception as E:
|
||||
print_err("Error getting server time", E)
|
||||
|
||||
await asyncio.sleep(GET_TIME_PERIOD)
|
||||
|
||||
|
||||
async def update_middle_proxy_info():
|
||||
async def get_new_proxies(url):
|
||||
PROXY_REGEXP = re.compile(r"proxy_for\s+(-?\d+)\s+(.+):(\d+)\s*;")
|
||||
|
||||
ans = {}
|
||||
try:
|
||||
body = await make_https_req(url)
|
||||
except Exception:
|
||||
return ans
|
||||
headers, body = await make_https_req(url)
|
||||
|
||||
fields = PROXY_REGEXP.findall(body.decode("utf8"))
|
||||
if fields:
|
||||
@@ -1050,26 +1173,26 @@ async def update_middle_proxy_info():
|
||||
if not v4_proxies:
|
||||
raise Exception("no proxy data")
|
||||
TG_MIDDLE_PROXIES_V4 = v4_proxies
|
||||
except Exception:
|
||||
print_err("Error updating middle proxy list")
|
||||
except Exception as E:
|
||||
print_err("Error updating middle proxy list:", E)
|
||||
|
||||
try:
|
||||
v6_proxies = await get_new_proxies(PROXY_INFO_ADDR_V6)
|
||||
if not v6_proxies:
|
||||
raise Exception("no proxy data (ipv6)")
|
||||
TG_MIDDLE_PROXIES_V6 = v6_proxies
|
||||
except Exception:
|
||||
print_err("Error updating middle proxy list for IPv6")
|
||||
except Exception as E:
|
||||
print_err("Error updating middle proxy list for IPv6:", E)
|
||||
|
||||
try:
|
||||
secret = await make_https_req(PROXY_SECRET_ADDR)
|
||||
headers, secret = await make_https_req(PROXY_SECRET_ADDR)
|
||||
if not secret:
|
||||
raise Exception("no secret")
|
||||
if secret != PROXY_SECRET:
|
||||
PROXY_SECRET = secret
|
||||
print_err("Middle proxy secret updated")
|
||||
except Exception:
|
||||
print_err("Error updating middle proxy secret, using old")
|
||||
except Exception as E:
|
||||
print_err("Error updating middle proxy secret, using old", E)
|
||||
|
||||
await asyncio.sleep(PROXY_INFO_UPDATE_PERIOD)
|
||||
|
||||
@@ -1078,26 +1201,31 @@ def init_ip_info():
|
||||
global USE_MIDDLE_PROXY
|
||||
global PREFER_IPV6
|
||||
global my_ip_info
|
||||
TIMEOUT = 5
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen('http://ipv4.myexternalip.com/raw', timeout=TIMEOUT) as f:
|
||||
if f.status != 200:
|
||||
raise Exception("Invalid status code")
|
||||
my_ip_info["ipv4"] = f.read().decode().strip()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if PREFER_IPV6:
|
||||
def get_ip_from_url(url):
|
||||
TIMEOUT = 5
|
||||
try:
|
||||
with urllib.request.urlopen('http://ipv6.myexternalip.com/raw', timeout=TIMEOUT) as f:
|
||||
with urllib.request.urlopen(url, timeout=TIMEOUT) as f:
|
||||
if f.status != 200:
|
||||
raise Exception("Invalid status code")
|
||||
my_ip_info["ipv6"] = f.read().decode().strip()
|
||||
return f.read().decode().strip()
|
||||
except Exception:
|
||||
PREFER_IPV6 = False
|
||||
else:
|
||||
return None
|
||||
|
||||
IPV4_URL1 = "http://v4.ident.me/"
|
||||
IPV4_URL2 = "http://ipv4.icanhazip.com/"
|
||||
|
||||
IPV6_URL1 = "http://v6.ident.me/"
|
||||
IPV6_URL2 = "http://ipv6.icanhazip.com/"
|
||||
|
||||
my_ip_info["ipv4"] = get_ip_from_url(IPV4_URL1) or get_ip_from_url(IPV4_URL2)
|
||||
my_ip_info["ipv6"] = get_ip_from_url(IPV6_URL1) or get_ip_from_url(IPV6_URL2)
|
||||
|
||||
if PREFER_IPV6:
|
||||
if my_ip_info["ipv6"]:
|
||||
print_err("IPv6 found, using it for external communication")
|
||||
else:
|
||||
PREFER_IPV6 = False
|
||||
|
||||
if USE_MIDDLE_PROXY:
|
||||
if ((not PREFER_IPV6 and not my_ip_info["ipv4"]) or
|
||||
@@ -1154,6 +1282,10 @@ def loop_exception_handler(loop, context):
|
||||
|
||||
|
||||
def main():
|
||||
setup_files_limit()
|
||||
setup_debug()
|
||||
try_setup_uvloop()
|
||||
|
||||
init_stats()
|
||||
|
||||
if sys.platform == "win32":
|
||||
@@ -1170,15 +1302,19 @@ def main():
|
||||
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
|
||||
asyncio.ensure_future(middle_proxy_updater_task)
|
||||
|
||||
if GET_TIME_PERIOD:
|
||||
time_get_task = asyncio.Task(get_srv_time())
|
||||
asyncio.ensure_future(time_get_task)
|
||||
|
||||
reuse_port = hasattr(socket, "SO_REUSEPORT")
|
||||
|
||||
task_v4 = asyncio.start_server(handle_client_wrapper, LISTEN_ADDR_IPV4, PORT,
|
||||
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
|
||||
limit=get_to_tg_bufsize(), reuse_port=reuse_port, loop=loop)
|
||||
server_v4 = loop.run_until_complete(task_v4)
|
||||
|
||||
if socket.has_ipv6:
|
||||
task_v6 = asyncio.start_server(handle_client_wrapper, LISTEN_ADDR_IPV6, PORT,
|
||||
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
|
||||
limit=get_to_tg_bufsize(), reuse_port=reuse_port, loop=loop)
|
||||
server_v6 = loop.run_until_complete(task_v6)
|
||||
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user