Skip to content

Commit

Permalink
Migrate CLX DGA Detection (#46)
Browse files Browse the repository at this point in the history
- Migrate code, notebook, dataset for CLX DGA Detection
- Remove references to CLX
- Example Morpheus pipeline
- Triton model repo with ONNX model
- Fix bug in existing code caused by upstream cudf updates
- Add new training and validation data
- Merge version updates from PR #49
- Remove `cuml` dependency so scripts/notebooks can also be run in Morpheus container 

Task from nv-morpheus/Morpheus#508

Authors:
  - Eli Fajardo (https://github.com/efajardo-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)
  - https://github.com/raykallen

URL: #46
  • Loading branch information
efajardo-nv authored Jun 5, 2023
1 parent fb89620 commit f59aada
Show file tree
Hide file tree
Showing 22 changed files with 1,858 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ appshield-dga-detection/datasets/pipeline/data/** filter=lfs diff=lfs merge=lfs
phishing-url-detection/datasets/pipeline/data/** filter=lfs diff=lfs merge=lfs -text
phishing-url-detection/datasets/pipeline/models/** filter=lfs diff=lfs merge=lfs -text
appshield-dga-detection/datasets/pipeline/models/** filter=lfs diff=lfs merge=lfs -text
dga-detection/datasets/** filter=lfs diff=lfs merge=lfs -text
dga-detection/models/** filter=lfs diff=lfs merge=lfs -text
53 changes: 53 additions & 0 deletions dga-detection/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
## DGA Detection

### Use Case
Detection of domains created by domain generation algorithms

### Version
1.0

### Model Overview
This model is a recurrent neural network model trained to classify URL domains generated by Domain-Generation-Algorithms. Domain generation algorithms (DGA) are algorithms seen in various families of malware that are used to periodically generate a large number of domain names that can be used as rendezvous points with their command and control servers. The large number of potential rendezvous points makes it difficult for law enforcement to effectively shut down botnets, since infected computers will attempt to contact some of these domain names every day to receive updates or commands.

### Model Architecture
We use a type of recurrent neural network called the Gated Recurrent Unit (GRU) to train a model with up-to-date domain names representative of both benign and DGA generated strings.

### Requirements
To run this example, additional requirements must be installed into your environment. A supplementary requirements file has been provided in this example directory.

```bash
pip install -r requirements.txt
```

### Training

#### Training data

Training data consists of 116K labelled as DGA domains and 100K labelled as not DGA domains.

Two types of DGA domains (Banjori, Chinad) were generated based on the implementations on https://github.com/baderj/domain_generation_algorithms. 100000 benign domains were taken from https://www.domcop.com/files/top/top10milliondomains.csv.zip.

#### Training epochs
25

#### Training batch size
10000

#### GPU model
V100

#### Model accuracy
precision = 0.995
ccuracy = 0.998

#### Training script

To train the model run the following script under working directory.
```bash
cd ${MORPHEUS_EXPERIMENTAL_ROOT}/dga-detection/training-tuning-inference

# Run training script and save models

python train.py --training-data ../datasets/benign_and_dga_domains.csv
```
This saves the trained model file to the `models` directory which will be created if does not already exist.
3 changes: 3 additions & 0 deletions dga-detection/datasets/dga-training-data.csv
Git LFS file not shown
3 changes: 3 additions & 0 deletions dga-detection/datasets/dga-validation-data.csv
Git LFS file not shown
3 changes: 3 additions & 0 deletions dga-detection/models/dga-detection.onnx
Git LFS file not shown
114 changes: 114 additions & 0 deletions dga-detection/morpheus-pipeline/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
<!--
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-->

# Example Morpheus Pipeline for DGA Detection

Example Morpheus pipeline using Triton Inference server and Morpheus.

### Set up Triton Inference Server

##### Pull Triton Inference Server Docker Image
Pull Docker image from NGC (https://ngc.nvidia.com/catalog/containers/nvidia:tritonserver) suitable for your environment.

Example:

```bash
docker pull nvcr.io/nvidia/tritonserver:23.03-py3
```

##### Start Triton Inference Server Container
From the `morpheus-experimental` repo root directory, run the following to launch Triton and load the `dga-detection-onnx` model:

```bash
docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 -v $PWD/dga-detection:/models nvcr.io/nvidia/tritonserver:23.03-py3 tritonserver --model-repository=/models/morpheus-pipeline/triton-model-repo --exit-on-error=false --model-control-mode=explicit --load-model dga-detection-onnx
```

##### Verify Model Deployment
Once Triton server finishes starting up, it will display the status of all loaded models. Successful deployment of the model will show the following:

```
+------------------+---------+--------+
| Model | Version | Status |
+------------------+---------+--------+
| dga-detection-onnx | 1 | READY |
+------------------+---------+--------+
```

> **Note**: If this is not present in the output, check the Triton log for any error messages related to loading the model.
### Build and Run Morpheus Container

Now that the model has been deployed successfully. For the experimental pipeline to execute, let's build a Morpheus container if one does not already exist.

**Note**: Before running the Morpheus container, we would need to supply an additional docker parameter to bind the Morpheus experimental pipeline repo to the container as a volume as shown in the example.

Build the release container as instructed in the [Build Morpheus Container] section of [Getting Started with Morpheus] document.

Set the following environmental variable from the root of your `morpheus-experimental` repo:
```bash
export MORPHEUS_EXPERIMENTAL_ROOT=$(pwd)
```

Now `cd` to your Morpheus repo and run the following to start your Morpheus container:
```bash
DOCKER_EXTRA_ARGS="-v ${MORPHEUS_EXPERIMENTAL_ROOT}:/workspace/morpheus-experimental" ./docker/run_container_release.sh
```

### Run DGA Detection Pipeline

Run the following in your container to start the DGA detection pipeline:

```bash
cd morpheus-experimental/dga-detection/morpheus-pipeline
python run.py \
--log_level INFO \
--num_threads 1 \
--input_file ../datasets/dga-validation-data.csv \
--output_file ./dga-detection-output.jsonlines \
--model_name dga-detection-onnx \
--server_url localhost:8001
```

Use `--help` to display information about the command line options:

```bash
python run.py --help

Usage: run.py [OPTIONS]

Options:
--num_threads INTEGER RANGE Number of internal pipeline threads to use.
[x>=1]
--pipeline_batch_size INTEGER RANGE
Internal batch size for the pipeline. Can be
much larger than the model batch size. Also
used for Kafka consumers. [x>=1]
--model_max_batch_size INTEGER RANGE
Max batch size to use for the model. [x>=1]
--input_file PATH Input filepath. [required]
--output_file TEXT The path to the file where the inference
output will be saved.
--model_name TEXT The name of the model that is deployed on
Tritonserver. [required]
--model_seq_length INTEGER RANGE
Sequence length to use for the model.
[x>=1]
--server_url TEXT Tritonserver url. [required]
--log_level [CRITICAL|FATAL|ERROR|WARN|WARNING|INFO|DEBUG]
Specify the logging level to use.
--help Show this message and exit.
```

76 changes: 76 additions & 0 deletions dga-detection/morpheus-pipeline/messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import dataclasses
import typing

from morpheus.messages import MultiInferenceMessage
from morpheus.messages.memory.tensor_memory import TensorMemory
from morpheus.messages.message_meta import MessageMeta


@dataclasses.dataclass
class MultiInferenceDGAMessage(MultiInferenceMessage, cpp_class=None):
"""
A stronger typed version of `MultiInferenceMessage` that is used for NLP workloads. Helps ensure the
proper inputs are set and eases debugging.
"""

required_tensors: typing.ClassVar[typing.List[str]] = ["domains", "seq_ids"]

def __init__(self,
*,
meta: MessageMeta,
mess_offset: int = 0,
mess_count: int = -1,
memory: TensorMemory = None,
offset: int = 0,
count: int = -1):

super().__init__(meta=meta,
mess_offset=mess_offset,
mess_count=mess_count,
memory=memory,
offset=offset,
count=count)

@property
def domains(self):
"""
Returns token-ids for each string padded with 0s to max_length.
Returns
-------
cupy.ndarray
The token-ids for each string padded with 0s to max_length.
"""

return self._get_tensor_prop("domains")

@property
def seq_ids(self):
"""
Returns sequence ids, which are used to keep track of which inference requests belong to each message.
Returns
-------
cupy.ndarray
Ids used to index from an inference input to a message. Necessary since there can be more
inference inputs than messages (i.e., if some messages get broken into multiple inference requests).
"""

return self._get_tensor_prop("seq_ids")
130 changes: 130 additions & 0 deletions dga-detection/morpheus-pipeline/preprocessing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import typing
from functools import partial

import cupy as cp
import mrc
from messages import MultiInferenceDGAMessage

import cudf

from morpheus.config import Config
from morpheus.messages import MultiInferenceMessage
from morpheus.messages import MultiInferenceNLPMessage
from morpheus.messages import MultiMessage
from morpheus.messages.memory.tensor_memory import TensorMemory
from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage


class PreprocessDGAStage(PreprocessBaseStage):
"""
Prepare NLP input DataFrames for inference.
Parameters
----------
c : `morpheus.config.Config`
Pipeline configuration instance.
vocab_hash_file : str
Path to hash file containing vocabulary of words with token-ids. This can be created from the raw vocabulary
using the `cudf.utils.hash_vocab_utils.hash_vocab` function.
truncation : bool
If set to true, strings will be truncated and padded to max_length. Each input string will result in exactly one
output sequence. If set to false, there may be multiple output sequences when the max_length is smaller
than generated tokens.
do_lower_case : bool
If set to true, original text will be lowercased before encoding.
add_special_tokens : bool
Whether or not to encode the sequences with the special tokens of the BERT classification model.
stride : int
If `truncation` == False and the tokenized string is larger than max_length, the sequences containing the
overflowing token-ids can contain duplicated token-ids from the main sequence. If max_length is equal to stride
there are no duplicated-id tokens. If stride is 80% of max_length, 20% of the first sequence will be repeated on
the second sequence and so on until the entire sentence is encoded.
column : str
Name of the column containing the data that needs to be preprocessed.
"""

def __init__(self, c: Config, column: str = "data", truncate_length: int = 100):
super().__init__(c)

self._column = column
self._truncate_length = truncate_length
self._fea_length = c.feature_length

@property
def name(self) -> str:
return "preprocess-dga"

def supports_cpp_node(self):
return False

@staticmethod
def pre_process_batch(x: MultiMessage, fea_len: int, column: str, truncate_len: int) -> MultiInferenceNLPMessage:

df = x.get_meta([column])
df[column] = df[column].str.slice_replace(truncate_len, repl='')

split_ser = df[column].str.findall(r"[\w\W\d\D\s\S]")
split_df = split_ser.to_frame()
split_df = cudf.DataFrame(split_df[column].to_arrow().to_pylist())
columns_cnt = len(split_df.columns)

# Replace null's with ^.
split_df = split_df.fillna("^")
temp_df = cudf.DataFrame()
for col in range(0, columns_cnt):
temp_df[col] = split_df[col].str.code_points()
del split_df

# Replace ^ ascii value 94 with 0.
temp_df = temp_df.replace(94, 0)
temp_df.index = df.index
# temp_df["len"] = df["len"]
if "type" in df.columns:
temp_df["type"] = df["type"]
temp_df[column] = df[column]

temp_df = temp_df.drop("domain", axis=1)

domains = cp.asarray(temp_df.to_cupy()).astype("long")

input = cp.zeros((domains.shape[0], fea_len))
input[:domains.shape[0], :domains.shape[1]] = domains
input = input.astype("long")

count = input.shape[0]

seg_ids = cp.zeros((count, 3), dtype=cp.uint32)
seg_ids[:, 0] = cp.arange(x.mess_offset, x.mess_offset + count, dtype=cp.uint32)
seg_ids[:, 2] = fea_len - 1

# Create the inference memory. Keep in mind count here could be > than input count
memory = TensorMemory(count=input.shape[0], tensors={'domains': input, 'seq_ids': seg_ids})

infer_message = MultiInferenceDGAMessage.from_message(x, memory=memory)

return infer_message

def _get_preprocess_fn(self) -> typing.Callable[[MultiMessage], MultiInferenceMessage]:

return partial(PreprocessDGAStage.pre_process_batch,
fea_len=self._fea_length,
column=self._column,
truncate_len=self._truncate_length)

def _get_preprocess_node(self, builder: mrc.Builder):
raise NotImplementedError("C++ node not implemented")
Loading

0 comments on commit f59aada

Please sign in to comment.