Guard optional dependencies in try/except blocks (#1382)
* Add missing dependencies for `requests`, `requests-cache` and `websockets` to `meta.yaml`
* Dependencies which are only utilized by a single stage are intentionally omitted from `meta.yaml`, even if they appear in `cuda11.8_dev.yml`
* Currently that list is: `bs4`, `elasticsearch`, `feedparser` and `milvus`
* Move `MILVUS_DATA_TYPE_MAP` from `morpheus/service/vdb/milvus_client.py` to `tests/test_milvus_vector_db_service.py`, which was the only place it was used.

Note: since these dependencies all exist in `cuda11.8_dev.yml`, no effort is made here to update the test suite to skip the associated tests when a dependency is unavailable.

Closes #1380
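
For reference, the shared `_verify_deps` helper lives in `morpheus/utils/verify_dependencies.py`, which is not shown in this diff. Based on its call sites (`_verify_deps(REQUIRED_DEPS, IMPORT_ERROR_MESSAGE, globals())`) and the per-module `_verify_nemo_llm`/`_verify_openai` functions it replaces, a minimal sketch might look like:

```python
# Hypothetical sketch of morpheus/utils/verify_dependencies.py; the real
# implementation is not part of this diff. Each guarded import either binds
# its names into the caller's module namespace or silently does nothing, so
# we check that namespace and raise a helpful ImportError at first use.
def _verify_deps(deps: tuple[str, ...], error_message: str, namespace: dict) -> None:
    for dep in deps:
        if dep not in namespace:
            raise ImportError(error_message)
```

This keeps module import side-effect free: the `ImportError` is deferred from import time to the moment a class that actually needs the dependency is constructed.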

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1382
dagardner-nv authored Nov 27, 2023
1 parent f87f16c commit 119ec42
Showing 11 changed files with 136 additions and 83 deletions.
3 changes: 3 additions & 0 deletions ci/conda/recipes/morpheus/meta.yaml
@@ -92,12 +92,15 @@ outputs:
 - python-confluent-kafka 1.9.2
 - pytorch 2.0.1
 - pytorch-cuda
+- requests=2.31
+- requests-cache=1.1
 - scikit-learn 1.2.2.*
 - sqlalchemy <=2.0 # 2.0 is incompatible with pandas=1.3
 - tqdm 4.*
 - tritonclient 2.26.*
 - typing_utils 0.1.*
 - watchdog 2.1.*
+- websockets
 run_constrained:
 # Since we dont explicitly require this but other packages might, constrain the versions.
 - {{ pin_compatible('cudatoolkit', min_pin='x.x', max_pin='x') }}
17 changes: 13 additions & 4 deletions morpheus/controllers/elasticsearch_controller.py
@@ -16,12 +16,21 @@
 import time

 import pandas as pd
-from elasticsearch import ConnectionError as ESConnectionError
-from elasticsearch import Elasticsearch
-from elasticsearch.helpers import parallel_bulk

+from morpheus.utils.verify_dependencies import _verify_deps
+
 logger = logging.getLogger(__name__)

+REQUIRED_DEPS = ('ESConnectionError', 'Elasticsearch', 'parallel_bulk')
+IMPORT_ERROR_MESSAGE = "ElasticsearchController requires the elasticsearch package to be installed."
+
+try:
+    from elasticsearch import ConnectionError as ESConnectionError
+    from elasticsearch import Elasticsearch
+    from elasticsearch.helpers import parallel_bulk
+except ImportError:
+    pass
+

 class ElasticsearchController:
     """
@@ -38,7 +47,7 @@ class ElasticsearchController:
     """

     def __init__(self, connection_kwargs: dict, raise_on_exception: bool = False, refresh_period_secs: int = 2400):
-
+        _verify_deps(REQUIRED_DEPS, IMPORT_ERROR_MESSAGE, globals())
         self._client = None
         self._last_refresh_time = None
         self._raise_on_exception = raise_on_exception
19 changes: 14 additions & 5 deletions morpheus/controllers/rss_controller.py
@@ -19,14 +19,23 @@
 from dataclasses import dataclass
 from urllib.parse import urlparse

-import feedparser
 import pandas as pd
 import requests
 import requests_cache
-from bs4 import BeautifulSoup

+from morpheus.utils.verify_dependencies import _verify_deps
+
 logger = logging.getLogger(__name__)

+REQUIRED_DEPS = ('BeautifulSoup', 'feedparser')
+IMPORT_ERROR_MESSAGE = "RSSController requires the bs4 and feedparser packages to be installed"
+
+try:
+    import feedparser
+    from bs4 import BeautifulSoup
+except ImportError:
+    pass
+

 @dataclass
 class FeedStats:
@@ -72,7 +81,7 @@ def __init__(self,
                  cache_dir: str = "./.cache/http",
                  cooldown_interval: int = 600,
                  request_timeout: float = 2.0):
-
+        _verify_deps(REQUIRED_DEPS, IMPORT_ERROR_MESSAGE, globals())
         if (isinstance(feed_input, str)):
             feed_input = [feed_input]

@@ -151,7 +160,7 @@ def _read_file_content(self, file_path: str) -> str:
         with open(file_path, 'r', encoding="utf-8") as file:
             return file.read()

-    def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) -> feedparser.FeedParserDict:
+    def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) -> "feedparser.FeedParserDict":

         feed_input = self._get_response_text(feed_input) if is_url else self._read_file_content(feed_input)

@@ -191,7 +200,7 @@ def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) -> feedparser.FeedParserDict:

         return feed

-    def _try_parse_feed(self, url: str) -> feedparser.FeedParserDict:
+    def _try_parse_feed(self, url: str) -> "feedparser.FeedParserDict":
         is_url = RSSController.is_url(url)

         fallback = False
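The quoted return annotations above (`"feedparser.FeedParserDict"`) are the other half of the guard: a string annotation is never evaluated at runtime, so the module still loads when `feedparser` is absent. A minimal illustration of the pattern, with hypothetical names:

```python
try:
    import feedparser
except ImportError:
    pass


class Example:
    # The quoted annotation is just a string at runtime, so defining this
    # class succeeds even when the feedparser import above failed.
    def parse(self, url: str) -> "feedparser.FeedParserDict":
        return feedparser.parse(url)  # only fails if actually called
```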
15 changes: 4 additions & 11 deletions morpheus/llm/services/nemo_llm_service.py
@@ -19,6 +19,7 @@

 from morpheus.llm.services.llm_service import LLMClient
 from morpheus.llm.services.llm_service import LLMService
+from morpheus.utils.verify_dependencies import _verify_deps

 logger = logging.getLogger(__name__)

@@ -29,15 +30,7 @@
 try:
     import nemollm
 except ImportError:
-    logger.error(IMPORT_ERROR_MESSAGE)
-
-
-def _verify_nemo_llm():
-    """
-    When NemoLLM is not installed, raise an ImportError with a helpful message, rather than an attribute error.
-    """
-    if 'nemollm' not in globals():
-        raise ImportError(IMPORT_ERROR_MESSAGE)
+    pass


 class NeMoLLMClient(LLMClient):
@@ -58,7 +51,7 @@ class NeMoLLMClient(LLMClient):

     def __init__(self, parent: "NeMoLLMService", model_name: str, **model_kwargs: dict[str, typing.Any]) -> None:
         super().__init__()
-        _verify_nemo_llm()
+        _verify_deps(('nemollm', ), IMPORT_ERROR_MESSAGE, globals())

         self._parent = parent
         self._model_name = model_name
@@ -154,7 +147,7 @@ class NeMoLLMService(LLMService):

     def __init__(self, *, api_key: str = None, org_id: str = None) -> None:
         super().__init__()
-        _verify_nemo_llm()
+        _verify_deps(('nemollm', ), IMPORT_ERROR_MESSAGE, globals())

         api_key = api_key if api_key is not None else os.environ.get("NGC_API_KEY", None)
         org_id = org_id if org_id is not None else os.environ.get("NGC_ORG_ID", None)
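The net effect of the refactor above: importing `morpheus.llm.services.nemo_llm_service` always succeeds, and the missing-dependency error surfaces at construction time instead. A sketch of the expected behavior when `nemollm` is not installed (the exact message text is defined in `IMPORT_ERROR_MESSAGE`, above the visible hunk):

```python
from morpheus.llm.services.nemo_llm_service import NeMoLLMService

try:
    service = NeMoLLMService()  # _verify_deps fires here, not at import time
except ImportError as exc:
    print(exc)  # a helpful "install nemollm" message rather than a NameError
```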
12 changes: 4 additions & 8 deletions morpheus/llm/services/openai_chat_service.py
@@ -19,6 +19,7 @@

 from morpheus.llm.services.llm_service import LLMClient
 from morpheus.llm.services.llm_service import LLMService
+from morpheus.utils.verify_dependencies import _verify_deps

 logger = logging.getLogger(__name__)

@@ -30,12 +31,7 @@
 try:
     import openai
 except ImportError:
-    logger.error(IMPORT_ERROR_MESSAGE)
-
-
-def _verify_openai():
-    if 'openai' not in globals():
-        raise ImportError(IMPORT_ERROR_MESSAGE)
+    pass


 class OpenAIChatClient(LLMClient):
@@ -57,7 +53,7 @@ class OpenAIChatClient(LLMClient):

     def __init__(self, model_name: str, set_assistant: bool = False, **model_kwargs: dict[str, typing.Any]) -> None:
         super().__init__()
-        _verify_openai()
+        _verify_deps(('openai', ), IMPORT_ERROR_MESSAGE, globals())

         self._model_name = model_name
         self._set_assistant = set_assistant
@@ -191,7 +187,7 @@ class OpenAIChatService(LLMService):

     def __init__(self) -> None:
         super().__init__()
-        _verify_openai()
+        _verify_deps(('openai', ), IMPORT_ERROR_MESSAGE, globals())

     def get_client(self,
                    model_name: str,
17 changes: 0 additions & 17 deletions morpheus/service/vdb/milvus_client.py
@@ -15,26 +15,9 @@
 import typing

 from pymilvus import Collection
-from pymilvus import DataType
 from pymilvus import MilvusClient as PyMilvusClient
 from pymilvus.orm.mutation import MutationResult

-# Milvus data type mapping dictionary
-MILVUS_DATA_TYPE_MAP = {
-    "int8": DataType.INT8,
-    "int16": DataType.INT16,
-    "int32": DataType.INT32,
-    "int64": DataType.INT64,
-    "bool": DataType.BOOL,
-    "float": DataType.FLOAT,
-    "double": DataType.DOUBLE,
-    "binary_vector": DataType.BINARY_VECTOR,
-    "float_vector": DataType.FLOAT_VECTOR,
-    "string": DataType.STRING,
-    "varchar": DataType.VARCHAR,
-    "json": DataType.JSON,
-}
-

 def handle_exceptions(func_name: str, error_message: str) -> typing.Callable:
     """
Expand Down
34 changes: 22 additions & 12 deletions morpheus/service/vdb/milvus_vector_db_service.py
@@ -21,17 +21,26 @@
 from functools import wraps

 import pandas as pd
-import pymilvus
-from pymilvus.orm.mutation import MutationResult

 import cudf

-from morpheus.service.vdb.milvus_client import MilvusClient
 from morpheus.service.vdb.vector_db_service import VectorDBResourceService
 from morpheus.service.vdb.vector_db_service import VectorDBService
+from morpheus.utils.verify_dependencies import _verify_deps

 logger = logging.getLogger(__name__)

+REQUIRED_DEPS = ('pymilvus', 'MilvusClient', 'MutationResult')
+IMPORT_ERROR_MESSAGE = "MilvusVectorDBResourceService requires the milvus and pymilvus packages to be installed."
+
+try:
+    import pymilvus
+    from pymilvus.orm.mutation import MutationResult
+
+    from morpheus.service.vdb.milvus_client import MilvusClient  # pylint: disable=ungrouped-imports
+except ImportError:
+    pass
+

 class FieldSchemaEncoder(json.JSONEncoder):

Expand Down Expand Up @@ -75,7 +84,7 @@ def object_hook(obj: dict) -> dict:
return obj

@staticmethod
def dump(field: pymilvus.FieldSchema, f: typing.IO) -> str:
def dump(field: "pymilvus.FieldSchema", f: typing.IO) -> str:
"""
Serialize a FieldSchema object to a JSON file.
@@ -94,7 +103,7 @@ def dump(field: pymilvus.FieldSchema, f: typing.IO) -> str:
         return json.dump(field, f, cls=FieldSchemaEncoder)

     @staticmethod
-    def dumps(field: pymilvus.FieldSchema) -> str:
+    def dumps(field: "pymilvus.FieldSchema") -> str:
         """
         Serialize a FieldSchema object to a JSON-compatible string format.
@@ -112,7 +121,7 @@ def dumps(field: pymilvus.FieldSchema) -> str:
         return json.dumps(field, cls=FieldSchemaEncoder)

     @staticmethod
-    def load(f_obj: typing.IO) -> pymilvus.FieldSchema:
+    def load(f_obj: typing.IO) -> "pymilvus.FieldSchema":
         """
         Deserialize a JSON file to a FieldSchema object.
@@ -129,7 +138,7 @@ def load(f_obj: typing.IO) -> pymilvus.FieldSchema:
         return pymilvus.FieldSchema.construct_from_dict(json.load(f_obj, object_hook=FieldSchemaEncoder.object_hook))

     @staticmethod
-    def loads(field: str) -> pymilvus.FieldSchema:
+    def loads(field: str) -> "pymilvus.FieldSchema":
         """
         Deserialize a JSON-compatible string to a FieldSchema object.
@@ -147,7 +156,7 @@ def loads(field: str) -> pymilvus.FieldSchema:
         return pymilvus.FieldSchema.construct_from_dict(json.loads(field, object_hook=FieldSchemaEncoder.object_hook))

     @staticmethod
-    def from_dict(field: dict) -> pymilvus.FieldSchema:
+    def from_dict(field: dict) -> "pymilvus.FieldSchema":
         """
         Convert a dictionary to a FieldSchema object.
@@ -216,7 +225,8 @@ class MilvusVectorDBResourceService(VectorDBResourceService):
         An instance of the MilvusClient for interaction with the Milvus Vector Database.
     """

-    def __init__(self, name: str, client: MilvusClient) -> None:
+    def __init__(self, name: str, client: "MilvusClient") -> None:
+        _verify_deps(REQUIRED_DEPS, IMPORT_ERROR_MESSAGE, globals())
         super().__init__()

         self._name = name
@@ -525,7 +535,7 @@ def drop(self, **kwargs: dict[str, typing.Any]) -> None:

         self._collection.drop(**kwargs)

-    def _insert_result_to_dict(self, result: MutationResult) -> dict[str, typing.Any]:
+    def _insert_result_to_dict(self, result: "MutationResult") -> dict[str, typing.Any]:
         result_dict = {
             "primary_keys": result.primary_keys,
             "insert_count": result.insert_count,
@@ -539,7 +549,7 @@ def _insert_result_to_dict(self, result: MutationResult) -> dict[str, typing.Any]:
         }
         return result_dict

-    def _update_delete_result_to_dict(self, result: MutationResult) -> dict[str, typing.Any]:
+    def _update_delete_result_to_dict(self, result: "MutationResult") -> dict[str, typing.Any]:
         result_dict = {
             "insert_count": result.insert_count,
             "delete_count": result.delete_count,
@@ -613,7 +623,7 @@ def list_store_objects(self, **kwargs: dict[str, typing.Any]) -> list[str]:
         """
         return self._client.list_collections(**kwargs)

-    def _create_schema_field(self, field_conf: dict) -> pymilvus.FieldSchema:
+    def _create_schema_field(self, field_conf: dict) -> "pymilvus.FieldSchema":

         field_schema = pymilvus.FieldSchema.construct_from_dict(field_conf)

16 changes: 12 additions & 4 deletions morpheus/stages/input/databricks_deltalake_source_stage.py
@@ -15,9 +15,6 @@
 import logging

 import mrc
-from databricks.connect import DatabricksSession
-from pyspark.sql import functions as sf
-from pyspark.sql.window import Window

 import cudf

@@ -27,9 +24,20 @@
 from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
 from morpheus.pipeline.single_output_source import SingleOutputSource
 from morpheus.pipeline.stage_schema import StageSchema
+from morpheus.utils.verify_dependencies import _verify_deps

 logger = logging.getLogger(__name__)

+REQUIRED_DEPS = ('DatabricksSession', 'sf', 'Window')
+IMPORT_ERROR_MESSAGE = "DatabricksDeltaLakeSourceStage requires the databricks-connect package to be installed."
+
+try:
+    from databricks.connect import DatabricksSession
+    from pyspark.sql import functions as sf
+    from pyspark.sql.window import Window
+except ImportError:
+    pass
+

 @register_stage("from-databricks-deltalake")
 class DataBricksDeltaLakeSourceStage(PreallocatorMixin, SingleOutputSource):
@@ -59,7 +67,7 @@ def __init__(self,
                  databricks_host: str = None,
                  databricks_token: str = None,
                  databricks_cluster_id: str = None):
-
+        _verify_deps(REQUIRED_DEPS, IMPORT_ERROR_MESSAGE, globals())
         super().__init__(config)
         self.spark_query = spark_query
         self.spark = DatabricksSession.builder.remote(host=databricks_host,
