Fix unittests #444

Merged · 10 commits · Nov 7, 2022
2 changes: 1 addition & 1 deletion ci/scripts/github/common.sh
@@ -51,7 +51,7 @@ export S3_URL="s3://rapids-downloads/ci/morpheus"
export DISPLAY_URL="https://downloads.rapids.ai/ci/morpheus"
export ARTIFACT_ENDPOINT="/pull-request/${PR_NUM}/${GIT_COMMIT}/${NVARCH}"
export ARTIFACT_URL="${S3_URL}${ARTIFACT_ENDPOINT}"
-export DISPLAY_ARTIFACT_URL="${DISPLAY_URL}/pull-request/${PR_NUM}/${GIT_COMMIT}/${NVARCH}/"
+export DISPLAY_ARTIFACT_URL="${DISPLAY_URL}${ARTIFACT_ENDPOINT}/"

# Set sccache env vars
export SCCACHE_S3_KEY_PREFIX=morpheus-${NVARCH}
33 changes: 27 additions & 6 deletions morpheus/_lib/src/stages/file_source.cpp
@@ -64,13 +64,26 @@ FileSourceStage::subscriber_fn_t FileSourceStage::build()
// When index_col_count is 0 this will cause a new range index to be created
auto meta = MessageMeta::create_from_cpp(std::move(data_table), index_col_count);

-// Always push at least 1
-output.on_next(meta);
+// next_meta stores a copy of the upcoming meta
+std::shared_ptr<MessageMeta> next_meta = nullptr;

-for (cudf::size_type repeat_idx = 0; repeat_idx < m_repeat; ++repeat_idx)
+for (cudf::size_type repeat_idx = 0; repeat_idx < m_repeat; ++repeat_idx)
{
-// Clone the previous meta object
+if (!output.is_subscribed())
+{
+    // Grab the GIL before disposing, just in case
+    pybind11::gil_scoped_acquire gil;
+
+    // Reset meta to allow the DCHECK after the loop to pass
+    meta.reset();
+
+    break;
+}
+
+// Clone the meta object before pushing while we still have access to it
+if (repeat_idx + 1 < m_repeat)
{
// GIL must come after get_info
pybind11::gil_scoped_acquire gil;

// Use the copy function
@@ -82,12 +95,20 @@ FileSourceStage::subscriber_fn_t FileSourceStage::build()

df.attr("index") = index + df_len;

-meta = MessageMeta::create_from_python(std::move(df));
+next_meta = MessageMeta::create_from_python(std::move(df));
}

-output.on_next(meta);
+DCHECK(meta) << "Cannot push null meta";
+
+output.on_next(std::move(meta));
+
+// Move next_meta into meta
+std::swap(meta, next_meta);
}

+DCHECK(!meta) << "meta was not properly pushed";
+DCHECK(!next_meta) << "next_meta was not properly pushed";

output.on_completed();
};
}
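The reworked loop is a small double-buffering scheme: the copy for the next iteration is made while the stage still owns the current `meta`, the current object is then moved into `on_next`, and `std::swap` promotes the pre-made copy. A rough Python analogue of the invariant the two `DCHECK`s enforce (illustrative names only, not the Morpheus API; assumes `repeat >= 1`):

```python
def emit_with_repeat(emit, is_subscribed, clone, item, repeat):
    """Every item is either emitted or reset; nothing is left over at the end."""
    next_item = None
    for i in range(repeat):
        if not is_subscribed():
            item = None  # mirrors meta.reset() so the final check passes
            break
        if i + 1 < repeat:
            next_item = clone(item)  # clone BEFORE emitting, while we still own it
        assert item is not None, "Cannot push null item"
        emit(item)
        # mirrors std::swap(meta, next_meta): promote the copy, clear the slot
        item, next_item = next_item, None
    assert item is None and next_item is None, "item was not properly pushed"
```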
18 changes: 7 additions & 11 deletions morpheus/stages/input/file_source_stage.py
@@ -143,19 +143,15 @@ def _generate_frames(self):
df_type="cudf",
)

-count = 0
-
-for _ in range(self._repeat_count):
+for i in range(self._repeat_count):

x = MessageMeta(df)

-yield x
-
-count += 1
-
-# If we are looping, copy and shift the index
-if (self._repeat_count > 0):
-    prev_df = df
-    df = prev_df.copy()
+# If we are looping, copy the object. Do this before we push the object in case it changes
+if (i + 1 < self._repeat_count):
+    df = df.copy()

# Shift the index to allow for unique indices without reading more data
df.index += len(df)

+yield x
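The Python stage had the same ordering bug: it yielded the frame and only afterwards copied it and shifted the index in place, mutating an object the consumer might still hold. A minimal, self-contained sketch (toy data, not Morpheus code) showing why the copy must happen before the yield:

```python
import pandas as pd

def frames(df: pd.DataFrame, repeat: int):
    for i in range(repeat):
        out = df
        if i + 1 < repeat:
            df = df.copy()        # copy first, so `out` is never mutated...
            df.index += len(df)   # ...when the index is shifted for uniqueness
        yield out

emitted = list(frames(pd.DataFrame({"a": [1, 2]}), repeat=2))
assert emitted[0].index.tolist() == [0, 1]  # first frame keeps its index
assert emitted[1].index.tolist() == [2, 3]  # second frame gets a shifted copy
```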
1 change: 1 addition & 0 deletions pyproject.toml
@@ -11,6 +11,7 @@ build-backend = "setuptools.build_meta"
# These show up when querying `pytest --markers`
[tool.pytest.ini_options]
markers = [
"benchmark: Benchmarks",
"slow: Slow tests",
"kafka: Tests that require a running instance of kafka",
"use_cpp: Test support C++ nodes and objects",
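Registering the marker here makes it show up under `pytest --markers` and keeps runs clean under `--strict-markers`. A hypothetical test carrying the new marker (the `benchmark` fixture comes from pytest-benchmark, which these tests already use):

```python
import pytest

@pytest.mark.benchmark
def test_sum_benchmark(benchmark):
    # pytest-benchmark times the callable over many rounds
    result = benchmark(sum, range(1_000))
    assert result == 499_500
```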
8 changes: 4 additions & 4 deletions tests/benchmarks/test_bench_e2e_pipelines.py
@@ -127,7 +127,7 @@ def ae_pipeline(config: Config, input_glob, repeat, train_data_glob, output_file
pipeline.run()


-@pytest.mark.slow
+@pytest.mark.benchmark
def test_sid_nlp_e2e(benchmark, tmp_path):

config = Config()
@@ -160,7 +160,7 @@ def test_sid_nlp_e2e(benchmark, tmp_path):
benchmark(nlp_pipeline, config, input_filepath, repeat, vocab_filepath, output_filepath, model_name)


-@pytest.mark.slow
+@pytest.mark.benchmark
def test_abp_fil_e2e(benchmark, tmp_path):

config = Config()
@@ -185,7 +185,7 @@ def test_abp_fil_e2e(benchmark, tmp_path):
benchmark(fil_pipeline, config, input_filepath, repeat, output_filepath, model_name)


-@pytest.mark.slow
+@pytest.mark.benchmark
def test_phishing_nlp_e2e(benchmark, tmp_path):

config = Config()
@@ -207,7 +207,7 @@ def test_phishing_nlp_e2e(benchmark, tmp_path):
benchmark(nlp_pipeline, config, input_filepath, repeat, vocab_filepath, output_filepath, model_name)


-@pytest.mark.slow
+@pytest.mark.benchmark
def test_cloudtrail_ae_e2e(benchmark, tmp_path):

config = Config()
2 changes: 1 addition & 1 deletion tests/benchmarks/test_bench_monitor_stage.py
@@ -45,7 +45,7 @@ def build_and_run_pipeline(config: Config, df: cudf.DataFrame):
pipeline.run()


-@pytest.mark.slow
+@pytest.mark.benchmark
@pytest.mark.parametrize("num_messages", [1, 100, 10000, 1000000])
def test_monitor_stage(benchmark, num_messages):

2 changes: 1 addition & 1 deletion tests/benchmarks/test_bench_serialize_stage.py
@@ -47,7 +47,7 @@ def build_and_run_pipeline(config: Config,
pipeline.run()


-@pytest.mark.slow
+@pytest.mark.benchmark
@pytest.mark.parametrize("num_messages", [1, 100, 10000])
@pytest.mark.parametrize("output_type", ["json", "csv"])
def test_monitor_stage(benchmark, num_messages, output_type):
11 changes: 11 additions & 0 deletions tests/conftest.py
@@ -96,6 +96,13 @@ def pytest_addoption(parser: pytest.Parser):
help="Run kafka tests that would otherwise be skipped",
)

+parser.addoption(
+    "--run_benchmark",
+    action="store_true",
+    dest="run_benchmark",
+    help="Run benchmark tests that would otherwise be skipped",
+)


def pytest_generate_tests(metafunc: pytest.Metafunc):
"""
@@ -133,6 +140,10 @@ def pytest_runtest_setup(item):
if (item.get_closest_marker("kafka") is not None):
pytest.skip("Skipping Kafka tests by default. Use --run_kafka to enable")

+if (not item.config.getoption("--run_benchmark")):
+    if (item.get_closest_marker("benchmark") is not None):
+        pytest.skip("Skipping benchmark tests by default. Use --run_benchmark to enable")


def pytest_collection_modifyitems(config, items):
"""
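Together with the marker registration above, this makes benchmarks opt-in: a plain `pytest` run reports them as skipped, while `pytest --run_benchmark tests/benchmarks` runs them. A condensed, self-contained version of the pattern, usable as a throwaway `conftest.py` for experimentation:

```python
import pytest

def pytest_addoption(parser):
    parser.addoption("--run_benchmark", action="store_true", dest="run_benchmark",
                     help="Run benchmark tests that would otherwise be skipped")

def pytest_runtest_setup(item):
    # Skip any benchmark-marked test unless the flag was passed on the CLI
    if (item.get_closest_marker("benchmark") is not None
            and not item.config.getoption("--run_benchmark")):
        pytest.skip("Skipping benchmark tests by default. Use --run_benchmark to enable")
```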
9 changes: 6 additions & 3 deletions tests/test_add_scores_stage_pipe.py
@@ -80,14 +80,16 @@ def test_add_scores_stage_pipe(config, tmp_path, order, pipeline_batch_size, rep
assert output_np.tolist() == expected.tolist()


-def test_add_scores_stage_multi_segment_pipe(config, tmp_path):
+@pytest.mark.parametrize('repeat', [1, 2, 5])
+def test_add_scores_stage_multi_segment_pipe(config, tmp_path, repeat):
+    # Intentionally using FileSourceStage's repeat argument as this triggers a bug in #443
config.class_labels = ['frogs', 'lizards', 'toads', 'turtles']

input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv")
out_file = os.path.join(tmp_path, 'results.csv')

pipe = LinearPipeline(config)
-pipe.set_source(FileSourceStage(config, filename=input_file, iterative=False))
+pipe.set_source(FileSourceStage(config, filename=input_file, iterative=False, repeat=repeat))
pipe.add_segment_boundary(MessageMeta)
pipe.add_stage(DeserializeStage(config))
pipe.add_segment_boundary(MultiMessage)
@@ -102,7 +104,8 @@ def test_add_scores_stage_multi_segment_pipe(config, tmp_path):

assert os.path.exists(out_file)

-expected = np.loadtxt(input_file, delimiter=",", skiprows=1)
+expected_data = np.loadtxt(input_file, delimiter=",", skiprows=1)
+expected = np.concatenate([expected_data for _ in range(repeat)])

# The output data will contain an additional id column that we will need to slice off
# also somehow 0.7 ends up being 0.7000000000000001
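The updated expectation mirrors what the repeat fix guarantees: each repetition emits the same rows with only the index shifted, so the concatenated output equals the input tiled `repeat` times. A toy check of that logic (illustrative numbers, not filter_probs.csv):

```python
import numpy as np

data = np.arange(8, dtype=float).reshape(4, 2)
repeat = 3
expected = np.concatenate([data for _ in range(repeat)])
assert expected.shape == (12, 2)
assert (expected[4:8] == data).all()  # second repetition matches the original rows
```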