This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix MultiWriterIdGenerator.current_position. #8257

Merged 3 commits on Sep 8, 2020
1 change: 1 addition & 0 deletions changelog.d/8257.misc
@@ -0,0 +1 @@
Fix non-user visible bug in implementation of `MultiWriterIdGenerator.get_current_token_for_writer`.
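
For context, the property this fix restores can be sketched in a few lines; the `current_position` helper below is a hypothetical illustration of the invariant, not Synapse code: a writer's reported token is the largest ID such that it and every smaller allocated ID have already finished.

# Toy model of the invariant: the reported position is the end of the
# contiguous run of finished IDs starting just after the initial position.
def current_position(initial: int, finished: set) -> int:
    pos = initial
    while pos + 1 in finished:
        pos += 1
    return pos

# IDs 8-11 are allocated; 9 completes before 8, so the position stays at 7.
assert current_position(7, {9}) == 7
# Once 8 completes as well, the position can jump over the run 8, 9.
assert current_position(7, {8, 9}) == 9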
43 changes: 37 additions & 6 deletions synapse/storage/util/id_generators.py
@@ -219,6 +219,10 @@ def __init__(
# should be less than the minimum of this set (if not empty).
self._unfinished_ids = set() # type: Set[int]

# Set of local IDs that we've processed that are larger than the current
# position, due to there being smaller unpersisted IDs.
self._finished_ids = set() # type: Set[int]

# We track the max position where we know everything before has been
# persisted. This is done by a) looking at the min across all instances
# and b) noting that if we have seen a run of persisted positions
@@ -347,17 +351,44 @@ def get_next_txn(self, txn: LoggingTransaction):

def _mark_id_as_finished(self, next_id: int):
"""The ID has finished being processed so we should advance the
current poistion if possible.
current position if possible.
"""

with self._lock:
self._unfinished_ids.discard(next_id)
self._finished_ids.add(next_id)

new_cur = None

if self._unfinished_ids:
# If there are unfinished IDs then the new position will be the
# largest finished ID less than the minimum unfinished ID.
Review comment on lines +364 to +365 (Member):

I'm unsure if it is clearer, but I think something like:

new_cur = min_finished
for s in self._finished_ids:
    if s < min_funished:
        new_cur = max(new_cur, s)
    else:
        finished.add(s)

But that might make the handling later on harder.

Reply from @erikjohnston (Member Author), Sep 7, 2020:

Potentially, but then you iterate over the list twice (since you're doing a min(...))? You could write this as:

new_curr = min(self._finished_ids)
self._finished_ids = set(i for i in self._finished_ids if i > new_curr)

or something?
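
For readers following this exchange, here is a self-contained sketch of the single-pass approach that was ultimately merged (shown in the diff just below); the `advance` helper and its local names are illustration only, not Synapse API:

from typing import Optional, Set, Tuple

def advance(
    finished_ids: Set[int], unfinished_ids: Set[int]
) -> Tuple[Optional[int], Set[int]]:
    """Return the new position (if any) plus the finished IDs to keep,
    mirroring the loop merged in _mark_id_as_finished."""
    new_cur = None  # type: Optional[int]

    if unfinished_ids:
        # The new position is the largest finished ID below the smallest
        # unfinished one; anything at or above that boundary is kept.
        keep = set()  # type: Set[int]
        min_unfinished = min(unfinished_ids)
        for s in finished_ids:
            if s < min_unfinished:
                if new_cur is None or new_cur < s:
                    new_cur = s
            else:
                keep.add(s)
        return new_cur, keep

    # No unfinished IDs: everything finished is contiguous, so the new
    # position is simply the largest finished ID and nothing is kept.
    return max(finished_ids), set()

# ID 9 finished while 8 is still in flight: no advance yet, 9 is retained.
assert advance({9}, {8, 10, 11}) == (None, {9})
# Once 8 finishes too, the position can move to 9 and nothing is retained.
assert advance({8, 9}, {10, 11}) == (9, set())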


finished = set()

min_unfinshed = min(self._unfinished_ids)
for s in self._finished_ids:
if s < min_unfinshed:
if new_cur is None or new_cur < s:
new_cur = s
else:
finished.add(s)

# We clear these out since they're now all less than the new
# position.
self._finished_ids = finished
else:
# There are no unfinished IDs so the new position is simply the
# largest finished one.
new_cur = max(self._finished_ids)

# We clear these out since they're now all less than the new
# position.
self._finished_ids.clear()

# Figure out if its safe to advance the position by checking there
# aren't any lower allocated IDs that are yet to finish.
if all(c > next_id for c in self._unfinished_ids):
if new_cur:
curr = self._current_positions.get(self._instance_name, 0)
self._current_positions[self._instance_name] = max(curr, next_id)
self._current_positions[self._instance_name] = max(curr, new_cur)

self._add_persisted_position(next_id)

@@ -425,7 +456,7 @@ def _add_persisted_position(self, new_id: int):
# We move the current min position up if the minimum current positions
# of all instances is higher (since by definition all positions less
# that that have been persisted).
min_curr = min(self._current_positions.values())
min_curr = min(self._current_positions.values(), default=0)
Review comment (Member):

I'm struggling to see how this is directly related to this PR? The change looks fine though.

Reply from @erikjohnston (Member Author):

Sigh, sorry, that is a somewhat unrelated fix that broke that particular sytest. Basically, if the first time a worker sees an ID go past it is out of order, you can get here without anything having been added to current positions, causing this to explode.
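
A quick illustration of the failure mode described in that reply, assuming `_current_positions` is still an empty dict when `_add_persisted_position` runs:

positions = {}  # _current_positions before this worker has advanced anything

# Previous behaviour: min() over an empty sequence raises ValueError.
try:
    min(positions.values())
except ValueError:
    pass  # "min() arg is an empty sequence"

# With default=0 the empty case simply falls back to 0, so the following
# max(min_curr, self._persisted_upto_position) leaves the existing value alone.
assert min(positions.values(), default=0) == 0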

self._persisted_upto_position = max(min_curr, self._persisted_upto_position)

# We now iterate through the seen positions, discarding those that are
50 changes: 50 additions & 0 deletions tests/storage/test_id_generators.py
@@ -122,6 +122,56 @@ async def _get_next_async():
self.assertEqual(id_gen.get_positions(), {"master": 8})
self.assertEqual(id_gen.get_current_token_for_writer("master"), 8)

def test_out_of_order_finish(self):
"""Test that IDs persisted out of order are correctly handled
"""

# Prefill table with 7 rows written by 'master'
self._insert_rows("master", 7)

id_gen = self._create_id_generator()

self.assertEqual(id_gen.get_positions(), {"master": 7})
self.assertEqual(id_gen.get_current_token_for_writer("master"), 7)

ctx1 = self.get_success(id_gen.get_next())
ctx2 = self.get_success(id_gen.get_next())
ctx3 = self.get_success(id_gen.get_next())
ctx4 = self.get_success(id_gen.get_next())

s1 = ctx1.__enter__()
s2 = ctx2.__enter__()
s3 = ctx3.__enter__()
s4 = ctx4.__enter__()

self.assertEqual(s1, 8)
self.assertEqual(s2, 9)
self.assertEqual(s3, 10)
self.assertEqual(s4, 11)

self.assertEqual(id_gen.get_positions(), {"master": 7})
self.assertEqual(id_gen.get_current_token_for_writer("master"), 7)

ctx2.__exit__(None, None, None)

self.assertEqual(id_gen.get_positions(), {"master": 7})
self.assertEqual(id_gen.get_current_token_for_writer("master"), 7)

ctx1.__exit__(None, None, None)

self.assertEqual(id_gen.get_positions(), {"master": 9})
self.assertEqual(id_gen.get_current_token_for_writer("master"), 9)

ctx4.__exit__(None, None, None)

self.assertEqual(id_gen.get_positions(), {"master": 9})
self.assertEqual(id_gen.get_current_token_for_writer("master"), 9)

ctx3.__exit__(None, None, None)

self.assertEqual(id_gen.get_positions(), {"master": 11})
self.assertEqual(id_gen.get_current_token_for_writer("master"), 11)

def test_multi_instance(self):
"""Test that reads and writes from multiple processes are handled
correctly.