From 290094f5c282a10b3901372994d93fd7f65cd376 Mon Sep 17 00:00:00 2001 From: TheDude Date: Mon, 29 Jul 2024 20:31:46 +0530 Subject: [PATCH 1/3] Fix verbosity in component info --- superduper/misc/special_dicts.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/superduper/misc/special_dicts.py b/superduper/misc/special_dicts.py index 3508a3f36..cb29f4eb1 100644 --- a/superduper/misc/special_dicts.py +++ b/superduper/misc/special_dicts.py @@ -359,12 +359,13 @@ def _diff_impl(r1, r2): def _childrens(tree, object, nesting=1): - if not object.builds or not nesting: + if not object.get('_builds', False) or not nesting: return for name, child in object.builds.items(): - child_text = f"{name}: {child.__class__}({child.identifier}): {str(child)[:50]}" + identifier = child.get('uuid', None) + child_text = f"{name}: {child['_path']}({identifier})" subtree = tree.add(Text(child_text, style="yellow")) - for key, value in child.__dict__.items(): + for key, value in child.items(): key_text = Text(f"{key}", style="magenta") value_text = Text(f": {value}", style="blue") subtree.add(Text.assemble(key_text, value_text)) @@ -453,7 +454,7 @@ def _component_info(obj): Text(f'Component Map: {obj.identifier}', style="bold green"), guide_style="bold green", ) - _childrens(tree, obj, nesting=verbosity - 1) + _childrens(tree, obj.encode(), nesting=verbosity - 1) additional_info_panel = Panel( base_component_metadata, title="Component Metadata", border_style="blue" From 7ed96310b36a58be4b9c8b6c2ca8473a3eafd721 Mon Sep 17 00:00:00 2001 From: TheDude Date: Mon, 29 Jul 2024 20:32:13 +0530 Subject: [PATCH 2/3] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 713dbd330..7bcf63378 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - component info support list - Trigger downstream vector indices. - Fix vector_index function job. +- Fix verbosity in component info ## [0.3.0](https://github.com/superduper-io/superduper/compare/0.3.0...0.2.0]) (2024-Jun-21) From dd5d9ae24ccd5adec4d25846a3b1cc5ca1874e77 Mon Sep 17 00:00:00 2001 From: TheDude Date: Mon, 29 Jul 2024 22:34:52 +0530 Subject: [PATCH 3/3] Remove CSN check in post-create --- superduper/backends/base/compute.py | 6 +++++ superduper/components/listener.py | 4 +-- superduper/components/vector_index.py | 14 +++++------ superduper/vector_search/update_tasks.py | 32 ++++++++++++++++-------- 4 files changed, 36 insertions(+), 20 deletions(-) diff --git a/superduper/backends/base/compute.py b/superduper/backends/base/compute.py index 533286485..628cff6eb 100644 --- a/superduper/backends/base/compute.py +++ b/superduper/backends/base/compute.py @@ -85,3 +85,9 @@ def shutdown(self) -> None: def execute_task(self, job_id, dependencies, compute_kwargs={}): """Execute task function for distributed backends.""" + + def connect(self): + """Connect to address.""" + + def create_handler(self, *args, **kwargs): + """Create handler on component declare.""" diff --git a/superduper/components/listener.py b/superduper/components/listener.py index 27b576fa3..6dd0d5164 100644 --- a/superduper/components/listener.py +++ b/superduper/components/listener.py @@ -7,7 +7,7 @@ from superduper.backends.base.query import Query from superduper.base.document import _OUTPUTS_KEY from superduper.components.model import Mapping -from superduper.misc.server import is_csn, request_server +from superduper.misc.server import request_server from ..jobs.job import Job from .component import Component @@ -86,7 +86,7 @@ def post_create(self, db: "Datalayer") -> None: self.create_output_dest(db, self.uuid, self.model) if self.select is not None: logging.info('Requesting listener setup on CDC service') - if CFG.cluster.cdc.uri and not is_csn('cdc'): + if CFG.cluster.cdc.uri: logging.info('Sending request to add listener') request_server( service='cdc', diff --git a/superduper/components/vector_index.py b/superduper/components/vector_index.py index 4c6167795..f388d1b8a 100644 --- a/superduper/components/vector_index.py +++ b/superduper/components/vector_index.py @@ -15,14 +15,12 @@ from superduper.ext.utils import str_shape from superduper.jobs.job import FunctionJob from superduper.misc.annotations import component -from superduper.misc.server import is_csn, request_server +from superduper.misc.server import request_server from superduper.misc.special_dicts import MongoStyleDict from superduper.vector_search.base import VectorIndexMeasureType from superduper.vector_search.update_tasks import copy_vectors, delete_vectors KeyType = t.Union[str, t.List, t.Dict] -if t.TYPE_CHECKING: - from superduper.jobs.job import Job class VectorIndex(Component): @@ -185,7 +183,7 @@ def post_create(self, db: "Datalayer") -> None: :param db: Data layer instance. """ logging.info('Requesting vector index setup on CDC service') - if CFG.cluster.cdc.uri and not is_csn('cdc'): + if CFG.cluster.cdc.uri: logging.info('Sending request to add vector index') request_server( service='cdc', @@ -243,15 +241,15 @@ def trigger_ids(self, query: Query, primary_ids: t.Sequence): return self._ready_ids(primary_ids) def _ready_ids(self, ids: t.Sequence): - select = self.indexing_listener.outputs_select - data = self.db.execute(select.select_using_ids(ids)) + outputs = self.db[self.indexing_listener.outputs] + data = self.db.execute(outputs.select_using_ids(ids)) key = self.indexing_listener.outputs_key ready_ids = [] for d in data: try: d[key] - ready_ids.append(d[select.primary_id]) + ready_ids.append(d[outputs.primary_id]) except KeyError: continue return ready_ids @@ -299,7 +297,7 @@ def run_jobs( def schedule_jobs( self, db: Datalayer, - dependencies: t.Sequence['Job'] = (), + dependencies: t.Sequence[str] = (), ) -> t.Sequence[t.Any]: """Schedule jobs for the vector index. diff --git a/superduper/vector_search/update_tasks.py b/superduper/vector_search/update_tasks.py index 3e5cbe6af..94dbc0588 100644 --- a/superduper/vector_search/update_tasks.py +++ b/superduper/vector_search/update_tasks.py @@ -1,6 +1,6 @@ import typing as t -from superduper import Document +from superduper import Document, logging from superduper.backends.base.query import Query from superduper.misc.special_dicts import MongoStyleDict from superduper.vector_search.base import VectorItem @@ -51,15 +51,27 @@ def copy_vectors( key = vi.indexing_listener.key if '_outputs__' in key: key = key.split('.')[1] - vectors = [ - { - 'vector': MongoStyleDict(doc)[ - f'_outputs__{vi.indexing_listener.predict_id}' - ], - 'id': str(doc['_source']), - } - for doc in docs - ] + + vectors = [] + nokeys = 0 + for doc in docs: + try: + vector = MongoStyleDict(doc)[f'_outputs__{vi.indexing_listener.predict_id}'] + except KeyError: + nokeys += 1 + continue + vectors.append( + { + 'vector': vector, + 'id': str(doc['_source']), + } + ) + if nokeys: + logging.warn( + f'{nokeys} outputs were missing. \n' + 'Note: This might happen in case of `VectorIndex` schedule jobs ' + 'trigged before model outputs are yet to be computed.' + ) for r in vectors: if hasattr(r['vector'], 'numpy'):