creek_interpolate.py
import logging
import logging.handlers
import os
from datetime import date
from datetime import time
from datetime import datetime as dt
from datetime import timedelta as td
import pandas as pd
import pytz as tz
import multiprocessing as mp
import glob
import config as g
import refresh_bars as rb

# Global list of symbols that were successfully interpolated;
# populated by interpolated_callback as pool results arrive.
interpolated = []


def interpolate(symbol):
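    """Return a 1-minute, linearly interpolated vwap series for symbol.

    Reads the symbol's minute bars from g.minute_bar_dir and drops every
    column except vwap. Returns an empty list when the file is missing,
    empty, or does not reach back far enough.
    """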
    logger = logging.getLogger(__name__)
    logger.info('Interpolating %s', symbol)
    path = os.path.join(g.minute_bar_dir, symbol + '.csv')
    try:
        bars = pd.read_csv(path)
    except FileNotFoundError:
        logger.error('%s.csv not found', symbol)
        return []
    if bars.empty:
        return []
    interpolated_bars = bars.drop(columns=['symbol', 'open', 'high', 'low',
                                           'close', 'volume', 'trade_count'])
    interpolated_bars.set_index('timestamp', inplace=True)
    interpolated_bars.index = pd.to_datetime(interpolated_bars.index)
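    # Target window: one year of history ending two days ago; symbols whose
    # history starts less than a week before the window are rejected below.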
    start_date = date.today().replace(year=date.today().year - 1)
    buffer_start_date = start_date - td(days=7)
    end_date = date.today() - td(days=2)
    t = time(hour=0, minute=0, tzinfo=tz.timezone('UTC'))
    start = dt.combine(start_date, t)
    buffer_start = dt.combine(buffer_start_date, t)
    end = dt.combine(end_date, t)
    # History must reach back at least to the buffered start.
    if interpolated_bars.index[0] > buffer_start:
        return []
    # Pad the series out to the end of the window if it stops short.
    if interpolated_bars.index[-1] < end:
        interpolated_bars.loc[end] = interpolated_bars.iloc[-1]['vwap']
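    # Resample onto a regular 1-minute grid, linearly interpolating the
    # gaps, then trim to the target window.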
    interpolated_bars = interpolated_bars.resample('1min').interpolate('linear')
    interpolated_bars = interpolated_bars[start:end]
    return interpolated_bars


def interpolate_wrapper(symbol, length):
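    """Interpolate symbol and save the result if it matches length.

    Returns a (symbol, flag) tuple; flag is 1 on success, 0 otherwise.
    """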
    logger = logging.getLogger(__name__)
    b = interpolate(symbol)
    if len(b) == length:
        path = os.path.join(g.interpolated_bars_dir, symbol + '.csv')
        b.to_csv(path)
        return symbol, 1
    elif len(b) > 0:
        logger.error('%s has %s bars, should have %s bars',
                     symbol, len(b), length)
    # Always return a tuple so the callback never sees None.
    return symbol, 0


def interpolated_callback(result):
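    """Collect symbols that interpolated successfully; runs in the parent process."""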
    global interpolated
    if result[1]:
        interpolated.append(result[0])


def pool_error_callback(error):
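    """Log exceptions raised inside pool workers."""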
    logger = logging.getLogger(__name__)
    logger.error('Pool error: %s', error)


def main():
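    """Regenerate interpolated 1-minute bars for all candidate symbols."""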
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s:%(levelname)s:%(name)s:%(message)s",
        handlers=[logging.handlers.WatchedFileHandler(
            os.environ.get("LOGFILE", "creek-interpolate.log"))]
    )
    logger = logging.getLogger(__name__)
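    # Remove any previously interpolated bars so stale files do not linger.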
    path = os.path.join(g.interpolated_bars_dir, '*')
    files = glob.glob(path)
    for f in files:
        os.remove(f)
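    # Candidate symbols: the union of shortable equities and open symbols
    # from refresh_bars, deduplicated via set().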
    symbol_list = list(set(rb.get_shortable_equities() + rb.get_open_symbols()))
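    # Use AAPL as the reference series: every other symbol must interpolate
    # to the same number of bars to be kept.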
    target_length = len(interpolate('AAPL'))
    logger.info('Target length = %s', target_length)
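    # Persist the target length (presumably read by the downstream Pearson stage).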
    path = os.path.join(g.pearson_dir, 'pearson.config')
    with open(path, 'w') as f:
        f.write(str(target_length))
    pool = mp.Pool(mp.cpu_count())
    logger.info('Initializing pool with %s worker processes', mp.cpu_count())
    for symbol in symbol_list:
        # args must be a tuple; a single-argument call would need a
        # trailing comma, e.g. args=(symbol,).
        pool.apply_async(interpolate_wrapper, args=(symbol, target_length),
                         callback=interpolated_callback,
                         error_callback=pool_error_callback)
    pool.close()
    # Block until every queued task has finished.
    pool.join()
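    # Record which symbols interpolated successfully for downstream stages.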
    interpolated_df = pd.DataFrame({'symbol': interpolated})
    path = os.path.join(g.pearson_dir, 'interpolated.csv')
    interpolated_df.to_csv(path)


if __name__ == '__main__':
    main()