Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Don't table scan events on worker startup #8419

Merged
merged 4 commits into from
Sep 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8419.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental support for sharding event persister.
26 changes: 25 additions & 1 deletion synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,19 @@ def _load_current_ids(

# Load the current positions of all writers for the stream.
if self._writers:
# We delete any stale entries in the positions table. This is
# important if we add back a writer after a long time; we want to
# consider that a "new" writer, rather than using the old stale
# entry here.
Comment on lines +276 to +279
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need the check on line 300, if instance in self._writers? 😄

I don't think it can hurt, especially since the DELETE and SELECT queries are in different transactions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically we don't, no. The SELECT and DELETE are in the same transaction.

Not sure if it's clearer to leave it or remove the check, tbh.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤷 The logic becomes simplified a bit, but you then need to combine the two SQL statements to understand exactly what that dictionary is made up of.

sql = """
DELETE FROM stream_positions
WHERE
stream_name = ?
AND instance_name != ALL(?)
Comment on lines +281 to +284
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confusing, instance_name != ALL(?) follows this logic:

column_name != ALL (subquery): the expression evaluates to true if the value is not equal to any value returned by the subquery.

So this is saying to delete any stream position where the stream name matches our current stream and the instance name is not any of the current writers.

"""
sql = self._db.engine.convert_param_style(sql)
cur.execute(sql, (self._stream_name, self._writers))

sql = """
SELECT instance_name, stream_id FROM stream_positions
WHERE stream_name = ?
Expand Down Expand Up @@ -453,11 +466,22 @@ def get_current_token_for_writer(self, instance_name: str) -> int:
"""Returns the position of the given writer.
"""

# If we don't have an entry for the given instance name, we assume it's a
# new writer.
#
# For new writers we assume their initial position to be the current
# persisted up to position. This stops Synapse from doing a full table
# scan when a new writer announces itself over replication.
with self._lock:
return self._return_factor * self._current_positions.get(instance_name, 0)
return self._return_factor * self._current_positions.get(
instance_name, self._persisted_upto_position
)

def get_positions(self) -> Dict[str, int]:
"""Get a copy of the current position map.

Note that this won't necessarily include all configured writers if some
writers haven't written anything yet.
"""

with self._lock:
Expand Down
18 changes: 18 additions & 0 deletions tests/storage/test_id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,17 +390,28 @@ def test_writer_config_change(self):
# Initial config has two writers
id_gen = self._create_id_generator("first", writers=["first", "second"])
self.assertEqual(id_gen.get_persisted_upto_position(), 3)
self.assertEqual(id_gen.get_current_token_for_writer("first"), 3)
self.assertEqual(id_gen.get_current_token_for_writer("second"), 5)

# New config removes one of the configs. Note that if the writer is
# removed from config we assume that it has been shut down and has
# finished persisting, hence why the persisted upto position is 5.
id_gen_2 = self._create_id_generator("second", writers=["second"])
self.assertEqual(id_gen_2.get_persisted_upto_position(), 5)
self.assertEqual(id_gen_2.get_current_token_for_writer("second"), 5)

# This config points to a single, previously unused writer.
id_gen_3 = self._create_id_generator("third", writers=["third"])
self.assertEqual(id_gen_3.get_persisted_upto_position(), 5)

# For new writers we assume their initial position to be the current
# persisted up to position. This stops Synapse from doing a full table
# scan when a new writer comes along.
self.assertEqual(id_gen_3.get_current_token_for_writer("third"), 5)

id_gen_4 = self._create_id_generator("fourth", writers=["third"])
self.assertEqual(id_gen_4.get_current_token_for_writer("third"), 5)

# Check that we get a sane next stream ID with this new config.

async def _get_next_async():
Expand All @@ -410,6 +421,13 @@ async def _get_next_async():
self.get_success(_get_next_async())
self.assertEqual(id_gen_3.get_persisted_upto_position(), 6)

# If we add back the old "first" then we shouldn't see the persisted up
# to position revert back to 3.
id_gen_5 = self._create_id_generator("five", writers=["first", "third"])
self.assertEqual(id_gen_5.get_persisted_upto_position(), 6)
self.assertEqual(id_gen_5.get_current_token_for_writer("first"), 6)
self.assertEqual(id_gen_5.get_current_token_for_writer("third"), 6)

def test_sequence_consistency(self):
"""Test that we error out if the table and sequence diverges.
"""
Expand Down