Skip to content

Commit

Permalink
Truncate strings exceeding max_length when inserting to Milvus (#1665)
Browse files Browse the repository at this point in the history
* Adds new helper methods to `morpheus.io.utils`, `cudf_string_cols_exceed_max_bytes` and `truncate_string_cols_by_bytes`
* When `truncate_long_strings=True` `MilvusVectorDBResourceService` will truncate all `VARCHAR` fields according to the schema's `max_length`
* Add `truncate_long_strings=True` in config for `vdb_upload` pipeline
* Set C++ mode to default for example LLM pipelines
* Remove issues 1650 & 1651 from `known_issues.md`

Closes #1650 
Closes #1651

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1665
  • Loading branch information
dagardner-nv authored May 1, 2024
1 parent bedc169 commit 57d11a2
Show file tree
Hide file tree
Showing 12 changed files with 457 additions and 28 deletions.
2 changes: 0 additions & 2 deletions docs/source/extra_info/known_issues.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,5 @@ limitations under the License.

- TrainAEStage fails with a Segmentation fault ([#1641](https://github.com/nv-morpheus/Morpheus/pull/1641))
- vdb_upload example pipeline triggers an internal error in Triton ([#1649](https://github.com/nv-morpheus/Morpheus/pull/1649))
- vdb_upload example pipeline error on inserting large strings ([#1650](https://github.com/nv-morpheus/Morpheus/pull/1650))
- vdb_upload example pipeline only works with C++ mode disabled ([#1651](https://github.com/nv-morpheus/Morpheus/pull/1651))

Refer to [open issues in the Morpheus project](https://github.com/nv-morpheus/Morpheus/issues)
2 changes: 1 addition & 1 deletion examples/llm/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
callback=parse_log_level,
help="Specify the logging level to use.")
@click.option('--use_cpp',
default=False,
default=True,
type=bool,
help=("Whether or not to use C++ node and message types or to prefer python. "
"Only use as a last resort if bugs are encountered"))
Expand Down
19 changes: 18 additions & 1 deletion examples/llm/vdb_upload/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
from vdb_upload.helper import process_vdb_sources

from morpheus.config import Config
from morpheus.messages import ControlMessage
from morpheus.pipeline.pipeline import Pipeline
from morpheus.pipeline.stage_decorator import stage
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.general.trigger_stage import TriggerStage
from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage
Expand Down Expand Up @@ -78,6 +80,20 @@ def pipeline(pipeline_config: Config,
monitor_2 = pipe.add_stage(
MonitorStage(pipeline_config, description="Inference rate", unit="events", delayed_start=True))

@stage
def embedding_tensor_to_df(message: ControlMessage, *, embedding_tensor_name='probs') -> ControlMessage:
    """
    Copies the probs tensor to the 'embedding' field of the dataframe.

    Parameters
    ----------
    message : ControlMessage
        Message whose tensor is copied into its payload dataframe.
    embedding_tensor_name : str, default 'probs'
        Name of the tensor to copy.
    """
    meta = message.payload()
    with meta.mutable_dataframe() as df:
        tensor = message.tensors().get_tensor(embedding_tensor_name)
        df['embedding'] = tensor.tolist()

    return message

embedding_tensor_to_df_stage = pipe.add_stage(embedding_tensor_to_df(pipeline_config))

vector_db = pipe.add_stage(WriteToVectorDBStage(pipeline_config, **vdb_config))

monitor_3 = pipe.add_stage(
Expand All @@ -96,7 +112,8 @@ def pipeline(pipeline_config: Config,
pipe.add_edge(nlp_stage, monitor_1)
pipe.add_edge(monitor_1, embedding_stage)
pipe.add_edge(embedding_stage, monitor_2)
pipe.add_edge(monitor_2, vector_db)
pipe.add_edge(monitor_2, embedding_tensor_to_df_stage)
pipe.add_edge(embedding_tensor_to_df_stage, vector_db)
pipe.add_edge(vector_db, monitor_3)

start_time = time.time()
Expand Down
3 changes: 2 additions & 1 deletion examples/llm/vdb_upload/vdb_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,15 @@ def build_cli_configs(source_type,
cli_vdb_conf = {
# Vector db upload has some significant transaction overhead, batch size here should be as large as possible
'batch_size': 16384,
'resource_name': vector_db_resource_name,
'embedding_size': embedding_size,
'recreate': True,
'resource_name': vector_db_resource_name,
'resource_schemas': {
vector_db_resource_name:
build_defualt_milvus_config(embedding_size) if (vector_db_service == 'milvus') else None,
},
'service': vector_db_service,
'truncate_long_strings': True,
'uri': vector_db_uri,
}

Expand Down
96 changes: 96 additions & 0 deletions morpheus/io/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,16 @@
# limitations under the License.
"""IO utilities."""

import logging

import pandas as pd

import cudf

from morpheus.utils.type_aliases import DataFrameType
from morpheus.utils.type_aliases import SeriesType

logger = logging.getLogger(__name__)


def filter_null_data(x: DataFrameType):
Expand All @@ -31,3 +40,90 @@ def filter_null_data(x: DataFrameType):
return x

return x[~x['data'].isna()]


def cudf_string_cols_exceed_max_bytes(df: cudf.DataFrame, column_max_bytes: dict[str, int]) -> bool:
    """
    Checks a cudf DataFrame for string columns that exceed a maximum number of bytes and thus need to be truncated by
    calling `truncate_string_cols_by_bytes`.

    This method utilizes the cudf `Series.str.byte_count()` method that pandas lacks, which can avoid a costly
    call to `truncate_string_cols_by_bytes`.

    Parameters
    ----------
    df : cudf.DataFrame
        The dataframe to check. Must be a cudf DataFrame, since the fast path relies on `Series.str.byte_count()`.
    column_max_bytes: dict[str, int]
        A mapping of string column names to the maximum number of bytes for each column.

    Returns
    -------
    bool
        True if truncation is needed, False otherwise.

    Raises
    ------
    ValueError
        If `df` is not a `cudf.DataFrame`.
    """
    if not isinstance(df, cudf.DataFrame):
        raise ValueError("Expected cudf DataFrame")

    for (col, max_bytes) in column_max_bytes.items():
        series: cudf.Series = df[col]

        assert series.dtype == 'object', f"Expected string column '{col}'"

        # byte_count() measures UTF-8 encoded length, matching Milvus' byte-based max_length semantics
        if series.str.byte_count().max() > max_bytes:
            return True

    return False


def truncate_string_cols_by_bytes(df: DataFrameType,
                                  column_max_bytes: dict[str, int],
                                  warn_on_truncate: bool = True) -> bool:
    """
    Truncates all string columns in a dataframe to a maximum number of bytes. This operation is performed in-place on
    the dataframe.

    Parameters
    ----------
    df : DataFrameType
        The dataframe to truncate.
    column_max_bytes: dict[str, int]
        A mapping of string column names to the maximum number of bytes for each column.
    warn_on_truncate: bool, default True
        Whether to log a warning when truncating a column.

    Returns
    -------
    bool
        True if truncation was performed, False otherwise.
    """

    performed_truncation = False
    is_cudf = isinstance(df, cudf.DataFrame)

    for (col, max_bytes) in column_max_bytes.items():
        series: SeriesType = df[col]

        if is_cudf:
            # The encode/decode round-trip below is performed with pandas string methods
            series: pd.Series = series.to_pandas()

        assert series.dtype == 'object', f"Expected string column '{col}'"

        encoded_series = series.str.encode(encoding='utf-8', errors='strict')
        if encoded_series.str.len().max() > max_bytes:
            performed_truncation = True
            if warn_on_truncate:
                logger.warning("Truncating column '%s' to %d bytes", col, max_bytes)

            truncated_series = encoded_series.str.slice(0, max_bytes)

            # There is a possibility that slicing by max_bytes will slice a multi-byte character in half, setting
            # errors='ignore' will cause the resulting string to be truncated after the last full character
            decoded_series = truncated_series.str.decode(encoding='utf-8', errors='ignore')

            if is_cudf:
                df[col] = cudf.Series.from_pandas(decoded_series)
            else:
                df[col] = decoded_series

    return performed_truncation
73 changes: 52 additions & 21 deletions morpheus/service/vdb/milvus_vector_db_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,24 @@
import typing
from functools import wraps

import pandas as pd

import cudf

from morpheus.io.utils import cudf_string_cols_exceed_max_bytes
from morpheus.io.utils import truncate_string_cols_by_bytes
from morpheus.service.vdb.vector_db_service import VectorDBResourceService
from morpheus.service.vdb.vector_db_service import VectorDBService
from morpheus.utils.type_aliases import DataFrameType

logger = logging.getLogger(__name__)

IMPORT_EXCEPTION = None
IMPORT_ERROR_MESSAGE = "MilvusVectorDBResourceService requires the milvus and pymilvus packages to be installed."

# Milvus has a max string length in bytes of 65,535. Multi-byte characters like "ñ" will have a string length of 1, the
# byte length encoded as UTF-8 will be 2
# https://milvus.io/docs/limitations.md#Length-of-a-string
MAX_STRING_LENGTH_BYTES = 65_535

try:
import pymilvus
from pymilvus.orm.mutation import MutationResult
Expand Down Expand Up @@ -222,9 +228,11 @@ class MilvusVectorDBResourceService(VectorDBResourceService):
Name of the resource.
client : MilvusClient
An instance of the MilvusClient for interaction with the Milvus Vector Database.
truncate_long_strings : bool, optional
When true, truncate string values that are longer than the max length of the field
"""

def __init__(self, name: str, client: "MilvusClient") -> None:
def __init__(self, name: str, client: "MilvusClient", truncate_long_strings: bool = False) -> None:
if IMPORT_EXCEPTION is not None:
raise ImportError(IMPORT_ERROR_MESSAGE) from IMPORT_EXCEPTION

Expand All @@ -239,13 +247,24 @@ def __init__(self, name: str, client: "MilvusClient") -> None:
self._vector_field = None
self._fillna_fields_dict = {}

# Mapping of field name to max length for string fields
self._fields_max_length: dict[str, int] = {}

for field in self._fields:
if field.dtype == pymilvus.DataType.FLOAT_VECTOR:
self._vector_field = field.name
else:
# Intentionally excluding pymilvus.DataType.STRING, in our current version it isn't supported, and in
# some database systems string types don't have a max length.
if field.dtype == pymilvus.DataType.VARCHAR:
max_length = field.params.get('max_length')
if max_length is not None:
self._fields_max_length[field.name] = max_length
if not field.auto_id:
self._fillna_fields_dict[field.name] = field.dtype

self._truncate_long_strings = truncate_long_strings

self._collection.load()

def _set_up_collection(self):
Expand Down Expand Up @@ -275,13 +294,13 @@ def insert(self, data: list[list] | list[dict], **kwargs: dict[str, typing.Any])

return self._insert_result_to_dict(result=result)

def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwargs: dict[str, typing.Any]) -> dict:
def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) -> dict:
"""
Insert dataframe entries into the vector database.
Parameters
----------
df : typing.Union[cudf.DataFrame, pd.DataFrame]
df : DataFrameType
Dataframe to be inserted into the collection.
**kwargs : dict[str, typing.Any]
Extra keyword arguments specific to the vector database implementation.
Expand All @@ -291,10 +310,6 @@ def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwa
dict
Returns response content as a dictionary.
"""

if isinstance(df, cudf.DataFrame):
df = df.to_pandas()

# Ensure that there are no None values in the DataFrame entries.
for field_name, dtype in self._fillna_fields_dict.items():
if dtype in (pymilvus.DataType.VARCHAR, pymilvus.DataType.STRING):
Expand All @@ -311,11 +326,24 @@ def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwa
else:
logger.info("Skipped checking 'None' in the field: %s, with datatype: %s", field_name, dtype)

needs_truncate = self._truncate_long_strings
if needs_truncate and isinstance(df, cudf.DataFrame):
# Cudf specific optimization, we can avoid a costly call to truncate_string_cols_by_bytes if all of the
# string columns are already below the max length
needs_truncate = cudf_string_cols_exceed_max_bytes(df, self._fields_max_length)

# From the schema, this is the list of columns we need, excluding any auto_id columns
column_names = [field.name for field in self._fields if not field.auto_id]

collection_df = df[column_names]
if isinstance(collection_df, cudf.DataFrame):
collection_df = collection_df.to_pandas()

if needs_truncate:
truncate_string_cols_by_bytes(collection_df, self._fields_max_length, warn_on_truncate=True)

# Note: dataframe columns have to be in the order of the collection schema fields.
result = self._collection.insert(data=df[column_names], **kwargs)
result = self._collection.insert(data=collection_df, **kwargs)
self._collection.flush()

return self._insert_result_to_dict(result=result)
Expand Down Expand Up @@ -575,6 +603,8 @@ class MilvusVectorDBService(VectorDBService):
The port number for connecting to the Milvus server.
alias : str, optional
Alias for the Milvus connection, by default "default".
truncate_long_strings : bool, optional
When true, truncate string values that are longer than the max length of the field
**kwargs : dict
Additional keyword arguments specific to the Milvus connection configuration.
"""
Expand All @@ -589,13 +619,17 @@ def __init__(self,
password: str = "",
db_name: str = "",
token: str = "",
truncate_long_strings: bool = False,
**kwargs: dict[str, typing.Any]):

self._truncate_long_strings = truncate_long_strings
self._client = MilvusClient(uri=uri, user=user, password=password, db_name=db_name, token=token, **kwargs)

def load_resource(self, name: str, **kwargs: dict[str, typing.Any]) -> MilvusVectorDBResourceService:
    """
    Returns a `MilvusVectorDBResourceService` for the named collection.

    Parameters
    ----------
    name : str
        Name of the collection (resource) to load.
    **kwargs : dict[str, typing.Any]
        Additional keyword arguments forwarded to the `MilvusVectorDBResourceService` constructor.

    Returns
    -------
    MilvusVectorDBResourceService
        Resource service bound to this service's Milvus client; it inherits this
        service's `truncate_long_strings` setting.
    """

    return MilvusVectorDBResourceService(name=name,
                                         client=self._client,
                                         truncate_long_strings=self._truncate_long_strings,
                                         **kwargs)

def has_store_object(self, name: str) -> bool:
"""
Expand Down Expand Up @@ -688,7 +722,7 @@ def create(self, name: str, overwrite: bool = False, **kwargs: dict[str, typing.
for part in partition_conf["partitions"]:
self._client.create_partition(collection_name=name, partition_name=part["name"], timeout=timeout)

def _build_schema_conf(self, df: typing.Union[cudf.DataFrame, pd.DataFrame]) -> list[dict]:
def _build_schema_conf(self, df: DataFrameType) -> list[dict]:
fields = []

# Always add a primary key
Expand All @@ -708,7 +742,7 @@ def _build_schema_conf(self, df: typing.Union[cudf.DataFrame, pd.DataFrame]) ->
}

if (field_dict["dtype"] == pymilvus.DataType.VARCHAR):
field_dict["max_length"] = 65_535
field_dict["max_length"] = MAX_STRING_LENGTH_BYTES

if (field_dict["dtype"] == pymilvus.DataType.FLOAT_VECTOR
or field_dict["dtype"] == pymilvus.DataType.BINARY_VECTOR):
Expand All @@ -726,7 +760,7 @@ def _build_schema_conf(self, df: typing.Union[cudf.DataFrame, pd.DataFrame]) ->

def create_from_dataframe(self,
name: str,
df: typing.Union[cudf.DataFrame, pd.DataFrame],
df: DataFrameType,
overwrite: bool = False,
**kwargs: dict[str, typing.Any]) -> None:
"""
Expand All @@ -736,7 +770,7 @@ def create_from_dataframe(self,
----------
name : str
Name of the collection.
df : Union[cudf.DataFrame, pd.DataFrame]
df : DataFrameType
The dataframe to create the collection from.
overwrite : bool, optional
Whether to overwrite the collection if it already exists. Default is False.
Expand Down Expand Up @@ -797,18 +831,15 @@ def insert(self, name: str, data: list[list] | list[dict], **kwargs: dict[str,
return resource.insert(data, **kwargs)

@with_collection_lock
def insert_dataframe(self,
name: str,
df: typing.Union[cudf.DataFrame, pd.DataFrame],
**kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]:
def insert_dataframe(self, name: str, df: DataFrameType, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]:
"""
Converts dataframe to rows and insert to a collection in the Milvus vector database.
Parameters
----------
name : str
Name of the collection to be inserted.
df : typing.Union[cudf.DataFrame, pd.DataFrame]
df : DataFrameType
Dataframe to be inserted in the collection.
**kwargs : dict[str, typing.Any]
Additional keyword arguments containing collection configuration.
Expand Down
Loading

0 comments on commit 57d11a2

Please sign in to comment.