From e745e9b2f6c5250108c67f597ad2b17795bb71f5 Mon Sep 17 00:00:00 2001 From: TheDude Date: Tue, 9 Jul 2024 14:19:09 +0530 Subject: [PATCH] Remove cdc dependencies from datalayer --- superduperdb/backends/mongodb/metadata.py | 1 - superduperdb/base/build.py | 2 -- superduperdb/base/datalayer.py | 15 +++++++------- superduperdb/components/listener.py | 2 -- superduperdb/components/vector_index.py | 24 +++++++++++------------ 5 files changed, 19 insertions(+), 25 deletions(-) diff --git a/superduperdb/backends/mongodb/metadata.py b/superduperdb/backends/mongodb/metadata.py index 7204d2c30..59532cbd4 100644 --- a/superduperdb/backends/mongodb/metadata.py +++ b/superduperdb/backends/mongodb/metadata.py @@ -39,7 +39,6 @@ def __init__( def _setup(self): self.db = self.conn[self.name] self.meta_collection = self.db['_meta'] - self.cdc_collection = self.db['_cdc_tables'] self.component_collection = self.db['_objects'] self.job_collection = self.db['_jobs'] self.parent_child_mappings = self.db['_parent_child_mappings'] diff --git a/superduperdb/base/build.py b/superduperdb/base/build.py index b825948b1..44409f021 100644 --- a/superduperdb/base/build.py +++ b/superduperdb/base/build.py @@ -199,8 +199,6 @@ def show_configuration(cfg): ('Metadata Store', anonymize_url(cfg.metadata_store)), ('Artifact Store', anonymize_url(cfg.artifact_store)), ('Compute', cfg.cluster.compute.uri), - ('CDC', cfg.cluster.cdc.uri), - ('Vector Search', cfg.cluster.vector_search.uri), ] for key, value in key_values: if value: diff --git a/superduperdb/base/datalayer.py b/superduperdb/base/datalayer.py index f444cf427..c9545f58b 100644 --- a/superduperdb/base/datalayer.py +++ b/superduperdb/base/datalayer.py @@ -22,7 +22,6 @@ from superduperdb.base.constant import KEY_BUILDS from superduperdb.base.cursor import SuperDuperCursor from superduperdb.base.document import Document -from superduperdb.cdc.cdc import DatabaseChangeDataCapture from superduperdb.components.component import Component from superduperdb.components.datatype import DataType, _BaseEncodable from superduperdb.components.schema import Schema @@ -104,7 +103,6 @@ def __init__( self.databackend = databackend self.databackend.datalayer = self - self.cdc = DatabaseChangeDataCapture(self) self.compute = compute self._server_mode = False @@ -340,7 +338,8 @@ def _delete(self, delete: Query, refresh: bool = True) -> DeleteResult: :param delete: The delete query object specifying the data to be deleted. """ result = delete.do_execute(self) - if refresh and not self.cdc.running: + cdc_status = s.CFG.cluster.cdc.uri is not None + if refresh and not cdc_status: return result, self.refresh_after_delete(delete, ids=result) return result, None @@ -374,7 +373,7 @@ def _insert( inserted_ids = insert.do_execute(self) - cdc_status = self.cdc.running or s.CFG.cluster.cdc.uri is not None + cdc_status = s.CFG.cluster.cdc.uri is not None if refresh: if cdc_status: @@ -445,7 +444,7 @@ def _write(self, write: Query, refresh: bool = True) -> UpdateResult: """ write_result, updated_ids, deleted_ids = write.do_execute(self) - cdc_status = self.cdc.running or s.CFG.cluster.cdc.uri is not None + cdc_status = s.CFG.cluster.cdc.uri is not None if refresh: if cdc_status: logging.warn('CDC service is active, skipping model/listener refresh') @@ -485,7 +484,7 @@ def _update(self, update: Query, refresh: bool = True) -> UpdateResult: """ updated_ids = update.do_execute(self) - cdc_status = self.cdc.running or s.CFG.cluster.cdc.uri is not None + cdc_status = s.CFG.cluster.cdc.uri is not None if refresh and updated_ids: if cdc_status: logging.warn('CDC service is active, skipping model/listener refresh') @@ -760,12 +759,14 @@ def _build_task_workflow( ), ) + cdc_status = s.CFG.cluster.cdc.uri is not None for listener in listeners: G.add_edge( f'{download_content.__name__}()', f'{listener.model.identifier}.predict_in_db({listener.uuid})', ) - if not self.cdc.running: + + if not cdc_status: deps = listener.dependencies for dep in deps: upstream = self.load(uuid=dep) diff --git a/superduperdb/components/listener.py b/superduperdb/components/listener.py index c5801a8de..3540b6119 100644 --- a/superduperdb/components/listener.py +++ b/superduperdb/components/listener.py @@ -98,8 +98,6 @@ def post_create(self, db: "Datalayer") -> None: args={'name': self.identifier}, type='get', ) - else: - db.cdc.add(self) @classmethod def create_output_dest(cls, db: "Datalayer", uuid, model: Model): diff --git a/superduperdb/components/vector_index.py b/superduperdb/components/vector_index.py index 1e221cd95..8a54e152e 100644 --- a/superduperdb/components/vector_index.py +++ b/superduperdb/components/vector_index.py @@ -208,19 +208,17 @@ def schedule_jobs( :param db: The DB instance to process :param dependencies: A list of dependencies """ - if not db.cdc.running: - job = FunctionJob( - callable=copy_vectors, - args=[], - kwargs={ - 'vector_index': self.identifier, - 'ids': [], - 'query': self.indexing_listener.select.dict().encode(), - }, - ) - job(db, dependencies=dependencies) - return [job] - return [] + job = FunctionJob( + callable=copy_vectors, + args=[], + kwargs={ + 'vector_index': self.identifier, + 'ids': [], + 'query': self.indexing_listener.select.dict().encode(), + }, + ) + job(db, dependencies=dependencies) + return [job] class EncodeArray: