Command line examples for module-based DFP pipelines #1154

Merged
@@ -84,7 +84,7 @@ inference tasks against a specific set of data, and the capacity for real-time l
that can be injected back into the training pipeline.

The following content will track the pipeline declared in
-`examples/digital_fingerprinting/production/morpheus/dfp_integrated_training_streaming_pipeline.py`
+`examples/digital_fingerprinting/production/morpheus/dfp_modules_streaming_pipeline.py`

```python
# Setup and command line argument parsing
@@ -514,3 +514,134 @@ def write_to_file(builder: mrc.Builder):
```

For a complete reference, refer to: [Write to File](../../modules/core/write_to_file.md)

## Running Example Modular DFP Pipelines

The following are steps to run modular DFP pipelines with example Azure and Duo datasets.

### System requirements
* [Docker](https://docs.docker.com/get-docker/) and [docker-compose](https://docs.docker.com/compose/) installed on the host machine
* Supported GPU with [nvidia-docker runtime](https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/install-guide.html#docker)

> **Note:** For GPU requirements, refer to [getting_started](../../getting_started.md#requirements).

### Building the services
From the root of the Morpheus repo, run:
```bash
cd examples/digital_fingerprinting/production
docker compose build
```

> **Note:** This requires version 1.28.0 or higher of Docker Compose, and preferably v2. If you encounter an error similar to:
>
> ```
> ERROR: The Compose file './docker-compose.yml' is invalid because:
> services.jupyter.deploy.resources.reservations value Additional properties are not allowed ('devices' was
> unexpected)
> ```
>
> This is most likely due to using an older version of the `docker-compose` command; instead, re-run the build with `docker compose`. Refer to [Migrate to Compose V2](https://docs.docker.com/compose/migrate/) for more information.
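
A quick way to confirm which Compose implementation you have is to print its version; the v2 plugin reports itself as `Docker Compose version v2.x.x`:

```bash
# Sanity check (optional): Compose v2 output begins with "Docker Compose version v2".
docker compose version
```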

### Downloading the example datasets
First, we need to install `s3fs` and then run the `examples/digital_fingerprinting/fetch_example_data.py` script. This will download the example data into the `examples/data/dfp` directory.

From the Morpheus repo, run:
```bash
pip install s3fs
python examples/digital_fingerprinting/fetch_example_data.py all
```
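
To verify the download, you can list the target directory from the root of the Morpheus repo (an optional check; the exact subdirectories depend on which datasets you fetched):

```bash
# The fetch script populates examples/data/dfp with the example datasets.
ls examples/data/dfp
```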

### Morpheus Pipeline
From the `examples/digital_fingerprinting/production` directory, run:
```bash
docker compose run morpheus_pipeline bash
```
To run the DFP pipelines with the example datasets within the container, run:

* Duo Training Pipeline
```bash
python dfp_integrated_training_batch_pipeline.py \
--log_level DEBUG \
--use_cpp=true \
--source duo \
--start_time "2022-08-01" \
--duration "60d" \
--train_users generic \
--input_file "./control_messages/duo_payload_training.json"
```

* Duo Inference Pipeline
```bash
python dfp_integrated_training_batch_pipeline.py \
--log_level DEBUG \
--use_cpp=true \
--source duo \
--start_time "2022-08-30" \
--input_file "./control_messages/duo_payload_inference.json"
```

* Duo Training + Inference Pipeline
```bash
python dfp_integrated_training_batch_pipeline.py \
--log_level DEBUG \
--use_cpp=true \
--source duo \
--start_time "2022-08-01" \
--duration "60d" \
--train_users generic \
--input_file "./control_messages/duo_payload_load_training_inference.json"
```

* Azure Training Pipeline
```bash
python dfp_integrated_training_batch_pipeline.py \
--log_level DEBUG \
--use_cpp=true \
--source azure \
--start_time "2022-08-01" \
--duration "60d" \
--train_users generic \
--input_file "./control_messages/azure_payload_training.json"
```

* Azure Inference Pipeline
```bash
python dfp_integrated_training_batch_pipeline.py \
--log_level DEBUG \
--use_cpp=true \
--source azure \
--start_time "2022-08-30" \
--input_file "./control_messages/azure_payload_inference.json"
```

* Azure Training + Inference Pipeline
```bash
python dfp_integrated_training_batch_pipeline.py \
--log_level DEBUG \
--use_cpp=true \
--source azure \
--start_time "2022-08-01" \
--duration "60d" \
--train_users generic \
--input_file "./control_messages/azure_payload_load_train_inference.json"
```

### Output Fields
The output files, `dfp_detections_duo.csv` and `dfp_detections_azure.csv`, will contain those logs from the input dataset for which an anomaly was detected; this is determined by the z-score in the `mean_abs_z` field. By default, any logs with a z-score of 2.0 or higher are considered anomalous. Refer to [`DFPPostprocessingStage`](6_digital_fingerprinting_reference.md#post-processing-stage-dfppostprocessingstage).

Most of the fields in the output files generated by running the above examples are input fields or derived from input fields. The additional output fields are:
| Field | Type | Description |
| ----- | ---- | ----------- |
| event_time | TEXT | ISO 8601 formatted date string, the time the anomaly was detected by Morpheus |
| model_version | TEXT | Name and version of the model used to perform the inference, in the form of `<model name>:<version>` |
| max_abs_z | FLOAT | Max z-score across all features |
| mean_abs_z | FLOAT | Average z-score across all features |

In addition, the following output fields will exist for each input feature:
| Field | Type | Description |
| ----- | ---- | ----------- |
| `<feature name>_loss` | FLOAT | The reconstruction loss for this feature |
| `<feature name>_z_loss` | FLOAT | The loss z-score |
| `<feature name>_pred` | FLOAT | The predicted value |

Refer to [DFPInferenceStage](6_digital_fingerprinting_reference.md#inference-stage-dfpinferencestage) for more on these fields.
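
As a quick way to explore these results, the following sketch previews a detections file from inside the container; it assumes the Azure pipeline above wrote `dfp_detections_azure.csv` to the current working directory (adjust the path to match your run):

```bash
# Show the header row plus the first two detected anomalies.
head -n 3 dfp_detections_azure.csv

# Count how many logs were flagged as anomalous (the header row is excluded).
tail -n +2 dfp_detections_azure.csv | wc -l
```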
examples/digital_fingerprinting/production/README.md (6 changes: 6 additions & 0 deletions)
@@ -157,6 +157,12 @@ Run Azure Inference Pipeline:
python dfp_azure_pipeline.py --train_users none --start_time "2022-08-30" --input_file="../../../data/dfp/azure-inference-data/*.json"
```

##### Module-based DFP pipelines

The commands in the previous section run stage-based example DFP pipelines. The Morpheus 23.03 release introduced a new, more flexible module-based approach to building pipelines through the use of control messages. More information about modular DFP pipelines can be found [here](../../../docs/source/developer_guide/guides/10_modular_pipeline_digital_fingerprinting.md).

Commands to run equivalent module-based DFP pipelines can be found [here](../../../docs/source/developer_guide/guides/10_modular_pipeline_digital_fingerprinting.md#running-example-modular-dfp-pipelines).

#### Optional MLflow Service
Starting either the `morpheus_pipeline` or the `jupyter` service will start the `mlflow` service in the background. For debugging purposes it can be helpful to view the logs of the running MLflow service.

@@ -77,7 +77,7 @@ To provide your own calibration or use other `pytest-benchmark` features with th
Morpheus pipeline configurations for each workflow are managed using [pipelines_conf.json](./resource/pipelines_conf.json). For example, this is the Morpheus configuration for `dfp_modules_duo_payload_inference`:
```
"test_dfp_modules_duo_payload_inference_e2e": {
"message_path": "./resource/control_messages/duo_payload_inference.json",
"message_path": "../control_messages/duo_payload_inference.json",
"num_threads": 12,
"pipeline_batch_size": 256,
"edge_buffer_size": 128,
@@ -28,7 +28,7 @@ def get_json_lines_count(filename):
return len(lines)


-def pytest_benchmark_update_json(_, __, output_json):
+def pytest_benchmark_update_json(config, benchmarks, output_json):  # pylint:disable=unused-argument

curr_dir = path.dirname(path.abspath(__file__))

@@ -1,7 +1,7 @@
{
"tracking_uri": "http://localhost:5000",
"test_dfp_modules_azure_payload_inference_e2e": {
"message_path": "./resource/control_messages/azure_payload_inference.json",
"message_path": "../control_messages/azure_payload_inference.json",
"num_threads": 12,
"pipeline_batch_size": 256,
"edge_buffer_size": 128,
@@ -13,7 +13,7 @@
"use_cpp": true
},
"test_dfp_modules_azure_payload_lti_e2e": {
"message_path": "./resource/control_messages/azure_payload_lti.json",
"message_path": "../control_messages/azure_payload_lti.json",
"num_threads": 12,
"pipeline_batch_size": 256,
"edge_buffer_size": 128,
@@ -25,7 +25,7 @@
"use_cpp": true
},
"test_dfp_modules_azure_payload_lti_s3_e2e": {
"message_path": "./resource/control_messages/azure_payload_lti_s3.json",
"message_path": "../control_messages/azure_payload_lti_s3.json",
"num_threads": 12,
"pipeline_batch_size": 256,
"edge_buffer_size": 128,
@@ -37,7 +37,7 @@
"use_cpp": true
},
"test_dfp_modules_azure_payload_training_e2e": {
"message_path": "./resource/control_messages/azure_payload_training.json",
"message_path": "../control_messages/azure_payload_training.json",
"num_threads": 12,
"pipeline_batch_size": 256,
"edge_buffer_size": 128,
@@ -49,7 +49,7 @@
"use_cpp": true
},
"test_dfp_modules_azure_streaming_inference_e2e": {
"message_path": "./resource/control_messages/azure_streaming_inference.json",
"message_path": "../control_messages/azure_streaming_inference.json",
"num_threads": 12,
"pipeline_batch_size": 256,
"edge_buffer_size": 128,
@@ -61,7 +61,7 @@
"use_cpp": true
},
"test_dfp_modules_azure_streaming_lti_e2e": {
"message_path": "./resource/control_messages/azure_streaming_lti.json",
"message_path": "../control_messages/azure_streaming_lti.json",
"num_threads": 12,
"pipeline_batch_size": 256,
"edge_buffer_size": 128,
@@ -73,7 +73,7 @@
"use_cpp": true
},
"test_dfp_modules_azure_streaming_training_e2e": {
"message_path": "./resource/control_messages/azure_streaming_training.json",
"message_path": "../control_messages/azure_streaming_training.json",
"num_threads": 12,
"pipeline_batch_size": 256,
"edge_buffer_size": 128,
@@ -85,7 +85,7 @@
"use_cpp": true
},
"test_dfp_modules_duo_payload_inference_e2e": {
"message_path": "./resource/control_messages/duo_payload_inference.json",
"message_path": "../control_messages/duo_payload_inference.json",
"num_threads": 12,
"pipeline_batch_size": 256,
"edge_buffer_size": 128,
@@ -97,7 +97,7 @@
"use_cpp": true
},
"test_dfp_modules_duo_payload_lti_e2e": {
"message_path": "./resource/control_messages/duo_payload_lti.json",
"message_path": "../control_messages/duo_payload_lti.json",
"num_threads": 12,
"pipeline_batch_size": 256,
"edge_buffer_size": 128,
@@ -109,7 +109,7 @@
"use_cpp": true
},
"test_dfp_modules_duo_payload_only_load_e2e": {
"message_path": "./resource/control_messages/duo_payload_only_load.json",
"message_path": "../control_messages/duo_payload_only_load.json",
"num_threads": 12,
"pipeline_batch_size": 256,
"edge_buffer_size": 128,
@@ -121,7 +121,7 @@
"use_cpp": true
},
"test_dfp_modules_duo_payload_training_e2e": {
"message_path": "./resource/control_messages/duo_payload_training.json",
"message_path": "../control_messages/duo_payload_training.json",
"num_threads": 12,
"pipeline_batch_size": 256,
"edge_buffer_size": 128,
@@ -133,7 +133,7 @@
"use_cpp": true
},
"test_dfp_modules_duo_streaming_inference_e2e": {
"message_path": "./resource/control_messages/duo_streaming_inference.json",
"message_path": "../control_messages/duo_streaming_inference.json",
"num_threads": 12,
"pipeline_batch_size": 256,
"edge_buffer_size": 128,
@@ -145,7 +145,7 @@
"use_cpp": true
},
"test_dfp_modules_duo_streaming_lti_e2e": {
"message_path": "./resource/control_messages/duo_streaming_lti.json",
"message_path": "../control_messages/duo_streaming_lti.json",
"num_threads": 12,
"pipeline_batch_size": 256,
"edge_buffer_size": 128,
@@ -157,7 +157,7 @@
"use_cpp": true
},
"test_dfp_modules_duo_streaming_only_load_e2e": {
"message_path": "./resource/control_messages/duo_streaming_only_load.json",
"message_path": "../control_messages/duo_streaming_only_load.json",
"num_threads": 12,
"pipeline_batch_size": 256,
"edge_buffer_size": 128,
@@ -169,7 +169,7 @@
"use_cpp": true
},
"test_dfp_modules_duo_streaming_payload_e2e": {
"message_path": "./resource/control_messages/duo_streaming_payload.json",
"message_path": "../control_messages/duo_streaming_payload.json",
"num_threads": 12,
"pipeline_batch_size": 256,
"edge_buffer_size": 128,
@@ -181,7 +181,7 @@
"use_cpp": true
},
"test_dfp_modules_duo_streaming_training_e2e": {
"message_path": "./resource/control_messages/duo_streaming_training.json",
"message_path": "../control_messages/duo_streaming_training.json",
"num_threads": 12,
"pipeline_batch_size": 256,
"edge_buffer_size": 128,
@@ -7,7 +7,7 @@
"properties": {
"loader_id": "fsspec",
"files": [
"../../../../../examples/data/dfp/azure-inference-data/*.json"
"../../../../examples/data/dfp/azure-inference-data/*.json"
]
}
},
@@ -7,7 +7,7 @@
"properties": {
"loader_id": "fsspec",
"files": [
"../../../../../examples/data/dfp/azure-training-data/*.json"
"../../../../examples/data/dfp/azure-training-data/*.json"
]
}
},
@@ -28,7 +28,7 @@
"properties": {
"loader_id": "fsspec",
"files": [
"../../../../../examples/data/dfp/azure-inference-data/*.json"
"../../../../examples/data/dfp/azure-inference-data/*.json"
]
}
},
@@ -7,7 +7,7 @@
"properties": {
"loader_id": "fsspec",
"files": [
"../../../../../examples/data/dfp/azure-training-data/*.json"
"../../../../examples/data/dfp/azure-training-data/*.json"
]
}
},
@@ -7,7 +7,7 @@
"properties": {
"loader_id": "fsspec",
"files": [
"../../../../../examples/data/dfp/azure-training-data/*.json"
"../../../../examples/data/dfp/azure-training-data/*.json"
]
}
},
@@ -34,7 +34,7 @@
"properties": {
"loader_id": "fsspec",
"files": [
"../../../../../examples/data/dfp/azure-inference-data/*.json"
"../../../../examples/data/dfp/azure-inference-data/*.json"
]
}
},
@@ -7,7 +7,7 @@
"properties": {
"loader_id": "fsspec",
"files": [
"../../../../../examples/data/dfp/azure-training-data/*.json"
"../../../../examples/data/dfp/azure-training-data/*.json"
]
}
},
@@ -7,7 +7,7 @@
"properties": {
"loader_id": "fsspec",
"files": [
"../../../../../examples/data/dfp/azure-inference-data/*.json"
"../../../../examples/data/dfp/azure-inference-data/*.json"
]
}
},