
Introduce requeue flag #286

Merged · 6 commits into main · Mar 14, 2023
Conversation

@adzialocha (Member) commented Mar 6, 2023

Introduces a requeue boolean flag in the input index of our Factory implementation (task worker queue) which allows tasks to be re-scheduled after completion.

This fixes a nasty bug which can only be reproduced on very fast machines: #281

This branches off Sam's work on #285 which surely made the bug easier to track down.

Problem

Let's say we have operations arriving at the node in the following order: C1 [D1], U2 [D1], C1 [D2], U3 [D1] and U2 [D2]. That's a CREATE, UPDATE and another UPDATE operation for a "Document 1", plus a CREATE and an UPDATE operation for a "Document 2". They arrive at the GraphQL API at the same time and get put on the service bus in this order, where they get broadcast.

The current factory implementation would now look at each of these operations arriving through the service bus and queue them up IF we are not already handling that document. In the "dispatcher" this would look like this:

  1. C1 [D1] arrives. Do we already work on D1? No, let's queue it!
  2. U2 [D1] arrives. Do we already work on D1? Yes. Ignore.
  3. C1 [D2] arrives. Do we already work on D2? No, let's queue it!
  4. U3 [D1] arrives. Do we already work on D1? Yes. Ignore.
  5. U2 [D2] arrives. Do we already work on D2? Yes. Ignore.

... Concurrently we're already running a whole worker pool eagerly waiting to take tasks off the queue. So depending on how the "dispatcher" races with the "workers", the outcome might look a little different each time. For example, if we finish working on C1 [D1] before the dispatcher looks at U3 [D1], that one might actually get queued as well.

That design of "looking at duplicates" had one purpose: Avoid working on the same document twice, aka "we're already working on it, why do it a second time". This is a nice optimization, but it also assumes that all relevant operations are already in the database at the moment C1 [D1] or a later D1-related task kicks in.
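To make that concrete, here is a minimal sketch of the old dispatch rule (with hypothetical names like `Dispatcher` and `on_operation`, not the actual `Factory` code): a task only gets queued if no task for that document is pending or running, and everything else is silently dropped.

```rust
use std::collections::HashSet;

type DocumentId = String;

struct Task {
    document_id: DocumentId,
}

struct Dispatcher {
    // Documents with a pending or currently running task.
    active: HashSet<DocumentId>,
    queue: Vec<Task>,
}

impl Dispatcher {
    fn on_operation(&mut self, document_id: DocumentId) {
        // Old behaviour: if a task for this document already exists, the
        // incoming operation is silently ignored. On fast machines this is
        // exactly where later operations (U2, U3) got lost.
        if self.active.contains(&document_id) {
            return;
        }
        self.active.insert(document_id.clone());
        self.queue.push(Task { document_id });
    }
}
```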

Funnily enough, this worked quite well so far and we never noticed a problem. Most of the time the requests were slow enough for the factory to queue every operation and ignore nothing:

  1. C1 [D1] arrives. Do we already work on D1? No, let's queue it!
    ... (the worker happily crunches that operation in the background, seeing C1 in the database)
    ... some short fraction of time later ..
  2. U2 [D1] arrives. Do we already work on D1? No, let's queue it!
    ... (the worker happily crunches that operation in the background, seeing U2 in the database)
    ... and so on ..

Since our requests have become faster (due to client-side caching in shirokuma), we observed the following problem:

  1. C1 [D1] arrives. Do we already work on D1? No, let's queue it!
    ... (the worker happily crunches that operation in the background, seeing C1 in the database)
    ... some short fraction of time later ..
  2. U2 [D1] arrives! Woah. FAST! Surprise!! Do we already work on D1? Yes. Ignore.
    ... and so on ..

The new operations (U2 etc.) arrived in the database after the worker had already read the document's operations, but before it finished working on C1! Thus, we lost U2 ..

Solution

After some analysis I realized that the system still makes sense in its basic assumption: We don't need to work on documents more than we have to! aquadoggo is an event-sourcing system where "events" kick off work, independent of the amount of data arriving. Here is an example:

D: [U U U] [U U U U U U U] U U U U U U U U ...
          Work!           Work! 

Note how the brackets [...] "group" the operations into work units. They simply get loaded from the database as they were already there before the work started.

There is an ongoing stream of incoming UPDATE operations and every time we kick off the worker for that document it takes the "fresh" ones and materializes them. We don't need a worker for each operation, we just have to make sure that we consider them all exactly once - and this is where the previous system failed, as operations just got lost ..

In this PR a "requeue" flag got introduced which restarts a task for a document D when operations were observed that came in while the worker was already running on D. It works like this:

  1. C1 [D1] arrives. Do we already work on D1? No, let's queue it!
  2. U2 [D1] arrives. Do we already work on D1? Yes. Set requeue flag for D1 to true
  3. C1 [D2] arrives. Do we already work on D2? No, let's queue it!
  4. U3 [D1] arrives. Do we already work on D1? Yes, and the requeue flag is set. Ignore.
  5. U2 [D2] arrives. Do we already work on D2? Yes. Set requeue flag for D2 to true

This scenario assumes that the worker for D1 and the worker for D2 are still running while all the other operations get dispatched; that's the "worst-case scenario" in terms of maximum pressure on the dispatcher, which made the previous implementation fail.

What this does now is quite nice: As soon as the work on C1 finishes and the requeue flag is detected, a new task simply gets added to the queue to continue working on D1, making sure to also account for all the operations which arrived a little later (U2 and U3).
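As a rough illustration (again with hypothetical names and types, not the actual aquadoggo `Factory` API), the dispatch side and the completion side of this requeue mechanism could look like this, followed by a small walk-through of the worst-case arrival order from above:

```rust
use std::collections::HashMap;

type DocumentId = String;

struct Task {
    document_id: DocumentId,
}

// Per-document entry in the input index: `requeue` flips to true when an
// operation arrives while a task for this document is already in flight.
struct WorkStatus {
    requeue: bool,
}

struct Dispatcher {
    index: HashMap<DocumentId, WorkStatus>,
    queue: Vec<Task>,
}

impl Dispatcher {
    fn on_operation(&mut self, document_id: DocumentId) {
        // A task for this document is already pending or running: remember
        // that more work arrived instead of dropping the operation.
        if let Some(status) = self.index.get_mut(&document_id) {
            status.requeue = true;
            return;
        }

        // No task yet: start tracking the document and queue a task for it.
        self.index
            .insert(document_id.clone(), WorkStatus { requeue: false });
        self.queue.push(Task { document_id });
    }

    fn on_task_finished(&mut self, document_id: DocumentId) {
        if let Some(status) = self.index.remove(&document_id) {
            if status.requeue {
                // Operations came in while the worker was busy: schedule
                // exactly one follow-up task, which loads all of them from
                // the database and materializes them in one go.
                self.on_operation(document_id);
            }
        }
    }
}

fn main() {
    let mut dispatcher = Dispatcher {
        index: HashMap::new(),
        queue: Vec::new(),
    };

    // The worst-case arrival order from the example above, with both
    // workers still busy so no task finishes in between.
    for document in ["D1", "D1", "D2", "D1", "D2"] {
        dispatcher.on_operation(document.to_string());
    }

    // Only one task per document got queued ...
    assert_eq!(dispatcher.queue.len(), 2);

    // ... but both documents are flagged for a follow-up run, so the
    // operations which arrived "too late" are not lost.
    assert!(dispatcher.index["D1"].requeue);
    assert!(dispatcher.index["D2"].requeue);
}
```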

What we get from this is:

  1. Some sort of "batching" of operations for the same document when they arrive very fast
  2. The queue does not get "blocked" by waiting operations, we can still continue looking at other operations for other documents and assigning them to different workers
  3. Accounting for moments of very high pressure on the dispatcher without losing any information
  4. Still making sure that we only work on a document once at a time, to avoid concurrent work overwriting each other

📋 Checklist

  • Add tests that cover your changes
  • Add this PR to the Unreleased section in CHANGELOG.md
  • Link this PR to any issues it closes
  • New files contain a SPDX license header

@adzialocha changed the base branch from main to do-transactions-differently March 6, 2023 17:44
codecov bot commented Mar 6, 2023

Codecov Report

Patch coverage: 36.36% and project coverage change: -0.32 ⚠️

Comparison is base (335be5a) 91.74% compared to head (9d98a7e) 91.42%.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #286      +/-   ##
==========================================
- Coverage   91.74%   91.42%   -0.32%     
==========================================
  Files          64       64              
  Lines        4626     4644      +18     
==========================================
+ Hits         4244     4246       +2     
- Misses        382      398      +16     
Impacted Files                         Coverage         Δ
aquadoggo/src/materializer/worker.rs   81.52% <36.36%>  (-4.92%) ⬇️


☔ View full report in Codecov by Sentry.

@adzialocha linked an issue Mar 6, 2023 that may be closed by this pull request
@adzialocha (Member, Author)

materialize_document_from_bus seems to still be failing following the fixes @sandreae, I wonder what's hidden there 😆

@sandreae (Member) commented Mar 6, 2023

This is amazing work. Sorry I only have the ability to respond in emoji reactions right now.

@adzialocha marked this pull request as ready for review March 14, 2023 11:53
@adzialocha (Member, Author) commented Mar 14, 2023

I think this is ready for review, we should just merge the other branch first and then rebase 👍🏻

@sandreae (Member)

I just read through your great description of the new logic, it all makes sense, very elegant. Just to check I get it: the requeue flag is flipped to true if an operation arrives for a document D1 while a task processing D1 is in progress. This means a task for D1 will be queued up again once the current one finishes. This means all operations will be accounted for, now concurrent work on the same document will occur, and we don't inefficiently queue up a new task for every operation.

@sandreae (Member)

Gunna review now 👍

@sandreae (Member) left a review comment

This looks all good to me, thanks for tracking down this tricky bug.

I never managed to reproduce this particular race condition on my (slow??) laptop, so I'm assuming it fixed what you did manage to observe.

Regarding the still failing test, I'd say we merge and look at that in a new PR.

@sandreae (Member)

I'll go look at the other branch now and get that merged first.

Base automatically changed from do-transactions-differently to main March 14, 2023 14:23
@sandreae self-requested a review March 14, 2023 14:27
@sandreae (Member)

Ah, I actually had a change request. Think it would be good to add debug logging for when an operation is requeued. I can add this, let me know if you have any logging related thoughts @adzialocha.

@adzialocha (Member, Author)

Just to check I get it: the requeue flag is flipped to true if an operation arrives for a document D1 while a task processing D1 is in progress. This means a task for D1 will be queued up again once the current one finishes. This means all operations will be accounted for, now concurrent work on the same document will occur, and we don't inefficiently queue up a new task for every operation.

I think you got it, just maybe clarifying that "concurrent work on the same document will occur" is not true. There is no concurrent work allowed on the same document, otherwise we tap into undefined behaviour because of the database reads and writes on that document.

We also never queued up a new task for every operation. One could say we queue up a task per document; the operation is just the trigger for that. If a second operation for the same document comes in while we already have a task, a "requeue" is flagged.

I never managed to reproduce this particular race condition on my (slow??) laptop, I'm assuming it fixed what you did manage to observe.

You mean the materialize_document_from_bus test, right?

Ah, I actually had a change request. Think it would be good to add debug logging for when an operation is requeued. I can add this, let me know if you have any logging related thoughts @adzialocha.

Sure, please go ahead!

@adzialocha (Member, Author)

Thank you for rebasing!

@sandreae (Member)

I think you got it, just maybe clarifying that "concurrent work on the same document will occur" is not true. There is no concurrent work allowed on the same document, otherwise we tap into undefined behaviour because of the database reads and writes on that document.

Sorry, that was a typo on my part, should have read "no concurrent work...."

@sandreae (Member)

You mean the materialize_document_from_bus test, right?

No, I mean this queue bug. I couldn't reproduce it locally, it was only observable on your fast computer.

@p2panda deleted a comment from sandreae Mar 14, 2023
@adzialocha merged commit c0775be into main Mar 14, 2023
@adzialocha deleted the introduce-requeue-flag branch March 14, 2023 15:33
adzialocha added a commit that referenced this pull request Mar 16, 2023
* main:
  Update breaking API calls for new `p2panda-rs` version (#293)
  Update Cargo.lock as well
  Use released p2panda-rs version 0.7.0
  Migrate CLI from `structopt` to `clap` (#289)
  Increase timeout for failing materializer test
  Introduce requeue flag (#286)
  Do transactions correctly (#285)
  Add libp2p service and configuration (#282)
Successfully merging this pull request may close these issues:

Updates not getting materialized