diff --git a/notebooks/07_pipelines.ipynb b/notebooks/07_pipelines.ipynb new file mode 100644 index 00000000..694bf186 --- /dev/null +++ b/notebooks/07_pipelines.ipynb @@ -0,0 +1,217 @@ +{ + "cells": [ + { + "attachments": {}, + "cell_type": "markdown", + "id": "b773bf8e-c420-44e1-80a6-99f75dd12268", + "metadata": {}, + "source": [ + "## Pipelines and Transformers\n", + "\n", + "This notebook showcases the current version of data processing pipelines in CapyMOA. \n", + "\n", + "* Includes an example of how preprocessing can be accomplished via pipelines and transformers.\n", + "* Transformers transform an instance, e.g., using standardization, normalization, etc.\n", + "* Pipelines bundle transformers and can also act as classifiers or regressors\n", + "\n", + "*Please note that this feature is still under development; some functionality might not yet be available or change in future releases.*\n", + "\n", + "**notebook last updated on 24/05/2024**" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "55d070de-8697-4f98-a11b-eab4e3d5c281", + "metadata": {}, + "source": [ + "### 1. Running onlineBagging without any preprocessing\n", + "\n", + "First, let us have a look at a simple test-then-train classification example without pipelines. \n", + "- We loop over the instances of the data stream\n", + "- make a prediction,\n", + "- update the evaluator with the prediction and label\n", + "- and then train the classifier on the instance." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "14681f54-23a1-4f93-9145-abf484c91c54", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "79.05190677966102" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "## Test-then-train loop\n", + "from capymoa.stream import stream_from_file\n", + "from capymoa.classifier import OnlineBagging\n", + "from capymoa.evaluation import ClassificationEvaluator\n", + "\n", + "## Opening a file as a stream\n", + "DATA_PATH = \"../data/\"\n", + "elec_stream = stream_from_file(path_to_csv_or_arff=DATA_PATH+\"electricity.csv\")\n", + "\n", + "# Creating a learner\n", + "ob_learner = OnlineBagging(schema=elec_stream.get_schema(), ensemble_size=5)\n", + "\n", + "# Creating the evaluator\n", + "ob_evaluator = ClassificationEvaluator(schema=elec_stream.get_schema())\n", + "\n", + "while elec_stream.has_more_instances():\n", + " instance = elec_stream.next_instance()\n", + " \n", + " prediction = ob_learner.predict(instance)\n", + " ob_evaluator.update(instance.y_index, prediction)\n", + " ob_learner.train(instance)\n", + "\n", + "ob_evaluator.accuracy()" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "0c1360ef-0583-4c87-8645-1e2d701fffca", + "metadata": {}, + "source": [ + "### 2. Online Bagging using pipelines and transformers\n", + "\n", + "If we want to perform some preprocessing, such as normalization or feature transformation, or a combination of both, we can chain multiple such `Transformer`s within a pipeline. The last step of a pipeline is a learner, such as capymoa classifier or regressor.\n", + "\n", + "Similar as classifiers and regressors, pipelines support `train` and `test`. Hence, we can use them in the same way as we would use other capymoa learners. Internally, the pipeline object passes an incoming instance from one transformer to the next. 
It then returns the prediction of the classifier / regressor using the transformed instance.\n", + "\n", + "Creating a pipeline consists of the following steps:\n", + "1. Create a stream instance\n", + "2. Initialize the transformers\n", + "3. Initialize the learner\n", + "4. Create the pipeline. Here, we use a `ClassifierPipeline`\n", + "5. Use the pipeline the same way as any other learner." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "ae9bb646-e0d1-4de6-b5a1-cff0f0a1b172", + "metadata": {}, + "outputs": [ + { + "ename": "ImportError", + "evalue": "Failed to import 'moa.streams.FilteredQueueStream'", + "output_type": "error", + "traceback": [ + "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[1;31mException\u001b[0m Traceback (most recent call last)", + "File \u001b[1;32morg.jpype.JPypeContext.java:-1\u001b[0m, in \u001b[0;36morg.jpype.JPypeContext.callMethod\u001b[1;34m()\u001b[0m\n", + "\u001b[1;31mException\u001b[0m: Java Exception", + "\nThe above exception was the direct cause of the following exception:\n", + "\u001b[1;31mjava.lang.ClassNotFoundException\u001b[0m Traceback (most recent call last)", + "File \u001b[1;32m~\\.virtualenvs\\CapyMOA-pLr6U80W\\Lib\\site-packages\\jpype\\imports.py:195\u001b[0m, in \u001b[0;36m_JImportLoader.find_spec\u001b[1;34m(self, name, path, target)\u001b[0m\n\u001b[0;32m 193\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m 194\u001b[0m \u001b[38;5;66;03m# Use forname because it give better diagnostics\u001b[39;00m\n\u001b[1;32m--> 195\u001b[0m \u001b[38;5;28mcls\u001b[39m \u001b[38;5;241m=\u001b[39m \u001b[43m_jpype\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_java_lang_Class\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mforName\u001b[49m\u001b[43m(\u001b[49m\u001b[43mname\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43;01mTrue\u001b[39;49;00m\u001b[43m,\u001b[49m\u001b[43m 
\u001b[49m\u001b[43m_jpype\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mJPypeClassLoader\u001b[49m\u001b[43m)\u001b[49m\n\u001b[0;32m 197\u001b[0m \u001b[38;5;66;03m# This code only is hit if an error was not thrown\u001b[39;00m\n", + "\u001b[1;31mjava.lang.ClassNotFoundException\u001b[0m: java.lang.ClassNotFoundException: moa.streams.FilteredQueueStream", + "\nThe above exception was the direct cause of the following exception:\n", + "\u001b[1;31mImportError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[1;32mIn[2], line 1\u001b[0m\n\u001b[1;32m----> 1\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mcapymoa\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mstream\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mpreprocessing\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m MOATransformer\n\u001b[0;32m 2\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mcapymoa\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mstream\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mpreprocessing\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m ClassifierPipeline\n\u001b[0;32m 3\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mcapymoa\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mstream\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m Stream\n", + "File \u001b[1;32m~\\Documents\\code\\CapyMOA\\src\\capymoa\\stream\\preprocessing\\__init__.py:1\u001b[0m\n\u001b[1;32m----> 1\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mpipeline\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m (\n\u001b[0;32m 2\u001b[0m BasePipeline, ClassifierPipeline, RegressorPipeline\n\u001b[0;32m 3\u001b[0m )\n\u001b[0;32m 4\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mtransformer\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m (\n\u001b[0;32m 5\u001b[0m Transformer, 
MOATransformer\n\u001b[0;32m 6\u001b[0m )\n\u001b[0;32m 8\u001b[0m __all__ \u001b[38;5;241m=\u001b[39m [\n\u001b[0;32m 9\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mBasePipeline\u001b[39m\u001b[38;5;124m\"\u001b[39m,\n\u001b[0;32m 10\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mClassifierPipeline\u001b[39m\u001b[38;5;124m\"\u001b[39m,\n\u001b[1;32m (...)\u001b[0m\n\u001b[0;32m 13\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mMOATransformer\u001b[39m\u001b[38;5;124m\"\u001b[39m\n\u001b[0;32m 14\u001b[0m ]\n", + "File \u001b[1;32m~\\Documents\\code\\CapyMOA\\src\\capymoa\\stream\\preprocessing\\pipeline.py:7\u001b[0m\n\u001b[0;32m 5\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mcapymoa\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mbase\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m Classifier, Regressor\n\u001b[0;32m 6\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mcapymoa\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01minstance\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m LabeledInstance, Instance, RegressionInstance\n\u001b[1;32m----> 7\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mcapymoa\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mstream\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mpreprocessing\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mtransformer\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m Transformer\n\u001b[0;32m 8\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mcapymoa\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mtype_alias\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m LabelProbabilities, LabelIndex, TargetValue\n\u001b[0;32m 11\u001b[0m \u001b[38;5;28;01mclass\u001b[39;00m \u001b[38;5;21;01mBasePipeline\u001b[39;00m:\n", + "File \u001b[1;32m~\\Documents\\code\\CapyMOA\\src\\capymoa\\stream\\preprocessing\\transformer.py:7\u001b[0m\n\u001b[0;32m 
5\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mcapymoa\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mstream\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m Schema, Stream\n\u001b[0;32m 6\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mcapymoa\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01minstance\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m Instance\n\u001b[1;32m----> 7\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mmoa\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mstreams\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m FilteredQueueStream\n\u001b[0;32m 8\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mmoa\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mstreams\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mfilters\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m StreamFilter\n\u001b[0;32m 11\u001b[0m \u001b[38;5;28;01mclass\u001b[39;00m \u001b[38;5;21;01mTransformer\u001b[39;00m(ABC):\n", + "File \u001b[1;32m~\\.virtualenvs\\CapyMOA-pLr6U80W\\Lib\\site-packages\\jpype\\imports.py:203\u001b[0m, in \u001b[0;36m_JImportLoader.find_spec\u001b[1;34m(self, name, path, target)\u001b[0m\n\u001b[0;32m 201\u001b[0m \u001b[38;5;66;03m# Not found is acceptable\u001b[39;00m\n\u001b[0;32m 202\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m ex:\n\u001b[1;32m--> 203\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mImportError\u001b[39;00m(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mFailed to import \u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;132;01m%s\u001b[39;00m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124m\"\u001b[39m \u001b[38;5;241m%\u001b[39m name) \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mex\u001b[39;00m\n\u001b[0;32m 205\u001b[0m \u001b[38;5;66;03m# Import the java module\u001b[39;00m\n\u001b[0;32m 206\u001b[0m 
\u001b[38;5;28;01mreturn\u001b[39;00m _ModuleSpec(name, \u001b[38;5;28mself\u001b[39m)\n", + "\u001b[1;31mImportError\u001b[0m: Failed to import 'moa.streams.FilteredQueueStream'" + ] + } + ], + "source": [ + "from capymoa.stream.preprocessing import MOATransformer\n", + "from capymoa.stream.preprocessing import ClassifierPipeline\n", + "from capymoa.stream import Stream\n", + "from moa.streams.filters import AddNoiseFilter, NormalisationFilter\n", + "from moa.streams import FilteredStream\n", + "\n", + "# Open the stream from an ARFF file\n", + "elec_stream = stream_from_file(path_to_csv_or_arff=DATA_PATH+\"electricity.arff\")\n", + "\n", + "# Creating the transformer\n", + "normalisation_transformer = MOATransformer(schema=elec_stream.get_schema(), moa_filter=NormalisationFilter())\n", + "add_noise_transformer = MOATransformer(schema=normalisation_transformer.get_schema(), moa_filter=AddNoiseFilter())\n", + "\n", + "# Creating a learner\n", + "ob_learner = OnlineBagging(schema=add_noise_transformer.get_schema(), ensemble_size=5)\n", + "\n", + "# Creating and populating the pipeline\n", + "pipeline = ClassifierPipeline(transformers=[normalisation_transformer],\n", + " learner=ob_learner)\n", + "\n", + "# Alternative:\n", + "# pipeline = ClassifierPipeline()\n", + "# pipeline.add_transformer(normalization_transformer)\n", + "# pipeline.add_transformer(add_noise_transformer)\n", + "# pipeline.set_learner(ob_learner)\n", + "\n", + "# Creating the evaluator\n", + "ob_evaluator = ClassificationEvaluator(schema=elec_stream.get_schema()) #TODO: Change to transformer.get_schema() to pipeline.get_schema() or something like that.\n", + "\n", + "while elec_stream.has_more_instances():\n", + " instance = elec_stream.next_instance()\n", + " prediction = pipeline.predict(instance)\n", + " ob_evaluator.update(instance.y_index, prediction)\n", + " pipeline.train(instance)\n", + "\n", + "ob_evaluator.accuracy()" + ] + }, + { + "cell_type": "markdown", + "id": 
"5f747c04-59ab-41cb-87ca-3d66ae75731a", + "metadata": {}, + "source": [ + "Last, we can also get a textual representation of the pipeline:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c11f5b39-bf53-496e-b42c-25f89458ff03", + "metadata": {}, + "outputs": [], + "source": [ + "str(pipeline)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f3c8271b-bbb7-4ca9-97f5-fb41e27ec4fd", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/notebooks/Pipelines.ipynb b/notebooks/Pipelines.ipynb new file mode 100644 index 00000000..3ee3ff0c --- /dev/null +++ b/notebooks/Pipelines.ipynb @@ -0,0 +1,206 @@ +{ + "cells": [ + { + "attachments": {}, + "cell_type": "markdown", + "id": "b773bf8e-c420-44e1-80a6-99f75dd12268", + "metadata": {}, + "source": [ + "## Pipelines and Transformers\n", + "\n", + "* Includes an example of how preprocessing can be accomplished via pipelines and transformers.\n", + "* Transformers transform an instance, e.g., using standardization, normalization, etc.\n", + "* Pipelines bundle transformers and can also act as classifiers or regressors \n", + "\n", + "**notebook last updated on 04/04/2024**" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "55d070de-8697-4f98-a11b-eab4e3d5c281", + "metadata": {}, + "source": [ + "### 1. 
Running onlineBagging without any preprocessing" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "14681f54-23a1-4f93-9145-abf484c91c54", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "capymoa_root: C:\\Users\\heyden\\Documents\\code\\CapyMOA\\src\\capymoa\n", + "MOA jar path location (config.ini): C:\\Users\\heyden\\Documents\\code\\CapyMOA\\src\\capymoa\\jar\\moa.jar\n", + "JVM Location (system): \n", + "JAVA_HOME: C:\\Program Files (x86)\\Java\\jre-1.8\n", + "JVM args: ['-Xmx8g', '-Xss10M']\n", + "Sucessfully started the JVM and added MOA jar to the class path\n" + ] + }, + { + "data": { + "text/plain": [ + "78.57079802259888" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "## Test-then-train loop\n", + "from capymoa.stream import stream_from_file\n", + "from capymoa.learner.classifier import OnlineBagging\n", + "from capymoa.evaluation import ClassificationEvaluator\n", + "\n", + "## Opening a file as a stream\n", + "DATA_PATH = \"../data/\"\n", + "elec_stream = stream_from_file(path_to_csv_or_arff=DATA_PATH+\"electricity.csv\")\n", + "\n", + "# Creating a learner\n", + "ob_learner = OnlineBagging(schema=elec_stream.get_schema(), ensemble_size=5)\n", + "\n", + "# Creating the evaluator\n", + "ob_evaluator = ClassificationEvaluator(schema=elec_stream.get_schema())\n", + "\n", + "while elec_stream.has_more_instances():\n", + " instance = elec_stream.next_instance()\n", + " \n", + " prediction = ob_learner.predict(instance)\n", + " ob_evaluator.update(instance.y_index, prediction)\n", + " ob_learner.train(instance)\n", + "\n", + "ob_evaluator.accuracy()" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "0c1360ef-0583-4c87-8645-1e2d701fffca", + "metadata": {}, + "source": [ + "### 2. Online Bagging using pipelines and transformers\n", + "\n", + "Creating a pipeline consists of the following steps:\n", + "1. 
Create a stream instance\n", + "2. Initialize the transformers\n", + "3. Initialize the learner\n", + "4. Create the pipeline. Here, we use a `ClassifierPipeline`\n", + "5. Use the pipeline the same way as any other learner." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "ae9bb646-e0d1-4de6-b5a1-cff0f0a1b172", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "74.70868644067797" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from capymoa.stream.preprocessing import MOATransformer\n", + "from capymoa.stream.preprocessing import ClassifierPipeline\n", + "from capymoa.stream import Stream\n", + "from moa.streams.filters import AddNoiseFilter, NormalisationFilter\n", + "from moa.streams import FilteredStream\n", + "\n", + "# Open the stream from an ARFF file\n", + "elec_stream = stream_from_file(path_to_csv_or_arff=DATA_PATH+\"electricity.arff\")\n", + "\n", + "# Creating the transformer\n", + "normalisation_transformer = MOATransformer(schema=elec_stream.get_schema(), moa_filter=NormalisationFilter())\n", + "add_noise_transformer = MOATransformer(schema=normalisation_transformer.get_schema(), moa_filter=AddNoiseFilter())\n", + "\n", + "# Creating a learner\n", + "ob_learner = OnlineBagging(schema=add_noise_transformer.get_schema(), ensemble_size=5)\n", + "\n", + "# Creating and populating the pipeline\n", + "pipeline = ClassifierPipeline(transformers=[normalisation_transformer, add_noise_transformer],\n", + " learner=ob_learner)\n", + "\n", + "# Alternative:\n", + "# pipeline = ClassifierPipeline()\n", + "# pipeline.add_transformer(normalization_transformer)\n", + "# pipeline.add_transformer(add_noise_transformer)\n", + "# pipeline.set_learner(ob_learner)\n", + "\n", + "# Creating the evaluator\n", + "ob_evaluator = ClassificationEvaluator(schema=elec_stream.get_schema()) #TODO: Change to transformer.get_schema() to pipeline.get_schema() or something like that.\n", 
+ "\n", + "while elec_stream.has_more_instances():\n", + " instance = elec_stream.next_instance()\n", + " prediction = pipeline.predict(instance)\n", + " pipeline.train(instance)\n", + "\n", + " ob_evaluator.update(instance.y_index, prediction)\n", + "\n", + "ob_evaluator.accuracy()" + ] + }, + { + "cell_type": "markdown", + "id": "5f747c04-59ab-41cb-87ca-3d66ae75731a", + "metadata": {}, + "source": [ + "We can also get a textual representation of the pipeline:" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "c11f5b39-bf53-496e-b42c-25f89458ff03", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'Transformer(NormalisationFilter ) | Transformer(AddNoiseFilter ) | OnlineBagging'" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "str(pipeline)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.18" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/capymoa/stream/preprocessing/__init__.py b/src/capymoa/stream/preprocessing/__init__.py new file mode 100644 index 00000000..05e3bb6d --- /dev/null +++ b/src/capymoa/stream/preprocessing/__init__.py @@ -0,0 +1,14 @@ +from .pipeline import ( + BasePipeline, ClassifierPipeline, RegressorPipeline +) +from .transformer import ( + Transformer, MOATransformer +) + +__all__ = [ + "BasePipeline", + "ClassifierPipeline", + "RegressorPipeline", + "Transformer", + "MOATransformer" +] diff --git a/src/capymoa/stream/preprocessing/pipeline.py 
b/src/capymoa/stream/preprocessing/pipeline.py
new file mode 100644
index 00000000..affab38a
--- /dev/null
+++ b/src/capymoa/stream/preprocessing/pipeline.py
@@ -0,0 +1,85 @@
+from __future__ import annotations
+
+from typing import Optional, List
+
+from capymoa.base import Classifier, Regressor
+from capymoa.instance import LabeledInstance, Instance, RegressionInstance
+from capymoa.stream.preprocessing.transformer import Transformer
+from capymoa.type_alias import LabelProbabilities, LabelIndex, TargetValue
+
+
+class BasePipeline:
+    def __init__(self, transformers: List[Transformer] | None = None):
+        self.elements: List[Transformer] = [] if transformers is None else transformers
+
+    def add_transformer(self, new_element: Transformer):
+        assert isinstance(new_element, Transformer), "Please provide a Transformer object"
+        self.elements.append(new_element)
+
+    def transform(self, instance: Instance) -> Instance:
+        inst = instance
+        for i, element in enumerate(self.elements):
+            inst = element.transform_instance(inst)
+        return inst
+
+    def __str__(self):
+        s = ""
+        for i, transformer in enumerate(self.elements):
+            s += str(transformer)
+            if i == len(self.elements) - 1:
+                break
+            s += " | "
+        return s
+
+
+class ClassifierPipeline(BasePipeline, Classifier):
+
+    def __init__(self, transformers: List[Transformer] | None = None, learner: Classifier | None = None):
+        super(ClassifierPipeline, self).__init__(transformers)
+        self.learner = learner
+
+    def train(self, instance: LabeledInstance):
+        instance = self.transform(instance)
+        self.learner.train(instance)
+
+    def predict(self, instance: Instance) -> Optional[LabelIndex]:
+        instance = self.transform(instance)
+        return self.learner.predict(instance)
+
+    def predict_proba(self, instance: Instance) -> LabelProbabilities:
+        instance = self.transform(instance)
+        return self.learner.predict_proba(instance)
+
+    def set_learner(self, learner: Classifier | Regressor):
+        self.learner = learner
+
+    def __str__(self):
+        s = ""
+        for i, transformer in enumerate(self.elements):
+            s += str(transformer)
+            s += " | "
+        return s + str(self.learner)
+
+
+class RegressorPipeline(BasePipeline, Regressor):
+    def __init__(self, transformers: List[Transformer] | None = None, learner: Regressor | None = None):
+        super(RegressorPipeline, self).__init__(transformers)
+        self.learner = learner
+
+    def train(self, instance: RegressionInstance):
+        instance = self.transform(instance)
+        self.learner.train(instance)
+
+    def predict(self, instance: RegressionInstance) -> TargetValue:
+        instance = self.transform(instance)
+        return self.learner.predict(instance)
+
+    def set_learner(self, learner: Classifier | Regressor):
+        self.learner = learner
+
+    def __str__(self):
+        s = ""
+        for i, transformer in enumerate(self.elements):
+            s += str(transformer)
+            s += " | "
+        return s + str(self.learner)
diff --git a/src/capymoa/stream/preprocessing/transformer.py b/src/capymoa/stream/preprocessing/transformer.py
new file mode 100644
index 00000000..22147ce6
--- /dev/null
+++ b/src/capymoa/stream/preprocessing/transformer.py
@@ -0,0 +1,84 @@
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+
+from capymoa.stream import Schema, Stream
+from capymoa.instance import Instance
+from moa.streams import FilteredQueueStream
+from moa.streams.filters import StreamFilter
+
+
+class Transformer(ABC):
+    @abstractmethod
+    def transform_instance(self, instance) -> Instance:
+        raise NotImplementedError
+
+    @abstractmethod
+    def get_schema(self) -> Schema:
+        raise NotImplementedError
+
+    @abstractmethod
+    def restart(self):
+        raise NotImplementedError
+
+
+class MOATransformer(Transformer):
+    def __init__(self, schema=None, moa_filter: StreamFilter | None = None, CLI=None):
+        self.schema = schema
+        self.CLI = CLI
+        self.moa_filter = moa_filter
+        self._last_instance = None
+        self._last_transformed_instance = None
+
+        if self.CLI is not None:
+            if self.moa_filter is not None:
+                self.moa_filter.getOptions().setViaCLIString(CLI)
+            else:
+                raise RuntimeError("Must provide a moa_filter to set via CLI.")
+
+        if self.moa_filter is not None:
+            # Must call this method exactly here, because prepareForUse invoke the method to initialize the
+            # header file of the stream (synthetic ones)
+            self.moa_filter.prepareForUse()
+        else:
+            raise RuntimeError(
+                "Must provide a moa_filter to initialize the Schema."
+            )
+
+        if self.schema is None:
+            if self.moa_filter is not None:
+                self.schema = Schema(moa_header=self.moa_filter.getHeader())
+            else:
+                raise RuntimeError(
+                    "Must provide a moa_filter to initialize the Schema."
+                )
+
+        queue = FilteredQueueStream()
+        self.filtered_stream = Stream(schema=self.schema,  # self.schema: also covers schema derived from the filter above
+                                      moa_stream=queue,
+                                      CLI=f"-f ({self.moa_filter.getCLICreationString(self.moa_filter.__class__)})")
+
+    def __str__(self):
+        return f"Transformer({str(self.moa_filter.getCLICreationString(self.moa_filter.__class__))})"
+
+    def transform_instance(self, instance) -> Instance:
+        # MOA filters are not stateless.
+        # This hack avoids transforming an instance twice.
+        if self._last_instance == instance:
+            return self._last_transformed_instance
+        self._last_instance = instance
+
+        self.filtered_stream.moa_stream.addToQueue(instance.java_instance.instance)
+        new_instance = self.filtered_stream.next_instance()
+
+        self._last_transformed_instance = new_instance
+        return new_instance
+
+    def get_schema(self):
+        return self.schema
+
+    def restart(self):
+        self.moa_filter.restart()
+
+    def get_moa_filter(self):
+        return self.moa_filter