nlp_si_detection example improvements (#193)
* Add a second monitor stage to report progress for the back-half of the pipeline
* Increase threads to match number of stages
* Change the Triton port to 8000, avoiding a warning about the gRPC port
* Add EdgeConnectors for derived classes of MultiMessage to MultiMessage
* Remove the filter stage from the example (#83 & #197)

fixes #186

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #193
dagardner-nv authored Jun 27, 2022
1 parent 9fba09c commit d35593c
Showing 2 changed files with 86 additions and 21 deletions.
80 changes: 59 additions & 21 deletions examples/nlp_si_detection/README.md
@@ -101,31 +101,30 @@ With the Morpheus CLI, an entire pipeline can be configured and run without writ

The following command line is the entire command to build and launch the pipeline. Each new line represents a new stage. The comment above each stage gives information about why the stage was added and configured this way.

+From the Morpheus repo root directory run:
```bash
-export MORPHEUS_ROOT=../..
+export MORPHEUS_ROOT=$(pwd)
# Launch Morpheus printing debug messages
-morpheus --debug --log_level=DEBUG \
+morpheus --log_level=DEBUG \
`# Run a pipeline with 8 threads and a model batch size of 32 (Must match Triton config)` \
run --num_threads=8 --pipeline_batch_size=1024 --model_max_batch_size=32 \
`# Specify a NLP pipeline with 256 sequence length (Must match Triton config)` \
pipeline-nlp --model_seq_length=256 \
`# 1st Stage: Read from file` \
-from-file --filename=$MORPHEUS_ROOT/examples/data/pcap_dump.jsonlines \
+from-file --filename=examples/data/pcap_dump.jsonlines \
`# 2nd Stage: Deserialize from JSON strings to objects` \
deserialize \
`# 3rd Stage: Preprocessing converts the input data into BERT tokens` \
-preprocess --vocab_hash_file=$MORPHEUS_ROOT/morpheus/data/bert-base-uncased-hash.txt --do_lower_case=True --truncation=True \
+preprocess --vocab_hash_file=morpheus/data/bert-base-uncased-hash.txt --do_lower_case=True --truncation=True \
`# 4th Stage: Send messages to Triton for inference. Specify the model loaded in Setup` \
-inf-triton --model_name=sid-minibert-onnx --server_url=localhost:8001 --force_convert_inputs=True \
+inf-triton --model_name=sid-minibert-onnx --server_url=localhost:8000 --force_convert_inputs=True \
`# 5th Stage: Monitor stage prints throughput information to the console` \
monitor --description "Inference Rate" --smoothing=0.001 --unit inf \
`# 6th Stage: Add results from inference to the messages` \
add-class \
-`# 7th Stage: Filtering removes any messages that did not detect SI` \
-filter \
-`# 8th Stage: Convert from objects back into strings` \
+`# 7th Stage: Convert from objects back into strings` \
serialize --exclude '^_ts_' \
-`# 9th Stage: Write out the JSON lines to the detections.jsonlines file` \
+`# 8th Stage: Write out the JSON lines to the detections.jsonlines file` \
to-file --filename=detections.jsonlines --overwrite
```

@@ -150,7 +149,7 @@ Config:
"secret_keys",
"user"
],
-"debug": true,
+"debug": false,
"edge_buffer_size": 128,
"feature_length": 256,
"fil": null,
@@ -165,29 +164,68 @@ CPP Enabled: True
====Registering Pipeline====
====Registering Pipeline Complete!====
====Starting Pipeline====
-====Pipeline Started====
====Building Pipeline====
-Added source: <from-file-0; FileSourceStage(filename=/home/dagardner/work/examples/data/pcap_dump.jsonlines, iterative=False, file_type=FileTypes.Auto, repeat=1, filter_null=True, cudf_kwargs=None)>
+Added source: <from-file-0; FileSourceStage(filename=examples/data/pcap_dump.jsonlines, iterative=False, file_type=FileTypes.Auto, repeat=1, filter_null=True, cudf_kwargs=None)>
└─> morpheus.MessageMeta
Added stage: <deserialize-1; DeserializeStage()>
└─ morpheus.MessageMeta -> morpheus.MultiMessage
-Added stage: <preprocess-nlp-2; PreprocessNLPStage(vocab_hash_file=/home/dagardner/work/morpheus/data/bert-base-uncased-hash.txt, truncation=True, do_lower_case=True, add_special_tokens=False, stride=-1)>
+Added stage: <preprocess-nlp-2; PreprocessNLPStage(vocab_hash_file=morpheus/data/bert-base-uncased-hash.txt, truncation=True, do_lower_case=True, add_special_tokens=False, stride=-1)>
└─ morpheus.MultiMessage -> morpheus.MultiInferenceNLPMessage
-Added stage: <inference-3; TritonInferenceStage(model_name=sid-minibert-onnx, server_url=localhost:8001, force_convert_inputs=True, use_shared_memory=False)>
+Added stage: <inference-3; TritonInferenceStage(model_name=sid-minibert-onnx, server_url=localhost:8000, force_convert_inputs=True, use_shared_memory=False)>
└─ morpheus.MultiInferenceNLPMessage -> morpheus.MultiResponseProbsMessage
Added stage: <monitor-4; MonitorStage(description=Inference Rate, smoothing=0.001, unit=inf, delayed_start=False, determine_count_fn=None)>
└─ morpheus.MultiResponseProbsMessage -> morpheus.MultiResponseProbsMessage
Added stage: <add-class-5; AddClassificationsStage(threshold=0.5, labels=[], prefix=)>
└─ morpheus.MultiResponseProbsMessage -> morpheus.MultiResponseProbsMessage
-Added stage: <filter-6; FilterDetectionsStage(threshold=0.5)>
-└─ morpheus.MultiResponseProbsMessage -> morpheus.MultiResponseProbsMessage
-Added stage: <serialize-7; SerializeStage(include=[], exclude=['^_ts_'], fixed_columns=True)>
+Added stage: <serialize-6; SerializeStage(include=[], exclude=['^_ts_'], fixed_columns=True)>
└─ morpheus.MultiResponseProbsMessage -> morpheus.MessageMeta
-Added stage: <to-file-8; WriteToFileStage(filename=detections.jsonlines, overwrite=True, file_type=FileTypes.Auto)>
+Added stage: <to-file-7; WriteToFileStage(filename=detections.jsonlines, overwrite=True, file_type=FileTypes.Auto)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Pipeline Complete!====
-Starting! Time: 1651079123.1867409
-Inference Rate[Complete]: 93085inf [00:06, 153.30inf/s]
+====Pipeline Started====
+Starting! Time: 1656352480.541071
+Inference Rate[Complete]: 93085inf [00:07, 12673.63inf/s]
====Pipeline Complete====

```

-The output file `detections.jsonlines` will contain PCAP messages that contain some SI (any class with a predection greater that 0.5).
+The output file `detections.jsonlines` will contain the original PCAP messages with the following additional fields added:
+* address
+* bank_acct
+* credit_card
+* email
+* govt_id
+* name
+* password
+* phone_num
+* secret_keys
+* user
+
+The value of each of these fields will be either `1`, indicating a detection, or `0`, indicating no detection. An example row with a detection looks like:
+```json
+{
+"timestamp": 1616381019580,
+"host_ip": "10.188.40.56",
+"data_len": "129",
+"data": "\"{\\\"X-Postmark-Server-Token\\\": \\\"76904958 O7FWqd9p TzIBfSYk\\\"}\"",
+"src_mac": "04:3f:72:bf:af:74",
+"dest_mac": "b4:a9:fc:3c:46:f8",
+"protocol": "6",
+"src_ip": "10.20.16.248",
+"dest_ip": "10.244.0.60",
+"src_port": "51374",
+"dest_port": "80",
+"flags": "24",
+"is_pii": false,
+"address": 0,
+"bank_acct": 0,
+"credit_card": 0,
+"email": 0,
+"govt_id": 0,
+"name": 0,
+"password": 0,
+"phone_num": 0,
+"secret_keys": 1,
+"user": 0
+}
+```
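
With the filter stage removed from the example, `detections.jsonlines` now contains every processed message, not just those flagged as containing SI. If only the detections are wanted, they can be recovered after the fact; the following is a minimal post-processing sketch, not part of the example, assuming pandas is installed and using the field names listed above:

```python
import pandas as pd

# Classification columns added by the add-class stage (field list from the README above).
SI_COLUMNS = ["address", "bank_acct", "credit_card", "email", "govt_id",
              "name", "password", "phone_num", "secret_keys", "user"]

# detections.jsonlines is newline-delimited JSON, one message per line.
df = pd.read_json("detections.jsonlines", lines=True)

# Keep only rows where at least one SI class was detected (column value of 1).
detections = df[df[SI_COLUMNS].any(axis=1)]
print(f"{len(detections)} of {len(df)} messages contained SI")
```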
27 changes: 27 additions & 0 deletions morpheus/_lib/src/python_modules/messages.cpp
@@ -30,6 +30,8 @@
#include <morpheus/objects/tensor.hpp>
#include <morpheus/utilities/cudf_util.hpp>

+#include <srf/node/edge_connector.hpp>
+
#include <pybind11/cast.h>
#include <pybind11/functional.h> // IWYU pragma: keep
#include <pybind11/pybind11.h>
@@ -68,6 +70,31 @@ PYBIND11_MODULE(messages, m)
// Allows python objects to keep DataTable objects alive
py::class_<IDataTable, std::shared_ptr<IDataTable>>(m, "DataTable");

+// EdgeConnectors for derived classes of MultiMessage to MultiMessage
+srf::node::EdgeConnector<std::shared_ptr<morpheus::MultiInferenceMessage>,
+std::shared_ptr<morpheus::MultiMessage>>::register_converter();
+
+srf::node::EdgeConnector<std::shared_ptr<morpheus::MultiInferenceFILMessage>,
+std::shared_ptr<morpheus::MultiInferenceMessage>>::register_converter();
+
+srf::node::EdgeConnector<std::shared_ptr<morpheus::MultiInferenceFILMessage>,
+std::shared_ptr<morpheus::MultiMessage>>::register_converter();
+
+srf::node::EdgeConnector<std::shared_ptr<morpheus::MultiInferenceNLPMessage>,
+std::shared_ptr<morpheus::MultiInferenceMessage>>::register_converter();
+
+srf::node::EdgeConnector<std::shared_ptr<morpheus::MultiInferenceNLPMessage>,
+std::shared_ptr<morpheus::MultiMessage>>::register_converter();
+
+srf::node::EdgeConnector<std::shared_ptr<morpheus::MultiResponseMessage>,
+std::shared_ptr<morpheus::MultiMessage>>::register_converter();
+
+srf::node::EdgeConnector<std::shared_ptr<morpheus::MultiResponseProbsMessage>,
+std::shared_ptr<morpheus::MultiResponseMessage>>::register_converter();
+
+srf::node::EdgeConnector<std::shared_ptr<morpheus::MultiResponseProbsMessage>,
+std::shared_ptr<morpheus::MultiMessage>>::register_converter();
+
py::class_<MessageMeta, std::shared_ptr<MessageMeta>>(m, "MessageMeta")
.def(py::init<>(&MessageMetaInterfaceProxy::init_python), py::arg("df"))
.def_property_readonly("count", &MessageMetaInterfaceProxy::count)
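
Each registration above declares a derived-to-base upcast, allowing an SRF edge that carries the derived message type to connect to a downstream port that accepts the base type; without it, the typed edge check would reject the connection even though the C++ classes are related by inheritance. As a rough Python-side illustration of the inheritance relationships being mirrored (the `morpheus.messages` import path is an assumption and may differ between releases):

```python
# Illustration only: each C++ EdgeConnector registration above corresponds to one of these
# derived -> base relationships. The import path below is an assumption, not from this commit.
from morpheus.messages import (MultiInferenceFILMessage, MultiInferenceMessage,
                               MultiInferenceNLPMessage, MultiMessage,
                               MultiResponseMessage, MultiResponseProbsMessage)

upcasts = [
    (MultiInferenceMessage, MultiMessage),
    (MultiInferenceFILMessage, MultiInferenceMessage),
    (MultiInferenceNLPMessage, MultiInferenceMessage),
    (MultiResponseMessage, MultiMessage),
    (MultiResponseProbsMessage, MultiResponseMessage),
]
for derived, base in upcasts:
    assert issubclass(derived, base), f"{derived.__name__} should subclass {base.__name__}"
```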
