diff --git a/pypots/classification/raindrop.py b/pypots/classification/raindrop.py index 29fdc9e4..3424a822 100644 --- a/pypots/classification/raindrop.py +++ b/pypots/classification/raindrop.py @@ -43,7 +43,9 @@ class PositionalEncodingTF(nn.Module): def __init__(self, d_pe, max_len=500): super().__init__() - assert d_pe % 2 == 0, 'd_pe should be even, otherwise the output dims will be not equal to d_pe' + assert ( + d_pe % 2 == 0 + ), "d_pe should be even, otherwise the output dims will be not equal to d_pe" self.max_len = max_len self._num_timescales = d_pe // 2 @@ -64,7 +66,9 @@ def forward(self, time_vectors): times = time_vectors.unsqueeze(2) scaled_time = times / torch.Tensor(timescales[None, None, :]) - pe = torch.cat([torch.sin(scaled_time), torch.cos(scaled_time)], axis=-1) # T x B x d_model + pe = torch.cat( + [torch.sin(scaled_time), torch.cos(scaled_time)], axis=-1 + ) # T x B x d_model pe = pe.type(torch.FloatTensor) return pe @@ -72,11 +76,22 @@ def forward(self, time_vectors): class ObservationPropagation(MessagePassing): _alpha: OptTensor - def __init__(self, in_channels: Union[int, Tuple[int, int]], out_channels: int, - n_nodes: int, ob_dim: int, heads: int = 1, concat: bool = True, - beta: bool = False, dropout: float = 0., edge_dim: Optional[int] = None, - bias: bool = True, root_weight: bool = True, **kwargs): - kwargs.setdefault('aggr', 'add') + def __init__( + self, + in_channels: Union[int, Tuple[int, int]], + out_channels: int, + n_nodes: int, + ob_dim: int, + heads: int = 1, + concat: bool = True, + beta: bool = False, + dropout: float = 0.0, + edge_dim: Optional[int] = None, + bias: bool = True, + root_weight: bool = True, + **kwargs + ): + kwargs.setdefault("aggr", "add") super().__init__(node_dim=0, **kwargs) self.in_channels = in_channels @@ -97,21 +112,20 @@ def __init__(self, in_channels: Union[int, Tuple[int, int]], out_channels: int, if edge_dim is not None: self.lin_edge = Linear(edge_dim, heads * out_channels, bias=False) else: - self.lin_edge = self.register_parameter('lin_edge', None) + self.lin_edge = self.register_parameter("lin_edge", None) if concat: - self.lin_skip = Linear(in_channels[1], heads * out_channels, - bias=bias) + self.lin_skip = Linear(in_channels[1], heads * out_channels, bias=bias) if self.beta: self.lin_beta = Linear(3 * heads * out_channels, 1, bias=False) else: - self.lin_beta = self.register_parameter('lin_beta', None) + self.lin_beta = self.register_parameter("lin_beta", None) else: self.lin_skip = Linear(in_channels[1], out_channels, bias=bias) if self.beta: self.lin_beta = Linear(3 * out_channels, 1, bias=False) else: - self.lin_beta = self.register_parameter('lin_beta', None) + self.lin_beta = self.register_parameter("lin_beta", None) self.weight = Parameter(torch.Tensor(in_channels[1], heads * out_channels)) self.bias = Parameter(torch.Tensor(heads * out_channels)) @@ -145,8 +159,16 @@ def reset_parameters(self): glorot(self.map_weights) self.increase_dim.reset_parameters() - def forward(self, x: Union[Tensor, PairTensor], p_t: Tensor, edge_index: Adj, edge_weights=None, use_beta=False, - edge_attr: OptTensor = None, return_attention_weights=None): + def forward( + self, + x: Union[Tensor, PairTensor], + p_t: Tensor, + edge_index: Adj, + edge_weights=None, + use_beta=False, + edge_attr: OptTensor = None, + return_attention_weights=None, + ): r""" Args: @@ -165,7 +187,9 @@ def forward(self, x: Union[Tensor, PairTensor], p_t: Tensor, edge_index: Adj, ed if isinstance(x, Tensor): x: PairTensor = (x, x) - out = self.propagate(edge_index, x=x, edge_weights=edge_weights, edge_attr=edge_attr, size=None) + out = self.propagate( + edge_index, x=x, edge_weights=edge_weights, edge_attr=edge_attr, size=None + ) alpha = self._alpha self._alpha = None @@ -181,13 +205,20 @@ def forward(self, x: Union[Tensor, PairTensor], p_t: Tensor, edge_index: Adj, ed if isinstance(edge_index, Tensor): return out, (edge_index, alpha) elif isinstance(edge_index, SparseTensor): - return out, edge_index.set_value(alpha, layout='coo') + return out, edge_index.set_value(alpha, layout="coo") else: return out - def message_selfattention(self, x_i: Tensor, x_j: Tensor, edge_weights: Tensor, edge_attr: OptTensor, - index: Tensor, ptr: OptTensor, - size_i: Optional[int]) -> Tensor: + def message_selfattention( + self, + x_i: Tensor, + x_j: Tensor, + edge_weights: Tensor, + edge_attr: OptTensor, + index: Tensor, + ptr: OptTensor, + size_i: Optional[int], + ) -> Tensor: query = self.lin_query(x_i).view(-1, self.heads, self.out_channels) key = self.lin_key(x_j).view(-1, self.heads, self.out_channels) @@ -208,9 +239,16 @@ def message_selfattention(self, x_i: Tensor, x_j: Tensor, edge_weights: Tensor, out *= alpha.view(-1, self.heads, 1) return out - def message(self, x_i: Tensor, x_j: Tensor, edge_weights: Tensor, edge_attr: OptTensor, - index: Tensor, ptr: OptTensor, - size_i: Optional[int]) -> Tensor: + def message( + self, + x_i: Tensor, + x_j: Tensor, + edge_weights: Tensor, + edge_attr: OptTensor, + index: Tensor, + ptr: OptTensor, + size_i: Optional[int], + ) -> Tensor: use_beta = self.use_beta if use_beta: n_step = self.p_t.shape[0] @@ -221,7 +259,7 @@ def message(self, x_i: Tensor, x_j: Tensor, edge_weights: Tensor, edge_attr: Opt p_emb = self.p_t.unsqueeze(0) - aa = torch.cat([w_v.repeat(1, n_step, 1, ), p_emb.repeat(n_edges, 1, 1)], dim=-1) + aa = torch.cat([w_v.repeat(1, n_step, 1,), p_emb.repeat(n_edges, 1, 1)], dim=-1) beta = torch.mean(h_W * aa, dim=-1) if edge_weights is not None: @@ -264,9 +302,13 @@ def message(self, x_i: Tensor, x_j: Tensor, edge_weights: Tensor, edge_attr: Opt out = out * gamma.view(-1, self.heads, 1) return out - def aggregate(self, inputs: Tensor, index: Tensor, - ptr: Optional[Tensor] = None, - dim_size: Optional[int] = None) -> Tensor: + def aggregate( + self, + inputs: Tensor, + index: Tensor, + ptr: Optional[Tensor] = None, + dim_size: Optional[int] = None, + ) -> Tensor: r"""Aggregates messages from neighbors as :math:`\square_{j \in \mathcal{N}(i)}`. @@ -278,19 +320,31 @@ def aggregate(self, inputs: Tensor, index: Tensor, :meth:`__init__` by the :obj:`aggr` argument. """ index = self.index - return scatter(inputs, index, dim=self.node_dim, dim_size=dim_size, - reduce=self.aggr) + return scatter(inputs, index, dim=self.node_dim, dim_size=dim_size, reduce=self.aggr) def __repr__(self): - return '{}({}, {}, heads={})'.format(self.__class__.__name__, - self.in_channels, - self.out_channels, - self.heads) + return "{}({}, {}, heads={})".format( + self.__class__.__name__, self.in_channels, self.out_channels, self.heads + ) class _Raindrop(nn.Module): - def __init__(self, n_layers, n_features, d_model, d_inner, n_heads, n_classes, dropout=0.3, max_len=215, d_static=9, - aggregation='mean', sensor_wise_mask=False, static=False, device=None): + def __init__( + self, + n_layers, + n_features, + d_model, + d_inner, + n_heads, + n_classes, + dropout=0.3, + max_len=215, + d_static=9, + aggregation="mean", + sensor_wise_mask=False, + static=False, + device=None, + ): super().__init__() self.n_layers = n_layers self.n_features = n_features @@ -310,18 +364,20 @@ def __init__(self, n_layers, n_features, d_model, d_inner, n_heads, n_classes, d self.global_structure = torch.ones(n_features, n_features, device=self.device) if self.static: self.emb = nn.Linear(d_static, n_features) - assert d_model % n_features == 0, 'd_model must be divisible by n_features' + assert d_model % n_features == 0, "d_model must be divisible by n_features" self.d_ob = int(d_model / n_features) self.encoder = nn.Linear(n_features * self.d_ob, n_features * self.d_ob) d_pe = 16 self.pos_encoder = PositionalEncodingTF(d_pe, max_len) if self.sensor_wise_mask: dim_check = n_features * (self.d_ob + d_pe) - assert dim_check % n_heads == 0, 'dim_check must be divisible by n_heads' - encoder_layers = TransformerEncoderLayer(n_features * (self.d_ob + d_pe), n_heads, d_inner, dropout) + assert dim_check % n_heads == 0, "dim_check must be divisible by n_heads" + encoder_layers = TransformerEncoderLayer( + n_features * (self.d_ob + d_pe), n_heads, d_inner, dropout + ) else: dim_check = d_model + d_pe - assert dim_check % n_heads == 0, 'dim_check must be divisible by n_heads' + assert dim_check % n_heads == 0, "dim_check must be divisible by n_heads" encoder_layers = TransformerEncoderLayer(d_model + d_pe, n_heads, d_inner, dropout) self.transformer_encoder = TransformerEncoder(encoder_layers, n_layers) @@ -329,20 +385,27 @@ def __init__(self, n_layers, n_features, d_model, d_inner, n_heads, n_classes, d self.R_u = Parameter(torch.Tensor(1, self.n_features * self.d_ob)) - self.ob_propagation = ObservationPropagation(in_channels=max_len * self.d_ob, out_channels=max_len * self.d_ob, - heads=1, n_nodes=n_features, ob_dim=self.d_ob) - self.ob_propagation_layer2 = ObservationPropagation(in_channels=max_len * self.d_ob, - out_channels=max_len * self.d_ob, heads=1, - n_nodes=n_features, ob_dim=self.d_ob) + self.ob_propagation = ObservationPropagation( + in_channels=max_len * self.d_ob, + out_channels=max_len * self.d_ob, + heads=1, + n_nodes=n_features, + ob_dim=self.d_ob, + ) + self.ob_propagation_layer2 = ObservationPropagation( + in_channels=max_len * self.d_ob, + out_channels=max_len * self.d_ob, + heads=1, + n_nodes=n_features, + ob_dim=self.d_ob, + ) if static: d_final = d_model + d_pe + n_features else: d_final = d_model + d_pe self.mlp_static = nn.Sequential( - nn.Linear(d_final, d_final), - nn.ReLU(), - nn.Linear(d_final, n_classes), + nn.Linear(d_final, d_final), nn.ReLU(), nn.Linear(d_final, n_classes), ) self.dropout = nn.Dropout(dropout) @@ -376,29 +439,29 @@ def classify(self, inputs): Number of nonzero recordings. missing_mask : array, shape of [n_steps, n_samples, n_features] """ - src = inputs['X'] - static = inputs['static'] - times = inputs['timestamps'] - lengths = inputs['lengths'] - missing_mask = inputs['missing_mask'] + src = inputs["X"] + static = inputs["static"] + times = inputs["timestamps"] + lengths = inputs["lengths"] + missing_mask = inputs["missing_mask"] max_len, batch_size = src.shape[0], src.shape[1] src = torch.repeat_interleave(src, self.d_ob, dim=-1) h = F.relu(src * self.R_u) - pe = self.pos_encoder(times) + pe = self.pos_encoder(times).to(self.device) if static is not None: emb = self.emb(static) h = self.dropout(h) mask = torch.arange(max_len)[None, :] >= (lengths.cpu()[:, None]) - mask = mask.squeeze(1) + mask = mask.squeeze(1).to(self.device) x = h adj = self.global_structure - adj[torch.eye(self.n_features).byte()] = 1 + adj[torch.eye(self.n_features, dtype=torch.bool)] = 1 edge_index = torch.nonzero(adj).T edge_weights = adj[edge_index[0], edge_index[1]] @@ -417,18 +480,28 @@ def classify(self, inputs): step_data = step_data.reshape([n_step, self.n_features, self.d_ob]).permute(1, 0, 2) step_data = step_data.reshape(self.n_features, n_step * self.d_ob) - step_data, attention_weights = self.ob_propagation(step_data, p_t=p_t, edge_index=edge_index, - edge_weights=edge_weights, - use_beta=False, edge_attr=None, - return_attention_weights=True) + step_data, attention_weights = self.ob_propagation( + step_data, + p_t=p_t, + edge_index=edge_index, + edge_weights=edge_weights, + use_beta=False, + edge_attr=None, + return_attention_weights=True, + ) edge_index_layer2 = attention_weights[0] edge_weights_layer2 = attention_weights[1].squeeze(-1) - step_data, attention_weights = self.ob_propagation_layer2(step_data, p_t=p_t, edge_index=edge_index_layer2, - edge_weights=edge_weights_layer2, - use_beta=False, edge_attr=None, - return_attention_weights=True) + step_data, attention_weights = self.ob_propagation_layer2( + step_data, + p_t=p_t, + edge_index=edge_index_layer2, + edge_weights=edge_weights_layer2, + use_beta=False, + edge_attr=None, + return_attention_weights=True, + ) step_data = step_data.view([self.n_features, n_step, self.d_ob]) step_data = step_data.permute([1, 0, 2]) # [n_step, n_features, d_ob] @@ -452,7 +525,7 @@ def classify(self, inputs): sensor_wise_mask = self.sensor_wise_mask - lengths2 = lengths.unsqueeze(1) + lengths2 = lengths.unsqueeze(1).to(self.device) mask2 = mask.permute(1, 0).unsqueeze(2).long() if sensor_wise_mask: output = torch.zeros([batch_size, self.n_features, self.d_ob + 16], device=self.device) @@ -461,10 +534,12 @@ def classify(self, inputs): r_out = r_out.view(-1, batch_size, self.n_features, (self.d_ob + 16)) out = r_out[:, :, se, :] l_ = torch.sum(extended_missing_mask[:, :, se], dim=0).unsqueeze(1) # length - out_sensor = torch.sum(out * (1 - extended_missing_mask[:, :, se].unsqueeze(-1)), dim=0) / (l_ + 1) + out_sensor = torch.sum( + out * (1 - extended_missing_mask[:, :, se].unsqueeze(-1)), dim=0 + ) / (l_ + 1) output[:, se, :] = out_sensor output = output.view([-1, self.n_features * (self.d_ob + 16)]) - elif self.aggregation == 'mean': + elif self.aggregation == "mean": output = torch.sum(r_out * (1 - mask2), dim=0) / (lengths2 + 1) else: raise RuntimeError @@ -479,11 +554,11 @@ def classify(self, inputs): def forward(self, inputs): prediction = self.classify(inputs) - classification_loss = F.nll_loss(torch.log(prediction), inputs['label']) + classification_loss = F.nll_loss(torch.log(prediction), inputs["label"]) results = { - 'prediction': prediction, - 'loss': classification_loss + "prediction": prediction, + "loss": classification_loss # 'distance': distance, } @@ -510,32 +585,48 @@ class Raindrop(BaseNNClassifier): Run the model on which device. """ - def __init__(self, - n_features, - n_layers, - d_model, - d_inner, - n_heads, - n_classes, - dropout, - max_len, - d_static, - aggregation, - sensor_wise_mask, - static, - learning_rate=1e-3, - epochs=100, - patience=10, - batch_size=32, - weight_decay=1e-5, - device=None): - super().__init__(n_classes, learning_rate, epochs, patience, batch_size, - weight_decay, device) + def __init__( + self, + n_features, + n_layers, + d_model, + d_inner, + n_heads, + n_classes, + dropout, + max_len, + d_static, + aggregation, + sensor_wise_mask, + static, + learning_rate=1e-3, + epochs=100, + patience=10, + batch_size=32, + weight_decay=1e-5, + device=None, + ): + super().__init__( + n_classes, learning_rate, epochs, patience, batch_size, weight_decay, device + ) self.n_features = n_features self.n_steps = max_len - self.model = _Raindrop(n_layers, n_features, d_model, d_inner, n_heads, n_classes, dropout, max_len, d_static, - aggregation, sensor_wise_mask, static=static, device=self.device) + self.model = _Raindrop( + n_layers, + n_features, + d_model, + d_inner, + n_heads, + n_classes, + dropout, + max_len, + d_static, + aggregation, + sensor_wise_mask, + static=static, + device=self.device, + ) self.model = self.model.to(self.device) self._print_model_size() @@ -596,12 +687,12 @@ def assemble_input_data(self, data): times = times.permute(1, 0) inputs = { - 'X': X, - 'static': None, - 'timestamps': times, - 'lengths': lengths, - 'missing_mask': missing_mask, - 'label': label + "X": X, + "static": None, + "timestamps": times, + "lengths": lengths, + "missing_mask": missing_mask, + "label": label, } return inputs @@ -627,11 +718,11 @@ def classify(self, X): times = times.permute(1, 0) inputs = { - 'X': X, - 'static': None, - 'timestamps': times, - 'lengths': lengths, - 'missing_mask': missing_mask, + "X": X, + "static": None, + "timestamps": times, + "lengths": lengths, + "missing_mask": missing_mask, } prediction = self.model.classify(inputs) diff --git a/pypots/imputation/base.py b/pypots/imputation/base.py index c3aee779..a7c2a626 100644 --- a/pypots/imputation/base.py +++ b/pypots/imputation/base.py @@ -70,13 +70,15 @@ def __init__(self, learning_rate, epochs, patience, batch_size, weight_decay, de def assemble_input_data(self, data): pass - def _train_model(self, training_loader, val_loader=None, val_X_intact=None, val_indicating_mask=None): - self.optimizer = torch.optim.Adam(self.model.parameters(), - lr=self.lr, - weight_decay=self.weight_decay) + def _train_model( + self, training_loader, val_loader=None, val_X_intact=None, val_indicating_mask=None + ): + self.optimizer = torch.optim.Adam( + self.model.parameters(), lr=self.lr, weight_decay=self.weight_decay + ) # each training starts from the very beginning, so reset the loss and model dict here - self.best_loss = float('inf') + self.best_loss = float("inf") self.best_model_dict = None try: @@ -87,12 +89,14 @@ def _train_model(self, training_loader, val_loader=None, val_X_intact=None, val_ inputs = self.assemble_input_data(data) self.optimizer.zero_grad() results = self.model.forward(inputs) - results['loss'].backward() + results["loss"].backward() self.optimizer.step() - epoch_train_loss_collector.append(results['loss'].item()) + epoch_train_loss_collector.append(results["loss"].item()) - mean_train_loss = np.mean(epoch_train_loss_collector) # mean training loss of the current epoch - self.logger['training_loss'].append(mean_train_loss) + mean_train_loss = np.mean( + epoch_train_loss_collector + ) # mean training loss of the current epoch + self.logger["training_loss"].append(mean_train_loss) if val_loader is not None: self.model.eval() @@ -101,17 +105,21 @@ def _train_model(self, training_loader, val_loader=None, val_X_intact=None, val_ for idx, data in enumerate(val_loader): inputs = self.assemble_input_data(data) results = self.model.forward(inputs) - imputation_collector.append(results['imputed_data']) + imputation_collector.append(results["imputed_data"]) imputation_collector = torch.cat(imputation_collector) imputation_collector = imputation_collector - mean_val_loss = cal_mae(imputation_collector, val_X_intact, val_indicating_mask) - self.logger['validating_loss'].append(mean_val_loss) - print(f'epoch {epoch}: training loss {mean_train_loss:.4f}, validating loss {mean_val_loss:.4f}') + mean_val_loss = cal_mae( + imputation_collector, val_X_intact, val_indicating_mask + ) + self.logger["validating_loss"].append(mean_val_loss) + print( + f"epoch {epoch}: training loss {mean_train_loss:.4f}, validating loss {mean_val_loss:.4f}" + ) mean_loss = mean_val_loss else: - print(f'epoch {epoch}: training loss {mean_train_loss:.4f}') + print(f"epoch {epoch}: training loss {mean_train_loss:.4f}") mean_loss = mean_train_loss if mean_loss < self.best_loss: @@ -121,25 +129,29 @@ def _train_model(self, training_loader, val_loader=None, val_X_intact=None, val_ else: self.patience -= 1 - if os.getenv('enable_nni', False): + if os.getenv("enable_nni", False): nni.report_intermediate_result(mean_loss) if epoch == self.epochs - 1 or self.patience == 0: nni.report_final_result(self.best_loss) if self.patience == 0: - print('Exceeded the training patience. Terminating the training procedure...') + print("Exceeded the training patience. Terminating the training procedure...") break except Exception as e: - print(f'Exception: {e}') + print(f"Exception: {e}") if self.best_model_dict is None: - raise RuntimeError('Training got interrupted. Model was not get trained. Please try fit() again.') + raise RuntimeError( + "Training got interrupted. Model was not get trained. Please try fit() again." + ) else: - RuntimeWarning('Training got interrupted. ' - 'Model will load the best parameters so far for testing. ' - "If you don't want it, please try fit() again.") + RuntimeWarning( + "Training got interrupted. " + "Model will load the best parameters so far for testing. " + "If you don't want it, please try fit() again." + ) - if np.equal(self.best_loss, float('inf')): - raise ValueError('Something is wrong. best_loss is Nan after training.') + if np.equal(self.best_loss.item(), float("inf")): + raise ValueError("Something is wrong. best_loss is Nan after training.") - print('Finished training.') + print("Finished training.") diff --git a/pypots/imputation/locf.py b/pypots/imputation/locf.py index fbc1073b..322e91af 100644 --- a/pypots/imputation/locf.py +++ b/pypots/imputation/locf.py @@ -23,13 +23,13 @@ class LOCF(BaseImputer): """ def __init__(self, nan=0): - super().__init__('cpu') + super().__init__("cpu") self.nan = nan def fit(self, train_X, val_X=None): warnings.warn( - 'LOCF (Last Observed Carried Forward) imputation class has no parameter to train. ' - 'Please run func impute(X) directly.' + "LOCF (Last Observed Carried Forward) imputation class has no parameter to train. " + "Please run func impute(X) directly." ) def locf_numpy(self, X): @@ -86,7 +86,7 @@ def locf_torch(self, X): trans_X = X.permute((0, 2, 1)) mask = torch.isnan(trans_X) n_samples, n_steps, n_features = mask.shape - idx = torch.where(~mask, torch.arange(n_features), 0) + idx = torch.where(~mask, torch.arange(n_features, device=mask.device), 0) idx = torch.cummax(idx, dim=2) collector = [] @@ -116,8 +116,10 @@ def impute(self, X): array-like, Imputed time series. """ - assert len(X.shape) == 3, f'Input X should have 3 dimensions [n_samples, n_steps, n_features], ' \ - f'but the actual shape of X: {X.shape}' + assert len(X.shape) == 3, ( + f"Input X should have 3 dimensions [n_samples, n_steps, n_features], " + f"but the actual shape of X: {X.shape}" + ) if isinstance(X, list): X = np.asarray(X) @@ -126,6 +128,7 @@ def impute(self, X): elif isinstance(X, torch.Tensor): X_imputed = self.locf_torch(X).detach().cpu().numpy() else: - raise TypeError('X must be type of list/np.ndarray/torch.Tensor, ' - f'but got {type(X)}') + raise TypeError( + "X must be type of list/np.ndarray/torch.Tensor, " f"but got {type(X)}" + ) return X_imputed