"""pma.py - Predictive Modeling Algorithm (PMA).

An ensemble of temporal convolutional networks (MTCN) with Bayesian
posterior utilities, scenario generation, extreme-event handling, and
probabilistic evaluation (MSE, MAE, CRPS).
"""
import numpy as np
from tensorflow.keras import layers, Model
from scipy.stats import multivariate_normal, norm
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error
import statsmodels.api as sm


class DilatedCausalConv1D(layers.Layer):
    """1-D convolution with causal padding and a configurable dilation rate."""

    def __init__(self, filters, kernel_size, dilation_rate):
        super().__init__()
        self.conv = layers.Conv1D(filters=filters,
                                  kernel_size=kernel_size,
                                  padding='causal',
                                  dilation_rate=dilation_rate)

    def call(self, inputs):
        return self.conv(inputs)


class MTCNLayer(layers.Layer):
    def __init__(self, filters, kernel_size, dilation_rate, dropout_rate):
        super().__init__()
        self.conv = DilatedCausalConv1D(filters, kernel_size, dilation_rate)
        self.norm = layers.LayerNormalization()
        self.dropout = layers.Dropout(dropout_rate)
        self.activation = layers.Activation('relu')

    def call(self, inputs):
        x = self.conv(inputs)
        x = self.norm(x)
        x = self.activation(x)
        return self.dropout(x)


class MTCN(Model):
    def __init__(self, num_layers, kernel_size, filters, dropout_rate, output_dim):
        super().__init__()
        self.layers_list = [MTCNLayer(filters, kernel_size, 2**i, dropout_rate)
                            for i in range(num_layers)]
        self.final_conv = layers.Conv1D(filters=output_dim, kernel_size=1)

    def call(self, inputs):
        x = inputs
        for layer in self.layers_list:
            x = layer(x)
        return self.final_conv(x)


class PredictiveModelingAlgorithm:
    """Ensemble of MTCN models with probabilistic prediction utilities."""

    def __init__(self, input_dim, output_dim, num_layers=8, kernel_size=3,
                 num_filters=64, dropout_rate=0.2, num_ensemble=5):
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.num_ensemble = num_ensemble
        self.models = [MTCN(num_layers, kernel_size, num_filters, dropout_rate, output_dim)
                       for _ in range(num_ensemble)]
        self.compile_models()

    def compile_models(self):
        for model in self.models:
            model.compile(optimizer='adam', loss='mse', metrics=['mae'])

    def train(self, X, y, epochs=100, batch_size=32, validation_split=0.2):
        # shuffle=False preserves temporal order, which matters for time series.
        X_train, X_val, y_train, y_val = train_test_split(
            X, y, test_size=validation_split, shuffle=False)
        histories = []
        for model in self.models:
            history = model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size,
                                validation_data=(X_val, y_val), verbose=0)
            histories.append(history)
        return histories

    def predict(self, X):
        predictions = [model.predict(X) for model in self.models]
        return np.mean(predictions, axis=0), np.std(predictions, axis=0)

    def generate_scenarios(self, initial_state, num_scenarios, time_steps):
        # initial_state is expected to be a (window_length, input_dim) sequence.
        # Each step feeds the current window forward and samples the next state
        # from the ensemble's predictive distribution; feeding samples back in
        # assumes output_dim == input_dim.
        scenarios = []
        for _ in range(num_scenarios):
            window = np.asarray(initial_state)
            states = [window[-1]]
            for _ in range(time_steps - 1):
                next_mean, next_std = self.predict(window[np.newaxis, ...])
                next_state = np.random.normal(next_mean[0, -1, :], next_std[0, -1, :])
                states.append(next_state)
                window = np.vstack([window[1:], next_state[np.newaxis, :]])
            scenarios.append(np.array(states))
        return np.array(scenarios)

    def calculate_posterior(self, prior, likelihood, data):
        # Conjugate Gaussian update: precisions add, and the posterior mean is
        # the precision-weighted combination of prior mean and observed data.
        prior_mean, prior_cov = prior
        likelihood_mean, likelihood_cov = likelihood
        posterior_cov = np.linalg.inv(np.linalg.inv(prior_cov) + np.linalg.inv(likelihood_cov))
        posterior_mean = posterior_cov @ (np.linalg.inv(prior_cov) @ prior_mean
                                          + np.linalg.inv(likelihood_cov) @ data)
        return posterior_mean, posterior_cov
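
    # Illustrative check of calculate_posterior (1-D case written as 1x1
    # matrices): with a N(0, 1) prior, unit observation noise, and a single
    # observation x = 2, the precisions add to 2, giving posterior covariance
    # 0.5 and posterior mean 0.5 * 2 = 1.0:
    #   calculate_posterior((np.zeros(1), np.eye(1)),
    #                       (np.zeros(1), np.eye(1)), np.array([2.0]))
    #   -> (array([1.]), array([[0.5]]))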

    def integrate_posterior(self, posterior, func):
        # Estimate E[func(x)] under the posterior by Monte Carlo sampling.
        posterior_mean, posterior_cov = posterior
        num_samples = 10000
        samples = multivariate_normal.rvs(mean=posterior_mean, cov=posterior_cov, size=num_samples)
        return np.mean([func(s) for s in samples])

    def handle_black_swan(self, X, threshold=3):
        # Flag potential black swan events via z-score thresholding, then
        # replace affected series with a robust linear fit so outliers do not
        # dominate downstream predictions.
        extreme_events = []
        for i in range(X.shape[1]):
            series = X[:, i]
            z_scores = (series - np.mean(series)) / np.std(series)
            extreme_events.append(np.where(np.abs(z_scores) > threshold)[0])
        adjusted_X = X.copy()
        trend = sm.add_constant(np.arange(len(X)))
        for i, events in enumerate(extreme_events):
            if len(events) > 0:
                # statsmodels expects RLM(endog, exog): the series first, then
                # the design matrix.
                adjusted_X[:, i] = sm.RLM(X[:, i], trend).fit().predict(trend)
        return adjusted_X, extreme_events

    def evaluate(self, X_test, y_test):
        y_pred_mean, y_pred_std = self.predict(X_test)
        # Flatten so the sklearn metrics accept 3-D sequence outputs.
        mse = mean_squared_error(y_test.ravel(), y_pred_mean.ravel())
        mae = mean_absolute_error(y_test.ravel(), y_pred_mean.ravel())

        # CRPS (Continuous Ranked Probability Score) for Gaussian predictive
        # distributions, using the closed-form expression.
        def crps_gaussian(y_true, mu, sig):
            z = (y_true - mu) / sig
            return sig * (z * (2 * norm.cdf(z) - 1) + 2 * norm.pdf(z) - 1 / np.sqrt(np.pi))

        crps = np.mean([crps_gaussian(y_test[i], y_pred_mean[i], y_pred_std[i])
                        for i in range(len(y_test))])
        return {
            'MSE': mse,
            'MAE': mae,
            'CRPS': crps
        }

    def forecast_with_uncertainty(self, X, num_samples=1000):
        # Bootstrap over ensemble members: draw a model at random per sample
        # and summarise the predictions with a mean and a 95% interval.
        predictions = []
        for _ in range(num_samples):
            model = self.models[np.random.randint(self.num_ensemble)]
            predictions.append(model.predict(X))
        mean_prediction = np.mean(predictions, axis=0)
        lower_bound = np.percentile(predictions, 2.5, axis=0)
        upper_bound = np.percentile(predictions, 97.5, axis=0)
        return mean_prediction, lower_bound, upper_bound

    def analyze_feature_importance(self, X, y):
        feature_importance = []
        for model in self.models:
            # Use integrated gradients for feature importance, averaged
            # across ensemble members.
            ig = IntegratedGradients(model)
            attributions = ig.attribute(X, target=y, n_steps=50)
            feature_importance.append(np.mean(np.abs(attributions), axis=0))
        return np.mean(feature_importance, axis=0)


# Additional utility functions
def IntegratedGradients(model):
    # Placeholder for an integrated-gradients attribution method. Captum
    # provides one for PyTorch; for a TensorFlow model it would need to be
    # implemented with tf.GradientTape or an equivalent library.
    class IG:
        def __init__(self, model):
            self.model = model

        def attribute(self, inputs, target, n_steps):
            # Placeholder implementation: returns random attributions.
            return np.random.rand(*inputs.shape)

    return IG(model)
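

# --- Usage sketch ---
# A minimal, illustrative example; the shapes, hyperparameters, and random
# data below are assumptions, not part of the algorithm itself. It assumes
# windowed time-series input of shape (num_samples, window_length, input_dim)
# with targets of matching temporal shape.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    X_demo = rng.normal(size=(128, 16, 4)).astype("float32")  # hypothetical features
    y_demo = rng.normal(size=(128, 16, 4)).astype("float32")  # hypothetical targets
    pma = PredictiveModelingAlgorithm(input_dim=4, output_dim=4,
                                      num_layers=3, num_ensemble=2)
    pma.train(X_demo, y_demo, epochs=2, batch_size=16)
    mean_pred, std_pred = pma.predict(X_demo[:8])
    print("mean:", mean_pred.shape, "std:", std_pred.shape)
    # Scenario generation feeds sampled states back in, so output_dim must
    # equal input_dim here.
    paths = pma.generate_scenarios(X_demo[0], num_scenarios=3, time_steps=5)
    print("scenarios:", paths.shape)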