diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 9599e96..f56a633 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -29,7 +29,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - python-version: ['3.8', '3.9', '3.10', '3.11'] + python-version: ['3.8'] os: [ubuntu-20.04] steps: - uses: actions/checkout@v1 diff --git a/sigpro/basic_primitives.py b/sigpro/basic_primitives.py new file mode 100644 index 0000000..e84ddd0 --- /dev/null +++ b/sigpro/basic_primitives.py @@ -0,0 +1,172 @@ +# -*- coding: utf-8 -*- +"""Reference class implementations of existing primitives.""" +from sigpro import contributing, primitive + +# Transformations + + +class Identity(primitive.AmplitudeTransformation): + """Identity primitive class.""" + + def __init__(self): + super().__init__('sigpro.transformations.amplitude.identity.identity') + + +class PowerSpectrum(primitive.AmplitudeTransformation): + """PowerSpectrum primitive class.""" + + def __init__(self): + super().__init__('sigpro.transformations.amplitude.spectrum.power_spectrum') + primitive_spec = contributing._get_primitive_spec('transformation', 'frequency') + self.set_primitive_inputs(primitive_spec['args']) + self.set_primitive_outputs(primitive_spec['output']) + + +class FFT(primitive.FrequencyTransformation): + """FFT primitive class.""" + + def __init__(self): + super().__init__("sigpro.transformations.frequency.fft.fft") + + +class FFTReal(primitive.FrequencyTransformation): + """FFTReal primitive class.""" + + def __init__(self): + super().__init__("sigpro.transformations.frequency.fft.fft_real") + + +class FrequencyBand(primitive.FrequencyTransformation): + """ + FrequencyBand primitive class. + + Filter between a high and low band frequency and return the amplitude values and + frequency values for those. + + Args: + low (int): Lower band frequency of filter. + high (int): Higher band frequency of filter. 
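+ + Example: + An illustrative sketch (argument values chosen arbitrarily): + + >>> from sigpro.basic_primitives import FrequencyBand + >>> fb = FrequencyBand(low=10, high=50) + >>> fb.get_hyperparam_dict()['init_params'] + {'low': 10, 'high': 50}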
+ """ + + def __init__(self, low, high): + super().__init__("sigpro.transformations.frequency.band.frequency_band", + init_params={'low': low, 'high': high}) + self.set_primitive_inputs([{"name": "amplitude_values", "type": "numpy.ndarray"}, + {"name": "frequency_values", "type": "numpy.ndarray"}]) + self.set_primitive_outputs([{'name': 'amplitude_values', 'type': "numpy.ndarray"}, + {'name': 'frequency_values', 'type': "numpy.ndarray"}]) + self.set_fixed_hyperparameters({'low': {'type': 'int'}, 'high': {'type': 'int'}}) + + +class STFT(primitive.FrequencyTimeTransformation): + """STFT primitive class.""" + + def __init__(self): + super().__init__('sigpro.transformations.frequency_time.stft.stft') + self.set_primitive_outputs([{"name": "amplitude_values", "type": "numpy.ndarray"}, + {"name": "frequency_values", "type": "numpy.ndarray"}, + {"name": "time_values", "type": "numpy.ndarray"}]) + + +class STFTReal(primitive.FrequencyTimeTransformation): + """STFTReal primitive class.""" + + def __init__(self): + super().__init__('sigpro.transformations.frequency_time.stft.stft_real') + self.set_primitive_outputs([{"name": "real_amplitude_values", "type": "numpy.ndarray"}, + {"name": "frequency_values", "type": "numpy.ndarray"}, + {"name": "time_values", "type": "numpy.ndarray"}]) + +# Aggregations + + +class CrestFactor(primitive.AmplitudeAggregation): + """CrestFactor primitive class.""" + + def __init__(self): + super().__init__('sigpro.aggregations.amplitude.statistical.crest_factor') + self.set_primitive_outputs([{'name': 'crest_factor_value', 'type': "float"}]) + + +class Kurtosis(primitive.AmplitudeAggregation): + """ + Kurtosis primitive class. + + Computes the kurtosis value of the input array. If all values are equal, return + `-3` for Fisher's definition and `0` for Pearson's definition. + + Args: + fisher (bool): + If ``True``, Fisher’s definition is used (normal ==> 0.0). If ``False``, + Pearson’s definition is used (normal ==> 3.0). Defaults to ``True``. + bias (bool): + If ``False``, then the calculations are corrected for statistical bias. + Defaults to ``True``. 
+ """ + + def __init__(self, fisher=True, bias=True): + super().__init__('sigpro.aggregations.amplitude.statistical.kurtosis', + init_params={'fisher': fisher, 'bias': bias}) + self.set_primitive_outputs([{'name': 'kurtosis_value', 'type': "float"}]) + self.set_fixed_hyperparameters({'fisher': {'type': 'bool', 'default': True}, + 'bias': {'type': 'bool', 'default': True}}) + + +class Mean(primitive.AmplitudeAggregation): + """Mean primitive class.""" + + def __init__(self): + super().__init__('sigpro.aggregations.amplitude.statistical.mean') + self.set_primitive_outputs([{'name': 'mean_value', 'type': "float"}]) + + +class RMS(primitive.AmplitudeAggregation): + """RMS primitive class.""" + + def __init__(self): + super().__init__('sigpro.aggregations.amplitude.statistical.rms') + self.set_primitive_outputs([{'name': 'rms_value', 'type': "float"}]) + + +class Skew(primitive.AmplitudeAggregation): + """Skew primitive class.""" + + def __init__(self): + super().__init__('sigpro.aggregations.amplitude.statistical.skew') + self.set_primitive_outputs([{'name': 'skew_value', 'type': "float"}]) + + +class Std(primitive.AmplitudeAggregation): + """Std primitive class.""" + + def __init__(self): + super().__init__('sigpro.aggregations.amplitude.statistical.std') + self.set_primitive_outputs([{'name': 'std_value', 'type': "float"}]) + + +class Var(primitive.AmplitudeAggregation): + """Var primitive class.""" + + def __init__(self): + super().__init__('sigpro.aggregations.amplitude.statistical.var') + self.set_primitive_outputs([{'name': 'var_value', 'type': "float"}]) + + +class BandMean(primitive.FrequencyAggregation): + """ + BandMean primitive class. + + Filters between a high and low band and compute the mean value for this specific band. + + Args: + min_frequency (int or float): + Band minimum. + max_frequency (int or float): + Band maximum. + """ + + def __init__(self, min_frequency, max_frequency): + super().__init__('sigpro.aggregations.frequency.band.band_mean', init_params={ + 'min_frequency': min_frequency, 'max_frequency': max_frequency}) + self.set_fixed_hyperparameters({'min_frequency': {'type': 'float'}, + 'max_frequency': {'type': 'float'}}) diff --git a/sigpro/contributing.py b/sigpro/contributing.py index e6d9766..a4418cc 100644 --- a/sigpro/contributing.py +++ b/sigpro/contributing.py @@ -277,18 +277,11 @@ def _write_primitive(primitive_dict, primitive_name, primitives_path, primitives return primitive_path -def make_primitive(primitive, primitive_type, primitive_subtype, - context_arguments=None, fixed_hyperparameters=None, - tunable_hyperparameters=None, primitive_outputs=None, - primitives_path='sigpro/primitives', primitives_subfolders=True): - """Create a primitive JSON. - - During the JSON creation the primitive function signature is validated to - ensure that it matches the primitive type and subtype implicitly specified - by the primitive name. - - Any additional function arguments are also validated to ensure that the - function does actually expect them. +def _make_primitive_dict(primitive, primitive_type, primitive_subtype, + context_arguments=None, fixed_hyperparameters=None, + tunable_hyperparameters=None, primitive_inputs=None, + primitive_outputs=None): + """Create a primitive dict. Args: primitive (str): @@ -308,30 +301,27 @@ def make_primitive(primitive, primitive_type, primitive_subtype, A dictionary containing as key the name of the hyperparameter and as value a dictionary containing the type and the default value and the range of values that it can take. 
+ primitive_inputs (list or None): + A list with dictionaries containing the name and type of the input values. If + ``None`` default values for those will be used. primitive_outputs (list or None): A list with dictionaries containing the name and type of the output values. If ``None`` default values for those will be used. - primitives_path (str): - Path to the root of the primitives folder, in which the primitives JSON will be stored. - Defaults to `sigpro/primitives`. - primitives_subfolders (bool): - Whether to store the primitive JSON in a subfolder tree (``True``) or to use a flat - primitive name (``False``). Defaults to ``True``. Raises: ValueError: If the primitive specification arguments are not valid. Returns: - str: - Path of the generated JSON file. + dict: + Generated JSON file as a Python dict. """ context_arguments = context_arguments or [] fixed_hyperparameters = fixed_hyperparameters or {} tunable_hyperparameters = tunable_hyperparameters or {} primitive_spec = _get_primitive_spec(primitive_type, primitive_subtype) - primitive_inputs = primitive_spec['args'] + primitive_inputs = primitive_inputs or primitive_spec['args'] primitive_outputs = primitive_outputs or primitive_spec['output'] primitive_function = _import_object(primitive) @@ -366,6 +356,69 @@ def make_primitive(primitive, primitive_type, primitive_subtype, } } + return primitive_dict + +# pylint: disable = too-many-arguments + + +def make_primitive(primitive, primitive_type, primitive_subtype, + context_arguments=None, fixed_hyperparameters=None, + tunable_hyperparameters=None, primitive_inputs=None, + primitive_outputs=None, primitives_path='sigpro/primitives', + primitives_subfolders=True): + """Create a primitive JSON. + + During the JSON creation the primitive function signature is validated to + ensure that it matches the primitive type and subtype implicitly specified + by the primitive name. + + Any additional function arguments are also validated to ensure that the + function does actually expect them. + + Args: + primitive (str): + The name of the primitive, the python path including the name of the + module and the name of the function. + primitive_type (str): + Type of primitive. + primitive_subtype (str): + Subtype of the primitive. + context_arguments (list or None): + A list with dictionaries containing the name and type of the context arguments. + fixed_hyperparameters (dict or None): + A dictionary containing as key the name of the hyperparameter and as + value a dictionary containing the type and the default value that it + should take. + tunable_hyperparameters (dict or None): + A dictionary containing as key the name of the hyperparameter and as + value a dictionary containing the type and the default value and the + range of values that it can take. + primitive_inputs (list or None): + A list with dictionaries containing the name and type of the input values. If + ``None`` default values for those will be used. + primitive_outputs (list or None): + A list with dictionaries containing the name and type of the output values. If + ``None`` default values for those will be used. + primitives_path (str): + Path to the root of the primitives folder, in which the primitives JSON will be stored. + Defaults to `sigpro/primitives`. + primitives_subfolders (bool): + Whether to store the primitive JSON in a subfolder tree (``True``) or to use a flat + primitive name (``False``). Defaults to ``True``. + + Raises: + ValueError: + If the primitive specification arguments are not valid. 
+ + Returns: + str: + Path of the generated JSON file. + """ + primitive_dict = _make_primitive_dict(primitive, primitive_type, primitive_subtype, + context_arguments, fixed_hyperparameters, + tunable_hyperparameters, primitive_inputs, + primitive_outputs) + return _write_primitive(primitive_dict, primitive, primitives_path, primitives_subfolders) diff --git a/sigpro/contributing_primitive.py b/sigpro/contributing_primitive.py new file mode 100644 index 0000000..7e2473d --- /dev/null +++ b/sigpro/contributing_primitive.py @@ -0,0 +1,147 @@ +"""Contributing primitive classes.""" +import copy + +from sigpro.contributing import make_primitive +from sigpro.primitive import ( + AmplitudeAggregation, AmplitudeTransformation, FrequencyAggregation, FrequencyTimeAggregation, + FrequencyTimeTransformation, FrequencyTransformation) + +TAXONOMY = { + 'transformation': { + 'frequency': FrequencyTransformation, + 'amplitude': AmplitudeTransformation, + 'frequency_time': FrequencyTimeTransformation, + }, 'aggregation': { + 'frequency': FrequencyAggregation, + 'amplitude': AmplitudeAggregation, + 'frequency_time': FrequencyTimeAggregation, + } +} + + +def get_primitive_class(primitive, primitive_type, primitive_subtype, + context_arguments=None, fixed_hyperparameters=None, + tunable_hyperparameters=None, primitive_inputs=None, + primitive_outputs=None): + """ + Get a dynamically generated primitive class. + + Args: + primitive (str): + The name of the primitive, the python path including the name of the + module and the name of the function. + primitive_type (str): + Type of primitive. + primitive_subtype (str): + Subtype of the primitive. + context_arguments (list or None): + A list with dictionaries containing the name and type of the context arguments. + fixed_hyperparameters (dict or None): + A dictionary containing as key the name of the hyperparameter and as + value a dictionary containing the type and the default value that it + should take. + tunable_hyperparameters (dict or None): + A dictionary containing as key the name of the hyperparameter and as + value a dictionary containing the type and the default value and the + range of values that it can take. + primitive_inputs (list or None): + A list with dictionaries containing the name and type of the input values. If + ``None`` default values for those will be used. + primitive_outputs (list or None): + A list with dictionaries containing the name and type of the output values. If + ``None`` default values for those will be used. + + Raises: + ValueError: + If the primitive specification arguments are not valid. + + Returns: + type: + Dynamically-generated custom Primitive type. 
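+ + Example: + A minimal sketch mirroring the integration tests: + + >>> MeanClass = get_primitive_class( + ... 'sigpro.aggregations.amplitude.statistical.mean', + ... 'aggregation', 'amplitude', + ... primitive_outputs=[{'name': 'mean_value', 'type': 'float'}]) + >>> isinstance(MeanClass(), AmplitudeAggregation) + True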
+ """ + primitive_type_class = TAXONOMY[primitive_type][primitive_subtype] + + class UserPrimitive(primitive_type_class): # pylint: disable=too-few-public-methods + """User-defined Dynamic Primitive Class.""" + + def __init__(self, **kwargs): + init_params = {} + if fixed_hyperparameters is not None: + init_params = {param: kwargs[param] for param in fixed_hyperparameters} + super().__init__(primitive, init_params=init_params) + if fixed_hyperparameters is not None: + self.set_fixed_hyperparameters(copy.deepcopy(fixed_hyperparameters)) + if tunable_hyperparameters is not None: + self.set_tunable_hyperparameters(copy.deepcopy(tunable_hyperparameters)) + if primitive_inputs is not None: + self.set_primitive_inputs(copy.deepcopy(primitive_inputs)) + if primitive_outputs is not None: + self.set_primitive_outputs(copy.deepcopy(primitive_outputs)) + if context_arguments is not None: + self.set_context_arguments(copy.deepcopy(context_arguments)) + + type_name = f'Custom_{primitive}' + + return type(type_name, (UserPrimitive, ), {}) + +# pylint: disable = too-many-arguments + + +def make_primitive_class(primitive, primitive_type, primitive_subtype, + context_arguments=None, fixed_hyperparameters=None, + tunable_hyperparameters=None, primitive_inputs=None, + primitive_outputs=None, primitives_path='sigpro/primitives', + primitives_subfolders=True): + """ + Get a dynamically generated primitive class and make the primitive JSON. + + Args: + primitive (str): + The name of the primitive, the python path including the name of the + module and the name of the function. + primitive_type (str): + Type of primitive. + primitive_subtype (str): + Subtype of the primitive. + context_arguments (list or None): + A list with dictionaries containing the name and type of the context arguments. + fixed_hyperparameters (dict or None): + A dictionary containing as key the name of the hyperparameter and as + value a dictionary containing the type and the default value that it + should take. + tunable_hyperparameters (dict or None): + A dictionary containing as key the name of the hyperparameter and as + value a dictionary containing the type and the default value and the + range of values that it can take. + primitive_inputs (list or None): + A list with dictionaries containing the name and type of the input values. If + ``None`` default values for those will be used. + primitive_outputs (list or None): + A list with dictionaries containing the name and type of the output values. If + ``None`` default values for those will be used. + primitives_path (str): + Path to the root of the primitives folder, in which the primitives JSON will be stored. + Defaults to `sigpro/primitives`. + primitives_subfolders (bool): + Whether to store the primitive JSON in a subfolder tree (``True``) or to use a flat + primitive name (``False``). Defaults to ``True``. + + Raises: + ValueError: + If the primitive specification arguments are not valid. + + Returns: + type: + Dynamically-generated custom Primitive type. + str: + Path of the generated JSON file. 
+ """ + primitive_path = make_primitive(primitive, primitive_type, primitive_subtype, + context_arguments, fixed_hyperparameters, + tunable_hyperparameters, primitive_inputs, + primitive_outputs, primitives_path, + primitives_subfolders) + return get_primitive_class(primitive, primitive_type, primitive_subtype, + context_arguments, fixed_hyperparameters, + tunable_hyperparameters, primitive_inputs, + primitive_outputs), primitive_path diff --git a/sigpro/demo.py b/sigpro/demo.py index b8bf699..085ed37 100644 --- a/sigpro/demo.py +++ b/sigpro/demo.py @@ -87,7 +87,7 @@ def get_amplitude_demo(index=None): """ df = _load_demo() if index is None: - index = random.randint(0, len(df)) + index = random.randint(0, len(df) - 1) return np.array(df.iloc[index]['values']), 10000 diff --git a/sigpro/pipeline.py b/sigpro/pipeline.py new file mode 100644 index 0000000..ff2941b --- /dev/null +++ b/sigpro/pipeline.py @@ -0,0 +1,684 @@ +# -*- coding: utf-8 -*- +"""Pipeline signal processing functionality.""" + +import logging +from abc import ABC +from collections import Counter +from copy import copy, deepcopy +from itertools import product + +import pandas as pd +from mlblocks import MLPipeline + +from sigpro.primitive import Primitive + +# Temporary refactor from core, ignore duplicate code. +# pylint: disable = duplicate-code, too-many-statements, too-many-nested-blocks +DEFAULT_INPUT = [ + { + 'name': 'readings', + 'keyword': 'data', + 'type': 'pandas.DataFrame' + }, + { + 'name': 'feature_columns', + 'default': None, + 'type': 'list' + } +] + +DEFAULT_OUTPUT = [ + { + 'name': 'readings', + 'type': 'pandas.DataFrame' + }, + { + 'name': 'feature_columns', + 'type': 'list' + } +] +LOGGER = logging.getLogger(__name__) + + +def _get_primitive_metadata(primitive_object): + """ + Return the tag, name, and initial fixed hyperparameters of the primitive. + + Same information as get_hyperparam_dict but in a different format. + """ + hyperparam_dict = primitive_object.get_hyperparam_dict() + return (hyperparam_dict.get('name'), + hyperparam_dict.get('primitive'), + hyperparam_dict.get('init_params')) + + +class Pipeline(ABC): + """Abstract Pipeline class to apply multiple transformation and aggregation primitives.""" + + def __init__(self): + self.values_column_name = 'values' + self.input_is_dataframe = True + self.pipeline = None + + def get_pipeline(self): + """Return the MLPipeline in self.pipeline.""" + return self.pipeline + + def _set_values_column_name(self, values_column_name): + self.values_column_name = values_column_name + + def _accept_dataframe_input(self, input_is_dataframe): + self.input_is_dataframe = input_is_dataframe + + def _apply_pipeline(self, window, is_series=False): + """Apply a ``mlblocks.MLPipeline`` to a row. + + Apply a ``MLPipeline`` to a window of a ``pd.DataFrame``, this function can + be combined with the ``pd.DataFrame.apply`` method to be applied to the + entire data frame. + + Args: + window (pd.Series): + Row or multiple rows (window) used to apply the pipeline to. + is_series (bool): + Indicator whether window is formated as a series or dataframe. 
+ """ + if is_series: + context = window.to_dict() + amplitude_values = context.pop(self.values_column_name) + else: + context = {} if window.empty else { + k: v for k, v in window.iloc[0].to_dict().items() if k != self.values_column_name + } + amplitude_values = list(window[self.values_column_name]) + + output = self.pipeline.predict( + amplitude_values=amplitude_values, + **context, + ) + output_names = self.pipeline.get_output_names() + + # ensure that we can iterate over output + output = output if isinstance(output, tuple) else (output, ) + + return pd.Series(dict(zip(output_names, output))) + + def get_primitive_names(self): + """Get a list of names of primitives in the pipeline.""" + return self.pipeline.primitives + + def get_primitives(self): + """Get a list of Primitive objects in the pipeline.""" + raise NotImplementedError + + def get_output_combinations(self): + """Get a list of output feature tuples produced by the pipeline.""" + raise NotImplementedError + + def get_output_features(self): + """Get a list of output feature strings produced by the pipeline.""" + combinations = self.get_output_combinations() + output_features = [] + for combination in combinations: + tags = [prim.get_tag() for prim in combination] + final_primitive_outputs = combination[-1].get_outputs() + for output_dict in final_primitive_outputs: + out_name = output_dict['name'] + output_features.append('.'.join(tags) + '.' + out_name) + + return output_features + + def process_signal(self, data=None, window=None, values_column_name='values', + time_index=None, groupby_index=None, feature_columns=None, + keep_columns=False, input_is_dataframe=True, **kwargs): + """Apply multiple transformation and aggregation primitives. + + The process_signals method is responsible for applying a Pipeline specified by the + user in order to create features for the given data. + + Args: + data (pandas.DataFrame): + Dataframe with a column that contains signal values. + window (str): + Duration of window size, e.g. ('1h'). + values_column_name (str): + Column in ``data`` that represents the signal values. + time_index (str): + Column in ``data`` that represents the time index. + groupby_index (str or list[str]): + Column(s) to group together and take the window over. + feature_columns (list): + List of column names from the input data frame that must be considered as + features and should not be dropped. + keep_columns (Union[bool, list]): + Whether to keep non-feature columns in the output DataFrame or not. + If a list of column names are passed, those columns are kept. + input_is_dataframe (bool): + Whether the input data is a Dataframe. Used for MLBlocks integration. + + Returns: + tuple: + pandas.DataFrame: + A data frame with new feature columns by applying the previous primitives. If + ``keep_columns`` is ``True`` the original signal values and non-feature + columns will be conserved in the data frame, otherwise any columns + not named in ``keep_columns`` will be deleted. + list: + A list with the feature names generated. + """ + # Error messages are hard to interpret. 
+ self._set_values_column_name(values_column_name) + self._accept_dataframe_input(input_is_dataframe) + + if data is None: + window = pd.Series(kwargs) + values = self._apply_pipeline(window, is_series=True).values + return values if len(values) > 1 else values[0] + + data = data.copy() + if window is not None and groupby_index is not None: + features = data.set_index(time_index).groupby(groupby_index).resample( + rule=window, **kwargs).apply( + self._apply_pipeline + ).reset_index() + data = features + + else: + features = data.apply( + self._apply_pipeline, + axis=1, + is_series=True + ) + data = pd.concat([data, features], axis=1) + + if feature_columns: + feature_columns = feature_columns + list(features.columns) + else: + feature_columns = list(features.columns) + + if isinstance(keep_columns, list): + data = data[keep_columns + feature_columns] + elif not keep_columns: + data = data[feature_columns] + + return data, feature_columns + + def get_input_args(self): + """Return the pipeline input args.""" + if self.input_is_dataframe: + return deepcopy(DEFAULT_INPUT) + + return self.pipeline.get_predict_args() + + def get_output_args(self): + """Return the pipeline output args.""" + if self.input_is_dataframe: + return deepcopy(DEFAULT_OUTPUT) + + return self.pipeline.get_outputs() + + +class LinearPipeline(Pipeline): + """ + LinearPipeline class applies multiple transformation and aggregation primitives. + + The LinearPipeline class applies a sequence of transformation primitives and applies several + aggregation primitives in parallel to produce output features. + + Args: + transformations (list): + List of transformation primitive objects. + aggregations (list): + List of dictionaries containing the aggregation primitives. + + Returns: + sigpro.pipeline.LinearPipeline: + A ``LinearPipeline`` object that produces the features from applying each + aggregation individually to the result of applying all transformations to + the input data. 
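+ + Example: + A minimal sketch mirroring the integration tests: + + >>> from sigpro.basic_primitives import FFTReal, Identity, Kurtosis, Mean + >>> lp = LinearPipeline( + ... transformations=[Identity().set_tag('id1'), FFTReal().set_tag('fftr')], + ... aggregations=[Mean(), Kurtosis(fisher=False)])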
+ """ + + def __init__(self, transformations, aggregations): # pylint: disable=too-many-locals + + super().__init__() + self.transformations = transformations + self.aggregations = aggregations + + primitives = [] + init_params = {} + prefix = [] + outputs = [] + counter = Counter() + + for transformation in self.transformations: + transformation._validate_primitive_spec() + prefix.append(transformation.get_tag()) + primitive = transformation.get_name() + counter[primitive] += 1 + primitive_name = f'{primitive}#{counter[primitive]}' + primitives.append(primitive) + params = transformation.get_hyperparam_dict().get('init_params') + if params: + init_params[primitive_name] = params + + prefix = '.'.join(prefix) if prefix else '' + + for aggregation in self.aggregations: + aggregation._validate_primitive_spec() + aggregation_name = f'{prefix}.{aggregation.get_tag()}' if prefix \ + else aggregation.get_tag() + + primitive = aggregation.get_name() + counter[primitive] += 1 + primitive_name = f'{primitive}#{counter[primitive]}' + primitives.append(primitive) + + primitive_json = aggregation.make_primitive_json() + primitive_outputs = primitive_json['produce']['output'] + + params = aggregation.get_hyperparam_dict().get('init_params') + if params: + init_params[primitive_name] = params + + if not isinstance(primitive_outputs, str): + for output in primitive_outputs: + output = output['name'] + outputs.append({ + 'name': f'{aggregation_name}.{output}', + 'variable': f'{primitive_name}.{output}' + }) + + outputs = {'default': outputs} if outputs else None + + self.pipeline = MLPipeline( + primitives, + init_params=init_params, + outputs=outputs) + + def get_primitives(self): + """Get a list of primitives in the pipeline.""" + return copy(self.transformations.copy() + self.aggregations.copy()) + + def get_output_combinations(self): + """Get a list of output feature tuples produced by the pipeline.""" + return [tuple(self.transformations.copy() + [aggregation]) + for aggregation in self.aggregations] + + +def build_linear_pipeline(transformations, aggregations): + """ + Build a linear pipeline with given transformation and aggregation layers. + + Args: + transformations (list): + List of transformation primitives. + aggregations (list): + List of aggregation primitives. + + Returns: + sigpro.pipeline.LinearPipeline: + A ``LinearPipeline`` object that produces the features from applying each + aggregation individually to the result of applying all transformations to + the input data. + """ + pipeline_object = LinearPipeline(transformations, aggregations) + return pipeline_object + + +class LayerPipeline(Pipeline): + """ + Layer pipelines in SigPro. + + Args: + primitives (list): + List of primitive objects. All primitives should have distinct tags. + + primitive_combinations (list): + List of output features to be generated. Each feature + in primitive_combinations should be a tuple of primitive objects found as keys in + primitives, or a tuple of their string tags. All combinations should start w/ some + (possibly zero) number of transformations and end with a single aggregation. + + features_as_strings (bool): + True if primitive_combinations is defined w/ string names, + False if primitive_combinations is defined with primitive objects (default). + + Raises: + ValueError: + If the pipeline specification is invalid. + + Returns: + LayerPipeline that generates the primitives in primitive_combinations. 
+ """ + + def __init__(self, primitives, primitive_combinations, features_as_strings=False): + """Initialize a LayerPipeline.""" + super().__init__() + + primitives_dict = {} + for primitive in primitives: + if primitive.get_tag() in primitives_dict: + error_str = f'Tag {primitive.get_tag()} is duplicated.' + error_str += ' All primitives must have distinct tags.' + raise ValueError(error_str) + + primitives_dict[primitive.get_tag()] = primitive + + if not primitive_combinations: # check if list is empty + raise ValueError('At least one non-empty output feature must be specified') + + length = max(len(combination) for combination in primitive_combinations) + if length == 0: + raise ValueError('At least one non-empty output feature must be specified') + + self.num_layers = length + self.primitives = copy(primitives[:]) # Will need to adjust API to remove primitives arg + self.primitive_combinations = None + + if features_as_strings: + primitive_combinations = [tuple(primitives_dict[tag] for tag in tag_tuple) + for tag_tuple in primitive_combinations] + + self.primitive_combinations = [tuple(combination) for combination + in primitive_combinations] + + for combination in self.primitive_combinations: + combo_length = len(combination) + for ind in range(combo_length - 1): + if combination[ind].get_type_subtype()[0] != 'transformation': + error_str = f'Primitive at non-terminal position #{ind+1}/{combo_length}' + error_str += ' is not a transformation' + raise ValueError(error_str) + + if combination[-1].get_type_subtype()[0] != 'aggregation': + raise ValueError('Last primitive is not an aggregation') + + for primitive in combination: + if primitive not in self.primitives: + error_str = f'Primitive with tag {primitive.get_tag()} not found in the' + error_str += ' given primitives' + raise ValueError(error_str) + + self.pipeline = self._build_pipeline() + + def _build_pipeline(self): # pylint: disable=too-many-locals, too-many-branches + """ + Build the layer pipeline. + + Returns: + mlblocks.MLPipeline: + An ``MLPipeline`` object that produces the features in primitives_combinations. + """ + prefixes = {} + primitive_counter = Counter() + final_primitives_list = [] + final_init_params = {} + final_primitive_inputs = {} + final_primitive_outputs = {} + final_outputs = [] + + for layer in range(1, 1 + self.num_layers): + for combination in self.primitive_combinations: + combination_length = len(combination) + if layer > combination_length: + continue + + if (tuple(combination[:layer]) not in prefixes) or layer == combination_length: + # Since all features are T.T...T.A, no combo should be a prefix of another. + if layer == combination_length: + assert tuple(combination[:layer]) not in prefixes + + final_primitive = combination[layer - 1] + prefixes[(tuple(combination[:layer]))] = final_primitive + + final_primitive_str, final_primitive_name, final_primitive_params = \ + _get_primitive_metadata(final_primitive) + + primitive_counter[final_primitive_name] += 1 + numbered_primitive_name = \ + f'{final_primitive_name}#{primitive_counter[final_primitive_name]}' + + final_init_params[numbered_primitive_name] = \ + final_primitive_params + final_primitives_list.append(final_primitive_name) + # Map primitive inputs and outputs. + + final_primitive_inputs[numbered_primitive_name] = {} + final_primitive_outputs[numbered_primitive_name] = {} + + for input_dict in final_primitive.get_inputs(): + final_primitive_inputs[numbered_primitive_name][input_dict['name']] = \ + f'{final_primitive_str}.' 
+ str(input_dict['name']) + + in_name = input_dict['name'] + is_required = True + if 'optional' in input_dict: + is_required = not input_dict['optional'] + + # Context arguments should be named properly in the input data. + if in_name not in final_primitive.get_context_arguments() and \ + in_name != 'amplitude_values' and is_required: + + # We need to hook up the primitive input to the proper output in chain + if layer == 1: + npn = numbered_primitive_name[:] # lint + final_primitive_inputs[npn][in_name] = str(in_name) + continue + prev_prim = None + prev_ind = None + + # Approach: find the most recent predecessor primitive in the feature + # that could have generated the specific input_name. + + for p in reversed(range(0, layer)): # pylint: disable=invalid-name + prev_prim_cand = combination[p - 1] + test_ops = [op['name'] for op in prev_prim_cand.get_outputs()] + if in_name in test_ops: + prev_prim = prev_prim_cand + prev_ind = p + break + + if prev_prim is None and layer > 1: + # If we can't find the predecessor, assume that the value is given. + + final_primitive_inputs[numbered_primitive_name][in_name] = \ + f'{final_primitive_str}.' + str(in_name) + warning_str = f'expecting {in_name} to be given by the user.' + LOGGER.warning(warning_str) + # fps = final_primitive_str # lint + # raise ValueError(f'Arg {in_name} of primitive {fps} \ + # not produced by any predecessor primitive.') + + else: + previous_comb = combination[:prev_ind] + previous_comb_str = '.'.join([pr.get_tag() for pr in + previous_comb]) + previous_comb_str += f'.{prev_ind}' + final_primitive_inputs[numbered_primitive_name][in_name] = \ + f'{previous_comb_str}.{in_name}' + + if layer == 1: + final_primitive_inputs[numbered_primitive_name]['amplitude_values'] = \ + 'amplitude_values' + + else: + input_column_name = '.'.join([pr.get_tag() for pr in + combination[:layer - 1]]) + input_column_name += f'.{layer-1}' + final_primitive_inputs[numbered_primitive_name]['amplitude_values'] = \ + input_column_name + '.amplitude_values' + + output_column_name = '.'.join([prim.get_tag() for prim in combination[:layer]]) + + if layer <= combination_length - 1: + output_column_name += '.' + str(layer) + for output_dict in final_primitive.get_outputs(): + out_name = output_dict['name'] + final_primitive_outputs[numbered_primitive_name][out_name] = \ + f'{output_column_name}.' + str(out_name) + + else: + npn = numbered_primitive_name[:] # lint + for output_dict in final_primitive.get_outputs(): + out_name = output_dict['name'] + final_outputs.append({'name': output_column_name + '.' + str(out_name), + 'variable': f'{npn}.{out_name}'}) + + return MLPipeline( + primitives=final_primitives_list, + init_params=final_init_params, + input_names=final_primitive_inputs, + output_names=final_primitive_outputs, + outputs={'default': final_outputs} + ) + + def get_primitives(self): + """Get a list of primitives in the pipeline.""" + return self.primitives.copy() + + def get_output_combinations(self): + """Get a list of output feature tuples produced by the pipeline.""" + return [x[:] for x in self.primitive_combinations] + + +def build_tree_pipeline(transformation_layers, aggregation_layer): + """ + Build a tree pipeline using given transformation and aggregation layers. + + Args: + transformation_layers (list): + List of transformation layers, each a list of transformation primitives. + aggregation_layer (list): + List of aggregation primitives. + + Raises: + ValueError: + If the pipeline specification is invalid. 
+ + Returns: + sigpro.pipeline.LayerPipeline: + A ``LayerPipeline`` object that produces the features in the Cartesian product + of all transformation layers and the aggregation layer. + """ + primitives_all = set() + all_layers = [] + + if not isinstance(transformation_layers, list): + raise ValueError('transformation_layers must be a list') + for layer in transformation_layers: + if isinstance(layer, list): + for primitive_ in layer: + if not isinstance(primitive_, Primitive): + raise ValueError('Non-primitive specified in transformation_layers') + + all_layers.append(layer.copy()) + primitives_all.update(layer) + + else: + raise ValueError('Each layer in transformation_layers must be a list') + + if isinstance(aggregation_layer, list): + for primitive_ in aggregation_layer: + if not isinstance(primitive_, Primitive): + raise ValueError('Non-primitive specified in aggregation_layer') + + all_layers.append(aggregation_layer.copy()) + primitives_all.update(aggregation_layer) + + else: + raise ValueError('aggregation_layer must be a list') + + primitive_combinations = list(product(*all_layers)) + return LayerPipeline(primitives=list(primitives_all), + primitive_combinations=primitive_combinations) + + +def build_layer_pipeline(primitives, primitive_combinations): + """ + Layer pipeline building. + + Build a layer pipeline from a list of primitives and a list of + combination features, each a tuple of Primitives. + + Args: + primitives (list): + List of primitive objects. All primitives should have distinct tags. + primitive_combinations (list): + List of output features to be generated. Each feature + in primitive_combinations should be a tuple of primitive objects found in + primitives. All combinations should start w/ some + (possibly zero) number of transformations and end with a single aggregation. + + Raises: + ValueError: + If the pipeline specification is invalid. + + Returns: + LayerPipeline that generates the primitives in primitive_combinations. + """ + return LayerPipeline(primitives=primitives, + primitive_combinations=primitive_combinations, + features_as_strings=False) + + +def build_layer_pipeline_str(primitives, primitive_combinations): + """ + Layer pipeline building. + + Build a layer pipeline from a list of primitives and a list of + combination features, each a tuple of string tags of primitives. + + Args: + primitives (list): + List of primitive objects. All primitives should have distinct tags. + primitive_combinations (list): + List of output features to be generated. Each feature + in primitive_combinations should be a tuple of string primitive tags. + All combinations should start w/ some (possibly zero) number of transformations + and end with a single aggregation. + + Raises: + ValueError: + If the pipeline specification is invalid. + + Returns: + LayerPipeline that generates the primitives in primitive_combinations. + """ + return LayerPipeline(primitives=primitives, + primitive_combinations=primitive_combinations, + features_as_strings=True) + + +def merge_pipelines(pipelines): + """ + Create a single layer pipeline that is the 'union' of several other pipelines.
+ + In other words, the pipeline generates all features generated by + at least one input pipeline. + + Args: + pipelines (list): + A list of Pipeline objects whose output features should be merged. + + Returns: + sigpro.pipeline.LayerPipeline: + A ``LayerPipeline`` object that produces the features in the union of + all features generated by the input pipelines. + + """ + primitives_all = set() + primitive_combinations = set() + + for pipeline in (pipelines)[::-1]: + primitives_all.update(pipeline.get_primitives()) + primitive_combinations.update(pipeline.get_output_combinations()) + + return LayerPipeline(primitives=list(primitives_all), + primitive_combinations=list(primitive_combinations)) diff --git a/sigpro/primitive.py b/sigpro/primitive.py new file mode 100644 index 0000000..a6119be --- /dev/null +++ b/sigpro/primitive.py @@ -0,0 +1,233 @@ +# -*- coding: utf-8 -*- +"""SigPro Primitive class.""" + +import copy + +from mlblocks.mlblock import import_object + +from sigpro.contributing import ( + _check_primitive_type_and_subtype, _get_primitive_args, _get_primitive_spec, + _make_primitive_dict, _write_primitive) + + +class Primitive(): # pylint: disable=too-many-instance-attributes + """ + Represents a SigPro primitive. + + Each primitive object represents a specific transformation or aggregation. Moreover, + a Primitive maintains all the information in its JSON annotation as well as its + hyperparameter values. + + Args: + primitive (str): + The name of the primitive, the python path including the name of the + module and the name of the function. + primitive_type (str): + Type of primitive. + primitive_subtype (str): + Subtype of the primitive. + init_params (dict): + Initial (fixed) hyperparameter values of the primitive in + {hyperparam_name: hyperparam_value} format. + """ + + def __init__(self, primitive, primitive_type, primitive_subtype, init_params=None): + + self.primitive = primitive + self.tag = primitive.split('.')[-1] + self.primitive_type = primitive_type + self.primitive_subtype = primitive_subtype + self.tunable_hyperparameters = {} + self.fixed_hyperparameters = {} + self.context_arguments = [] + primitive_spec = _get_primitive_spec(primitive_type, primitive_subtype) + self.primitive_inputs = primitive_spec['args'] + self.primitive_outputs = primitive_spec['output'] + + _check_primitive_type_and_subtype(primitive_type, primitive_subtype) + + self.primitive_function = import_object(primitive) + if init_params is None: + init_params = {} + self.hyperparameter_values = init_params + + def get_name(self): + """Get the name of the primitive.""" + return self.primitive + + def get_tag(self): + """Get the tag of the primitive.""" + return self.tag + + def get_inputs(self): + """Get the inputs of the primitive.""" + return copy.deepcopy(self.primitive_inputs) + + def get_outputs(self): + """Get the outputs of the primitive.""" + return copy.deepcopy(self.primitive_outputs) + + def get_type_subtype(self): + """Get the type and subtype of the primitive.""" + return self.primitive_type, self.primitive_subtype + + def get_context_arguments(self): + """Get the context arguments of the primitive.""" + return copy.deepcopy(self.context_arguments) + + def _validate_primitive_spec(self): # check compatibility of given parameters. 
+ _get_primitive_args( + self.primitive_function, + self.primitive_inputs, + self.context_arguments, + self.fixed_hyperparameters, + self.tunable_hyperparameters) + + def get_hyperparam_dict(self): + """Return the dictionary of fixed hyperparameters for use in Pipelines.""" + return {'name': self.get_tag(), 'primitive': self.get_name(), + 'init_params': copy.deepcopy(self.hyperparameter_values)} + + def set_tag(self, tag): + """Set the tag of a primitive.""" + self.tag = tag + return self + + def set_primitive_inputs(self, primitive_inputs): + """Set primitive inputs.""" + self.primitive_inputs = primitive_inputs + + def set_primitive_outputs(self, primitive_outputs): + """Set primitive outputs.""" + self.primitive_outputs = primitive_outputs + + def _set_primitive_type(self, primitive_type): + self.primitive_type = primitive_type + + def _set_primitive_subtype(self, primitive_subtype): + self.primitive_subtype = primitive_subtype + + def set_context_arguments(self, context_arguments): + """Set context_arguments of a primitive.""" + self.context_arguments = context_arguments + + def set_tunable_hyperparameters(self, tunable_hyperparameters): + """Set tunable hyperparameters of a primitive.""" + self.tunable_hyperparameters = tunable_hyperparameters + + def set_fixed_hyperparameters(self, fixed_hyperparameters): + """Set fixed hyperparameters of a primitive.""" + self.fixed_hyperparameters = fixed_hyperparameters + + def make_primitive_json(self): + """ + View the primitive json produced by a Primitive object. + + Raises: + ValueError: + If the primitive specification arguments are not valid (as in sigpro.contributing). + + Returns: + dict: + Dictionary containing the JSON annotation for the primitive. + """ + self._validate_primitive_spec() + return _make_primitive_dict(self.primitive, self.primitive_type, + self.primitive_subtype, self.context_arguments, + self.fixed_hyperparameters, self.tunable_hyperparameters, + self.primitive_inputs, self.primitive_outputs) + + def write_primitive_json(self, primitives_path='sigpro/primitives', + primitives_subfolders=True): + """ + Write the primitive json produced by a Primitive object and return the path. + + Args: + primitives_path (str): + Path to the root of the primitives folder, in which the primitives JSON will be + stored. Defaults to `sigpro/primitives`. + primitives_subfolders (bool): + Whether to store the primitive JSON in a subfolder tree (``True``) or to use a flat + primitive name (``False``). Defaults to ``True``. + + Returns: + str: + Path of the generated JSON file. 
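+ + Example: + A minimal sketch; ``tmp_dir`` is a stand-in for any writable folder: + + >>> from sigpro.basic_primitives import Mean + >>> path = Mean().write_primitive_json(primitives_path=tmp_dir)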
+ """ + return _write_primitive(self.make_primitive_json(), self.primitive, + primitives_path, primitives_subfolders) + +# Primitive inheritance subclasses +# pylint: disable=super-init-not-called +# Transformations + + +class TransformationPrimitive(Primitive): + """Generic transformation primitive.""" + + def __init__(self, primitive, primitive_subtype, init_params=None): + super().__init__(primitive, 'transformation', primitive_subtype, init_params=init_params) + + +class AmplitudeTransformation(TransformationPrimitive): + """Generic amplitude transformation primitive.""" + + def __init__(self, primitive, init_params=None): + super().__init__(primitive, 'amplitude', init_params=init_params) + + +class FrequencyTransformation(TransformationPrimitive): + """Generic frequency transformation primitive.""" + + def __init__(self, primitive, init_params=None): + super().__init__(primitive, 'frequency', init_params=init_params) + + +class FrequencyTimeTransformation(TransformationPrimitive): + """Generic frequency-time transformation primitive.""" + + def __init__(self, primitive, init_params=None): + super().__init__(primitive, 'frequency_time', init_params=init_params) + + +class ComparativeTransformation(TransformationPrimitive): + """Generic comparative transformation primitive.""" + + def __init__(self, primitive, init_params=None): + raise NotImplementedError +# Aggregations + + +class AggregationPrimitive(Primitive): + """Generic aggregation primitive.""" + + def __init__(self, primitive, primitive_subtype, init_params=None): + super().__init__(primitive, 'aggregation', primitive_subtype, init_params=init_params) + + +class AmplitudeAggregation(AggregationPrimitive): + """Generic amplitude aggregation primitive.""" + + def __init__(self, primitive, init_params=None): + super().__init__(primitive, 'amplitude', init_params=init_params) + + +class FrequencyAggregation(AggregationPrimitive): + """Generic frequency aggregation primitive.""" + + def __init__(self, primitive, init_params=None): + super().__init__(primitive, 'frequency', init_params=init_params) + + +class FrequencyTimeAggregation(AggregationPrimitive): + """Generic frequency-time aggregation primitive.""" + + def __init__(self, primitive, init_params=None): + super().__init__(primitive, 'frequency_time', init_params=init_params) + + +class ComparativeAggregation(AggregationPrimitive): + """Generic comparative aggregation primitive.""" + + def __init__(self, primitive, init_params=None): + raise NotImplementedError diff --git a/sigpro/primitives/sigpro/transformations/frequency/band/frequency_band.json b/sigpro/primitives/sigpro/transformations/frequency/band/frequency_band.json index 50fd317..ff44a7e 100644 --- a/sigpro/primitives/sigpro/transformations/frequency/band/frequency_band.json +++ b/sigpro/primitives/sigpro/transformations/frequency/band/frequency_band.json @@ -2,7 +2,7 @@ "name": "sigpro.transformations.frequency.band.frequency_band", "primitive": "sigpro.transformations.frequency.band.frequency_band", "classifiers": { - "type": "aggregation", + "type": "transformation", "subtype": "frequency" }, "produce": { diff --git a/sigpro/primitives/sigpro/transformations/frequency_time/stft/stft.json b/sigpro/primitives/sigpro/transformations/frequency_time/stft/stft.json index e205ad3..43c1dbe 100644 --- a/sigpro/primitives/sigpro/transformations/frequency_time/stft/stft.json +++ b/sigpro/primitives/sigpro/transformations/frequency_time/stft/stft.json @@ -1,6 +1,6 @@ { - "name": "sigpro.transformations.frequency.stft.stft", 
- "primitive": "sigpro.transformations.frequency.stft.stft", + "name": "sigpro.transformations.frequency_time.stft.stft", + "primitive": "sigpro.transformations.frequency_time.stft.stft", "classifiers": { "type": "transformation", "subtype": "frequency_time" diff --git a/sigpro/primitives/sigpro/transformations/frequency_time/stft/stft_real.json b/sigpro/primitives/sigpro/transformations/frequency_time/stft/stft_real.json index dc064b8..24c730d 100644 --- a/sigpro/primitives/sigpro/transformations/frequency_time/stft/stft_real.json +++ b/sigpro/primitives/sigpro/transformations/frequency_time/stft/stft_real.json @@ -1,6 +1,6 @@ { - "name": "sigpro.transformations.frequency.stft.stft_real", - "primitive": "sigpro.transformations.frequency.stft.stft_real", + "name": "sigpro.transformations.frequency_time.stft.stft_real", + "primitive": "sigpro.transformations.frequency_time.stft.stft_real", "classifiers": { "type": "transformation", "subtype": "frequency_time" diff --git a/tests/integration/test_contributing_primitive.py b/tests/integration/test_contributing_primitive.py new file mode 100644 index 0000000..bf27346 --- /dev/null +++ b/tests/integration/test_contributing_primitive.py @@ -0,0 +1,80 @@ +"""Test module for SigPro contributing_primitive module.""" + +import json +import os +import tempfile + +from sigpro.basic_primitives import Mean +from sigpro.contributing_primitive import make_primitive_class + +EXPECTED_PRIMITIVE_DICT = { + "name": "sigpro.aggregations.amplitude.statistical.mean", + "primitive": "sigpro.aggregations.amplitude.statistical.mean", + "classifiers": { + "type": "aggregation", + "subtype": "amplitude" + }, + "produce": { + "args": [ + { + "name": "amplitude_values", + "type": "numpy.ndarray" + } + ], + "output": [ + { + "name": "mean_value", + "type": "float" + } + ] + }, + 'hyperparameters': { + 'fixed': {}, + 'tunable': {} + } +} + + +def test_make_primitive_class_primitives_subfolders_true(): + with tempfile.TemporaryDirectory('sigpro') as tmp_dir: + expected_result = ['sigpro', 'aggregations', 'amplitude', 'statistical', 'mean.json'] + expected_result = os.path.join(tmp_dir, *expected_result) + primitive_outputs = [{'name': 'mean_value', 'type': 'float'}] + Mean_dynamic, result = make_primitive_class( + 'sigpro.aggregations.amplitude.statistical.mean', + 'aggregation', + 'amplitude', + primitives_path=tmp_dir, + primitive_outputs=primitive_outputs + ) + assert result == expected_result + with open(result, 'rb') as created_primitive: + primitive_dict = json.load(created_primitive) + assert primitive_dict == EXPECTED_PRIMITIVE_DICT + + mean_instance = Mean_dynamic() + mean_default = Mean() + assert mean_instance.make_primitive_json() == mean_default.make_primitive_json() + + +def test_make_primitive_class_primitives_subfolders_false(): + with tempfile.TemporaryDirectory('sigpro') as tmp_dir: + expected_result = 'sigpro.aggregations.amplitude.statistical.mean.json' + expected_result = os.path.join(tmp_dir, expected_result) + primitive_outputs = [{'name': 'mean_value', 'type': 'float'}] + Mean_dynamic, result = make_primitive_class( + 'sigpro.aggregations.amplitude.statistical.mean', + 'aggregation', + 'amplitude', + primitives_path=tmp_dir, + primitive_outputs=primitive_outputs, + primitives_subfolders=False + ) + assert result == expected_result + with open(result, 'rb') as created_primitive: + primitive_dict = json.load(created_primitive) + assert primitive_dict == EXPECTED_PRIMITIVE_DICT + + mean_instance = Mean_dynamic() + mean_default = Mean() + assert 
mean_instance.make_primitive_json() == mean_default.make_primitive_json() diff --git a/tests/integration/test_pipeline.py b/tests/integration/test_pipeline.py new file mode 100644 index 0000000..e855ed4 --- /dev/null +++ b/tests/integration/test_pipeline.py @@ -0,0 +1,196 @@ +"""Test module for SigPro pipeline module.""" +import pandas as pd +import pytest + +from sigpro import pipeline, primitive +from sigpro.basic_primitives import FFT, BandMean, FFTReal, Identity, Kurtosis, Mean + +TEST_INPUT = pd.DataFrame({'timestamp': pd.to_datetime(['2020-01-01 00:00:00']), + 'values': [[1, 2, 3, 4, 5, 6]], + 'sampling_frequency': [10000], + 'dummy': [1], }) + +TEST_OUTPUT = pd.DataFrame({'fftr.id1.bm.value': [(-3 + 0j)], + 'fftr.id1.mean.mean_value': [(1 + 0j)], + 'fftr.mean.mean_value': [(1 + 0j)], + 'fftr.id1.kurtosis.kurtosis_value': [(4.2 + 0j)], + 'fftr.id2.bm.value': [(-3 + 0j)], + 'fftr.id2.mean.mean_value': [(1 + 0j)], + 'fftr.id2.kurtosis.kurtosis_value': [(4.2 + 0j)], + 'fft.id1.bm.value': [(-3 + 3.4641016151377544j)], + 'fft.id1.mean.mean_value': [(1 + 0j)], + 'fft.id1.kurtosis.kurtosis_value': [(5.34 + 3.866899242231838e-18j)], + 'fft.id2.bm.value': [(-3 + 3.4641016151377544j)], + 'fft.id2.mean.mean_value': [(1 + 0j)], + 'fft.id2.kurtosis.kurtosis_value': [(5.34 + 3.866899242231838e-18j)]}) + + +def _verify_pipeline_outputs(sigpro_pipeline, input_data, output_data, columns_to_check=None): + """Verify that a pipeline produces the output data on a set of dataframe inputs.""" + assert isinstance(sigpro_pipeline, pipeline.Pipeline) + assert input_data is not None + assert output_data is not None + assert columns_to_check != [] + + processed_signal, feature_list = sigpro_pipeline.process_signal(input_data) + if columns_to_check is None: + columns_to_check = feature_list[:] + for column in columns_to_check: + assert column in feature_list + assert column in processed_signal.columns + cols_reduced = [i for i in columns_to_check if i in output_data.columns] + pd.testing.assert_frame_equal(processed_signal[cols_reduced], output_data[cols_reduced]) + + +def test_linear_pipeline(): + """build_linear_pipeline test.""" + + transformations = [Identity().set_tag('id1'), FFTReal().set_tag('fftr')] + aggregations = [Mean(), Kurtosis(fisher=False)] + + assert isinstance(transformations[0], primitive.Primitive) + + sample_pipeline = pipeline.build_linear_pipeline(transformations, aggregations) # incomplete + + assert isinstance(sample_pipeline, pipeline.LinearPipeline) + + _verify_pipeline_outputs(sample_pipeline, TEST_INPUT, TEST_OUTPUT) + + +def test_tree_pipeline(): + """build_tree_pipeline test.""" + + t_layer1 = [FFTReal().set_tag('fftr'), FFT()] + t_layer2 = [Identity().set_tag('id1'), Identity().set_tag('id2')] + + a_layer = [BandMean(200, 50000).set_tag('bm'), Mean(), Kurtosis(fisher=False)] + + sample_pipeline = pipeline.build_tree_pipeline([t_layer1, t_layer2], a_layer) + + assert isinstance(sample_pipeline, pipeline.LayerPipeline) + assert len(set(sample_pipeline.get_output_features())) == 12 # 2 * 2 * 3 + + _verify_pipeline_outputs(sample_pipeline, TEST_INPUT, TEST_OUTPUT) + + +def test_layer_pipeline(): + """build_layer_pipeline test.""" + + p1, p2 = FFTReal().set_tag('fftr'), FFT() + p3, p4 = Identity().set_tag('id1'), Identity().set_tag('id2') + p5, p6, p7 = BandMean(200, 50000).set_tag('bm'), Mean(), Kurtosis(fisher=False) + p8 = Identity().set_tag('id3') # unused primitive + + all_primitives = [p1, p2, p3, p4, p5, p6, p7, p8] + + features = [(p1, p3, p5), (p1, p3, p6), (p2, p3, p6), (p2, 
p4, p6), (p2, p4, p7), (p1, p6)] + + sample_pipeline = pipeline.build_layer_pipeline(all_primitives, features) + + assert isinstance(sample_pipeline, pipeline.LayerPipeline) + + out_features = sample_pipeline.get_output_combinations() + for feature in features: + assert feature in out_features + + _verify_pipeline_outputs(sample_pipeline, TEST_INPUT, TEST_OUTPUT) + + +def test_merge_pipelines(): + """merge_pipelines test.""" + p1, p2 = FFTReal().set_tag('fftr'), FFT() + p3, p4 = Identity().set_tag('id1'), Identity().set_tag('id2') + p5, p6, p7 = BandMean(200, 50000).set_tag('bm'), Mean(), Kurtosis(fisher=False) + p8 = Identity().set_tag('id3') # unused primitive + + all_primitives = [p1, p2, p3, p4, p5, p6, p7, p8] + + features1 = [(p1, p3, p5), (p1, p3, p6), (p2, p3, p6), (p2, p4, p6), (p2, p4, p7)] + + sample_pipeline1 = pipeline.build_layer_pipeline(all_primitives, features1) + + sample_pipeline2 = pipeline.build_tree_pipeline([[p1, p2], [p3]], [p5]) + + sample_pipeline3 = pipeline.build_linear_pipeline([p1, p4], [p6]) + + features = features1 + [(p2, p3, p5), (p1, p4, p6)] + sample_pipeline = pipeline.merge_pipelines([sample_pipeline1, + sample_pipeline2, + sample_pipeline3]) + + assert isinstance(sample_pipeline, pipeline.LayerPipeline) + + out_features = sample_pipeline.get_output_combinations() + for feature in features: + assert feature in out_features + + +def test_invalid_tree_pipelines(): + """Test invalid tree pipelines.""" + + p1, p2, p2_duplicate = FFTReal().set_tag('fftr'), FFT(), FFT() + p3, p4 = Identity().set_tag('id1'), Identity().set_tag('id2') + p5, p6, p7 = BandMean(200, 50000).set_tag('bm'), Mean(), Kurtosis(fisher=False) + + t_layer1 = [p1, p2] + t_layer2 = [p3, p4] + + a_layer = [p5, p6, p7] + + # Empty Cartesian product + with pytest.raises(ValueError): + pipeline.build_tree_pipeline([t_layer1, []], a_layer) + with pytest.raises(ValueError): + pipeline.build_tree_pipeline([[], t_layer2], a_layer) + with pytest.raises(ValueError): + pipeline.build_tree_pipeline([t_layer1, t_layer2], []) + + # Duplicate tags + with pytest.raises(ValueError): + pipeline.build_tree_pipeline([t_layer1 + [p2_duplicate], t_layer2], a_layer) + + # Incorrect primitive order + with pytest.raises(ValueError): + pipeline.build_tree_pipeline([t_layer1, a_layer], t_layer2) + + +def test_invalid_layer_pipelines(): + """Test invalid pipeline formation.""" + + p1, p2, p2_duplicate = FFTReal().set_tag('fftr'), FFT(), FFT() + p3, p4 = Identity().set_tag('id1'), Identity().set_tag('id2') + p5, p6, p7 = BandMean(200, 50000).set_tag('bm'), Mean(), Kurtosis(fisher=False) + p8 = Identity().set_tag('id3') # unused primitive + + all_primitives = [p1, p2, p3, p4, p5, p6, p7, p8] + + all_primitives_duplicate = all_primitives + [p2_duplicate] + + features = [(p1, p3, p5), (p1, p3, p6), (p2, p3, p6), (p2, p4, p6), (p2, p4, p7)] + + no_agg_end = (p1, p3, p4) + intermediate_agg = (p1, p6, p7) + + blank_features = [tuple(), tuple()] + + # Primitive in combination not contained in primitives + with pytest.raises(ValueError): + pipeline.build_layer_pipeline([p1, p2, p3, p5, p6, p7, p8], features) + + # Duplicate primitive + with pytest.raises(ValueError): + pipeline.build_layer_pipeline(all_primitives_duplicate, features) + + # No nontrivial features + with pytest.raises(ValueError): + pipeline.build_layer_pipeline([p1, p3, p5], blank_features) + + with pytest.raises(ValueError): + pipeline.build_layer_pipeline([p1, p3, p5], []) + + # At least one feature in incorrect format + with pytest.raises(ValueError): + 
pipeline.build_layer_pipeline(all_primitives, features + [no_agg_end]) + + with pytest.raises(ValueError): + pipeline.build_layer_pipeline(all_primitives, features + [intermediate_agg]) diff --git a/tests/integration/test_primitive.py b/tests/integration/test_primitive.py new file mode 100644 index 0000000..befb121 --- /dev/null +++ b/tests/integration/test_primitive.py @@ -0,0 +1,86 @@ +"""Test module for SigPro primitive and basic_primitives modules.""" + +from sigpro import basic_primitives, primitive + + +def test_basic_primitives(): + """Test basic_primitives module.""" + + identity = basic_primitives.Identity() + power_spectrum = basic_primitives.PowerSpectrum() + assert isinstance(identity, primitive.Primitive) + assert isinstance(power_spectrum, primitive.Primitive) + assert identity.get_type_subtype() == ('transformation', 'amplitude') + assert power_spectrum.get_type_subtype() == ('transformation', 'amplitude') + identity.make_primitive_json() + power_spectrum.make_primitive_json() + + fft = basic_primitives.FFT() + fft_real = basic_primitives.FFTReal() + + assert isinstance(fft, primitive.Primitive) + assert isinstance(fft_real, primitive.Primitive) + assert fft.get_type_subtype() == ('transformation', 'frequency') + assert fft_real.get_type_subtype() == ('transformation', 'frequency') + fft.make_primitive_json() + fft_real.make_primitive_json() + + frequency_band = basic_primitives.FrequencyBand(low=10, high=20) + assert isinstance(frequency_band, primitive.Primitive) + assert frequency_band.get_type_subtype() == ('transformation', 'frequency') + frequency_band.make_primitive_json() + + stft = basic_primitives.STFT() + stft_real = basic_primitives.STFTReal() + assert isinstance(stft, primitive.Primitive) + assert isinstance(stft_real, primitive.Primitive) + assert stft.get_type_subtype() == ('transformation', 'frequency_time') + assert stft_real.get_type_subtype() == ('transformation', 'frequency_time') + stft.make_primitive_json() + stft_real.make_primitive_json() + + std = basic_primitives.Std() + var = basic_primitives.Var() + rms = basic_primitives.RMS() + cf = basic_primitives.CrestFactor() + skew = basic_primitives.Skew() + assert isinstance(std, primitive.Primitive) + assert isinstance(var, primitive.Primitive) + assert isinstance(rms, primitive.Primitive) + assert isinstance(cf, primitive.Primitive) + assert isinstance(skew, primitive.Primitive) + assert std.get_type_subtype() == ('aggregation', 'amplitude') + assert var.get_type_subtype() == ('aggregation', 'amplitude') + assert rms.get_type_subtype() == ('aggregation', 'amplitude') + assert cf.get_type_subtype() == ('aggregation', 'amplitude') + assert skew.get_type_subtype() == ('aggregation', 'amplitude') + std.make_primitive_json() + var.make_primitive_json() + rms.make_primitive_json() + cf.make_primitive_json() + skew.make_primitive_json() + + band_mean = basic_primitives.BandMean(min_frequency=0, max_frequency=100) + assert isinstance(band_mean, primitive.Primitive) + assert band_mean.get_type_subtype() == ('aggregation', 'frequency') + band_mean.make_primitive_json() + + +def test_primitives(): + """Test primitives module.""" + + kurtosis = basic_primitives.Kurtosis(bias=False) + kurtosis.set_tag('kurtosis_test') + primitive_str = 'sigpro.aggregations.amplitude.statistical.kurtosis' + init_params = {'fisher': True, 'bias': False} + assert kurtosis.get_hyperparam_dict() == {'name': 'kurtosis_test', + 'primitive': primitive_str, + 'init_params': init_params} + kurtosis.make_primitive_json() + frequency_band = 
basic_primitives.FrequencyBand(low=10, high=50)
+    frequency_band.set_tag('frequency_band_test')
+    primitive_str = 'sigpro.transformations.frequency.band.frequency_band'
+    init_params = {'low': 10, 'high': 50}
+    assert frequency_band.get_hyperparam_dict() == {'name': 'frequency_band_test',
+                                                    'primitive': primitive_str,
+                                                    'init_params': init_params}
diff --git a/tutorials/primitives_pipelines_tutorial.ipynb b/tutorials/primitives_pipelines_tutorial.ipynb
new file mode 100644
index 0000000..3e941cb
--- /dev/null
+++ b/tutorials/primitives_pipelines_tutorial.ipynb
@@ -0,0 +1,1212 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "898000f4",
+   "metadata": {},
+   "source": [
+    "# Using SigPro\n",
+    "\n",
+    "In this notebook, we will walk through the process of defining and using primitives in SigPro."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "30fb7355",
+   "metadata": {},
+   "source": [
+    "## Primitives\n",
+    "Feature engineering in `SigPro` centers around the **primitive**. In `SigPro`, primitives fall under two main types: transformations and aggregations. Each type of primitive is further broken down into several primitive subtypes."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "f5edcc7c",
+   "metadata": {},
+   "source": [
+    "## Inheritance\n",
+    "\n",
+    "All primitives are instances of the `sigpro.primitive.Primitive` base class. Furthermore, depending on their respective types and subtypes, each primitive inherits from a specific subclass. For example, a frequency-time transformation primitive would inherit from `sigpro.primitive.FrequencyTimeTransformation`, while an amplitude aggregation would inherit from `sigpro.primitive.AmplitudeAggregation`."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "1ce86073",
+   "metadata": {},
+   "source": [
+    "### Initializing Primitives\n",
+    "\n",
+    "Let's view a simple primitive and see how to use it."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "id": "caa6c468",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       ""
+      ]
+     },
+     "execution_count": 1,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "# Import primitive\n",
+    "from sigpro.basic_primitives import Mean\n",
+    "\n",
+    "# Initialize the primitive object\n",
+    "mean_primitive = Mean()\n",
+    "\n",
+    "mean_primitive"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "7e079aec",
+   "metadata": {},
+   "source": [
+    "The `Mean` primitive we just defined is an example of an `AmplitudeAggregation`; in other words, its type is `'aggregation'`, and its subtype is `'amplitude'`. To see this, we call its `get_type_subtype` method:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "id": "6282a2ca",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "('aggregation', 'amplitude')"
+      ]
+     },
+     "execution_count": 2,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "# Check the type and subtype of mean_primitive\n",
+    "mean_primitive.get_type_subtype()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "7f896595",
+   "metadata": {},
+   "source": [
+    "By default, `mean_primitive` will be given the tag `mean`. To observe or change this tag, call the `get_tag` and `set_tag` methods, respectively."
+ ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "81579de6", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Old tag: mean\n", + "New tag: mean_tag\n", + "Last tag: mean\n" + ] + } + ], + "source": [ + "#Re-initialize mean_primitive\n", + "mean_primitive = Mean()\n", + "\n", + "# Observe the current tag\n", + "print('Old tag: ', mean_primitive.get_tag())\n", + "\n", + "# Re-tag mean_primitive with a custom string\n", + "mean_primitive.set_tag('mean_tag')\n", + "\n", + "# Observe the new tag\n", + "print('New tag: ', mean_primitive.get_tag())\n", + "\n", + "# Set the tag of mean_primitive back to mean\n", + "mean_primitive = mean_primitive.set_tag('mean')\n", + "\n", + "# Observe the new tag\n", + "print('Last tag: ', mean_primitive.get_tag())" + ] + }, + { + "cell_type": "markdown", + "id": "a2bd74c8", + "metadata": {}, + "source": [ + "Tagging primitives will be useful when building pipelines." + ] + }, + { + "cell_type": "markdown", + "id": "8bf797e6", + "metadata": {}, + "source": [ + "### Primitives with hyperparameters\n", + "\n", + "In our previous example, the `Mean` primitive did not offer any hyperparameters to set by the user. Let's consider the `FrequencyBand` primitive, which accepts two hyperparameters as input arguments: `low` and `high`.\n", + "\n", + "To initialize the primitive, we pass in the `low` and `high` values as (keyword) arguments." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "b3e02f31", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Import primitive\n", + "from sigpro.basic_primitives import FrequencyBand\n", + "\n", + "# Initialize the primitive object\n", + "fb_primitive = FrequencyBand(low = 10, high = 20)\n", + "\n", + "fb_primitive" + ] + }, + { + "cell_type": "markdown", + "id": "753ce936", + "metadata": {}, + "source": [ + "We can preview the hyperparameters with the `get_hyperparam_dict` method." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "e477f6aa", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'name': 'frequency_band',\n", + " 'primitive': 'sigpro.transformations.frequency.band.frequency_band',\n", + " 'init_params': {'low': 10, 'high': 20}}" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "fb_primitive.get_hyperparam_dict()" + ] + }, + { + "cell_type": "markdown", + "id": "a09b99a3", + "metadata": {}, + "source": [ + "### Primitive JSON Annotations\n", + "\n", + "Each SigPro `Primitive` is accompanied by a corresponding JSON annotation. To preview the JSON annotation, we use the `make_primitive_json` method." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "892d0f69", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"name\": \"sigpro.transformations.frequency.band.frequency_band\",\n", + " \"primitive\": \"sigpro.transformations.frequency.band.frequency_band\",\n", + " \"classifiers\": {\n", + " \"type\": \"transformation\",\n", + " \"subtype\": \"frequency\"\n", + " },\n", + " \"produce\": {\n", + " \"args\": [\n", + " {\n", + " \"name\": \"amplitude_values\",\n", + " \"type\": \"numpy.ndarray\"\n", + " },\n", + " {\n", + " \"name\": \"frequency_values\",\n", + " \"type\": \"numpy.ndarray\"\n", + " }\n", + " ],\n", + " \"output\": [\n", + " {\n", + " \"name\": \"amplitude_values\",\n", + " \"type\": \"numpy.ndarray\"\n", + " },\n", + " {\n", + " \"name\": \"frequency_values\",\n", + " \"type\": \"numpy.ndarray\"\n", + " }\n", + " ]\n", + " },\n", + " \"hyperparameters\": {\n", + " \"fixed\": {\n", + " \"low\": {\n", + " \"type\": \"int\"\n", + " },\n", + " \"high\": {\n", + " \"type\": \"int\"\n", + " }\n", + " },\n", + " \"tunable\": {}\n", + " }\n", + "}\n" + ] + } + ], + "source": [ + "import json\n", + "\n", + "# More readable output\n", + "print(json.dumps(fb_primitive.make_primitive_json(), indent = 2))" + ] + }, + { + "cell_type": "markdown", + "id": "cbe98dfb", + "metadata": {}, + "source": [ + "### Primitive Interface\n", + "\n", + "We summarize the public interface of the `sigpro.primitive.Primitive` class below.\n", + "\n", + "| Method name | Additional arguments | Description |\n", + "| --- | --- | --- |\n", + "| `get_name` | | Return the name of the primitive. |\n", + "| `get_tag` | | Return the user-given tag of the primitive. |\n", + "| `get_inputs` | | Return the inputs of the primitive. |\n", + "| `get_outputs` | | Return the outputs of the primitive. |\n", + "| `get_type_subtype` | | Return the type and subtype of the primitive. |\n", + "| `get_hyperparam_dict` | | Return the hyperparameters of the primitive. |\n", + "| `get_context_arguments` | | Return the context arguments of the primitive. |\n", + "| `get_fixed_hyperparameters` | | Return the fixed hyperparameters of the primitive. |\n", + "| `get_tunable_hyperparameters` | | Return the tunable hyperparameters of the primitive. |\n", + "| `set_tag` | `tag` | Set the tag of the primitive and return the primitive itself. |\n", + "| `set_context_arguments` | `context_arguments` | Set the context arguments of the primitive to args. |\n", + "| `set_fixed_hyperparameters` | `fixed_hyperparameters` | Set the fixed hyperparameters of the primitive. |\n", + "| `set_tunable_hyperparameters (params)` | `tunable_hyperparameters` | Set the tunable hyperparameters of the primitive. |\n", + "| `make_primitive_json` | | Return the JSON representation of the primitive. |\n", + "| `write_primitive_json` | `primitives_path`, `primitives_subfolders` | Write the JSON representation of the primitive to the given path. |" + ] + }, + { + "cell_type": "markdown", + "id": "36d5699a", + "metadata": {}, + "source": [ + "## Custom Primitives\n", + "\n", + "In certain cases, we may be interested in writing a custom primitive class to implement our own primitive function. 
\n", + "\n", + "Suppose that we have already written the `mean` function within the `sigpro.aggregations.amplitude.statistical` module:\n", + "\n", + "```python\n", + "import numpy as np \n", + "\n", + "...\n", + "\n", + "def mean(amplitude_values):\n", + " return np.mean(amplitude_values)\n", + " \n", + "...\n", + "```\n", + "We have two alternatives for creating a subclass of `Primitive`:\n", + "\n", + "1. Call `sigpro.contributing_primitive.make_primitive_class` while passing in any necessary additional parameters.\n", + "2. Write a subclass of the appropriate `Primitive` subclass directly and call `write_primitive_json` to record the primitive JSON.\n", + "\n", + "As we can see below, both approaches lead to the same primitive json annotation and functionality." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "818291e9", + "metadata": {}, + "outputs": [], + "source": [ + "# Imports\n", + "\n", + "from sigpro.primitive import AmplitudeAggregation\n", + "from sigpro.contributing_primitive import get_primitive_class" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "eda056fb", + "metadata": {}, + "outputs": [], + "source": [ + "# Approach 1: create the primitive using SigPro.\n", + "\n", + "mean_path = \"sigpro.aggregations.amplitude.statistical.mean\"\n", + "mean_outputs = [{'name': 'mean_value', 'type': 'float'}]\n", + "\n", + "# Since the JSON annotation already exists in SigPro, we call get_primitive_class instead of make_primitive_class.\n", + "# This is only for the example.\n", + "MeanDynamic = get_primitive_class(mean_path, 'aggregation', 'amplitude', primitive_outputs=mean_outputs)\n", + "mean_dynamic = MeanDynamic()" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "13d7d637", + "metadata": {}, + "outputs": [], + "source": [ + "# Approach 2: write the primitive class directly.\n", + "\n", + "class MeanClass(AmplitudeAggregation):\n", + " def __init__(self):\n", + " super().__init__(\"sigpro.aggregations.amplitude.statistical.mean\")\n", + " self.set_primitive_outputs([{\"name\": \"mean_value\", \"type\": \"float\" }])\n", + "\n", + "mean_class = MeanClass()" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "4a9ab309", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "True\n" + ] + } + ], + "source": [ + "# Check that JSON annotations are equal.\n", + "\n", + "print(mean_class.make_primitive_json() == mean_dynamic.make_primitive_json())" + ] + }, + { + "cell_type": "markdown", + "id": "09e14b2a", + "metadata": {}, + "source": [ + "## Pipelines\n", + "\n", + "While primitives can be quite useful on their own, the true power of `SigPro` arises in\n", + "the development of a feature engineering pipeline. These are represented by the abstract `sigpro.pipeline.Pipeline` class." + ] + }, + { + "cell_type": "markdown", + "id": "36314730", + "metadata": {}, + "source": [ + "In general, feature pipelines will apply a sequence of transformation primitives consecutively, followed by a single aggregation primitive, to generate a single given feature. In the simplest scenario, we have a single defined sequence of transformation primitives we would like to apply to a signal, as well as a set of aggregations to apply to the transformed signal. This can be done with a `LinearPipeline`, which we create with the `sigpro.pipeline.build_linear_pipeline` function." 
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "564402df",
+   "metadata": {},
+   "source": [
+    "### Building Linear Pipelines"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "f7cfff69",
+   "metadata": {},
+   "source": [
+    "Let's consider an example pipeline where we apply the `Identity` and `FFT` transformations and the `Std` and `Var` aggregations.\n",
+    "\n",
+    "First, we need to import all necessary modules and define the primitives we would like to use."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 11,
+   "id": "1aa88ea8",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Imports\n",
+    "\n",
+    "from sigpro.basic_primitives import Identity, FFT, Std, Var\n",
+    "from sigpro.pipeline import build_linear_pipeline"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 12,
+   "id": "5667b7f7",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "transformations = [Identity(), FFT()]\n",
+    "aggregations = [Std(), Var()]"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "8e5e6813",
+   "metadata": {},
+   "source": [
+    "To build a linear pipeline, simply pass in the lists of transformations and aggregations."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 13,
+   "id": "440e3802",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "mypipeline = build_linear_pipeline(transformations, aggregations)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "c0d6198a",
+   "metadata": {},
+   "source": [
+    "### Inspecting Pipelines\n",
+    "\n",
+    "To better understand the contents of pipelines, we can call the `get_primitives` and `get_output_features` methods to obtain the list of primitives and output features, respectively, associated with the pipeline. In particular, each feature is represented as a string of primitives separated by a period `.`, representing the sequence of operations applied to the input signal, followed by the output name of the final aggregation."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 14,
+   "id": "af47ef64",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "[,\n",
+       " ,\n",
+       " ,\n",
+       " ]"
+      ]
+     },
+     "execution_count": 14,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "# Used Primitives\n",
+    "\n",
+    "used_primitives = mypipeline.get_primitives()\n",
+    "used_primitives"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 15,
+   "id": "1446912c",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "['identity.fft.std.std_value', 'identity.fft.var.var_value']"
+      ]
+     },
+     "execution_count": 15,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "# Output features\n",
+    "\n",
+    "output_features = mypipeline.get_output_features()\n",
+    "output_features"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "a88cf6a9",
+   "metadata": {},
+   "source": [
+    "### More Complex Pipelines\n",
+    "\n",
+    "In certain cases, we may wish to build more complex pipeline architectures. Such architectures are represented with the `LayerPipeline` subclass.\n",
+    "\n",
+    "`SigPro` provides the `build_tree_pipeline(transformation_layers, aggregation_layer)` method to build tree-shaped pipelines, which generate all features in the Cartesian product of the transformation layers and the aggregation layer; in other words, every possible sequence obtained by choosing one primitive from each layer is represented in the final feature output."
+ ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "3344eeda", + "metadata": {}, + "outputs": [], + "source": [ + "# Import packages\n", + "from sigpro.basic_primitives import (\n", + " Identity, FFT, FFTReal, Mean, Kurtosis)\n", + "from sigpro.pipeline import build_tree_pipeline\n", + "\n", + "# Define primitive objects\n", + "identity_tfm = Identity().set_tag('id1') #\n", + "identity2_tfm = Identity().set_tag('id2') #Avoid duplicate tags\n", + "fft_tfm, fft_real_tfm = FFT(), FFTReal()\n", + "mean_agg, kurtosis_agg = Mean(), Kurtosis(bias=False)\n", + "\n", + "# Instantiate tree pipeline\n", + "tfmlayer1 = [identity_tfm, identity2_tfm]\n", + "tfmlayer2 = [fft_tfm, fft_real_tfm]\n", + "agglayer = [mean_agg, kurtosis_agg]\n", + "tree_pipeline = build_tree_pipeline([tfmlayer1, tfmlayer2], agglayer)" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "90b1a120", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[,\n", + " ,\n", + " ,\n", + " ,\n", + " ,\n", + " ]" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "tree_pipeline.get_primitives()" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "c953f449", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['id1.fft.mean.mean_value',\n", + " 'id1.fft.kurtosis.kurtosis_value',\n", + " 'id1.fft_real.mean.mean_value',\n", + " 'id1.fft_real.kurtosis.kurtosis_value',\n", + " 'id2.fft.mean.mean_value',\n", + " 'id2.fft.kurtosis.kurtosis_value',\n", + " 'id2.fft_real.mean.mean_value',\n", + " 'id2.fft_real.kurtosis.kurtosis_value']" + ] + }, + "execution_count": 18, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "tree_pipeline.get_output_features()" + ] + }, + { + "cell_type": "markdown", + "id": "ee027dfc", + "metadata": {}, + "source": [ + "`SigPro` allows for the generation of any arbitrary list of feature combination tuples with the `build_layer_pipeline(primitives, primitive_combinations)` method." 
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 19,
+   "id": "fbf40758",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from sigpro.basic_primitives import (\n",
+    "    BandMean, Identity, FFT, FFTReal, Mean, Kurtosis)\n",
+    "from sigpro.pipeline import build_layer_pipeline\n",
+    "\n",
+    "\n",
+    "p1, p2 = FFTReal().set_tag('fftr'), FFT()\n",
+    "p3, p4 = Identity().set_tag('id1'), Identity().set_tag('id2')\n",
+    "p5, p6, p7 = BandMean(200, 50000).set_tag('bm'), Mean(), Kurtosis(fisher=False)\n",
+    "p8 = Identity().set_tag('id3') # unused primitive\n",
+    "\n",
+    "all_primitives = [p1, p2, p3, p4, p5, p6, p7, p8]\n",
+    "\n",
+    "features = [(p1, p3, p5), (p1, p3, p6), (p2, p3, p6), (p2, p4, p6), (p2, p4, p7)]\n",
+    "\n",
+    "layer_pipeline = build_layer_pipeline(all_primitives, features)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 20,
+   "id": "5a045991",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "[,\n",
+       " ,\n",
+       " ,\n",
+       " ,\n",
+       " ,\n",
+       " ,\n",
+       " ,\n",
+       " ]"
+      ]
+     },
+     "execution_count": 20,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "layer_pipeline.get_primitives()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 21,
+   "id": "d23e0f68",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "['fftr.id1.bm.value',\n",
+       " 'fftr.id1.mean.mean_value',\n",
+       " 'fft.id1.mean.mean_value',\n",
+       " 'fft.id2.mean.mean_value',\n",
+       " 'fft.id2.kurtosis.kurtosis_value']"
+      ]
+     },
+     "execution_count": 21,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "layer_pipeline.get_output_features()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "aea62cec",
+   "metadata": {},
+   "source": [
+    "### Combining Pipelines\n",
+    "\n",
+    "If we do not wish to specify the exact combination of features to produce a `LayerPipeline`, we can still customize our feature engineering using the `sigpro.pipeline.merge_pipelines` function. By passing in a list of pipelines, we can generate a single pipeline that produces all features generated by at least one of the input pipelines.\n",
+    "\n",
+    "For our example, we first initialize several primitive objects:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 22,
+   "id": "b5b15b07",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Import\n",
+    "from sigpro.pipeline import merge_pipelines\n",
+    "\n",
+    "# Initialize some primitives\n",
+    "p1, p2 = FFTReal().set_tag('fftr'), FFT()\n",
+    "p3, p4 = Identity().set_tag('id1'), Identity().set_tag('id2')\n",
+    "p5, p6, p7 = BandMean(200, 50000).set_tag('bm'), Mean(), Kurtosis(fisher=False)\n",
+    "p8 = Identity().set_tag('id3') # unused primitive"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "954c8eb0",
+   "metadata": {},
+   "source": [
+    "We next initialize three separate pipelines using the specified primitives and merge them into a single pipeline."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 23,
+   "id": "11d94b77",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "all_primitives = [p1, p2, p3, p4, p5, p6, p7, p8]\n",
+    "\n",
+    "layer_combinations = [(p1, p3, p5), (p1, p3, p6), (p2, p3, p6), (p2, p4, p6), (p2, p4, p7)]\n",
+    "\n",
+    "sub_pipeline1 = build_layer_pipeline(all_primitives, layer_combinations)\n",
+    "sub_pipeline2 = build_tree_pipeline([[p1, p2], [p3]], [p5])\n",
+    "sub_pipeline3 = build_linear_pipeline([p1, p4], [p6])\n",
+    "\n",
+    "merged_pipeline = merge_pipelines([sub_pipeline1,\n",
+    "                                   sub_pipeline2,\n",
+    "                                   sub_pipeline3])"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "0b9a6c1a",
+   "metadata": {},
+   "source": [
+    "Lastly, we check that the merged pipeline indeed generates the union of all of the features of the sub-pipelines."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 24,
+   "id": "a0648b36",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "True\n"
+     ]
+    }
+   ],
+   "source": [
+    "expected_features = set(sub_pipeline1.get_output_features() +\n",
+    "                        sub_pipeline2.get_output_features() +\n",
+    "                        sub_pipeline3.get_output_features())\n",
+    "actual_features = set(merged_pipeline.get_output_features())\n",
+    "\n",
+    "print(expected_features == actual_features)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "822d7d5e",
+   "metadata": {},
+   "source": [
+    "## Applying a Pipeline with `process_signal`\n",
+    "\n",
+    "Once our pipeline is correctly defined, we apply the `process_signal` method to a demo dataset. Recall that `process_signal` is defined as follows:\n",
+    "\n",
+    "\n",
+    "```python\n",
+    "def process_signal(self, data=None, window=None, values_column_name='values',\n",
+    "                   time_index=None, groupby_index=None, feature_columns=None,\n",
+    "                   keep_columns=False, input_is_dataframe=True, **kwargs):\n",
+    "\n",
+    "    ...\n",
+    "    return data, feature_columns\n",
+    "```\n",
+    "\n",
+    "`process_signal` accepts as input the following arguments:\n",
+    "\n",
+    "- `data (pd.DataFrame)`: DataFrame with a column containing signal values.\n",
+    "- `window (str)`: Duration of the aggregation window, e.g. '1h'.\n",
+    "- `values_column_name (str)`: Name of the column in `data` containing signal values.\n",
+    "- `time_index (str)`: Name of the column in `data` that represents the time index.\n",
+    "- `groupby_index (str or list[str])`: List of column names to group together and take the window over.\n",
+    "- `feature_columns (list)`: List of columns from the input data that should be considered as features (and not dropped).\n",
+    "- `keep_columns (bool or list[str])`: Whether to keep non-feature columns in the output DataFrame or not. If a list of column names is passed, those columns are kept.\n",
+    "- `input_is_dataframe (bool)`: Whether the input data is a DataFrame. Used for MLBlocks integration.\n",
+    "\n",
+    "`process_signal` outputs the following:\n",
+    "\n",
+    "- `data (pd.DataFrame)`: DataFrame containing the output feature values as constructed from the signal.\n",
+    "- `feature_columns (list)`: List of (generated) feature names.\n",
+    "\n",
+    "We now apply our first pipeline `mypipeline` to a toy dataset in the `xvalues`, `yvalues` format. We will define our toy dataset as follows. 
" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "id": "4ef6ab59", + "metadata": {}, + "outputs": [], + "source": [ + "# Redefine mypipeline\n", + "\n", + "transformations = [Identity(), FFT()]\n", + "aggregations = [Std(), Var()]\n", + "\n", + "mypipeline = build_linear_pipeline(transformations, aggregations)" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "id": "20c341ab", + "metadata": {}, + "outputs": [], + "source": [ + "from sigpro.demo import get_demo_data" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "id": "dbb7545a", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
turbine_idsignal_idxvaluesyvaluessampling_frequency
0T001Sensor1_signal1[2020-01-01 00:00:00, 2020-01-01 00:00:01, 202...[0.43616983763682876, -0.17662312586241055, 0....1000.0
1T001Sensor1_signal1[2020-01-01 01:00:00, 2020-01-01 01:00:01, 202...[0.8023828754411122, -0.14122063493312714, -0....1000.0
2T001Sensor1_signal1[2020-01-01 02:00:00, 2020-01-01 02:00:01, 202...[-1.3143142430046044, -1.1055740033788437, -0....1000.0
3T001Sensor1_signal1[2020-01-01 03:00:00, 2020-01-01 03:00:01, 202...[-0.45981995520032104, -0.3255426061995603, -0...1000.0
4T001Sensor1_signal1[2020-01-01 04:00:00, 2020-01-01 04:00:01, 202...[-0.6380405111460377, -0.11924167777027689, 0....1000.0
\n", + "
" + ], + "text/plain": [ + " turbine_id signal_id \\\n", + "0 T001 Sensor1_signal1 \n", + "1 T001 Sensor1_signal1 \n", + "2 T001 Sensor1_signal1 \n", + "3 T001 Sensor1_signal1 \n", + "4 T001 Sensor1_signal1 \n", + "\n", + " xvalues \\\n", + "0 [2020-01-01 00:00:00, 2020-01-01 00:00:01, 202... \n", + "1 [2020-01-01 01:00:00, 2020-01-01 01:00:01, 202... \n", + "2 [2020-01-01 02:00:00, 2020-01-01 02:00:01, 202... \n", + "3 [2020-01-01 03:00:00, 2020-01-01 03:00:01, 202... \n", + "4 [2020-01-01 04:00:00, 2020-01-01 04:00:01, 202... \n", + "\n", + " yvalues sampling_frequency \n", + "0 [0.43616983763682876, -0.17662312586241055, 0.... 1000.0 \n", + "1 [0.8023828754411122, -0.14122063493312714, -0.... 1000.0 \n", + "2 [-1.3143142430046044, -1.1055740033788437, -0.... 1000.0 \n", + "3 [-0.45981995520032104, -0.3255426061995603, -0... 1000.0 \n", + "4 [-0.6380405111460377, -0.11924167777027689, 0.... 1000.0 " + ] + }, + "execution_count": 27, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "demo_dataset = get_demo_data()\n", + "demo_dataset['xvalues'] = demo_dataset['timestamp'].copy()\n", + "demo_dataset['yvalues'] = demo_dataset['values'].copy()\n", + "demo_dataset = (demo_dataset.set_index('timestamp').resample(rule = '60T').apply(lambda x: x.to_list())).reset_index()\n", + "demo_dataset[['turbine_id', 'signal_id', 'sampling_frequency']] = demo_dataset[['turbine_id', 'signal_id', 'sampling_frequency']].apply(lambda x: x[0])\n", + "demo_dataset = demo_dataset[['turbine_id', 'signal_id', 'xvalues', 'yvalues', 'sampling_frequency']]\n", + "demo_dataset.head()" + ] + }, + { + "cell_type": "markdown", + "id": "cefb58a1", + "metadata": {}, + "source": [ + "We now call the `process_signal` method using `mypipeline`." + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "id": "843a4647", + "metadata": {}, + "outputs": [], + "source": [ + "processed_data, feature_columns = mypipeline.process_signal(demo_dataset,\n", + " values_column_name='yvalues',\n", + " time_index = 'xvalues',\n", + " keep_columns = True )" + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "id": "453a2401", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
turbine_idsignal_idxvaluesyvaluessampling_frequencyidentity.fft.std.std_valueidentity.fft.var.var_value
0T001Sensor1_signal1[2020-01-01 00:00:00, 2020-01-01 00:00:01, 202...[0.43616983763682876, -0.17662312586241055, 0....1000.014.444991208.657778
1T001Sensor1_signal1[2020-01-01 01:00:00, 2020-01-01 01:00:01, 202...[0.8023828754411122, -0.14122063493312714, -0....1000.012.326223151.935764
2T001Sensor1_signal1[2020-01-01 02:00:00, 2020-01-01 02:00:01, 202...[-1.3143142430046044, -1.1055740033788437, -0....1000.012.051415145.236607
3T001Sensor1_signal1[2020-01-01 03:00:00, 2020-01-01 03:00:01, 202...[-0.45981995520032104, -0.3255426061995603, -0...1000.010.657243113.576820
4T001Sensor1_signal1[2020-01-01 04:00:00, 2020-01-01 04:00:01, 202...[-0.6380405111460377, -0.11924167777027689, 0....1000.012.640728159.787993
\n", + "
" + ], + "text/plain": [ + " turbine_id signal_id \\\n", + "0 T001 Sensor1_signal1 \n", + "1 T001 Sensor1_signal1 \n", + "2 T001 Sensor1_signal1 \n", + "3 T001 Sensor1_signal1 \n", + "4 T001 Sensor1_signal1 \n", + "\n", + " xvalues \\\n", + "0 [2020-01-01 00:00:00, 2020-01-01 00:00:01, 202... \n", + "1 [2020-01-01 01:00:00, 2020-01-01 01:00:01, 202... \n", + "2 [2020-01-01 02:00:00, 2020-01-01 02:00:01, 202... \n", + "3 [2020-01-01 03:00:00, 2020-01-01 03:00:01, 202... \n", + "4 [2020-01-01 04:00:00, 2020-01-01 04:00:01, 202... \n", + "\n", + " yvalues sampling_frequency \\\n", + "0 [0.43616983763682876, -0.17662312586241055, 0.... 1000.0 \n", + "1 [0.8023828754411122, -0.14122063493312714, -0.... 1000.0 \n", + "2 [-1.3143142430046044, -1.1055740033788437, -0.... 1000.0 \n", + "3 [-0.45981995520032104, -0.3255426061995603, -0... 1000.0 \n", + "4 [-0.6380405111460377, -0.11924167777027689, 0.... 1000.0 \n", + "\n", + " identity.fft.std.std_value identity.fft.var.var_value \n", + "0 14.444991 208.657778 \n", + "1 12.326223 151.935764 \n", + "2 12.051415 145.236607 \n", + "3 10.657243 113.576820 \n", + "4 12.640728 159.787993 " + ] + }, + "execution_count": 29, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "processed_data.head()" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "id": "14d42ee9", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['identity.fft.std.std_value', 'identity.fft.var.var_value']" + ] + }, + "execution_count": 30, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "feature_columns" + ] + }, + { + "cell_type": "markdown", + "id": "8ec7792a", + "metadata": {}, + "source": [ + "Success! We have managed to apply the primitives to generate features on the input dataset." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b03440ac", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/tutorials/primitives_pipelines_tutorial.md b/tutorials/primitives_pipelines_tutorial.md new file mode 100644 index 0000000..b3a8c2d --- /dev/null +++ b/tutorials/primitives_pipelines_tutorial.md @@ -0,0 +1,754 @@ +# Using SigPro + +In this notebook, we will walk through the process of defining and using primitives in SigPro. + +## Primitives +Feature engineering in `SigPro` centers around the **primitive**. In `SigPro`, primitives fall under two main types: transformations, and aggregations. Each type of primitive is further broken down into several primitive subtypes. + +## Inheritance + +All primitives are instances of the `sigpro.primitive.Primitive` base class. Furthermore, depending on their respective types and subtypes, each primitive inherits from a specific subclass. For example, a frequency-time transformation primitive would inherit from `sigpro.primitive.FrequencyTimeTransformation`, while an amplitude aggrigation would inherit from `sigpro.primitive.AmplitudeAggregation`. + +### Initializing Primitives + +Let's view a simple primitive and see how to use it. 
+ + +```python +# Import primitive +from sigpro.basic_primitives import Mean + +# Initialize the primitive object +mean_primitive = Mean() + +mean_primitive +``` + + + + + + + + +The `Mean` primitive we just defined is an example of an `AmplitudeAggregation`; in other words, its type is `'aggregation'`, and its subtype is `'amplitude'`. To see this, we call its `get_type_subtype` method: + + +```python +# Check the type and subtype of mean_primitive +mean_primitive.get_type_subtype() +``` + + + + + ('aggregation', 'amplitude') + + + +By default, `mean_primitive` will be given the tag `mean`. To observe or change this tag, call the `get_tag` and `set_tag` methods, respectively. + + +```python +#Re-initialize mean_primitive +mean_primitive = Mean() + +# Observe the current tag +print('Old tag: ', mean_primitive.get_tag()) + +# Re-tag mean_primitive with a custom string +mean_primitive.set_tag('mean_tag') + +# Observe the new tag +print('New tag: ', mean_primitive.get_tag()) + +# Set the tag of mean_primitive back to mean +mean_primitive = mean_primitive.set_tag('mean') + +# Observe the new tag +print('Last tag: ', mean_primitive.get_tag()) +``` + + Old tag: mean + New tag: mean_tag + Last tag: mean + + +Tagging primitives will be useful when building pipelines. + +### Primitives with hyperparameters + +In our previous example, the `Mean` primitive did not offer any hyperparameters to set by the user. Let's consider the `FrequencyBand` primitive, which accepts two hyperparameters as input arguments: `low` and `high`. + +To initialize the primitive, we pass in the `low` and `high` values as (keyword) arguments. + + +```python +# Import primitive +from sigpro.basic_primitives import FrequencyBand + +# Initialize the primitive object +fb_primitive = FrequencyBand(low = 10, high = 20) + +fb_primitive +``` + + + + + + + + +We can preview the hyperparameters with the `get_hyperparam_dict` method. + + +```python +fb_primitive.get_hyperparam_dict() +``` + + + + + {'name': 'frequency_band', + 'primitive': 'sigpro.transformations.frequency.band.frequency_band', + 'init_params': {'low': 10, 'high': 20}} + + + +### Primitive JSON Annotations + +Each SigPro `Primitive` is accompanied by a corresponding JSON annotation. To preview the JSON annotation, we use the `make_primitive_json` method. + + +```python +import json + +# More readable output +print(json.dumps(fb_primitive.make_primitive_json(), indent = 2)) +``` + + { + "name": "sigpro.transformations.frequency.band.frequency_band", + "primitive": "sigpro.transformations.frequency.band.frequency_band", + "classifiers": { + "type": "transformation", + "subtype": "frequency" + }, + "produce": { + "args": [ + { + "name": "amplitude_values", + "type": "numpy.ndarray" + }, + { + "name": "frequency_values", + "type": "numpy.ndarray" + } + ], + "output": [ + { + "name": "amplitude_values", + "type": "numpy.ndarray" + }, + { + "name": "frequency_values", + "type": "numpy.ndarray" + } + ] + }, + "hyperparameters": { + "fixed": { + "low": { + "type": "int" + }, + "high": { + "type": "int" + } + }, + "tunable": {} + } + } + + +### Primitive Interface + +We summarize the public interface of the `sigpro.primitive.Primitive` class below. + +| Method name | Additional arguments | Description | +| --- | --- | --- | +| `get_name` | | Return the name of the primitive. | +| `get_tag` | | Return the user-given tag of the primitive. | +| `get_inputs` | | Return the inputs of the primitive. | +| `get_outputs` | | Return the outputs of the primitive. 
|
+| `get_type_subtype` | | Return the type and subtype of the primitive. |
+| `get_hyperparam_dict` | | Return the hyperparameters of the primitive. |
+| `get_context_arguments` | | Return the context arguments of the primitive. |
+| `get_fixed_hyperparameters` | | Return the fixed hyperparameters of the primitive. |
+| `get_tunable_hyperparameters` | | Return the tunable hyperparameters of the primitive. |
+| `set_tag` | `tag` | Set the tag of the primitive and return the primitive itself. |
+| `set_context_arguments` | `context_arguments` | Set the context arguments of the primitive to the given arguments. |
+| `set_fixed_hyperparameters` | `fixed_hyperparameters` | Set the fixed hyperparameters of the primitive. |
+| `set_tunable_hyperparameters` | `tunable_hyperparameters` | Set the tunable hyperparameters of the primitive. |
+| `make_primitive_json` | | Return the JSON representation of the primitive. |
+| `write_primitive_json` | `primitives_path`, `primitives_subfolders` | Write the JSON representation of the primitive to the given path. |
+
+## Custom Primitives
+
+In certain cases, we may be interested in writing a custom primitive class to implement our own primitive function.
+
+Suppose that we have already written the `mean` function within the `sigpro.aggregations.amplitude.statistical` module:
+
+```python
+import numpy as np
+
+...
+
+def mean(amplitude_values):
+    return np.mean(amplitude_values)
+
+...
+```
+We have two alternatives for creating a subclass of `Primitive`:
+
+1. Call `sigpro.contributing_primitive.make_primitive_class` while passing in any necessary additional parameters.
+2. Write a subclass of the appropriate `Primitive` subclass directly and call `write_primitive_json` to record the primitive JSON.
+
+As we can see below, both approaches lead to the same primitive JSON annotation and functionality.
+
+
+```python
+# Imports
+
+from sigpro.primitive import AmplitudeAggregation
+from sigpro.contributing_primitive import get_primitive_class
+```
+
+
+```python
+# Approach 1: create the primitive using SigPro.
+
+mean_path = "sigpro.aggregations.amplitude.statistical.mean"
+mean_outputs = [{'name': 'mean_value', 'type': 'float'}]
+
+# Since the JSON annotation already exists in SigPro, we call get_primitive_class instead of make_primitive_class.
+# This is only for the example.
+MeanDynamic = get_primitive_class(mean_path, 'aggregation', 'amplitude', primitive_outputs=mean_outputs)
+mean_dynamic = MeanDynamic()
+```
+
+
+```python
+# Approach 2: write the primitive class directly.
+
+class MeanClass(AmplitudeAggregation):
+    def __init__(self):
+        super().__init__("sigpro.aggregations.amplitude.statistical.mean")
+        self.set_primitive_outputs([{"name": "mean_value", "type": "float"}])
+
+mean_class = MeanClass()
+```
+
+
+```python
+# Check that JSON annotations are equal.
+
+print(mean_class.make_primitive_json() == mean_dynamic.make_primitive_json())
+```
+
+    True
+
+
+## Pipelines
+
+While primitives can be quite useful on their own, the true power of `SigPro` arises in the development of a feature engineering pipeline. These are represented by the abstract `sigpro.pipeline.Pipeline` class.
+
+In general, feature pipelines will apply a sequence of transformation primitives consecutively, followed by a single aggregation primitive, to generate a single given feature. In the simplest scenario, we have a single defined sequence of transformation primitives we would like to apply to a signal, as well as a set of aggregations to apply to the transformed signal. 
This can be done with a `LinearPipeline`, which we create with the `sigpro.pipeline.build_linear_pipeline` function.
+
+### Building Linear Pipelines
+
+Let's consider an example pipeline where we apply the `Identity` and `FFT` transformations and the `Std` and `Var` aggregations.
+
+First, we need to import all necessary modules and define the primitives we would like to use.
+
+
+```python
+# Imports
+
+from sigpro.basic_primitives import Identity, FFT, Std, Var
+from sigpro.pipeline import build_linear_pipeline
+```
+
+
+```python
+transformations = [Identity(), FFT()]
+aggregations = [Std(), Var()]
+```
+
+To build a linear pipeline, simply pass in the lists of transformations and aggregations.
+
+
+```python
+mypipeline = build_linear_pipeline(transformations, aggregations)
+```
+
+### Inspecting Pipelines
+
+To better understand the contents of pipelines, we can call the `get_primitives` and `get_output_features` methods to obtain the list of primitives and output features, respectively, associated with the pipeline. In particular, each feature is represented as a string of primitives separated by a period `.`, representing the sequence of operations applied to the input signal, followed by the output name of the final aggregation.
+
+
+```python
+# Used Primitives
+
+used_primitives = mypipeline.get_primitives()
+used_primitives
+```
+
+
+
+
+    [,
+     ,
+     ,
+     ]
+
+
+
+
+```python
+# Output features
+
+output_features = mypipeline.get_output_features()
+output_features
+```
+
+
+
+
+    ['identity.fft.std.std_value', 'identity.fft.var.var_value']
+
+
+
+### More Complex Pipelines
+
+In certain cases, we may wish to build more complex pipeline architectures. Such architectures are represented with the `LayerPipeline` subclass.
+
+`SigPro` provides the `build_tree_pipeline(transformation_layers, aggregation_layer)` method to build tree-shaped pipelines, which generate all features in the Cartesian product of the transformation layers and the aggregation layer; in other words, every possible sequence obtained by choosing one primitive from each layer is represented in the final feature output.
+
+
+```python
+# Import packages
+from sigpro.basic_primitives import (
+    Identity, FFT, FFTReal, Mean, Kurtosis)
+from sigpro.pipeline import build_tree_pipeline
+
+# Define primitive objects
+identity_tfm = Identity().set_tag('id1')
+identity2_tfm = Identity().set_tag('id2')  # Avoid duplicate tags
+fft_tfm, fft_real_tfm = FFT(), FFTReal()
+mean_agg, kurtosis_agg = Mean(), Kurtosis(bias=False)
+
+# Instantiate tree pipeline
+tfmlayer1 = [identity_tfm, identity2_tfm]
+tfmlayer2 = [fft_tfm, fft_real_tfm]
+agglayer = [mean_agg, kurtosis_agg]
+tree_pipeline = build_tree_pipeline([tfmlayer1, tfmlayer2], agglayer)
+```
+
+
+```python
+tree_pipeline.get_primitives()
+```
+
+
+
+
+    [,
+     ,
+     ,
+     ,
+     ,
+     ]
+
+
+
+
+```python
+tree_pipeline.get_output_features()
+```
+
+
+
+
+    ['id1.fft.mean.mean_value',
+     'id1.fft.kurtosis.kurtosis_value',
+     'id1.fft_real.mean.mean_value',
+     'id1.fft_real.kurtosis.kurtosis_value',
+     'id2.fft.mean.mean_value',
+     'id2.fft.kurtosis.kurtosis_value',
+     'id2.fft_real.mean.mean_value',
+     'id2.fft_real.kurtosis.kurtosis_value']
+
+
+
+`SigPro` allows for the generation of any arbitrary list of feature combination tuples with the `build_layer_pipeline(primitives, primitive_combinations)` method. 
+
+
+```python
+from sigpro.basic_primitives import (
+    BandMean, Identity, FFT, FFTReal, Mean, Kurtosis)
+from sigpro.pipeline import build_layer_pipeline
+
+
+p1, p2 = FFTReal().set_tag('fftr'), FFT()
+p3, p4 = Identity().set_tag('id1'), Identity().set_tag('id2')
+p5, p6, p7 = BandMean(200, 50000).set_tag('bm'), Mean(), Kurtosis(fisher=False)
+p8 = Identity().set_tag('id3') # unused primitive
+
+all_primitives = [p1, p2, p3, p4, p5, p6, p7, p8]
+
+features = [(p1, p3, p5), (p1, p3, p6), (p2, p3, p6), (p2, p4, p6), (p2, p4, p7)]
+
+layer_pipeline = build_layer_pipeline(all_primitives, features)
+```
+
+
+```python
+layer_pipeline.get_primitives()
+```
+
+
+
+
+    [,
+     ,
+     ,
+     ,
+     ,
+     ,
+     ,
+     ]
+
+
+
+
+```python
+layer_pipeline.get_output_features()
+```
+
+
+
+
+    ['fftr.id1.bm.value',
+     'fftr.id1.mean.mean_value',
+     'fft.id1.mean.mean_value',
+     'fft.id2.mean.mean_value',
+     'fft.id2.kurtosis.kurtosis_value']
+
+
+
+### Combining Pipelines
+
+If we do not wish to specify the exact combination of features to produce a `LayerPipeline`, we can still customize our feature engineering using the `sigpro.pipeline.merge_pipelines` function. By passing in a list of pipelines, we can generate a single pipeline that produces all features generated by at least one of the input pipelines.
+
+For our example, we first initialize several primitive objects:
+
+
+```python
+# Import
+from sigpro.pipeline import merge_pipelines
+
+# Initialize some primitives
+p1, p2 = FFTReal().set_tag('fftr'), FFT()
+p3, p4 = Identity().set_tag('id1'), Identity().set_tag('id2')
+p5, p6, p7 = BandMean(200, 50000).set_tag('bm'), Mean(), Kurtosis(fisher=False)
+p8 = Identity().set_tag('id3') # unused primitive
+```
+
+We next initialize three separate pipelines using the specified primitives and merge them into a single pipeline.
+
+
+```python
+all_primitives = [p1, p2, p3, p4, p5, p6, p7, p8]
+
+layer_combinations = [(p1, p3, p5), (p1, p3, p6), (p2, p3, p6), (p2, p4, p6), (p2, p4, p7)]
+
+sub_pipeline1 = build_layer_pipeline(all_primitives, layer_combinations)
+sub_pipeline2 = build_tree_pipeline([[p1, p2], [p3]], [p5])
+sub_pipeline3 = build_linear_pipeline([p1, p4], [p6])
+
+merged_pipeline = merge_pipelines([sub_pipeline1,
+                                   sub_pipeline2,
+                                   sub_pipeline3])
+```
+
+Lastly, we check that the merged pipeline indeed generates the union of all of the features of the sub-pipelines.
+
+
+```python
+expected_features = set(sub_pipeline1.get_output_features() +
+                        sub_pipeline2.get_output_features() +
+                        sub_pipeline3.get_output_features())
+actual_features = set(merged_pipeline.get_output_features())
+
+print(expected_features == actual_features)
+```
+
+    True
+
+
+## Applying a Pipeline with `process_signal`
+
+Once our pipeline is correctly defined, we apply the `process_signal` method to a demo dataset. Recall that `process_signal` is defined as follows:
+
+
+```python
+def process_signal(self, data=None, window=None, values_column_name='values',
+                   time_index=None, groupby_index=None, feature_columns=None,
+                   keep_columns=False, input_is_dataframe=True, **kwargs):
+
+    ...
+    return data, feature_columns
+```
+
+`process_signal` accepts as input the following arguments:
+
+- `data (pd.DataFrame)`: DataFrame with a column containing signal values.
+- `window (str)`: Duration of the aggregation window, e.g. '1h'.
+- `values_column_name (str)`: Name of the column in `data` containing signal values.
+- `time_index (str)`: Name of the column in `data` that represents the time index.
+- `groupby_index (str or list[str])`: List of column names to group together and take the window over.
+- `feature_columns (list)`: List of columns from the input data that should be considered as features (and not dropped).
+- `keep_columns (bool or list[str])`: Whether to keep non-feature columns in the output DataFrame or not. If a list of column names is passed, those columns are kept.
+- `input_is_dataframe (bool)`: Whether the input data is a DataFrame. Used for MLBlocks integration.
+
+`process_signal` outputs the following:
+
+- `data (pd.DataFrame)`: DataFrame containing the output feature values as constructed from the signal.
+- `feature_columns (list)`: List of (generated) feature names.
+
+We now apply our first pipeline `mypipeline` to a toy dataset in the `xvalues`, `yvalues` format. We will define our toy dataset as follows.
+
+
+```python
+# Redefine mypipeline
+
+transformations = [Identity(), FFT()]
+aggregations = [Std(), Var()]
+
+mypipeline = build_linear_pipeline(transformations, aggregations)
+```
+
+
+```python
+from sigpro.demo import get_demo_data
+```
+
+
+```python
+demo_dataset = get_demo_data()
+demo_dataset['xvalues'] = demo_dataset['timestamp'].copy()
+demo_dataset['yvalues'] = demo_dataset['values'].copy()
+demo_dataset = (demo_dataset.set_index('timestamp').resample(rule = '60T').apply(lambda x: x.to_list())).reset_index()
+demo_dataset[['turbine_id', 'signal_id', 'sampling_frequency']] = demo_dataset[['turbine_id', 'signal_id', 'sampling_frequency']].apply(lambda x: x[0])
+demo_dataset = demo_dataset[['turbine_id', 'signal_id', 'xvalues', 'yvalues', 'sampling_frequency']]
+demo_dataset.head()
+```
+
+
+
+| | turbine_id | signal_id | xvalues | yvalues | sampling_frequency |
+| --- | --- | --- | --- | --- | --- |
+| 0 | T001 | Sensor1_signal1 | [2020-01-01 00:00:00, 2020-01-01 00:00:01, 202... | [0.43616983763682876, -0.17662312586241055, 0.... | 1000.0 |
+| 1 | T001 | Sensor1_signal1 | [2020-01-01 01:00:00, 2020-01-01 01:00:01, 202... | [0.8023828754411122, -0.14122063493312714, -0.... | 1000.0 |
+| 2 | T001 | Sensor1_signal1 | [2020-01-01 02:00:00, 2020-01-01 02:00:01, 202... | [-1.3143142430046044, -1.1055740033788437, -0.... | 1000.0 |
+| 3 | T001 | Sensor1_signal1 | [2020-01-01 03:00:00, 2020-01-01 03:00:01, 202... | [-0.45981995520032104, -0.3255426061995603, -0... | 1000.0 |
+| 4 | T001 | Sensor1_signal1 | [2020-01-01 04:00:00, 2020-01-01 04:00:01, 202... | [-0.6380405111460377, -0.11924167777027689, 0.... | 1000.0 |
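+
+Before running the pipeline, it helps to sanity-check the windowed toy data. The following is a small sketch, not part of the original tutorial output; it assumes, as the `xvalues` timestamps above suggest, that the demo signal carries one entry per second, so each 60-minute window should hold up to 3600 values.
+
+
+```python
+# Count the signal values captured in each 1-hour window.
+window_sizes = demo_dataset['yvalues'].apply(len)
+print(window_sizes.describe())
+```
+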
+ + + +We now call the `process_signal` method using `mypipeline`. + + +```python +processed_data, feature_columns = mypipeline.process_signal(demo_dataset, + values_column_name='yvalues', + time_index = 'xvalues', + keep_columns = True ) +``` + + +```python +processed_data.head() +``` + + + + +
+
+
+
+| | turbine_id | signal_id | xvalues | yvalues | sampling_frequency | identity.fft.std.std_value | identity.fft.var.var_value |
+| --- | --- | --- | --- | --- | --- | --- | --- |
+| 0 | T001 | Sensor1_signal1 | [2020-01-01 00:00:00, 2020-01-01 00:00:01, 202... | [0.43616983763682876, -0.17662312586241055, 0.... | 1000.0 | 14.444991 | 208.657778 |
+| 1 | T001 | Sensor1_signal1 | [2020-01-01 01:00:00, 2020-01-01 01:00:01, 202... | [0.8023828754411122, -0.14122063493312714, -0.... | 1000.0 | 12.326223 | 151.935764 |
+| 2 | T001 | Sensor1_signal1 | [2020-01-01 02:00:00, 2020-01-01 02:00:01, 202... | [-1.3143142430046044, -1.1055740033788437, -0.... | 1000.0 | 12.051415 | 145.236607 |
+| 3 | T001 | Sensor1_signal1 | [2020-01-01 03:00:00, 2020-01-01 03:00:01, 202... | [-0.45981995520032104, -0.3255426061995603, -0... | 1000.0 | 10.657243 | 113.576820 |
+| 4 | T001 | Sensor1_signal1 | [2020-01-01 04:00:00, 2020-01-01 04:00:01, 202... | [-0.6380405111460377, -0.11924167777027689, 0.... | 1000.0 | 12.640728 | 159.787993 |
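+
+Because we passed `keep_columns=True`, the original columns were carried through alongside the generated features. If only the engineered features are needed, a minimal sketch using the objects defined above:
+
+
+```python
+# Restrict the output to the generated feature columns only.
+features_only = processed_data[feature_columns]
+features_only.head()
+```
+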
+ + + + +```python +feature_columns +``` + + + + + ['identity.fft.std.std_value', 'identity.fft.var.var_value'] + + + +Success! We have managed to apply the primitives to generate features on the input dataset. + + +```python + +```
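+
+As a final note, the `window` and `groupby_index` arguments documented above let `process_signal` perform the windowing itself, so the manual `resample` step is not required. The following is a hedged sketch rather than part of the original tutorial; it assumes the raw demo table keeps the `timestamp` and `values` columns shown earlier.
+
+
+```python
+from sigpro.basic_primitives import Identity, FFT, Std, Var
+from sigpro.demo import get_demo_data
+from sigpro.pipeline import build_linear_pipeline
+
+# Let process_signal window the raw signal directly.
+raw_data = get_demo_data()
+linear_pipeline = build_linear_pipeline([Identity(), FFT()], [Std(), Var()])
+
+processed, features = linear_pipeline.process_signal(
+    raw_data,
+    window='1h',                                # 1-hour aggregation windows
+    values_column_name='values',                # column holding the signal values
+    time_index='timestamp',                     # column holding the time index
+    groupby_index=['turbine_id', 'signal_id'],  # one set of windows per turbine/signal pair
+)
+processed.head()
+```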