diff --git a/mtprotoproxy.py b/mtprotoproxy.py index e016438..0cc59dd 100755 --- a/mtprotoproxy.py +++ b/mtprotoproxy.py @@ -77,6 +77,7 @@ MAX_MSG_LEN = 2 ** 24 my_ip_info = {"ipv4": None, "ipv6": None} used_handshakes = collections.OrderedDict() disable_middle_proxy = False +is_time_skewed = False fake_cert_len = random.randrange(1024, 4096) config = {} @@ -842,6 +843,8 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer): global used_handshakes global fake_cert_len + TIME_SKEW_TOLERANCE = 120 + TLS_VERS = b"\x03\x03" TLS_CIPHERSUITE = b"\x13\x01" TLS_CHANGE_CIPHER = b"\x14" + TLS_VERS + b"\x00\x01\x01" @@ -875,10 +878,14 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer): xored_digest = bytes(digest[i] ^ computed_digest[i] for i in range(DIGEST_LEN)) digest_good = xored_digest.startswith(b"\x00" * (DIGEST_LEN-4)) - timestamp = int.from_bytes(xored_digest[-4:], "little") if not digest_good: continue + timestamp = int.from_bytes(xored_digest[-4:], "little") + if not is_time_skewed and abs(time.time() - timestamp) > TIME_SKEW_TOLERANCE: + print_err("Client with time skew detected from %s, can be a replay-attack" % peer[0]) + continue + http_data = bytearray([random.randrange(0, 256) for i in range(fake_cert_len)]) srv_hello = TLS_VERS + b"\x00"*DIGEST_LEN + bytes([sess_id_len]) + sess_id @@ -1453,6 +1460,7 @@ async def get_srv_time(): MAX_TIME_SKEW = 30 global disable_middle_proxy + global is_time_skewed want_to_reenable_advertising = False while True: @@ -1465,15 +1473,16 @@ async def get_srv_time(): line = line[len("Date: "):].decode() srv_time = datetime.datetime.strptime(line, "%a, %d %b %Y %H:%M:%S %Z") now_time = datetime.datetime.utcnow() - time_skew = (now_time-srv_time).total_seconds() > MAX_TIME_SKEW - if time_skew and config.USE_MIDDLE_PROXY and not disable_middle_proxy: + is_time_skewed = (now_time-srv_time).total_seconds() > MAX_TIME_SKEW + if is_time_skewed and config.USE_MIDDLE_PROXY and not disable_middle_proxy: print_err("Time skew detected, please set the clock") print_err("Server time:", srv_time, "your time:", now_time) print_err("Disabling advertising to continue serving") + print_err("Putting down the shields against replay attacks") disable_middle_proxy = True want_to_reenable_advertising = True - elif not time_skew and want_to_reenable_advertising: + elif not is_time_skewed and want_to_reenable_advertising: print_err("Time is ok, reenabling advertising") disable_middle_proxy = False want_to_reenable_advertising = False