Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/component/verbosity #2355

Merged
merged 3 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- component info support list
- Trigger downstream vector indices.
- Fix vector_index function job.
- Fix verbosity in component info

## [0.3.0](https://github.com/superduper-io/superduper/compare/0.3.0...0.2.0) (2024-Jun-21)

Expand Down
6 changes: 6 additions & 0 deletions superduper/backends/base/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,9 @@ def shutdown(self) -> None:

def execute_task(self, job_id, dependencies, compute_kwargs={}):
"""Execute task function for distributed backends."""

def connect(self):
"""Connect to address."""

def create_handler(self, *args, **kwargs):
"""Create handler on component declare."""
4 changes: 2 additions & 2 deletions superduper/components/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from superduper.backends.base.query import Query
from superduper.base.document import _OUTPUTS_KEY
from superduper.components.model import Mapping
from superduper.misc.server import is_csn, request_server
from superduper.misc.server import request_server

from ..jobs.job import Job
from .component import Component
Expand Down Expand Up @@ -86,7 +86,7 @@ def post_create(self, db: "Datalayer") -> None:
self.create_output_dest(db, self.uuid, self.model)
if self.select is not None:
logging.info('Requesting listener setup on CDC service')
if CFG.cluster.cdc.uri and not is_csn('cdc'):
if CFG.cluster.cdc.uri:
logging.info('Sending request to add listener')
request_server(
service='cdc',
Expand Down
14 changes: 6 additions & 8 deletions superduper/components/vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
from superduper.ext.utils import str_shape
from superduper.jobs.job import FunctionJob
from superduper.misc.annotations import component
from superduper.misc.server import is_csn, request_server
from superduper.misc.server import request_server
from superduper.misc.special_dicts import MongoStyleDict
from superduper.vector_search.base import VectorIndexMeasureType
from superduper.vector_search.update_tasks import copy_vectors, delete_vectors

KeyType = t.Union[str, t.List, t.Dict]
if t.TYPE_CHECKING:
from superduper.jobs.job import Job


class VectorIndex(Component):
Expand Down Expand Up @@ -185,7 +183,7 @@ def post_create(self, db: "Datalayer") -> None:
:param db: Data layer instance.
"""
logging.info('Requesting vector index setup on CDC service')
if CFG.cluster.cdc.uri and not is_csn('cdc'):
if CFG.cluster.cdc.uri:
logging.info('Sending request to add vector index')
request_server(
service='cdc',
Expand Down Expand Up @@ -243,15 +241,15 @@ def trigger_ids(self, query: Query, primary_ids: t.Sequence):
return self._ready_ids(primary_ids)

def _ready_ids(self, ids: t.Sequence):
select = self.indexing_listener.outputs_select
data = self.db.execute(select.select_using_ids(ids))
outputs = self.db[self.indexing_listener.outputs]
data = self.db.execute(outputs.select_using_ids(ids))
key = self.indexing_listener.outputs_key

ready_ids = []
for d in data:
try:
d[key]
ready_ids.append(d[select.primary_id])
ready_ids.append(d[outputs.primary_id])
except KeyError:
continue
return ready_ids
Expand Down Expand Up @@ -299,7 +297,7 @@ def run_jobs(
def schedule_jobs(
self,
db: Datalayer,
dependencies: t.Sequence['Job'] = (),
dependencies: t.Sequence[str] = (),
) -> t.Sequence[t.Any]:
"""Schedule jobs for the vector index.

Expand Down
9 changes: 5 additions & 4 deletions superduper/misc/special_dicts.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,12 +359,13 @@ def _diff_impl(r1, r2):


def _childrens(tree, object, nesting=1):
if not object.builds or not nesting:
if not object.get('_builds', False) or not nesting:
return
for name, child in object.builds.items():
child_text = f"{name}: {child.__class__}({child.identifier}): {str(child)[:50]}"
identifier = child.get('uuid', None)
child_text = f"{name}: {child['_path']}({identifier})"
subtree = tree.add(Text(child_text, style="yellow"))
for key, value in child.__dict__.items():
for key, value in child.items():
key_text = Text(f"{key}", style="magenta")
value_text = Text(f": {value}", style="blue")
subtree.add(Text.assemble(key_text, value_text))
Expand Down Expand Up @@ -453,7 +454,7 @@ def _component_info(obj):
Text(f'Component Map: {obj.identifier}', style="bold green"),
guide_style="bold green",
)
_childrens(tree, obj, nesting=verbosity - 1)
_childrens(tree, obj.encode(), nesting=verbosity - 1)

additional_info_panel = Panel(
base_component_metadata, title="Component Metadata", border_style="blue"
Expand Down
32 changes: 22 additions & 10 deletions superduper/vector_search/update_tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import typing as t

from superduper import Document
from superduper import Document, logging
from superduper.backends.base.query import Query
from superduper.misc.special_dicts import MongoStyleDict
from superduper.vector_search.base import VectorItem
Expand Down Expand Up @@ -51,15 +51,27 @@ def copy_vectors(
key = vi.indexing_listener.key
if '_outputs__' in key:
key = key.split('.')[1]
vectors = [
{
'vector': MongoStyleDict(doc)[
f'_outputs__{vi.indexing_listener.predict_id}'
],
'id': str(doc['_source']),
}
for doc in docs
]

vectors = []
nokeys = 0
for doc in docs:
try:
vector = MongoStyleDict(doc)[f'_outputs__{vi.indexing_listener.predict_id}']
except KeyError:
nokeys += 1
continue
vectors.append(
{
'vector': vector,
'id': str(doc['_source']),
}
)
if nokeys:
logging.warn(
f'{nokeys} outputs were missing. \n'
'Note: This might happen in case of `VectorIndex` schedule jobs '
'trigged before model outputs are yet to be computed.'
)

for r in vectors:
if hasattr(r['vector'], 'numpy'):
Expand Down
Loading