-
Notifications
You must be signed in to change notification settings - Fork 0
/
orderbook.py
142 lines (124 loc) · 4.95 KB
/
orderbook.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import json
from bytewax.connectors.stdio import StdOutput
from bytewax.dataflow import Dataflow
from bytewax.inputs import PartitionedInput, StatefulSource
# pip install websocket-client
from websocket import create_connection
class CoinfbaseSource(StatefulSource):
def __init__(self, product_id):
self.product_id = product_id
self.ws = create_connection("wss://ws-feed.exchange.coinbase.com")
self.ws.send(
json.dumps(
{
"type": "subscribe",
"product_ids": [product_id],
"channels": ["level2_batch"],
}
)
)
# The first msg is just a confirmation that we have subscribed.
print(self.ws.recv())
def next_batch(self):
return [self.ws.recv()]
def snapshot(self):
return None
def close(self):
self.ws.close()
class CoinbaseFeedInput(PartitionedInput):
def __init__(self, product_ids):
self.product_ids = product_ids
def list_parts(self):
return set(self.product_ids)
def build_part(self, for_key, resume_state):
assert resume_state is None
return CoinfbaseSource(for_key)
def key_on_product(data):
return (data["product_id"], data)
class OrderBook:
def __init__(self):
# if using Python < 3.7 need to use OrderedDict here
self.bids = {}
self.asks = {}
self.bid_price = None
self.ask_price = None
def update(self, data):
if self.bids == {}:
self.bids = {float(price): float(size) for price, size in data["bids"]}
# The bid_price is the highest priced buy limit order.
# since the bids are in order, the first item of our newly constructed bids
# will have our bid price, so we can track the best bid
self.bid_price = next(iter(self.bids))
if self.asks == {}:
self.asks = {float(price): float(size) for price, size in data["asks"]}
# The ask price is the lowest priced sell limit order.
# since the asks are in order, the first item of our newly constructed
# asks will be our ask price, so we can track the best ask
self.ask_price = next(iter(self.asks))
else:
# We receive a list of lists here, normally it is only one change,
# but could be more than one.
for update in data["changes"]:
price = float(update[1])
size = float(update[2])
if update[0] == "sell":
# first check if the size is zero and needs to be removed
if size == 0.0:
try:
del self.asks[price]
# if it was the ask price removed,
# update with new ask price
if price <= self.ask_price:
self.ask_price = min(self.asks.keys())
except KeyError:
# don't need to add price with size zero
pass
else:
self.asks[price] = size
if price < self.ask_price:
self.ask_price = price
if update[0] == "buy":
# first check if the size is zero and needs to be removed
if size == 0.0:
try:
del self.bids[price]
# if it was the bid price removed,
# update with new bid price
if price >= self.bid_price:
self.bid_price = max(self.bids.keys())
except KeyError:
# don't need to add price with size zero
pass
else:
self.bids[price] = size
if price > self.bid_price:
self.bid_price = price
return self, {
"bid": self.bid_price,
"bid_size": self.bids[self.bid_price],
"ask": self.ask_price,
"ask_price": self.asks[self.ask_price],
"spread": self.ask_price - self.bid_price,
}
flow = Dataflow()
flow.input("input", CoinbaseFeedInput(["BTC-USD", "ETH-USD", "BTC-EUR", "ETH-EUR"]))
flow.map(json.loads)
# {
# 'type': 'l2update',
# 'product_id': 'BTC-USD',
# 'changes': [['buy', '36905.39', '0.00334873']],
# 'time': '2022-05-05T17:25:09.072519Z',
# }
flow.map(key_on_product)
# ('BTC-USD', {
# 'type': 'l2update',
# 'product_id': 'BTC-USD',
# 'changes': [['buy', '36905.39', '0.00334873']],
# 'time': '2022-05-05T17:25:09.072519Z',
# })
flow.stateful_map("order_book", lambda: OrderBook(), OrderBook.update)
# ('BTC-USD', (36905.39, 0.00334873, 36905.4, 1.6e-05, 0.010000000002037268))
flow.filter(
lambda x: x[-1]["spread"] / x[-1]["ask"] > 0.0001
) # filter on 0.1% spread as a per
flow.output("out", StdOutput())