-
Notifications
You must be signed in to change notification settings - Fork 0
/
addrman.py
107 lines (93 loc) · 4.6 KB
/
addrman.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# https://cryptobook.nakov.com/cryptographic-hash-functions/secure-hash-algorithms#sha-3-sha3-256-sha3-512-keccak-256
# The SHA-3 ("Keccak") hash family was the winner of the NIST SHA-3 competition (winner announced in 2012).
import ipaddress
from base64 import b32decode
from binascii import crc32
from hashlib import sha256, sha3_256
# Modulus applied to the first hash in get_new_bucket: limits how many
# different buckets one source group can be mapped to.
ADDRMAN_NEW_BUCKETS_PER_SOURCE_GROUP = 64
# Modulus applied to the final hash in get_new_bucket: the result is always
# in range(ADDRMAN_NEW_BUCKET_COUNT).
ADDRMAN_NEW_BUCKET_COUNT = 1024
# Section banner for the group-derivation demos that follow.
print("======")
print("getgroup")
print("======\n")
"""ipv4 address is made of 4 chunks (8 bit each)"""
def get_group_ipv4(ip):
    """Return the group bytes for a dotted-decimal IPv4 address.

    The result is the tag byte 0x01 followed by the first two octets of
    the address (i.e. its /16 prefix).

    Args:
        ip: IPv4 address as a dotted-decimal string, e.g. "1.2.3.4".

    Returns:
        bytes: the tag byte plus the leading two address octets.

    Raises:
        ValueError: if a component is not an integer in range(256).
    """
    octets = bytes(int(part) for part in ip.split('.'))
    return b'\x01' + octets[:2]


print("ipv4", get_group_ipv4("1.2.3.4"))
"""ipv6 address is made of 8 chunks (16 bit each)"""
def get_group_ipv6(ip):
    """Return the group bytes for an IPv6 address.

    The result is the tag byte 0x02 followed by the first four bytes of
    the address (its /32 prefix).

    The text form is parsed with ``ipaddress.IPv6Address`` rather than by
    stripping the colons and hex-decoding the rest: that shortcut only
    works when every one of the 8 groups is written with all 4 hex digits,
    and fails (or silently yields shifted bytes) for shortened forms such
    as "2001:db8::1".

    Args:
        ip: IPv6 address in any standard textual form.

    Returns:
        bytes: 5 bytes, the tag byte plus the leading four address bytes.

    Raises:
        ipaddress.AddressValueError: (a ValueError subclass) if *ip* is
            not a valid IPv6 address.
    """
    ip_as_bytes = ipaddress.IPv6Address(ip).packed  # always 16 bytes
    return bytes([2]) + ip_as_bytes[:4]


print("ipv6", get_group_ipv6("2001:2001:9999:9999:9999:9999:9999:9999"))
"""
tor v3 address is 56 characters long. A tor v3 address will always end in a d due to the way v3 onion service names are encoded.
onion_address = base32(PUBKEY | CHECKSUM | VERSION) + ".onion"
CHECKSUM = H(".onion checksum" | PUBKEY | VERSION)[:2]
where:
- PUBKEY is the 32 bytes ed25519 master pubkey of the hidden service.
- VERSION is a one byte version field (default value '\x03')
- ".onion checksum" is a constant string
- CHECKSUM is truncated to two bytes before inserting it in onion_address
"""
def get_group_tor(ip):
    """Return the group bytes for a Tor v3 onion address.

    The base32 payload decodes to 35 bytes: a 32-byte public key, a 2-byte
    checksum and a 1-byte version. The version must be 0x03 and the
    checksum must equal the first two bytes of
    SHA3-256(".onion checksum" + pubkey + version).

    The result is the tag byte 0x03 followed by one byte that keeps the
    top 4 bits of the first public-key byte and has its low 4 bits set.

    Validation raises ValueError instead of using ``assert`` so that the
    checks are still performed when Python runs with ``-O``.

    Args:
        ip: onion address, with or without the ".onion" suffix.

    Returns:
        bytes: 2 bytes, the tag byte plus the masked first key byte.

    Raises:
        ValueError: if the payload is not valid base32 (binascii.Error is
            a ValueError subclass), does not decode to 35 bytes, has a
            version other than 0x03, or has a wrong checksum.
    """
    ip = ip.replace('.onion', '')
    raw_bytes = b32decode(ip, casefold=True)
    if len(raw_bytes) != 35:
        raise ValueError("tor v3 address must decode to 35 bytes")
    pubkey, checksum, version = raw_bytes[:32], raw_bytes[32:34], raw_bytes[-1:]
    if version != b'\x03':
        raise ValueError("unsupported onion address version")
    expected_checksum = sha3_256(b".onion checksum" + pubkey + version).digest()[:2]
    if checksum != expected_checksum:
        raise ValueError("onion address checksum mismatch")
    # Only the first 4 bits of the 1st byte are taken; the rest are set to 1.
    return bytes([3]) + bytes([pubkey[0] | 0xf])


