Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix vdb_upload runtime error #1643

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
af9bb17
Add tests to reproduce #1639
dagardner-nv Apr 18, 2024
61d391c
More repo tests
dagardner-nv Apr 18, 2024
9cfed05
Rollback change
dagardner-nv Apr 18, 2024
8f6209a
Only set the pe_count for python impls
dagardner-nv Apr 18, 2024
2d124ad
Revert "More repo tests"
dagardner-nv Apr 18, 2024
1aec440
Revert "Add tests to reproduce #1639"
dagardner-nv Apr 18, 2024
8444f77
Remove the launch options entirely since we currently don't have a re…
dagardner-nv Apr 18, 2024
ee03438
Revert "Remove the launch options entirely since we currently don't h…
dagardner-nv Apr 18, 2024
a0092c7
Revert "Only set the pe_count for python impls"
dagardner-nv Apr 18, 2024
d0d2e70
Disable the hammah validation script until #1641 is resolved
dagardner-nv Apr 18, 2024
e35748e
Don't hard-code the thread-count to 1
dagardner-nv Apr 18, 2024
ab3b144
Ensure the C++ impl of the triton stage only uses a single progress e…
dagardner-nv Apr 18, 2024
c401291
WIP : bypassed the message incompatibility error, but now failing on …
dagardner-nv Apr 18, 2024
a52d6e9
Revert "WIP : bypassed the message incompatibility error, but now fai…
dagardner-nv Apr 19, 2024
1df9642
Mark ControlMessage as one of the accepted input messages
dagardner-nv Apr 19, 2024
f1e703d
Set a default non-None value for stop_after_sec
dagardner-nv Apr 19, 2024
31679dd
Fix import for CppTensorMemory
dagardner-nv Apr 19, 2024
9f09023
Use conda env create --yes instead of --force (#1636)
efajardo-nv Apr 17, 2024
9a7a6ab
Merge branch 'david-triton-sid-threading' of github.com:dagardner-nv/…
dagardner-nv Apr 19, 2024
9a96b3c
Only accept ControlMessage when in Python mode
dagardner-nv Apr 19, 2024
ae7c5c8
Default to rss source
dagardner-nv Apr 19, 2024
ede695d
Disable pylint warning
dagardner-nv Apr 19, 2024
7d758cf
Fix import
dagardner-nv Apr 19, 2024
dbfa525
Fix tests to use the use_cpp fixture
dagardner-nv Apr 19, 2024
3b4e26f
Ensure the C++ impl for tensor memory is used
dagardner-nv Apr 19, 2024
2be4426
Ensure the use_cpp fixture is used
dagardner-nv Apr 19, 2024
cd045a9
Ensure the use_cpp fixture is used in all overloads of the config fix…
dagardner-nv Apr 19, 2024
d5f607f
Fix pyling warnings
dagardner-nv Apr 19, 2024
0a2059a
Merge branch 'branch-24.03' into david-vdb-upload-control-msg-1642
dagardner-nv Apr 19, 2024
f16b7c9
Fix typo, units for stop_after should be records not seconds
dagardner-nv Apr 19, 2024
6101390
Merge branch 'david-vdb-upload-control-msg-1642' of github.com:dagard…
dagardner-nv Apr 19, 2024
8bde832
First pass at optionally truncating long string values
dagardner-nv Apr 19, 2024
c18e7d6
Move truncation to happen after other operations which may expand str…
dagardner-nv Apr 19, 2024
740d063
Fix type-o
dagardner-nv Apr 19, 2024
16ec801
Fix type-o
dagardner-nv Apr 19, 2024
a602a18
Document multibyte issues even though we don't have a fix for it yet
dagardner-nv Apr 22, 2024
142c909
Change the default resource name back to 'RSS', allowing the output o…
dagardner-nv Apr 22, 2024
2d11e18
Remove redundant VAR=$VAR statements, add flags to use the rag pipeli…
dagardner-nv Apr 22, 2024
4719f4d
Revert "Document multibyte issues even though we don't have a fix for…
dagardner-nv Apr 22, 2024
0d7214e
Revert "Fix type-o"
dagardner-nv Apr 22, 2024
cd6af8d
Revert "Fix type-o"
dagardner-nv Apr 22, 2024
d310123
Revert "Move truncation to happen after other operations which may ex…
dagardner-nv Apr 22, 2024
c39b845
Revert "First pass at optionally truncating long string values"
dagardner-nv Apr 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions examples/llm/rag/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,14 @@ pipeline option of `rag`:

```bash
export NGC_API_KEY=[YOUR_KEY_HERE]
NGC_API_KEY=${NGC_API_KEY} python examples/llm/main.py rag pipeline
python examples/llm/main.py rag pipeline
```

**Using OpenAI LLM models**

```bash
export OPENAI_API_KEY=[YOUR_KEY_HERE]
OPENAI_API_KEY=${OPENAI_API_KEY} python examples/llm/main.py rag pipeline
python examples/llm/main.py rag pipeline --llm_service=OpenAI --model_name=gpt-3.5-turbo
```

### Run example (Persistent Pipeline):
Expand All @@ -232,14 +232,14 @@ OPENAI_API_KEY=${OPENAI_API_KEY} python examples/llm/main.py rag pipeline

```bash
export NGC_API_KEY=[YOUR_KEY_HERE]
python examples/llm/main.py rag persistent
python examples/llm/main.py rag persistent
```

**Using OpenAI LLM models**

```bash
export OPENAI_API_KEY=[YOUR_KEY_HERE]
python examples/llm/main.py rag persistent
python examples/llm/main.py rag persistent
```

### Options:
Expand Down Expand Up @@ -273,4 +273,3 @@ The `rag` command has its own set of options and commands:

- `persistant`
- `pipeline`

4 changes: 2 additions & 2 deletions examples/llm/vdb_upload/module/rss_source_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class RSSSourcePipeSchema(BaseModel):
output_batch_size: int = 2048
request_timeout_sec: float = 2.0
run_indefinitely: bool = True
stop_after_sec: int = 0
stop_after_rec: int = 0
vdb_resource_name: str
web_scraper_config: Optional[Dict[Any, Any]] = None

Expand Down Expand Up @@ -130,7 +130,7 @@ def _rss_source_pipe(builder: mrc.Builder):
"cooldown_interval_sec": validated_config.cooldown_interval_sec,
"request_timeout_sec": validated_config.request_timeout_sec,
"interval_sec": validated_config.interval_sec,
"stop_after_sec": validated_config.stop_after_sec,
"stop_after_rec": validated_config.stop_after_rec,
}
rss_source_loader = RSSSourceLoaderFactory.get_instance("rss_source", {"rss_source": rss_source_config})

