diff --git a/examples/llm/rag/README.md b/examples/llm/rag/README.md index 3868fab377..7c1579040e 100644 --- a/examples/llm/rag/README.md +++ b/examples/llm/rag/README.md @@ -214,14 +214,14 @@ pipeline option of `rag`: ```bash export NGC_API_KEY=[YOUR_KEY_HERE] -NGC_API_KEY=${NGC_API_KEY} python examples/llm/main.py rag pipeline +python examples/llm/main.py rag pipeline ``` **Using OpenAI LLM models** ```bash export OPENAI_API_KEY=[YOUR_KEY_HERE] -OPENAI_API_KEY=${OPENAI_API_KEY} python examples/llm/main.py rag pipeline +python examples/llm/main.py rag pipeline --llm_service=OpenAI --model_name=gpt-3.5-turbo ``` ### Run example (Persistent Pipeline): @@ -232,14 +232,14 @@ OPENAI_API_KEY=${OPENAI_API_KEY} python examples/llm/main.py rag pipeline ```bash export NGC_API_KEY=[YOUR_KEY_HERE] -python examples/llm/main.py rag persistent +python examples/llm/main.py rag persistent ``` **Using OpenAI LLM models** ```bash export OPENAI_API_KEY=[YOUR_KEY_HERE] -python examples/llm/main.py rag persistent +python examples/llm/main.py rag persistent ``` ### Options: @@ -273,4 +273,3 @@ The `rag` command has its own set of options and commands: - `persistant` - `pipeline` - diff --git a/examples/llm/vdb_upload/module/rss_source_pipe.py b/examples/llm/vdb_upload/module/rss_source_pipe.py index c424e03dbc..ff61940b8c 100644 --- a/examples/llm/vdb_upload/module/rss_source_pipe.py +++ b/examples/llm/vdb_upload/module/rss_source_pipe.py @@ -48,7 +48,7 @@ class RSSSourcePipeSchema(BaseModel): output_batch_size: int = 2048 request_timeout_sec: float = 2.0 run_indefinitely: bool = True - stop_after_sec: int = 0 + stop_after_rec: int = 0 vdb_resource_name: str web_scraper_config: Optional[Dict[Any, Any]] = None @@ -130,7 +130,7 @@ def _rss_source_pipe(builder: mrc.Builder): "cooldown_interval_sec": validated_config.cooldown_interval_sec, "request_timeout_sec": validated_config.request_timeout_sec, "interval_sec": validated_config.interval_sec, - "stop_after_sec": validated_config.stop_after_sec, + "stop_after_rec": validated_config.stop_after_rec, } rss_source_loader = RSSSourceLoaderFactory.get_instance("rss_source", {"rss_source": rss_source_config}) diff --git a/examples/llm/vdb_upload/run.py b/examples/llm/vdb_upload/run.py index 04627f8359..974e5ec213 100644 --- a/examples/llm/vdb_upload/run.py +++ b/examples/llm/vdb_upload/run.py @@ -104,7 +104,7 @@ def run(): @click.option("--source_type", multiple=True, type=click.Choice(['rss', 'filesystem'], case_sensitive=False), - default=[], + default=['rss'], show_default=True, help="The type of source to use. Can specify multiple times for different source types.") @click.option( @@ -128,7 +128,7 @@ def run(): @click.option( "--vector_db_resource_name", type=str, - default="VDBUploadExample", + default="RSS", help="The identifier of the resource on which operations are to be performed in the vector database.", ) @click.option( diff --git a/examples/llm/vdb_upload/vdb_config.yaml b/examples/llm/vdb_upload/vdb_config.yaml index 0c1af37d22..ac93a47615 100644 --- a/examples/llm/vdb_upload/vdb_config.yaml +++ b/examples/llm/vdb_upload/vdb_config.yaml @@ -75,7 +75,7 @@ vdb_pipeline: output_batch_size: 2048 # Number of chunked documents per output batch request_timeout_sec: 2.0 run_indefinitely: true - stop_after_sec: 0 + stop_after_rec: 0 web_scraper_config: chunk_overlap: 51 chunk_size: 512 @@ -300,4 +300,4 @@ vdb_pipeline: dtype: FLOAT_VECTOR description: Embedding vectors representing the data entry dim: 384 # Size of the embeddings to store in the vector database - description: Collection schema for diverse data sources \ No newline at end of file + description: Collection schema for diverse data sources diff --git a/examples/llm/vdb_upload/vdb_utils.py b/examples/llm/vdb_upload/vdb_utils.py index 2b399fcd21..d3aed615d7 100644 --- a/examples/llm/vdb_upload/vdb_utils.py +++ b/examples/llm/vdb_upload/vdb_utils.py @@ -135,7 +135,7 @@ def _build_default_rss_source(enable_cache, "output_batch_size": 2048, "cache_dir": "./.cache/http", "cooldown_interval_sec": interval_secs, - "stop_after_sec": stop_after, + "stop_after_rec": stop_after or 0, "enable_cache": enable_cache, "enable_monitor": enable_monitors, "feed_input": feed_inputs if feed_inputs else build_rss_urls(), @@ -448,7 +448,7 @@ def build_final_config(vdb_conf_path, interval_secs=60, run_indefinitely=True, stop_after=None, - vector_db_resource_name="VDBUploadExample", + vector_db_resource_name="RSS", content_chunking_size=128, rss_request_timeout_sec=30, feed_inputs=build_rss_urls())) diff --git a/morpheus/modules/input/rss_source.py b/morpheus/modules/input/rss_source.py index 6133e3d673..9f5dd6c316 100644 --- a/morpheus/modules/input/rss_source.py +++ b/morpheus/modules/input/rss_source.py @@ -101,7 +101,7 @@ def fetch_feeds() -> MessageMeta: records_emitted += df_size - if (0 < validated_config.stop_after_sec <= records_emitted): + if (0 < validated_config.stop_after_rec <= records_emitted): stop_requested = True logger.info("Stop limit reached... preparing to halt the source.") break diff --git a/morpheus/modules/schemas/rss_source_schema.py b/morpheus/modules/schemas/rss_source_schema.py index b0468b1ace..53c0928391 100644 --- a/morpheus/modules/schemas/rss_source_schema.py +++ b/morpheus/modules/schemas/rss_source_schema.py @@ -30,7 +30,7 @@ class RSSSourceSchema(BaseModel): cooldown_interval_sec: int = 600 request_timeout_sec: float = 2.0 interval_sec: int = 600 - stop_after_sec: int = 0 + stop_after_rec: int = 0 class Config: extra = "forbid" diff --git a/morpheus/stages/inference/inference_stage.py b/morpheus/stages/inference/inference_stage.py index e4111926e9..579ddccd53 100644 --- a/morpheus/stages/inference/inference_stage.py +++ b/morpheus/stages/inference/inference_stage.py @@ -192,7 +192,10 @@ def accepted_types(self) -> typing.Tuple: typing.Tuple Tuple of input types. """ - return (MultiInferenceMessage, ) + if (self._build_cpp_node()): + return (MultiInferenceMessage, ) + + return (MultiInferenceMessage, ControlMessage) def compute_schema(self, schema: StageSchema): schema.output_schema.set_type(MultiResponseMessage) diff --git a/morpheus/stages/input/rss_source_stage.py b/morpheus/stages/input/rss_source_stage.py index 31e408c290..d56a443542 100644 --- a/morpheus/stages/input/rss_source_stage.py +++ b/morpheus/stages/input/rss_source_stage.py @@ -81,7 +81,7 @@ def __init__(self, "rss_source": { "feed_input": feed_input, "interval_sec": interval_secs, - "stop_after_sec": stop_after, + "stop_after_rec": stop_after, "run_indefinitely": run_indefinitely, "batch_size": batch_size, "enable_cache": enable_cache, diff --git a/morpheus/stages/preprocess/preprocess_fil_stage.py b/morpheus/stages/preprocess/preprocess_fil_stage.py index 45b1640d72..cbfc6a581f 100644 --- a/morpheus/stages/preprocess/preprocess_fil_stage.py +++ b/morpheus/stages/preprocess/preprocess_fil_stage.py @@ -23,6 +23,7 @@ import cudf +import morpheus._lib.messages as _messages import morpheus._lib.stages as _stages from morpheus.cli.register_stage import register_stage from morpheus.config import Config @@ -32,7 +33,6 @@ from morpheus.messages import MultiInferenceFILMessage from morpheus.messages import MultiInferenceMessage from morpheus.messages import MultiMessage -from morpheus.messages import TensorMemory as CppTensorMemory from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage logger = logging.getLogger(__name__) @@ -123,7 +123,8 @@ def process_control_message(x: ControlMessage, fea_len: int, fea_cols: typing.Li seg_ids[:, 0] = cp.arange(0, count, dtype=cp.uint32) seg_ids[:, 2] = fea_len - 1 - x.tensors(CppTensorMemory(count=count, tensors={"input__0": data, "seq_ids": seg_ids})) + # We need the C++ impl of TensorMemory until #1646 is resolved + x.tensors(_messages.TensorMemory(count=count, tensors={"input__0": data, "seq_ids": seg_ids})) return x @staticmethod diff --git a/morpheus/stages/preprocess/preprocess_nlp_stage.py b/morpheus/stages/preprocess/preprocess_nlp_stage.py index feace923dc..de610ab52c 100644 --- a/morpheus/stages/preprocess/preprocess_nlp_stage.py +++ b/morpheus/stages/preprocess/preprocess_nlp_stage.py @@ -24,6 +24,7 @@ import cudf +import morpheus._lib.messages as _messages import morpheus._lib.stages as _stages from morpheus.cli.register_stage import register_stage from morpheus.cli.utils import MorpheusRelativePath @@ -35,7 +36,6 @@ from morpheus.messages import MultiInferenceMessage from morpheus.messages import MultiInferenceNLPMessage from morpheus.messages import MultiMessage -from morpheus.messages import TensorMemory as CppTensorMemory from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage from morpheus.utils.cudf_subword_helper import tokenize_text_series @@ -204,13 +204,14 @@ def process_control_message(message: ControlMessage, del text_series + # We need the C++ impl of TensorMemory until #1646 is resolved message.tensors( - CppTensorMemory(count=tokenized.input_ids.shape[0], - tensors={ - "input_ids": tokenized.input_ids, - "input_mask": tokenized.input_mask, - "seq_ids": tokenized.segment_ids - })) + _messages.TensorMemory(count=tokenized.input_ids.shape[0], + tensors={ + "input_ids": tokenized.input_ids, + "input_mask": tokenized.input_mask, + "seq_ids": tokenized.segment_ids + })) message.set_metadata("inference_memory_params", {"inference_type": "nlp"}) return message diff --git a/tests/examples/gnn_fraud_detection_pipeline/conftest.py b/tests/examples/gnn_fraud_detection_pipeline/conftest.py index a625d51862..30176f71e4 100644 --- a/tests/examples/gnn_fraud_detection_pipeline/conftest.py +++ b/tests/examples/gnn_fraud_detection_pipeline/conftest.py @@ -44,7 +44,7 @@ def cuml_fixture(fail_missing: bool): @pytest.fixture(name="config") -def config_fixture(config): +def config_fixture(config, use_cpp: bool): # pylint: disable=unused-argument """ The GNN fraud detection pipeline utilizes the "other" pipeline mode. """ diff --git a/tests/examples/log_parsing/conftest.py b/tests/examples/log_parsing/conftest.py index d31891873a..f927c3fcc1 100644 --- a/tests/examples/log_parsing/conftest.py +++ b/tests/examples/log_parsing/conftest.py @@ -17,7 +17,7 @@ @pytest.fixture(name="config") -def config_fixture(config): +def config_fixture(config, use_cpp: bool): # pylint: disable=unused-argument """ The log_parsing pipelie requires NLP mode. Set this here so all the tests don't need to set it themselves. """ diff --git a/tests/examples/ransomware_detection/conftest.py b/tests/examples/ransomware_detection/conftest.py index e1c5e2541d..a92786555a 100644 --- a/tests/examples/ransomware_detection/conftest.py +++ b/tests/examples/ransomware_detection/conftest.py @@ -39,7 +39,7 @@ def dask_distributed(fail_missing: bool): @pytest.fixture(name="config") -def config_fixture(config): +def config_fixture(config, use_cpp: bool): # pylint: disable=unused-argument """ The ransomware detection pipeline utilizes the FIL pipeline mode. """ diff --git a/tests/stages/test_preprocess_fil_stage.py b/tests/stages/test_preprocess_fil_stage.py index eb6dc8b620..638fcaa994 100644 --- a/tests/stages/test_preprocess_fil_stage.py +++ b/tests/stages/test_preprocess_fil_stage.py @@ -27,7 +27,7 @@ @pytest.fixture(name='config') -def fixture_config(config: Config): +def fixture_config(config: Config, use_cpp: bool): # pylint: disable=unused-argument config.feature_length = 1 config.fil = ConfigFIL() config.fil.feature_columns = ["data"] diff --git a/tests/stages/test_preprocess_nlp_stage.py b/tests/stages/test_preprocess_nlp_stage.py index 9c2b5d4e39..22fc99e04a 100644 --- a/tests/stages/test_preprocess_nlp_stage.py +++ b/tests/stages/test_preprocess_nlp_stage.py @@ -29,7 +29,7 @@ @pytest.fixture(name='config') -def fixture_config(config: Config): +def fixture_config(config: Config, use_cpp: bool): # pylint: disable=unused-argument config.class_labels = [ "address", "bank_acct", diff --git a/tests/test_add_classifications_stage.py b/tests/test_add_classifications_stage.py index 279963ba9a..80091f3dc5 100755 --- a/tests/test_add_classifications_stage.py +++ b/tests/test_add_classifications_stage.py @@ -31,7 +31,7 @@ @pytest.fixture(name="config") -def config_fixture(config: Config): +def config_fixture(config: Config, use_cpp: bool): # pylint: disable=unused-argument config.class_labels = ['frogs', 'lizards', 'toads'] yield config diff --git a/tests/test_add_scores_stage.py b/tests/test_add_scores_stage.py index ad67709959..e454a0e35f 100755 --- a/tests/test_add_scores_stage.py +++ b/tests/test_add_scores_stage.py @@ -31,7 +31,7 @@ @pytest.fixture(name='config') -def fixture_config(config: Config): +def fixture_config(config: Config, use_cpp: bool): # pylint: disable=unused-argument config.class_labels = ['frogs', 'lizards', 'toads'] config.feature_length = 12 yield config