print("tor", get_group_tor("pg6mmjiyjmcrsslvykfwnntlaru7p5svn6y2ymmju6nubxndf4pscryd.onion")) #b'\x03\x7f'
def get_group_i2p(ip):
    """Return the group bytes for a ".b32.i2p" address.

    The result is the tag byte 0x04 followed by one byte that keeps the
    top 4 bits of the first decoded byte and has its low 4 bits set.

    Args:
        ip: I2P base32 address, with or without the ".b32.i2p" suffix.

    Returns:
        bytes: 2 bytes, the tag byte plus the masked first decoded byte.
    """
    # Traditional b32 I2P addresses have 52 Base32 characters; four "=" are
    # appended so the length is a multiple of 8 and b32decode accepts it.
    padded = ip.replace('.b32.i2p', '') + "===="
    decoded = b32decode(padded, casefold=True)
    masked_first = decoded[0] | 0xf
    return bytes([4, masked_first])


print("i2p", get_group_i2p("ukeu3k5oycgaauneqgtnvselmt4yemvoilkln7jpvamvfx7dnkdq.b32.i2p")) #b'\x04\xaf'
"""
CJDNS IPv6 address is generated by using the first 16 bytes of a double SHA-512 of your public key.
All cjdns IPv6 addresses must begin with "fc" or else they are invalid"""
def get_group_cjdns(ip):
    """Return the group bytes for a CJDNS (IPv6-formatted) address.

    The result is the tag byte 0x05, the first address byte, and a third
    byte that keeps the top 4 bits of the second address byte and has its
    low 4 bits set (i.e. the first 12 bits of the address are kept).

    The text form is parsed with ``ipaddress.IPv6Address`` rather than by
    stripping the colons and hex-decoding the rest: groups are often
    written without leading zeros (the demo address below contains "50"),
    which makes the concatenated hex string the wrong length and raises
    whenever the digit count is odd.

    This function does not check that the address starts with "fc".

    Args:
        ip: CJDNS address in any standard IPv6 textual form.

    Returns:
        bytes: 3 bytes as described above.

    Raises:
        ipaddress.AddressValueError: (a ValueError subclass) if *ip* is
            not a valid IPv6 address.
    """
    ip_as_bytes = ipaddress.IPv6Address(ip).packed  # always 16 bytes
    return bytes([5, ip_as_bytes[0], ip_as_bytes[1] | 0xf])


print("cjdns", get_group_cjdns("fc4b:50:7661:cccd:8697:40a4:5498:c51c"))# 05fc4f or b'\x05\xfcO'
# Section banner for the bucket-selection demos that follow.
print("======")
print("bucketing algo")
print("======\n")
def double_hash(bytes):
    """Return SHA-256 applied twice to the input.

    Args:
        bytes: the data to hash (bytes-like). The parameter name shadows
            the builtin inside this function only.

    Returns:
        The 32-byte digest of sha256(sha256(input)).
    """
    inner_digest = sha256(bytes).digest()
    outer = sha256(inner_digest)
    return outer.digest()
def get_new_bucket(key, addr, src):
    """Compute the bucket index for an address learned from a source.

    Both *addr* and *src* are reduced to their group with get_group_ipv4,
    so only dotted-decimal IPv4 strings are handled here.

    Args:
        key: secret key bytes mixed into both hashes.
        addr: IPv4 address being placed.
        src: IPv4 address of the peer the address was learned from.

    Returns:
        int: bucket index in range(ADDRMAN_NEW_BUCKET_COUNT).
    """
    group_of_addr = get_group_ipv4(addr)
    group_of_src = get_group_ipv4(src)

    # Each group is fed to the hash with a one-byte length prefix.
    addr_part = bytes([len(group_of_addr)]) + group_of_addr
    src_part = bytes([len(group_of_src)]) + group_of_src

    # First hash: (key, addr group, source group) -> slot within the
    # source group's share of buckets. Only the first 8 digest bytes are
    # used, read as a little-endian integer.
    first_digest = double_hash(key + addr_part + src_part)
    slot = int.from_bytes(first_digest[:8], 'little')
    slot %= ADDRMAN_NEW_BUCKETS_PER_SOURCE_GROUP  # 64

    # Second hash: (key, source group, slot) -> final bucket.
    second_digest = double_hash(key + src_part + slot.to_bytes(8, 'little'))
    bucket_index = int.from_bytes(second_digest[:8], 'little')
    return bucket_index % ADDRMAN_NEW_BUCKET_COUNT  # 1024
