Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Truncate strings exceeding max_length when inserting to Milvus #1665

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ab7eaf4
First pass at a truncate strings method
dagardner-nv Apr 23, 2024
54e7681
Merge branch 'branch-24.06' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Apr 23, 2024
2f14478
Tests for new truncate_string_cols_by_bytes method and the _cudf_need…
dagardner-nv Apr 23, 2024
83ad8ad
Log a warning when we truncate a column
dagardner-nv Apr 23, 2024
f91d9ac
Refactor such that each column has it's own max length
dagardner-nv Apr 24, 2024
10a68f9
Expand tests
dagardner-nv Apr 24, 2024
c80fc27
Add in truncating of long string fields.
dagardner-nv Apr 24, 2024
4b771e8
Use DataFrameType alias
dagardner-nv Apr 24, 2024
3b6408e
Cleanup
dagardner-nv Apr 24, 2024
9c5435c
Exclude string type from max_length checking
dagardner-nv Apr 24, 2024
dbe34dc
Ensure truncate_long_strings parameter is set in configs and passed a…
dagardner-nv Apr 24, 2024
9acea9e
Add docstring for truncate_long_strings
dagardner-nv Apr 24, 2024
078cd19
Add docstring for warn_on_truncate
dagardner-nv Apr 24, 2024
00f8170
Merge branch 'branch-24.06' into david-truncate-milvus-1650
dagardner-nv Apr 24, 2024
2b72313
Add type-alias for Series type
dagardner-nv Apr 25, 2024
f26ce68
Refactor cudf_string_cols_exceed_max_bytes and truncate_string_cols_b…
dagardner-nv Apr 25, 2024
4a6de1b
Refactor to call cudf_string_cols_exceed_max_bytes prior to convertin…
dagardner-nv Apr 25, 2024
e121b33
Merge branch 'branch-24.06' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Apr 25, 2024
5f3804f
Test WIP [no ci]
dagardner-nv Apr 25, 2024
701e1f9
Fix bug where truncate was always enabled for cudf dataframes
dagardner-nv Apr 25, 2024
38d3125
Finish up tests
dagardner-nv Apr 25, 2024
a5f05e1
Remove parametarization based on num_rows, not important for this tes…
dagardner-nv Apr 25, 2024
35dc567
Remove old param
dagardner-nv Apr 25, 2024
a6cb131
Lint fixes
dagardner-nv Apr 25, 2024
148d398
Remove stray print method
dagardner-nv Apr 25, 2024
d352930
Don't hard-code the name of the probabilities tensor, don't assume it…
dagardner-nv Apr 25, 2024
bc1e3c2
Re-work hard-coded probs->embeddings copy that used to exist in infer…
dagardner-nv Apr 25, 2024
078780f
Lint fix
dagardner-nv Apr 25, 2024
f3d0334
Re-enable C++ mode support
dagardner-nv Apr 25, 2024
abeb811
Remove the two issues this PR should resolve
dagardner-nv Apr 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions docs/source/extra_info/known_issues.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,5 @@ limitations under the License.

