diff --git a/ci/scripts/github/common.sh b/ci/scripts/github/common.sh index c2872e3569..89598e7cbc 100644 --- a/ci/scripts/github/common.sh +++ b/ci/scripts/github/common.sh @@ -51,7 +51,7 @@ export S3_URL="s3://rapids-downloads/ci/morpheus" export DISPLAY_URL="https://downloads.rapids.ai/ci/morpheus" export ARTIFACT_ENDPOINT="/pull-request/${PR_NUM}/${GIT_COMMIT}/${NVARCH}" export ARTIFACT_URL="${S3_URL}${ARTIFACT_ENDPOINT}" -export DISPLAY_ARTIFACT_URL="${DISPLAY_URL}/pull-request/${PR_NUM}/${GIT_COMMIT}/${NVARCH}/" +export DISPLAY_ARTIFACT_URL="${DISPLAY_URL}${ARTIFACT_ENDPOINT}/" # Set sccache env vars export SCCACHE_S3_KEY_PREFIX=morpheus-${NVARCH} diff --git a/morpheus/_lib/src/stages/file_source.cpp b/morpheus/_lib/src/stages/file_source.cpp index 428d29f03f..46eda28522 100644 --- a/morpheus/_lib/src/stages/file_source.cpp +++ b/morpheus/_lib/src/stages/file_source.cpp @@ -64,13 +64,26 @@ FileSourceStage::subscriber_fn_t FileSourceStage::build() // When index_col_count is 0 this will cause a new range index to be created auto meta = MessageMeta::create_from_cpp(std::move(data_table), index_col_count); - // Always push at least 1 - output.on_next(meta); + // next_meta stores a copy of the upcoming meta + std::shared_ptr next_meta = nullptr; - for (cudf::size_type repeat_idx = 1; repeat_idx < m_repeat; ++repeat_idx) + for (cudf::size_type repeat_idx = 0; repeat_idx < m_repeat; ++repeat_idx) { - // Clone the previous meta object + if (!output.is_subscribed()) { + // Grab the GIL before disposing, just in case + pybind11::gil_scoped_acquire gil; + + // Reset meta to allow the DCHECK after the loop to pass + meta.reset(); + + break; + } + + // Clone the meta object before pushing while we still have access to it + if (repeat_idx + 1 < m_repeat) + { + // GIL must come after get_info pybind11::gil_scoped_acquire gil; // Use the copy function @@ -82,12 +95,20 @@ FileSourceStage::subscriber_fn_t FileSourceStage::build() df.attr("index") = index + df_len; - meta = MessageMeta::create_from_python(std::move(df)); + next_meta = MessageMeta::create_from_python(std::move(df)); } - output.on_next(meta); + DCHECK(meta) << "Cannot push null meta"; + + output.on_next(std::move(meta)); + + // Move next_meta into meta + std::swap(meta, next_meta); } + DCHECK(!meta) << "meta was not properly pushed"; + DCHECK(!next_meta) << "next_meta was not properly pushed"; + output.on_completed(); }; } diff --git a/morpheus/stages/input/file_source_stage.py b/morpheus/stages/input/file_source_stage.py index 91cacda4bc..a1be432d8a 100644 --- a/morpheus/stages/input/file_source_stage.py +++ b/morpheus/stages/input/file_source_stage.py @@ -143,19 +143,15 @@ def _generate_frames(self): df_type="cudf", ) - count = 0 - - for _ in range(self._repeat_count): + for i in range(self._repeat_count): x = MessageMeta(df) - yield x - - count += 1 - - # If we are looping, copy and shift the index - if (self._repeat_count > 0): - prev_df = df - df = prev_df.copy() + # If we are looping, copy the object. Do this before we push the object in case it changes + if (i + 1 < self._repeat_count): + df = df.copy() + # Shift the index to allow for unique indices without reading more data df.index += len(df) + + yield x diff --git a/pyproject.toml b/pyproject.toml index 3b03b11dd3..c34ce19508 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,6 +11,7 @@ build-backend = "setuptools.build_meta" # These show up when querying `pytest --markers` [tool.pytest.ini_options] markers = [ + "benchmark: Benchmarks", "slow: Slow tests", "kafka: Tests that require a running instance of kafka", "use_cpp: Test support C++ nodes and objects", diff --git a/tests/benchmarks/test_bench_e2e_pipelines.py b/tests/benchmarks/test_bench_e2e_pipelines.py index 2c5c152673..ba23ed03c0 100644 --- a/tests/benchmarks/test_bench_e2e_pipelines.py +++ b/tests/benchmarks/test_bench_e2e_pipelines.py @@ -127,7 +127,7 @@ def ae_pipeline(config: Config, input_glob, repeat, train_data_glob, output_file pipeline.run() -@pytest.mark.slow +@pytest.mark.benchmark def test_sid_nlp_e2e(benchmark, tmp_path): config = Config() @@ -160,7 +160,7 @@ def test_sid_nlp_e2e(benchmark, tmp_path): benchmark(nlp_pipeline, config, input_filepath, repeat, vocab_filepath, output_filepath, model_name) -@pytest.mark.slow +@pytest.mark.benchmark def test_abp_fil_e2e(benchmark, tmp_path): config = Config() @@ -185,7 +185,7 @@ def test_abp_fil_e2e(benchmark, tmp_path): benchmark(fil_pipeline, config, input_filepath, repeat, output_filepath, model_name) -@pytest.mark.slow +@pytest.mark.benchmark def test_phishing_nlp_e2e(benchmark, tmp_path): config = Config() @@ -207,7 +207,7 @@ def test_phishing_nlp_e2e(benchmark, tmp_path): benchmark(nlp_pipeline, config, input_filepath, repeat, vocab_filepath, output_filepath, model_name) -@pytest.mark.slow +@pytest.mark.benchmark def test_cloudtrail_ae_e2e(benchmark, tmp_path): config = Config() diff --git a/tests/benchmarks/test_bench_monitor_stage.py b/tests/benchmarks/test_bench_monitor_stage.py index ba2c991b21..e5e60c8e1a 100644 --- a/tests/benchmarks/test_bench_monitor_stage.py +++ b/tests/benchmarks/test_bench_monitor_stage.py @@ -45,7 +45,7 @@ def build_and_run_pipeline(config: Config, df: cudf.DataFrame): pipeline.run() -@pytest.mark.slow +@pytest.mark.benchmark @pytest.mark.parametrize("num_messages", [1, 100, 10000, 1000000]) def test_monitor_stage(benchmark, num_messages): diff --git a/tests/benchmarks/test_bench_serialize_stage.py b/tests/benchmarks/test_bench_serialize_stage.py index ca251cd599..a554a16db2 100644 --- a/tests/benchmarks/test_bench_serialize_stage.py +++ b/tests/benchmarks/test_bench_serialize_stage.py @@ -47,7 +47,7 @@ def build_and_run_pipeline(config: Config, pipeline.run() -@pytest.mark.slow +@pytest.mark.benchmark @pytest.mark.parametrize("num_messages", [1, 100, 10000]) @pytest.mark.parametrize("output_type", ["json", "csv"]) def test_monitor_stage(benchmark, num_messages, output_type): diff --git a/tests/conftest.py b/tests/conftest.py index e32fd3f140..9c6337699a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -96,6 +96,13 @@ def pytest_addoption(parser: pytest.Parser): help="Run kafka tests that would otherwise be skipped", ) + parser.addoption( + "--run_benchmark", + action="store_true", + dest="run_benchmark", + help="Run benchmark tests that would otherwise be skipped", + ) + def pytest_generate_tests(metafunc: pytest.Metafunc): """ @@ -133,6 +140,10 @@ def pytest_runtest_setup(item): if (item.get_closest_marker("kafka") is not None): pytest.skip("Skipping Kafka tests by default. Use --run_kafka to enable") + if (not item.config.getoption("--run_benchmark")): + if (item.get_closest_marker("benchmark") is not None): + pytest.skip("Skipping benchmark tests by default. Use --run_benchmark to enable") + def pytest_collection_modifyitems(config, items): """ diff --git a/tests/test_add_scores_stage_pipe.py b/tests/test_add_scores_stage_pipe.py index 52ab084e1f..42c2d39427 100755 --- a/tests/test_add_scores_stage_pipe.py +++ b/tests/test_add_scores_stage_pipe.py @@ -80,14 +80,16 @@ def test_add_scores_stage_pipe(config, tmp_path, order, pipeline_batch_size, rep assert output_np.tolist() == expected.tolist() -def test_add_scores_stage_multi_segment_pipe(config, tmp_path): +@pytest.mark.parametrize('repeat', [1, 2, 5]) +def test_add_scores_stage_multi_segment_pipe(config, tmp_path, repeat): + # Intentionally using FileSourceStage's repeat argument as this triggers a bug in #443 config.class_labels = ['frogs', 'lizards', 'toads', 'turtles'] input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv") out_file = os.path.join(tmp_path, 'results.csv') pipe = LinearPipeline(config) - pipe.set_source(FileSourceStage(config, filename=input_file, iterative=False)) + pipe.set_source(FileSourceStage(config, filename=input_file, iterative=False, repeat=repeat)) pipe.add_segment_boundary(MessageMeta) pipe.add_stage(DeserializeStage(config)) pipe.add_segment_boundary(MultiMessage) @@ -102,7 +104,8 @@ def test_add_scores_stage_multi_segment_pipe(config, tmp_path): assert os.path.exists(out_file) - expected = np.loadtxt(input_file, delimiter=",", skiprows=1) + expected_data = np.loadtxt(input_file, delimiter=",", skiprows=1) + expected = np.concatenate([expected_data for _ in range(repeat)]) # The output data will contain an additional id column that we will need to slice off # also somehow 0.7 ends up being 0.7000000000000001