Skip to content

Commit

Permalink
Fix frontend streaming with background task
Browse files Browse the repository at this point in the history
  • Loading branch information
blythed committed Oct 31, 2024
1 parent 5e6bc46 commit f61d3d2
Show file tree
Hide file tree
Showing 45 changed files with 176 additions and 317 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Changes Since Last Release

#### Changed defaults / behaviours
#### Changed defaults / behaviours

- Change images docker superduper/<image> to superduperio/<image>
- Change the image's user from `/home/superduperdb` to `/home/superduper`
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ pip install superduper-framework

**Connect** and **apply** a pre-built template:

(***Note:*** *the pre-built templates are only supported by Python 3.10; you may use all of the other features in Python 3.11+.*)

```bash
git clone https://github.com/superduper-io/superduper && cd superduper
superduper apply templates/retrieval_augmented_generation.zip 'mongodb://localhost:27017/test_db' data=docu
Expand Down
176 changes: 7 additions & 169 deletions superduper/base/superduper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,17 @@
__all__ = ('superduper',)


def superduper(item: t.Optional[t.Any] = None, **kwargs) -> t.Any:
"""Superduper API to automatically wrap a URI to a `data_backend`.
def superduper(item: str | None = None, **kwargs) -> t.Any:
"""Build a superduper connection.
:param item: URI
:param kwargs: Additional keyword arguments to pass to the component
:param item: URI of connection.
:param kwargs: Additional parameters to building `Datalayer`
"""
if item is None:
item = CFG.data_backend

if isinstance(item, str):
return _auto_identify_connection_string(item, **kwargs)

return _DuckTyper.run(item, **kwargs)


def _auto_identify_connection_string(item: str, **kwargs) -> t.Any:
from superduper.base.build import build_datalayer

if item is None:
return build_datalayer()

if item.startswith('mongomock://'):
kwargs['data_backend'] = item

Expand All @@ -43,158 +36,3 @@ def _auto_identify_connection_string(item: str, **kwargs) -> t.Any:
raise ValueError(f'{item} is not a valid connection string')
kwargs['data_backend'] = item
return build_datalayer(CFG, **kwargs)


class _DuckTyper:
attrs: t.Sequence[str]
count: int

@staticmethod
def run(item: t.Any, **kwargs) -> t.Any:
"""
Run the DuckTyper on an item.
:param item: The item to run the DuckTyper on.
:param kwargs: Additional keyword arguments to pass to the Duck
"""
dts = [dt for dt in _DuckTyper._DUCK_TYPES if dt.accept(item)]
if not dts:
raise ValueError(
f'Couldn\'t auto-identify {item}, please wrap explicitly using '
'``superduper.components.*``'
)

if len(dts) == 1:
return dts[0].create(item, **kwargs)

raise ValueError(f'{item} matched more than one type: {dts}')

# TODO: Does this item match the DuckType?
@classmethod
def accept(cls, item: t.Any) -> bool:
"""Check if an item matches the DuckType.
The default implementation returns True if the number of attrs that
the item has is exactly equal to self.count.
"""
return sum(hasattr(item, a) for a in cls.attrs) == cls.count

@classmethod
def create(cls, item: t.Any, **kwargs) -> t.Any:
"""Create a component from the item.
This method should be implemented by subclasses.
:param item: The item to create the component from.
"""
raise NotImplementedError

_DUCK_TYPES: t.List[t.Type] = []

def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
_DuckTyper._DUCK_TYPES.append(cls)


# TODO not needed
class MongoDbTyper(_DuckTyper):
"""A DuckTyper for MongoDB databases.
This DuckTyper is used to automatically wrap a MongoDB database in a
Datalayer. # noqa
"""

attrs = ('list_collection_names',)
count = len(attrs)

@classmethod
def accept(cls, item: t.Any) -> bool:
"""Check if an item is a MongoDB database.
:param item: The item to check.
"""
return super().accept(item) and item.__class__.__name__ == 'Database'

@classmethod
def create(cls, item: t.Any, **kwargs) -> t.Any:
"""Create a Datalayer from a MongoDB database.
:param item: A MongoDB database.
"""
from mongomock.database import Database as MockDatabase
from pymongo.database import Database

from superduper import logging
from superduper.backends.mongodb.data_backend import MongoDataBackend
from superduper.base.build import build_datalayer

if not isinstance(item, (Database, MockDatabase)):
raise TypeError(f'Expected Database but got {type(item)}')

logging.warn(
'Note: This is only recommended in development mode, since config\
still holds `data_backend` with the default value, services \
like vector search and cdc cannot be reached due to configuration\
mismatch. Services will be configured with a `data_backend` uri using \
config file hence this client config and\
services config will be different.'
)
databackend = MongoDataBackend(conn=item.client, name=item.name)
return build_datalayer(cfg=CFG, databackend=databackend, **kwargs)


# TODO: Do we still need the DuckTyper for model now?
class SklearnTyper(_DuckTyper):
"""A DuckTyper for scikit-learn estimators # noqa.
This DuckTyper is used to automatically wrap a scikit-learn estimator in
an Estimator.
"""

attrs = '_predict', 'fit', 'score', 'transform'
count = 2

@classmethod
def create(cls, item: t.Any, **kwargs) -> t.Any:
"""Create an Estimator from a scikit-learn estimator.
:param item: A scikit-learn estimator.
"""
from sklearn.base import BaseEstimator
from superduper_sklearn.model import Estimator

if not isinstance(item, BaseEstimator):
raise TypeError('Expected BaseEstimator but got {type(item)}')

kwargs['identifier'] = _auto_identify(item)
return Estimator(object=item, **kwargs)


# TODO remove
class TorchTyper(_DuckTyper):
"""A DuckTyper for torch.nn.Module and torch.jit.ScriptModule.
This DuckTyper is used to automatically wrap a torch.nn.Module or
torch.jit.ScriptModule in a TorchModel. # noqa
"""

attrs = 'forward', 'parameters', 'state_dict', '_load_from_state_dict'
count = len(attrs)

@classmethod
def create(cls, item: t.Any, **kwargs) -> t.Any:
"""Create a TorchModel from a torch.nn.Module or torch.jit.ScriptModule.
:param item: A torch.nn.Module or torch.jit.ScriptModule.
"""
from superduper_torch.model import TorchModel
from torch import jit, nn

if isinstance(item, nn.Module) or isinstance(item, jit.ScriptModule):
return TorchModel(identifier=_auto_identify(item), object=item, **kwargs)

raise TypeError(f'Expected a Module but got {type(item)}')


def _auto_identify(instance: t.Any) -> str:
return instance.__class__.__name__.lower()
24 changes: 13 additions & 11 deletions superduper/rest/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ def redirect_stdout_to_file(file_path: str):
"""Context manager to redirect stdout to a specified file temporarily."""
original_stdout = sys.stdout
try:
with open(file_path, 'w', buffering=1) as f:
mode = 'w'
if os.path.exists(file_path):
mode = 'a'
with open(file_path, mode, buffering=1) as f:
sys.stdout = Tee(original_stdout, f)
yield
finally:
Expand Down Expand Up @@ -158,8 +161,13 @@ def test_log():
os.remove(log_file)
return {'status': 'ok'}

def _process_db_apply(db, component):
db.apply(component, force=True)
def _process_db_apply(db, component, id: str | None = None):
if id:
log_file = f"/tmp/{id}.log"
with redirect_stdout_to_file(log_file):
db.apply(component, force=True)
else:
db.apply(component, force=True)

def _process_apply_info(db, info):
if '_variables' in info:
Expand Down Expand Up @@ -193,14 +201,8 @@ async def db_apply(
id: str | None = 'test',
db: 'Datalayer' = DatalayerDependency(),
):
if id:
log_file = f"/tmp/{id}.log"
with redirect_stdout_to_file(log_file):
component = _process_apply_info(db, info)
background_tasks.add_task(_process_db_apply, db, component)
else:
component = _process_apply_info(db, info)
background_tasks.add_task(_process_db_apply, db, component)
component = _process_apply_info(db, info)
background_tasks.add_task(_process_db_apply, db, component, id)
return {'status': 'ok'}

import subprocess
Expand Down
2 changes: 1 addition & 1 deletion superduper/rest/out/404.html

Large diffs are not rendered by default.

Loading

0 comments on commit f61d3d2

Please sign in to comment.