Merge pull request #3 from bsuryadevara/fix-dfp-benchmarks
Fix DFP benchmarks
drobison00 authored Mar 1, 2023
2 parents a39de93 + ba8734d commit 56477f4
Showing 14 changed files with 619 additions and 195 deletions.
@@ -35,13 +35,13 @@ To provide your own calibration or use other `pytest-benchmark` features with th

Morpheus pipeline configurations for each workflow are managed using [pipelines_conf.json](./resource/pipelines_conf.json). For example, this is the Morpheus configuration for `duo_training_modules`:
```
"test_dfp_training_duo_modules_e2e": {
"file_path": "../../../../data/dfp/duo-training-data/*.json",
"num_threads": 8,
"pipeline_batch_size": 1024,
"edge_buffer_size": 4,
"start_time": "2022-08-01",
"duration": "60d"
"test_dfp_modules_azure_training_e2e": {
"message_path": "./resource/control_message_azure_training.json",
"num_threads": 12,
"pipeline_batch_size": 256,
"edge_buffer_size": 128,
"start_time": "2022-08-01",
"duration": "60d"
},
...
```
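Each entry is keyed by the benchmark test name, which is how the benchmark hook later in this change indexes `PIPELINES_CONF`. As a minimal sketch (illustrative only, assuming the file sits at `./resource/pipelines_conf.json` next to the tests), an entry can be loaded with the standard `json` module:
```
import json
from os import path

# Sketch: look up one workflow's configuration by its test name.
curr_dir = path.dirname(path.abspath(__file__))
conf_file = path.join(curr_dir, "resource", "pipelines_conf.json")

with open(conf_file, encoding="utf-8") as f:
    pipelines_conf = json.load(f)

pipeline_conf = pipelines_conf["test_dfp_modules_azure_training_e2e"]
print(pipeline_conf["num_threads"], pipeline_conf["pipeline_batch_size"])
```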
@@ -61,14 +61,18 @@ The `--benchmark-warmup` and `--benchmark-warmup-iterations` options are used to
`<test-workflow>` is the name of the test to run benchmarks on. This can be one of the following:
- `test_dfp_inference_azure_stages_e2e`
- `test_dfp_inference_duo_stages_e2e`
- `test_dfp_training_azure_modules_e2e`
- `test_dfp_training_azure_stages_e2e`
- `test_dfp_training_duo_modules_e2e`
- `test_dfp_training_duo_stages_e2e`
- `test_dfp_modules_duo_training_e2e`
- `test_dfp_modules_azure_training_e2e`
- `test_dfp_modules_duo_inference_e2e`
- `test_dfp_modules_azure_inference_e2e`
- `test_dfp_modules_duo_e2e`
- `test_dfp_modules_azure_e2e`

For example, to run E2E benchmarks on the DFP training (modules) workflow on the duo logs:
```
pytest -s --benchmark-enable --benchmark-warmup=on --benchmark-warmup-iterations=1 --benchmark-autosave test_bench_e2e_dfp_pipeline.py::test_dfp_training_duo_modules_e2e
pytest -s --benchmark-enable --benchmark-warmup=on --benchmark-warmup-iterations=1 --benchmark-autosave test_bench_e2e_dfp_pipeline.py::test_dfp_modules_duo_training_e2e
```

To run E2E benchmarks on all workflows:
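Assuming the same `pytest-benchmark` options apply, the invocation simply drops the test selector:
```
pytest -s --benchmark-enable --benchmark-warmup=on --benchmark-warmup-iterations=1 --benchmark-autosave test_bench_e2e_dfp_pipeline.py
```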
@@ -61,6 +61,39 @@ def pytest_benchmark_update_json(config, benchmarks, output_json):
for fn in glob.glob(source_files_glob):
line_count += get_json_lines_count(fn)
byte_count += path.getsize(fn)
elif "message_path" in PIPELINES_CONF[bench["name"]]:
source_message_glob = path.join(curr_dir, PIPELINES_CONF[bench["name"]]["message_path"])
for message_fn in glob.glob(source_message_glob):
message_file = open(message_fn)
control_message = json.load(message_file)
inputs = control_message.get("inputs")
# Iterating over inputs
for input in inputs:
line_count_per_task = 0
byte_count_per_task = 0
tasks = input.get("tasks")
# Iterating over tasks
for task in tasks:
if task.get("type") == "load":
files = task.get("properties").get("files")
# Iterating over files in a task
for file_glob in files:
# Iterating over a file glob
for fn in glob.glob(file_glob):
count = get_json_lines_count(fn)
size = path.getsize(fn)
line_count += count
byte_count += size
line_count_per_task += count
byte_count_per_task += size
else:
non_load_task = task.get("type")
# Adding non-load task status here.
if non_load_task is not None:
bench['stats'][non_load_task] = {}
bench['stats'][non_load_task]["input_lines"] = line_count_per_task
bench['stats'][non_load_task]["input_bytes"] = byte_count_per_task

else:
raise KeyError("Configuration requires either 'glob_path' or 'file_path' attribute.")

@@ -26,10 +26,12 @@

import mlflow
import pandas as pd
from dfp.utils.derive_args import pyobj2str

from morpheus.config import Config
from morpheus.config import ConfigAutoEncoder
from morpheus.config import CppConfig
from morpheus.messages.multi_message import MultiMessage
from morpheus.utils.column_info import BoolColumn
from morpheus.utils.column_info import ColumnInfo
from morpheus.utils.column_info import CustomColumn
@@ -197,10 +199,10 @@ def feature_columns(self):
def source(self):
return self._source

def get_config(self) -> Config:
def get_config(self, use_cpp=True) -> Config:

config = Config()
CppConfig.set_should_use_cpp(False)
CppConfig.set_should_use_cpp(use_cpp)
config.ae = ConfigAutoEncoder()

config.num_threads = self.pipeline_conf["num_threads"]
@@ -238,29 +240,39 @@ def _get_start_stop_time(self) -> typing.Tuple[datetime, datetime]:
return tuple((start_time, end_time))

def update_modules_conf(self, source_schema: DataFrameInputSchema, preprocess_schema: DataFrameInputSchema):

start_stop_time = self._get_start_stop_time()
self.modules_conf["preprocessing"]["FileBatcher"]["start_time"] = start_stop_time[0]
self.modules_conf["preprocessing"]["FileBatcher"]["end_time"] = start_stop_time[0]
self.modules_conf["preprocessing"]["DFPRollingWindow"]["max_history"] = self.pipeline_conf["duration"]

encoding = "latin1"

# Convert schema as a string
source_schema_str = str(pickle.dumps(source_schema), encoding=encoding)
preprocess_schema_str = str(pickle.dumps(preprocess_schema), encoding=encoding)

self.modules_conf["preprocessing"]["FileToDF"]["schema"]["schema_str"] = source_schema_str
self.modules_conf["preprocessing"]["FileToDF"]["schema"]["encoding"] = encoding
self.modules_conf["preprocessing"]["DFPDataPrep"]["schema"]["schema_str"] = preprocess_schema_str
self.modules_conf["preprocessing"]["DFPDataPrep"]["schema"]["encoding"] = encoding
self.modules_conf["train_deploy"]["DFPTraining"]["feature_columns"] = self.feature_columns

self.modules_conf["train_deploy"]["MLFlowModelWriter"]["model_name_formatter"] = self._get_model_name_formatter(
)
self.modules_conf["train_deploy"]["MLFlowModelWriter"][
start_stop_time = self._get_start_stop_time()
self.modules_conf["DFPPreproc"]["FileBatcher"]["start_time"] = start_stop_time[0]
self.modules_conf["DFPPreproc"]["FileBatcher"]["end_time"] = start_stop_time[0]
self.modules_conf["DFPPreproc"]["FileBatcher"]["schema"]["schema_str"] = source_schema_str
self.modules_conf["DFPPreproc"]["FileBatcher"]["schema"]["encoding"] = encoding

self.modules_conf["DFPTra"]["DFPDataPrep"]["schema"]["schema_str"] = preprocess_schema_str
self.modules_conf["DFPTra"]["DFPDataPrep"]["schema"]["encoding"] = encoding

self.modules_conf["DFPTra"]["DFPRollingWindow"]["max_history"] = self.pipeline_conf["duration"]
self.modules_conf["DFPTra"]["DFPTraining"]["feature_columns"] = self.feature_columns
self.modules_conf["DFPTra"]["MLFlowModelWriter"]["model_name_formatter"] = self._get_model_name_formatter()
self.modules_conf["DFPTra"]["MLFlowModelWriter"][
"experiment_name_formatter"] = self._get_experiment_name_formatter()

self.modules_conf["DFPInf"]["DFPRollingWindow"]["max_history"] = "1d"
self.modules_conf["DFPInf"]["DFPDataPrep"]["schema"]["schema_str"] = preprocess_schema_str
self.modules_conf["DFPInf"]["DFPDataPrep"]["schema"]["encoding"] = encoding
self.modules_conf["DFPInf"]["DFPInference"]["model_name_formatter"] = self._get_model_name_formatter()
self.modules_conf["DFPInf"]["FilterDetections"]["schema"]["input_message_type"] = pyobj2str(
MultiMessage, encoding)
self.modules_conf["DFPInf"]["FilterDetections"]["schema"]["encoding"] = encoding
self.modules_conf["DFPInf"]["Serialize"]["use_cpp"] = True
self.modules_conf["DFPInf"]["WriteToFile"]["filename"] = "dfp_detections_{}.csv".format(self._source)

self.modules_conf["output_port_count"] = 2

def get_stages_conf(self) -> typing.Dict[str, any]:

stages_conf = {}
@@ -289,6 +301,10 @@ def get_filenames(self) -> typing.List[str]:
file_path = self.pipeline_conf.get("file_path")
full_file_path = path.join(THIS_DIR, file_path)
filenames = [full_file_path]
elif "message_path" in self.pipeline_conf:
file_path = self.pipeline_conf.get("message_path")
full_file_path = path.join(THIS_DIR, file_path)
filenames = [full_file_path]
else:
raise KeyError("Configuration needs the glob path or file path attribute.")

@@ -0,0 +1,46 @@
{
"inputs": [
{
"tasks": [
{
"type": "load",
"properties": {
"loader_id": "fsspec",
"files": [
"../../../../../examples/data/dfp/azure-training-data/*.json"
]
}
},
{
"type": "training",
"properties": {
}
}
],
"metadata": {
"data_type": "payload"
}
},
{
"tasks": [
{
"type": "load",
"properties": {
"loader_id": "fsspec",
"files": [
"../../../../../examples/data/dfp/azure-inference-data/*.json"
]
}
},
{
"type": "inference",
"properties": {
}
}
],
"metadata": {
"data_type": "payload"
}
}
]
}
@@ -0,0 +1,25 @@
{
"inputs": [
{
"tasks": [
{
"type": "load",
"properties": {
"loader_id": "fsspec",
"files": [
"../../../../../examples/data/dfp/azure-inference-data/*.json"
]
}
},
{
"type": "inference",
"properties": {
}
}
],
"metadata": {
"data_type": "payload"
}
}
]
}
@@ -0,0 +1,25 @@
{
"inputs": [
{
"tasks": [
{
"type": "load",
"properties": {
"loader_id": "fsspec",
"files": [
"../../../../../examples/data/dfp/azure-training-data/*.json"
]
}
},
{
"type": "training",
"properties": {
}
}
],
"metadata": {
"data_type": "payload"
}
}
]
}
@@ -0,0 +1,46 @@
{
"inputs": [
{
"tasks": [
{
"type": "load",
"properties": {
"loader_id": "fsspec",
"files": [
"../../../../../examples/data/dfp/duo-training-data/*.json"
]
}
},
{
"type": "training",
"properties": {
}
}
],
"metadata": {
"data_type": "streaming"
}
},
{
"tasks": [
{
"type": "load",
"properties": {
"loader_id": "fsspec",
"files": [
"../../../../../examples/data/dfp/duo-inference-data/*.json"
]
}
},
{
"type": "inference",
"properties": {
}
}
],
"metadata": {
"data_type": "streaming"
}
}
]
}
@@ -0,0 +1,25 @@
{
"inputs": [
{
"tasks": [
{
"type": "load",
"properties": {
"loader_id": "fsspec",
"files": [
"../../../../../examples/data/dfp/duo-inference-data/*.json"
]
}
},
{
"type": "inference",
"properties": {
}
}
],
"metadata": {
"data_type": "payload"
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"inputs": [
{
"tasks": [
{
"type": "load",
"properties": {
"loader_id": "fsspec",
"files": [
"../../../../../examples/data/dfp/duo-training-data/*.json"
]
}
},
{
"type": "training",
"properties": {
}
}
],
"metadata": {
"data_type": "payload"
}
}
]
}