Skip to content

Commit

Permalink
Added general components in the queue
Browse files Browse the repository at this point in the history
  • Loading branch information
kartik4949 committed Jul 9, 2024
1 parent a8ec862 commit 4d2e11f
Show file tree
Hide file tree
Showing 14 changed files with 164 additions and 111 deletions.
8 changes: 6 additions & 2 deletions superduperdb/backends/base/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@ def get_local_client(self):
"""Returns a local version of self."""
pass

def broadcast(self, ids: t.List, to: tuple = ()):
pass
def broadcast(self, events: t.List, to: tuple = ()):
"""Broadcast events to the corresponding component.
:param events: List of events.
:param to: Destination component.
"""
pass

@abstractmethod
def submit(self, function: t.Callable, **kwargs) -> t.Any:
Expand Down
17 changes: 12 additions & 5 deletions superduperdb/backends/base/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,15 @@ def _set_db(r, db):
parts.append((part, part_args, part_kwargs))
self.parts = parts

def dependencies(self, ):
@property
def dependencies(
self,
):
"""List of dependencies."""
listeners = self.db.show('listener')
vector_indices = self.db.show('vector_index')
dependencies = []

def _check_query_match(listener, query):
if (
listener.select.table_or_collection.identifier
Expand All @@ -213,18 +218,20 @@ def _check_query_match(listener, query):
return True
return False


for listener in listeners:

listener = self.db.listeners[listener]
if _check_query_match(listener, self):
dependencies.append({'type_id': 'listener', 'identifier': listener.identifier})
dependencies.append(
{'type_id': 'listener', 'identifier': listener.identifier}
)

for vi in vector_indices:
vi = self.db.vector_indices[vi]
listener = vi.indexing_listener
if _check_query_match(listener, self):
dependencies.append({'type_id': 'vector_index', 'identifier': vi.identifier})
dependencies.append(
{'type_id': 'vector_index', 'identifier': vi.identifier}
)

return dependencies

Expand Down
15 changes: 11 additions & 4 deletions superduperdb/backends/local/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ def __init__(self, _uri: t.Optional[str] = None):
self.__outputs: t.Dict = {}
self.queue = LocalSequentialQueue()


@property
def remote(self) -> bool:
"""Return if remote compute engine."""
Expand All @@ -37,12 +36,20 @@ def component_hook(self, *args, **kwargs):
"""Hook for component."""
pass

def broadcast(self, ids: t.List, to: tuple = ()):
def broadcast(self, events: t.List, to: tuple = ()):
"""Broadcast events to the corresponding component.
:param events: List of events.
:param to: Destination component.
"""
jobs = []
if isinstance(to, (list, tuple)):
for dep in to:
self.queue.publish(ids, to=dep)
jobs.append(self.queue.publish(events, to=dep))
else:
self.queue.publish(ids, to=to)
job = self.queue.publish(events, to=to)
jobs.append(job)
return jobs

def submit(
self, function: t.Callable, *args, compute_kwargs: t.Dict = {}, **kwargs
Expand Down
35 changes: 17 additions & 18 deletions superduperdb/base/datalayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from superduperdb.misc.annotations import deprecated
from superduperdb.misc.colors import Colors
from superduperdb.misc.data import ibatch
from superduperdb.misc.download import download_content, download_from_one
from superduperdb.misc.download import download_from_one
from superduperdb.misc.retry import db_retry
from superduperdb.misc.special_dicts import recursive_update
from superduperdb.vector_search.base import BaseVectorSearcher, VectorItem
Expand All @@ -46,23 +46,28 @@
ExecuteResult = t.Union[SelectResult, DeleteResult, UpdateResult, InsertResult]


@dc.dataclass
class Event:
insert= 'insert'
delete= 'delete'
update= 'update'
upsert= 'upsert'
"""Event to represent database events."""

insert = 'insert'
delete = 'delete'
update = 'update'
upsert = 'upsert'

@staticmethod
def chunk_by_event(lst):
"""Helper method to chunk events on type."""
chunks = {}
for item in lst:
item_type = item['type']

if item_type not in chunks:
chunks[item_type] = []
chunks[item_type].append(item)
return chunks


class Datalayer:
"""
Base database connector for SuperDuperDB.
Expand Down Expand Up @@ -409,21 +414,15 @@ def _select(self, select: Query, reference: bool = True) -> SelectResult:
"""
return select.do_execute(db=self)


def on_event(
self,
query: Query,
ids: t.Sequence[str],
event_type: str = 'insert'
):
def on_event(self, query: Query, ids: t.Sequence[str], event_type: str = 'insert'):
"""
Trigger computation jobs after data insertion.
:param query: The select or update query object that reduces
the scope of computations.
:param ids: IDs that further reduce the scope of computations.
"""
deps = query.dependencies()
deps = query.dependencies
events = [{'identifier': id, 'type': event_type} for id in ids]
return self.compute.broadcast(events, to=deps)

Expand Down Expand Up @@ -456,9 +455,7 @@ def _write(self, write: Query, refresh: bool = True) -> UpdateResult:
q = d['query']
ids = d['ids']
job_update = self.on_event(
query=q,
ids=ids,
event_type=Event.delete
query=q, ids=ids, event_type=Event.delete
)
jobs.append(job_update)

Expand Down Expand Up @@ -734,7 +731,9 @@ def _apply(
if parent is not None:
self.metadata.create_parent_child(parent, object.uuid)

dependencies = [*[j.job_id for j in jobs], *dependencies] # type: ignore[list-item]
for job in jobs:
if not isinstance(job, dict):
dependencies.append(job.job_id)

object.post_create(self)
self._add_component_to_cache(object)
Expand Down
1 change: 0 additions & 1 deletion superduperdb/base/leaf.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ def metadata(self):
def __post_init__(self, db: t.Optional['Datalayer'] = None):
self.db = db


@property
def leaves(self):
"""Get all leaves in the object."""
Expand Down
21 changes: 17 additions & 4 deletions superduperdb/components/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,6 @@ def init(self, db=None):
self.db = self.db or db
self.unpack(db=db)

def on_db_event(self, db, event):
pass

def unpack(self, db=None):
"""Method to unpack the component.
Expand Down Expand Up @@ -547,12 +544,28 @@ def create_validation_job(
},
)

def run_jobs(
self,
db: Datalayer,
dependencies: t.Sequence[Job] = (),
ids: t.Sequence = [],
event_type: str = 'insert',
) -> t.Sequence[t.Any]:
"""Run the job for this component.
:param db: The db to process.
:param dependencies: A sequence of dependencies.
:param ids: List of ids.
:param event_type: Type of event.
"""
return []

def schedule_jobs(
self,
db: Datalayer,
dependencies: t.Sequence[Job] = (),
) -> t.Sequence[t.Any]:
"""Run the job for this listener.
"""Schedule the job for this component.
:param db: The db to process.
:param dependencies: A sequence of dependencies.
Expand Down
19 changes: 6 additions & 13 deletions superduperdb/components/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ def post_create(self, db: "Datalayer") -> None:
:param db: Data layer instance.
"""

from superduperdb.base.datalayer import Event
self.create_output_dest(db, self.uuid, self.model)
if self.select is not None and self.active and not db.server_mode:
if CFG.cluster.cdc.uri:
Expand All @@ -100,15 +98,10 @@ def post_create(self, db: "Datalayer") -> None:
args={'name': self.identifier},
type='get',
)
else:
db.cdc.add(self)

db.compute.queue.declare_component(self)
db.compute.component_hook(self.identifier, type_id='listener')




@classmethod
def create_output_dest(cls, db: "Datalayer", uuid, model: Model):
"""
Expand Down Expand Up @@ -171,28 +164,28 @@ def schedule_jobs(
:param dependencies: A list of dependencies.
:param overwrite: Overwrite the existing data.
"""

from superduperdb.base.datalayer import Event

ids = db.execute(self.select.select_ids)
ids = [id[self.select.primary_id] for id in ids]
events = [{'identifier': id, 'type': Event.insert} for id in ids]
to = {'type_id': 'listener', 'identifier': self.identifier}
db.compute.broadcast(events, to=to)
return []

to = {'type_id': self.type_id, 'identifier': self.identifier}
return db.compute.broadcast(events, to=to)

def run_jobs(
self,
db: "Datalayer",
dependencies: t.Sequence[Job] = (),
overwrite: bool = False,
ids: t.Optional[t.List] = []
ids: t.Optional[t.List] = [],
event_type: str = 'insert',
) -> t.Sequence[t.Any]:
"""Schedule jobs for the listener.
:param db: Data layer instance to process.
:param dependencies: A list of dependencies.
:param overwrite: Overwrite the existing data.
:param event_type: Type of event.
"""
if not self.active:
return []
Expand Down
Loading

0 comments on commit 4d2e11f

Please sign in to comment.