Update GNN stellargraph with DGL (#1032)
This PR replaces the StellarGraph-based implementation in the training & validation scripts with DGL. This addresses issues #945 & #925.
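For context, a minimal sketch of the kind of graph-construction change involved — this is not code from the PR; the node/edge type names and tensors are illustrative, and it assumes `dgl` 1.0.2 with the PyTorch backend, as pinned in the updated requirements:

```python
# Illustrative only -- not taken from this PR. Where StellarGraph built a graph
# from node/edge DataFrames, DGL builds a heterogeneous graph from a dict keyed
# by (src_type, edge_type, dst_type) triples. "transaction" is the example's
# target node type; "client"/"merchant" and the edge names are assumptions.
import dgl
import torch

# Edge lists: transaction i is linked to client j / merchant k.
tx_to_client = (torch.tensor([0, 1, 2]), torch.tensor([0, 0, 1]))
tx_to_merchant = (torch.tensor([0, 1, 2]), torch.tensor([0, 1, 1]))

graph = dgl.heterograph({
    ("transaction", "pays", "client"): tx_to_client,
    ("transaction", "buys", "merchant"): tx_to_merchant,
})

# Per-node-type features are attached via graph.nodes[ntype].data
# (3 transaction nodes, 32 features each; the width is arbitrary here).
graph.nodes["transaction"].data["feat"] = torch.randn(3, 32)
print(graph)
```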

Authors:
  - Tad ZeMicheal (https://github.com/tzemicheal)
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - https://github.com/gbatmaz
  - David Gardner (https://github.com/dagardner-nv)
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1032
tzemicheal authored Aug 29, 2023
1 parent 2314afa commit c89d4c1
Showing 57 changed files with 2,709 additions and 2,156 deletions.
2 changes: 2 additions & 0 deletions .gitattributes
@@ -7,3 +7,5 @@ tests/mock_triton_server/payloads/** filter=lfs diff=lfs merge=lfs -text
tests/tests_data/** filter=lfs diff=lfs merge=lfs -text
examples/basic_usage/img/** filter=lfs diff=lfs merge=lfs -text
docs/source/img/* filter=lfs diff=lfs merge=lfs -text
git filter=lfs diff=lfs merge=lfs -text
status filter=lfs diff=lfs merge=lfs -text
4 changes: 2 additions & 2 deletions .github/workflows/pull_request.yml
@@ -46,7 +46,7 @@ jobs:
uses: ./.github/workflows/ci_pipe.yml
with:
run_check: ${{ startsWith(github.ref_name, 'pull-request/') }}
container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-build-230801
test_container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-test-230801
container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-build-230828
test_container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-test-230828
secrets:
NGC_API_KEY: ${{ secrets.NGC_API_KEY }}
7 changes: 0 additions & 7 deletions ci/runner/Dockerfile
@@ -66,13 +66,6 @@ RUN apt update && \
COPY ./docker/conda/environments/cuda${CUDA_SHORT_VER}_examples.yml /tmp/conda/cuda${CUDA_SHORT_VER}_examples.yml
COPY ./ci/scripts/download_kafka.py /tmp/scripts/download_kafka.py

# Install extra deps needed for gnn_fraud_detection_pipeline & ransomware_detection examples
RUN CONDA_ALWAYS_YES=true /opt/conda/bin/mamba env update -n ${PROJ_NAME} -q --file /tmp/conda/cuda${CUDA_SHORT_VER}_examples.yml && \
conda clean -afy && \
source activate ${PROJ_NAME} && \
pip install --ignore-requires-python stellargraph==1.2.1 && \
rm -rf /tmp/conda

# Install camouflage needed for unittests to mock a triton server
RUN source activate ${PROJ_NAME} && \
npm install -g camouflage-server@0.9 && \
2 changes: 1 addition & 1 deletion ci/scripts/run_ci_local.sh
@@ -49,7 +49,7 @@ GIT_BRANCH=$(git branch --show-current)
GIT_COMMIT=$(git log -n 1 --pretty=format:%H)

LOCAL_CI_TMP=${LOCAL_CI_TMP:-${MORPHEUS_ROOT}/.tmp/local_ci_tmp}
CONTAINER_VER=${CONTAINER_VER:-230801}
CONTAINER_VER=${CONTAINER_VER:-230828}
CUDA_VER=${CUDA_VER:-11.8}
DOCKER_EXTRA_ARGS=${DOCKER_EXTRA_ARGS:-""}

8 changes: 2 additions & 6 deletions docker/conda/environments/cuda11.8_examples.yml
@@ -23,18 +23,14 @@ channels:
- rapidsai
- nvidia
- conda-forge
- dglteam/label/cu118
dependencies:
- boto3
- chardet=5.0.0
- cuml=23.06
- dask>=2023.1.1
- dgl=1.0.2
- dill=0.3.6
- distributed>=2023.1.1
- mlflow>=2.2.1,<3
- papermill=2.3.4
- s3fs>=2023.6
- pip
- wrapt=1.14.1 # ver 1.15 breaks the keras model used by the gnn_fraud_detection_pipeline
- pip:
# tensorflow exists in conda-forge but is tied to CUDA-11.3
- tensorflow==2.12.0
71 changes: 36 additions & 35 deletions examples/gnn_fraud_detection_pipeline/README.md
@@ -22,11 +22,8 @@ Prior to running the GNN fraud detection pipeline, additional requirements must

```bash
mamba env update -n ${CONDA_DEFAULT_ENV} -f examples/gnn_fraud_detection_pipeline/requirements.yml
pip install --ignore-requires-python stellargraph==1.2.1
```

> **Note**: The `--ignore-requires-python` is needed because Stellargraph only officially supports Python versions prior to 3.9 ([stellargraph/stellargraph#1960](https://github.com/stellargraph/stellargraph/issues/1960)).
## Running

##### Setup Env Variable
@@ -44,23 +41,22 @@ python run.py --help
Usage: run.py [OPTIONS]
Options:
--num_threads INTEGER RANGE Number of internal pipeline threads to use
--num_threads INTEGER RANGE Number of internal pipeline threads to use.
[x>=1]
--pipeline_batch_size INTEGER RANGE
Internal batch size for the pipeline. Can be
much larger than the model batch size. Also
used for Kafka consumers
used for Kafka consumers. [x>=1]
--model_max_batch_size INTEGER RANGE
Max batch size to use for the model
--input_file PATH Input filepath [required]
Max batch size to use for the model. [x>=1]
--model_fea_length INTEGER RANGE
Features length to use for the model.
[x>=1]
--input_file PATH Input data filepath. [required]
--training_file PATH Training data filepath. [required]
--model_dir PATH Trained model directory path [required]
--output_file TEXT The path to the file where the inference
output will be saved.
--training_file PATH Training data file [required]
--model_fea_length INTEGER RANGE
Features length to use for the model
--model-xgb-file PATH The name of the XGB model that is deployed
--model-hinsage-file PATH The name of the trained HinSAGE model file path
--help Show this message and exit.
```

@@ -71,35 +67,41 @@ cd ${MORPHEUS_ROOT}/examples/gnn_fraud_detection_pipeline
python run.py
```
```
====Registering Pipeline====
====Building Pipeline====
Added source: <from-file-0; FileSourceStage(filename=validation.csv, iterative=None, file_type=auto, repeat=1, filter_null=False)>
Graph construction rate: 0 messages [00:00, ? me====Building Pipeline Complete!====
Inference rate: 0 messages [00:00, ? messages/s]====Registering Pipeline Complete!====
====Starting Pipeline====
====Pipeline Started==== 0 messages [00:00, ? messages/s]
====Building Segment: linear_segment_0====ges/s]
Added source: <from-file-0; FileSourceStage(filename=validation.csv, iterative=False, file_type=FileTypes.Auto, repeat=1, filter_null=False)>
└─> morpheus.MessageMeta
Added stage: <deserialize-1; DeserializeStage()>
Added stage: <deserialize-1; DeserializeStage(ensure_sliceable_index=True)>
└─ morpheus.MessageMeta -> morpheus.MultiMessage
Added stage: <fraud-graph-construction-2; FraudGraphConstructionStage(training_file=training.csv)>
└─ morpheus.MultiMessage -> stages.FraudGraphMultiMessage
Added stage: <monitor-3; MonitorStage(description=Graph construction rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None)>
Added stage: <monitor-3; MonitorStage(description=Graph construction rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
└─ stages.FraudGraphMultiMessage -> stages.FraudGraphMultiMessage
Added stage: <gnn-fraud-sage-4; GraphSAGEStage(model_hinsage_file=model/hinsage-model.pt, batch_size=5, sample_size=[2, 32], record_id=index, target_node=transaction)>
Added stage: <gnn-fraud-sage-4; GraphSAGEStage(model_dir=model, batch_size=100, record_id=index, target_node=transaction)>
└─ stages.FraudGraphMultiMessage -> stages.GraphSAGEMultiMessage
Added stage: <monitor-5; MonitorStage(description=Inference rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None)>
Added stage: <monitor-5; MonitorStage(description=Inference rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
└─ stages.GraphSAGEMultiMessage -> stages.GraphSAGEMultiMessage
Added stage: <gnn-fraud-classification-6; ClassificationStage(model_xgb_file=model/xgb-model.pt)>
Added stage: <gnn-fraud-classification-6; ClassificationStage(model_xgb_file=model/xgb.pt)>
└─ stages.GraphSAGEMultiMessage -> morpheus.MultiMessage
Added stage: <monitor-7; MonitorStage(description=Add classification rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None)>
Added stage: <monitor-7; MonitorStage(description=Add classification rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
└─ morpheus.MultiMessage -> morpheus.MultiMessage
Added stage: <serialize-8; SerializeStage(include=None, exclude=['^ID$', '^_ts_'], output_type=pandas)>
└─ morpheus.MultiMessage -> pandas.DataFrame
Added stage: <monitor-9; MonitorStage(description=Serialize rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None)>
└─ pandas.DataFrame -> pandas.DataFrame
Added stage: <to-file-10; WriteToFileStage(filename=result.csv, overwrite=True, file_type=auto)>
└─ pandas.DataFrame -> pandas.DataFrame
====Building Pipeline Complete!====
====Pipeline Started====
Graph construction rate[Complete]: 265messages [00:00, 1590.22messages/s]
Inference rate[Complete]: 265messages [00:01, 150.23messages/s]
Add classification rate[Complete]: 265messages [00:01, 147.11messages/s]
Serialize rate[Complete]: 265messages [00:01, 142.31messages/s]
Added stage: <serialize-8; SerializeStage(include=[], exclude=['^ID$', '^_ts_'], fixed_columns=True)>
└─ morpheus.MultiMessage -> morpheus.MessageMeta
Added stage: <monitor-9; MonitorStage(description=Serialize rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
Added stage: <to-file-10; WriteToFileStage(filename=output.csv, overwrite=True, file_type=FileTypes.Auto, include_index_col=True, flush=False)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Segment Complete!====
Graph construction rate[Complete]: 265 messages [00:00, 1218.88 messages/s]
Inference rate[Complete]: 265 messages [00:01, 174.04 messages/s]
Add classification rate[Complete]: 265 messages [00:01, 170.69 messages/s]
Serialize rate[Complete]: 265 messages [00:01, 166.36 messages/s]
====Pipeline Complete====
```

### CLI Example
@@ -118,9 +120,8 @@ morpheus --log_level INFO \
deserialize \
fraud-graph-construction --training_file examples/gnn_fraud_detection_pipeline/training.csv \
monitor --description "Graph construction rate" \
gnn-fraud-sage --model_hinsage_file examples/gnn_fraud_detection_pipeline/model/hinsage-model.pt \
gnn-fraud-sage --model_dir examples/gnn_fraud_detection_pipeline/model/ \
monitor --description "Inference rate" \
gnn-fraud-classification --model_xgb_file examples/gnn_fraud_detection_pipeline/model/xgb-model.pt \
monitor --description "Add classification rate" \
serialize \
to-file --filename "output.csv" --overwrite
Binary file not shown (×8); one file was deleted.
9 changes: 2 additions & 7 deletions examples/gnn_fraud_detection_pipeline/requirements.yml
@@ -17,12 +17,7 @@ channels:
- rapidsai
- nvidia
- conda-forge
- dglteam/label/cu118
dependencies:
- chardet=5.0.0
- cuml=23.06
- dask>=2023.1.1
- distributed>=2023.1.1
- pip
- pip:
# tensorflow exists in conda-forge but is tied to CUDA-11.3
- tensorflow==2.12.0
- dgl=1.0.2
44 changes: 18 additions & 26 deletions examples/gnn_fraud_detection_pipeline/run.py
@@ -27,6 +27,7 @@
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.utils.logger import configure_logging
# pylint: disable=no-name-in-module
from stages.classification_stage import ClassificationStage
from stages.graph_construction_stage import FraudGraphConstructionStage
from stages.graph_sage_stage import GraphSAGEStage
@@ -60,48 +61,39 @@
)
@click.option(
"--input_file",
type=click.Path(exists=True, readable=True),
type=click.Path(exists=True, readable=True, dir_okay=False),
default="validation.csv",
required=True,
help="Input data filepath.",
)
@click.option(
"--training_file",
type=click.Path(exists=True, readable=True),
type=click.Path(exists=True, readable=True, dir_okay=False),
default="training.csv",
required=True,
help="Training data filepath.",
)
@click.option(
"--model-hinsage-file",
type=click.Path(exists=True, readable=True),
default="model/hinsage-model.pt",
"--model_dir",
type=click.Path(exists=True, readable=True, file_okay=False, dir_okay=True),
default="model",
required=True,
help="Trained hinsage model filepath.",
)
@click.option(
"--model-xgb-file",
type=click.Path(exists=True, readable=True),
default="model/xgb-model.pt",
required=True,
help="Trained xgb model filepath.",
help="Path to trained Hinsage & XGB models.",
)
@click.option(
"--output_file",
type=click.Path(dir_okay=False),
default="output.csv",
help="The path to the file where the inference output will be saved.",
)
def run_pipeline(
num_threads,
pipeline_batch_size,
model_max_batch_size,
model_fea_length,
input_file,
training_file,
model_hinsage_file,
model_xgb_file,
output_file,
):
def run_pipeline(num_threads,
pipeline_batch_size,
model_max_batch_size,
model_fea_length,
input_file,
training_file,
model_dir,
output_file):
# Enable the default logger.
configure_logging(log_level=logging.INFO)

@@ -140,12 +132,12 @@ def run_pipeline(
pipeline.add_stage(MonitorStage(config, description="Graph construction rate"))

# Add a sage inference stage.
pipeline.add_stage(GraphSAGEStage(config, model_hinsage_file))
pipeline.add_stage(GraphSAGEStage(config, model_dir))
pipeline.add_stage(MonitorStage(config, description="Inference rate"))

# Add classification stage.
# This stage adds detected classifications to each message.
pipeline.add_stage(ClassificationStage(config, model_xgb_file))
pipeline.add_stage(ClassificationStage(config, os.path.join(model_dir, "xgb.pt")))
pipeline.add_stage(MonitorStage(config, description="Add classification rate"))

# Add a serialize stage.
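An aside on the `run.py` changes above: the separate `--model-hinsage-file` / `--model-xgb-file` flags are folded into a single `--model_dir`. Using the defaults shown in the diff, a representative invocation (run from the example directory, as in the README) might look like this:

```bash
cd ${MORPHEUS_ROOT}/examples/gnn_fraud_detection_pipeline
python run.py \
    --input_file validation.csv \
    --training_file training.csv \
    --model_dir model \
    --output_file output.csv
```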
@@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import typing

import mrc
from mrc.core import operators as ops

@@ -55,13 +53,13 @@ def __init__(self, c: Config, model_xgb_file: str):
def name(self) -> str:
return "gnn-fraud-classification"

def accepted_types(self) -> typing.Tuple:
def accepted_types(self) -> (GraphSAGEMultiMessage, ):
return (GraphSAGEMultiMessage, )

def supports_cpp_node(self):
def supports_cpp_node(self) -> bool:
return False

def _process_message(self, message: GraphSAGEMultiMessage):
def _process_message(self, message: GraphSAGEMultiMessage) -> GraphSAGEMultiMessage:
ind_emb_columns = message.get_meta(message.inductive_embedding_column_names)
message.set_meta("node_id", message.node_identifiers)

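The truncated `_process_message` above hands the inductive GraphSAGE embeddings to the XGBoost model referenced by `model_xgb_file`. As a standalone, hedged sketch of that final step — using the plain `xgboost` package, whereas the actual stage may use a different loader; the function name and shapes are illustrative:

```python
# Illustrative sketch only -- not the stage's actual implementation.
import numpy as np
import xgboost as xgb


def score_embeddings(model_path: str, embeddings: np.ndarray) -> np.ndarray:
    """Load a saved XGBoost booster and return a fraud score per embedding row."""
    booster = xgb.Booster()
    booster.load_model(model_path)  # path to a saved XGBoost model file
    return booster.predict(xgb.DMatrix(embeddings))


# Example: score two rows of 64-dimensional embeddings (dimensions are arbitrary).
# scores = score_embeddings("path/to/xgb-model", np.random.rand(2, 64).astype(np.float32))
```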