Skip to content

Commit

Permalink
Remove cdc dependencies from datalayer
Browse files: browse the repository at this point in the history
  • Loading branch information
kartik4949 committed Jul 9, 2024
1 parent b99856b commit e745e9b
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 25 deletions.
1 change: 0 additions & 1 deletion superduperdb/backends/mongodb/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ def __init__(
def _setup(self):
self.db = self.conn[self.name]
self.meta_collection = self.db['_meta']
self.cdc_collection = self.db['_cdc_tables']
self.component_collection = self.db['_objects']
self.job_collection = self.db['_jobs']
self.parent_child_mappings = self.db['_parent_child_mappings']
Expand Down
2 changes: 0 additions & 2 deletions superduperdb/base/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,6 @@ def show_configuration(cfg):
('Metadata Store', anonymize_url(cfg.metadata_store)),
('Artifact Store', anonymize_url(cfg.artifact_store)),
('Compute', cfg.cluster.compute.uri),
('CDC', cfg.cluster.cdc.uri),
('Vector Search', cfg.cluster.vector_search.uri),
]
for key, value in key_values:
if value:
Expand Down
15 changes: 8 additions & 7 deletions superduperdb/base/datalayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from superduperdb.base.constant import KEY_BUILDS
from superduperdb.base.cursor import SuperDuperCursor
from superduperdb.base.document import Document
from superduperdb.cdc.cdc import DatabaseChangeDataCapture
from superduperdb.components.component import Component
from superduperdb.components.datatype import DataType, _BaseEncodable
from superduperdb.components.schema import Schema
Expand Down Expand Up @@ -104,7 +103,6 @@ def __init__(
self.databackend = databackend
self.databackend.datalayer = self

self.cdc = DatabaseChangeDataCapture(self)

self.compute = compute
self._server_mode = False
Expand Down Expand Up @@ -340,7 +338,8 @@ def _delete(self, delete: Query, refresh: bool = True) -> DeleteResult:
:param delete: The delete query object specifying the data to be deleted.
"""
result = delete.do_execute(self)
if refresh and not self.cdc.running:
cdc_status = s.CFG.cluster.cdc.uri is not None
if refresh and not cdc_status:
return result, self.refresh_after_delete(delete, ids=result)
return result, None

Expand Down Expand Up @@ -374,7 +373,7 @@ def _insert(

inserted_ids = insert.do_execute(self)

cdc_status = self.cdc.running or s.CFG.cluster.cdc.uri is not None
cdc_status = s.CFG.cluster.cdc.uri is not None

if refresh:
if cdc_status:
Expand Down Expand Up @@ -445,7 +444,7 @@ def _write(self, write: Query, refresh: bool = True) -> UpdateResult:
"""
write_result, updated_ids, deleted_ids = write.do_execute(self)

cdc_status = self.cdc.running or s.CFG.cluster.cdc.uri is not None
cdc_status = s.CFG.cluster.cdc.uri is not None
if refresh:
if cdc_status:
logging.warn('CDC service is active, skipping model/listener refresh')
Expand Down Expand Up @@ -485,7 +484,7 @@ def _update(self, update: Query, refresh: bool = True) -> UpdateResult:
"""
updated_ids = update.do_execute(self)

cdc_status = self.cdc.running or s.CFG.cluster.cdc.uri is not None
cdc_status = s.CFG.cluster.cdc.uri is not None
if refresh and updated_ids:
if cdc_status:
logging.warn('CDC service is active, skipping model/listener refresh')
Expand Down Expand Up @@ -760,12 +759,14 @@ def _build_task_workflow(
),
)

cdc_status = s.CFG.cluster.cdc.uri is not None
for listener in listeners:
G.add_edge(
f'{download_content.__name__}()',
f'{listener.model.identifier}.predict_in_db({listener.uuid})',
)
if not self.cdc.running:

if not cdc_status:
deps = listener.dependencies
for dep in deps:
upstream = self.load(uuid=dep)
Expand Down
2 changes: 0 additions & 2 deletions superduperdb/components/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ def post_create(self, db: "Datalayer") -> None:
args={'name': self.identifier},
type='get',
)
else:
db.cdc.add(self)

@classmethod
def create_output_dest(cls, db: "Datalayer", uuid, model: Model):
Expand Down
24 changes: 11 additions & 13 deletions superduperdb/components/vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,19 +208,17 @@ def schedule_jobs(
:param db: The DB instance to process
:param dependencies: A list of dependencies
"""
if not db.cdc.running:
job = FunctionJob(
callable=copy_vectors,
args=[],
kwargs={
'vector_index': self.identifier,
'ids': [],
'query': self.indexing_listener.select.dict().encode(),
},
)
job(db, dependencies=dependencies)
return [job]
return []
job = FunctionJob(
callable=copy_vectors,
args=[],
kwargs={
'vector_index': self.identifier,
'ids': [],
'query': self.indexing_listener.select.dict().encode(),
},
)
job(db, dependencies=dependencies)
return [job]


class EncodeArray:
Expand Down

0 comments on commit e745e9b

Please sign in to comment.