Don't aggregate collected time intervals #1180

Closed. tgeoghegan wants to merge 1 commit.

Conversation

tgeoghegan (Contributor) commented:

In time interval tasks, the leader should not schedule aggregation jobs that include reports whose timestamp falls into the time interval of a collected batch. With this commit, `aggregation_job_creator` now checks whether an unaggregated report falls into the time interval of a collection job that is in the finished, deleted, or abandoned state, or a collection job in the start state whose lease has been acquired (which is to say, it is currently being or has previously been stepped by `collection_job_driver`).

Solving this requires us to be clear on when Janus commits a set of reports to a collection. Perhaps surprisingly, this doesn't happen when the collection job is created (i.e., when handling `PUT /tasks/{task-id}/collection_jobs/{collection-job-id}`), but rather the first time `collection_job_driver` steps the job, because that is when it queries the database for the finished report aggregations that match the query. From that point on, the Janus leader will refuse to aggregate more reports for the time interval.

This doesn't quite square with what DAP says. DAP-04 section 4.5.2 says ([1]):

> Once an AggregateShareReq has been issued for the batch determined by
> a given query, it is an error for the Leader to issue any more
> aggregation jobs for additional reports that satisfy the query.

Additionally, section 4.4.1.4 tells us ([2]):

> If the report pertains to a batch that was previously collected, then
> make sure the report was already included in all previous collections
> for the batch. If not, the input share MUST be marked as invalid with
> error "batch_collected". [...] *The Leader considers a batch to be
> collected once it has completed a collection job for a CollectionReq
> message from the Collector*; the Helper considers a batch to be
> collected once it has responded to an AggregateShareReq message from
> the Leader.

(emphasis mine)

A strict reading of 4.4.1.4 suggests that the leader could admit new reports into a collection time interval right up until it finishes a collection job and sends its results to the collector, but in fact sending `AggregateShareReq` to the helper is what commits a set of reports to a collection.

[1]: https://datatracker.ietf.org/doc/html/draft-ietf-ppm-dap-04#section-4.5.2-16
[2]: https://datatracker.ietf.org/doc/html/draft-ietf-ppm-dap-04#section-4.4.1.4-3.7.1
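To make the check concrete, here is a minimal sketch of the kind of predicate `aggregation_job_creator` could add to its unaggregated-report query. The table and column names (`client_reports`, `collection_jobs`, `batch_interval`, `lease_expiry`, and so on) are illustrative assumptions for this sketch, not Janus's actual schema.

```sql
-- Sketch: find unaggregated reports that are still safe to aggregate,
-- i.e. not covered by a collection job whose report set is committed.
-- Names are illustrative assumptions.
SELECT client_reports.report_id
FROM client_reports
LEFT JOIN collection_jobs
  ON collection_jobs.task_id = client_reports.task_id
  AND client_reports.client_timestamp <@ collection_jobs.batch_interval
WHERE client_reports.task_id = $1
  AND client_reports.aggregation_started = FALSE
  AND (
    -- select reports that are not yet associated with a collect job...
    collection_jobs.state IS NULL
    -- ...or whose collect job has been scheduled but never yet leased,
    -- so no set of reports has been committed to it
    OR (collection_jobs.state = 'START'
        AND collection_jobs.lease_expiry IS NULL)
  );
```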

tgeoghegan requested a review from a team as a code owner on March 31, 2023 at 21:16.
tgeoghegan force-pushed the timg/leader-agg-jobs-collected-interval branch from bce12f8 to c03a715 on March 31, 2023 at 21:59.
divergentdave (Contributor) left a comment:
We need this for correctness, given our current architecture, but this will introduce another place where query runtime is affected by the total number of collection jobs, unfortunately.
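One way to bound that cost, sketched here purely as an assumption rather than anything this PR does, is an index that lets the join probe by interval containment instead of scanning every collection job for the task (mixing the scalar `task_id` with the range column requires the `btree_gist` extension):

```sql
-- Illustrative index to make the interval-containment join cheap.
-- Table and column names match the assumed schema in the sketch above.
CREATE EXTENSION IF NOT EXISTS btree_gist;
CREATE INDEX collection_jobs_task_interval
  ON collection_jobs USING gist (task_id, batch_interval);
```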

The review thread below is attached to this excerpt of the new query (truncated as shown):

```sql
-- select reports that are not yet associated with a collect job
collection_jobs.state IS NULL
-- select reports whose collect job has been scheduled but never yet leased
OR (
```
branlwyd (Member) commented on Apr 3, 2023:
I think this join is probably fine from a performance perspective.

From an implementation perspective, I think we can drop this particular clause (checking for START & unleased), if we want; I don't think we care that much about this relatively small window of time.

However, I don't think this solves a race condition. Specifically, I think the following is still possible with this PR:

  • Aggregation job creator triggers for a given task, runs this code to grab some client reports, and delays.
  • After this, a collection request arrives & results in the creation of a collection job for an interval overlapping with some of the client reports from the previous step.
  • The aggregation job creator from the first step continues, and creates an aggregation job with reports overlapping with the collection job's interval.

I think creating such an aggregation job is what we are trying to avoid.

I think a working solution might be something like: update the aggregation job driver to not drive (i.e. drop from its requests) any reports which have a conflicting collection job. Alternatively, just don't include such reports in the accumulator, though this might be harder to coordinate between the two aggregators.
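A rough sketch of that driver-side filter, run when the aggregation job driver steps a job, might look like the following. Table and column names are again illustrative assumptions, not Janus's actual schema.

```sql
-- Sketch: when stepping an aggregation job, drive only the report
-- aggregations whose report timestamp does NOT fall inside a
-- conflicting collection job's interval. Names are illustrative.
SELECT report_aggregations.id
FROM report_aggregations
JOIN client_reports
  ON client_reports.report_id = report_aggregations.report_id
WHERE report_aggregations.aggregation_job_id = $1
  AND NOT EXISTS (
    SELECT 1 FROM collection_jobs
    WHERE collection_jobs.task_id = client_reports.task_id
      AND client_reports.client_timestamp <@ collection_jobs.batch_interval
      -- a collection job conflicts once its report set is committed:
      -- it has been leased, or it has reached a terminal state
      AND (collection_jobs.lease_expiry IS NOT NULL
           OR collection_jobs.state IN ('FINISHED', 'DELETED', 'ABANDONED'))
  );
```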

tgeoghegan (Contributor, Author) replied:

Good point about the race condition. If we move this check to the start of aggregation-job stepping, the downside is that the aggregation job creator would continue scheduling potentially un-runnable aggregation jobs, but those can be cheaply abandoned by the aggregation job driver.

So, in the interest of reducing complexity, I think we should put this check in just one place, and have that be `aggregation_job_driver`.

branlwyd (Member) replied:

> update the aggregation job driver to not drive (i.e. drop from its requests) any reports which have a conflicting collection job

I think this approach has a race condition too. Specifically:

  • Let some aggregation job exist & be one step away from completion.
  • Aggregation job driver picks up the job, begins stepping it, and delays.
  • Concurrently, a collection job is created & driven for a time interval overlapping the aggregation job. This succeeds.
  • The aggregation job driver finishes stepping the job, aggregating additional reports into a collection job that has been completed.

I'm going to think about this further, but I wanted to point out the flaw in this approach before any implementation work happens.

divergentdave (Contributor) replied on Apr 3, 2023:

> I think this approach has a race condition too. Specifically:
>
> • Let some aggregation job exist & be one step away from completion.
> • Aggregation job driver picks up the job, begins stepping it, and delays.
> • Concurrently, a collection job is created & driven for a time interval overlapping the aggregation job. This succeeds.
> • The aggregation job driver finishes stepping the job, aggregating additional reports into a collection job that has been completed.

The collection job acquisition query would not pick up a collection job if it sees a related aggregation job that is in progress. If we stipulate that the collection job acquisition query's transaction takes a long time to run, and the aggregation job from the first bullet point got created and advanced after the start of the collection job driver's transaction, then we could have this same issue thanks to a phantom read anomaly.
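That interleaving can be sketched as two concurrent PostgreSQL sessions under Repeatable Read (the isolation level in play per this thread); the table names are illustrative assumptions:

```sql
-- Session A: collection_job_driver's acquisition transaction.
BEGIN ISOLATION LEVEL REPEATABLE READ;
-- The first query fixes the snapshot: no in-progress aggregation jobs yet.
SELECT count(*) FROM aggregation_jobs
  WHERE task_id = 'task-1' AND state = 'IN_PROGRESS';   -- returns 0

-- Session B, concurrently: creates and advances an overlapping
-- aggregation job, then commits AFTER Session A's snapshot was taken.
--   BEGIN;
--   INSERT INTO aggregation_jobs (task_id, state)
--     VALUES ('task-1', 'IN_PROGRESS');
--   COMMIT;

-- Session A, continuing on its now-stale snapshot: it still sees no
-- conflicting aggregation job, so it leases the collection job anyway.
UPDATE collection_jobs
  SET lease_expiry = now() + interval '10 minutes'
  WHERE task_id = 'task-1' AND state = 'START';
COMMIT;
```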

branlwyd (Member) replied:

What about the following checks:

  • An aggregation job will not drive reports with a conflicting collect job.
  • A collect job will not step until all (potentially?) intersecting aggregation jobs reach a terminal state.

I think this works because of the way Repeatable Read works: one of these jobs must commit first, and the check on the other job will then keep it from creating a race condition.

Note that the aggregation job in particular "will not drive reports" rather than "will abandon the whole aggregation job" -- aggregation jobs are no longer necessarily within a single time interval/batch. We can't abandon the whole aggregation job or we'll potentially lose reports in other time intervals.
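A sketch of the second check, with the same illustrative (assumed) names as above: before stepping a collection job, the driver would verify that no intersecting aggregation job is still in a non-terminal state.

```sql
-- Sketch: refuse to step a collection job while any aggregation job
-- intersecting its batch interval ($2) is still in progress.
-- Names are illustrative assumptions.
SELECT NOT EXISTS (
  SELECT 1
  FROM aggregation_jobs
  JOIN report_aggregations
    ON report_aggregations.aggregation_job_id = aggregation_jobs.id
  JOIN client_reports
    ON client_reports.report_id = report_aggregations.report_id
  WHERE aggregation_jobs.task_id = $1
    AND client_reports.client_timestamp <@ $2::tsrange
    AND aggregation_jobs.state NOT IN ('FINISHED', 'ABANDONED')
) AS ok_to_step;
```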

tgeoghegan (Contributor, Author) replied:

Yeah, I suspect that to make useful progress here, we need to clarify our state machine(s) for reports, aggregation jobs, leader collection jobs, and helper aggregate share jobs. Then we can understand the synchronization points and figure out how to implement them properly. So I think this change should go on the back burner, as I want to focus on more immediate production issues.

tgeoghegan (Contributor, Author) commented:

Obsoleted by #1254

tgeoghegan closed this on Apr 26, 2023.
tgeoghegan deleted the timg/leader-agg-jobs-collected-interval branch on May 23, 2023.