# Demo: bucket for an address that is also its own source (addr == src).
key = bytes.fromhex("41f758f2e5cc078d3795b4fc0cb60c2d735fa92cc020572bdc982dd2d564d11b")
addr = "250.1.2.1"
src = "250.1.2.1"
bucket = get_new_bucket(key, addr, src)
# The trailing number is the author's note of the expected output.
print("bucket is", bucket) # 786
# Modulus applied to the first hash in get_tried_bucket: limits how many
# different buckets one address group can be mapped to.
ADDRMAN_TRIED_BUCKETS_PER_GROUP = 8
# Modulus applied to the final hash in get_tried_bucket: the result is always
# in range(ADDRMAN_TRIED_BUCKET_COUNT).
ADDRMAN_TRIED_BUCKET_COUNT = 256
def get_key(ip, port):
    """Build the byte key for an IPv4 address and port.

    The key is the address octets followed by the port as two bytes, most
    significant byte first. As a debugging aid the hex form of each of the
    three pieces is printed on its own line before returning.

    Args:
        ip: IPv4 address as a dotted-decimal string.
        port: port number; must fit in two bytes.

    Returns:
        bytes: address octets + high port byte + low port byte.

    Raises:
        ValueError: if an address component is not an integer in range(256).
        OverflowError: if a port byte does not fit in one unsigned byte.
    """
    octets = bytes(int(part) for part in ip.split('.'))
    print(octets.hex())

    port_high = (port // 0x100).to_bytes(1, 'little')
    print(port_high.hex())

    port_low = (port & 0x0FF).to_bytes(1, 'little')
    print(port_low.hex())

    return octets + port_high + port_low
def get_tried_bucket(key, addr, port):
    """Compute the bucket index for an address/port pair.

    The address is reduced to its group with get_group_ipv4 and to a byte
    key with get_key, so only dotted-decimal IPv4 strings are handled
    here. The hex form of the byte key is printed as a debugging aid.

    Args:
        key: secret key bytes mixed into both hashes.
        addr: IPv4 address being placed.
        port: port number of the address.

    Returns:
        int: bucket index in range(ADDRMAN_TRIED_BUCKET_COUNT).
    """
    group = get_group_ipv4(addr)
    # NOTE(review): the author's note below shows an 18-byte value with a
    # 00..00ffff prefix, while get_key returns only the address octets plus
    # two port bytes - confirm which form is intended.
    addr_id = get_key(addr, port) #00000000000000000000ffff fa010101208d
    print(addr_id.hex())

    # First hash: (key, length-prefixed address key) -> slot within the
    # group's share of buckets. The whole digest is read as a little-endian
    # integer here (no 8-byte truncation).
    first_digest = double_hash(key + bytes([len(addr_id)]) + addr_id)
    slot = int.from_bytes(first_digest, 'little')
    slot %= ADDRMAN_TRIED_BUCKETS_PER_GROUP

    # Second hash: (key, length-prefixed group, slot) -> final bucket, using
    # the first 8 digest bytes.
    group_part = bytes([len(group)]) + group
    second_digest = double_hash(key + group_part + slot.to_bytes(8, 'little'))
    bucket_index = int.from_bytes(second_digest[:8], 'little')
    return bucket_index % ADDRMAN_TRIED_BUCKET_COUNT
# Demo: bucket for 250.1.1.1:8333.
# NOTE(review): this key is the byte-reversed form of the key used in the
# get_new_bucket demo - confirm which byte order is intended.
key = bytes.fromhex("1bd164d5d22d98dc2b5720c02ca95f732d0cb60cfcb495378d07cce5f258f741")
addr = "250.1.1.1"
port = 8333
# NOTE(review): src is assigned but not passed to get_tried_bucket below.
src = "250.1.1.1"
bucket = get_tried_bucket(key, addr, port)
print("bucket is", bucket)