lob_dataset.py
# -*- coding: utf-8 -*-
# @Time : 2024/4/16 16:41
# @Author : Karry Ren
""" The torch.Dataset of Future LOB dataset.
After the preprocessing raw Future LOB dataset (download from Qlib following `README.md`) by
run `python lob_preprocess.py` you will get the following Future LOB dataset directory:
FUTURE_LOB_DATASET_PATH/
├── 0.5_seconds
├── 05_seconds_20220104.csv
├── 05_seconds_20220105.csv
├── ...
└── 05_seconds_20221230.csv
├── 1_second
├── 1_second_20220104.csv
├── 1_second_20220105.csv
├── ...
└── 1_second_20221230.csv
├── 10_seconds
├── 10_seconds_20220104.csv
├── 10_seconds_20220105.csv
├── ...
└── 10_seconds_20221230.csv
├── 30_seconds
├── 30_seconds_20220104.csv
├── 30_seconds_20220105.csv
├── ...
└── 30_seconds_20221230.csv
└── 1_minute
├── 1_minute_20220104.csv
├── 1_minute_20220105.csv
├── ...
└── 1_minute_20221230.csv
In this dataset:
- during `__init__()`, we will READ target `.csv` files of multi-granularity data to memory.
- during `__getitem__()`, we will READ 1 item with multi-granularity data and lag it by `MINUTE` and do the Z-Score normalization.
"""
import os
from torch.utils import data
import pandas as pd
import numpy as np
class LOBDataset(data.Dataset):
""" The torch.Dataset of Future LOB dataset. """
def __init__(self, root_path: str, start_date: str, end_date: str, time_steps: int = 2, need_norm: bool = True):
""" The init function of LOBDataset. Will READ target `.csv` files of multi-granularity data to memory.
For this dataset, the task is predicting the 1-minute return, so let the 1-minute data be core !!
:param root_path: the root path of Future LOB dataset
:param start_date: the start date, format should be "yyyymmdd"
:param end_date: the end date, format should be "yyyymmdd"
:param time_steps: the time steps (lag steps)
:param need_norm: whether to normalize the data
NOTE:
- the start_date and end_date will be [start_date, end_date] meaning start close and end close.
~ for train: the start_date and end_date is ["20220101", "20220831"] meaning 8 months (161 days)
~ for valid: the start_date and end_date is ["20220901", "20221031"] meaning 2 months (37 days)
~ for test: the start_date and end_date is ["20221101", "20221231"] meaning 2 months (44 days)
"""
assert start_date < end_date, f"end_date muse be greater than start_date !!!"
# ---- Step 0. Set the params ---- #
self.T = time_steps # time steps (seq len)
        self.date_num, self.total_minute_num = 0, 240  # number of dates and number of minutes per trading day
self.need_norm = need_norm # whether to normalize the lob data
# ---- Step 1. Define the feature and label list ---- #
        self.label_list = []  # label list, each item is the daily label array, shape=(total_minute_num, 1), for one date
self.mg_features_list_dict = {
"feature_1_minute": [],
"feature_30_seconds": [],
"feature_10_seconds": [],
"feature_1_second": [],
"feature_0.5_seconds": []
        }  # feature dict: each value is a list of daily feature arrays, one per date
# ---- Step 2. Read target `.csv` data date by date ---- #
file_feature_1_minute_list = sorted(os.listdir(f"{root_path}/1_minute")) # read all 1_minute files
        for file in file_feature_1_minute_list:  # loop over the dates to read
date = file.split("_")[-1].split(".")[0] # get the date
if start_date <= date <= end_date: # select the date in [start_date, end_date]
# append the label
self.label_list.append(pd.read_csv(f"{root_path}/1_minute_label/1_minute_label_{date}.csv").values)
# append the 1-minute feature
self.mg_features_list_dict["feature_1_minute"].append(pd.read_csv(f"{root_path}/1_minute/1_minute_{date}.csv").values)
# append the 30-seconds feature
self.mg_features_list_dict["feature_30_seconds"].append(pd.read_csv(f"{root_path}/30_seconds/30_seconds_{date}.csv").values)
# append the 10-seconds feature
self.mg_features_list_dict["feature_10_seconds"].append(pd.read_csv(f"{root_path}/10_seconds/10_seconds_{date}.csv").values)
# append the 1-second feature
self.mg_features_list_dict["feature_1_second"].append(pd.read_csv(f"{root_path}/1_second/1_second_{date}.csv").values)
# append the 0.5-seconds feature
self.mg_features_list_dict["feature_0.5_seconds"].append(pd.read_csv(f"{root_path}/0.5_seconds/0.5_seconds_{date}.csv").values)
# add the date number
self.date_num += 1
def __len__(self):
""" Get the length of dataset. """
return self.date_num * self.total_minute_num
def __getitem__(self, idx: int):
""" Get the item based on idx, and lag the item.
return: item_data (one lagged minute sample of one date)
- `mg_features`: the multi-granularity (5 kinds of granularity) features of Future LOB dataset, the format is:
{
"g1": , shape=(time_steps, 1, 1), # feature_1_minute
"g2": , shape=(time_steps, 2, 1), # feature_30_seconds
"g3": , shape=(time_steps, 6, 1), # feature_10_seconds
"g4": , shape=(time_steps, 60, 1), # feature_1_second
"g5": , shape=(time_steps, 120, 1) # feature_0.5_seconds
} shape is (T, K^g, D), please make sure REMEMBER the true time period of each granularity !!!
- `label`: the return label, shape=(1, )
- `weight`: the weight, shape=(1, )
"""
# ---- Compute the index pair [date_idx, minute_idx] to locate data ---- #
date_idx = idx // self.total_minute_num # get the date index to locate the date of data
minute_idx = idx % self.total_minute_num # get the minute index to locate the minute of daily data
second_30_idx = (minute_idx + 1) * 2 - 1 # get the 30 seconds index
second_10_idx = (minute_idx + 1) * 6 - 1 # get the 10 seconds index
second_1_idx = (minute_idx + 1) * 60 - 1 # get the 1-second index
second_05_idx = (minute_idx + 1) * 120 - 1 # get the 0.5 seconds index
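        # (worked example: with total_minute_num = 240, idx = 250 maps to date_idx = 1 and
        #  minute_idx = 10; each coarser index then points at the LAST fine-granularity row
        #  within that minute, e.g. second_30_idx = 21 and second_1_idx = 659)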
# ---- Get the multi-granularity features, label and weight ---- #
        # feature dict, each value is one ndarray with shape=(time_steps, K^g, D)
mg_features_dict = {"g1": None, "g2": None, "g3": None, "g4": None, "g5": None}
        # meaningless data: features are set to all zeros, masking the head and tail of each day
        if minute_idx < self.T - 1 or minute_idx >= self.total_minute_num - 1:
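            # (e.g. with T = 2 this masks minute 0, which lacks a full lag window, and minute 239,
            #  presumably because its 1-minute-ahead return label would need data beyond the close)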
# set features, all zeros, shape is different from granularity to granularity
mg_features_dict["g1"] = np.zeros((self.T, 1, 20)) # 1_minute granularity
mg_features_dict["g2"] = np.zeros((self.T, 2, 20)) # 30_seconds granularity
mg_features_dict["g3"] = np.zeros((self.T, 6, 20)) # 10_seconds granularity
mg_features_dict["g4"] = np.zeros((self.T, 60, 20)) # 1_second granularity
mg_features_dict["g5"] = np.zeros((self.T, 120, 20)) # 0.5_seconds granularity
# `label = 0.0` for loss computation, shape=(1)
label = np.zeros(1)
# `weight = 0.0` means data is meaningless, shape=(1)
weight = np.zeros(1)
# meaningful data, load the true feature and label
else:
# load features, shape is based on granularity, (T, K^g, D)
mg_features_dict["g1"] = self.mg_features_list_dict[
"feature_1_minute"][date_idx][minute_idx - self.T + 1:minute_idx + 1].reshape(self.T, 1, 20)
mg_features_dict["g2"] = self.mg_features_list_dict[
"feature_30_seconds"][date_idx][second_30_idx - self.T * 2 + 1:second_30_idx + 1].reshape(self.T, 2, 20)
mg_features_dict["g3"] = self.mg_features_list_dict[
"feature_10_seconds"][date_idx][second_10_idx - self.T * 6 + 1:second_10_idx + 1].reshape(self.T, 6, 20)
mg_features_dict["g4"] = self.mg_features_list_dict[
"feature_1_second"][date_idx][second_1_idx - self.T * 60 + 1:second_1_idx + 1].reshape(self.T, 60, 20)
mg_features_dict["g5"] = self.mg_features_list_dict[
"feature_0.5_seconds"][date_idx][second_05_idx - self.T * 120 + 1:second_05_idx + 1].reshape(self.T, 120, 20)
# get the label, shape=(1, )
label = self.label_list[date_idx][minute_idx].reshape(1)
# set `the weight = 1`, shape=(1, )
weight = np.ones(1)
# ---- Do the Z-Score Normalization ---- #
if self.need_norm:
for g in ["g1", "g2", "g3", "g4", "g5"]:
k_g = mg_features_dict[g].shape[1]
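                # the 20 features are split as (levels=5, sides=2, fields=2); keeping the last
                # axis separate normalizes prices and volumes with their own statistics
                # (layout inferred from this reshape and the demo assertions below)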
mg_feature = mg_features_dict[g].reshape(self.T, k_g, 5, 2, 2)
mean = mg_feature.mean(axis=(0, 1, 2, 3), keepdims=True) # compute the mean, shape=(1, 1, 1, 1, 2)
std = mg_feature.std(axis=(0, 1, 2, 3), keepdims=True) # compute the std, shape=(1, 1, 1, 1, 2)
normed_mg_feature = (mg_feature - mean) / (std + 1e-5) # Z-Score norm
mg_features_dict[g] = normed_mg_feature.reshape(self.T, k_g, 20) # reshape back
# ---- Construct item data ---- #
item_data = {
"mg_features": mg_features_dict,
"label": label,
"weight": weight
}
return item_data
if __name__ == "__main__": # a demo using LOBDataset
LOB_DATASET_PATH = "../../Data/Future_LOB_dataset/IF_M0"
data_set = LOBDataset(LOB_DATASET_PATH, start_date="20220901", end_date="20221031", time_steps=2, need_norm=False)
for i in range(1, len(data_set) - 1):
item_data = data_set[i]
g1_data = item_data["mg_features"]["g1"]
g2_data = item_data["mg_features"]["g2"]
g3_data = item_data["mg_features"]["g3"]
g4_data = item_data["mg_features"]["g4"]
g5_data = item_data["mg_features"]["g5"]
        assert ((g1_data[:, :, 0].max(axis=1) - g5_data[:, :, 0].max(axis=1)) < 1e-3).all(), "g1 error !! bid 1 price not max !!"
        assert ((g1_data[:, :, 2].min(axis=1) - g5_data[:, :, 2].min(axis=1)) < 1e-3).all(), "g1 error !! ask 1 price not min !!"
        assert ((g2_data[:, :, 0].max(axis=1) - g5_data[:, :, 0].max(axis=1)) < 1e-3).all(), "g2 error !! bid 1 price not max !!"
        assert ((g2_data[:, :, 2].min(axis=1) - g5_data[:, :, 2].min(axis=1)) < 1e-3).all(), "g2 error !! ask 1 price not min !!"
        assert ((g3_data[:, :, 0].max(axis=1) - g5_data[:, :, 0].max(axis=1)) < 1e-3).all(), "g3 error !! bid 1 price not max !!"
        assert ((g3_data[:, :, 2].min(axis=1) - g5_data[:, :, 2].min(axis=1)) < 1e-3).all(), "g3 error !! ask 1 price not min !!"
        assert ((g4_data[:, :, 0].max(axis=1) - g5_data[:, :, 0].max(axis=1)) < 1e-3).all(), "g4 error !! bid 1 price not max !!"
        assert ((g4_data[:, :, 2].min(axis=1) - g5_data[:, :, 2].min(axis=1)) < 1e-3).all(), "g4 error !! ask 1 price not min !!"
# print(g1_data, g2_data, g3_data, g4_data, g5_data)
# print(item_data["label"])
break
data_set = LOBDataset(LOB_DATASET_PATH, start_date="20220901", end_date="20221031", time_steps=2, need_norm=True)
for i in range(1, len(data_set) - 1):
item_data = data_set[i]
g1_data = item_data["mg_features"]["g1"]
g2_data = item_data["mg_features"]["g2"]
g3_data = item_data["mg_features"]["g3"]
g4_data = item_data["mg_features"]["g4"]
g5_data = item_data["mg_features"]["g5"]
        print(g1_data.shape, g2_data.shape, g3_data.shape, g4_data.shape, g5_data.shape)
        print(item_data["label"])
        break