-
Notifications
You must be signed in to change notification settings - Fork 3
/
eth_jsonrpc_http.py
45 lines (35 loc) · 1.4 KB
/
eth_jsonrpc_http.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import requests
class EthRPCClient:
def __init__(self, uri):
self.rpcEndpoint = uri
def __enter__(self):
self.session = requests.Session()
def __exit__(self, exc_type, exc_value, exc_traceback):
self.session.close()
def debugAccountRange(self, startHash, count) -> ([(str, bool)], str):
payload = {
'method': 'debug_accountRange',
'params': ["latest", startHash, count, True, True, True],
'jsonrpc': '2.0',
"id": 0,
}
response = self.session.post(self.rpcEndpoint, json=payload)
result = response.json()['result']
zeroCodeHash = 'c5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470'
result_accounts = []
for address, account in zip(result['accounts'].keys(), result['accounts'].values()):
result_accounts.append((address, account['codeHash'] != zeroCodeHash))
if not 'next' in result:
return (result_accounts, None)
else:
return (result_accounts, result['next'])
def ethGetCode(self, address) -> ([(str, bool)], str):
payload = {
'method': 'eth_getCode',
'params': [address, "latest"],
'jsonrpc': '2.0',
"id": 0,
}
response = self.session.post(self.rpcEndpoint, json=payload)
result = response.json()['result']
return result