Guard optional dependencies in try/except blocks #1382

Merged

Changes from all commits · 19 commits
be39cb6
First pass at updating meta.yaml, untested
dagardner-nv Nov 22, 2023
ace5059
For now exclude the pip only deps
dagardner-nv Nov 22, 2023
48fda28
Specify pip only dependencies
dagardner-nv Nov 22, 2023
d5e29cf
Move pip only deps to runtime.yml
dagardner-nv Nov 22, 2023
07fd159
Add transitive dep for py4j which is needed by databricks-connect
dagardner-nv Nov 22, 2023
40205c7
Add missing milvus mark
dagardner-nv Nov 22, 2023
3388ee3
Remove packages which are only used by a single stage/node as these m…
dagardner-nv Nov 23, 2023
7a7e363
Import optional dependencies in a try/except block, and only raise wh…
dagardner-nv Nov 23, 2023
34b5c2f
Import optional deps in a try/except block
dagardner-nv Nov 23, 2023
69b1716
Merge branch 'david-missing-milvus-mark' into david-conda-deps-1380-p2
dagardner-nv Nov 23, 2023
e628c36
Fix dependency check
dagardner-nv Nov 23, 2023
7ec5ac7
Ensure optional deps are quoted if they appear in type annotations, m…
dagardner-nv Nov 27, 2023
58bb0df
Merge branch 'branch-23.11' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Nov 27, 2023
79fde60
Fix
dagardner-nv Nov 27, 2023
d91099a
Since our milvus client inherits from the upstream client we can't do…
dagardner-nv Nov 27, 2023
336c9c3
Fix updated calls to _verify_deps
dagardner-nv Nov 27, 2023
7ecd37c
Revert unintentional change
dagardner-nv Nov 27, 2023
2ed7ca2
Remove unused logger definition
dagardner-nv Nov 27, 2023
b1e6123
Don't log errors for optional deps
dagardner-nv Nov 27, 2023
3 changes: 3 additions & 0 deletions ci/conda/recipes/morpheus/meta.yaml
@@ -92,12 +92,15 @@ outputs:
- python-confluent-kafka 1.9.2
- pytorch 2.0.1
- pytorch-cuda
- requests=2.31
- requests-cache=1.1
- scikit-learn 1.2.2.*
- sqlalchemy <=2.0 # 2.0 is incompatible with pandas=1.3
- tqdm 4.*
- tritonclient 2.26.*
- typing_utils 0.1.*
- watchdog 2.1.*
- websockets
run_constrained:
# Since we dont explicitly require this but other packages might, constrain the versions.
- {{ pin_compatible('cudatoolkit', min_pin='x.x', max_pin='x') }}
17 changes: 13 additions & 4 deletions morpheus/controllers/elasticsearch_controller.py
@@ -16,12 +16,21 @@
import time

import pandas as pd
from elasticsearch import ConnectionError as ESConnectionError
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

from morpheus.utils.verify_dependencies import _verify_deps

logger = logging.getLogger(__name__)

REQUIRED_DEPS = ('ESConnectionError', 'Elasticsearch', 'parallel_bulk')
IMPORT_ERROR_MESSAGE = "ElasticsearchController requires the elasticsearch package to be installed."

try:
from elasticsearch import ConnectionError as ESConnectionError
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk
except ImportError:
pass


class ElasticsearchController:
"""
@@ -38,7 +47,7 @@ class ElasticsearchController:
"""

def __init__(self, connection_kwargs: dict, raise_on_exception: bool = False, refresh_period_secs: int = 2400):

_verify_deps(REQUIRED_DEPS, IMPORT_ERROR_MESSAGE, globals())
self._client = None
self._last_refresh_time = None
self._raise_on_exception = raise_on_exception
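The `_verify_deps` helper these constructors now call comes from `morpheus/utils/verify_dependencies.py`, a file not shown in this diff. Judging from the `_verify_nemo_llm` body it replaces (removed from the NeMo service below), a minimal sketch of what it plausibly does:

# Hypothetical sketch of morpheus/utils/verify_dependencies.py; the actual
# file is not part of this diff.
def _verify_deps(deps: tuple[str, ...], error_message: str, namespace: dict) -> None:
    # Each guarded import that failed leaves its name missing from the
    # caller's globals(); raise a helpful ImportError instead of letting a
    # NameError surface later.
    for dep in deps:
        if dep not in namespace:
            raise ImportError(error_message)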
19 changes: 14 additions & 5 deletions morpheus/controllers/rss_controller.py
@@ -19,14 +19,23 @@
from dataclasses import dataclass
from urllib.parse import urlparse

import feedparser
import pandas as pd
import requests
import requests_cache
from bs4 import BeautifulSoup

from morpheus.utils.verify_dependencies import _verify_deps

logger = logging.getLogger(__name__)

REQUIRED_DEPS = ('BeautifulSoup', 'feedparser')
IMPORT_ERROR_MESSAGE = "RSSController requires the bs4 and feedparser packages to be installed"

try:
import feedparser
from bs4 import BeautifulSoup
except ImportError:
pass


@dataclass
class FeedStats:
@@ -72,7 +81,7 @@ def __init__(self,
cache_dir: str = "./.cache/http",
cooldown_interval: int = 600,
request_timeout: float = 2.0):

_verify_deps(REQUIRED_DEPS, IMPORT_ERROR_MESSAGE, globals())
if (isinstance(feed_input, str)):
feed_input = [feed_input]

@@ -151,7 +160,7 @@ def _read_file_content(self, file_path: str) -> str:
with open(file_path, 'r', encoding="utf-8") as file:
return file.read()

def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) -> feedparser.FeedParserDict:
def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) -> "feedparser.FeedParserDict":

feed_input = self._get_response_text(feed_input) if is_url else self._read_file_content(feed_input)

@@ -191,7 +200,7 @@ def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) ->

return feed

def _try_parse_feed(self, url: str) -> feedparser.FeedParserDict:
def _try_parse_feed(self, url: str) -> "feedparser.FeedParserDict":
is_url = RSSController.is_url(url)

fallback = False
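Quoting the `feedparser.FeedParserDict` annotations is what keeps this module importable when feedparser is absent: an unquoted annotation is evaluated when the `def` statement executes, so it would raise NameError before `_verify_deps` ever got a chance to run. A minimal illustration, using a hypothetical `parse` function:

try:
    import feedparser  # optional dependency; absence is tolerated here
except ImportError:
    pass

# Unquoted, the annotation expression runs at definition time, so the def
# statement itself would raise NameError when the import above failed:
#     def parse(url: str) -> feedparser.FeedParserDict: ...

# Quoted, the annotation is stored as a plain string and never evaluated,
# so the module loads either way and errors surface only on actual use:
def parse(url: str) -> "feedparser.FeedParserDict":
    return feedparser.parse(url)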
15 changes: 4 additions & 11 deletions morpheus/llm/services/nemo_llm_service.py
@@ -19,6 +19,7 @@

from morpheus.llm.services.llm_service import LLMClient
from morpheus.llm.services.llm_service import LLMService
from morpheus.utils.verify_dependencies import _verify_deps

logger = logging.getLogger(__name__)

@@ -29,15 +30,7 @@
try:
import nemollm
except ImportError:
logger.error(IMPORT_ERROR_MESSAGE)


def _verify_nemo_llm():
"""
When NemoLLM is not installed, raise an ImportError with a helpful message, rather than an attribute error.
"""
if 'nemollm' not in globals():
raise ImportError(IMPORT_ERROR_MESSAGE)
pass


class NeMoLLMClient(LLMClient):
@@ -58,7 +51,7 @@ class NeMoLLMClient(LLMClient):

def __init__(self, parent: "NeMoLLMService", model_name: str, **model_kwargs: dict[str, typing.Any]) -> None:
super().__init__()
_verify_nemo_llm()
_verify_deps(('nemollm', ), IMPORT_ERROR_MESSAGE, globals())

self._parent = parent
self._model_name = model_name
@@ -154,7 +147,7 @@ class NeMoLLMService(LLMService):

def __init__(self, *, api_key: str = None, org_id: str = None) -> None:
super().__init__()
_verify_nemo_llm()
_verify_deps(('nemollm', ), IMPORT_ERROR_MESSAGE, globals())

api_key = api_key if api_key is not None else os.environ.get("NGC_API_KEY", None)
org_id = org_id if org_id is not None else os.environ.get("NGC_ORG_ID", None)
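Combined with the final commit ("Don't log errors for optional deps"), this moves the failure from import time to first use: loading the module no longer logs anything, and the ImportError surfaces only when a client or service is constructed. Roughly, on an environment without nemollm installed:

# Importing succeeds silently now (previously this logged
# IMPORT_ERROR_MESSAGE at module load):
from morpheus.llm.services.nemo_llm_service import NeMoLLMService

# The helpful ImportError is raised here, at construction time:
service = NeMoLLMService()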
12 changes: 4 additions & 8 deletions morpheus/llm/services/openai_chat_service.py
@@ -19,6 +19,7 @@

from morpheus.llm.services.llm_service import LLMClient
from morpheus.llm.services.llm_service import LLMService
from morpheus.utils.verify_dependencies import _verify_deps

logger = logging.getLogger(__name__)

@@ -30,12 +31,7 @@
try:
import openai
except ImportError:
logger.error(IMPORT_ERROR_MESSAGE)


def _verify_openai():
if 'openai' not in globals():
raise ImportError(IMPORT_ERROR_MESSAGE)
pass


class OpenAIChatClient(LLMClient):
@@ -57,7 +53,7 @@ class OpenAIChatClient(LLMClient):

def __init__(self, model_name: str, set_assistant: bool = False, **model_kwargs: dict[str, typing.Any]) -> None:
super().__init__()
_verify_openai()
_verify_deps(('openai', ), IMPORT_ERROR_MESSAGE, globals())

self._model_name = model_name
self._set_assistant = set_assistant
@@ -191,7 +187,7 @@ class OpenAIChatService(LLMService):

def __init__(self) -> None:
super().__init__()
_verify_openai()
_verify_deps(('openai', ), IMPORT_ERROR_MESSAGE, globals())

def get_client(self,
model_name: str,
17 changes: 0 additions & 17 deletions morpheus/service/vdb/milvus_client.py
@@ -15,26 +15,9 @@
import typing

from pymilvus import Collection
from pymilvus import DataType
from pymilvus import MilvusClient as PyMilvusClient
from pymilvus.orm.mutation import MutationResult

# Milvus data type mapping dictionary
MILVUS_DATA_TYPE_MAP = {
"int8": DataType.INT8,
"int16": DataType.INT16,
"int32": DataType.INT32,
"int64": DataType.INT64,
"bool": DataType.BOOL,
"float": DataType.FLOAT,
"double": DataType.DOUBLE,
"binary_vector": DataType.BINARY_VECTOR,
"float_vector": DataType.FLOAT_VECTOR,
"string": DataType.STRING,
"varchar": DataType.VARCHAR,
"json": DataType.JSON,
}


def handle_exceptions(func_name: str, error_message: str) -> typing.Callable:
"""
34 changes: 22 additions & 12 deletions morpheus/service/vdb/milvus_vector_db_service.py
@@ -21,17 +21,26 @@
from functools import wraps

import pandas as pd
import pymilvus
from pymilvus.orm.mutation import MutationResult

import cudf

from morpheus.service.vdb.milvus_client import MilvusClient
from morpheus.service.vdb.vector_db_service import VectorDBResourceService
from morpheus.service.vdb.vector_db_service import VectorDBService
from morpheus.utils.verify_dependencies import _verify_deps

logger = logging.getLogger(__name__)

REQUIRED_DEPS = ('pymilvus', 'MilvusClient', 'MutationResult')
IMPORT_ERROR_MESSAGE = "MilvusVectorDBResourceService requires the milvus and pymilvus packages to be installed."

try:
import pymilvus
from pymilvus.orm.mutation import MutationResult

from morpheus.service.vdb.milvus_client import MilvusClient # pylint: disable=ungrouped-imports
except ImportError:
pass


class FieldSchemaEncoder(json.JSONEncoder):

@@ -75,7 +84,7 @@ def object_hook(obj: dict) -> dict:
return obj

@staticmethod
def dump(field: pymilvus.FieldSchema, f: typing.IO) -> str:
def dump(field: "pymilvus.FieldSchema", f: typing.IO) -> str:
"""
Serialize a FieldSchema object to a JSON file.

@@ -94,7 +103,7 @@ def dump(field: pymilvus.FieldSchema, f: typing.IO) -> str:
return json.dump(field, f, cls=FieldSchemaEncoder)

@staticmethod
def dumps(field: pymilvus.FieldSchema) -> str:
def dumps(field: "pymilvus.FieldSchema") -> str:
"""
Serialize a FieldSchema object to a JSON-compatible string format.

@@ -112,7 +121,7 @@ def dumps(field: pymilvus.FieldSchema) -> str:
return json.dumps(field, cls=FieldSchemaEncoder)

@staticmethod
def load(f_obj: typing.IO) -> pymilvus.FieldSchema:
def load(f_obj: typing.IO) -> "pymilvus.FieldSchema":
"""
Deserialize a JSON file to a FieldSchema object.

@@ -129,7 +138,7 @@ def load(f_obj: typing.IO) -> pymilvus.FieldSchema:
return pymilvus.FieldSchema.construct_from_dict(json.load(f_obj, object_hook=FieldSchemaEncoder.object_hook))

@staticmethod
def loads(field: str) -> pymilvus.FieldSchema:
def loads(field: str) -> "pymilvus.FieldSchema":
"""
Deserialize a JSON-compatible string to a FieldSchema object.

@@ -147,7 +156,7 @@ def loads(field: str) -> pymilvus.FieldSchema:
return pymilvus.FieldSchema.construct_from_dict(json.loads(field, object_hook=FieldSchemaEncoder.object_hook))

@staticmethod
def from_dict(field: dict) -> pymilvus.FieldSchema:
def from_dict(field: dict) -> "pymilvus.FieldSchema":
"""
Convert a dictionary to a FieldSchema object.

@@ -216,7 +225,8 @@ class MilvusVectorDBResourceService(VectorDBResourceService):
An instance of the MilvusClient for interaction with the Milvus Vector Database.
"""

def __init__(self, name: str, client: MilvusClient) -> None:
def __init__(self, name: str, client: "MilvusClient") -> None:
_verify_deps(REQUIRED_DEPS, IMPORT_ERROR_MESSAGE, globals())
super().__init__()

self._name = name
@@ -525,7 +535,7 @@ def drop(self, **kwargs: dict[str, typing.Any]) -> None:

self._collection.drop(**kwargs)

def _insert_result_to_dict(self, result: MutationResult) -> dict[str, typing.Any]:
def _insert_result_to_dict(self, result: "MutationResult") -> dict[str, typing.Any]:
result_dict = {
"primary_keys": result.primary_keys,
"insert_count": result.insert_count,
@@ -539,7 +549,7 @@ def _insert_result_to_dict(self, result: MutationResult) -> dict[str, typing.Any
}
return result_dict

def _update_delete_result_to_dict(self, result: MutationResult) -> dict[str, typing.Any]:
def _update_delete_result_to_dict(self, result: "MutationResult") -> dict[str, typing.Any]:
result_dict = {
"insert_count": result.insert_count,
"delete_count": result.delete_count,
@@ -613,7 +623,7 @@ def list_store_objects(self, **kwargs: dict[str, typing.Any]) -> list[str]:
"""
return self._client.list_collections(**kwargs)

def _create_schema_field(self, field_conf: dict) -> pymilvus.FieldSchema:
def _create_schema_field(self, field_conf: dict) -> "pymilvus.FieldSchema":

field_schema = pymilvus.FieldSchema.construct_from_dict(field_conf)

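Note that the first-party `MilvusClient` import sits inside the same guard: per commit d91099a, Morpheus's client inherits from the upstream `pymilvus.MilvusClient`, so its class statement needs pymilvus at import time and the module cannot guard itself. A sketch of that constraint, with the real names but simplified bodies:

# Simplified view of morpheus/service/vdb/milvus_client.py: a subclass's base
# must exist when the class statement runs, so this module fails to import
# without pymilvus; callers guard the import of this module instead.
from pymilvus import MilvusClient as PyMilvusClient

class MilvusClient(PyMilvusClient):
    ...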
16 changes: 12 additions & 4 deletions morpheus/stages/input/databricks_deltalake_source_stage.py
@@ -15,9 +15,6 @@
import logging

import mrc
from databricks.connect import DatabricksSession
from pyspark.sql import functions as sf
from pyspark.sql.window import Window

import cudf

@@ -27,9 +24,20 @@
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stage_schema import StageSchema
from morpheus.utils.verify_dependencies import _verify_deps

logger = logging.getLogger(__name__)

REQUIRED_DEPS = ('DatabricksSession', 'sf', 'Window')
IMPORT_ERROR_MESSAGE = "DatabricksDeltaLakeSourceStage requires the databricks-connect package to be installed."

try:
from databricks.connect import DatabricksSession
from pyspark.sql import functions as sf
from pyspark.sql.window import Window
except ImportError:
pass


@register_stage("from-databricks-deltalake")
class DataBricksDeltaLakeSourceStage(PreallocatorMixin, SingleOutputSource):
Expand Down Expand Up @@ -59,7 +67,7 @@ def __init__(self,
databricks_host: str = None,
databricks_token: str = None,
databricks_cluster_id: str = None):

_verify_deps(REQUIRED_DEPS, IMPORT_ERROR_MESSAGE, globals())
super().__init__(config)
self.spark_query = spark_query
self.spark = DatabricksSession.builder.remote(host=databricks_host,