diff --git a/.github/workflows/ci_pipe.yml b/.github/workflows/ci_pipe.yml index 45f35f0500..7a31b2f399 100644 --- a/.github/workflows/ci_pipe.yml +++ b/.github/workflows/ci_pipe.yml @@ -133,6 +133,7 @@ jobs: env: NVIDIA_VISIBLE_DEVICES: ${{ env.NVIDIA_VISIBLE_DEVICES }} PARALLEL_LEVEL: '10' + MERGE_EXAMPLES_YAML: '1' strategy: fail-fast: true @@ -164,6 +165,8 @@ jobs: username: '$oauthtoken' password: ${{ secrets.NGC_API_KEY }} image: ${{ inputs.container }} + env: + MERGE_DOCS_YAML: '1' strategy: fail-fast: true diff --git a/.github/workflows/label-external-issues.yml b/.github/workflows/label-external-issues.yml index 8595c630a2..d08ff5267e 100644 --- a/.github/workflows/label-external-issues.yml +++ b/.github/workflows/label-external-issues.yml @@ -27,7 +27,7 @@ jobs: Label-Issue: runs-on: ubuntu-latest # Only run if the issue author is not part of NV-Morpheus - if: ${{ ! contains(fromJSON('["OWNER", "MEMBER", "CONTRIBUTOR"]'), github.event.issue.author_association)}} + if: ${{ ! contains(fromJSON('["OWNER", "MEMBER", "CONTRIBUTOR", "COLLABORATOR"]'), github.event.issue.author_association)}} steps: - name: add-triage-label run: | diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 34d2b58bcf..4698370584 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -46,7 +46,7 @@ jobs: uses: ./.github/workflows/ci_pipe.yml with: run_check: ${{ startsWith(github.ref_name, 'pull-request/') }} - container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-build-230414 - test_container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-test-230414 + container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-build-230510 + test_container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-test-230510 secrets: NGC_API_KEY: ${{ secrets.NGC_API_KEY }} diff --git a/ci/runner/Dockerfile b/ci/runner/Dockerfile index fe3181ebb9..372b80cd4e 100644 --- a/ci/runner/Dockerfile +++ b/ci/runner/Dockerfile @@ -45,6 +45,7 @@ RUN apt update && \ COPY ./docker/conda/environments/* /tmp/conda/ RUN CONDA_ALWAYS_YES=true /opt/conda/bin/mamba env create -n ${PROJ_NAME} -q --file /tmp/conda/cuda${CUDA_SHORT_VER}_dev.yml && \ + /opt/conda/bin/mamba install -n morpheus -c conda-forge "conda-merge>=0.2" && \ sed -i "s/conda activate base/conda activate ${PROJ_NAME}/g" ~/.bashrc && \ conda clean -afy && \ rm -rf /tmp/conda @@ -68,25 +69,33 @@ RUN apt update && \ apt clean && \ rm -rf /var/lib/apt/lists/* - # ============ test ================== FROM base as test # Add any test only dependencies here. 
ARG PROJ_NAME +ARG CUDA_SHORT_VER RUN apt update && \ DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC \ apt install --no-install-recommends -y \ - nodejs \ - npm \ - openjdk-11-jdk && \ + openjdk-11-jre-headless && \ apt clean && \ rm -rf /var/lib/apt/lists/* +COPY ./docker/conda/environments/cuda${CUDA_SHORT_VER}_examples.yml /tmp/conda/cuda${CUDA_SHORT_VER}_examples.yml + +# Install extra deps needed for gnn_fraud_detection_pipeline & ransomware_detection examples +RUN CONDA_ALWAYS_YES=true /opt/conda/bin/mamba env update -n ${PROJ_NAME} -q --file /tmp/conda/cuda${CUDA_SHORT_VER}_examples.yml && \ + conda clean -afy && \ + source activate ${PROJ_NAME} && \ + pip install --ignore-requires-python stellargraph==1.2.1 && \ + rm -rf /tmp/conda + # Install camouflage needed for unittests to mock a triton server -RUN npm install -g camouflage-server@0.9 && \ +RUN source activate ${PROJ_NAME} && \ + npm install -g camouflage-server@0.9 && \ npm cache clean --force # Install pytest-kafka diff --git a/docs/requirements.txt b/ci/scripts/bootstrap_local_ci.sh old mode 100644 new mode 100755 similarity index 55% rename from docs/requirements.txt rename to ci/scripts/bootstrap_local_ci.sh index 1406ac0e68..d2f3cee716 --- a/docs/requirements.txt +++ b/ci/scripts/bootstrap_local_ci.sh @@ -1,4 +1,5 @@ -# SPDX-FileCopyrightText: Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +#!/bin/bash +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,10 +14,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -breathe==4.34.0 -exhale==0.3.6 -ipython -myst-parser==0.17.2 -nbsphinx -sphinx -sphinx_rtd_theme +export WORKSPACE_TMP="$(pwd)/ws_tmp" +mkdir -p ${WORKSPACE_TMP} +git clone ${GIT_URL} Morpheus +cd Morpheus/ +git checkout ${GIT_BRANCH} +git pull + +export MORPHEUS_ROOT=$(pwd) +export WORKSPACE=${MORPHEUS_ROOT} +export LOCAL_CI=1 +unset CMAKE_CUDA_COMPILER_LAUNCHER +unset CMAKE_CXX_COMPILER_LAUNCHER +unset CMAKE_C_COMPILER_LAUNCHER + +if [[ "${STAGE}" != "bash" ]]; then + ${MORPHEUS_ROOT}/ci/scripts/github/${STAGE}.sh +fi diff --git a/ci/scripts/github/build.sh b/ci/scripts/github/build.sh index 3e5242ba6c..8aec506af0 100755 --- a/ci/scripts/github/build.sh +++ b/ci/scripts/github/build.sh @@ -28,20 +28,26 @@ cmake --version ninja --version sccache --version -rapids-logger "Configuring cmake for Morpheus" git submodule update --init --recursive -cmake -B build -G Ninja ${CMAKE_BUILD_ALL_FEATURES} \ - -DCCACHE_PROGRAM_PATH=$(which sccache) \ - -DMORPHEUS_PYTHON_BUILD_WHEEL=ON \ - -DMORPHEUS_PYTHON_BUILD_STUBS=OFF \ - -DCMAKE_BUILD_RPATH_USE_ORIGIN=ON . +CMAKE_FLAGS="${CMAKE_BUILD_ALL_FEATURES}" +CMAKE_FLAGS="${CMAKE_FLAGS} -DMORPHEUS_PYTHON_BUILD_WHEEL=ON" +CMAKE_FLAGS="${CMAKE_FLAGS} -DMORPHEUS_PYTHON_BUILD_STUBS=OFF" +CMAKE_FLAGS="${CMAKE_FLAGS} -DCMAKE_BUILD_RPATH_USE_ORIGIN=ON" +if [[ "${LOCAL_CI}" == "" ]]; then + CMAKE_FLAGS="${CMAKE_FLAGS} -DCCACHE_PROGRAM_PATH=$(which sccache)" +fi + +rapids-logger "Configuring cmake for Morpheus with ${CMAKE_FLAGS}" +cmake -B build -G Ninja ${CMAKE_FLAGS} . 
rapids-logger "Building Morpheus" cmake --build build --parallel ${PARALLEL_LEVEL} -rapids-logger "sccache usage for morpheus build:" -sccache --show-stats +if [[ "${LOCAL_CI}" == "" ]]; then + rapids-logger "sccache usage for morpheus build:" + sccache --show-stats +fi rapids-logger "Archiving results" tar cfj "${WORKSPACE_TMP}/wheel.tar.bz" build/dist @@ -54,9 +60,9 @@ CPP_TESTS=($(find ${MORPHEUS_ROOT}/build/morpheus/_lib/tests -name "*.x" -exec r tar cfj "${WORKSPACE_TMP}/cpp_tests.tar.bz" "${CPP_TESTS[@]}" rapids-logger "Pushing results to ${DISPLAY_ARTIFACT_URL}" -aws s3 cp --no-progress "${WORKSPACE_TMP}/wheel.tar.bz" "${ARTIFACT_URL}/wheel.tar.bz" -aws s3 cp --no-progress "${WORKSPACE_TMP}/morhpeus_libs.tar.bz" "${ARTIFACT_URL}/morhpeus_libs.tar.bz" -aws s3 cp --no-progress "${WORKSPACE_TMP}/cpp_tests.tar.bz" "${ARTIFACT_URL}/cpp_tests.tar.bz" +upload_artifact "${WORKSPACE_TMP}/wheel.tar.bz" +upload_artifact "${WORKSPACE_TMP}/morhpeus_libs.tar.bz" +upload_artifact "${WORKSPACE_TMP}/cpp_tests.tar.bz" rapids-logger "Success" exit 0 diff --git a/ci/scripts/github/checks.sh b/ci/scripts/github/checks.sh index c87fa0b449..9f985e9db1 100755 --- a/ci/scripts/github/checks.sh +++ b/ci/scripts/github/checks.sh @@ -31,13 +31,19 @@ ${MORPHEUS_ROOT}/ci/scripts/python_checks.sh git submodule update --init --recursive rapids-logger "Configuring cmake for Morpheus" -cmake -B build -G Ninja ${CMAKE_BUILD_ALL_FEATURES} -DCCACHE_PROGRAM_PATH=$(which sccache) . +CMAKE_FLAGS="${CMAKE_BUILD_ALL_FEATURES}" +if [[ "${LOCAL_CI}" == "" ]]; then + CMAKE_FLAGS="${CMAKE_FLAGS} -DCCACHE_PROGRAM_PATH=$(which sccache)" +fi +cmake -B build -G Ninja ${CMAKE_FLAGS} . rapids-logger "Building targets that generate source code" cmake --build build --target morpheus_style_checks --parallel ${PARALLEL_LEVEL} -rapids-logger "sccache usage for source build:" -sccache --show-stats +if [[ "${LOCAL_CI}" == "" ]]; then + rapids-logger "sccache usage for source build:" + sccache --show-stats +fi rapids-logger "Checking versions" ${MORPHEUS_ROOT}/ci/scripts/version_checks.sh diff --git a/ci/scripts/github/common.sh b/ci/scripts/github/common.sh index 5e88ca862c..0c7709de14 100644 --- a/ci/scripts/github/common.sh +++ b/ci/scripts/github/common.sh @@ -41,17 +41,17 @@ rapids-logger "Memory" rapids-logger "User Info" id -# For PRs, $GIT_BRANCH is like: pull-request/989 -REPO_NAME=$(basename "${GITHUB_REPOSITORY}") -ORG_NAME="${GITHUB_REPOSITORY_OWNER}" -PR_NUM="${GITHUB_REF_NAME##*/}" - # S3 vars export S3_URL="s3://rapids-downloads/ci/morpheus" export DISPLAY_URL="https://downloads.rapids.ai/ci/morpheus" export ARTIFACT_ENDPOINT="/pull-request/${PR_NUM}/${GIT_COMMIT}/${NVARCH}" export ARTIFACT_URL="${S3_URL}${ARTIFACT_ENDPOINT}" -export DISPLAY_ARTIFACT_URL="${DISPLAY_URL}${ARTIFACT_ENDPOINT}/" + +if [[ "${LOCAL_CI}" == "1" ]]; then + export DISPLAY_ARTIFACT_URL="${LOCAL_CI_TMP}" +else + export DISPLAY_ARTIFACT_URL="${DISPLAY_URL}${ARTIFACT_ENDPOINT}/" +fi # Set sccache env vars export SCCACHE_S3_KEY_PREFIX=morpheus-${NVARCH} @@ -60,6 +60,11 @@ export SCCACHE_REGION="us-east-2" export SCCACHE_IDLE_TIMEOUT=32768 #export SCCACHE_LOG=debug +export CONDA_ENV_YML=${MORPHEUS_ROOT}/docker/conda/environments/cuda${CUDA_VER}_dev.yml +export CONDA_EXAMPLES_YML=${MORPHEUS_ROOT}/docker/conda/environments/cuda${CUDA_VER}_examples.yml +export CONDA_DOCS_YML=${MORPHEUS_ROOT}/docs/conda_docs.yml +export PIP_REQUIREMENTS=${MORPHEUS_ROOT}/docker/conda/environments/requirements.txt + export CMAKE_BUILD_ALL_FEATURES="-DCMAKE_MESSAGE_CONTEXT_SHOW=ON 
-DMORPHEUS_CUDA_ARCHITECTURES=60;70;75;80 -DMORPHEUS_BUILD_BENCHMARKS=ON -DMORPHEUS_BUILD_EXAMPLES=ON -DMORPHEUS_BUILD_TESTS=ON -DMORPHEUS_USE_CONDA=ON -DMORPHEUS_PYTHON_INPLACE_BUILD=OFF -DMORPHEUS_PYTHON_BUILD_STUBS=ON -DMORPHEUS_USE_CCACHE=ON" export FETCH_STATUS=0 @@ -67,22 +72,46 @@ export FETCH_STATUS=0 print_env_vars function update_conda_env() { - rapids-logger "Checking for updates to conda env" - # Deactivate the environment first before updating conda deactivate - # Update the packages with --prune to remove any extra packages - rapids-mamba-retry env update -n morpheus --prune -q --file ${MORPHEUS_ROOT}/docker/conda/environments/cuda${CUDA_VER}_dev.yml + ENV_YAML=${CONDA_ENV_YML} + if [[ "${MERGE_EXAMPLES_YAML}" == "1" || "${MERGE_DOCS_YAML}" == "1" ]]; then + # Merge the dev, docs and examples envs, otherwise --prune will remove the examples packages + ENV_YAML=${condatmpdir}/merged_env.yml + YAMLS="${CONDA_ENV_YML}" + if [[ "${MERGE_EXAMPLES_YAML}" == "1" ]]; then + YAMLS="${YAMLS} ${CONDA_EXAMPLES_YML}" + fi + if [[ "${MERGE_DOCS_YAML}" == "1" ]]; then + YAMLS="${YAMLS} ${CONDA_DOCS_YML}" + fi + + # Conda is going to expect a requirements.txt file to be in the same directory as the env yaml + cp ${PIP_REQUIREMENTS} ${condatmpdir}/requirements.txt + + rapids-logger "Merging conda envs: ${YAMLS}" + conda run -n morpheus --live-stream conda-merge ${YAMLS} > ${ENV_YAML} + fi + + rapids-logger "Checking for updates to conda env" + + # Update the packages + rapids-mamba-retry env update -n morpheus --prune -q --file ${ENV_YAML} # Finally, reactivate conda activate morpheus rapids-logger "Final Conda Environment" - conda list + show_conda_info } -function fetch_base_branch() { +function fetch_base_branch_gh_api() { + # For PRs, $GIT_BRANCH is like: pull-request/989 + REPO_NAME=$(basename "${GITHUB_REPOSITORY}") + ORG_NAME="${GITHUB_REPOSITORY_OWNER}" + PR_NUM="${GITHUB_REF_NAME##*/}" + rapids-logger "Retrieving base branch from GitHub API" [[ -n "$GH_TOKEN" ]] && CURL_HEADERS=('-H' "Authorization: token ${GH_TOKEN}") RESP=$( @@ -92,42 +121,30 @@ function fetch_base_branch() { "${GITHUB_API_URL}/repos/${ORG_NAME}/${REPO_NAME}/pulls/${PR_NUM}" ) - BASE_BRANCH=$(echo "${RESP}" | jq -r '.base.ref') + export BASE_BRANCH=$(echo "${RESP}" | jq -r '.base.ref') # Change target is the branch name we are merging into but due to the weird way jenkins does # the checkout it isn't recognized by git without the origin/ prefix export CHANGE_TARGET="origin/${BASE_BRANCH}" - rapids-logger "Base branch: ${BASE_BRANCH}" } -function fetch_s3() { - ENDPOINT=$1 - DESTINATION=$2 - if [[ "${USE_S3_CURL}" == "1" ]]; then - curl -f "${DISPLAY_URL}${ENDPOINT}" -o "${DESTINATION}" - FETCH_STATUS=$? - else - aws s3 cp --no-progress "${S3_URL}${ENDPOINT}" "${DESTINATION}" - FETCH_STATUS=$? 
- fi +function fetch_base_branch_local() { + rapids-logger "Retrieving base branch from git" + git remote add upstream ${GIT_UPSTREAM_URL} + git fetch upstream --tags + source ${MORPHEUS_ROOT}/ci/scripts/common.sh + export BASE_BRANCH=$(get_base_branch) + export CHANGE_TARGET="upstream/${BASE_BRANCH}" } -function restore_conda_env() { - - rapids-logger "Downloading build artifacts from ${DISPLAY_ARTIFACT_URL}" - fetch_s3 "${ARTIFACT_ENDPOINT}/conda_env.tar.gz" "${WORKSPACE_TMP}/conda_env.tar.gz" - fetch_s3 "${ARTIFACT_ENDPOINT}/wheel.tar.bz" "${WORKSPACE_TMP}/wheel.tar.bz" - - rapids-logger "Extracting" - mkdir -p /opt/conda/envs/morpheus - - # We are using the --no-same-owner flag since user id & group id's are inconsistent between nodes in our CI pool - tar xf "${WORKSPACE_TMP}/conda_env.tar.gz" --no-same-owner --directory /opt/conda/envs/morpheus - tar xf "${WORKSPACE_TMP}/wheel.tar.bz" --no-same-owner --directory ${MORPHEUS_ROOT} +function fetch_base_branch() { + if [[ "${LOCAL_CI}" == "1" ]]; then + fetch_base_branch_local + else + fetch_base_branch_gh_api + fi - rapids-logger "Setting conda env" - conda activate morpheus - conda-unpack + rapids-logger "Base branch: ${BASE_BRANCH}" } function show_conda_info() { @@ -137,3 +154,24 @@ function show_conda_info() { conda config --show-sources conda list --show-channel-urls } + +function upload_artifact() { + FILE_NAME=$1 + BASE_NAME=$(basename "${FILE_NAME}") + rapids-logger "Uploading artifact: ${BASE_NAME}" + if [[ "${LOCAL_CI}" == "1" ]]; then + cp ${FILE_NAME} "${LOCAL_CI_TMP}/${BASE_NAME}" + else + aws s3 cp --only-show-errors "${FILE_NAME}" "${ARTIFACT_URL}/${BASE_NAME}" + fi +} + +function download_artifact() { + ARTIFACT=$1 + rapids-logger "Downloading ${ARTIFACT} from ${DISPLAY_ARTIFACT_URL}" + if [[ "${LOCAL_CI}" == "1" ]]; then + cp "${LOCAL_CI_TMP}/${ARTIFACT}" "${WORKSPACE_TMP}/${ARTIFACT}" + else + aws s3 cp --only-show-errors "${ARTIFACT_URL}/${ARTIFACT}" "${WORKSPACE_TMP}/${ARTIFACT}" + fi +} diff --git a/ci/scripts/github/docs.sh b/ci/scripts/github/docs.sh index 93301b040e..6175aef585 100755 --- a/ci/scripts/github/docs.sh +++ b/ci/scripts/github/docs.sh @@ -20,7 +20,7 @@ source ${WORKSPACE}/ci/scripts/github/common.sh update_conda_env -aws s3 cp --no-progress "${ARTIFACT_URL}/wheel.tar.bz" "${WORKSPACE_TMP}/wheel.tar.bz" +download_artifact "wheel.tar.bz" tar xf "${WORKSPACE_TMP}/wheel.tar.bz" @@ -32,9 +32,6 @@ cd ${MORPHEUS_ROOT} git lfs install ${MORPHEUS_ROOT}/scripts/fetch_data.py fetch docs examples -rapids-logger "Installing Documentation dependencies" -mamba env update -f ${MORPHEUS_ROOT}/docs/conda_docs.yml - git submodule update --init --recursive rapids-logger "Configuring for docs" @@ -47,7 +44,7 @@ rapids-logger "Archiving the docs" tar cfj "${WORKSPACE_TMP}/docs.tar.bz" build/docs/html rapids-logger "Pushing results to ${DISPLAY_ARTIFACT_URL}" -aws s3 cp --no-progress "${WORKSPACE_TMP}/docs.tar.bz" "${ARTIFACT_URL}/docs.tar.bz" +upload_artifact "${WORKSPACE_TMP}/docs.tar.bz" rapids-logger "Success" exit 0 diff --git a/ci/scripts/github/test.sh b/ci/scripts/github/test.sh index 2ad57f7ce0..1fad54e560 100755 --- a/ci/scripts/github/test.sh +++ b/ci/scripts/github/test.sh @@ -21,9 +21,9 @@ source ${WORKSPACE}/ci/scripts/github/common.sh update_conda_env -aws s3 cp --no-progress "${ARTIFACT_URL}/wheel.tar.bz" "${WORKSPACE_TMP}/wheel.tar.bz" -aws s3 cp --no-progress "${ARTIFACT_URL}/cpp_tests.tar.bz" "${WORKSPACE_TMP}/cpp_tests.tar.bz" -aws s3 cp --no-progress "${ARTIFACT_URL}/morhpeus_libs.tar.bz" 
"${WORKSPACE_TMP}/morhpeus_libs.tar.bz" +download_artifact "wheel.tar.bz" +download_artifact "cpp_tests.tar.bz" +download_artifact "morhpeus_libs.tar.bz" tar xf "${WORKSPACE_TMP}/wheel.tar.bz" tar xf "${WORKSPACE_TMP}/morhpeus_libs.tar.bz" @@ -69,7 +69,7 @@ done rapids-logger "Running Python tests" set +e -python -I -m pytest --run_slow --run_kafka \ +python -I -m pytest --run_slow --run_kafka --fail_missing \ --junit-xml=${REPORTS_DIR}/report_pytest.xml \ --cov=morpheus \ --cov-report term-missing \ @@ -86,6 +86,6 @@ cd $(dirname ${REPORTS_DIR}) tar cfj ${WORKSPACE_TMP}/test_reports.tar.bz $(basename ${REPORTS_DIR}) rapids-logger "Pushing results to ${DISPLAY_ARTIFACT_URL}" -aws s3 cp ${WORKSPACE_TMP}/test_reports.tar.bz "${ARTIFACT_URL}/test_reports.tar.bz" +upload_artifact ${WORKSPACE_TMP}/test_reports.tar.bz exit ${TEST_RESULTS} diff --git a/ci/scripts/run_ci_local.sh b/ci/scripts/run_ci_local.sh new file mode 100755 index 0000000000..761e215ed4 --- /dev/null +++ b/ci/scripts/run_ci_local.sh @@ -0,0 +1,100 @@ +#!/bin/bash +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +case "$1" in + "" ) + STAGES=("bash") + ;; + "all" ) + STAGES=("checks" "build" "docs" "test") + ;; + "checks" | "build" | "docs" | "test" | "bash" ) + STAGES=("$1") + ;; + * ) + echo "Error: Invalid argument \"$1\" provided. 
Expected values: \"all\", \"checks\", \"build\", \"docs\", \"test\", or \"bash\"" + exit 1 + ;; +esac + +# CI image doesn't contain ssh, need to use https +function git_ssh_to_https() +{ + local url=$1 + echo $url | sed -e 's|^git@github\.com:|https://github.com/|' +} + +MORPHEUS_ROOT=${MORPHEUS_ROOT:-$(git rev-parse --show-toplevel)} + +GIT_URL=$(git remote get-url origin) +GIT_URL=$(git_ssh_to_https ${GIT_URL}) + +GIT_UPSTREAM_URL=$(git remote get-url upstream) +GIT_UPSTREAM_URL=$(git_ssh_to_https ${GIT_UPSTREAM_URL}) + +GIT_BRANCH=$(git branch --show-current) +GIT_COMMIT=$(git log -n 1 --pretty=format:%H) + +LOCAL_CI_TMP=${LOCAL_CI_TMP:-${MORPHEUS_ROOT}/.tmp/local_ci_tmp} +CONTAINER_VER=${CONTAINER_VER:-230510} +CUDA_VER=${CUDA_VER:-11.8} +DOCKER_EXTRA_ARGS=${DOCKER_EXTRA_ARGS:-""} + +BUILD_CONTAINER="nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-build-${CONTAINER_VER}" +TEST_CONTAINER="nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-test-${CONTAINER_VER}" + +ENV_LIST="--env LOCAL_CI_TMP=/ci_tmp" +ENV_LIST="${ENV_LIST} --env GIT_URL=${GIT_URL}" +ENV_LIST="${ENV_LIST} --env GIT_UPSTREAM_URL=${GIT_UPSTREAM_URL}" +ENV_LIST="${ENV_LIST} --env GIT_BRANCH=${GIT_BRANCH}" +ENV_LIST="${ENV_LIST} --env GIT_COMMIT=${GIT_COMMIT}" +ENV_LIST="${ENV_LIST} --env PARALLEL_LEVEL=$(nproc)" +ENV_LIST="${ENV_LIST} --env CUDA_VER=${CUDA_VER}" + +mkdir -p ${LOCAL_CI_TMP} +cp ${MORPHEUS_ROOT}/ci/scripts/bootstrap_local_ci.sh ${LOCAL_CI_TMP} + +for STAGE in "${STAGES[@]}"; do + DOCKER_RUN_ARGS="--rm -ti --net=host -v "${LOCAL_CI_TMP}":/ci_tmp ${ENV_LIST} --env STAGE=${STAGE}" + if [[ "${STAGE}" == "test" || "${USE_GPU}" == "1" ]]; then + CONTAINER="${TEST_CONTAINER}" + DOCKER_RUN_ARGS="${DOCKER_RUN_ARGS} --runtime=nvidia --gpus all" + if [[ "${STAGE}" == "test" ]]; then + DOCKER_RUN_ARGS="${DOCKER_RUN_ARGS} --env MERGE_EXAMPLES_YAML=1 --cap-add=sys_nice" + fi + else + CONTAINER="${BUILD_CONTAINER}" + DOCKER_RUN_ARGS="${DOCKER_RUN_ARGS} --runtime=runc" + if [[ "${STAGE}" == "docs" ]]; then + DOCKER_RUN_ARGS="${DOCKER_RUN_ARGS} --env MERGE_DOCS_YAML=1" + fi + fi + + if [[ "${STAGE}" == "bash" ]]; then + DOCKER_RUN_CMD="bash --init-file /ci_tmp/bootstrap_local_ci.sh" + else + DOCKER_RUN_CMD="/ci_tmp/bootstrap_local_ci.sh" + fi + + echo "Running ${STAGE} stage in ${CONTAINER}" + docker run ${DOCKER_RUN_ARGS} ${DOCKER_EXTRA_ARGS} ${CONTAINER} ${DOCKER_RUN_CMD} + + STATUS=$? + if [[ ${STATUS} -ne 0 ]]; then + echo "Error: docker exited with a non-zero status code for ${STAGE} of ${STATUS}" + exit ${STATUS} + fi +done diff --git a/docker/conda/environments/cuda11.8_examples.yml b/docker/conda/environments/cuda11.8_examples.yml new file mode 100644 index 0000000000..6a714c2269 --- /dev/null +++ b/docker/conda/environments/cuda11.8_examples.yml @@ -0,0 +1,34 @@ +# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Additional dependencies needed by a some of the Morpheus examples. 
+# The intended usage is to first create the conda environment from the `cuda11.8_dev.yml` file, and then update the +# env with this file. ex: +# mamba env create -n morpheus --file docker/conda/environments/cuda11.8_dev.yml +# conda activate morpheus +# mamba env update -n morpheus --file docker/conda/environments/cuda11.8_examples.yml +channels: + - rapidsai + - nvidia + - conda-forge +dependencies: + - chardet=5.0.0 + - cuml=23.02 + - dask==2023.1.1 + - distributed==2023.1.1 + - pip + - pip: + # tensorflow exists in conda-forge but is tied to CUDA-11.3 + - tensorflow==2.12.0 diff --git a/docs/conda_docs.yml b/docs/conda_docs.yml index 4967eb88ef..cd77bee757 100644 --- a/docs/conda_docs.yml +++ b/docs/conda_docs.yml @@ -21,5 +21,10 @@ dependencies: - pip ####### Morpheus Pip Dependencies (keep sorted!) ####### - pip: - # Ensure all runtime requirements are installed using the requirements file - - --requirement requirements.txt + - breathe==4.34.0 + - exhale==0.3.6 + - ipython + - myst-parser==0.17.2 + - nbsphinx + - sphinx + - sphinx_rtd_theme diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py index a62759d67c..183ed9fcda 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py @@ -14,6 +14,7 @@ import logging import typing +import warnings from collections import namedtuple from datetime import datetime @@ -37,17 +38,29 @@ def __init__(self, c: Config, date_conversion_func, period="D", - sampling_rate_s=0, + sampling_rate_s: typing.Optional[int] = None, start_time: datetime = None, - end_time: datetime = None): + end_time: datetime = None, + sampling: typing.Union[str, float, int, None] = None): super().__init__(c) self._date_conversion_func = date_conversion_func - self._sampling_rate_s = sampling_rate_s self._period = period self._start_time = start_time self._end_time = end_time + if (sampling_rate_s is not None and sampling_rate_s > 0): + assert sampling is None, "Cannot set both sampling and sampling_rate_s at the same time" + + # Show the deprecation message + warnings.warn(("The `sampling_rate_s` argument has been deprecated. 
" + "Please use `sampling={sampling_rate_s}S` instead"), + DeprecationWarning) + + sampling = f"{sampling_rate_s}S" + + self._sampling = sampling + @property def name(self) -> str: return "dfp-file-batcher" @@ -60,8 +73,11 @@ def accepted_types(self) -> typing.Tuple: def on_data(self, file_objects: fsspec.core.OpenFiles): + timestamps = [] + full_names = [] + file_objs = [] + # Determine the date of the file, and apply the window filter if we have one - ts_and_files = [] for file_object in file_objects: ts = self._date_conversion_func(file_object) @@ -70,60 +86,52 @@ def on_data(self, file_objects: fsspec.core.OpenFiles): or (self._end_time is not None and ts > self._end_time)): continue - ts_and_files.append(TimestampFileObj(ts, file_object)) - - # sort the incoming data by date - ts_and_files.sort(key=lambda x: x.timestamp) + timestamps.append(ts) + full_names.append(file_object.full_name) + file_objs.append(file_object) - # Create a dataframe with the incoming metadata - if ((len(ts_and_files) > 1) and (self._sampling_rate_s > 0)): - file_sampled_list = [] + # Build the dataframe + df = pd.DataFrame(index=pd.DatetimeIndex(timestamps), data={"filename": full_names, "objects": file_objects}) - ts_last = ts_and_files[0].timestamp + # sort the incoming data by date + df.sort_index(inplace=True) - file_sampled_list.append(ts_and_files[0]) + # If sampling was provided, perform that here + if (self._sampling is not None): - for idx in range(1, len(ts_and_files)): - ts = ts_and_files[idx].timestamp + if (isinstance(self._sampling, str)): + # We have a frequency for sampling. Resample by the frequency, taking the first + df = df.resample(self._sampling).first().dropna() - if ((ts - ts_last).seconds >= self._sampling_rate_s): + elif (self._sampling < 1.0): + # Sample a fraction of the rows + df = df.sample(frac=self._sampling).sort_index() - ts_and_files.append(ts_and_files[idx]) - ts_last = ts else: - ts_and_files = file_sampled_list + # Sample a fixed amount + df = df.sample(n=self._sampling).sort_index() - df = pd.DataFrame() + # Early exit if no files were found + if (len(df) == 0): + return [] - timestamps = [] - full_names = [] - file_objs = [] - for (ts, file_object) in ts_and_files: - timestamps.append(ts) - full_names.append(file_object.full_name) - file_objs.append(file_object) + if (self._period is None): + # No period was set so group them all into one single batch + return [(fsspec.core.OpenFiles(df["objects"].to_list(), mode=file_objects.mode, fs=file_objects.fs), + len(df))] - df["dfp_timestamp"] = timestamps - df["key"] = full_names - df["objects"] = file_objs + # Now group the rows by the period + resampled = df.resample(self._period) - output_batches = [] + n_groups = len(resampled) - if len(df) > 0: - # Now split by the batching settings - df_period = df["dfp_timestamp"].dt.to_period(self._period) - - period_gb = df.groupby(df_period) + output_batches = [] - n_groups = len(period_gb) - for group in period_gb.groups: - period_df = period_gb.get_group(group) + for _, period_df in resampled: - obj_list = fsspec.core.OpenFiles(period_df["objects"].to_list(), - mode=file_objects.mode, - fs=file_objects.fs) + obj_list = fsspec.core.OpenFiles(period_df["objects"].to_list(), mode=file_objects.mode, fs=file_objects.fs) - output_batches.append((obj_list, n_groups)) + output_batches.append((obj_list, n_groups)) return output_batches diff --git a/examples/digital_fingerprinting/production/morpheus/dfp_duo_pipeline.py b/examples/digital_fingerprinting/production/morpheus/dfp_duo_pipeline.py 
index 588f0a448e..86aa39ea63 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp_duo_pipeline.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp_duo_pipeline.py @@ -111,7 +111,7 @@ type=int, default=0, show_envvar=True, - help="Minimum time step, in milliseconds, between object logs.") + help="Samples the input data files allowing only one file per bin defined by `sample_rate_s`.") @click.option( "--input_file", "-f", @@ -248,8 +248,8 @@ def run_pipeline(train_users, # Batch files into buckets by time. Use the default ISO date extractor from the filename pipeline.add_stage( DFPFileBatcherStage(config, - period="D", - sampling_rate_s=sample_rate_s, + period=None, + sampling=f"{sample_rate_s}S" if sample_rate_s > 0 else None, date_conversion_func=functools.partial(date_extractor, filename_regex=iso_date_regex), start_time=start_time, end_time=end_time)) diff --git a/examples/gnn_fraud_detection_pipeline/README.md b/examples/gnn_fraud_detection_pipeline/README.md index ef00c11576..1a2b0ed3a3 100644 --- a/examples/gnn_fraud_detection_pipeline/README.md +++ b/examples/gnn_fraud_detection_pipeline/README.md @@ -22,8 +22,11 @@ Prior to running the GNN fraud detection pipeline, additional requirements must ```bash mamba env update -n ${CONDA_DEFAULT_ENV} -f examples/gnn_fraud_detection_pipeline/requirements.yml +pip install --ignore-requires-python stellargraph==1.2.1 ``` +> **Note**: The `--ignore-requires-python` is needed because Stellargraph only officially supports Python versions prior to 3.9 ([stellargraph/stellargraph#1960](https://github.com/stellargraph/stellargraph/issues/1960)). + ## Running ##### Setup Env Variable diff --git a/examples/gnn_fraud_detection_pipeline/requirements.yml b/examples/gnn_fraud_detection_pipeline/requirements.yml index 85150ba54b..fa3f867ee9 100644 --- a/examples/gnn_fraud_detection_pipeline/requirements.yml +++ b/examples/gnn_fraud_detection_pipeline/requirements.yml @@ -20,7 +20,9 @@ channels: dependencies: - chardet=5.0.0 - cuml=23.02 + - dask==2023.1.1 + - distributed==2023.1.1 + - pip - pip: # tensorflow exists in conda-forge but is tied to CUDA-11.3 - - stellargraph==1.2.1 - tensorflow==2.12.0 diff --git a/examples/gnn_fraud_detection_pipeline/stages/classification_stage.py b/examples/gnn_fraud_detection_pipeline/stages/classification_stage.py index 2444d33438..3abdcd5561 100644 --- a/examples/gnn_fraud_detection_pipeline/stages/classification_stage.py +++ b/examples/gnn_fraud_detection_pipeline/stages/classification_stage.py @@ -21,6 +21,7 @@ import cuml from morpheus.cli.register_stage import register_stage +from morpheus.common import TypeId from morpheus.config import Config from morpheus.config import PipelineModes from morpheus.messages import MultiMessage @@ -48,6 +49,7 @@ def __init__(self, c: Config, model_xgb_file: str): super().__init__(c) self._xgb_model = cuml.ForestInference.load(model_xgb_file, output_class=True) + self._needed_columns.update({'node_id': TypeId.INT64, 'prediction': TypeId.FLOAT32}) @property def name(self) -> str: @@ -61,9 +63,11 @@ def supports_cpp_node(self): def _process_message(self, message: GraphSAGEMultiMessage): ind_emb_columns = message.get_meta(message.inductive_embedding_column_names) - message.set_meta("node_id", message.node_identifiers) + # The XGBoost model is returning two probabilities for the binary classification. 
The first (column 0) is + # probability that the transaction is in the benign class, and the second (column 1) is the probability that + # the transaction is in the fraudulent class. Added together the two values will always equal 1. prediction = self._xgb_model.predict_proba(ind_emb_columns).iloc[:, 1] message.set_meta("prediction", prediction) diff --git a/examples/gnn_fraud_detection_pipeline/stages/graph_construction_stage.py b/examples/gnn_fraud_detection_pipeline/stages/graph_construction_stage.py index 2bc377720c..cae0a6a81b 100644 --- a/examples/gnn_fraud_detection_pipeline/stages/graph_construction_stage.py +++ b/examples/gnn_fraud_detection_pipeline/stages/graph_construction_stage.py @@ -90,7 +90,7 @@ def _graph_construction(nodes, edges, node_features) -> "stellargraph.StellarGra for edge in edges: g_nx.add_edges_from(edge) - return StellarGraph(g_nx, node_type_name="ntype", node_features=node_features) + return StellarGraph.from_networkx(g_nx, node_type_attr='ntype', node_features=node_features) @staticmethod def _build_graph_features(dataset: pd.DataFrame) -> "stellargraph.StellarGraph": diff --git a/examples/ransomware_detection/config/ransomware_detection.yaml b/examples/ransomware_detection/config/ransomware_detection.yaml index ba1a3889bb..882c75baa7 100644 --- a/examples/ransomware_detection/config/ransomware_detection.yaml +++ b/examples/ransomware_detection/config/ransomware_detection.yaml @@ -13,8 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. ---- -file_extensions: +--- +file_extensions: - doc - docx - html @@ -38,7 +38,7 @@ file_extensions: - 7z - rar - msg -model_features: +model_features: - envirs_pathext - count_double_extension_count_handles - page_readonly_vads_count @@ -138,8 +138,9 @@ model_features: - page_execute_readwrite_vads_count - handles_df_type_unique_ratio - page_execute_readwrite_count +features: - ldrmodules_df_path -raw_columns: +raw_columns: - Base - Block - CommitCharge diff --git a/examples/ransomware_detection/run.py b/examples/ransomware_detection/run.py index bf1e7337d3..8950a7ee47 100644 --- a/examples/ransomware_detection/run.py +++ b/examples/ransomware_detection/run.py @@ -36,7 +36,7 @@ @click.command() @click.option('--debug', default=False) -@click.option('--use_cpp', default=False) +@click.option('--use_cpp', default=False, help="Enable C++ execution for this pipeline, currently this is unsupported.") @click.option( "--num_threads", default=os.cpu_count(), @@ -147,7 +147,10 @@ def run_pipeline(debug, cols_interested_plugins = rwd_conf['raw_columns'] # Feature columns used by the model. - feature_columns = rwd_conf['model_features'] + model_features = rwd_conf['model_features'] + + # Features to include in the DF, superset of model_features along with a few that the model doesn't receive + feature_columns = model_features + rwd_conf['features'] # File extensions. file_extns = rwd_conf['file_extensions'] @@ -185,8 +188,7 @@ def run_pipeline(debug, # Add preprocessing stage. # This stage generates snapshot sequences using sliding window for each pid_process. - pipeline.add_stage(PreprocessingRWStage(config, feature_columns=feature_columns[:-1], - sliding_window=sliding_window)) + pipeline.add_stage(PreprocessingRWStage(config, feature_columns=model_features, sliding_window=sliding_window)) # Add a monitor stage # This stage logs the metrics (msg/sec) from the above stage. 
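The DFPFileBatcherStage change above replaces `sampling_rate_s` with a more general `sampling` argument: a pandas resample frequency string keeps the first file per time bin, a float below 1.0 keeps a fraction of the files, and an integer keeps a fixed number of files. A minimal standalone sketch of that dispatch, assuming a plain pandas DataFrame indexed by file timestamp (the names here are illustrative, not the stage itself):

```python
import pandas as pd


def sample_files(df: pd.DataFrame, sampling) -> pd.DataFrame:
    """Sketch of the sampling dispatch; `df` is indexed by each file's timestamp."""
    if sampling is None:
        return df
    if isinstance(sampling, str):
        # Frequency string such as "30S": keep the first file in each time bin
        return df.resample(sampling).first().dropna()
    if sampling < 1.0:
        # Fraction of rows, e.g. 0.1 keeps roughly 10% of the files
        return df.sample(frac=sampling).sort_index()
    # Otherwise treat it as a fixed number of rows
    return df.sample(n=int(sampling)).sort_index()


# Three files a few seconds apart; keeping one per 30 second bin leaves a.json and c.json
ts = pd.DatetimeIndex(["2023-05-01 00:00:01", "2023-05-01 00:00:05", "2023-05-01 00:00:40"])
files = pd.DataFrame({"filename": ["a.json", "b.json", "c.json"]}, index=ts)
print(sample_files(files, "30S"))
```

The `dfp_duo_pipeline.py` change above maps the old CLI flag onto the new argument by passing `sampling=f"{sample_rate_s}S"` only when a non-zero rate is given, which reproduces the previous one-file-per-interval behavior.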
diff --git a/examples/ransomware_detection/stages/create_features.py b/examples/ransomware_detection/stages/create_features.py index c5898dac13..bd74bdb83b 100644 --- a/examples/ransomware_detection/stages/create_features.py +++ b/examples/ransomware_detection/stages/create_features.py @@ -24,7 +24,6 @@ from morpheus.cli.register_stage import register_stage from morpheus.config import Config from morpheus.config import PipelineModes -from morpheus.messages import MessageMeta from morpheus.messages import MultiMessage from morpheus.pipeline.multi_message_stage import MultiMessageStage from morpheus.pipeline.stream_pair import StreamPair @@ -83,86 +82,86 @@ def accepted_types(self) -> typing.Tuple: def supports_cpp_node(self): return False - def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair: + def on_next(self, x: AppShieldMessageMeta): - stream = input_stream[0] + snapshot_fea_dfs = [] - def on_next(x: AppShieldMessageMeta): + df = x.df - snapshot_fea_dfs = [] + # Type cast CommitCharge. + df["CommitCharge"] = df["CommitCharge"].astype("float").astype("Int32") + df["Name"] = df["Name"].str.lower() - df = x.df + # Create PID_Process feature. + df['PID_Process'] = df.PID + '_' + df.Process - # Type cast CommitCharge. - df["CommitCharge"] = df["CommitCharge"].astype("float").astype("Int32") - df["Name"] = df["Name"].str.lower() + snapshot_ids = df.snapshot_id.unique() - # Create PID_Process feature. - df['PID_Process'] = df.PID + '_' + df.Process + if len(snapshot_ids) > 1: + # Group snapshot rows using snapshot id. + all_dfs = [df[df.snapshot_id == snapshot_id] for snapshot_id in snapshot_ids] + else: + all_dfs = [df] - snapshot_ids = df.snapshot_id.unique() + extract_func = self._fe.extract_features + combine_func = FeatureExtractor.combine_features - if len(snapshot_ids) > 1: - # Group snapshot rows using snapshot id. - all_dfs = [df[df.snapshot_id == snapshot_id] for snapshot_id in snapshot_ids] - else: - all_dfs = [df] + # Schedule dask task `extract_features` per snapshot. + snapshot_fea_dfs = self._client.map(extract_func, all_dfs, feas_all_zeros=self._feas_all_zeros) - extract_func = self._fe.extract_features - combine_func = FeatureExtractor.combine_features + # Combined `extract_features` results. + features_df = self._client.submit(combine_func, snapshot_fea_dfs) - # Schedule dask task `extract_features` per snapshot. - snapshot_fea_dfs = self._client.map(extract_func, all_dfs, feas_all_zeros=self._feas_all_zeros) + # Gather features from all the snapshots. + features_df = features_df.result() - # Combined `extract_features` results. - features_df = self._client.submit(combine_func, snapshot_fea_dfs) + # Snapshot sequence will be generated using `source_pid_process`. + # Determines which source generated the snapshot messages. + # There's a chance of receiving the same snapshots names from multiple sources(hosts) + features_df['source_pid_process'] = x.source + '_' + features_df.pid_process - # Gather features from all the snapshots. - features_df = features_df.result() + # Sort entries by pid_process and snapshot_id + features_df = features_df.sort_values(by=["pid_process", "snapshot_id"]).reset_index(drop=True) - # Snapshot sequence will be generated using `source_pid_process`. - # Determines which source generated the snapshot messages. 
- # There's a chance of receiving the same snapshots names from multiple sources(hosts) - features_df['source_pid_process'] = x.source + '_' + features_df.pid_process + # Create AppShieldMessageMeta with extracted features information. + meta = AppShieldMessageMeta(features_df, x.source) - # Sort entries by pid_process and snapshot_id - features_df = features_df.sort_values(by=["pid_process", "snapshot_id"]).reset_index(drop=True) + return meta - # Create AppShieldMessageMeta with extracted features information. - meta = AppShieldMessageMeta(features_df, x.source) + def create_multi_messages(self, x: AppShieldMessageMeta) -> typing.List[MultiMessage]: - return meta + multi_messages = [] - def create_multi_messages(x: MessageMeta) -> typing.List[MultiMessage]: + df = x.df - multi_messages = [] + pid_processes = df.pid_process.unique() - df = x.df + # Create multi messaage per pid_process, this assumes that the DF has been sorted by the `pid_process` column + for pid_process in pid_processes: - pid_processes = df.pid_process.unique() + pid_process_index = df[df.pid_process == pid_process].index - # Create multi messaage per pid_process - for pid_process in pid_processes: + start = pid_process_index.min() + stop = pid_process_index.max() + 1 + mess_count = stop - start - pid_process_index = df[df.pid_process == pid_process].index + multi_message = MultiMessage(meta=x, mess_offset=start, mess_count=mess_count) + multi_messages.append(multi_message) - start = pid_process_index.min() - stop = pid_process_index.max() + 1 - mess_count = stop - start + return multi_messages - multi_message = MultiMessage(meta=x, mess_offset=start, mess_count=mess_count) - multi_messages.append(multi_message) + def on_completed(self): + # Close dask client when pipeline initiates shutdown + self._client.close() - return multi_messages + def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair: - def on_completed(): - # Close dask client when pipeline initiates shutdown - self._client.close() + stream = input_stream[0] node = builder.make_node(self.unique_name, - ops.map(on_next), - ops.map(create_multi_messages), - ops.on_completed(on_completed), + ops.map(self.on_next), + ops.map(self.create_multi_messages), + ops.on_completed(self.on_completed), ops.flatten()) builder.make_edge(stream, node) stream = node diff --git a/examples/ransomware_detection/stages/preprocessing.py b/examples/ransomware_detection/stages/preprocessing.py index de90b57c8c..9f8abfe423 100644 --- a/examples/ransomware_detection/stages/preprocessing.py +++ b/examples/ransomware_detection/stages/preprocessing.py @@ -20,6 +20,7 @@ from common.data_models import SnapshotData from morpheus.cli.register_stage import register_stage +from morpheus.common import TypeId from morpheus.config import Config from morpheus.config import PipelineModes from morpheus.messages import InferenceMemoryFIL @@ -32,7 +33,7 @@ @register_stage("ransomware-preprocess", modes=[PipelineModes.FIL]) class PreprocessingRWStage(PreprocessBaseStage): """ - This class extends PreprocessBaseStage and process the features that aree derived from Appshield data. + This class extends PreprocessBaseStage and process the features that are derived from Appshield data. It also arranges the snapshots of Appshield data in a sequential order using provided sliding window. Parameters @@ -58,6 +59,7 @@ def __init__(self, c: Config, feature_columns: typing.List[str], sliding_window: # Padding data to map inference response with input messages. 
self._padding_data = [0 for i in range(self._features_len * sliding_window)] + self._needed_columns.update({'sequence': TypeId.STRING}) @property def name(self) -> str: @@ -67,10 +69,12 @@ def supports_cpp_node(self): return False def _sliding_window_offsets(self, ids: typing.List[int], ids_len: int, - window: int) -> typing.List[typing.List[int]]: + window: int) -> typing.List[typing.Tuple[int]]: """ Create snapshot_id's sliding sequence for a given window """ + assert ids_len == len(ids) + assert ids_len >= window sliding_window_offsets = [] diff --git a/external/utilities b/external/utilities index b5a53cf77c..858a4a42ab 160000 --- a/external/utilities +++ b/external/utilities @@ -1 +1 @@ -Subproject commit b5a53cf77c9c9e9406939f1e6bbc163ba16b0f0f +Subproject commit 858a4a42ab63654605ac8ebbbe3e21d432a958ef diff --git a/models/training-tuning-scripts/fraud-detection-models/requirements.txt b/models/training-tuning-scripts/fraud-detection-models/requirements.txt index 139f666431..45d60f1018 100644 --- a/models/training-tuning-scripts/fraud-detection-models/requirements.txt +++ b/models/training-tuning-scripts/fraud-detection-models/requirements.txt @@ -1,9 +1,9 @@ dateparser==1.1.1 -matplotlib==3.6.0 -networkx==2.8.6 -numpy==1.22.4 +matplotlib==3.7.1 +networkx==2.8.8 +numpy==1.23.5 pandas==1.3.5 scikit_learn==1.1.2 stellargraph==1.2.1 -tensorflow==2.9.0 -xgboost==1.6.2 +tensorflow==2.12.0 +xgboost==1.6.2 \ No newline at end of file diff --git a/models/training-tuning-scripts/fraud-detection-models/training.py b/models/training-tuning-scripts/fraud-detection-models/training.py index 193058e6e0..80c7144a06 100644 --- a/models/training-tuning-scripts/fraud-detection-models/training.py +++ b/models/training-tuning-scripts/fraud-detection-models/training.py @@ -167,7 +167,7 @@ def inductive_step_hinsage(S, trained_model, inductive_node_identifiers, batch_s generator = HinSAGENodeGenerator(S, batch_size, num_samples, head_node_type="transaction") test_gen_not_shuffled = generator.flow(inductive_node_identifiers, shuffle=False) - inductive_emb = trained_model.predict(test_gen_not_shuffled, verbose=1) + inductive_emb = np.concatenate([trained_model.predict(row[0], verbose=1) for row in test_gen_not_shuffled]) inductive_emb = pd.DataFrame(inductive_emb, index=inductive_node_identifiers) return inductive_emb diff --git a/morpheus/stages/input/appshield_source_stage.py b/morpheus/stages/input/appshield_source_stage.py index c37e81f78c..9a800b7a83 100644 --- a/morpheus/stages/input/appshield_source_stage.py +++ b/morpheus/stages/input/appshield_source_stage.py @@ -294,7 +294,7 @@ def files_to_dfs(x: typing.List[str], cols_include: typing.List[str], cols_exclude: typing.List[str], plugins_include: typing.List[str], - encoding: str) -> pd.DataFrame: + encoding: str) -> typing.Dict[str, pd.DataFrame]: """ Load plugin files into a dataframe, then segment the dataframe by source. 
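The `files_to_dfs` annotation above now documents that the AppShield source returns one DataFrame per source rather than a single combined frame. A minimal sketch of what "segment the dataframe by source" means for that return shape (illustrative only; the stage's real logic does more work than this):

```python
import typing

import pandas as pd


def segment_by_source(df: pd.DataFrame, source_column: str = "source") -> typing.Dict[str, pd.DataFrame]:
    """Split a combined plugin DataFrame into one DataFrame per source host."""
    return {source: group for source, group in df.groupby(source_column)}


plugin_df = pd.DataFrame({"source": ["host-a", "host-b", "host-a"], "PID": ["1", "2", "3"]})
per_source = segment_by_source(plugin_df)
assert sorted(per_source) == ["host-a", "host-b"]
```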
diff --git a/morpheus/utils/monitor_utils.py b/morpheus/utils/monitor_utils.py index e6c36122ca..7bc473828a 100644 --- a/morpheus/utils/monitor_utils.py +++ b/morpheus/utils/monitor_utils.py @@ -16,6 +16,7 @@ import typing from functools import reduce +import fsspec from tqdm import TMonitor from tqdm import TqdmSynchronisationWarning from tqdm import tqdm @@ -315,7 +316,7 @@ def check_df(y): elif (isinstance(x, list)): item_count_fn = self.auto_count_fn(x[0]) return lambda y: reduce(lambda sum, z, item_count_fn=item_count_fn: sum + item_count_fn(z), y, 0) - elif (isinstance(x, str)): + elif (isinstance(x, (str, fsspec.core.OpenFile))): return lambda y: 1 elif (hasattr(x, "__len__")): return len # Return len directly (same as `lambda y: len(y)`) diff --git a/pyproject.toml b/pyproject.toml index a374aa34f7..6d559c6eea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,8 @@ markers = [ "use_cudf: Test supports cuDF datasets", "use_pandas: Test supports Pandas datasets", "replace_callback: Replaces the results_callback in cli", - "reload_modules: Reloads a set of python modules after running the current test" + "reload_modules: Reloads a set of python modules after running the current test", + "import_mod: Import python modules not currently in the Python search path by file name", ] filterwarnings = [ diff --git a/tests/conftest.py b/tests/conftest.py index d9a0c1cc0c..bee7a03421 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -112,6 +112,14 @@ def pytest_addoption(parser: pytest.Parser): help="A specific log level to use during testing. Defaults to WARNING if not set.", ) + parser.addoption( + "--fail_missing", + action="store_true", + dest="fail_missing", + help=("Tests requiring unmet dependencies are normally skipped. " + "Setting this flag will instead cause them to be reported as a failure"), + ) + def pytest_generate_tests(metafunc: pytest.Metafunc): """ @@ -449,110 +457,28 @@ def chdir_tmpdir(request: pytest.FixtureRequest, tmp_path): @pytest.fixture(scope="function") -def dataset(df_type: typing.Literal['cudf', 'pandas']): - """ - Yields a DatasetLoader instance with `df_type` as the default DataFrame type. - Users of this fixture can still explicitly request either a cudf or pandas dataframe with the `cudf` and `pandas` - properties: - ``` - def test_something(dataset: DatasetManager): - df = dataset["filter_probs.csv"] # type will match the df_type parameter - if dataset.default_df_type == 'pandas': - assert isinstance(df, pd.DataFrame) - else: - assert isinstance(df, cudf.DataFrame) - - pdf = dataset.pandas["filter_probs.csv"] - cdf = dataset.cudf["filter_probs.csv"] - - ``` - - A test that requests this fixture will parameterize on the type of DataFrame returned by the DatasetManager. - If a test requests both this fixture and the `use_cpp` fixture, or indirectly via the `config` fixture, then - the test will parameterize over both df_type:[cudf, pandas] and use_cpp[True, False]. However it will remove the - df_type=pandas & use_cpp=True combinations as this will cause an unsupported usage of Pandas dataframes with the - C++ implementation of message classes. - - This behavior can also be overridden by using the `use_cudf`, `use_pandas`, `use_cpp` or `use_pandas` marks ex: - ``` - # This test will only run once with C++ enabled and cudf dataframes - @pytest.mark.use_cpp - def test something(dataset: DatasetManager): - ... 
- # This test will run once for each dataframe type, with C++ disabled both times - @pytest.mark.use_python - def test something(dataset: DatasetManager): - ... - # This test will run twice with C++ mode enabled/disabled, using cudf dataframes both times - @pytest.mark.use_cudf - def test something(use_cpp: bool, dataset: DatasetManager): - ... - # This test will run only once - @pytest.mark.use_cudf - @pytest.mark.use_python - def test something(dataset: DatasetManager): - ... - # This test creates an incompatible combination and will raise a RuntimeError without being executed - @pytest.mark.use_pandas - @pytest.mark.use_cpp - def test something(dataset: DatasetManager): - ``` - - Users who don't want to parametarize over the DataFrame should use the `dataset_pandas` or `dataset_cudf` fixtures. - """ - from utils import dataset_manager - yield dataset_manager.DatasetManager(df_type=df_type) - - -@pytest.fixture(scope="function") -def dataset_pandas(): - """ - Yields a DatasetLoader instance with pandas as the default DataFrame type. - - Note: This fixture won't prevent a user from writing a test requiring C++ mode execution and requesting Pandas - dataframes. This is quite useful for tests like `tests/test_add_scores_stage_pipe.py` where we want to test with - both Python & C++ executions, but we use Pandas to build up the expected DataFrame to validate the test against. - - In addition to this, users can use this fixture to explicitly request a cudf Dataframe as well, allowing for a test - that looks like: - ``` - @pytest.mark.use_cpp - def test_something(dataset_pandas: DatasetManager): - input_df = dataset_pandas.cudf["filter_probs.csv"] # Feed our source stage a cudf DF - - # Perform pandas transformations to mimic the add scores stage - expected_df = dataset["filter_probs.csv"] - expected_df = expected_df.rename(columns=dict(zip(expected_df.columns, class_labels))) - ``` - """ - from utils import dataset_manager - yield dataset_manager.DatasetManager(df_type='pandas') +def reset_plugin_manger(): + from morpheus.cli.plugin_manager import PluginManager + PluginManager._singleton = None + yield @pytest.fixture(scope="function") -def dataset_cudf(): - """ - Yields a DatasetLoader instance with cudf as the default DataFrame type. - - Users who wish to have both cudf and pandas DataFrames can do so with this fixture and using the `pandas` property: - def test_something(dataset_cudf: DatasetManager): - cdf = dataset_cudf["filter_probs.csv"] - pdf = dataset_cudf.pandas["filter_probs.csv"] - """ - from utils import dataset_manager - yield dataset_manager.DatasetManager(df_type='cudf') +def reset_global_stage_registry(): + from morpheus.cli.stage_registry import GlobalStageRegistry + from morpheus.cli.stage_registry import StageRegistry + GlobalStageRegistry._global_registry = StageRegistry() + yield @pytest.fixture(scope="function") -def filter_probs_df(dataset, use_cpp: bool): +def reset_plugins(reset_plugin_manger, reset_global_stage_registry): """ - Shortcut fixture for loading the filter_probs.csv dataset. - - Unless your test uses the `use_pandas` or `use_cudf` marks this fixture will parametarize over the two dataframe - types. Similarly unless your test uses the `use_cpp` or `use_python` marks this fixture will also parametarize over - that as well, while excluding the combination of C++ execution and Pandas dataframes. + Reset both the plugin manager and the global stage gregistry. 
+ Some of the tests for examples import modules dynamically, which in some cases can cause register_stage to be + called more than once for the same stage. """ - yield dataset["filter_probs.csv"] + yield def wait_for_camouflage(host="localhost", port=8000, timeout=5): @@ -745,6 +671,16 @@ def _wrap_set_log_level(log_level: int): set_log_level(old_level) +@pytest.fixture(scope="session") +def fail_missing(pytestconfig: pytest.Config) -> bool: + """ + Returns the value of the `fail_missing` flag, when false tests requiring unmet dependencies will be skipped, when + True they will fail. + """ + yield pytestconfig.getoption("fail_missing") + + +# ==== Logging Fixtures ==== @pytest.fixture(scope="function") def reset_loglevel(): """ @@ -802,3 +738,108 @@ def loglevel_fatal(): # ==== DataFrame Fixtures ==== +@pytest.fixture(scope="function") +def dataset(df_type: typing.Literal['cudf', 'pandas']): + """ + Yields a DatasetLoader instance with `df_type` as the default DataFrame type. + Users of this fixture can still explicitly request either a cudf or pandas dataframe with the `cudf` and `pandas` + properties: + ``` + def test_something(dataset: DatasetManager): + df = dataset["filter_probs.csv"] # type will match the df_type parameter + if dataset.default_df_type == 'pandas': + assert isinstance(df, pd.DataFrame) + else: + assert isinstance(df, cudf.DataFrame) + + pdf = dataset.pandas["filter_probs.csv"] + cdf = dataset.cudf["filter_probs.csv"] + + ``` + + A test that requests this fixture will parameterize on the type of DataFrame returned by the DatasetManager. + If a test requests both this fixture and the `use_cpp` fixture, or indirectly via the `config` fixture, then + the test will parameterize over both df_type:[cudf, pandas] and use_cpp[True, False]. However it will remove the + df_type=pandas & use_cpp=True combinations as this will cause an unsupported usage of Pandas dataframes with the + C++ implementation of message classes. + + This behavior can also be overridden by using the `use_cudf`, `use_pandas`, `use_cpp` or `use_pandas` marks ex: + ``` + # This test will only run once with C++ enabled and cudf dataframes + @pytest.mark.use_cpp + def test something(dataset: DatasetManager): + ... + # This test will run once for each dataframe type, with C++ disabled both times + @pytest.mark.use_python + def test something(dataset: DatasetManager): + ... + # This test will run twice with C++ mode enabled/disabled, using cudf dataframes both times + @pytest.mark.use_cudf + def test something(use_cpp: bool, dataset: DatasetManager): + ... + # This test will run only once + @pytest.mark.use_cudf + @pytest.mark.use_python + def test something(dataset: DatasetManager): + ... + # This test creates an incompatible combination and will raise a RuntimeError without being executed + @pytest.mark.use_pandas + @pytest.mark.use_cpp + def test something(dataset: DatasetManager): + ``` + + Users who don't want to parametarize over the DataFrame should use the `dataset_pandas` or `dataset_cudf` fixtures. + """ + from utils import dataset_manager + yield dataset_manager.DatasetManager(df_type=df_type) + + +@pytest.fixture(scope="function") +def dataset_pandas(): + """ + Yields a DatasetLoader instance with pandas as the default DataFrame type. + + Note: This fixture won't prevent a user from writing a test requiring C++ mode execution and requesting Pandas + dataframes. 
This is quite useful for tests like `tests/test_add_scores_stage_pipe.py` where we want to test with + both Python & C++ executions, but we use Pandas to build up the expected DataFrame to validate the test against. + + In addition to this, users can use this fixture to explicitly request a cudf Dataframe as well, allowing for a test + that looks like: + ``` + @pytest.mark.use_cpp + def test_something(dataset_pandas: DatasetManager): + input_df = dataset_pandas.cudf["filter_probs.csv"] # Feed our source stage a cudf DF + + # Perform pandas transformations to mimic the add scores stage + expected_df = dataset["filter_probs.csv"] + expected_df = expected_df.rename(columns=dict(zip(expected_df.columns, class_labels))) + ``` + """ + from utils import dataset_manager + yield dataset_manager.DatasetManager(df_type='pandas') + + +@pytest.fixture(scope="function") +def dataset_cudf(): + """ + Yields a DatasetLoader instance with cudf as the default DataFrame type. + + Users who wish to have both cudf and pandas DataFrames can do so with this fixture and using the `pandas` property: + def test_something(dataset_cudf: DatasetManager): + cdf = dataset_cudf["filter_probs.csv"] + pdf = dataset_cudf.pandas["filter_probs.csv"] + """ + from utils import dataset_manager + yield dataset_manager.DatasetManager(df_type='cudf') + + +@pytest.fixture(scope="function") +def filter_probs_df(dataset, use_cpp: bool): + """ + Shortcut fixture for loading the filter_probs.csv dataset. + + Unless your test uses the `use_pandas` or `use_cudf` marks this fixture will parametarize over the two dataframe + types. Similarly unless your test uses the `use_cpp` or `use_python` marks this fixture will also parametarize over + that as well, while excluding the combination of C++ execution and Pandas dataframes. + """ + yield dataset["filter_probs.csv"] diff --git a/tests/examples/gnn_fraud_detection_pipeline/conftest.py b/tests/examples/gnn_fraud_detection_pipeline/conftest.py new file mode 100644 index 0000000000..ed8e690878 --- /dev/null +++ b/tests/examples/gnn_fraud_detection_pipeline/conftest.py @@ -0,0 +1,140 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys + +import pytest + +from utils import TEST_DIRS +from utils import import_or_skip + +SKIP_REASON = ("Tests for the gnn_fraud_detection_pipeline example require a number of packages not installed in the " + "Morpheus development environment. 
See `examples/gnn_fraud_detection_pipeline/README.md` for details on " + "installing these additional dependencies") + + +@pytest.fixture(autouse=True, scope='session') +def stellargraph(fail_missing: bool): + """ + All of the tests in this subdir require stellargraph + """ + yield import_or_skip("stellargraph", reason=SKIP_REASON, fail_missing=fail_missing) + + +@pytest.fixture(autouse=True, scope='session') +def cuml(fail_missing: bool): + """ + All of the tests in this subdir require cuml + """ + yield import_or_skip("cuml", reason=SKIP_REASON, fail_missing=fail_missing) + + +@pytest.fixture(autouse=True, scope='session') +def tensorflow(fail_missing: bool): + """ + All of the tests in this subdir require tensorflow + """ + yield import_or_skip("tensorflow", reason=SKIP_REASON, fail_missing=fail_missing) + + +@pytest.fixture +def config(config): + """ + The GNN fraud detection pipeline utilizes the "other" pipeline mode. + """ + from morpheus.config import PipelineModes + config.mode = PipelineModes.OTHER + yield config + + +@pytest.fixture +def example_dir(): + yield os.path.join(TEST_DIRS.examples_dir, 'gnn_fraud_detection_pipeline') + + +@pytest.fixture +def training_file(example_dir: str): + yield os.path.join(example_dir, 'training.csv') + + +@pytest.fixture +def hinsage_model(example_dir: str): + yield os.path.join(example_dir, 'model/hinsage-model.pt') + + +@pytest.fixture +def xgb_model(example_dir: str): + yield os.path.join(example_dir, 'model/xgb-model.pt') + + +# Some of the code inside gnn_fraud_detection_pipeline performs some relative imports in the form of: +# from .mod import Class +# For this reason we need to ensure that the examples dir is in the sys.path first +@pytest.fixture +def gnn_fraud_detection_pipeline(request: pytest.FixtureRequest, restore_sys_path, reset_plugins): + sys.path.append(TEST_DIRS.examples_dir) + import gnn_fraud_detection_pipeline + yield gnn_fraud_detection_pipeline + + +@pytest.fixture +def test_data(): + """ + Construct test data, a small DF of 10 rows which we will build a graph from + The nodes in our graph will be the unique values from each of our three columns, and the index is also + representing our transaction ids. 
+ There is only one duplicated value (2697) in our dataset so we should expect 29 nodes + Our expected edges will be each value in client_node and merchant_node to their associated index value ex: + (795, 2) & (8567, 2) + thus we should expect 20 edges, although 2697 is duplicated in the client_node column we should expect two + unique edges for each entry (2697, 14) & (2697, 91) + """ + import pandas as pd + index = [2, 14, 16, 26, 41, 42, 70, 91, 93, 95] + client_data = [795, 2697, 5531, 415, 2580, 3551, 6547, 2697, 3503, 7173] + merchant_data = [8567, 4609, 2781, 7844, 629, 6915, 7071, 570, 2446, 8110] + + df_data = { + 'index': index, + 'client_node': client_data, + 'merchant_node': merchant_data, + 'fraud_label': [1 for _ in range(len(index))] + } + + # Fill in the other columns so that we match the shape the model is expecting + for i in range(1000, 1113): + # these two values are skipped, apparently place-holders for client_node & merchant_node + if i not in (1002, 1003): + df_data[str(i)] = [0 for _ in range(len(index))] + + df = pd.DataFrame(df_data, index=index) + + expected_nodes = set(index + client_data + merchant_data) + assert len(expected_nodes) == 29 # ensuring test data & assumptions are correct + + expected_edges = set() + for data in (client_data, merchant_data): + for (i, val) in enumerate(data): + expected_edges.add((val, index[i])) + + assert len(expected_edges) == 20 # ensuring test data & assumptions are correct + + yield dict(index=index, + client_data=client_data, + merchant_data=merchant_data, + df=df, + expected_nodes=expected_nodes, + expected_edges=expected_edges) diff --git a/tests/examples/gnn_fraud_detection_pipeline/test_classification_stage.py b/tests/examples/gnn_fraud_detection_pipeline/test_classification_stage.py new file mode 100644 index 0000000000..ae164ecace --- /dev/null +++ b/tests/examples/gnn_fraud_detection_pipeline/test_classification_stage.py @@ -0,0 +1,68 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, eithe r express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import types + +import pytest + +from morpheus.config import Config +from morpheus.messages import MessageMeta +from utils.dataset_manager import DatasetManager + + +@pytest.mark.use_python +class TestClassificationStage: + + def test_constructor(self, + config: Config, + xgb_model: str, + gnn_fraud_detection_pipeline: types.ModuleType, + cuml: types.ModuleType): + from gnn_fraud_detection_pipeline.stages.classification_stage import ClassificationStage + + stage = ClassificationStage(config, xgb_model) + assert isinstance(stage._xgb_model, cuml.ForestInference) + + def test_process_message(self, + config: Config, + xgb_model: str, + gnn_fraud_detection_pipeline: types.ModuleType, + dataset_cudf: DatasetManager): + from gnn_fraud_detection_pipeline.stages.classification_stage import ClassificationStage + from gnn_fraud_detection_pipeline.stages.graph_sage_stage import GraphSAGEMultiMessage + + df = dataset_cudf['examples/gnn_fraud_detection_pipeline/inductive_emb.csv'] + df.rename(lambda x: "ind_emb_{}".format(x), axis=1, inplace=True) + + expected_df = dataset_cudf.pandas['examples/gnn_fraud_detection_pipeline/predictions.csv'] + assert len(df) == len(expected_df) + + # The exact values of the node_identifiers aren't important to this stage, we just need to verify that they're + # inserted into a "node_id" column in the DF + node_identifiers = expected_df['node_id'].tolist() + + ind_emb_columns = list(df.columns) + + meta = MessageMeta(df) + msg = GraphSAGEMultiMessage(meta=meta, + node_identifiers=node_identifiers, + inductive_embedding_column_names=ind_emb_columns) + + stage = ClassificationStage(config, xgb_model) + results = stage._process_message(msg) + print(results.get_meta(['prediction', 'node_id'])) + + # The stage actually edits the message in place, and returns it, but we don't need to assert that + dataset_cudf.assert_compare_df(results.get_meta(['prediction', 'node_id']), expected_df) diff --git a/tests/examples/gnn_fraud_detection_pipeline/test_graph_construction_stage.py b/tests/examples/gnn_fraud_detection_pipeline/test_graph_construction_stage.py new file mode 100644 index 0000000000..d39c00d994 --- /dev/null +++ b/tests/examples/gnn_fraud_detection_pipeline/test_graph_construction_stage.py @@ -0,0 +1,119 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +import types +import typing +from io import StringIO + +import pandas as pd +import pytest + +import cudf + +from morpheus.config import Config +from morpheus.messages import MessageMeta +from morpheus.messages import MultiMessage +from utils import TEST_DIRS + + +@pytest.mark.use_python +@pytest.mark.import_mod( + [os.path.join(TEST_DIRS.examples_dir, 'gnn_fraud_detection_pipeline/stages/graph_construction_stage.py')]) +class TestGraphConstructionStage: + + def test_constructor(self, config: Config, training_file: str, import_mod: typing.List[types.ModuleType]): + graph_construction_stage = import_mod[0] + stage = graph_construction_stage.FraudGraphConstructionStage(config, training_file) + assert isinstance(stage._training_data, cudf.DataFrame) + + # The training datafile contains many more columns than this, but these are the four columns + # that are depended upon in the code + assert {'client_node', 'index', 'fraud_label', 'merchant_node'}.issubset(stage._column_names) + + def _check_graph( + self, + stellargraph: types.ModuleType, + sg: "stellargraph.StellarGraph", # noqa: F821 + expected_nodes, + expected_edges): + assert isinstance(sg, stellargraph.StellarGraph) + sg.check_graph_for_ml(features=True, expensive_check=True) # this will raise if it doesn't pass + assert not sg.is_directed() + + nodes = sg.nodes() + assert set(nodes) == expected_nodes + + edges = sg.edges() + assert set(edges) == expected_edges + + def test_graph_construction(self, + import_mod: typing.List[types.ModuleType], + stellargraph: types.ModuleType, + test_data: dict): + graph_construction_stage = import_mod[0] + df = test_data['df'] + + client_features = pd.DataFrame({0: 1}, index=list(set(test_data['client_data']))) + merchant_features = pd.DataFrame({0: 1}, index=test_data['merchant_data']) + + # Call _graph_construction + sg = graph_construction_stage.FraudGraphConstructionStage._graph_construction( + nodes={ + 'client': df.client_node, 'merchant': df.merchant_node, 'transaction': df.index + }, + edges=[ + zip(df.client_node, df.index), + zip(df.merchant_node, df.index), + ], + node_features={ + "transaction": df[['client_node', 'merchant_node']], + "client": client_features, + "merchant": merchant_features + }) + + self._check_graph(stellargraph, sg, test_data['expected_nodes'], test_data['expected_edges']) + + def test_build_graph_features(self, + import_mod: typing.List[types.ModuleType], + stellargraph: types.ModuleType, + test_data: dict): + graph_construction_stage = import_mod[0] + sg = graph_construction_stage.FraudGraphConstructionStage._build_graph_features(test_data['df']) + self._check_graph(stellargraph, sg, test_data['expected_nodes'], test_data['expected_edges']) + + def test_process_message(self, + config: Config, + import_mod: typing.List[types.ModuleType], + stellargraph: types.ModuleType, + test_data: dict): + graph_construction_stage = import_mod[0] + df = test_data['df'] + + # The stage wants a csv file from the first 5 rows + training_data = StringIO(df[0:5].to_csv(index=False)) + stage = graph_construction_stage.FraudGraphConstructionStage(config, training_data) + + # Since we used the first 5 rows as the training data, send the second 5 as inference data + meta = MessageMeta(cudf.DataFrame(df)) + mm = MultiMessage(meta=meta, mess_offset=5, mess_count=5) + fgmm = stage._process_message(mm) + + assert isinstance(fgmm, graph_construction_stage.FraudGraphMultiMessage) + assert fgmm.meta is meta + assert fgmm.mess_offset == 5 + assert fgmm.mess_count == 5 + + 
self._check_graph(stellargraph, fgmm.graph, test_data['expected_nodes'], test_data['expected_edges']) diff --git a/tests/examples/gnn_fraud_detection_pipeline/test_graph_sage_stage.py b/tests/examples/gnn_fraud_detection_pipeline/test_graph_sage_stage.py new file mode 100644 index 0000000000..b8a449de48 --- /dev/null +++ b/tests/examples/gnn_fraud_detection_pipeline/test_graph_sage_stage.py @@ -0,0 +1,103 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import types + +import pytest + +import cudf + +from morpheus.config import Config +from morpheus.messages import MessageMeta +from utils.dataset_manager import DatasetManager + + +@pytest.mark.use_python +class TestGraphSageStage: + + def test_constructor(self, + config: Config, + hinsage_model: str, + gnn_fraud_detection_pipeline: types.ModuleType, + tensorflow): + from gnn_fraud_detection_pipeline.stages.graph_sage_stage import GraphSAGEStage + stage = GraphSAGEStage(config, + model_hinsage_file=hinsage_model, + batch_size=10, + sample_size=[4, 64], + record_id="test_id", + target_node="test_node") + + assert isinstance(stage._keras_model, tensorflow.keras.models.Model) + assert stage._batch_size == 10 + assert stage._sample_size == [4, 64] + assert stage._record_id == "test_id" + assert stage._target_node == "test_node" + + def test_inductive_step_hinsage(self, + config: Config, + hinsage_model: str, + gnn_fraud_detection_pipeline: types.ModuleType, + test_data: dict, + dataset_pandas: DatasetManager): + from gnn_fraud_detection_pipeline.stages.graph_construction_stage import FraudGraphConstructionStage + from gnn_fraud_detection_pipeline.stages.graph_sage_stage import GraphSAGEStage + + # The column names in the saved test data will be strings, in the results they will be ints + expected_df = dataset_pandas['examples/gnn_fraud_detection_pipeline/inductive_emb.csv'] + expected_df.rename(lambda x: int(x), axis=1, inplace=True) + + df = test_data['df'] + + graph = FraudGraphConstructionStage._build_graph_features(df) + + stage = GraphSAGEStage(config, model_hinsage_file=hinsage_model) + results = stage._inductive_step_hinsage(graph, stage._keras_model, test_data['index']) + + assert isinstance(results, cudf.DataFrame) + assert results.index.to_arrow().to_pylist() == test_data['index'] + dataset_pandas.assert_compare_df(results, expected_df) + + def test_process_message(self, + config: Config, + hinsage_model: str, + gnn_fraud_detection_pipeline: types.ModuleType, + test_data: dict, + dataset_pandas: DatasetManager): + from gnn_fraud_detection_pipeline.stages.graph_construction_stage import FraudGraphConstructionStage + from gnn_fraud_detection_pipeline.stages.graph_construction_stage import FraudGraphMultiMessage + from gnn_fraud_detection_pipeline.stages.graph_sage_stage import GraphSAGEMultiMessage + from gnn_fraud_detection_pipeline.stages.graph_sage_stage import GraphSAGEStage + + expected_df = 
dataset_pandas['examples/gnn_fraud_detection_pipeline/inductive_emb.csv'] + expected_df.rename(lambda x: "ind_emb_{}".format(x), axis=1, inplace=True) + + df = test_data['df'] + meta = MessageMeta(cudf.DataFrame(df)) + graph = FraudGraphConstructionStage._build_graph_features(df) + msg = FraudGraphMultiMessage(meta=meta, graph=graph) + + stage = GraphSAGEStage(config, model_hinsage_file=hinsage_model) + results = stage._process_message(msg) + + assert isinstance(results, GraphSAGEMultiMessage) + assert results.meta is meta + assert results.mess_offset == 0 + assert results.mess_count == len(df) + assert results.node_identifiers == test_data['index'] + assert sorted(results.inductive_embedding_column_names) == sorted(expected_df.columns) + + ind_emb_df = results.get_meta(results.inductive_embedding_column_names) + dataset_pandas.assert_compare_df(ind_emb_df.to_pandas(), expected_df) diff --git a/tests/examples/log_parsing/test_postprocessing.py b/tests/examples/log_parsing/test_postprocessing.py index f40d936437..ec2c6e9f4a 100644 --- a/tests/examples/log_parsing/test_postprocessing.py +++ b/tests/examples/log_parsing/test_postprocessing.py @@ -73,4 +73,4 @@ def test_log_parsing_post_processing_stage(config: Config, out_meta = stage._postprocess(post_proc_message) assert isinstance(out_meta, MessageMeta) - DatasetManager.assert_df_equal(out_meta._df, expected_df) + DatasetManager.assert_compare_df(out_meta._df, expected_df) diff --git a/tests/examples/ransomware_detection/conftest.py b/tests/examples/ransomware_detection/conftest.py new file mode 100644 index 0000000000..5a52f30449 --- /dev/null +++ b/tests/examples/ransomware_detection/conftest.py @@ -0,0 +1,76 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys + +import pytest +import yaml + +from utils import TEST_DIRS +from utils import import_or_skip + +SKIP_REASON = ("Tests for the ransomware_detection example require a number of packages not installed in the Morpheus " + "development environment. See `examples/ransomware_detection/README.md` " + "for details on installing these additional dependencies") + + +@pytest.fixture(autouse=True, scope='session') +def dask_distributed(fail_missing: bool): + """ + All of the tests in this subdir require dask.distributed + """ + yield import_or_skip("dask.distributed", reason=SKIP_REASON, fail_missing=fail_missing) + + +@pytest.fixture +def config(config): + """ + The ransomware detection pipeline utilizes the FIL pipeline mode.
+ """ + from morpheus.config import PipelineModes + config.mode = PipelineModes.FIL + yield config + + +@pytest.fixture +def example_dir(): + yield os.path.join(TEST_DIRS.examples_dir, 'ransomware_detection') + + +@pytest.fixture +def conf_file(example_dir): + yield os.path.join(example_dir, 'config/ransomware_detection.yaml') + + +@pytest.fixture +def rwd_conf(conf_file): + with open(conf_file, encoding='UTF-8') as fh: + conf = yaml.safe_load(fh) + + yield conf + + +@pytest.fixture +def interested_plugins(): + yield ['ldrmodules', 'threadlist', 'envars', 'vadinfo', 'handles'] + + +# Some of the code inside ransomware_detection performs imports in the form of: +# from common.... +# For this reason we need to ensure that the examples/ransomware_detection dir is in the sys.path first +@pytest.fixture(autouse=True) +def ransomware_detection_in_sys_path(request: pytest.FixtureRequest, restore_sys_path, reset_plugins, example_dir): + sys.path.append(example_dir) diff --git a/tests/examples/ransomware_detection/test_create_features.py b/tests/examples/ransomware_detection/test_create_features.py new file mode 100644 index 0000000000..5b0fccc2b1 --- /dev/null +++ b/tests/examples/ransomware_detection/test_create_features.py @@ -0,0 +1,198 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import glob +import os +import types +import typing +from unittest import mock + +import pytest + +from morpheus.config import Config +from morpheus.messages import MultiMessage +from morpheus.messages.message_meta import AppShieldMessageMeta +from morpheus.pipeline.multi_message_stage import MultiMessageStage +from morpheus.stages.input.appshield_source_stage import AppShieldSourceStage +from utils import TEST_DIRS +from utils.dataset_manager import DatasetManager + + +@pytest.mark.use_python +class TestCreateFeaturesRWStage: + + def test_constructor(self, + config: Config, + dask_distributed: types.ModuleType, + rwd_conf: dict, + interested_plugins: typing.List[str]): + from common.data_models import FeatureConfig + from common.feature_extractor import FeatureExtractor + from stages.create_features import CreateFeaturesRWStage + + n_workers = 12 + threads_per_worker = 8 + stage = CreateFeaturesRWStage(config, + interested_plugins=interested_plugins, + feature_columns=rwd_conf['model_features'], + file_extns=rwd_conf['file_extensions'], + n_workers=n_workers, + threads_per_worker=threads_per_worker) + + assert isinstance(stage, MultiMessageStage) + assert isinstance(stage._client, dask_distributed.Client) + scheduler_info = stage._client.scheduler_info() + assert len(scheduler_info['workers']) == n_workers + for worker in scheduler_info['workers'].values(): + assert worker['nthreads'] == threads_per_worker + + assert isinstance(stage._feature_config, FeatureConfig) + assert stage._feature_config.file_extns == rwd_conf['file_extensions'] + assert stage._feature_config.interested_plugins == interested_plugins + + assert stage._feas_all_zeros == {c: 0 for c in rwd_conf['model_features']} + + assert isinstance(stage._fe, FeatureExtractor) + assert stage._fe._config is stage._feature_config + + @mock.patch('stages.create_features.Client') + def test_on_next(self, + mock_dask_client, + config: Config, + rwd_conf: dict, + interested_plugins: typing.List[str], + dataset_pandas: DatasetManager): + from stages.create_features import CreateFeaturesRWStage + + test_data_dir = os.path.join(TEST_DIRS.tests_data_dir, 'examples/ransomware_detection') + + mock_dask_client.return_value = mock_dask_client + mock_dask_client.map.return_value = mock.MagicMock() + + dask_results = dataset_pandas[os.path.join(test_data_dir, 'dask_results.csv')] + + mock_dask_future = mock.MagicMock() + mock_dask_future.result.return_value = dask_results + mock_dask_client.submit.return_value = mock_dask_future + + input_glob = os.path.join(TEST_DIRS.tests_data_dir, 'appshield', 'snapshot-1', '*.json') + input_data = AppShieldSourceStage.files_to_dfs(glob.glob(input_glob), + cols_include=rwd_conf['raw_columns'], + cols_exclude=["SHA256"], + plugins_include=interested_plugins, + encoding='latin1') + + input_metas = AppShieldSourceStage._build_metadata(input_data) + + # Make sure the input test data looks the way we expect it + assert len(input_metas) == 1 + input_meta = input_metas[0] + assert input_meta.source == 'appshield' + + stage = CreateFeaturesRWStage(config, + interested_plugins=interested_plugins, + feature_columns=rwd_conf['model_features'], + file_extns=rwd_conf['file_extensions'], + n_workers=5, + threads_per_worker=6) + + # make sure we have a mocked dask client + assert stage._client is mock_dask_client + + meta = stage.on_next(input_meta) + assert isinstance(meta, AppShieldMessageMeta) + assert meta.source == input_meta.source + + expected_df = dataset_pandas[os.path.join(test_data_dir, 'dask_results.csv')] +
expected_df['source_pid_process'] = 'appshield_' + expected_df.pid_process + expected_df.sort_values(by=["pid_process", "snapshot_id"], inplace=True) + expected_df.reset_index(drop=True, inplace=True) + dataset_pandas.assert_compare_df(meta.copy_dataframe(), expected_df) + + @mock.patch('stages.create_features.Client') + def test_create_multi_messages(self, + mock_dask_client, + config: Config, + rwd_conf: dict, + interested_plugins: typing.List[str], + dataset_pandas: DatasetManager): + from stages.create_features import CreateFeaturesRWStage + mock_dask_client.return_value = mock_dask_client + + pids = [75956, 118469, 1348612, 2698363, 2721362, 2788672] + df = dataset_pandas["filter_probs.csv"] + df['pid_process'] = [ + 2788672, + 75956, + 75956, + 2788672, + 2788672, + 2698363, + 2721362, + 118469, + 1348612, + 2698363, + 118469, + 2698363, + 1348612, + 118469, + 75956, + 2721362, + 75956, + 118469, + 118469, + 118469 + ] + df = df.sort_values(by=["pid_process"]).reset_index(drop=True) + + stage = CreateFeaturesRWStage(config, + interested_plugins=interested_plugins, + feature_columns=rwd_conf['model_features'], + file_extns=rwd_conf['file_extensions'], + n_workers=5, + threads_per_worker=6) + + meta = AppShieldMessageMeta(df, source='tests') + multi_messages = stage.create_multi_messages(meta) + assert len(multi_messages) == len(pids) + + prev_loc = 0 + for (i, mm) in enumerate(multi_messages): + assert isinstance(mm, MultiMessage) + pid = pids[i] + assert (mm.get_meta(['pid_process']) == pid).all().all() + assert mm.mess_offset == prev_loc + prev_loc = mm.mess_offset + mm.mess_count + + assert prev_loc == len(df) + + @mock.patch('stages.create_features.Client') + def test_on_completed(self, mock_dask_client, config: Config, rwd_conf: dict, interested_plugins: typing.List[str]): + from stages.create_features import CreateFeaturesRWStage + mock_dask_client.return_value = mock_dask_client + + stage = CreateFeaturesRWStage(config, + interested_plugins=interested_plugins, + feature_columns=rwd_conf['model_features'], + file_extns=rwd_conf['file_extensions'], + n_workers=5, + threads_per_worker=6) + + assert stage._client is mock_dask_client + mock_dask_client.close.assert_not_called() + + stage.on_completed() + + mock_dask_client.close.assert_called_once() diff --git a/tests/examples/ransomware_detection/test_preprocessing.py b/tests/examples/ransomware_detection/test_preprocessing.py new file mode 100644 index 0000000000..af86df90b1 --- /dev/null +++ b/tests/examples/ransomware_detection/test_preprocessing.py @@ -0,0 +1,164 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +import cupy as cp +import pandas as pd +import pytest + +from morpheus.config import Config +from morpheus.messages import MultiMessage +from morpheus.messages.message_meta import AppShieldMessageMeta +from morpheus.messages.multi_inference_message import MultiInferenceFILMessage +from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage +from utils.dataset_manager import DatasetManager + + +@pytest.mark.use_python +class TestPreprocessingRWStage: + + def test_constructor(self, config: Config, rwd_conf: dict): + from stages.preprocessing import PreprocessingRWStage + + stage = PreprocessingRWStage(config, feature_columns=rwd_conf['model_features'], sliding_window=6) + assert isinstance(stage, PreprocessBaseStage) + assert stage._feature_columns == rwd_conf['model_features'] + assert stage._features_len == len(rwd_conf['model_features']) + assert stage._snapshot_dict == {} + assert len(stage._padding_data) == len(rwd_conf['model_features']) * 6 + for i in stage._padding_data: + assert i == 0 + + def test_sliding_window_offsets(self, config: Config, rwd_conf: dict): + from stages.preprocessing import PreprocessingRWStage + + stage = PreprocessingRWStage(config, feature_columns=rwd_conf['model_features'], sliding_window=6) + + window = 3 + ids = [17, 18, 19, 20, 21, 22, 23, 31, 32, 33] + results = stage._sliding_window_offsets(ids, len(ids), window=window) + assert results == [(0, 3), (1, 4), (2, 5), (3, 6), (4, 7), (7, 10)] + + # Non-consecutive ids don't create sliding windows + assert stage._sliding_window_offsets(list(reversed(ids)), len(ids), window=window) == [] + + def test_sliding_window_offsets_errors(self, config: Config, rwd_conf: dict): + from stages.preprocessing import PreprocessingRWStage + + stage = PreprocessingRWStage(config, feature_columns=rwd_conf['model_features'], sliding_window=6) + + # ids_len doesn't match the length of the ids list + with pytest.raises(AssertionError): + stage._sliding_window_offsets(ids=[5, 6, 7], ids_len=12, window=2) + + # Window is larger than ids + with pytest.raises(AssertionError): + stage._sliding_window_offsets(ids=[5, 6, 7], ids_len=3, window=4) + + def test_rollover_pending_snapshots(self, config: Config, rwd_conf: dict, dataset_pandas: DatasetManager): + from stages.preprocessing import PreprocessingRWStage + + snapshot_ids = [5, 8, 10, 13] + source_pid_process = "123_test.exe" + df = dataset_pandas['examples/ransomware_detection/dask_results.csv'] + assert len(df) == len(snapshot_ids) + + # The snapshot_id's in the test data set are all '1', set them to different values + df['snapshot_id'] = snapshot_ids + df.index = df.snapshot_id + + stage = PreprocessingRWStage(config, feature_columns=rwd_conf['model_features'], sliding_window=4) + stage._rollover_pending_snapshots(snapshot_ids, source_pid_process, df) + + assert list(stage._snapshot_dict.keys()) == [source_pid_process] + + # Due to the sliding window we should have all but the first snapshot_id in the results + expected_snapshot_ids = snapshot_ids[1:] + snapshots = stage._snapshot_dict[source_pid_process] + + assert len(snapshots) == len(expected_snapshot_ids) + for (i, snapshot) in enumerate(snapshots): + expected_snapshot_id = expected_snapshot_ids[i] + assert snapshot.snapshot_id == expected_snapshot_id + expected_data = df.loc[expected_snapshot_id].fillna('').values + assert (pd.Series(snapshot.data).fillna('').values == expected_data).all() + + def test_rollover_pending_snapshots_empty_results(self, + config: Config, + rwd_conf: dict, + dataset_pandas:
DatasetManager): + from stages.preprocessing import PreprocessingRWStage + + snapshot_ids = [] + source_pid_process = "123_test.exe" + df = dataset_pandas['examples/ransomware_detection/dask_results.csv'] + + stage = PreprocessingRWStage(config, feature_columns=rwd_conf['model_features'], sliding_window=4) + stage._rollover_pending_snapshots(snapshot_ids, source_pid_process, df) + assert len(stage._snapshot_dict) == 0 + + def test_merge_curr_and_prev_snapshots(self, config: Config, rwd_conf: dict, dataset_pandas: DatasetManager): + from common.data_models import SnapshotData + from stages.preprocessing import PreprocessingRWStage + + snapshot_ids = [5, 8, 10, 13] + source_pid_process = "123_test.exe" + df = dataset_pandas['examples/ransomware_detection/dask_results.csv'] + assert len(df) == len(snapshot_ids) + df['snapshot_id'] = snapshot_ids + df.index = df.snapshot_id + + stage = PreprocessingRWStage(config, feature_columns=rwd_conf['model_features'], sliding_window=4) + test_row_8 = df.loc[8].copy(deep=True) + test_row_8.pid_process = 'test_val1' + + test_row_13 = df.loc[13].copy(deep=True) + test_row_13.pid_process = 'test_val2' + + stage._snapshot_dict = { + source_pid_process: [SnapshotData(8, test_row_8.values), SnapshotData(13, test_row_13.values)] + } + + expected_df = dataset_pandas['examples/ransomware_detection/dask_results.csv'].fillna('') + expected_df['pid_process'][1] = 'test_val1' + expected_df['pid_process'][3] = 'test_val2' + + expected_df['snapshot_id'] = snapshot_ids + expected_df.index = expected_df.snapshot_id + + stage._merge_curr_and_prev_snapshots(df, source_pid_process) + dataset_pandas.assert_compare_df(df.fillna(''), expected_df) + + def test_pre_process_batch(self, config: Config, rwd_conf: dict, dataset_pandas: DatasetManager): + from stages.preprocessing import PreprocessingRWStage + df = dataset_pandas['examples/ransomware_detection/dask_results.csv'] + df['source_pid_process'] = 'appshield_' + df.pid_process + expected_df = df.copy(deep=True).fillna('') + meta = AppShieldMessageMeta(df=df, source='tests') + mm = MultiMessage(meta=meta) + + sliding_window = 4 + stage = PreprocessingRWStage(config, feature_columns=rwd_conf['model_features'], sliding_window=sliding_window) + results = stage._pre_process_batch(mm) + assert isinstance(results, MultiInferenceFILMessage) + + expected_df['sequence'] = ['dummy' for _ in range(len(expected_df))] + expected_input__0 = cp.asarray([0 for i in range(len(rwd_conf['model_features']) * sliding_window)]) + expected_seq_ids = cp.zeros((len(expected_df), 3), dtype=cp.uint32) + expected_seq_ids[:, 0] = cp.arange(0, len(expected_df), dtype=cp.uint32) + expected_seq_ids[:, 2] = len(rwd_conf['model_features']) * 3 + + dataset_pandas.assert_compare_df(results.get_meta().fillna(''), expected_df) + assert (results.get_tensor('input__0') == expected_input__0).all() + assert (results.get_tensor('seq_ids') == expected_seq_ids).all() diff --git a/tests/test_add_classifications_stage.py b/tests/test_add_classifications_stage.py index a8abaade05..5b90235d1b 100755 --- a/tests/test_add_classifications_stage.py +++ b/tests/test_add_classifications_stage.py @@ -65,9 +65,9 @@ def test_add_labels(): labeled = AddClassificationsStage._add_labels(message, idx2label=class_labels, threshold=threshold) - assert DatasetManager.assert_df_equal(labeled.get_meta("frogs"), probs_array_bool[:, 0]) - assert DatasetManager.assert_df_equal(labeled.get_meta("lizards"), probs_array_bool[:, 1]) - assert DatasetManager.assert_df_equal(labeled.get_meta("toads"), 
probs_array_bool[:, 2]) + DatasetManager.assert_df_equal(labeled.get_meta("frogs"), probs_array_bool[:, 0]) + DatasetManager.assert_df_equal(labeled.get_meta("lizards"), probs_array_bool[:, 1]) + DatasetManager.assert_df_equal(labeled.get_meta("toads"), probs_array_bool[:, 2]) # Same thing but change the probs tensor name message = MultiResponseMessage(meta=MessageMeta(df), @@ -76,9 +76,9 @@ def test_add_labels(): labeled = AddClassificationsStage._add_labels(message, idx2label=class_labels, threshold=threshold) - assert DatasetManager.assert_df_equal(labeled.get_meta("frogs"), probs_array_bool[:, 0]) - assert DatasetManager.assert_df_equal(labeled.get_meta("lizards"), probs_array_bool[:, 1]) - assert DatasetManager.assert_df_equal(labeled.get_meta("toads"), probs_array_bool[:, 2]) + DatasetManager.assert_df_equal(labeled.get_meta("frogs"), probs_array_bool[:, 0]) + DatasetManager.assert_df_equal(labeled.get_meta("lizards"), probs_array_bool[:, 1]) + DatasetManager.assert_df_equal(labeled.get_meta("toads"), probs_array_bool[:, 2]) # Fail in missing probs data message = MultiResponseMessage(meta=MessageMeta(df), diff --git a/tests/test_add_scores_stage.py b/tests/test_add_scores_stage.py index ab7a1fa144..e08642d7bc 100755 --- a/tests/test_add_scores_stage.py +++ b/tests/test_add_scores_stage.py @@ -63,9 +63,9 @@ def test_add_labels(): labeled = AddClassificationsStage._add_labels(message, idx2label=class_labels, threshold=None) - assert DatasetManager.assert_df_equal(labeled.get_meta("frogs"), probs_array[:, 0]) - assert DatasetManager.assert_df_equal(labeled.get_meta("lizards"), probs_array[:, 1]) - assert DatasetManager.assert_df_equal(labeled.get_meta("toads"), probs_array[:, 2]) + DatasetManager.assert_df_equal(labeled.get_meta("frogs"), probs_array[:, 0]) + DatasetManager.assert_df_equal(labeled.get_meta("lizards"), probs_array[:, 1]) + DatasetManager.assert_df_equal(labeled.get_meta("toads"), probs_array[:, 2]) # Same thing but change the probs tensor name message = MultiResponseMessage(meta=MessageMeta(df), @@ -74,9 +74,9 @@ def test_add_labels(): labeled = AddClassificationsStage._add_labels(message, idx2label=class_labels, threshold=None) - assert DatasetManager.assert_df_equal(labeled.get_meta("frogs"), probs_array[:, 0]) - assert DatasetManager.assert_df_equal(labeled.get_meta("lizards"), probs_array[:, 1]) - assert DatasetManager.assert_df_equal(labeled.get_meta("toads"), probs_array[:, 2]) + DatasetManager.assert_df_equal(labeled.get_meta("frogs"), probs_array[:, 0]) + DatasetManager.assert_df_equal(labeled.get_meta("lizards"), probs_array[:, 1]) + DatasetManager.assert_df_equal(labeled.get_meta("toads"), probs_array[:, 2]) # Fail in missing probs data message = MultiResponseMessage(meta=MessageMeta(df), diff --git a/tests/test_deserialize_stage_pipe.py b/tests/test_deserialize_stage_pipe.py index dbda223bd4..68e55e30cb 100755 --- a/tests/test_deserialize_stage_pipe.py +++ b/tests/test_deserialize_stage_pipe.py @@ -44,7 +44,7 @@ def test_fixing_non_unique_indexes(use_cpp: bool, dataset: DatasetManager): assert not meta.has_sliceable_index() assert "_index_" not in meta.df.columns - assert dataset.assert_df_equal(meta.df, df) + dataset.assert_df_equal(meta.df, df) DeserializeStage.process_dataframe(meta, 5, ensure_sliceable_index=True) diff --git a/tests/test_error_pipe.py b/tests/test_error_pipe.py new file mode 100755 index 0000000000..09fae093d3 --- /dev/null +++ b/tests/test_error_pipe.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: Copyright (c) 2023, 
NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import typing + +import mrc +import pandas as pd +import pytest + +from morpheus.config import Config +from morpheus.pipeline import LinearPipeline +from morpheus.pipeline.single_output_source import SingleOutputSource +from morpheus.pipeline.stream_pair import StreamPair +from morpheus.stages.general.monitor_stage import MonitorStage +from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage +from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage +from utils.stages.error_raiser import ErrorRaiserStage + + +class InMemSourceXStage(SingleOutputSource): + """ + InMemorySourceStage subclass that emits whatever you give it and doesn't assume the source data + is a dataframe. + """ + + def __init__(self, c: Config, data: typing.List[typing.Any]): + super().__init__(c) + self._data = data + + @property + def name(self) -> str: + return "from-data" + + def supports_cpp_node(self) -> bool: + return False + + def _emit_data(self) -> typing.Iterator[typing.Any]: + for x in self._data: + yield x + + def _build_source(self, builder: mrc.Builder) -> StreamPair: + node = builder.make_source(self.unique_name, self._emit_data()) + return node, type(self._data[0]) + + +@pytest.mark.parametrize("exception_cls", [RuntimeError, ValueError, NotImplementedError]) +def test_stage_raises_exception(config: Config, filter_probs_df: pd.DataFrame, exception_cls: type[Exception]): + pipe = LinearPipeline(config) + pipe.set_source(InMemorySourceStage(config, [filter_probs_df])) + error_raiser_stage = pipe.add_stage(ErrorRaiserStage(config, exception_cls=exception_cls)) + sink_stage = pipe.add_stage(InMemorySinkStage(config)) + + with pytest.raises(exception_cls): + pipe.run() + + # Ensure that the raised exception was from our stage and not from something else + assert error_raiser_stage.error_raised + assert len(sink_stage.get_messages()) == 0 + + +@pytest.mark.use_python +@pytest.mark.parametrize("delayed_start", [False, True]) +def test_monitor_not_impl(config: Config, delayed_start: bool): + + class UnsupportedType: + pass + + pipe = LinearPipeline(config) + pipe.set_source(InMemSourceXStage(config, [UnsupportedType()])) + monitor_stage = pipe.add_stage(MonitorStage(config, log_level=logging.WARNING, delayed_start=delayed_start)) + sink_stage = pipe.add_stage(InMemorySinkStage(config)) + + assert monitor_stage._mc.is_enabled() + + with pytest.raises(NotImplementedError): + pipe.run() + + assert len(sink_stage.get_messages()) == 0 diff --git a/tests/test_file_in_out.py b/tests/test_file_in_out.py index eab5664bc9..a52549de66 100755 --- a/tests/test_file_in_out.py +++ b/tests/test_file_in_out.py @@ -197,7 +197,7 @@ def test_read_cpp_compare(input_file: str): CppConfig.set_should_use_cpp(True) df_cpp = read_file_to_df(input_file, df_type='cudf') - assert DatasetManager.assert_df_equal(df_python, df_cpp) + 
DatasetManager.assert_df_equal(df_python, df_cpp) @pytest.mark.slow diff --git a/tests/test_message_meta.py b/tests/test_message_meta.py index 57ac593dc9..105c732d69 100644 --- a/tests/test_message_meta.py +++ b/tests/test_message_meta.py @@ -130,10 +130,10 @@ def test_copy_dataframe(df: cudf.DataFrame): copied_df = meta.copy_dataframe() - assert DatasetManager.assert_df_equal(copied_df, df), "Should be identical" + DatasetManager.assert_df_equal(copied_df, df), "Should be identical" assert copied_df is not df, "But should be different instances" # Try setting a single value on the copy cdf = meta.copy_dataframe() cdf['v2'].iloc[3] = 47 - assert DatasetManager.assert_df_equal(meta.copy_dataframe(), df), "Should be identical" + DatasetManager.assert_df_equal(meta.copy_dataframe(), df), "Should be identical" diff --git a/tests/test_monitor_stage.py b/tests/test_monitor_stage.py index 8e9a22fe14..896fb1b18a 100755 --- a/tests/test_monitor_stage.py +++ b/tests/test_monitor_stage.py @@ -21,6 +21,7 @@ import typing from unittest import mock +import fsspec import mrc import pytest @@ -99,27 +100,38 @@ def test_refresh(mock_morph_tqdm, config): mock_morph_tqdm.refresh.assert_called_once() -def test_auto_count_fn(config): +@pytest.mark.parametrize('value,expected_fn,expected', + [ + (None, False, None), + ([], False, None), + (['s'], True, 1), + ('s', True, 1), + ('test', True, 1), + (cudf.DataFrame(), True, 0), + (cudf.DataFrame(range(12), columns=["test"]), True, 12), + (MultiMessage(meta=MessageMeta(df=cudf.DataFrame(range(12), columns=["test"]))), True, 12), + ({}, True, 0), + (tuple(), True, 0), + (set(), True, 0), + (fsspec.open_files(os.path.join(TEST_DIRS.tests_data_dir, 'filter_probs.csv')), True, 1), + ]) +def test_auto_count_fn(config, value: typing.Any, expected_fn: bool, expected: typing.Union[int, None]): m = MonitorStage(config, log_level=logging.WARNING) - assert m._mc.auto_count_fn(None) is None - assert m._mc.auto_count_fn([]) is None + auto_fn = m._mc.auto_count_fn(value) + if expected_fn: + assert callable(auto_fn) + assert auto_fn(value) == expected + else: + assert auto_fn is None - # Ints not supported, lists are, but lists of unsupported are also unsupported - pytest.raises(NotImplementedError, m._mc.auto_count_fn, 1) - pytest.raises(NotImplementedError, m._mc.auto_count_fn, [1]) - # Just verify that we get a valid function for each supported type - assert inspect.isfunction(m._mc.auto_count_fn(['s'])) - assert inspect.isfunction(m._mc.auto_count_fn('s')) - assert inspect.isfunction(m._mc.auto_count_fn(cudf.DataFrame())) - assert inspect.isfunction( - m._mc.auto_count_fn(MultiMessage(meta=MessageMeta(df=cudf.DataFrame(range(12), columns=["test"]))))) +@pytest.mark.parametrize('value', [1, [1], [2, 0]]) +def test_auto_count_fn_not_impl(config, value: typing.Any): + m = MonitorStage(config, log_level=logging.WARNING) - # Other iterables return the len function - assert m._mc.auto_count_fn({}) is len - assert m._mc.auto_count_fn(()) is len - assert m._mc.auto_count_fn(set()) is len + with pytest.raises(NotImplementedError): + m._mc.auto_count_fn(value) @mock.patch('morpheus.utils.monitor_utils.MorpheusTqdm') diff --git a/tests/test_multi_message.py b/tests/test_multi_message.py index 524f3b60d3..5431f85f31 100644 --- a/tests/test_multi_message.py +++ b/tests/test_multi_message.py @@ -120,18 +120,18 @@ def _test_get_meta(df: typing.Union[cudf.DataFrame, pd.DataFrame]): # Manually slice the dataframe according to the multi settings df_sliced: cudf.DataFrame = 
df.iloc[multi.mess_offset:multi.mess_offset + multi.mess_count, :] - assert DatasetManager.assert_df_equal(multi.get_meta(), df_sliced) + DatasetManager.assert_df_equal(multi.get_meta(), df_sliced) # Make sure we return a table here, not a series col_name = df_sliced.columns[0] - assert DatasetManager.assert_df_equal(multi.get_meta(col_name), df_sliced[col_name]) + DatasetManager.assert_df_equal(multi.get_meta(col_name), df_sliced[col_name]) col_name = [df_sliced.columns[0], df_sliced.columns[2]] - assert DatasetManager.assert_df_equal(multi.get_meta(col_name), df_sliced[col_name]) + DatasetManager.assert_df_equal(multi.get_meta(col_name), df_sliced[col_name]) # Out of order columns col_name = [df_sliced.columns[3], df_sliced.columns[0]] - assert DatasetManager.assert_df_equal(multi.get_meta(col_name), df_sliced[col_name]) + DatasetManager.assert_df_equal(multi.get_meta(col_name), df_sliced[col_name]) # Should fail with missing column with pytest.raises(KeyError): @@ -139,7 +139,7 @@ def _test_get_meta(df: typing.Union[cudf.DataFrame, pd.DataFrame]): # Finally, check that we dont overwrite the original dataframe multi.get_meta(col_name).iloc[:] = 5 - assert DatasetManager.assert_df_equal(multi.get_meta(col_name), df_sliced[col_name]) + DatasetManager.assert_df_equal(multi.get_meta(col_name), df_sliced[col_name]) def test_get_meta(filter_probs_df: typing.Union[cudf.DataFrame, pd.DataFrame]): @@ -167,10 +167,10 @@ def test_set_meta(use_cpp: bool, dataset: DatasetManager): def test_value(columns, value): multi.set_meta(columns, value) - assert dataset.assert_df_equal(multi.get_meta(columns), value) + dataset.assert_df_equal(multi.get_meta(columns), value) # Now make sure the original dataframe is untouched - assert dataset.assert_df_equal(df_saved[saved_mask], meta.df[saved_mask]) + dataset.assert_df_equal(df_saved[saved_mask], meta.df[saved_mask]) single_column = "v2" two_columns = ["v1", "v3"] @@ -207,17 +207,17 @@ def _test_set_meta_new_column(df: typing.Union[cudf.DataFrame, pd.DataFrame], df # Set a list val_to_set = list(range(multi.mess_count)) multi.set_meta("list_column", val_to_set) - assert DatasetManager.assert_df_equal(multi.get_meta("list_column"), val_to_set) + DatasetManager.assert_df_equal(multi.get_meta("list_column"), val_to_set) # Set a string val_to_set = "string to set" multi.set_meta("string_column", val_to_set) - assert DatasetManager.assert_df_equal(multi.get_meta("string_column"), val_to_set) + DatasetManager.assert_df_equal(multi.get_meta("string_column"), val_to_set) # Set a date val_to_set = pd.date_range("2018-01-01", periods=multi.mess_count, freq="H") multi.set_meta("date_column", val_to_set) - assert DatasetManager.assert_df_equal(multi.get_meta("date_column"), val_to_set) + DatasetManager.assert_df_equal(multi.get_meta("date_column"), val_to_set) if (df_type == "cudf"): # cudf isnt capable of setting more than one new column at a time @@ -226,7 +226,7 @@ def _test_set_meta_new_column(df: typing.Union[cudf.DataFrame, pd.DataFrame], df # Now set one with new and old columns val_to_set = np.random.randn(multi.mess_count, 2) multi.set_meta(["v2", "new_column2"], val_to_set) - assert DatasetManager.assert_df_equal(multi.get_meta(["v2", "new_column2"]), val_to_set) + DatasetManager.assert_df_equal(multi.get_meta(["v2", "new_column2"]), val_to_set) def test_set_meta_new_column(use_cpp: bool, dataset: DatasetManager): @@ -272,7 +272,7 @@ def _test_copy_ranges(df: typing.Union[cudf.DataFrame, pd.DataFrame]): assert mm2.meta.df is not df assert mm2.mess_offset == 0 
assert mm2.mess_count == 6 - 2 - assert DatasetManager.assert_df_equal(mm2.get_meta(), df.iloc[2:6]) + DatasetManager.assert_df_equal(mm2.get_meta(), df.iloc[2:6]) # slice two different ranges of rows mm3 = mm.copy_ranges([(2, 6), (12, 15)]) @@ -293,7 +293,7 @@ def _test_copy_ranges(df: typing.Union[cudf.DataFrame, pd.DataFrame]): expected_df = concat_fn([df.iloc[2:6], df.iloc[12:15]]) - assert DatasetManager.assert_df_equal(mm3.get_meta(), expected_df) + DatasetManager.assert_df_equal(mm3.get_meta(), expected_df) def test_copy_ranges(filter_probs_df: typing.Union[cudf.DataFrame, pd.DataFrame]): @@ -367,54 +367,54 @@ def _test_get_slice_values(df: typing.Union[cudf.DataFrame, pd.DataFrame]): multi_full = MultiMessage(meta=meta) # Single slice - assert DatasetManager.assert_df_equal(multi_full.get_slice(3, 8).get_meta(), df.iloc[3:8]) + DatasetManager.assert_df_equal(multi_full.get_slice(3, 8).get_meta(), df.iloc[3:8]) # Single slice with one columns - assert DatasetManager.assert_df_equal(multi_full.get_slice(3, 8).get_meta("v1"), df.iloc[3:8]["v1"]) + DatasetManager.assert_df_equal(multi_full.get_slice(3, 8).get_meta("v1"), df.iloc[3:8]["v1"]) # Single slice with multiple columns - assert DatasetManager.assert_df_equal( + DatasetManager.assert_df_equal( multi_full.get_slice(3, 8).get_meta(["v4", "v3", "v1"]), df.iloc[3:8][["v4", "v3", "v1"]]) # Chained slice - assert DatasetManager.assert_df_equal( + DatasetManager.assert_df_equal( multi_full.get_slice(2, 18).get_slice(5, 9).get_meta(), df.iloc[2 + 5:(2 + 5) + (9 - 5)]) # Chained slice one column - assert DatasetManager.assert_df_equal( + DatasetManager.assert_df_equal( multi_full.get_slice(2, 18).get_slice(5, 9).get_meta("v1"), df.iloc[2 + 5:(2 + 5) + (9 - 5)]["v1"]) # Chained slice multi column - assert DatasetManager.assert_df_equal( + DatasetManager.assert_df_equal( multi_full.get_slice(2, 18).get_slice(5, 9).get_meta(["v4", "v3", "v1"]), df.iloc[2 + 5:(2 + 5) + (9 - 5)][["v4", "v3", "v1"]]) # Set values multi_full.get_slice(4, 10).set_meta(None, 1.15) - assert DatasetManager.assert_df_equal(multi_full.get_slice(4, 10).get_meta(), df.iloc[4:10]) + DatasetManager.assert_df_equal(multi_full.get_slice(4, 10).get_meta(), df.iloc[4:10]) # Set values one column multi_full.get_slice(1, 6).set_meta("v3", 5.3) - assert DatasetManager.assert_df_equal(multi_full.get_slice(1, 6).get_meta("v3"), df.iloc[1:6]["v3"]) + DatasetManager.assert_df_equal(multi_full.get_slice(1, 6).get_meta("v3"), df.iloc[1:6]["v3"]) # Set values multi column multi_full.get_slice(5, 8).set_meta(["v4", "v1", "v3"], 7) - assert DatasetManager.assert_df_equal( + DatasetManager.assert_df_equal( multi_full.get_slice(5, 8).get_meta(["v4", "v1", "v3"]), df.iloc[5:8][["v4", "v1", "v3"]]) # Chained Set values multi_full.get_slice(10, 20).get_slice(1, 4).set_meta(None, 8) - assert DatasetManager.assert_df_equal( + DatasetManager.assert_df_equal( multi_full.get_slice(10, 20).get_slice(1, 4).get_meta(), df.iloc[10 + 1:(10 + 1) + (4 - 1)]) # Chained Set values one column multi_full.get_slice(10, 20).get_slice(3, 5).set_meta("v4", 112) - assert DatasetManager.assert_df_equal( + DatasetManager.assert_df_equal( multi_full.get_slice(10, 20).get_slice(3, 5).get_meta("v4"), df.iloc[10 + 3:(10 + 3) + (5 - 3)]["v4"]) # Chained Set values multi column multi_full.get_slice(10, 20).get_slice(5, 8).set_meta(["v4", "v1", "v2"], 22) - assert DatasetManager.assert_df_equal( + DatasetManager.assert_df_equal( multi_full.get_slice(10, 20).get_slice(5, 8).get_meta(["v4", "v1", "v2"]), df.iloc[10 + 5:(10 
+ 5) + (8 - 5)][["v4", "v1", "v2"]]) @@ -769,7 +769,7 @@ def test_tensor_slicing(use_cpp: bool, dataset: DatasetManager): assert multi_slice.mess_count == equiv_slice.mess_count assert multi_slice.offset == equiv_slice.offset assert multi_slice.count == equiv_slice.count - assert dataset.assert_df_equal(multi_slice.get_meta(), equiv_slice.get_meta()) + dataset.assert_df_equal(multi_slice.get_meta(), equiv_slice.get_meta()) # Finally, compare a double slice to a single memory = InferenceMemory(count=tensor_count, tensors={"seq_ids": seq_ids, "probs": probs}) @@ -781,4 +781,4 @@ def test_tensor_slicing(use_cpp: bool, dataset: DatasetManager): assert double_slice.offset == single_slice.offset assert double_slice.count == single_slice.count assert cp.all(double_slice.get_tensor("probs") == single_slice.get_tensor("probs")) - assert dataset.assert_df_equal(double_slice.get_meta(), single_slice.get_meta()) + dataset.assert_df_equal(double_slice.get_meta(), single_slice.get_meta()) diff --git a/tests/test_write_to_kafka_stage_pipe.py b/tests/test_write_to_kafka_stage_pipe.py index 6e7bad114d..397b1c680e 100644 --- a/tests/test_write_to_kafka_stage_pipe.py +++ b/tests/test_write_to_kafka_stage_pipe.py @@ -25,8 +25,7 @@ from morpheus.stages.output.write_to_kafka_stage import WriteToKafkaStage from morpheus.stages.postprocess.serialize_stage import SerializeStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage -from morpheus.utils import compare_df -from utils import assert_results +from utils.dataset_manager import DatasetManager if (typing.TYPE_CHECKING): from kafka import KafkaConsumer @@ -35,7 +34,7 @@ @pytest.mark.kafka @pytest.mark.use_cudf def test_write_to_kafka_stage_pipe(config, - filter_probs_df, + dataset_cudf: DatasetManager, kafka_bootstrap_servers: str, kafka_consumer: "KafkaConsumer", kafka_topics: typing.Tuple[str, str]) -> None: @@ -43,6 +42,8 @@ def test_write_to_kafka_stage_pipe(config, Even though WriteToKafkaStage only has a Python impl, testing with both C++ and Python execution to ensure it works just as well with the C++ impls of the message classes. 
""" + + filter_probs_df = dataset_cudf['filter_probs.csv'] pipe = LinearPipeline(config) pipe.set_source(InMemorySourceStage(config, [filter_probs_df])) pipe.add_stage(DeserializeStage(config)) @@ -62,4 +63,4 @@ def test_write_to_kafka_stage_pipe(config, assert len(output_df) == len(filter_probs_df) - assert_results(compare_df.compare_df(filter_probs_df.to_pandas(), output_df)) + dataset_cudf.assert_compare_df(filter_probs_df, output_df) diff --git a/tests/tests_data/examples/gnn_fraud_detection_pipeline/inductive_emb.csv b/tests/tests_data/examples/gnn_fraud_detection_pipeline/inductive_emb.csv new file mode 100644 index 0000000000..3c8eae2372 --- /dev/null +++ b/tests/tests_data/examples/gnn_fraud_detection_pipeline/inductive_emb.csv @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:278c304032df75c9c2e13d0367c08ad4d652fb77a86a50ea7b1d26c37e282a76 +size 8142 diff --git a/tests/tests_data/examples/gnn_fraud_detection_pipeline/predictions.csv b/tests/tests_data/examples/gnn_fraud_detection_pipeline/predictions.csv new file mode 100644 index 0000000000..87256ef044 --- /dev/null +++ b/tests/tests_data/examples/gnn_fraud_detection_pipeline/predictions.csv @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:4baf153412823c07ebc35684ef88e2237aeef159ba0857dd98ef80190c639e44 +size 168 diff --git a/tests/tests_data/examples/ransomware_detection/dask_results.csv b/tests/tests_data/examples/ransomware_detection/dask_results.csv new file mode 100644 index 0000000000..57392631fd --- /dev/null +++ b/tests/tests_data/examples/ransomware_detection/dask_results.csv @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:d88740c15ee80c199e5d1a59fc1c469ba188e3717fa88f2ece0286df70860b2a +size 4456 diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py index fa25defca5..d854bd988d 100755 --- a/tests/utils/__init__.py +++ b/tests/utils/__init__.py @@ -17,8 +17,11 @@ import json import os import time +import types import typing +import pytest + from morpheus.io.deserializers import read_file_to_df from .test_directories import TestDirectories @@ -138,3 +141,18 @@ def assert_results(results: dict) -> dict: assert results["diff_cols"] == 0, f"Expected diff_cols=0 : {results}" assert results["diff_rows"] == 0, f"Expected diff_rows=0 : {results}" return results + + +def import_or_skip(modname: str, + minversion: str = None, + reason: str = None, + fail_missing: bool = False) -> types.ModuleType: + """ + Wrapper for `pytest.importorskip` will re-raise any `Skipped` exceptions as `ImportError` if `fail_missing` is True. 
+ """ + try: + return pytest.importorskip(modname, minversion=minversion, reason=reason) + except pytest.skip.Exception as e: + if fail_missing: + raise ImportError(e) + raise diff --git a/tests/utils/dataset_manager.py b/tests/utils/dataset_manager.py index 61fdd839da..05f67b1498 100644 --- a/tests/utils/dataset_manager.py +++ b/tests/utils/dataset_manager.py @@ -24,7 +24,9 @@ import cudf as cdf # rename to avoid clash with property method from morpheus.io.deserializers import read_file_to_df +from morpheus.utils import compare_df from utils import TEST_DIRS +from utils import assert_results class DatasetManager(object): @@ -180,19 +182,47 @@ def dup_index(cls, # Return a new dataframe where we replace some index values with others return cls.replace_index(df, replace_dict) - @staticmethod - def assert_df_equal(df_to_check: typing.Union[pd.DataFrame, cdf.DataFrame], val_to_check: typing.Any) -> bool: + def value_as_pandas(val: typing.Union[pd.DataFrame, cdf.DataFrame, cdf.Series], assert_is_pandas=True): + if (isinstance(val, cdf.DataFrame) or isinstance(val, cdf.Series)): + return val.to_pandas() + + if assert_is_pandas: + assert isinstance(val, (pd.DataFrame, pd.Series)), type(val) + + return val + + @classmethod + def assert_df_equal(cls, df_to_check: typing.Union[pd.DataFrame, cdf.DataFrame], val_to_check: typing.Any): """Compare a DataFrame against a validation dataset which can either be a DataFrame, Series or CuPy array.""" # Comparisons work better in cudf so convert everything to that - if (isinstance(df_to_check, cdf.DataFrame) or isinstance(df_to_check, cdf.Series)): - df_to_check = df_to_check.to_pandas() + df_to_check = cls.value_as_pandas(df_to_check) - if (isinstance(val_to_check, cdf.DataFrame) or isinstance(val_to_check, cdf.Series)): - val_to_check = val_to_check.to_pandas() - elif (isinstance(val_to_check, cp.ndarray)): + if (isinstance(val_to_check, cp.ndarray)): val_to_check = val_to_check.get() + else: + val_to_check = cls.value_as_pandas(val_to_check, assert_is_pandas=False) bool_df = df_to_check == val_to_check - return bool(bool_df.all(axis=None)) + assert bool(bool_df.all(axis=None)) + + @classmethod + def compare_df(cls, + dfa: typing.Union[pd.DataFrame, cdf.DataFrame], + dfb: typing.Union[pd.DataFrame, cdf.DataFrame], + **compare_args): + """ + Wrapper for morpheus.utils.compare_df.compare_df + """ + return compare_df.compare_df(cls.value_as_pandas(dfa), cls.value_as_pandas(dfb), **compare_args) + + @classmethod + def assert_compare_df(cls, + dfa: typing.Union[pd.DataFrame, cdf.DataFrame], + dfb: typing.Union[pd.DataFrame, cdf.DataFrame], + **compare_args): + """ + Convenience method for calling compare_df and asserting that the results are the same + """ + assert_results(cls.compare_df(dfa, dfb, **compare_args)) diff --git a/tests/utils/stages/error_raiser.py b/tests/utils/stages/error_raiser.py new file mode 100644 index 0000000000..aef1ded487 --- /dev/null +++ b/tests/utils/stages/error_raiser.py @@ -0,0 +1,66 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import typing + +import mrc +from mrc.core import operators as ops + +from morpheus.config import Config +from morpheus.pipeline.single_port_stage import SinglePortStage +from morpheus.pipeline.stream_pair import StreamPair +from morpheus.utils.atomic_integer import AtomicInteger + + +class ErrorRaiserStage(SinglePortStage): + """ + Stage that raises an exception in the on_data method + """ + + def __init__(self, config: Config, exception_cls: type[Exception] = RuntimeError, raise_on: int = 0): + assert raise_on >= 0 + + super().__init__(config) + self._exception_cls = exception_cls + self._raise_on = raise_on + self._counter = AtomicInteger(0) + self._error_raised = False + + @property + def name(self) -> str: + return "error-raiser" + + def accepted_types(self) -> typing.Tuple: + return (typing.Any, ) + + def supports_cpp_node(self) -> bool: + return False + + def on_data(self, message: typing.Any): + count = self._counter.get_and_inc() + if count >= self._raise_on: + self._error_raised = True + raise self._exception_cls(f"ErrorRaiserStage: raising exception on message {count}") + return message + + @property + def error_raised(self) -> bool: + return self._error_raised + + def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair: + node = builder.make_node(self.unique_name, ops.map(self.on_data)) + builder.make_edge(input_stream[0], node) + + return node, input_stream[1]
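
Two illustrative sketches follow. They are not part of the diff above, and the real Morpheus sources may differ; they are included only to clarify pieces that the patch references but does not show.

First, the new `fail_missing` fixture reads a pytest option via `pytestconfig.getoption("fail_missing")`, but the registration of that option falls outside this diff. Assuming a `--fail_missing` command-line flag (the flag name and default are assumptions; only the destination name "fail_missing" is implied by the fixture), the wiring in the top-level conftest.py would look roughly like this:

    # Hypothetical sketch: register the option that the `fail_missing` fixture reads.
    def pytest_addoption(parser):
        parser.addoption(
            "--fail_missing",
            action="store_true",
            default=False,
            help="Fail tests that need optional example dependencies instead of skipping them.")

Second, test_sliding_window_offsets asserts that runs of consecutive snapshot ids produce overlapping (start, stop) offset pairs and that non-consecutive ids produce none. A minimal reference implementation consistent with the values asserted in that test (the actual PreprocessingRWStage._sliding_window_offsets may be written differently) is:

    import typing

    def sliding_window_offsets(ids: typing.List[int], window: int) -> typing.List[typing.Tuple[int, int]]:
        """Return (start, stop) index pairs for each run of consecutive ids long enough to fill a window."""
        offsets = []
        run_start = 0
        for i in range(1, len(ids) + 1):
            # A gap in the ids (or the end of the list) closes the current consecutive run.
            if i == len(ids) or ids[i] != ids[i - 1] + 1:
                for begin in range(run_start, i - window + 1):
                    offsets.append((begin, begin + window))
                run_start = i
        return offsets

    # Reproduces the values asserted in test_sliding_window_offsets:
    ids = [17, 18, 19, 20, 21, 22, 23, 31, 32, 33]
    assert sliding_window_offsets(ids, window=3) == [(0, 3), (1, 4), (2, 5), (3, 6), (4, 7), (7, 10)]
    assert sliding_window_offsets(list(reversed(ids)), window=3) == []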