- TrainAEStage fails with a Segmentation fault ([#1641](https://github.com/nv-morpheus/Morpheus/pull/1641))
- vdb_upload example pipeline triggers an internal error in Triton ([#1649](https://github.com/nv-morpheus/Morpheus/pull/1649))
- vdb_upload example pipeline error on inserting large strings ([#1650](https://github.com/nv-morpheus/Morpheus/pull/1650))
- vdb_upload example pipeline only works with C++ mode disabled ([#1651](https://github.com/nv-morpheus/Morpheus/pull/1651))

Refer to [open issues in the Morpheus project](https://github.com/nv-morpheus/Morpheus/issues)
2 changes: 1 addition & 1 deletion examples/llm/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
callback=parse_log_level,
help="Specify the logging level to use.")
@click.option('--use_cpp',
default=False,
default=True,
type=bool,
help=("Whether or not to use C++ node and message types or to prefer python. "
"Only use as a last resort if bugs are encountered"))
Expand Down
19 changes: 18 additions & 1 deletion examples/llm/vdb_upload/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
from vdb_upload.helper import process_vdb_sources

from morpheus.config import Config
from morpheus.messages import ControlMessage
from morpheus.pipeline.pipeline import Pipeline
from morpheus.pipeline.stage_decorator import stage
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.general.trigger_stage import TriggerStage
from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage
Expand Down Expand Up @@ -78,6 +80,20 @@ def pipeline(pipeline_config: Config,
monitor_2 = pipe.add_stage(
MonitorStage(pipeline_config, description="Inference rate", unit="events", delayed_start=True))

@stage
def embedding_tensor_to_df(message: ControlMessage, *, embedding_tensor_name='probs') -> ControlMessage:
"""
Copies the probs tensor to the 'embedding' field of the dataframe.
"""
msg_meta = message.payload()
with msg_meta.mutable_dataframe() as df:
embedding_tensor = message.tensors().get_tensor(embedding_tensor_name)
df['embedding'] = embedding_tensor.tolist()

return message

embedding_tensor_to_df_stage = pipe.add_stage(embedding_tensor_to_df(pipeline_config))

vector_db = pipe.add_stage(WriteToVectorDBStage(pipeline_config, **vdb_config))

monitor_3 = pipe.add_stage(
Expand All @@ -96,7 +112,8 @@ def pipeline(pipeline_config: Config,
pipe.add_edge(nlp_stage, monitor_1)
pipe.add_edge(monitor_1, embedding_stage)
pipe.add_edge(embedding_stage, monitor_2)
pipe.add_edge(monitor_2, vector_db)
pipe.add_edge(monitor_2, embedding_tensor_to_df_stage)
pipe.add_edge(embedding_tensor_to_df_stage, vector_db)
pipe.add_edge(vector_db, monitor_3)

start_time = time.time()
Expand Down
3 changes: 2 additions & 1 deletion examples/llm/vdb_upload/vdb_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,15 @@ def build_cli_configs(source_type,
cli_vdb_conf = {
# Vector db upload has some significant transaction overhead, batch size here should be as large as possible
'batch_size': 16384,
'resource_name': vector_db_resource_name,
'embedding_size': embedding_size,
'recreate': True,
'resource_name': vector_db_resource_name,
'resource_schemas': {
vector_db_resource_name:
build_defualt_milvus_config(embedding_size) if (vector_db_service == 'milvus') else None,
},
'service': vector_db_service,
'truncate_long_strings': True,
'uri': vector_db_uri,
}

Expand Down
96 changes: 96 additions & 0 deletions morpheus/io/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,16 @@
# limitations under the License.
"""IO utilities."""

import logging

import pandas as pd

import cudf

from morpheus.utils.type_aliases import DataFrameType
from morpheus.utils.type_aliases import SeriesType

logger = logging.getLogger(__name__)


def filter_null_data(x: DataFrameType):
Expand All @@ -31,3 +40,90 @@ def filter_null_data(x: DataFrameType):
return x

return x[~x['data'].isna()]


def cudf_string_cols_exceed_max_bytes(df: cudf.DataFrame, column_max_bytes: dict[str, int]) -> bool:
"""
Checks a cudf DataFrame for string columns that exceed a maximum number of bytes and thus need to be truncated by
calling `truncate_string_cols_by_bytes`.

This method utilizes a cudf method `Series.str.byte_count()` method that pandas lacks, which can avoid a costly
call to truncate_string_cols_by_bytes.

Parameters
----------
df : DataFrameType
The dataframe to check.
column_max_bytes: dict[str, int]
A mapping of string column names to the maximum number of bytes for each column.

Returns
-------
bool
True if truncation is needed, False otherwise.
"""
if not isinstance(df, cudf.DataFrame):
raise ValueError("Expected cudf DataFrame")

for (col, max_bytes) in column_max_bytes.items():
series: cudf.Series = df[col]

assert series.dtype == 'object'

if series.str.byte_count().max() > max_bytes:
return True

return False


def truncate_string_cols_by_bytes(df: DataFrameType,
column_max_bytes: dict[str, int],
warn_on_truncate: bool = True) -> bool:
"""
Truncates all string columns in a dataframe to a maximum number of bytes. This operation is performed in-place on
the dataframe.

Parameters
----------
df : DataFrameType
The dataframe to truncate.
column_max_bytes: dict[str, int]
A mapping of string column names to the maximum number of bytes for each column.
warn_on_truncate: bool, default True
Whether to log a warning when truncating a column.

Returns
-------
bool
True if truncation was performed, False otherwise.
"""

performed_truncation = False
is_cudf = isinstance(df, cudf.DataFrame)

for (col, max_bytes) in column_max_bytes.items():
series: SeriesType = df[col]

if is_cudf:
series: pd.Series = series.to_pandas()

assert series.dtype == 'object', f"Expected string column '{col}'"

encoded_series = series.str.encode(encoding='utf-8', errors='strict')
if encoded_series.str.len().max() > max_bytes:
performed_truncation = True
if warn_on_truncate:
logger.warning("Truncating column '%s' to %d bytes", col, max_bytes)

truncated_series = encoded_series.str.slice(0, max_bytes)

# There is a possibility that slicing by max_len will slice a multi-byte character in half setting
# errors='ignore' will cause the resulting string to be truncated after the last full character
decoded_series = truncated_series.str.decode(encoding='utf-8', errors='ignore')

if is_cudf:
df[col] = cudf.Series.from_pandas(decoded_series)
else:
df[col] = decoded_series

return performed_truncation
73 changes: 52 additions & 21 deletions morpheus/service/vdb/milvus_vector_db_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,24 @@
import typing
from functools import wraps

import pandas as pd

import cudf

from morpheus.io.utils import cudf_string_cols_exceed_max_bytes
from morpheus.io.utils import truncate_string_cols_by_bytes
from morpheus.service.vdb.vector_db_service import VectorDBResourceService
from morpheus.service.vdb.vector_db_service import VectorDBService
from morpheus.utils.type_aliases import DataFrameType

logger = logging.getLogger(__name__)

IMPORT_EXCEPTION = None
IMPORT_ERROR_MESSAGE = "MilvusVectorDBResourceService requires the milvus and pymilvus packages to be installed."

# Milvus has a max string length in bytes of 65,535. Multi-byte characters like "ñ" will have a string length of 1, the
# byte length encoded as UTF-8 will be 2
# https://milvus.io/docs/limitations.md#Length-of-a-string
MAX_STRING_LENGTH_BYTES = 65_535

try:
import pymilvus
from pymilvus.orm.mutation import MutationResult
Expand Down Expand Up @@ -222,9 +228,11 @@ class MilvusVectorDBResourceService(VectorDBResourceService):
Name of the resource.
client : MilvusClient
An instance of the MilvusClient for interaction with the Milvus Vector Database.
truncate_long_strings : bool, optional
When true, truncate strings values that are longer than the max length of the field
"""

def __init__(self, name: str, client: "MilvusClient") -> None:
def __init__(self, name: str, client: "MilvusClient", truncate_long_strings: bool = False) -> None:
if IMPORT_EXCEPTION is not None:
raise ImportError(IMPORT_ERROR_MESSAGE) from IMPORT_EXCEPTION

Expand All @@ -239,13 +247,24 @@ def __init__(self, name: str, client: "MilvusClient") -> None:
self._vector_field = None
self._fillna_fields_dict = {}

# Mapping of field name to max length for string fields
self._fields_max_length: dict[str, int] = {}

for field in self._fields:
if field.dtype == pymilvus.DataType.FLOAT_VECTOR:
self._vector_field = field.name
else:
# Intentionally excluding pymilvus.DataType.STRING, in our current version it isn't supported, and in
# some database systems string types don't have a max length.
if field.dtype == pymilvus.DataType.VARCHAR:
max_length = field.params.get('max_length')
if max_length is not None:
self._fields_max_length[field.name] = max_length
if not field.auto_id:
self._fillna_fields_dict[field.name] = field.dtype

self._truncate_long_strings = truncate_long_strings

self._collection.load()

def _set_up_collection(self):
Expand Down Expand Up @@ -275,13 +294,13 @@ def insert(self, data: list[list] | list[dict], **kwargs: dict[str, typing.Any])

return self._insert_result_to_dict(result=result)

def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwargs: dict[str, typing.Any]) -> dict:
def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) -> dict:
"""
Insert a dataframe entires into the vector database.

Parameters
----------
df : typing.Union[cudf.DataFrame, pd.DataFrame]
df : DataFrameType
Dataframe to be inserted into the collection.
**kwargs : dict[str, typing.Any]
Extra keyword arguments specific to the vector database implementation.
Expand All @@ -291,10 +310,6 @@ def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwa
dict
Returns response content as a dictionary.
"""

if isinstance(df, cudf.DataFrame):
df = df.to_pandas()

# Ensure that there are no None values in the DataFrame entries.
for field_name, dtype in self._fillna_fields_dict.items():
if dtype in (pymilvus.DataType.VARCHAR, pymilvus.DataType.STRING):
Expand All @@ -311,11 +326,24 @@ def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwa
else:
logger.info("Skipped checking 'None' in the field: %s, with datatype: %s", field_name, dtype)

needs_truncate = self._truncate_long_strings
if needs_truncate and isinstance(df, cudf.DataFrame):
# Cudf specific optimization, we can avoid a costly call to truncate_string_cols_by_bytes if all of the
# string columns are already below the max length
needs_truncate = cudf_string_cols_exceed_max_bytes(df, self._fields_max_length)

# From the schema, this is the list of columns we need, excluding any auto_id columns
column_names = [field.name for field in self._fields if not field.auto_id]

collection_df = df[column_names]
if isinstance(collection_df, cudf.DataFrame):
collection_df = collection_df.to_pandas()

if needs_truncate:
truncate_string_cols_by_bytes(collection_df, self._fields_max_length, warn_on_truncate=True)

# Note: dataframe columns has to be in the order of collection schema fields.s
result = self._collection.insert(data=df[column_names], **kwargs)
result = self._collection.insert(data=collection_df, **kwargs)
self._collection.flush()

return self._insert_result_to_dict(result=result)
Expand Down Expand Up @@ -575,6 +603,8 @@ class MilvusVectorDBService(VectorDBService):
The port number for connecting to the Milvus server.
alias : str, optional
Alias for the Milvus connection, by default "default".
truncate_long_strings : bool, optional
When true, truncate strings values that are longer than the max length of the field
**kwargs : dict
Additional keyword arguments specific to the Milvus connection configuration.
"""
Expand All @@ -589,13 +619,17 @@ def __init__(self,
password: str = "",
db_name: str = "",
token: str = "",
truncate_long_strings: bool = False,
**kwargs: dict[str, typing.Any]):

self._truncate_long_strings = truncate_long_strings
self._client = MilvusClient(uri=uri, user=user, password=password, db_name=db_name, token=token, **kwargs)

def load_resource(self, name: str, **kwargs: dict[str, typing.Any]) -> MilvusVectorDBResourceService:

return MilvusVectorDBResourceService(name=name, client=self._client, **kwargs)
return MilvusVectorDBResourceService(name=name,
client=self._client,
truncate_long_strings=self._truncate_long_strings,
**kwargs)

def has_store_object(self, name: str) -> bool:
"""
Expand Down Expand Up @@ -688,7 +722,7 @@ def create(self, name: str, overwrite: bool = False, **kwargs: dict[str, typing.
for part in partition_conf["partitions"]:
self._client.create_partition(collection_name=name, partition_name=part["name"], timeout=timeout)

def _build_schema_conf(self, df: typing.Union[cudf.DataFrame, pd.DataFrame]) -> list[dict]:
def _build_schema_conf(self, df: DataFrameType) -> list[dict]:
fields = []

# Always add a primary key
Expand All @@ -708,7 +742,7 @@ def _build_schema_conf(self, df: typing.Union[cudf.DataFrame, pd.DataFrame]) ->
}

if (field_dict["dtype"] == pymilvus.DataType.VARCHAR):
field_dict["max_length"] = 65_535
field_dict["max_length"] = MAX_STRING_LENGTH_BYTES

if (field_dict["dtype"] == pymilvus.DataType.FLOAT_VECTOR
or field_dict["dtype"] == pymilvus.DataType.BINARY_VECTOR):
Expand All @@ -726,7 +760,7 @@ def _build_schema_conf(self, df: typing.Union[cudf.DataFrame, pd.DataFrame]) ->

def create_from_dataframe(self,
name: str,
df: typing.Union[cudf.DataFrame, pd.DataFrame],
df: DataFrameType,
overwrite: bool = False,
**kwargs: dict[str, typing.Any]) -> None:
"""
Expand All @@ -736,7 +770,7 @@ def create_from_dataframe(self,
----------
name : str
Name of the collection.
df : Union[cudf.DataFrame, pd.DataFrame]
df : DataFrameType
The dataframe to create the collection from.
overwrite : bool, optional
Whether to overwrite the collection if it already exists. Default is False.
Expand Down Expand Up @@ -797,18 +831,15 @@ def insert(self, name: str, data: list[list] | list[dict], **kwargs: dict[str,
return resource.insert(data, **kwargs)

@with_collection_lock
def insert_dataframe(self,
name: str,
df: typing.Union[cudf.DataFrame, pd.DataFrame],
**kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]:
def insert_dataframe(self, name: str, df: DataFrameType, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]:
"""
Converts dataframe to rows and insert to a collection in the Milvus vector database.

Parameters
----------
name : str
Name of the collection to be inserted.
df : typing.Union[cudf.DataFrame, pd.DataFrame]
df : DataFrameType
Dataframe to be inserted in the collection.
**kwargs : dict[str, typing.Any]
Additional keyword arguments containing collection configuration.
Expand Down
Loading
Loading