9 Commits

2 changed files with 151 additions and 43 deletions

View File

@@ -6,9 +6,9 @@ USERS = {
"tg2": "0123456789abcdef0123456789abcdef"
}
# Makes the proxy harder to detect
# Can be incompatible with very old clients
SECURE_ONLY = True
# Tag for advertising, obtainable from @MTProxybot
# AD_TAG = "3c09c680b76ee91a4c25ad51f742267d"
# Uncommenting this do make a proxy harder to detect
# But it can be incompatible with old clients
# SECURE_ONLY = True

View File

@@ -6,6 +6,7 @@ import urllib.parse
import urllib.request
import collections
import time
import datetime
import hashlib
import random
import binascii
@@ -43,16 +44,26 @@ FAST_MODE = config.get("FAST_MODE", True)
# doesn't allow to connect in not-secure mode
SECURE_ONLY = config.get("SECURE_ONLY", False)
# length of used handshake randoms for active fingerprinting protection
REPLAY_CHECK_LEN = config.get("REPLAY_CHECK_LEN", 32768)
# block short first packets to even more protect against replay-based fingerprinting
BLOCK_SHORT_FIRST_PKT = config.get("BLOCK_SHORT_FIRST_PKT", False)
# delay in seconds between stats printing
STATS_PRINT_PERIOD = config.get("STATS_PRINT_PERIOD", 600)
# delay in seconds between middle proxy info updates
PROXY_INFO_UPDATE_PERIOD = config.get("PROXY_INFO_UPDATE_PERIOD", 24*60*60)
# max socket buffer size to the client direction, the more the faster, but more RAM hungry
TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", 16384)
# delay in seconds between time getting, zero means disabled
GET_TIME_PERIOD = config.get("GET_TIME_PERIOD", 10*60)
# max socket buffer size to the telegram servers direction
# max socket buffer size to the client direction, the more the faster, but more RAM hungry
# can be the tuple (low, users_margin, high) for the adaptive case. If no much users, use high
TO_CLT_BUFSIZE = config.get("TO_CLT_BUFSIZE", (16384, 100, 131072))
# max socket buffer size to the telegram servers direction, also can be the tuple
TO_TG_BUFSIZE = config.get("TO_TG_BUFSIZE", 65536)
# keepalive period for clients in secs
@@ -73,6 +84,7 @@ LISTEN_ADDR_IPV4 = config.get("LISTEN_ADDR_IPV4", "0.0.0.0")
# listen address for IPv6
LISTEN_ADDR_IPV6 = config.get("LISTEN_ADDR_IPV6", "::")
TG_DATACENTER_PORT = 443
TG_DATACENTERS_V4 = [
@@ -130,6 +142,7 @@ MIN_MSG_LEN = 12
MAX_MSG_LEN = 2 ** 24
my_ip_info = {"ipv4": None, "ipv6": None}
used_handshakes = collections.OrderedDict()
def setup_files_limit():
@@ -269,6 +282,31 @@ def update_stats(user, connects=0, curr_connects=0, octets=0, msgs=0):
octets=octets, msgs=msgs)
def get_curr_connects_count():
global stats
all_connects = 0
for user, stat in stats.items():
all_connects += stat["curr_connects"]
return all_connects
def get_to_tg_bufsize():
if isinstance(TO_TG_BUFSIZE, int):
return TO_TG_BUFSIZE
low, margin, high = TO_TG_BUFSIZE
return high if get_curr_connects_count() < margin else low
def get_to_clt_bufsize():
if isinstance(TO_CLT_BUFSIZE, int):
return TO_CLT_BUFSIZE
low, margin, high = TO_CLT_BUFSIZE
return high if get_curr_connects_count() < margin else low
class LayeredStreamReaderBase:
def __init__(self, upstream):
self.upstream = upstream
@@ -601,18 +639,30 @@ class ProxyReqStreamWriter(LayeredStreamWriterBase):
async def handle_handshake(reader, writer):
global used_handshakes
EMPTY_READ_BUF_SIZE = 4096
handshake = await reader.readexactly(HANDSHAKE_LEN)
dec_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN]
dec_prekey, dec_iv = dec_prekey_and_iv[:PREKEY_LEN], dec_prekey_and_iv[PREKEY_LEN:]
enc_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN][::-1]
enc_prekey, enc_iv = enc_prekey_and_iv[:PREKEY_LEN], enc_prekey_and_iv[PREKEY_LEN:]
if dec_prekey_and_iv in used_handshakes:
ip = writer.get_extra_info('peername')[0]
print_err("Active fingerprinting detected from %s, freezing it" % ip)
while await reader.read(EMPTY_READ_BUF_SIZE):
# just consume all the data
pass
return False
for user in USERS:
secret = bytes.fromhex(USERS[user])
dec_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN]
dec_prekey, dec_iv = dec_prekey_and_iv[:PREKEY_LEN], dec_prekey_and_iv[PREKEY_LEN:]
dec_key = hashlib.sha256(dec_prekey + secret).digest()
decryptor = create_aes_ctr(key=dec_key, iv=int.from_bytes(dec_iv, "big"))
enc_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN+PREKEY_LEN+IV_LEN][::-1]
enc_prekey, enc_iv = enc_prekey_and_iv[:PREKEY_LEN], enc_prekey_and_iv[PREKEY_LEN:]
enc_key = hashlib.sha256(enc_prekey + secret).digest()
encryptor = create_aes_ctr(key=enc_key, iv=int.from_bytes(enc_iv, "big"))
@@ -627,11 +677,14 @@ async def handle_handshake(reader, writer):
dc_idx = int.from_bytes(decrypted[DC_IDX_POS:DC_IDX_POS+2], "little", signed=True)
while len(used_handshakes) >= REPLAY_CHECK_LEN:
used_handshakes.popitem(last=False)
used_handshakes[dec_prekey_and_iv] = True
reader = CryptoWrappedStreamReader(reader, decryptor)
writer = CryptoWrappedStreamWriter(writer, encryptor)
return reader, writer, proto_tag, user, dc_idx, enc_key + enc_iv
EMPTY_READ_BUF_SIZE = 4096
while await reader.read(EMPTY_READ_BUF_SIZE):
# just consume all the data
pass
@@ -700,7 +753,7 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
try:
reader_tgt, writer_tgt = await open_connection_tryer(
dc, TG_DATACENTER_PORT, limit=TO_CLT_BUFSIZE, timeout=TG_CONNECT_TIMEOUT)
dc, TG_DATACENTER_PORT, limit=get_to_clt_bufsize(), timeout=TG_CONNECT_TIMEOUT)
except ConnectionRefusedError as E:
print_err("Got connection refused while trying to connect to", dc, TG_DATACENTER_PORT)
return False
@@ -709,7 +762,7 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
return False
set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize())
while True:
rnd = bytearray([random.randrange(0, 256) for i in range(HANDSHAKE_LEN)])
@@ -800,7 +853,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
addr, port = random.choice(TG_MIDDLE_PROXIES_V4[dc_idx])
try:
reader_tgt, writer_tgt = await open_connection_tryer(addr, port, limit=TO_CLT_BUFSIZE,
reader_tgt, writer_tgt = await open_connection_tryer(addr, port, limit=get_to_clt_bufsize(),
timeout=TG_CONNECT_TIMEOUT)
except ConnectionRefusedError as E:
print_err("Got connection refused while trying to connect to", addr, port)
@@ -810,7 +863,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
return False
set_keepalive(writer_tgt.get_extra_info("socket"))
set_bufsizes(writer_tgt.get_extra_info("socket"), TO_CLT_BUFSIZE, TO_TG_BUFSIZE)
set_bufsizes(writer_tgt.get_extra_info("socket"), get_to_clt_bufsize(), get_to_tg_bufsize())
writer_tgt = MTProtoFrameStreamWriter(writer_tgt, START_SEQ_NO)
@@ -826,7 +879,7 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
old_reader = reader_tgt
reader_tgt = MTProtoFrameStreamReader(reader_tgt, START_SEQ_NO)
ans = await reader_tgt.read(TO_CLT_BUFSIZE)
ans = await reader_tgt.read(get_to_clt_bufsize())
if len(ans) != RPC_NONCE_ANS_LEN:
return False
@@ -909,8 +962,9 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
async def handle_client(reader_clt, writer_clt):
set_keepalive(writer_clt.get_extra_info("socket"), CLIENT_KEEPALIVE, attempts=3)
set_ack_timeout(writer_clt.get_extra_info("socket"), CLIENT_ACK_TIMEOUT)
set_bufsizes(writer_clt.get_extra_info("socket"), TO_TG_BUFSIZE, TO_CLT_BUFSIZE)
set_bufsizes(writer_clt.get_extra_info("socket"), get_to_tg_bufsize(), get_to_clt_bufsize())
cl_ip, cl_port = writer_clt.get_extra_info('peername')[:2]
try:
clt_data = await asyncio.wait_for(handle_handshake(reader_clt, writer_clt),
timeout=CLIENT_HANDSHAKE_TIMEOUT)
@@ -930,7 +984,6 @@ async def handle_client(reader_clt, writer_clt):
else:
tg_data = await do_direct_handshake(proto_tag, dc_idx)
else:
cl_ip, cl_port = writer_clt.upstream.get_extra_info('peername')[:2]
tg_data = await do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port)
if not tg_data:
@@ -963,7 +1016,8 @@ async def handle_client(reader_clt, writer_clt):
else:
return
async def connect_reader_to_writer(rd, wr, user, rd_buf_size):
async def connect_reader_to_writer(rd, wr, user, rd_buf_size, block_short_first_pkt=False):
is_first_pkt = True
try:
while True:
data = await rd.read(rd_buf_size)
@@ -972,6 +1026,19 @@ async def handle_client(reader_clt, writer_clt):
else:
extra = {}
if is_first_pkt:
# protection against replay-based fingerprinting
MIN_FIRST_PKT_SIZE = 12
if block_short_first_pkt and 0 < len(data) < MIN_FIRST_PKT_SIZE:
# print_err("Active fingerprinting detected from %s, dropping it" % cl_ip)
# print_err("If this causes problems set BLOCK_SHORT_FIRST_PKT = False "
# "in the config")
wr.write_eof()
await wr.drain()
return
is_first_pkt = False
if not data:
wr.write_eof()
await wr.drain()
@@ -984,8 +1051,9 @@ async def handle_client(reader_clt, writer_clt):
# print_err(e)
pass
tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, TO_CLT_BUFSIZE)
clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, TO_TG_BUFSIZE)
tg_to_clt = connect_reader_to_writer(reader_tg, writer_clt, user, get_to_clt_bufsize(),
block_short_first_pkt=BLOCK_SHORT_FIRST_PKT)
clt_to_tg = connect_reader_to_writer(reader_clt, writer_tg, user, get_to_tg_bufsize())
task_tg_to_clt = asyncio.ensure_future(tg_to_clt)
task_clt_to_tg = asyncio.ensure_future(clt_to_tg)
@@ -1021,27 +1089,63 @@ async def stats_printer():
print(flush=True)
async def make_https_req(url, host="core.telegram.org"):
""" Make request, return resp body and headers. """
SSL_PORT = 443
url_data = urllib.parse.urlparse(url)
HTTP_REQ_TEMPLATE = "\r\n".join(["GET %s HTTP/1.1", "Host: %s",
"Connection: close"]) + "\r\n\r\n"
reader, writer = await asyncio.open_connection(url_data.netloc, SSL_PORT, ssl=True)
req = HTTP_REQ_TEMPLATE % (urllib.parse.quote(url_data.path), host)
writer.write(req.encode("utf8"))
data = await reader.read()
writer.close()
headers, body = data.split(b"\r\n\r\n", 1)
return headers, body
async def get_srv_time():
global USE_MIDDLE_PROXY
TIME_SYNC_ADDR = "https://core.telegram.org/getProxySecret"
MAX_TIME_SKEW = 30
want_to_reenable_advertising = False
while True:
try:
headers, secret = await make_https_req(TIME_SYNC_ADDR)
for line in headers.split(b"\r\n"):
if not line.startswith(b"Date: "):
continue
line = line[len("Date: "):].decode()
srv_time = datetime.datetime.strptime(line, "%a, %d %b %Y %H:%M:%S %Z")
now_time = datetime.datetime.utcnow()
time_diff = (now_time-srv_time).total_seconds()
if USE_MIDDLE_PROXY and abs(time_diff) > MAX_TIME_SKEW:
print_err("Time skew detected, please set the clock")
print_err("Server time:", srv_time, "your time:", now_time)
print_err("Disabling advertising to continue serving")
USE_MIDDLE_PROXY = False
want_to_reenable_advertising = True
elif want_to_reenable_advertising and abs(time_diff) <= MAX_TIME_SKEW:
print_err("Time is ok, reenabling advertising")
USE_MIDDLE_PROXY = True
want_to_reenable_advertising = False
except Exception as E:
print_err("Error getting server time", E)
await asyncio.sleep(GET_TIME_PERIOD)
async def update_middle_proxy_info():
async def make_https_req(url):
# returns resp body
SSL_PORT = 443
url_data = urllib.parse.urlparse(url)
HTTP_REQ_TEMPLATE = "\r\n".join(["GET %s HTTP/1.1", "Host: core.telegram.org",
"Connection: close"]) + "\r\n\r\n"
reader, writer = await asyncio.open_connection(url_data.netloc, SSL_PORT, ssl=True)
req = HTTP_REQ_TEMPLATE % urllib.parse.quote(url_data.path)
writer.write(req.encode("utf8"))
data = await reader.read()
writer.close()
headers, body = data.split(b"\r\n\r\n", 1)
return body
async def get_new_proxies(url):
PROXY_REGEXP = re.compile(r"proxy_for\s+(-?\d+)\s+(.+):(\d+)\s*;")
ans = {}
body = await make_https_req(url)
headers, body = await make_https_req(url)
fields = PROXY_REGEXP.findall(body.decode("utf8"))
if fields:
@@ -1081,7 +1185,7 @@ async def update_middle_proxy_info():
print_err("Error updating middle proxy list for IPv6:", E)
try:
secret = await make_https_req(PROXY_SECRET_ADDR)
headers, secret = await make_https_req(PROXY_SECRET_ADDR)
if not secret:
raise Exception("no secret")
if secret != PROXY_SECRET:
@@ -1108,7 +1212,7 @@ def init_ip_info():
except Exception:
return None
IPV4_URL1 = "http://v4.ident.me/)"
IPV4_URL1 = "http://v4.ident.me/"
IPV4_URL2 = "http://ipv4.icanhazip.com/"
IPV6_URL1 = "http://v6.ident.me/"
@@ -1198,15 +1302,19 @@ def main():
middle_proxy_updater_task = asyncio.Task(update_middle_proxy_info())
asyncio.ensure_future(middle_proxy_updater_task)
if GET_TIME_PERIOD:
time_get_task = asyncio.Task(get_srv_time())
asyncio.ensure_future(time_get_task)
reuse_port = hasattr(socket, "SO_REUSEPORT")
task_v4 = asyncio.start_server(handle_client_wrapper, LISTEN_ADDR_IPV4, PORT,
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
limit=get_to_tg_bufsize(), reuse_port=reuse_port, loop=loop)
server_v4 = loop.run_until_complete(task_v4)
if socket.has_ipv6:
task_v6 = asyncio.start_server(handle_client_wrapper, LISTEN_ADDR_IPV6, PORT,
limit=TO_TG_BUFSIZE, reuse_port=reuse_port, loop=loop)
limit=get_to_tg_bufsize(), reuse_port=reuse_port, loop=loop)
server_v6 = loop.run_until_complete(task_v6)
try: