mirror of
https://github.com/alexbers/mtprotoproxy.git
synced 2026-03-14 07:13:09 +00:00
time based protection against replay attack
This commit is contained in:
@@ -77,6 +77,7 @@ MAX_MSG_LEN = 2 ** 24
|
||||
my_ip_info = {"ipv4": None, "ipv6": None}
|
||||
used_handshakes = collections.OrderedDict()
|
||||
disable_middle_proxy = False
|
||||
is_time_skewed = False
|
||||
fake_cert_len = random.randrange(1024, 4096)
|
||||
|
||||
config = {}
|
||||
@@ -842,6 +843,8 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer):
|
||||
global used_handshakes
|
||||
global fake_cert_len
|
||||
|
||||
TIME_SKEW_TOLERANCE = 120
|
||||
|
||||
TLS_VERS = b"\x03\x03"
|
||||
TLS_CIPHERSUITE = b"\x13\x01"
|
||||
TLS_CHANGE_CIPHER = b"\x14" + TLS_VERS + b"\x00\x01\x01"
|
||||
@@ -875,10 +878,14 @@ async def handle_fake_tls_handshake(handshake, reader, writer, peer):
|
||||
xored_digest = bytes(digest[i] ^ computed_digest[i] for i in range(DIGEST_LEN))
|
||||
digest_good = xored_digest.startswith(b"\x00" * (DIGEST_LEN-4))
|
||||
|
||||
timestamp = int.from_bytes(xored_digest[-4:], "little")
|
||||
if not digest_good:
|
||||
continue
|
||||
|
||||
timestamp = int.from_bytes(xored_digest[-4:], "little")
|
||||
if not is_time_skewed and abs(time.time() - timestamp) > TIME_SKEW_TOLERANCE:
|
||||
print_err("Client with time skew detected from %s, can be a replay-attack" % peer[0])
|
||||
continue
|
||||
|
||||
http_data = bytearray([random.randrange(0, 256) for i in range(fake_cert_len)])
|
||||
|
||||
srv_hello = TLS_VERS + b"\x00"*DIGEST_LEN + bytes([sess_id_len]) + sess_id
|
||||
@@ -1453,6 +1460,7 @@ async def get_srv_time():
|
||||
MAX_TIME_SKEW = 30
|
||||
|
||||
global disable_middle_proxy
|
||||
global is_time_skewed
|
||||
|
||||
want_to_reenable_advertising = False
|
||||
while True:
|
||||
@@ -1465,15 +1473,16 @@ async def get_srv_time():
|
||||
line = line[len("Date: "):].decode()
|
||||
srv_time = datetime.datetime.strptime(line, "%a, %d %b %Y %H:%M:%S %Z")
|
||||
now_time = datetime.datetime.utcnow()
|
||||
time_skew = (now_time-srv_time).total_seconds() > MAX_TIME_SKEW
|
||||
if time_skew and config.USE_MIDDLE_PROXY and not disable_middle_proxy:
|
||||
is_time_skewed = (now_time-srv_time).total_seconds() > MAX_TIME_SKEW
|
||||
if is_time_skewed and config.USE_MIDDLE_PROXY and not disable_middle_proxy:
|
||||
print_err("Time skew detected, please set the clock")
|
||||
print_err("Server time:", srv_time, "your time:", now_time)
|
||||
print_err("Disabling advertising to continue serving")
|
||||
print_err("Putting down the shields against replay attacks")
|
||||
|
||||
disable_middle_proxy = True
|
||||
want_to_reenable_advertising = True
|
||||
elif not time_skew and want_to_reenable_advertising:
|
||||
elif not is_time_skewed and want_to_reenable_advertising:
|
||||
print_err("Time is ok, reenabling advertising")
|
||||
disable_middle_proxy = False
|
||||
want_to_reenable_advertising = False
|
||||
|
||||
Reference in New Issue
Block a user