Fix race between aggregation & collection. #1254
Conversation
The basic strategy is to introduce a new "batch aggregation state" column, which records the current state of the batch aggregation. The accumulator notes batch aggregations which have been collected & refuses to aggregate into them. If an aggregation occurs concurrently with a collection, the aggregation process' write to the batch_aggregations row (to update the share/count/checksum) will conflict with the collection process' write to the batch_aggregations row (to update the state), incurring a retry -- this gives a DB-level protection against races here.
On collection, we additionally write any relevant batch aggregation rows which do not yet exist; this protects against the corner case that some batch aggregation shard didn't get written at all, which would dodge the write conflict we depend on to protect against race conditions.
Note that this change does not attempt to address the semantics of "when is the earliest time we can give up on a report share because it is for a batch that has been collected" -- we might (wastefully) aggregate such reports; we are just now guaranteed that they won't be merged into an aggregate share which has been collected, throwing their contribution away instead. This also does not guarantee that the Leader won't send an aggregate share request concurrently with an aggregation request which finishes an aggregation -- this could cause the Leader & Helper to fall out-of-sync in terms of which reports are included in the batch. (I think this concurrency issue can still occur, though it is unlikely since an aggregation job would need to be created & completed in the time it takes for a single aggregate share request to be processed.)
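A minimal sketch of the write conflict this relies on, assuming both flows run as separate PostgreSQL transactions at REPEATABLE READ or stricter isolation (where a write-write conflict on the same row surfaces as a serialization failure); the column names other than state are illustrative, not the exact Janus schema:

-- Aggregation flow: merge an aggregation job's results into one batch aggregation shard.
BEGIN ISOLATION LEVEL REPEATABLE READ;
UPDATE batch_aggregations
    SET aggregate_share = $1, report_count = report_count + $2, checksum = $3
    WHERE task_id = $4 AND batch_identifier = $5 AND ord = $6;
COMMIT;

-- Collection flow, running concurrently: flip the same shard's state.
-- Whichever transaction writes the row second blocks, then fails with a
-- serialization error once the first commits, and is retried; the merged
-- share and the COLLECTED state therefore cannot silently miss each other.
-- (Collection also writes rows for shards that don't exist yet, so a
-- first-time write by the aggregation flow can't dodge the conflict.)
BEGIN ISOLATION LEVEL REPEATABLE READ;
UPDATE batch_aggregations
    SET state = 'COLLECTED'
    WHERE task_id = $1 AND batch_identifier = $2 AND ord = $3;
COMMIT;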
Humorously, I didn't detect this during local testing because the schema-version enforcement was merged concurrently with this PR.
Posting some initial basic thoughts on the change. I want to spend more time thinking about this and will review again tomorrow.
    'COLLECTED'  -- this batch aggregation has been collected & no longer permits aggregation
);

ALTER TABLE batch_aggregations ADD COLUMN state BATCH_AGGREGATION_STATE NOT NULL;  -- the current state of this batch aggregation
Would it help to make this DEFAULT 'AGGREGATING'?
Hmm... I don't think it makes a difference.
Without DEFAULT 'AGGREGATING' (or some other default value), this migration can't be applied if there are any batch_aggregations rows, since we would be trying to create a new, non-nullable column with no default value. But DEFAULT 'AGGREGATING' is not correct for a system which already has batch aggregations: for correctness, we would need already-collected aggregations to move to state 'COLLECTED', and that would require implementing a data migration.
Since we are currently not implementing data migrations for non-backwards-compatible schema changes, I'd prefer not specifying a default value: the main application code doesn't need it; it might hypothetically mask bugs in application code; and the lack of a default can be viewed as a safeguard against applying this migration to a system with existing data, since it will fail.
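For illustration, the two variants only differ once batch_aggregations already contains rows (standard PostgreSQL behavior; the type and column are the ones added by this migration):

-- Fails on a populated table: existing rows would have no value for a
-- NOT NULL column with no default, so the ALTER is rejected.
ALTER TABLE batch_aggregations ADD COLUMN state BATCH_AGGREGATION_STATE NOT NULL;

-- Succeeds on a populated table, but silently marks every pre-existing
-- batch aggregation as 'AGGREGATING', including ones already collected.
ALTER TABLE batch_aggregations ADD COLUMN state BATCH_AGGREGATION_STATE NOT NULL DEFAULT 'AGGREGATING';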
Besides the questions I put inline, I think this needs more test coverage. This stuff is quite subtle, so I worry about the risk of regressions. It should be possible to construct tests that fill a database with the appropriate state and then try to step aggregation jobs, collections, etc. I suspect some of the tests I wrote in #1180 would help.
aggregator/src/aggregator.rs (outdated)
        .iter()
        .map(|ba| (ba.batch_identifier(), ba.ord()))
        .collect();
    let empty_batch_aggregations: Vec<_> = iproduct!(
We may want to set some floor on the time precision, and/or ceiling on how large of a collect request we will accept, relative to the time precision. Otherwise, someone could innocently set time precision to one second and create a lot of database write load.
I agree, but I think this is beyond the scope of this PR. This was already a problem: a too-small time precision would cause a collect request of a few hours to attempt to read many thousands of rows (for example, a six-hour collect interval at one-second precision spans 21,600 batch intervals, each multiplied by the number of batch aggregation shards).
    tx.release_collection_job(&lease).await?;
    try_join!(
        tx.update_collection_job::<SEED_SIZE, Q, A>(&collection_job),
        try_join_all(batch_aggregations.iter().map(|ba| tx.update_batch_aggregation(ba))),
I'm concerned that this update could stomp on changes from a concurrent aggregation flow running between the step_collection_job_1 and step_collection_job_2 transactions. The intent of this update is to flip the batch aggregation's state enum, but if the aggregate share/report count/checksum/timestamp interval have been updated in the database, we'd lose those changes, when we should instead retry in hopes of getting a consistent view of the batch. Our leasing system only protects changes to the collection job, not batch aggregations. (The put_batch_aggregation() call below should be okay, because any concurrently created batch aggregation rows would result in an error, and an eventual retry at the job driver level.)
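A sketch of the interleaving described above, using report_count to stand in for the share/count/checksum fields and an illustrative primary key:

-- step_collection_job_1: the collection driver reads the batch aggregation.
BEGIN;
SELECT report_count, state FROM batch_aggregations WHERE id = 42;  -- reads report_count = 10
COMMIT;

-- An aggregation job finishes in between and merges five more reports.
BEGIN;
UPDATE batch_aggregations SET report_count = 15 WHERE id = 42;
COMMIT;

-- step_collection_job_2: writes back the stale in-memory copy while flipping
-- the state; the aggregation flow's committed update is silently lost, since
-- no isolation level protects a read and a write split across two transactions.
BEGIN;
UPDATE batch_aggregations SET state = 'COLLECTED', report_count = 10 WHERE id = 42;
COMMIT;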
I moved the update of the batch aggregation state to the first transaction to avoid this possibility.
Good point -- I augmented aggregation tests to check the new behavior of aggregating into an already-collected batch. These tests don't test behavior when a collect races with an aggregation job, but this is not usually the job of a unit test IMO.
We need to reassign the migration a new date, since the 20230424220336_rm-out-share migration landed before this, and our migrations ought to be append-only when sorted by timestamp.
I think we don't need to do this, as the 20230424220336_rm-out-share migration hasn't been released yet, so both migrations will be released as part of the same Janus version. That they will be applied out of merge order will not cause a problem -- our tests show this, as they run in this order.
You highlight a real problem, however, if a release were involved. Something like the following can occur:
- PR A introduces a migration, and ends up stuck in review for a while.
- PR B introduces a separate migration (dated after PR A's migration); this PR is merged quickly.
- A Janus release occurs; PR B's migration is deployed.
- PR A is merged.
The next release will be busted, because PR A's migration sorts "before" PR B's migration but has not been applied, which breaks sqlx's expectations; I'm not sure how sqlx would respond to this situation, but I suspect we'd need to manually fix things up.
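If that did happen, the database's own view of what has been applied can at least be inspected; sqlx's Postgres migrator keeps its bookkeeping in a _sqlx_migrations table (the exact columns shown here are an assumption of this sketch):

-- Lists the migration versions this database believes are applied, in version
-- order, which would make a "sorts earlier but never applied" gap visible.
SELECT version, description, installed_on
    FROM _sqlx_migrations
    ORDER BY version;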
I think we should abandon sqlx's date-based scheme and replace it with an arbitrary-number scheme (so migrations' "dates" are e.g. 00000000000001, 00000000000002, 00000000000003, etc.). This would ensure a strict ordering of schema updates based on merge order, with git enforcing conflict resolutions. I don't think this would be too hard, as long as sqlx doesn't break when handling "dates" of this format -- our current sqlx migrate add -r <name> --source /path/to/janus/db invocation would just become "manually create a couple of files with the appropriate naming scheme".
I'll experiment with this change in a separate PR.
See #1304 for the change from timestamp-based ordering to opaque-counter-based ordering.
(Merging from main, I renumbered the migration in this PR to match the new numbering scheme, which places it at the end.)