Move event stream handling out of slave store. #7491
Conversation
2408b58 to 0054b10 (Compare)
0054b10 to e7f5ac4 (Compare)
it seems generally plausible, but unfortunately I appear to have Opinions.
if hs.config.worker.worker_app is None:
    self._push_rules_stream_id_gen = ChainedIdGenerator(
        self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
    )  # type: Union[ChainedIdGenerator, SlavedIdTracker]
seems like if this is a thing we need, we should consider declaring a protocol or interface type for it
Yeah, I briefly thought about it and then screamed a bit. We could make a protocol that encompasses both, but I'm not sure how helpful that is if half the functions are stubbed out in both. In general the typing here is a bit of a mess, and I think we should do something about it properly.
(I'm happy to add a protocol here, I'm just not sure of its usefulness in practice.)
fair enough. I do think there needs to be some sort of class here, if only because we'll need it to get mypy working properly. However, I'm happy for it to be punted for now.
Agreed, though I think we'll possibly need to do some refactoring of some form to make the types make sense.
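For reference, a minimal sketch of what such a protocol could look like, assuming the shared surface is just get_current_token and advance; the StreamIdTracker name and exact signatures are illustrative, not something decided in this PR:

from typing import Protocol  # or typing_extensions.Protocol on older Pythons


class StreamIdTracker(Protocol):
    """Hypothetical common interface for ChainedIdGenerator and SlavedIdTracker."""

    def get_current_token(self) -> int:
        """Return the latest stream ID this process knows about."""
        ...

    def advance(self, token: int) -> None:
        """Move the tracked position forward on an update received over replication."""
        ...

With something along those lines, the # type: Union[ChainedIdGenerator, SlavedIdTracker] comments could collapse to a single annotation, at the cost of the stubbed-out methods on both implementations noted above.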
def advance(self, token: int):
    """Stub implementation for advancing the token when receiving updates
    over replication; raises an exception as this instance should be the
    only source of updates.
    """
    raise Exception(
        "Attempted to advance token on source for table %r", self._table
    )
again: it feels like there should be a Protocol or abstract base class that we're implementing here.
if hs.config.worker.worker_app is None:
    self._push_rules_stream_id_gen = ChainedIdGenerator(
        self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
    )  # type: Union[ChainedIdGenerator, SlavedIdTracker]
else:
    self._push_rules_stream_id_gen = SlavedIdTracker(
        db_conn, "push_rules_stream", "stream_id"
    )
it's not obvious to me why this code needs to move (at all, but particularly in the same PR as moving the events stream)?
It needed to move either way (as we hadn't called super().__init__ previously), and broadly it felt easier/better to move it here than to below the super().__init__ call.
@@ -74,6 +76,26 @@ class EventsWorkerStore(SQLBaseStore):
    def __init__(self, database: Database, db_conn, hs):
        super(EventsWorkerStore, self).__init__(database, db_conn, hs)

        if hs.config.worker_app is None:
I kinda hate this pattern; it's everywhere and it feels magical. It feels like there should be an hs.am_I_the_source_for_the_events_stream() method or something. Maybe that's one to punt to a different PR where we can try to kill off other instances of the same thing, though.
(or maybe you have a better plan for this anyway)
This is how we currently do it, but yes configuration for this sort of thing is incoming :)
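As a strawman, the kind of helper being suggested might look like this (the method name and the cut-down HomeServer are purely illustrative, not part of the actual codebase):

class HomeServer:
    """Simplified stand-in for the real HomeServer, for illustration only."""

    def __init__(self, config):
        self.config = config

    def is_events_stream_writer(self) -> bool:
        # Hypothetical helper wrapping the bare worker_app check: True if this
        # process is the source of the events stream (today, the master).
        return self.config.worker_app is None

Call sites could then read if hs.is_events_stream_writer(): rather than comparing worker_app to None directly, which would also make it easier to change which process writes events later.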
@@ -66,7 +71,24 @@ def get_all_updated_caches_txn(txn):
        )

    def process_replication_rows(self, stream_name, instance_name, token, rows):
        if stream_name == "caches":
        if stream_name == "events":
            self._stream_id_gen.advance(token)
CacheInvalidationWorkerStore seems like a funny place for this. Shouldn't it be in EventsWorkerStore, if that's where _stream_id_gen lives? (Similarly for _backfill_id_gen.)
Whoops, yes, have moved those. I've just moved the advancing of the tokens, but I'm not totally sure where the cache invalidation should live.
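For illustration, a rough sketch (a fragment, not runnable on its own) of keeping the token handling next to the generators in EventsWorkerStore; the backfill negation and the cooperative super() call are assumptions about how the surrounding code is structured:

class EventsWorkerStore(SQLBaseStore):
    def process_replication_rows(self, stream_name, instance_name, token, rows):
        # Advance the ID generators in the store that defines them, rather
        # than in CacheInvalidationWorkerStore.
        if stream_name == "events":
            self._stream_id_gen.advance(token)
            for row in rows:
                self._process_event_stream_row(token, row)
        elif stream_name == "backfill":
            # Backfill stream IDs grow downwards, hence the negation (assumption).
            self._backfill_id_gen.advance(-token)
        super().process_replication_rows(stream_name, instance_name, token, rows)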
if stream_name == "events":
    self._stream_id_gen.advance(token)
    for row in rows:
        self._process_event_stream_row(token, row)
Moving this in here runs the risk of changing the order in which we carry out the operations when processing a replication row (since it basically relies on the MRO of process_replication_rows). Have you had a think about that?
All the store stuff should just be invalidating caches or advancing tokens, so the order doesn't matter (certainly I don't think we've thought about MRO before). The store process_replication_rows gets called before we otherwise handle rows too, so I think that is fine.
lgtm then
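For anyone following along, a self-contained toy example of the MRO behaviour discussed above; class names are simplified and the print calls stand in for the real cache/token work:

class BaseStore:
    def process_replication_rows(self, stream_name, instance_name, token, rows):
        # End of the cooperative chain.
        pass


class CacheInvalidationWorkerStore(BaseStore):
    def process_replication_rows(self, stream_name, instance_name, token, rows):
        if stream_name == "caches":
            print("invalidating caches up to", token)
        super().process_replication_rows(stream_name, instance_name, token, rows)


class EventsWorkerStore(BaseStore):
    def process_replication_rows(self, stream_name, instance_name, token, rows):
        if stream_name == "events":
            print("advancing events token to", token)
        super().process_replication_rows(stream_name, instance_name, token, rows)


class DataStore(EventsWorkerStore, CacheInvalidationWorkerStore):
    pass


# MRO: DataStore -> EventsWorkerStore -> CacheInvalidationWorkerStore -> BaseStore,
# so every mixin's handler runs as long as each one calls super().
DataStore().process_replication_rows("events", "master", 42, [])

As long as each handler only invalidates caches or advances tokens, the order the MRO picks does not matter, which is the point made above.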
This allows us to have the logic on both master and workers, which is necessary to move event persistence off master.
We also combine the instantiation of ID generators from DataStore and slave stores to the base worker stores. This allows us to select which process writes events independently of the master/worker splits.
Based on #7490