From af9bb17dae47c61c1581e5b89dbdb43b3bc6feef Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 18 Apr 2024 11:29:16 -0700 Subject: [PATCH 01/40] Add tests to reproduce #1639 --- tests/test_sid.py | 91 ++++++++++++++++++++++++++--------------- tests/test_sid_kafka.py | 27 +++++++----- 2 files changed, 74 insertions(+), 44 deletions(-) diff --git a/tests/test_sid.py b/tests/test_sid.py index 67ca36161c..a0cc087137 100755 --- a/tests/test_sid.py +++ b/tests/test_sid.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import os from unittest import mock @@ -25,6 +26,7 @@ from _utils import calc_error_val from _utils import compare_class_to_scores from _utils import mk_async_infer +from morpheus.config import Config from morpheus.config import CppConfig from morpheus.config import PipelineModes from morpheus.pipeline import LinearPipeline @@ -44,7 +46,13 @@ MODEL_MAX_BATCH_SIZE = 32 -def _run_minibert_pipeline(config, tmp_path, model_name, truncated, data_col_name: str = "data"): +def _run_minibert_pipeline(*, + config: Config, + tmp_path: str, + model_name: str, + truncated: bool, + data_col_name: str = "data", + num_threads: int = 1): """ Runs just the Minibert Pipeline """ @@ -66,7 +74,7 @@ def _run_minibert_pipeline(config, tmp_path, model_name, truncated, data_col_nam config.pipeline_batch_size = 1024 config.feature_length = FEATURE_LENGTH config.edge_buffer_size = 128 - config.num_threads = 1 + config.num_threads = num_threads val_file_name = os.path.join(TEST_DIRS.validation_data_dir, 'sid-validation-data.csv') vocab_file_name = os.path.join(TEST_DIRS.data_dir, 'bert-base-uncased-hash.txt') @@ -100,7 +108,8 @@ def _run_minibert_pipeline(config, tmp_path, model_name, truncated, data_col_nam column=data_col_name)) pipe.add_stage( TritonInferenceStage(config, model_name=model_name, server_url='localhost:8001', force_convert_inputs=True)) - pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf")) + pipe.add_stage( + MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=logging.INFO)) pipe.add_stage(AddClassificationsStage(config, threshold=0.5, prefix="si_")) pipe.add_stage(AddScoresStage(config, prefix="score_")) pipe.add_stage( @@ -113,7 +122,13 @@ def _run_minibert_pipeline(config, tmp_path, model_name, truncated, data_col_nam return calc_error_val(results_file_name) -def _run_minibert(config, tmp_path, model_name, truncated, data_col_name: str = "data"): +def _run_minibert(*, + config: Config, + tmp_path: str, + model_name: str, + truncated: bool, + data_col_name: str = "data", + num_threads: int = 1): """ Runs the minibert pipeline and mocks the Triton Python interface """ @@ -145,44 +160,52 @@ def _run_minibert(config, tmp_path, model_name, truncated, data_col_name: str = async_infer = mk_async_infer(inf_results) mock_triton_client.async_infer.side_effect = async_infer - return _run_minibert_pipeline(config, tmp_path, model_name, truncated, data_col_name) + return _run_minibert_pipeline(config=config, + tmp_path=tmp_path, + model_name=model_name, + truncated=truncated, + data_col_name=data_col_name, + num_threads=num_threads) @pytest.mark.slow @pytest.mark.use_cpp @pytest.mark.usefixtures("launch_mock_triton") -def test_minibert_no_trunc(config, tmp_path): +@pytest.mark.parametrize("num_threads", [1, 4]) +def test_minibert_no_trunc(config: Config, tmp_path: str, num_threads: int): - results = _run_minibert(config, tmp_path, "sid-minibert-onnx-no-trunc", False) + results = _run_minibert(config=config, + tmp_path=tmp_path, + model_name="sid-minibert-onnx-no-trunc", + truncated=False, + num_threads=num_threads) - # Not sure why these are different - if (CppConfig.get_should_use_cpp()): - assert results.diff_rows == 18 - else: - assert results.diff_rows == 1333 + # When threading is enabled, the results returned from the mocked Triton server won't match the expected results + if num_threads == 1: + # Not sure why these are different + if (CppConfig.get_should_use_cpp()): + assert results.diff_rows == 18 + else: + assert results.diff_rows == 1333 @pytest.mark.slow @pytest.mark.usefixtures("launch_mock_triton") -def test_minibert_truncated(config, tmp_path): - - results = _run_minibert(config, tmp_path, 'sid-minibert-onnx', True) - - # Not sure why these are different - if (CppConfig.get_should_use_cpp()): - assert results.diff_rows == 1204 - else: - assert results.diff_rows == 1333 - - -@pytest.mark.slow -@pytest.mark.usefixtures("launch_mock_triton") -def test_minibert_data_col_name(config, tmp_path): - - results = _run_minibert(config, tmp_path, 'sid-minibert-onnx', True, "definitely_not_data") - - # Not sure why these are different - if (CppConfig.get_should_use_cpp()): - assert results.diff_rows == 1204 - else: - assert results.diff_rows == 1333 +@pytest.mark.parametrize("data_col_name", ["data", "definitely_not_data"]) +@pytest.mark.parametrize("num_threads", [1, 4]) +def test_minibert_truncated(config: Config, tmp_path: str, data_col_name: str, num_threads: int): + + results = _run_minibert(config=config, + tmp_path=tmp_path, + model_name='sid-minibert-onnx', + truncated=True, + data_col_name=data_col_name, + num_threads=num_threads) + + # When threading is enabled, the results returned from the mocked Triton server won't match the expected results + if num_threads == 1: + # Not sure why these are different + if (CppConfig.get_should_use_cpp()): + assert results.diff_rows == 1204 + else: + assert results.diff_rows == 1333 diff --git a/tests/test_sid_kafka.py b/tests/test_sid_kafka.py index ecc87de4b3..7c710db05a 100755 --- a/tests/test_sid_kafka.py +++ b/tests/test_sid_kafka.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import os import typing from io import StringIO @@ -52,13 +53,15 @@ @pytest.mark.kafka @pytest.mark.slow @pytest.mark.use_python +@pytest.mark.parametrize("num_threads", [1, 4]) @mock.patch('tritonclient.grpc.InferenceServerClient') def test_minibert_no_cpp(mock_triton_client: mock.MagicMock, dataset_pandas: DatasetManager, config: Config, kafka_bootstrap_servers: str, kafka_topics: KafkaTopics, - kafka_consumer: "KafkaConsumer"): + kafka_consumer: "KafkaConsumer", + num_threads: int): mock_metadata = { "inputs": [{ "name": "input_ids", "datatype": "INT32", "shape": [-1, FEATURE_LENGTH] @@ -101,7 +104,7 @@ def test_minibert_no_cpp(mock_triton_client: mock.MagicMock, config.pipeline_batch_size = 1024 config.feature_length = FEATURE_LENGTH config.edge_buffer_size = 128 - config.num_threads = 1 + config.num_threads = num_threads val_file_name = os.path.join(TEST_DIRS.validation_data_dir, 'sid-validation-data.csv') vocab_file_name = os.path.join(TEST_DIRS.data_dir, 'bert-base-uncased-hash.txt') @@ -137,20 +140,23 @@ def test_minibert_no_cpp(mock_triton_client: mock.MagicMock, assert len(output_df) == len(val_df) - results = compare_df(val_df, output_df, exclude_columns=[r'^ID$', r'^_ts_'], rel_tol=0.05) - - assert results['diff_rows'] == 1333 + # When threading is enabled, the results returned from the mocked Triton server won't match the expected results + if num_threads == 1: + results = compare_df(val_df, output_df, exclude_columns=[r'^ID$', r'^_ts_'], rel_tol=0.05) + assert results['diff_rows'] == 1333 @pytest.mark.kafka @pytest.mark.slow @pytest.mark.use_cpp @pytest.mark.usefixtures("launch_mock_triton") +@pytest.mark.parametrize("num_threads", [1, 4]) def test_minibert_cpp(dataset_pandas: DatasetManager, config: Config, kafka_bootstrap_servers: str, kafka_topics: KafkaTopics, - kafka_consumer: "KafkaConsumer"): + kafka_consumer: "KafkaConsumer", + num_threads: int): config.mode = PipelineModes.NLP config.class_labels = [ "address", @@ -168,7 +174,7 @@ def test_minibert_cpp(dataset_pandas: DatasetManager, config.pipeline_batch_size = 1024 config.feature_length = FEATURE_LENGTH config.edge_buffer_size = 128 - config.num_threads = 1 + config.num_threads = num_threads val_file_name = os.path.join(TEST_DIRS.validation_data_dir, 'sid-validation-data.csv') vocab_file_name = os.path.join(TEST_DIRS.data_dir, 'bert-base-uncased-hash.txt') @@ -207,6 +213,7 @@ def test_minibert_cpp(dataset_pandas: DatasetManager, assert len(output_df) == len(val_df) - results = compare_df(val_df, output_df, exclude_columns=[r'^ID$', r'^_ts_'], rel_tol=0.05) - - assert results['diff_rows'] == 1204 + # When threading is enabled, the results returned from the mocked Triton server won't match the expected results + if num_threads == 1: + results = compare_df(val_df, output_df, exclude_columns=[r'^ID$', r'^_ts_'], rel_tol=0.05) + assert results['diff_rows'] == 1204 From 61d391c1fd4af247e86a0ed554c58c9233c80b07 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 18 Apr 2024 12:21:10 -0700 Subject: [PATCH 02/40] More repo tests --- tests/test_abp.py | 43 +++++++++++++++++----------- tests/test_phishing.py | 15 ++++++---- tests/test_triton_inference_stage.py | 8 ++++-- 3 files changed, 42 insertions(+), 24 deletions(-) diff --git a/tests/test_abp.py b/tests/test_abp.py index 86778bfdb6..734ceb83fc 100755 --- a/tests/test_abp.py +++ b/tests/test_abp.py @@ -51,8 +51,9 @@ @pytest.mark.slow @pytest.mark.use_python +@pytest.mark.parametrize('num_threads', [1, 4]) @mock.patch('tritonclient.grpc.InferenceServerClient') -def test_abp_no_cpp(mock_triton_client, config: Config, tmp_path): +def test_abp_no_cpp(mock_triton_client: mock.MagicMock, config: Config, tmp_path: str, num_threads: int): mock_metadata = { "inputs": [{ 'name': 'input__0', 'datatype': 'FP32', "shape": [-1, FEATURE_LENGTH] @@ -83,7 +84,7 @@ def test_abp_no_cpp(mock_triton_client, config: Config, tmp_path): config.pipeline_batch_size = 1024 config.feature_length = FEATURE_LENGTH config.edge_buffer_size = 128 - config.num_threads = 1 + config.num_threads = num_threads config.fil = ConfigFIL() config.fil.feature_columns = load_labels_file(os.path.join(TEST_DIRS.data_dir, 'columns_fil.txt')) @@ -108,21 +109,24 @@ def test_abp_no_cpp(mock_triton_client, config: Config, tmp_path): pipe.run() compare_class_to_scores(out_file, config.class_labels, '', 'score_', threshold=0.5) - results = calc_error_val(results_file_name) - assert results.diff_rows == 0 + + if num_threads == 1: + results = calc_error_val(results_file_name) + assert results.diff_rows == 0 @pytest.mark.slow @pytest.mark.use_cpp @pytest.mark.usefixtures("launch_mock_triton") -def test_abp_cpp(config, tmp_path): +@pytest.mark.parametrize('num_threads', [1, 4]) +def test_abp_cpp(config: Config, tmp_path: str, num_threads: int): config.mode = PipelineModes.FIL config.class_labels = ["mining"] config.model_max_batch_size = MODEL_MAX_BATCH_SIZE config.pipeline_batch_size = 1024 config.feature_length = FEATURE_LENGTH config.edge_buffer_size = 128 - config.num_threads = 1 + config.num_threads = num_threads config.fil = ConfigFIL() config.fil.feature_columns = load_labels_file(os.path.join(TEST_DIRS.data_dir, 'columns_fil.txt')) @@ -151,14 +155,17 @@ def test_abp_cpp(config, tmp_path): pipe.run() compare_class_to_scores(out_file, config.class_labels, '', 'score_', threshold=0.5) - results = calc_error_val(results_file_name) - assert results.diff_rows == 0 + + if num_threads == 1: + results = calc_error_val(results_file_name) + assert results.diff_rows == 0 @pytest.mark.slow @pytest.mark.use_python +@pytest.mark.parametrize('num_threads', [1, 4]) @mock.patch('tritonclient.grpc.InferenceServerClient') -def test_abp_multi_segment_no_cpp(mock_triton_client, config: Config, tmp_path): +def test_abp_multi_segment_no_cpp(mock_triton_client: mock.MagicMock, config: Config, tmp_path: str, num_threads: int): mock_metadata = { "inputs": [{ 'name': 'input__0', 'datatype': 'FP32', "shape": [-1, FEATURE_LENGTH] @@ -189,7 +196,7 @@ def test_abp_multi_segment_no_cpp(mock_triton_client, config: Config, tmp_path): config.pipeline_batch_size = 1024 config.feature_length = FEATURE_LENGTH config.edge_buffer_size = 128 - config.num_threads = 1 + config.num_threads = num_threads config.fil = ConfigFIL() config.fil.feature_columns = load_labels_file(os.path.join(TEST_DIRS.data_dir, 'columns_fil.txt')) @@ -230,21 +237,24 @@ def test_abp_multi_segment_no_cpp(mock_triton_client, config: Config, tmp_path): pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False)) pipe.run() - results = calc_error_val(results_file_name) - assert results.diff_rows == 0 + + if num_threads == 1: + results = calc_error_val(results_file_name) + assert results.diff_rows == 0 @pytest.mark.slow @pytest.mark.use_cpp @pytest.mark.usefixtures("launch_mock_triton") -def test_abp_multi_segment_cpp(config, tmp_path): +@pytest.mark.parametrize('num_threads', [1, 4]) +def test_abp_multi_segment_cpp(config: Config, tmp_path: str, num_threads: int): config.mode = PipelineModes.FIL config.class_labels = ["mining"] config.model_max_batch_size = MODEL_MAX_BATCH_SIZE config.pipeline_batch_size = 1024 config.feature_length = FEATURE_LENGTH config.edge_buffer_size = 128 - config.num_threads = 1 + config.num_threads = num_threads config.fil = ConfigFIL() config.fil.feature_columns = load_labels_file(os.path.join(TEST_DIRS.data_dir, 'columns_fil.txt')) @@ -289,5 +299,6 @@ def test_abp_multi_segment_cpp(config, tmp_path): pipe.run() - results = calc_error_val(results_file_name) - assert results.diff_rows == 0 + if num_threads == 1: + results = calc_error_val(results_file_name) + assert results.diff_rows == 0 diff --git a/tests/test_phishing.py b/tests/test_phishing.py index 4f434e993e..4ab9ba5ca6 100755 --- a/tests/test_phishing.py +++ b/tests/test_phishing.py @@ -23,6 +23,7 @@ from _utils import TEST_DIRS from _utils import calc_error_val from _utils import mk_async_infer +from morpheus.config import Config from morpheus.config import PipelineModes from morpheus.pipeline import LinearPipeline from morpheus.stages.general.monitor_stage import MonitorStage @@ -44,7 +45,7 @@ @pytest.mark.slow @pytest.mark.use_python @mock.patch('tritonclient.grpc.InferenceServerClient') -def test_email_no_cpp(mock_triton_client, config, tmp_path): +def test_email_no_cpp(mock_triton_client: mock.MagicMock, config: Config, tmp_path: str): mock_metadata = { "inputs": [{ "name": "input_ids", "datatype": "INT64", "shape": [-1, FEATURE_LENGTH] @@ -104,6 +105,7 @@ def test_email_no_cpp(mock_triton_client, config, tmp_path): pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False)) pipe.run() + results = calc_error_val(results_file_name) assert results.diff_rows == 153 @@ -111,14 +113,15 @@ def test_email_no_cpp(mock_triton_client, config, tmp_path): @pytest.mark.slow @pytest.mark.use_cpp @pytest.mark.usefixtures("launch_mock_triton") -def test_email_cpp(config, tmp_path): +@pytest.mark.parametrize('num_threads', [1, 4]) +def test_email_cpp(config: Config, tmp_path: str, num_threads: int): config.mode = PipelineModes.NLP config.class_labels = load_labels_file(os.path.join(TEST_DIRS.data_dir, "labels_phishing.txt")) config.model_max_batch_size = MODEL_MAX_BATCH_SIZE config.pipeline_batch_size = 1024 config.feature_length = FEATURE_LENGTH config.edge_buffer_size = 128 - config.num_threads = 1 + config.num_threads = num_threads val_file_name = os.path.join(TEST_DIRS.validation_data_dir, 'phishing-email-validation-data.jsonlines') vocab_file_name = os.path.join(TEST_DIRS.data_dir, 'bert-base-uncased-hash.txt') @@ -147,5 +150,7 @@ def test_email_cpp(config, tmp_path): pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False)) pipe.run() - results = calc_error_val(results_file_name) - assert results.diff_rows == 682 + + if num_threads == 1: + results = calc_error_val(results_file_name) + assert results.diff_rows == 682 diff --git a/tests/test_triton_inference_stage.py b/tests/test_triton_inference_stage.py index a361c712a1..e36298f773 100644 --- a/tests/test_triton_inference_stage.py +++ b/tests/test_triton_inference_stage.py @@ -152,8 +152,9 @@ def test_stage_get_inference_worker(config: Config, pipeline_mode: PipelineModes @pytest.mark.slow @pytest.mark.use_python @pytest.mark.parametrize('num_records', [1000, 2000, 4000]) +@pytest.mark.parametrize('num_threads', [1, 4, 12]) @mock.patch('tritonclient.grpc.InferenceServerClient') -def test_triton_stage_pipe(mock_triton_client, config, num_records): +def test_triton_stage_pipe(mock_triton_client: mock.MagicMock, config: Config, num_records: int, num_threads: int): mock_metadata = { "inputs": [{ 'name': 'input__0', 'datatype': 'FP32', "shape": [-1, 1] @@ -185,7 +186,7 @@ def test_triton_stage_pipe(mock_triton_client, config, num_records): config.pipeline_batch_size = 1024 config.feature_length = 1 config.edge_buffer_size = 128 - config.num_threads = 1 + config.num_threads = num_threads config.fil = ConfigFIL() config.fil.feature_columns = ['v'] @@ -202,4 +203,5 @@ def test_triton_stage_pipe(mock_triton_client, config, num_records): pipe.run() - assert_results(comp_stage.get_results()) + if num_threads == 1: + assert_results(comp_stage.get_results()) From 9cfed05e13ad2836c9e7c9f87609b92c10af7c83 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 18 Apr 2024 12:24:01 -0700 Subject: [PATCH 03/40] Rollback change --- tests/test_sid_kafka.py | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/tests/test_sid_kafka.py b/tests/test_sid_kafka.py index 7c710db05a..ecc87de4b3 100755 --- a/tests/test_sid_kafka.py +++ b/tests/test_sid_kafka.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging import os import typing from io import StringIO @@ -53,15 +52,13 @@ @pytest.mark.kafka @pytest.mark.slow @pytest.mark.use_python -@pytest.mark.parametrize("num_threads", [1, 4]) @mock.patch('tritonclient.grpc.InferenceServerClient') def test_minibert_no_cpp(mock_triton_client: mock.MagicMock, dataset_pandas: DatasetManager, config: Config, kafka_bootstrap_servers: str, kafka_topics: KafkaTopics, - kafka_consumer: "KafkaConsumer", - num_threads: int): + kafka_consumer: "KafkaConsumer"): mock_metadata = { "inputs": [{ "name": "input_ids", "datatype": "INT32", "shape": [-1, FEATURE_LENGTH] @@ -104,7 +101,7 @@ def test_minibert_no_cpp(mock_triton_client: mock.MagicMock, config.pipeline_batch_size = 1024 config.feature_length = FEATURE_LENGTH config.edge_buffer_size = 128 - config.num_threads = num_threads + config.num_threads = 1 val_file_name = os.path.join(TEST_DIRS.validation_data_dir, 'sid-validation-data.csv') vocab_file_name = os.path.join(TEST_DIRS.data_dir, 'bert-base-uncased-hash.txt') @@ -140,23 +137,20 @@ def test_minibert_no_cpp(mock_triton_client: mock.MagicMock, assert len(output_df) == len(val_df) - # When threading is enabled, the results returned from the mocked Triton server won't match the expected results - if num_threads == 1: - results = compare_df(val_df, output_df, exclude_columns=[r'^ID$', r'^_ts_'], rel_tol=0.05) - assert results['diff_rows'] == 1333 + results = compare_df(val_df, output_df, exclude_columns=[r'^ID$', r'^_ts_'], rel_tol=0.05) + + assert results['diff_rows'] == 1333 @pytest.mark.kafka @pytest.mark.slow @pytest.mark.use_cpp @pytest.mark.usefixtures("launch_mock_triton") -@pytest.mark.parametrize("num_threads", [1, 4]) def test_minibert_cpp(dataset_pandas: DatasetManager, config: Config, kafka_bootstrap_servers: str, kafka_topics: KafkaTopics, - kafka_consumer: "KafkaConsumer", - num_threads: int): + kafka_consumer: "KafkaConsumer"): config.mode = PipelineModes.NLP config.class_labels = [ "address", @@ -174,7 +168,7 @@ def test_minibert_cpp(dataset_pandas: DatasetManager, config.pipeline_batch_size = 1024 config.feature_length = FEATURE_LENGTH config.edge_buffer_size = 128 - config.num_threads = num_threads + config.num_threads = 1 val_file_name = os.path.join(TEST_DIRS.validation_data_dir, 'sid-validation-data.csv') vocab_file_name = os.path.join(TEST_DIRS.data_dir, 'bert-base-uncased-hash.txt') @@ -213,7 +207,6 @@ def test_minibert_cpp(dataset_pandas: DatasetManager, assert len(output_df) == len(val_df) - # When threading is enabled, the results returned from the mocked Triton server won't match the expected results - if num_threads == 1: - results = compare_df(val_df, output_df, exclude_columns=[r'^ID$', r'^_ts_'], rel_tol=0.05) - assert results['diff_rows'] == 1204 + results = compare_df(val_df, output_df, exclude_columns=[r'^ID$', r'^_ts_'], rel_tol=0.05) + + assert results['diff_rows'] == 1204 From 8f6209ac3031738494d8112671d8cd481db45e89 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 18 Apr 2024 12:28:07 -0700 Subject: [PATCH 04/40] Only set the pe_count for python impls --- morpheus/stages/inference/inference_stage.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/morpheus/stages/inference/inference_stage.py b/morpheus/stages/inference/inference_stage.py index e4111926e9..042f19701a 100644 --- a/morpheus/stages/inference/inference_stage.py +++ b/morpheus/stages/inference/inference_stage.py @@ -299,8 +299,10 @@ def set_output_fut(resp: TensorMemory, inner_batch, batch_future: mrc.Future): else: node = builder.make_node(self.unique_name, ops.build(py_inference_fn)) - # Set the concurrency level to be up with the thread count - node.launch_options.pe_count = self._thread_count + # Set the concurrency level to be up with the thread count, intentionally only setting this for python + # implementations to avoid #1639 + node.launch_options.pe_count = self._thread_count + builder.make_edge(input_node, node) return node From 2d124adc78facfb2d84d88b6df1f8f42bb3801a4 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 18 Apr 2024 13:31:25 -0700 Subject: [PATCH 05/40] Revert "More repo tests" This reverts commit 61d391c1fd4af247e86a0ed554c58c9233c80b07. --- tests/test_abp.py | 43 +++++++++++----------------- tests/test_phishing.py | 15 ++++------ tests/test_triton_inference_stage.py | 8 ++---- 3 files changed, 24 insertions(+), 42 deletions(-) diff --git a/tests/test_abp.py b/tests/test_abp.py index 734ceb83fc..86778bfdb6 100755 --- a/tests/test_abp.py +++ b/tests/test_abp.py @@ -51,9 +51,8 @@ @pytest.mark.slow @pytest.mark.use_python -@pytest.mark.parametrize('num_threads', [1, 4]) @mock.patch('tritonclient.grpc.InferenceServerClient') -def test_abp_no_cpp(mock_triton_client: mock.MagicMock, config: Config, tmp_path: str, num_threads: int): +def test_abp_no_cpp(mock_triton_client, config: Config, tmp_path): mock_metadata = { "inputs": [{ 'name': 'input__0', 'datatype': 'FP32', "shape": [-1, FEATURE_LENGTH] @@ -84,7 +83,7 @@ def test_abp_no_cpp(mock_triton_client: mock.MagicMock, config: Config, tmp_path config.pipeline_batch_size = 1024 config.feature_length = FEATURE_LENGTH config.edge_buffer_size = 128 - config.num_threads = num_threads + config.num_threads = 1 config.fil = ConfigFIL() config.fil.feature_columns = load_labels_file(os.path.join(TEST_DIRS.data_dir, 'columns_fil.txt')) @@ -109,24 +108,21 @@ def test_abp_no_cpp(mock_triton_client: mock.MagicMock, config: Config, tmp_path pipe.run() compare_class_to_scores(out_file, config.class_labels, '', 'score_', threshold=0.5) - - if num_threads == 1: - results = calc_error_val(results_file_name) - assert results.diff_rows == 0 + results = calc_error_val(results_file_name) + assert results.diff_rows == 0 @pytest.mark.slow @pytest.mark.use_cpp @pytest.mark.usefixtures("launch_mock_triton") -@pytest.mark.parametrize('num_threads', [1, 4]) -def test_abp_cpp(config: Config, tmp_path: str, num_threads: int): +def test_abp_cpp(config, tmp_path): config.mode = PipelineModes.FIL config.class_labels = ["mining"] config.model_max_batch_size = MODEL_MAX_BATCH_SIZE config.pipeline_batch_size = 1024 config.feature_length = FEATURE_LENGTH config.edge_buffer_size = 128 - config.num_threads = num_threads + config.num_threads = 1 config.fil = ConfigFIL() config.fil.feature_columns = load_labels_file(os.path.join(TEST_DIRS.data_dir, 'columns_fil.txt')) @@ -155,17 +151,14 @@ def test_abp_cpp(config: Config, tmp_path: str, num_threads: int): pipe.run() compare_class_to_scores(out_file, config.class_labels, '', 'score_', threshold=0.5) - - if num_threads == 1: - results = calc_error_val(results_file_name) - assert results.diff_rows == 0 + results = calc_error_val(results_file_name) + assert results.diff_rows == 0 @pytest.mark.slow @pytest.mark.use_python -@pytest.mark.parametrize('num_threads', [1, 4]) @mock.patch('tritonclient.grpc.InferenceServerClient') -def test_abp_multi_segment_no_cpp(mock_triton_client: mock.MagicMock, config: Config, tmp_path: str, num_threads: int): +def test_abp_multi_segment_no_cpp(mock_triton_client, config: Config, tmp_path): mock_metadata = { "inputs": [{ 'name': 'input__0', 'datatype': 'FP32', "shape": [-1, FEATURE_LENGTH] @@ -196,7 +189,7 @@ def test_abp_multi_segment_no_cpp(mock_triton_client: mock.MagicMock, config: Co config.pipeline_batch_size = 1024 config.feature_length = FEATURE_LENGTH config.edge_buffer_size = 128 - config.num_threads = num_threads + config.num_threads = 1 config.fil = ConfigFIL() config.fil.feature_columns = load_labels_file(os.path.join(TEST_DIRS.data_dir, 'columns_fil.txt')) @@ -237,24 +230,21 @@ def test_abp_multi_segment_no_cpp(mock_triton_client: mock.MagicMock, config: Co pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False)) pipe.run() - - if num_threads == 1: - results = calc_error_val(results_file_name) - assert results.diff_rows == 0 + results = calc_error_val(results_file_name) + assert results.diff_rows == 0 @pytest.mark.slow @pytest.mark.use_cpp @pytest.mark.usefixtures("launch_mock_triton") -@pytest.mark.parametrize('num_threads', [1, 4]) -def test_abp_multi_segment_cpp(config: Config, tmp_path: str, num_threads: int): +def test_abp_multi_segment_cpp(config, tmp_path): config.mode = PipelineModes.FIL config.class_labels = ["mining"] config.model_max_batch_size = MODEL_MAX_BATCH_SIZE config.pipeline_batch_size = 1024 config.feature_length = FEATURE_LENGTH config.edge_buffer_size = 128 - config.num_threads = num_threads + config.num_threads = 1 config.fil = ConfigFIL() config.fil.feature_columns = load_labels_file(os.path.join(TEST_DIRS.data_dir, 'columns_fil.txt')) @@ -299,6 +289,5 @@ def test_abp_multi_segment_cpp(config: Config, tmp_path: str, num_threads: int): pipe.run() - if num_threads == 1: - results = calc_error_val(results_file_name) - assert results.diff_rows == 0 + results = calc_error_val(results_file_name) + assert results.diff_rows == 0 diff --git a/tests/test_phishing.py b/tests/test_phishing.py index 4ab9ba5ca6..4f434e993e 100755 --- a/tests/test_phishing.py +++ b/tests/test_phishing.py @@ -23,7 +23,6 @@ from _utils import TEST_DIRS from _utils import calc_error_val from _utils import mk_async_infer -from morpheus.config import Config from morpheus.config import PipelineModes from morpheus.pipeline import LinearPipeline from morpheus.stages.general.monitor_stage import MonitorStage @@ -45,7 +44,7 @@ @pytest.mark.slow @pytest.mark.use_python @mock.patch('tritonclient.grpc.InferenceServerClient') -def test_email_no_cpp(mock_triton_client: mock.MagicMock, config: Config, tmp_path: str): +def test_email_no_cpp(mock_triton_client, config, tmp_path): mock_metadata = { "inputs": [{ "name": "input_ids", "datatype": "INT64", "shape": [-1, FEATURE_LENGTH] @@ -105,7 +104,6 @@ def test_email_no_cpp(mock_triton_client: mock.MagicMock, config: Config, tmp_pa pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False)) pipe.run() - results = calc_error_val(results_file_name) assert results.diff_rows == 153 @@ -113,15 +111,14 @@ def test_email_no_cpp(mock_triton_client: mock.MagicMock, config: Config, tmp_pa @pytest.mark.slow @pytest.mark.use_cpp @pytest.mark.usefixtures("launch_mock_triton") -@pytest.mark.parametrize('num_threads', [1, 4]) -def test_email_cpp(config: Config, tmp_path: str, num_threads: int): +def test_email_cpp(config, tmp_path): config.mode = PipelineModes.NLP config.class_labels = load_labels_file(os.path.join(TEST_DIRS.data_dir, "labels_phishing.txt")) config.model_max_batch_size = MODEL_MAX_BATCH_SIZE config.pipeline_batch_size = 1024 config.feature_length = FEATURE_LENGTH config.edge_buffer_size = 128 - config.num_threads = num_threads + config.num_threads = 1 val_file_name = os.path.join(TEST_DIRS.validation_data_dir, 'phishing-email-validation-data.jsonlines') vocab_file_name = os.path.join(TEST_DIRS.data_dir, 'bert-base-uncased-hash.txt') @@ -150,7 +147,5 @@ def test_email_cpp(config: Config, tmp_path: str, num_threads: int): pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False)) pipe.run() - - if num_threads == 1: - results = calc_error_val(results_file_name) - assert results.diff_rows == 682 + results = calc_error_val(results_file_name) + assert results.diff_rows == 682 diff --git a/tests/test_triton_inference_stage.py b/tests/test_triton_inference_stage.py index e36298f773..a361c712a1 100644 --- a/tests/test_triton_inference_stage.py +++ b/tests/test_triton_inference_stage.py @@ -152,9 +152,8 @@ def test_stage_get_inference_worker(config: Config, pipeline_mode: PipelineModes @pytest.mark.slow @pytest.mark.use_python @pytest.mark.parametrize('num_records', [1000, 2000, 4000]) -@pytest.mark.parametrize('num_threads', [1, 4, 12]) @mock.patch('tritonclient.grpc.InferenceServerClient') -def test_triton_stage_pipe(mock_triton_client: mock.MagicMock, config: Config, num_records: int, num_threads: int): +def test_triton_stage_pipe(mock_triton_client, config, num_records): mock_metadata = { "inputs": [{ 'name': 'input__0', 'datatype': 'FP32', "shape": [-1, 1] @@ -186,7 +185,7 @@ def test_triton_stage_pipe(mock_triton_client: mock.MagicMock, config: Config, n config.pipeline_batch_size = 1024 config.feature_length = 1 config.edge_buffer_size = 128 - config.num_threads = num_threads + config.num_threads = 1 config.fil = ConfigFIL() config.fil.feature_columns = ['v'] @@ -203,5 +202,4 @@ def test_triton_stage_pipe(mock_triton_client: mock.MagicMock, config: Config, n pipe.run() - if num_threads == 1: - assert_results(comp_stage.get_results()) + assert_results(comp_stage.get_results()) From 1aec4401e66a9cb6ea0caaacc2d9b1eb9897c74e Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 18 Apr 2024 13:31:35 -0700 Subject: [PATCH 06/40] Revert "Add tests to reproduce #1639" This reverts commit af9bb17dae47c61c1581e5b89dbdb43b3bc6feef. --- tests/test_sid.py | 91 ++++++++++++++++++----------------------------- 1 file changed, 34 insertions(+), 57 deletions(-) diff --git a/tests/test_sid.py b/tests/test_sid.py index a0cc087137..67ca36161c 100755 --- a/tests/test_sid.py +++ b/tests/test_sid.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging import os from unittest import mock @@ -26,7 +25,6 @@ from _utils import calc_error_val from _utils import compare_class_to_scores from _utils import mk_async_infer -from morpheus.config import Config from morpheus.config import CppConfig from morpheus.config import PipelineModes from morpheus.pipeline import LinearPipeline @@ -46,13 +44,7 @@ MODEL_MAX_BATCH_SIZE = 32 -def _run_minibert_pipeline(*, - config: Config, - tmp_path: str, - model_name: str, - truncated: bool, - data_col_name: str = "data", - num_threads: int = 1): +def _run_minibert_pipeline(config, tmp_path, model_name, truncated, data_col_name: str = "data"): """ Runs just the Minibert Pipeline """ @@ -74,7 +66,7 @@ def _run_minibert_pipeline(*, config.pipeline_batch_size = 1024 config.feature_length = FEATURE_LENGTH config.edge_buffer_size = 128 - config.num_threads = num_threads + config.num_threads = 1 val_file_name = os.path.join(TEST_DIRS.validation_data_dir, 'sid-validation-data.csv') vocab_file_name = os.path.join(TEST_DIRS.data_dir, 'bert-base-uncased-hash.txt') @@ -108,8 +100,7 @@ def _run_minibert_pipeline(*, column=data_col_name)) pipe.add_stage( TritonInferenceStage(config, model_name=model_name, server_url='localhost:8001', force_convert_inputs=True)) - pipe.add_stage( - MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=logging.INFO)) + pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf")) pipe.add_stage(AddClassificationsStage(config, threshold=0.5, prefix="si_")) pipe.add_stage(AddScoresStage(config, prefix="score_")) pipe.add_stage( @@ -122,13 +113,7 @@ def _run_minibert_pipeline(*, return calc_error_val(results_file_name) -def _run_minibert(*, - config: Config, - tmp_path: str, - model_name: str, - truncated: bool, - data_col_name: str = "data", - num_threads: int = 1): +def _run_minibert(config, tmp_path, model_name, truncated, data_col_name: str = "data"): """ Runs the minibert pipeline and mocks the Triton Python interface """ @@ -160,52 +145,44 @@ def _run_minibert(*, async_infer = mk_async_infer(inf_results) mock_triton_client.async_infer.side_effect = async_infer - return _run_minibert_pipeline(config=config, - tmp_path=tmp_path, - model_name=model_name, - truncated=truncated, - data_col_name=data_col_name, - num_threads=num_threads) + return _run_minibert_pipeline(config, tmp_path, model_name, truncated, data_col_name) @pytest.mark.slow @pytest.mark.use_cpp @pytest.mark.usefixtures("launch_mock_triton") -@pytest.mark.parametrize("num_threads", [1, 4]) -def test_minibert_no_trunc(config: Config, tmp_path: str, num_threads: int): +def test_minibert_no_trunc(config, tmp_path): - results = _run_minibert(config=config, - tmp_path=tmp_path, - model_name="sid-minibert-onnx-no-trunc", - truncated=False, - num_threads=num_threads) + results = _run_minibert(config, tmp_path, "sid-minibert-onnx-no-trunc", False) - # When threading is enabled, the results returned from the mocked Triton server won't match the expected results - if num_threads == 1: - # Not sure why these are different - if (CppConfig.get_should_use_cpp()): - assert results.diff_rows == 18 - else: - assert results.diff_rows == 1333 + # Not sure why these are different + if (CppConfig.get_should_use_cpp()): + assert results.diff_rows == 18 + else: + assert results.diff_rows == 1333 @pytest.mark.slow @pytest.mark.usefixtures("launch_mock_triton") -@pytest.mark.parametrize("data_col_name", ["data", "definitely_not_data"]) -@pytest.mark.parametrize("num_threads", [1, 4]) -def test_minibert_truncated(config: Config, tmp_path: str, data_col_name: str, num_threads: int): - - results = _run_minibert(config=config, - tmp_path=tmp_path, - model_name='sid-minibert-onnx', - truncated=True, - data_col_name=data_col_name, - num_threads=num_threads) - - # When threading is enabled, the results returned from the mocked Triton server won't match the expected results - if num_threads == 1: - # Not sure why these are different - if (CppConfig.get_should_use_cpp()): - assert results.diff_rows == 1204 - else: - assert results.diff_rows == 1333 +def test_minibert_truncated(config, tmp_path): + + results = _run_minibert(config, tmp_path, 'sid-minibert-onnx', True) + + # Not sure why these are different + if (CppConfig.get_should_use_cpp()): + assert results.diff_rows == 1204 + else: + assert results.diff_rows == 1333 + + +@pytest.mark.slow +@pytest.mark.usefixtures("launch_mock_triton") +def test_minibert_data_col_name(config, tmp_path): + + results = _run_minibert(config, tmp_path, 'sid-minibert-onnx', True, "definitely_not_data") + + # Not sure why these are different + if (CppConfig.get_should_use_cpp()): + assert results.diff_rows == 1204 + else: + assert results.diff_rows == 1333 From 8444f7736d1abfd134ec9848059895ce6b248690 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 18 Apr 2024 13:34:11 -0700 Subject: [PATCH 07/40] Remove the launch options entirely since we currently don't have a reliable way to perform a multi-threaded triton unittest --- morpheus/stages/inference/inference_stage.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/morpheus/stages/inference/inference_stage.py b/morpheus/stages/inference/inference_stage.py index 042f19701a..058ff6c672 100644 --- a/morpheus/stages/inference/inference_stage.py +++ b/morpheus/stages/inference/inference_stage.py @@ -299,10 +299,6 @@ def set_output_fut(resp: TensorMemory, inner_batch, batch_future: mrc.Future): else: node = builder.make_node(self.unique_name, ops.build(py_inference_fn)) - # Set the concurrency level to be up with the thread count, intentionally only setting this for python - # implementations to avoid #1639 - node.launch_options.pe_count = self._thread_count - builder.make_edge(input_node, node) return node From ee0343857b46e5167336fd936007de7738a912b2 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 18 Apr 2024 13:40:13 -0700 Subject: [PATCH 08/40] Revert "Remove the launch options entirely since we currently don't have a reliable way to perform a multi-threaded triton unittest" This reverts commit 8444f7736d1abfd134ec9848059895ce6b248690. --- morpheus/stages/inference/inference_stage.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/morpheus/stages/inference/inference_stage.py b/morpheus/stages/inference/inference_stage.py index 058ff6c672..042f19701a 100644 --- a/morpheus/stages/inference/inference_stage.py +++ b/morpheus/stages/inference/inference_stage.py @@ -299,6 +299,10 @@ def set_output_fut(resp: TensorMemory, inner_batch, batch_future: mrc.Future): else: node = builder.make_node(self.unique_name, ops.build(py_inference_fn)) + # Set the concurrency level to be up with the thread count, intentionally only setting this for python + # implementations to avoid #1639 + node.launch_options.pe_count = self._thread_count + builder.make_edge(input_node, node) return node From a0092c72f6da56dfba4f53b7d5c2215dae20fb61 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 18 Apr 2024 13:40:38 -0700 Subject: [PATCH 09/40] Revert "Only set the pe_count for python impls" This reverts commit 8f6209ac3031738494d8112671d8cd481db45e89. --- morpheus/stages/inference/inference_stage.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/morpheus/stages/inference/inference_stage.py b/morpheus/stages/inference/inference_stage.py index 042f19701a..e4111926e9 100644 --- a/morpheus/stages/inference/inference_stage.py +++ b/morpheus/stages/inference/inference_stage.py @@ -299,10 +299,8 @@ def set_output_fut(resp: TensorMemory, inner_batch, batch_future: mrc.Future): else: node = builder.make_node(self.unique_name, ops.build(py_inference_fn)) - # Set the concurrency level to be up with the thread count, intentionally only setting this for python - # implementations to avoid #1639 - node.launch_options.pe_count = self._thread_count - + # Set the concurrency level to be up with the thread count + node.launch_options.pe_count = self._thread_count builder.make_edge(input_node, node) return node From d0d2e70c5b67a29bc3b6b6a5b6fb63b7fe808587 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 18 Apr 2024 14:09:50 -0700 Subject: [PATCH 10/40] Disable the hammah validation script until #1641 is resolved --- scripts/validation/val-run-all.sh | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/scripts/validation/val-run-all.sh b/scripts/validation/val-run-all.sh index 905ee7f7e5..c85711cdbf 100755 --- a/scripts/validation/val-run-all.sh +++ b/scripts/validation/val-run-all.sh @@ -31,7 +31,10 @@ ensure_triton_running export USE_CPP=0 ${SCRIPT_DIR}/abp/val-abp-all.sh -${SCRIPT_DIR}/hammah/val-hammah-all.sh + +# Disabled per #1641 +# ${SCRIPT_DIR}/hammah/val-hammah-all.sh + ${SCRIPT_DIR}/phishing/val-phishing-all.sh ${SCRIPT_DIR}/sid/val-sid-all.sh @@ -39,6 +42,9 @@ ${SCRIPT_DIR}/sid/val-sid-all.sh export USE_CPP=1 ${SCRIPT_DIR}/abp/val-abp-all.sh -${SCRIPT_DIR}/hammah/val-hammah-all.sh + +# Disabled per #1641 +# ${SCRIPT_DIR}/hammah/val-hammah-all.sh + ${SCRIPT_DIR}/phishing/val-phishing-all.sh ${SCRIPT_DIR}/sid/val-sid-all.sh From e35748e41510a902e2e288c8efe892b2f6427f69 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 18 Apr 2024 14:10:16 -0700 Subject: [PATCH 11/40] Don't hard-code the thread-count to 1 --- scripts/validation/val-run-pipeline.sh | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/scripts/validation/val-run-pipeline.sh b/scripts/validation/val-run-pipeline.sh index ee8b00075c..65641a1370 100755 --- a/scripts/validation/val-run-pipeline.sh +++ b/scripts/validation/val-run-pipeline.sh @@ -37,7 +37,7 @@ function run_pipeline_sid_minibert(){ VAL_FILE=$4 VAL_OUTPUT=$5 - morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=32 --use_cpp=${USE_CPP} \ + morpheus --log_level=DEBUG run --num_threads=$(nproc) --pipeline_batch_size=1024 --model_max_batch_size=32 --use_cpp=${USE_CPP} \ pipeline-nlp --model_seq_length=256 \ from-file --filename=${INPUT_FILE} \ deserialize \ @@ -58,7 +58,7 @@ function run_pipeline_sid_bert(){ VAL_FILE=$4 VAL_OUTPUT=$5 - morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=32 --use_cpp=${USE_CPP} \ + morpheus --log_level=DEBUG run --num_threads=$(nproc) --pipeline_batch_size=1024 --model_max_batch_size=32 --use_cpp=${USE_CPP} \ pipeline-nlp --model_seq_length=256 \ from-file --filename=${INPUT_FILE} \ deserialize \ @@ -79,7 +79,7 @@ function run_pipeline_abp_nvsmi(){ VAL_FILE=$4 VAL_OUTPUT=$5 - morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 --use_cpp=${USE_CPP} \ + morpheus --log_level=DEBUG run --num_threads=$(nproc) --pipeline_batch_size=1024 --model_max_batch_size=1024 --use_cpp=${USE_CPP} \ pipeline-fil --columns_file=${MORPHEUS_ROOT}/morpheus/data/columns_fil.txt \ from-file --filename=${INPUT_FILE} \ deserialize \ @@ -100,7 +100,7 @@ function run_pipeline_phishing_email(){ VAL_FILE=$4 VAL_OUTPUT=$5 - morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=32 --use_cpp=${USE_CPP} \ + morpheus --log_level=DEBUG run --num_threads=$(nproc) --pipeline_batch_size=1024 --model_max_batch_size=32 --use_cpp=${USE_CPP} \ pipeline-nlp --model_seq_length=128 --labels_file=${MORPHEUS_ROOT}/morpheus/data/labels_phishing.txt \ from-file --filename=${INPUT_FILE} \ deserialize \ @@ -121,7 +121,7 @@ function run_pipeline_hammah_user123(){ VAL_FILE=$4 VAL_OUTPUT=$5 - morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 --use_cpp=${USE_CPP} \ + morpheus --log_level=DEBUG run --num_threads=$(nproc) --pipeline_batch_size=1024 --model_max_batch_size=1024 --use_cpp=${USE_CPP} \ pipeline-ae --columns_file="${MORPHEUS_ROOT}/morpheus/data/columns_ae_cloudtrail.txt" --userid_filter="user123" --userid_column_name="userIdentitysessionContextsessionIssueruserName" --timestamp_column_name="event_dt" \ from-cloudtrail --input_glob="${MORPHEUS_ROOT}/models/datasets/validation-data/dfp-cloudtrail-*-input.csv" \ train-ae --train_data_glob="${MORPHEUS_ROOT}/models/datasets/training-data/dfp-cloudtrail-*.csv" --source_stage_class=morpheus.stages.input.cloud_trail_source_stage.CloudTrailSourceStage --seed 42 \ @@ -143,7 +143,7 @@ function run_pipeline_hammah_role-g(){ VAL_FILE=$4 VAL_OUTPUT=$5 - morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 --use_cpp=${USE_CPP} \ + morpheus --log_level=DEBUG run --num_threads=$(nproc) --pipeline_batch_size=1024 --model_max_batch_size=1024 --use_cpp=${USE_CPP} \ pipeline-ae --columns_file="${MORPHEUS_ROOT}/morpheus/data/columns_ae_cloudtrail.txt" --userid_filter="role-g" --userid_column_name="userIdentitysessionContextsessionIssueruserName" --timestamp_column_name="event_dt" \ from-cloudtrail --input_glob="${MORPHEUS_ROOT}/models/datasets/validation-data/dfp-cloudtrail-*-input.csv" \ train-ae --train_data_glob="${MORPHEUS_ROOT}/models/datasets/training-data/dfp-cloudtrail-*.csv" --source_stage_class=morpheus.stages.input.cloud_trail_source_stage.CloudTrailSourceStage --seed 42 \ From ab3b1445a25274c3b98a970feda62a7dc3458849 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 18 Apr 2024 14:10:47 -0700 Subject: [PATCH 12/40] Ensure the C++ impl of the triton stage only uses a single progress engine --- morpheus/stages/inference/triton_inference_stage.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/morpheus/stages/inference/triton_inference_stage.py b/morpheus/stages/inference/triton_inference_stage.py index e5901363f9..e6c5c0fbb7 100644 --- a/morpheus/stages/inference/triton_inference_stage.py +++ b/morpheus/stages/inference/triton_inference_stage.py @@ -781,3 +781,13 @@ def _get_cpp_inference_node(self, builder: mrc.Builder) -> mrc.SegmentObject: self._needs_logits, self._input_mapping, self._output_mapping) + + def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: + node = super()._build_single(builder, input_node) + + # ensure that the C++ impl only uses a single progress engine + if (self._build_cpp_node()): + node.launch_options.pe_count = 1 + node.launch_options.engines_per_pe = 1 + + return node From c401291648343e8f274788c601b90c1bfd574724 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 18 Apr 2024 16:39:12 -0700 Subject: [PATCH 13/40] WIP : bypassed the message incompatibility error, but now failing on a validation error on the rss source schema --- examples/llm/vdb_upload/helper.py | 3 ++- examples/llm/vdb_upload/pipeline.py | 11 +++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/examples/llm/vdb_upload/helper.py b/examples/llm/vdb_upload/helper.py index 20f0484a97..bf71985635 100644 --- a/examples/llm/vdb_upload/helper.py +++ b/examples/llm/vdb_upload/helper.py @@ -17,6 +17,7 @@ from morpheus.config import Config from morpheus.messages import ControlMessage +from morpheus.messages import MessageMeta from morpheus.pipeline.pipeline import Pipeline from morpheus.stages.general.linear_modules_source import LinearModuleSourceStage @@ -73,7 +74,7 @@ def setup_rss_source(pipe: Pipeline, config: Config, source_name: str, rss_confi module_config={"rss_config": rss_config}, ) rss_pipe = pipe.add_stage( - LinearModuleSourceStage(config, module_definition, output_type=ControlMessage, output_port_name="output")) + LinearModuleSourceStage(config, module_definition, output_type=MessageMeta, output_port_name="output")) return rss_pipe diff --git a/examples/llm/vdb_upload/pipeline.py b/examples/llm/vdb_upload/pipeline.py index 494446d16c..652f476f55 100644 --- a/examples/llm/vdb_upload/pipeline.py +++ b/examples/llm/vdb_upload/pipeline.py @@ -24,6 +24,7 @@ from morpheus.stages.general.trigger_stage import TriggerStage from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage from morpheus.stages.output.write_to_vector_db_stage import WriteToVectorDBStage +from morpheus.stages.preprocess.deserialize_stage import DeserializeStage from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage logger = logging.getLogger(__name__) @@ -64,6 +65,8 @@ def pipeline(pipeline_config: Config, vdb_sources = process_vdb_sources(pipe, pipeline_config, source_config) + deserialize_stage = pipe.add_stage(DeserializeStage(pipeline_config)) + trigger = None if (isolate_embeddings): trigger = pipe.add_stage(TriggerStage(pipeline_config)) @@ -85,13 +88,13 @@ def pipeline(pipeline_config: Config, # Connect the pipeline for source_output in vdb_sources: - if (isolate_embeddings): - pipe.add_edge(source_output, trigger) - else: - pipe.add_edge(source_output, nlp_stage) + pipe.add_edge(source_output, deserialize_stage) if (isolate_embeddings): + pipe.add_edge(deserialize_stage, trigger) pipe.add_edge(trigger, nlp_stage) + else: + pipe.add_edge(deserialize_stage, nlp_stage) pipe.add_edge(nlp_stage, monitor_1) pipe.add_edge(monitor_1, embedding_stage) From a52d6e9d66d42c347777cd09243fd79e837b8568 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 19 Apr 2024 08:48:10 -0700 Subject: [PATCH 14/40] Revert "WIP : bypassed the message incompatibility error, but now failing on a validation error on the rss source schema" This reverts commit c401291648343e8f274788c601b90c1bfd574724. --- examples/llm/vdb_upload/helper.py | 3 +-- examples/llm/vdb_upload/pipeline.py | 11 ++++------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/examples/llm/vdb_upload/helper.py b/examples/llm/vdb_upload/helper.py index bf71985635..20f0484a97 100644 --- a/examples/llm/vdb_upload/helper.py +++ b/examples/llm/vdb_upload/helper.py @@ -17,7 +17,6 @@ from morpheus.config import Config from morpheus.messages import ControlMessage -from morpheus.messages import MessageMeta from morpheus.pipeline.pipeline import Pipeline from morpheus.stages.general.linear_modules_source import LinearModuleSourceStage @@ -74,7 +73,7 @@ def setup_rss_source(pipe: Pipeline, config: Config, source_name: str, rss_confi module_config={"rss_config": rss_config}, ) rss_pipe = pipe.add_stage( - LinearModuleSourceStage(config, module_definition, output_type=MessageMeta, output_port_name="output")) + LinearModuleSourceStage(config, module_definition, output_type=ControlMessage, output_port_name="output")) return rss_pipe diff --git a/examples/llm/vdb_upload/pipeline.py b/examples/llm/vdb_upload/pipeline.py index 652f476f55..494446d16c 100644 --- a/examples/llm/vdb_upload/pipeline.py +++ b/examples/llm/vdb_upload/pipeline.py @@ -24,7 +24,6 @@ from morpheus.stages.general.trigger_stage import TriggerStage from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage from morpheus.stages.output.write_to_vector_db_stage import WriteToVectorDBStage -from morpheus.stages.preprocess.deserialize_stage import DeserializeStage from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage logger = logging.getLogger(__name__) @@ -65,8 +64,6 @@ def pipeline(pipeline_config: Config, vdb_sources = process_vdb_sources(pipe, pipeline_config, source_config) - deserialize_stage = pipe.add_stage(DeserializeStage(pipeline_config)) - trigger = None if (isolate_embeddings): trigger = pipe.add_stage(TriggerStage(pipeline_config)) @@ -88,13 +85,13 @@ def pipeline(pipeline_config: Config, # Connect the pipeline for source_output in vdb_sources: - pipe.add_edge(source_output, deserialize_stage) + if (isolate_embeddings): + pipe.add_edge(source_output, trigger) + else: + pipe.add_edge(source_output, nlp_stage) if (isolate_embeddings): - pipe.add_edge(deserialize_stage, trigger) pipe.add_edge(trigger, nlp_stage) - else: - pipe.add_edge(deserialize_stage, nlp_stage) pipe.add_edge(nlp_stage, monitor_1) pipe.add_edge(monitor_1, embedding_stage) From 1df96425dfd5b1832b3410b9db9c510ae5fab10b Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 19 Apr 2024 08:55:18 -0700 Subject: [PATCH 15/40] Mark ControlMessage as one of the accepted input messages --- morpheus/stages/inference/inference_stage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/morpheus/stages/inference/inference_stage.py b/morpheus/stages/inference/inference_stage.py index e4111926e9..bf2c3c03e3 100644 --- a/morpheus/stages/inference/inference_stage.py +++ b/morpheus/stages/inference/inference_stage.py @@ -192,7 +192,7 @@ def accepted_types(self) -> typing.Tuple: typing.Tuple Tuple of input types. """ - return (MultiInferenceMessage, ) + return (MultiInferenceMessage, ControlMessage) def compute_schema(self, schema: StageSchema): schema.output_schema.set_type(MultiResponseMessage) From f1e703d3ddfc5cb27db9947ed1f6f3dc41d163c7 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 19 Apr 2024 09:37:37 -0700 Subject: [PATCH 16/40] Set a default non-None value for stop_after_sec --- examples/llm/vdb_upload/vdb_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/llm/vdb_upload/vdb_utils.py b/examples/llm/vdb_upload/vdb_utils.py index 2b399fcd21..d5d3dbcc64 100644 --- a/examples/llm/vdb_upload/vdb_utils.py +++ b/examples/llm/vdb_upload/vdb_utils.py @@ -135,7 +135,7 @@ def _build_default_rss_source(enable_cache, "output_batch_size": 2048, "cache_dir": "./.cache/http", "cooldown_interval_sec": interval_secs, - "stop_after_sec": stop_after, + "stop_after_sec": stop_after or 0, "enable_cache": enable_cache, "enable_monitor": enable_monitors, "feed_input": feed_inputs if feed_inputs else build_rss_urls(), From 31679dd03fadf4d6588c32cda0df640cb8a07e7a Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 19 Apr 2024 10:16:36 -0700 Subject: [PATCH 17/40] Fix import for CppTensorMemory --- morpheus/stages/preprocess/preprocess_nlp_stage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/morpheus/stages/preprocess/preprocess_nlp_stage.py b/morpheus/stages/preprocess/preprocess_nlp_stage.py index feace923dc..5f0377f38b 100644 --- a/morpheus/stages/preprocess/preprocess_nlp_stage.py +++ b/morpheus/stages/preprocess/preprocess_nlp_stage.py @@ -25,6 +25,7 @@ import cudf import morpheus._lib.stages as _stages +from morpheus._lib.messages import TensorMemory as CppTensorMemory from morpheus.cli.register_stage import register_stage from morpheus.cli.utils import MorpheusRelativePath from morpheus.cli.utils import get_package_relative_file @@ -35,7 +36,6 @@ from morpheus.messages import MultiInferenceMessage from morpheus.messages import MultiInferenceNLPMessage from morpheus.messages import MultiMessage -from morpheus.messages import TensorMemory as CppTensorMemory from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage from morpheus.utils.cudf_subword_helper import tokenize_text_series From 9f09023ff70ccc01db117da7225f4f7dc92148d6 Mon Sep 17 00:00:00 2001 From: Eli Fajardo Date: Wed, 17 Apr 2024 15:51:20 -0400 Subject: [PATCH 18/40] Use conda env create --yes instead of --force (#1636) conda dropped support for the --force flag to conda env create. This changes that flag name to --yes. See https://github.com/conda/conda/blob/main/CHANGELOG.md#2430-2024-03-12 and https://github.com/rapidsai/miniforge-cuda/pull/63 for more info. ## By Submitting this PR I confirm: - I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md). - When the PR is ready for review, new or existing tests cover these changes. - When the PR is ready for review, the documentation is up to date with these changes. Authors: - Eli Fajardo (https://github.com/efajardo-nv) Approvers: - David Gardner (https://github.com/dagardner-nv) URL: https://github.com/nv-morpheus/Morpheus/pull/1636 --- ci/check_style.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/check_style.sh b/ci/check_style.sh index 9205625726..beb561bb4f 100755 --- a/ci/check_style.sh +++ b/ci/check_style.sh @@ -16,7 +16,7 @@ rapids-dependency-file-generator \ --file_key checks \ --matrix "cuda=${RAPIDS_CUDA_VERSION%.*};arch=$(arch);py=${RAPIDS_PY_VERSION}" | tee env.yaml -rapids-mamba-retry env create --force -f env.yaml -n checks +rapids-mamba-retry env create --yes -f env.yaml -n checks conda activate checks # Run pre-commit checks From 9a96b3cbab3618387bdf3f0777eb5fa3cee22667 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 19 Apr 2024 11:02:06 -0700 Subject: [PATCH 19/40] Only accept ControlMessage when in Python mode --- morpheus/stages/inference/inference_stage.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/morpheus/stages/inference/inference_stage.py b/morpheus/stages/inference/inference_stage.py index bf2c3c03e3..579ddccd53 100644 --- a/morpheus/stages/inference/inference_stage.py +++ b/morpheus/stages/inference/inference_stage.py @@ -192,6 +192,9 @@ def accepted_types(self) -> typing.Tuple: typing.Tuple Tuple of input types. """ + if (self._build_cpp_node()): + return (MultiInferenceMessage, ) + return (MultiInferenceMessage, ControlMessage) def compute_schema(self, schema: StageSchema): From ae7c5c871aca1ae8cefc18c0171006929754c2a6 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 19 Apr 2024 12:02:58 -0700 Subject: [PATCH 20/40] Default to rss source --- examples/llm/vdb_upload/run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/llm/vdb_upload/run.py b/examples/llm/vdb_upload/run.py index 04627f8359..5817445661 100644 --- a/examples/llm/vdb_upload/run.py +++ b/examples/llm/vdb_upload/run.py @@ -104,7 +104,7 @@ def run(): @click.option("--source_type", multiple=True, type=click.Choice(['rss', 'filesystem'], case_sensitive=False), - default=[], + default=['rss'], show_default=True, help="The type of source to use. Can specify multiple times for different source types.") @click.option( From ede695d15d278bdb456635498426307be81e68c5 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 19 Apr 2024 12:24:15 -0700 Subject: [PATCH 21/40] Disable pylint warning --- morpheus/stages/preprocess/preprocess_nlp_stage.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/morpheus/stages/preprocess/preprocess_nlp_stage.py b/morpheus/stages/preprocess/preprocess_nlp_stage.py index 5f0377f38b..1fa3030355 100644 --- a/morpheus/stages/preprocess/preprocess_nlp_stage.py +++ b/morpheus/stages/preprocess/preprocess_nlp_stage.py @@ -25,6 +25,8 @@ import cudf import morpheus._lib.stages as _stages +# We need the C++ impl of TensorMemory until #1646 is resolved +# pylint: disable-next=morpheus-incorrect-lib-from-import from morpheus._lib.messages import TensorMemory as CppTensorMemory from morpheus.cli.register_stage import register_stage from morpheus.cli.utils import MorpheusRelativePath From 7d758cf037c580e5c4b78abf9a4eb0afac8ecf92 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 19 Apr 2024 12:53:23 -0700 Subject: [PATCH 22/40] Fix import --- .../stages/preprocess/preprocess_nlp_stage.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/morpheus/stages/preprocess/preprocess_nlp_stage.py b/morpheus/stages/preprocess/preprocess_nlp_stage.py index 1fa3030355..de610ab52c 100644 --- a/morpheus/stages/preprocess/preprocess_nlp_stage.py +++ b/morpheus/stages/preprocess/preprocess_nlp_stage.py @@ -24,10 +24,8 @@ import cudf +import morpheus._lib.messages as _messages import morpheus._lib.stages as _stages -# We need the C++ impl of TensorMemory until #1646 is resolved -# pylint: disable-next=morpheus-incorrect-lib-from-import -from morpheus._lib.messages import TensorMemory as CppTensorMemory from morpheus.cli.register_stage import register_stage from morpheus.cli.utils import MorpheusRelativePath from morpheus.cli.utils import get_package_relative_file @@ -206,13 +204,14 @@ def process_control_message(message: ControlMessage, del text_series + # We need the C++ impl of TensorMemory until #1646 is resolved message.tensors( - CppTensorMemory(count=tokenized.input_ids.shape[0], - tensors={ - "input_ids": tokenized.input_ids, - "input_mask": tokenized.input_mask, - "seq_ids": tokenized.segment_ids - })) + _messages.TensorMemory(count=tokenized.input_ids.shape[0], + tensors={ + "input_ids": tokenized.input_ids, + "input_mask": tokenized.input_mask, + "seq_ids": tokenized.segment_ids + })) message.set_metadata("inference_memory_params", {"inference_type": "nlp"}) return message From dbfa52530294b2417581c06284afc5377020b985 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 19 Apr 2024 12:56:23 -0700 Subject: [PATCH 23/40] Fix tests to use the use_cpp fixture --- tests/stages/test_preprocess_fil_stage.py | 2 +- tests/stages/test_preprocess_nlp_stage.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/stages/test_preprocess_fil_stage.py b/tests/stages/test_preprocess_fil_stage.py index eb6dc8b620..15297292af 100644 --- a/tests/stages/test_preprocess_fil_stage.py +++ b/tests/stages/test_preprocess_fil_stage.py @@ -27,7 +27,7 @@ @pytest.fixture(name='config') -def fixture_config(config: Config): +def fixture_config(config: Config, use_cpp: bool): config.feature_length = 1 config.fil = ConfigFIL() config.fil.feature_columns = ["data"] diff --git a/tests/stages/test_preprocess_nlp_stage.py b/tests/stages/test_preprocess_nlp_stage.py index 9c2b5d4e39..5b20012b05 100644 --- a/tests/stages/test_preprocess_nlp_stage.py +++ b/tests/stages/test_preprocess_nlp_stage.py @@ -29,7 +29,7 @@ @pytest.fixture(name='config') -def fixture_config(config: Config): +def fixture_config(config: Config, use_cpp: bool): config.class_labels = [ "address", "bank_acct", From 3b4e26ff3527ee7863f8d50013b462738e04a6f2 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 19 Apr 2024 12:57:44 -0700 Subject: [PATCH 24/40] Ensure the C++ impl for tensor memory is used --- morpheus/stages/preprocess/preprocess_fil_stage.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/morpheus/stages/preprocess/preprocess_fil_stage.py b/morpheus/stages/preprocess/preprocess_fil_stage.py index 45b1640d72..cbfc6a581f 100644 --- a/morpheus/stages/preprocess/preprocess_fil_stage.py +++ b/morpheus/stages/preprocess/preprocess_fil_stage.py @@ -23,6 +23,7 @@ import cudf +import morpheus._lib.messages as _messages import morpheus._lib.stages as _stages from morpheus.cli.register_stage import register_stage from morpheus.config import Config @@ -32,7 +33,6 @@ from morpheus.messages import MultiInferenceFILMessage from morpheus.messages import MultiInferenceMessage from morpheus.messages import MultiMessage -from morpheus.messages import TensorMemory as CppTensorMemory from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage logger = logging.getLogger(__name__) @@ -123,7 +123,8 @@ def process_control_message(x: ControlMessage, fea_len: int, fea_cols: typing.Li seg_ids[:, 0] = cp.arange(0, count, dtype=cp.uint32) seg_ids[:, 2] = fea_len - 1 - x.tensors(CppTensorMemory(count=count, tensors={"input__0": data, "seq_ids": seg_ids})) + # We need the C++ impl of TensorMemory until #1646 is resolved + x.tensors(_messages.TensorMemory(count=count, tensors={"input__0": data, "seq_ids": seg_ids})) return x @staticmethod From 2be44262ff065b154975f07320bdeee2b45c8426 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 19 Apr 2024 13:00:00 -0700 Subject: [PATCH 25/40] Ensure the use_cpp fixture is used --- tests/test_add_scores_stage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_add_scores_stage.py b/tests/test_add_scores_stage.py index ad67709959..cbac7cb368 100755 --- a/tests/test_add_scores_stage.py +++ b/tests/test_add_scores_stage.py @@ -31,7 +31,7 @@ @pytest.fixture(name='config') -def fixture_config(config: Config): +def fixture_config(config: Config, use_cpp: bool): config.class_labels = ['frogs', 'lizards', 'toads'] config.feature_length = 12 yield config From cd045a99e8139b49f9302f7ba5ff76f790fdaeed Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 19 Apr 2024 13:06:04 -0700 Subject: [PATCH 26/40] Ensure the use_cpp fixture is used in all overloads of the config fixture --- tests/examples/gnn_fraud_detection_pipeline/conftest.py | 2 +- tests/examples/log_parsing/conftest.py | 2 +- tests/examples/ransomware_detection/conftest.py | 2 +- tests/test_add_classifications_stage.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/examples/gnn_fraud_detection_pipeline/conftest.py b/tests/examples/gnn_fraud_detection_pipeline/conftest.py index a625d51862..1407c9bc31 100644 --- a/tests/examples/gnn_fraud_detection_pipeline/conftest.py +++ b/tests/examples/gnn_fraud_detection_pipeline/conftest.py @@ -44,7 +44,7 @@ def cuml_fixture(fail_missing: bool): @pytest.fixture(name="config") -def config_fixture(config): +def config_fixture(config, use_cpp: bool): """ The GNN fraud detection pipeline utilizes the "other" pipeline mode. """ diff --git a/tests/examples/log_parsing/conftest.py b/tests/examples/log_parsing/conftest.py index d31891873a..77291ad58f 100644 --- a/tests/examples/log_parsing/conftest.py +++ b/tests/examples/log_parsing/conftest.py @@ -17,7 +17,7 @@ @pytest.fixture(name="config") -def config_fixture(config): +def config_fixture(config, use_cpp: bool): """ The log_parsing pipelie requires NLP mode. Set this here so all the tests don't need to set it themselves. """ diff --git a/tests/examples/ransomware_detection/conftest.py b/tests/examples/ransomware_detection/conftest.py index e1c5e2541d..8288d57d3b 100644 --- a/tests/examples/ransomware_detection/conftest.py +++ b/tests/examples/ransomware_detection/conftest.py @@ -39,7 +39,7 @@ def dask_distributed(fail_missing: bool): @pytest.fixture(name="config") -def config_fixture(config): +def config_fixture(config, use_cpp: bool): """ The ransomware detection pipeline utilizes the FIL pipeline mode. """ diff --git a/tests/test_add_classifications_stage.py b/tests/test_add_classifications_stage.py index 279963ba9a..de67b99951 100755 --- a/tests/test_add_classifications_stage.py +++ b/tests/test_add_classifications_stage.py @@ -31,7 +31,7 @@ @pytest.fixture(name="config") -def config_fixture(config: Config): +def config_fixture(config: Config, use_cpp: bool): config.class_labels = ['frogs', 'lizards', 'toads'] yield config From d5f607f65ef8205f3b7c96e9c1788ee3ce6d056a Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 19 Apr 2024 13:09:17 -0700 Subject: [PATCH 27/40] Fix pyling warnings --- tests/examples/gnn_fraud_detection_pipeline/conftest.py | 2 +- tests/examples/log_parsing/conftest.py | 2 +- tests/examples/ransomware_detection/conftest.py | 2 +- tests/stages/test_preprocess_fil_stage.py | 2 +- tests/stages/test_preprocess_nlp_stage.py | 2 +- tests/test_add_classifications_stage.py | 2 +- tests/test_add_scores_stage.py | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/examples/gnn_fraud_detection_pipeline/conftest.py b/tests/examples/gnn_fraud_detection_pipeline/conftest.py index 1407c9bc31..30176f71e4 100644 --- a/tests/examples/gnn_fraud_detection_pipeline/conftest.py +++ b/tests/examples/gnn_fraud_detection_pipeline/conftest.py @@ -44,7 +44,7 @@ def cuml_fixture(fail_missing: bool): @pytest.fixture(name="config") -def config_fixture(config, use_cpp: bool): +def config_fixture(config, use_cpp: bool): # pylint: disable=unused-argument """ The GNN fraud detection pipeline utilizes the "other" pipeline mode. """ diff --git a/tests/examples/log_parsing/conftest.py b/tests/examples/log_parsing/conftest.py index 77291ad58f..f927c3fcc1 100644 --- a/tests/examples/log_parsing/conftest.py +++ b/tests/examples/log_parsing/conftest.py @@ -17,7 +17,7 @@ @pytest.fixture(name="config") -def config_fixture(config, use_cpp: bool): +def config_fixture(config, use_cpp: bool): # pylint: disable=unused-argument """ The log_parsing pipelie requires NLP mode. Set this here so all the tests don't need to set it themselves. """ diff --git a/tests/examples/ransomware_detection/conftest.py b/tests/examples/ransomware_detection/conftest.py index 8288d57d3b..a92786555a 100644 --- a/tests/examples/ransomware_detection/conftest.py +++ b/tests/examples/ransomware_detection/conftest.py @@ -39,7 +39,7 @@ def dask_distributed(fail_missing: bool): @pytest.fixture(name="config") -def config_fixture(config, use_cpp: bool): +def config_fixture(config, use_cpp: bool): # pylint: disable=unused-argument """ The ransomware detection pipeline utilizes the FIL pipeline mode. """ diff --git a/tests/stages/test_preprocess_fil_stage.py b/tests/stages/test_preprocess_fil_stage.py index 15297292af..638fcaa994 100644 --- a/tests/stages/test_preprocess_fil_stage.py +++ b/tests/stages/test_preprocess_fil_stage.py @@ -27,7 +27,7 @@ @pytest.fixture(name='config') -def fixture_config(config: Config, use_cpp: bool): +def fixture_config(config: Config, use_cpp: bool): # pylint: disable=unused-argument config.feature_length = 1 config.fil = ConfigFIL() config.fil.feature_columns = ["data"] diff --git a/tests/stages/test_preprocess_nlp_stage.py b/tests/stages/test_preprocess_nlp_stage.py index 5b20012b05..22fc99e04a 100644 --- a/tests/stages/test_preprocess_nlp_stage.py +++ b/tests/stages/test_preprocess_nlp_stage.py @@ -29,7 +29,7 @@ @pytest.fixture(name='config') -def fixture_config(config: Config, use_cpp: bool): +def fixture_config(config: Config, use_cpp: bool): # pylint: disable=unused-argument config.class_labels = [ "address", "bank_acct", diff --git a/tests/test_add_classifications_stage.py b/tests/test_add_classifications_stage.py index de67b99951..80091f3dc5 100755 --- a/tests/test_add_classifications_stage.py +++ b/tests/test_add_classifications_stage.py @@ -31,7 +31,7 @@ @pytest.fixture(name="config") -def config_fixture(config: Config, use_cpp: bool): +def config_fixture(config: Config, use_cpp: bool): # pylint: disable=unused-argument config.class_labels = ['frogs', 'lizards', 'toads'] yield config diff --git a/tests/test_add_scores_stage.py b/tests/test_add_scores_stage.py index cbac7cb368..e454a0e35f 100755 --- a/tests/test_add_scores_stage.py +++ b/tests/test_add_scores_stage.py @@ -31,7 +31,7 @@ @pytest.fixture(name='config') -def fixture_config(config: Config, use_cpp: bool): +def fixture_config(config: Config, use_cpp: bool): # pylint: disable=unused-argument config.class_labels = ['frogs', 'lizards', 'toads'] config.feature_length = 12 yield config From f16b7c9cf8d3219a314561ba25bcfd9be42c5183 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 19 Apr 2024 13:50:42 -0700 Subject: [PATCH 28/40] Fix typo, units for stop_after should be records not seconds --- examples/llm/vdb_upload/module/rss_source_pipe.py | 4 ++-- examples/llm/vdb_upload/vdb_config.yaml | 4 ++-- examples/llm/vdb_upload/vdb_utils.py | 2 +- morpheus/modules/input/rss_source.py | 2 +- morpheus/modules/schemas/rss_source_schema.py | 2 +- morpheus/stages/input/rss_source_stage.py | 2 +- 6 files changed, 8 insertions(+), 8 deletions(-) diff --git a/examples/llm/vdb_upload/module/rss_source_pipe.py b/examples/llm/vdb_upload/module/rss_source_pipe.py index c424e03dbc..ff61940b8c 100644 --- a/examples/llm/vdb_upload/module/rss_source_pipe.py +++ b/examples/llm/vdb_upload/module/rss_source_pipe.py @@ -48,7 +48,7 @@ class RSSSourcePipeSchema(BaseModel): output_batch_size: int = 2048 request_timeout_sec: float = 2.0 run_indefinitely: bool = True - stop_after_sec: int = 0 + stop_after_rec: int = 0 vdb_resource_name: str web_scraper_config: Optional[Dict[Any, Any]] = None @@ -130,7 +130,7 @@ def _rss_source_pipe(builder: mrc.Builder): "cooldown_interval_sec": validated_config.cooldown_interval_sec, "request_timeout_sec": validated_config.request_timeout_sec, "interval_sec": validated_config.interval_sec, - "stop_after_sec": validated_config.stop_after_sec, + "stop_after_rec": validated_config.stop_after_rec, } rss_source_loader = RSSSourceLoaderFactory.get_instance("rss_source", {"rss_source": rss_source_config}) diff --git a/examples/llm/vdb_upload/vdb_config.yaml b/examples/llm/vdb_upload/vdb_config.yaml index 0c1af37d22..ac93a47615 100644 --- a/examples/llm/vdb_upload/vdb_config.yaml +++ b/examples/llm/vdb_upload/vdb_config.yaml @@ -75,7 +75,7 @@ vdb_pipeline: output_batch_size: 2048 # Number of chunked documents per output batch request_timeout_sec: 2.0 run_indefinitely: true - stop_after_sec: 0 + stop_after_rec: 0 web_scraper_config: chunk_overlap: 51 chunk_size: 512 @@ -300,4 +300,4 @@ vdb_pipeline: dtype: FLOAT_VECTOR description: Embedding vectors representing the data entry dim: 384 # Size of the embeddings to store in the vector database - description: Collection schema for diverse data sources \ No newline at end of file + description: Collection schema for diverse data sources diff --git a/examples/llm/vdb_upload/vdb_utils.py b/examples/llm/vdb_upload/vdb_utils.py index d5d3dbcc64..7f9f2a3b1b 100644 --- a/examples/llm/vdb_upload/vdb_utils.py +++ b/examples/llm/vdb_upload/vdb_utils.py @@ -135,7 +135,7 @@ def _build_default_rss_source(enable_cache, "output_batch_size": 2048, "cache_dir": "./.cache/http", "cooldown_interval_sec": interval_secs, - "stop_after_sec": stop_after or 0, + "stop_after_rec": stop_after or 0, "enable_cache": enable_cache, "enable_monitor": enable_monitors, "feed_input": feed_inputs if feed_inputs else build_rss_urls(), diff --git a/morpheus/modules/input/rss_source.py b/morpheus/modules/input/rss_source.py index 6133e3d673..9f5dd6c316 100644 --- a/morpheus/modules/input/rss_source.py +++ b/morpheus/modules/input/rss_source.py @@ -101,7 +101,7 @@ def fetch_feeds() -> MessageMeta: records_emitted += df_size - if (0 < validated_config.stop_after_sec <= records_emitted): + if (0 < validated_config.stop_after_rec <= records_emitted): stop_requested = True logger.info("Stop limit reached... preparing to halt the source.") break diff --git a/morpheus/modules/schemas/rss_source_schema.py b/morpheus/modules/schemas/rss_source_schema.py index b0468b1ace..53c0928391 100644 --- a/morpheus/modules/schemas/rss_source_schema.py +++ b/morpheus/modules/schemas/rss_source_schema.py @@ -30,7 +30,7 @@ class RSSSourceSchema(BaseModel): cooldown_interval_sec: int = 600 request_timeout_sec: float = 2.0 interval_sec: int = 600 - stop_after_sec: int = 0 + stop_after_rec: int = 0 class Config: extra = "forbid" diff --git a/morpheus/stages/input/rss_source_stage.py b/morpheus/stages/input/rss_source_stage.py index 31e408c290..d56a443542 100644 --- a/morpheus/stages/input/rss_source_stage.py +++ b/morpheus/stages/input/rss_source_stage.py @@ -81,7 +81,7 @@ def __init__(self, "rss_source": { "feed_input": feed_input, "interval_sec": interval_secs, - "stop_after_sec": stop_after, + "stop_after_rec": stop_after, "run_indefinitely": run_indefinitely, "batch_size": batch_size, "enable_cache": enable_cache, From 8bde832eb6ab17a597b67eaa7c6e85824c4ccc4e Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 19 Apr 2024 15:35:00 -0700 Subject: [PATCH 29/40] First pass at optionally truncating long string values --- morpheus/modules/output/write_to_vector_db.py | 8 +++++-- .../service/vdb/milvus_vector_db_service.py | 23 ++++++++++++++++++- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/morpheus/modules/output/write_to_vector_db.py b/morpheus/modules/output/write_to_vector_db.py index c141aef7c6..e7f77d75a3 100644 --- a/morpheus/modules/output/write_to_vector_db.py +++ b/morpheus/modules/output/write_to_vector_db.py @@ -148,7 +148,8 @@ def on_completed(): try: if accum_stats.data: merged_df = cudf.concat(accum_stats.data) - service.insert_dataframe(name=key, df=merged_df) + + service.insert_dataframe(name=key, df=merged_df, truncate_long_strings=True) final_df_references.append(accum_stats.data) except Exception as e: logger.error("Unable to upload dataframe entries to vector database: %s", e) @@ -213,7 +214,10 @@ def on_data(msg: typing.Union[ControlMessage, MultiResponseMessage, MultiMessage merged_df = cudf.concat(accum_stats.data) # pylint: disable=not-a-mapping - service.insert_dataframe(name=key, df=merged_df, **resource_kwargs) + service.insert_dataframe(name=key, + df=merged_df, + truncate_long_strings=True, + **resource_kwargs) # Reset accumulator stats accum_stats.data.clear() accum_stats.last_insert_time = current_time diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index 37cd82d1ba..7d4879a363 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -32,6 +32,9 @@ IMPORT_EXCEPTION = None IMPORT_ERROR_MESSAGE = "MilvusVectorDBResourceService requires the milvus and pymilvus packages to be installed." +# https://milvus.io/docs/limitations.md#Length-of-a-string +MAX_STRING_LENGTH = 65535 + try: import pymilvus from pymilvus.orm.mutation import MutationResult @@ -275,7 +278,10 @@ def insert(self, data: list[list] | list[dict], **kwargs: dict[str, typing.Any]) return self._insert_result_to_dict(result=result) - def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwargs: dict[str, typing.Any]) -> dict: + def insert_dataframe(self, + df: typing.Union[cudf.DataFrame, pd.DataFrame], + truncate_long_strings: bool = False, + **kwargs: dict[str, typing.Any]) -> dict: """ Insert a dataframe entires into the vector database. @@ -283,6 +289,8 @@ def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwa ---------- df : typing.Union[cudf.DataFrame, pd.DataFrame] Dataframe to be inserted into the collection. + truncate_long_strings : bool, optional + When true, truncate strings values that are longer than the max length supported by Milvus (65535). **kwargs : dict[str, typing.Any] Extra keyword arguments specific to the vector database implementation. @@ -295,6 +303,19 @@ def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwa if isinstance(df, cudf.DataFrame): df = df.to_pandas() + if truncate_long_strings: + for col in df: + str_series = df[col] + if str_series.dtype == "object": + max_len = str_series.str.len().max() + if max_len > MAX_STRING_LENGTH: + logger.warning(("Column '%s' has a string length of %d, larger than the max of %d" + "supported by Milvus, truncating"), + col, + max_len, + MAX_STRING_LENGTH) + df[col] = str_series.str.slice(0, MAX_STRING_LENGTH) + # Ensure that there are no None values in the DataFrame entries. for field_name, dtype in self._fillna_fields_dict.items(): if dtype in (pymilvus.DataType.VARCHAR, pymilvus.DataType.STRING): From c18e7d65a2ab37a9e73c71bb40e83bb847acde42 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 19 Apr 2024 16:06:08 -0700 Subject: [PATCH 30/40] Move truncation to happen after other operations which may expand strings --- .../service/vdb/milvus_vector_db_service.py | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index 7d4879a363..b4ab91744f 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -303,19 +303,6 @@ def insert_dataframe(self, if isinstance(df, cudf.DataFrame): df = df.to_pandas() - if truncate_long_strings: - for col in df: - str_series = df[col] - if str_series.dtype == "object": - max_len = str_series.str.len().max() - if max_len > MAX_STRING_LENGTH: - logger.warning(("Column '%s' has a string length of %d, larger than the max of %d" - "supported by Milvus, truncating"), - col, - max_len, - MAX_STRING_LENGTH) - df[col] = str_series.str.slice(0, MAX_STRING_LENGTH) - # Ensure that there are no None values in the DataFrame entries. for field_name, dtype in self._fillna_fields_dict.items(): if dtype in (pymilvus.DataType.VARCHAR, pymilvus.DataType.STRING): @@ -335,6 +322,19 @@ def insert_dataframe(self, # From the schema, this is the list of columns we need, excluding any auto_id columns column_names = [field.name for field in self._fields if not field.auto_id] + if truncate_long_strings: + for col in [column_names]: + if df[col].dtype == "object": + max_len = df[col].str.len().max() + if max_len > MAX_STRING_LENGTH: + logger.warning(("Column '%s' has a string length of %d, larger than the max of %d " + "supported by Milvus, truncating"), + col, + max_len, + MAX_STRING_LENGTH) + df[col] = df[col].str.slice(0, MAX_STRING_LENGTH) + logger.warning("Column '%s' has been truncated to a max length of %d", col, df[col].len().max()) + # Note: dataframe columns has to be in the order of collection schema fields.s result = self._collection.insert(data=df[column_names], **kwargs) self._collection.flush() From 740d0637eadc277072245174985dce058071686d Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 19 Apr 2024 16:07:30 -0700 Subject: [PATCH 31/40] Fix type-o --- morpheus/service/vdb/milvus_vector_db_service.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index b4ab91744f..e8e8a2ed7e 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -323,16 +323,17 @@ def insert_dataframe(self, column_names = [field.name for field in self._fields if not field.auto_id] if truncate_long_strings: - for col in [column_names]: - if df[col].dtype == "object": - max_len = df[col].str.len().max() + for col in column_names: + str_series = df[col] + if str_series.dtype == "object": + max_len = str_series.str.len().max() if max_len > MAX_STRING_LENGTH: logger.warning(("Column '%s' has a string length of %d, larger than the max of %d " "supported by Milvus, truncating"), col, max_len, MAX_STRING_LENGTH) - df[col] = df[col].str.slice(0, MAX_STRING_LENGTH) + df[col] = str_series.str.slice(0, MAX_STRING_LENGTH) logger.warning("Column '%s' has been truncated to a max length of %d", col, df[col].len().max()) # Note: dataframe columns has to be in the order of collection schema fields.s From 16ec801a27250a0e36983330b8940ee971d08934 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 19 Apr 2024 16:35:19 -0700 Subject: [PATCH 32/40] Fix type-o --- morpheus/service/vdb/milvus_vector_db_service.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index e8e8a2ed7e..4aa1ccc2af 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -334,7 +334,9 @@ def insert_dataframe(self, max_len, MAX_STRING_LENGTH) df[col] = str_series.str.slice(0, MAX_STRING_LENGTH) - logger.warning("Column '%s' has been truncated to a max length of %d", col, df[col].len().max()) + logger.warning("Column '%s' has been truncated to a max length of %d", + col, + df[col].str.len().max()) # Note: dataframe columns has to be in the order of collection schema fields.s result = self._collection.insert(data=df[column_names], **kwargs) From a602a18cb4a55d184d5ebb75d7ca76af69d46aec Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 22 Apr 2024 09:10:40 -0700 Subject: [PATCH 33/40] Document multibyte issues even though we don't have a fix for it yet --- morpheus/service/vdb/milvus_vector_db_service.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index 4aa1ccc2af..b49fed409e 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -32,8 +32,10 @@ IMPORT_EXCEPTION = None IMPORT_ERROR_MESSAGE = "MilvusVectorDBResourceService requires the milvus and pymilvus packages to be installed." +# Milvus has a max string length in bytes of 65535, that is strings a first encoded into UTF-8, while multi-byte +# characters like "ñ" will have a string length of 1, the actual byte length will be 2 # https://milvus.io/docs/limitations.md#Length-of-a-string -MAX_STRING_LENGTH = 65535 +MAX_STRING_LENGTH_BYTES = 65535 try: import pymilvus @@ -327,13 +329,13 @@ def insert_dataframe(self, str_series = df[col] if str_series.dtype == "object": max_len = str_series.str.len().max() - if max_len > MAX_STRING_LENGTH: + if max_len > MAX_STRING_LENGTH_BYTES: logger.warning(("Column '%s' has a string length of %d, larger than the max of %d " "supported by Milvus, truncating"), col, max_len, - MAX_STRING_LENGTH) - df[col] = str_series.str.slice(0, MAX_STRING_LENGTH) + MAX_STRING_LENGTH_BYTES) + df[col] = str_series.str.slice(0, MAX_STRING_LENGTH_BYTES) logger.warning("Column '%s' has been truncated to a max length of %d", col, df[col].str.len().max()) From 142c90902e339f05541e71f874e0076b1927be8b Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 22 Apr 2024 10:36:45 -0700 Subject: [PATCH 34/40] Change the default resource name back to 'RSS', allowing the output of running this example to be used as the input for the RAG pipeline --- examples/llm/vdb_upload/run.py | 2 +- examples/llm/vdb_upload/vdb_utils.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/llm/vdb_upload/run.py b/examples/llm/vdb_upload/run.py index 5817445661..974e5ec213 100644 --- a/examples/llm/vdb_upload/run.py +++ b/examples/llm/vdb_upload/run.py @@ -128,7 +128,7 @@ def run(): @click.option( "--vector_db_resource_name", type=str, - default="VDBUploadExample", + default="RSS", help="The identifier of the resource on which operations are to be performed in the vector database.", ) @click.option( diff --git a/examples/llm/vdb_upload/vdb_utils.py b/examples/llm/vdb_upload/vdb_utils.py index 7f9f2a3b1b..d3aed615d7 100644 --- a/examples/llm/vdb_upload/vdb_utils.py +++ b/examples/llm/vdb_upload/vdb_utils.py @@ -448,7 +448,7 @@ def build_final_config(vdb_conf_path, interval_secs=60, run_indefinitely=True, stop_after=None, - vector_db_resource_name="VDBUploadExample", + vector_db_resource_name="RSS", content_chunking_size=128, rss_request_timeout_sec=30, feed_inputs=build_rss_urls())) From 2d11e18cc128fa5374c3822059f6bc06bd350833 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 22 Apr 2024 10:38:03 -0700 Subject: [PATCH 35/40] Remove redundant VAR=$VAR statements, add flags to use the rag pipeline with OpenAI --- examples/llm/rag/README.md | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/examples/llm/rag/README.md b/examples/llm/rag/README.md index 3868fab377..7c1579040e 100644 --- a/examples/llm/rag/README.md +++ b/examples/llm/rag/README.md @@ -214,14 +214,14 @@ pipeline option of `rag`: ```bash export NGC_API_KEY=[YOUR_KEY_HERE] -NGC_API_KEY=${NGC_API_KEY} python examples/llm/main.py rag pipeline +python examples/llm/main.py rag pipeline ``` **Using OpenAI LLM models** ```bash export OPENAI_API_KEY=[YOUR_KEY_HERE] -OPENAI_API_KEY=${OPENAI_API_KEY} python examples/llm/main.py rag pipeline +python examples/llm/main.py rag pipeline --llm_service=OpenAI --model_name=gpt-3.5-turbo ``` ### Run example (Persistent Pipeline): @@ -232,14 +232,14 @@ OPENAI_API_KEY=${OPENAI_API_KEY} python examples/llm/main.py rag pipeline ```bash export NGC_API_KEY=[YOUR_KEY_HERE] -python examples/llm/main.py rag persistent +python examples/llm/main.py rag persistent ``` **Using OpenAI LLM models** ```bash export OPENAI_API_KEY=[YOUR_KEY_HERE] -python examples/llm/main.py rag persistent +python examples/llm/main.py rag persistent ``` ### Options: @@ -273,4 +273,3 @@ The `rag` command has its own set of options and commands: - `persistant` - `pipeline` - From 4719f4d8fc83516c24d06438b05891ddcccb1485 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 22 Apr 2024 11:02:24 -0700 Subject: [PATCH 36/40] Revert "Document multibyte issues even though we don't have a fix for it yet" This reverts commit a602a18cb4a55d184d5ebb75d7ca76af69d46aec. --- morpheus/service/vdb/milvus_vector_db_service.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index b49fed409e..4aa1ccc2af 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -32,10 +32,8 @@ IMPORT_EXCEPTION = None IMPORT_ERROR_MESSAGE = "MilvusVectorDBResourceService requires the milvus and pymilvus packages to be installed." -# Milvus has a max string length in bytes of 65535, that is strings a first encoded into UTF-8, while multi-byte -# characters like "ñ" will have a string length of 1, the actual byte length will be 2 # https://milvus.io/docs/limitations.md#Length-of-a-string -MAX_STRING_LENGTH_BYTES = 65535 +MAX_STRING_LENGTH = 65535 try: import pymilvus @@ -329,13 +327,13 @@ def insert_dataframe(self, str_series = df[col] if str_series.dtype == "object": max_len = str_series.str.len().max() - if max_len > MAX_STRING_LENGTH_BYTES: + if max_len > MAX_STRING_LENGTH: logger.warning(("Column '%s' has a string length of %d, larger than the max of %d " "supported by Milvus, truncating"), col, max_len, - MAX_STRING_LENGTH_BYTES) - df[col] = str_series.str.slice(0, MAX_STRING_LENGTH_BYTES) + MAX_STRING_LENGTH) + df[col] = str_series.str.slice(0, MAX_STRING_LENGTH) logger.warning("Column '%s' has been truncated to a max length of %d", col, df[col].str.len().max()) From 0d7214ecb50755ce655cc05a84ce252c19841343 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 22 Apr 2024 11:02:37 -0700 Subject: [PATCH 37/40] Revert "Fix type-o" This reverts commit 16ec801a27250a0e36983330b8940ee971d08934. --- morpheus/service/vdb/milvus_vector_db_service.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index 4aa1ccc2af..e8e8a2ed7e 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -334,9 +334,7 @@ def insert_dataframe(self, max_len, MAX_STRING_LENGTH) df[col] = str_series.str.slice(0, MAX_STRING_LENGTH) - logger.warning("Column '%s' has been truncated to a max length of %d", - col, - df[col].str.len().max()) + logger.warning("Column '%s' has been truncated to a max length of %d", col, df[col].len().max()) # Note: dataframe columns has to be in the order of collection schema fields.s result = self._collection.insert(data=df[column_names], **kwargs) From cd6af8dd276eefc783743c8a6a398768b4878ba0 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 22 Apr 2024 11:02:46 -0700 Subject: [PATCH 38/40] Revert "Fix type-o" This reverts commit 740d0637eadc277072245174985dce058071686d. --- morpheus/service/vdb/milvus_vector_db_service.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index e8e8a2ed7e..b4ab91744f 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -323,17 +323,16 @@ def insert_dataframe(self, column_names = [field.name for field in self._fields if not field.auto_id] if truncate_long_strings: - for col in column_names: - str_series = df[col] - if str_series.dtype == "object": - max_len = str_series.str.len().max() + for col in [column_names]: + if df[col].dtype == "object": + max_len = df[col].str.len().max() if max_len > MAX_STRING_LENGTH: logger.warning(("Column '%s' has a string length of %d, larger than the max of %d " "supported by Milvus, truncating"), col, max_len, MAX_STRING_LENGTH) - df[col] = str_series.str.slice(0, MAX_STRING_LENGTH) + df[col] = df[col].str.slice(0, MAX_STRING_LENGTH) logger.warning("Column '%s' has been truncated to a max length of %d", col, df[col].len().max()) # Note: dataframe columns has to be in the order of collection schema fields.s From d31012391e11676a8e02536b96eaf118e1e9c65e Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 22 Apr 2024 11:02:56 -0700 Subject: [PATCH 39/40] Revert "Move truncation to happen after other operations which may expand strings" This reverts commit c18e7d65a2ab37a9e73c71bb40e83bb847acde42. --- .../service/vdb/milvus_vector_db_service.py | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index b4ab91744f..7d4879a363 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -303,6 +303,19 @@ def insert_dataframe(self, if isinstance(df, cudf.DataFrame): df = df.to_pandas() + if truncate_long_strings: + for col in df: + str_series = df[col] + if str_series.dtype == "object": + max_len = str_series.str.len().max() + if max_len > MAX_STRING_LENGTH: + logger.warning(("Column '%s' has a string length of %d, larger than the max of %d" + "supported by Milvus, truncating"), + col, + max_len, + MAX_STRING_LENGTH) + df[col] = str_series.str.slice(0, MAX_STRING_LENGTH) + # Ensure that there are no None values in the DataFrame entries. for field_name, dtype in self._fillna_fields_dict.items(): if dtype in (pymilvus.DataType.VARCHAR, pymilvus.DataType.STRING): @@ -322,19 +335,6 @@ def insert_dataframe(self, # From the schema, this is the list of columns we need, excluding any auto_id columns column_names = [field.name for field in self._fields if not field.auto_id] - if truncate_long_strings: - for col in [column_names]: - if df[col].dtype == "object": - max_len = df[col].str.len().max() - if max_len > MAX_STRING_LENGTH: - logger.warning(("Column '%s' has a string length of %d, larger than the max of %d " - "supported by Milvus, truncating"), - col, - max_len, - MAX_STRING_LENGTH) - df[col] = df[col].str.slice(0, MAX_STRING_LENGTH) - logger.warning("Column '%s' has been truncated to a max length of %d", col, df[col].len().max()) - # Note: dataframe columns has to be in the order of collection schema fields.s result = self._collection.insert(data=df[column_names], **kwargs) self._collection.flush() From c39b845ced4c0eb62d882708065e7f6e51cd4a96 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 22 Apr 2024 11:03:07 -0700 Subject: [PATCH 40/40] Revert "First pass at optionally truncating long string values" This reverts commit 8bde832eb6ab17a597b67eaa7c6e85824c4ccc4e. --- morpheus/modules/output/write_to_vector_db.py | 8 ++----- .../service/vdb/milvus_vector_db_service.py | 23 +------------------ 2 files changed, 3 insertions(+), 28 deletions(-) diff --git a/morpheus/modules/output/write_to_vector_db.py b/morpheus/modules/output/write_to_vector_db.py index e7f77d75a3..c141aef7c6 100644 --- a/morpheus/modules/output/write_to_vector_db.py +++ b/morpheus/modules/output/write_to_vector_db.py @@ -148,8 +148,7 @@ def on_completed(): try: if accum_stats.data: merged_df = cudf.concat(accum_stats.data) - - service.insert_dataframe(name=key, df=merged_df, truncate_long_strings=True) + service.insert_dataframe(name=key, df=merged_df) final_df_references.append(accum_stats.data) except Exception as e: logger.error("Unable to upload dataframe entries to vector database: %s", e) @@ -214,10 +213,7 @@ def on_data(msg: typing.Union[ControlMessage, MultiResponseMessage, MultiMessage merged_df = cudf.concat(accum_stats.data) # pylint: disable=not-a-mapping - service.insert_dataframe(name=key, - df=merged_df, - truncate_long_strings=True, - **resource_kwargs) + service.insert_dataframe(name=key, df=merged_df, **resource_kwargs) # Reset accumulator stats accum_stats.data.clear() accum_stats.last_insert_time = current_time diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index 7d4879a363..37cd82d1ba 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -32,9 +32,6 @@ IMPORT_EXCEPTION = None IMPORT_ERROR_MESSAGE = "MilvusVectorDBResourceService requires the milvus and pymilvus packages to be installed." -# https://milvus.io/docs/limitations.md#Length-of-a-string -MAX_STRING_LENGTH = 65535 - try: import pymilvus from pymilvus.orm.mutation import MutationResult @@ -278,10 +275,7 @@ def insert(self, data: list[list] | list[dict], **kwargs: dict[str, typing.Any]) return self._insert_result_to_dict(result=result) - def insert_dataframe(self, - df: typing.Union[cudf.DataFrame, pd.DataFrame], - truncate_long_strings: bool = False, - **kwargs: dict[str, typing.Any]) -> dict: + def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwargs: dict[str, typing.Any]) -> dict: """ Insert a dataframe entires into the vector database. @@ -289,8 +283,6 @@ def insert_dataframe(self, ---------- df : typing.Union[cudf.DataFrame, pd.DataFrame] Dataframe to be inserted into the collection. - truncate_long_strings : bool, optional - When true, truncate strings values that are longer than the max length supported by Milvus (65535). **kwargs : dict[str, typing.Any] Extra keyword arguments specific to the vector database implementation. @@ -303,19 +295,6 @@ def insert_dataframe(self, if isinstance(df, cudf.DataFrame): df = df.to_pandas() - if truncate_long_strings: - for col in df: - str_series = df[col] - if str_series.dtype == "object": - max_len = str_series.str.len().max() - if max_len > MAX_STRING_LENGTH: - logger.warning(("Column '%s' has a string length of %d, larger than the max of %d" - "supported by Milvus, truncating"), - col, - max_len, - MAX_STRING_LENGTH) - df[col] = str_series.str.slice(0, MAX_STRING_LENGTH) - # Ensure that there are no None values in the DataFrame entries. for field_name, dtype in self._fillna_fields_dict.items(): if dtype in (pymilvus.DataType.VARCHAR, pymilvus.DataType.STRING):