Expand Down
4 changes: 2 additions & 2 deletions examples/llm/vdb_upload/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def run():
@click.option("--source_type",
multiple=True,
type=click.Choice(['rss', 'filesystem'], case_sensitive=False),
default=[],
default=['rss'],
show_default=True,
help="The type of source to use. Can specify multiple times for different source types.")
@click.option(
Expand All @@ -128,7 +128,7 @@ def run():
@click.option(
"--vector_db_resource_name",
type=str,
default="VDBUploadExample",
default="RSS",
help="The identifier of the resource on which operations are to be performed in the vector database.",
)
@click.option(
Expand Down
4 changes: 2 additions & 2 deletions examples/llm/vdb_upload/vdb_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ vdb_pipeline:
output_batch_size: 2048 # Number of chunked documents per output batch
request_timeout_sec: 2.0
run_indefinitely: true
stop_after_sec: 0
stop_after_rec: 0
web_scraper_config:
chunk_overlap: 51
chunk_size: 512
Expand Down Expand Up @@ -300,4 +300,4 @@ vdb_pipeline:
dtype: FLOAT_VECTOR
description: Embedding vectors representing the data entry
dim: 384 # Size of the embeddings to store in the vector database
description: Collection schema for diverse data sources
description: Collection schema for diverse data sources
4 changes: 2 additions & 2 deletions examples/llm/vdb_upload/vdb_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def _build_default_rss_source(enable_cache,
"output_batch_size": 2048,
"cache_dir": "./.cache/http",
"cooldown_interval_sec": interval_secs,
"stop_after_sec": stop_after,
"stop_after_rec": stop_after or 0,
"enable_cache": enable_cache,
"enable_monitor": enable_monitors,
"feed_input": feed_inputs if feed_inputs else build_rss_urls(),
Expand Down Expand Up @@ -448,7 +448,7 @@ def build_final_config(vdb_conf_path,
interval_secs=60,
run_indefinitely=True,
stop_after=None,
vector_db_resource_name="VDBUploadExample",
vector_db_resource_name="RSS",
content_chunking_size=128,
rss_request_timeout_sec=30,
feed_inputs=build_rss_urls()))
Expand Down
2 changes: 1 addition & 1 deletion morpheus/modules/input/rss_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def fetch_feeds() -> MessageMeta:

records_emitted += df_size

if (0 < validated_config.stop_after_sec <= records_emitted):
if (0 < validated_config.stop_after_rec <= records_emitted):
stop_requested = True
logger.info("Stop limit reached... preparing to halt the source.")
break
Expand Down
2 changes: 1 addition & 1 deletion morpheus/modules/schemas/rss_source_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class RSSSourceSchema(BaseModel):
cooldown_interval_sec: int = 600
request_timeout_sec: float = 2.0
interval_sec: int = 600
stop_after_sec: int = 0
stop_after_rec: int = 0

class Config:
extra = "forbid"
5 changes: 4 additions & 1 deletion morpheus/stages/inference/inference_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ def accepted_types(self) -> typing.Tuple:
typing.Tuple
Tuple of input types.
"""
return (MultiInferenceMessage, )
if (self._build_cpp_node()):
return (MultiInferenceMessage, )

return (MultiInferenceMessage, ControlMessage)

def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(MultiResponseMessage)
Expand Down
2 changes: 1 addition & 1 deletion morpheus/stages/input/rss_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def __init__(self,
"rss_source": {
"feed_input": feed_input,
"interval_sec": interval_secs,
"stop_after_sec": stop_after,
"stop_after_rec": stop_after,
"run_indefinitely": run_indefinitely,
"batch_size": batch_size,
"enable_cache": enable_cache,
Expand Down
5 changes: 3 additions & 2 deletions morpheus/stages/preprocess/preprocess_fil_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import cudf

import morpheus._lib.messages as _messages
import morpheus._lib.stages as _stages
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
Expand All @@ -32,7 +33,6 @@
from morpheus.messages import MultiInferenceFILMessage
from morpheus.messages import MultiInferenceMessage
from morpheus.messages import MultiMessage
from morpheus.messages import TensorMemory as CppTensorMemory
from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -123,7 +123,8 @@ def process_control_message(x: ControlMessage, fea_len: int, fea_cols: typing.Li
seg_ids[:, 0] = cp.arange(0, count, dtype=cp.uint32)
seg_ids[:, 2] = fea_len - 1

x.tensors(CppTensorMemory(count=count, tensors={"input__0": data, "seq_ids": seg_ids}))
# We need the C++ impl of TensorMemory until #1646 is resolved
x.tensors(_messages.TensorMemory(count=count, tensors={"input__0": data, "seq_ids": seg_ids}))
return x

@staticmethod
Expand Down
15 changes: 8 additions & 7 deletions morpheus/stages/preprocess/preprocess_nlp_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import cudf

import morpheus._lib.messages as _messages
import morpheus._lib.stages as _stages
from morpheus.cli.register_stage import register_stage
from morpheus.cli.utils import MorpheusRelativePath
Expand All @@ -35,7 +36,6 @@
from morpheus.messages import MultiInferenceMessage
from morpheus.messages import MultiInferenceNLPMessage
from morpheus.messages import MultiMessage
from morpheus.messages import TensorMemory as CppTensorMemory
from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage
from morpheus.utils.cudf_subword_helper import tokenize_text_series

Expand Down Expand Up @@ -204,13 +204,14 @@ def process_control_message(message: ControlMessage,

del text_series

# We need the C++ impl of TensorMemory until #1646 is resolved
message.tensors(
CppTensorMemory(count=tokenized.input_ids.shape[0],
tensors={
"input_ids": tokenized.input_ids,
"input_mask": tokenized.input_mask,
"seq_ids": tokenized.segment_ids
}))
_messages.TensorMemory(count=tokenized.input_ids.shape[0],
tensors={
"input_ids": tokenized.input_ids,
"input_mask": tokenized.input_mask,
"seq_ids": tokenized.segment_ids
}))

message.set_metadata("inference_memory_params", {"inference_type": "nlp"})
return message
Expand Down
2 changes: 1 addition & 1 deletion tests/examples/gnn_fraud_detection_pipeline/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def cuml_fixture(fail_missing: bool):


@pytest.fixture(name="config")
def config_fixture(config):
def config_fixture(config, use_cpp: bool): # pylint: disable=unused-argument
"""
The GNN fraud detection pipeline utilizes the "other" pipeline mode.
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/examples/log_parsing/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@


@pytest.fixture(name="config")
def config_fixture(config):
def config_fixture(config, use_cpp: bool): # pylint: disable=unused-argument
"""
The log_parsing pipelie requires NLP mode. Set this here so all the tests don't need to set it themselves.
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/examples/ransomware_detection/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def dask_distributed(fail_missing: bool):


@pytest.fixture(name="config")
def config_fixture(config):
def config_fixture(config, use_cpp: bool): # pylint: disable=unused-argument
"""
The ransomware detection pipeline utilizes the FIL pipeline mode.
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/stages/test_preprocess_fil_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@


@pytest.fixture(name='config')
def fixture_config(config: Config):
def fixture_config(config: Config, use_cpp: bool): # pylint: disable=unused-argument
config.feature_length = 1
config.fil = ConfigFIL()
config.fil.feature_columns = ["data"]
Expand Down
2 changes: 1 addition & 1 deletion tests/stages/test_preprocess_nlp_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@


@pytest.fixture(name='config')
def fixture_config(config: Config):
def fixture_config(config: Config, use_cpp: bool): # pylint: disable=unused-argument
config.class_labels = [
"address",
"bank_acct",
Expand Down
2 changes: 1 addition & 1 deletion tests/test_add_classifications_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@


@pytest.fixture(name="config")
def config_fixture(config: Config):
def config_fixture(config: Config, use_cpp: bool): # pylint: disable=unused-argument
config.class_labels = ['frogs', 'lizards', 'toads']
yield config

Expand Down
2 changes: 1 addition & 1 deletion tests/test_add_scores_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@


@pytest.fixture(name='config')
def fixture_config(config: Config):
def fixture_config(config: Config, use_cpp: bool): # pylint: disable=unused-argument
config.class_labels = ['frogs', 'lizards', 'toads']
config.feature_length = 12
yield config
Expand Down
Loading