Don't aggregate collected time intervals #1180

Closed
wants to merge 1 commit
243 changes: 235 additions & 8 deletions aggregator/src/aggregator/aggregation_job_creator.rs
@@ -764,30 +764,35 @@ mod tests {
use futures::{future::try_join_all, TryFutureExt};
use janus_aggregator_core::{
datastore::{
- models::{AggregationJob, CollectionJob, CollectionJobState, LeaderStoredReport},
- test_util::ephemeral_datastore,
- Transaction,
+ models::{
+ AggregationJob, CollectionJob, CollectionJobState, LeaderStoredReport,
+ ReportAggregation, ReportAggregationState,
+ },
+ test_util::{ephemeral_datastore, EphemeralDatastore},
+ Datastore, Transaction,
},
query_type::AccumulableQueryType,
- task::{test_util::TaskBuilder, QueryType as TaskQueryType},
+ task::{test_util::TaskBuilder, QueryType as TaskQueryType, Task},
};
use janus_core::{
task::{VdafInstance, PRIO3_VERIFY_KEY_LENGTH},
test_util::{
dummy_vdaf::{self, AggregationParam},
- install_test_trace_subscriber,
+ install_test_trace_subscriber, run_vdaf,
},
- time::{Clock, MockClock, TimeExt},
+ time::{Clock, IntervalExt, MockClock, TimeExt},
};
use janus_messages::{
query_type::{FixedSize, TimeInterval},
- AggregationJobId, AggregationJobRound, Interval, ReportId, Role, TaskId, Time,
+ AggregationJobId, AggregationJobRound, HpkeCiphertext, HpkeConfigId, Interval, ReportId,
+ ReportMetadata, Role, TaskId, Time,
};
use prio::{
codec::ParameterizedDecode,
+ field::Field64,
vdaf::{
prio3::{Prio3, Prio3Count},
- Aggregator,
+ AggregateShare, Aggregator, OutputShare,
},
};
use rand::random;
@@ -1000,6 +1005,228 @@ mod tests {
assert_eq!(all_report_ids, seen_report_ids);
}

struct CollectedIntervalTestCase {
task: Arc<Task>,
reports: Vec<LeaderStoredReport<PRIO3_VERIFY_KEY_LENGTH, Prio3Count>>,
_ephemeral_datastore: EphemeralDatastore,
datastore: Datastore<MockClock>,
}

impl CollectedIntervalTestCase {
const MIN_AGGREGATION_JOB_SIZE: usize = 50;
const MAX_AGGREGATION_JOB_SIZE: usize = 60;

async fn setup(
collection_job_state: CollectionJobState<PRIO3_VERIFY_KEY_LENGTH, Prio3Count>,
) -> Self {
// Setup.
install_test_trace_subscriber();
let clock = MockClock::default();
let ephemeral_datastore = ephemeral_datastore().await;
let datastore = ephemeral_datastore.datastore(clock.clone());
let task = Arc::new(
TaskBuilder::new(
TaskQueryType::TimeInterval,
VdafInstance::Prio3Count,
Role::Leader,
)
.build(),
);
let vdaf = Prio3Count::new_count(2).unwrap();
let verify_key = task.primary_vdaf_verify_key().unwrap();

// Create enough reports for an aggregation job
let report_time = clock.now();
let reports: Vec<LeaderStoredReport<PRIO3_VERIFY_KEY_LENGTH, Prio3Count>> =
iter::repeat_with(|| {
let report_id = random();
let transcript = run_vdaf(&vdaf, verify_key.as_bytes(), &(), &report_id, &1);
LeaderStoredReport::new(
*task.id(),
ReportMetadata::new(report_id, report_time),
transcript.public_share,
Vec::new(),
transcript.input_shares[0].clone(),
HpkeCiphertext::new(HpkeConfigId::from(13), Vec::new(), Vec::new()),
)
})
.take(Self::MIN_AGGREGATION_JOB_SIZE)
.collect();

// Create a collection job whose time interval includes all the reports
let collection_job =
CollectionJob::<PRIO3_VERIFY_KEY_LENGTH, TimeInterval, Prio3Count>::new(
*task.id(),
random(),
Interval::from_time(&report_time).unwrap(),
(),
collection_job_state,
);

datastore
.run_tx(|tx| {
let (vdaf, task, reports, collection_job) = (
vdaf.clone(),
task.clone(),
reports.clone(),
collection_job.clone(),
);
Box::pin(async move {
tx.put_task(&task).await.unwrap();
for report in reports {
tx.put_client_report(&vdaf, &report).await.unwrap();
}
tx.put_collection_job(&collection_job).await.unwrap();

Ok(())
})
})
.await
.unwrap();

Self {
task,
_ephemeral_datastore: ephemeral_datastore,
reports,
datastore,
}
}

async fn run(
self,
) -> Vec<AggregationJob<PRIO3_VERIFY_KEY_LENGTH, TimeInterval, Prio3Count>> {
// Run.
let job_creator = Arc::new(AggregationJobCreator {
datastore: self.datastore,
tasks_update_frequency: Duration::from_secs(3600),
aggregation_job_creation_interval: Duration::from_secs(1),
min_aggregation_job_size: Self::MIN_AGGREGATION_JOB_SIZE,
max_aggregation_job_size: Self::MAX_AGGREGATION_JOB_SIZE,
});
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&self.task))
.await
.unwrap();

// Verify.
job_creator
.datastore
.run_tx(|tx| {
let task_id = *self.task.id();
Box::pin(async move {
Ok(tx.get_aggregation_jobs_for_task(&task_id).await.unwrap())
})
})
.await
.unwrap()
}
}

#[tokio::test]
async fn create_aggregation_jobs_for_time_interval_task_collected_interval_started_no_lease() {
// If the collection job has been created but never leased, then we can still run new
// aggregation jobs.
let test_case = CollectedIntervalTestCase::setup(CollectionJobState::Start).await;
assert!(!test_case.run().await.is_empty());
}

#[tokio::test]
async fn create_aggregation_jobs_for_time_interval_task_collected_interval_started_with_lease()
{
// If the collection job has been leased, then we've committed to a set of reports and may
// create no additional aggregation jobs for the interval.
let test_case = CollectedIntervalTestCase::setup(CollectionJobState::Start).await;

// Populate datastore with necessary report aggregations and aggregation job so that the
// collection job will be acquirable.
let aggregation_job_id = test_case
.datastore
.run_tx(|tx| {
let (task_id, reports) = (*test_case.task.id(), test_case.reports.clone());
Box::pin(async move {
let aggregation_job =
AggregationJob::<PRIO3_VERIFY_KEY_LENGTH, TimeInterval, Prio3Count>::new(
task_id,
random(),
(),
(),
Interval::from_time(reports[0].metadata().time()).unwrap(),
janus_aggregator_core::datastore::models::AggregationJobState::Finished,
AggregationJobRound::from(1),
);

tx.put_aggregation_job(&aggregation_job).await.unwrap();

for (ord, report) in reports.iter().enumerate() {
let report_aggregation =
ReportAggregation::<PRIO3_VERIFY_KEY_LENGTH, Prio3Count>::new(
task_id,
*aggregation_job.id(),
*report.metadata().id(),
*report.metadata().time(),
ord as u64,
ReportAggregationState::Start,
);
tx.put_report_aggregation(&report_aggregation)
.await
.unwrap();
}

let leases = tx
.acquire_incomplete_time_interval_collection_jobs(
&Duration::from_secs(60),
10,
)
.await
.unwrap();
assert!(!leases.is_empty());

Ok(*aggregation_job.id())
})
})
.await
.unwrap();

let aggregation_jobs = test_case.run().await;
assert_eq!(aggregation_jobs.len(), 1);
assert_eq!(aggregation_jobs[0].id(), &aggregation_job_id);
}

#[tokio::test]
async fn create_aggregation_jobs_for_time_interval_task_collected_interval_finished() {
// If the collection job is finished, we may create no additional aggregation jobs for the
// interval.
let test_case = CollectedIntervalTestCase::setup(CollectionJobState::Finished {
report_count: 1,
encrypted_helper_aggregate_share: HpkeCiphertext::new(
HpkeConfigId::from(12),
Vec::new(),
Vec::new(),
),
leader_aggregate_share: AggregateShare::from(OutputShare::from(Vec::from([
Field64::from(7),
]))),
})
.await;
assert!(test_case.run().await.is_empty())
}

#[tokio::test]
async fn create_aggregation_jobs_for_time_interval_task_collected_interval_abandoned() {
// If the collection job is abandoned, we may create no additional aggregation jobs for the
// interval.
let test_case = CollectedIntervalTestCase::setup(CollectionJobState::Abandoned).await;
assert!(test_case.run().await.is_empty())
}

#[tokio::test]
async fn create_aggregation_jobs_for_time_interval_task_collected_interval_deleted() {
// If the collection job is deleted, we may create no additional aggregation jobs for the
// interval.
let test_case = CollectedIntervalTestCase::setup(CollectionJobState::Deleted).await;
assert!(test_case.run().await.is_empty())
}

#[tokio::test]
async fn create_aggregation_jobs_for_time_interval_task_not_enough_reports() {
// Setup.
32 changes: 23 additions & 9 deletions aggregator_core/src/datastore.rs
@@ -1016,10 +1016,13 @@ impl<C: Clock> Transaction<'_, C> {
}

/// `get_unaggregated_client_report_ids_for_task` returns some report IDs corresponding to
- /// unaggregated client reports for the task identified by the given task ID. Returned client
- /// reports are marked as aggregation-started: the caller must either create an aggregation job
- /// with, or call `mark_reports_unaggregated` on each returned report as part of the same
- /// transaction.
+ /// unaggregated client reports for the task identified by the given task ID. Reports whose
+ /// timestamps fall into an interval covered by a collection job that has been acquired or is
+ /// finished, abandoned or deleted are not returned.
+ ///
+ /// Returned client reports are marked as aggregation-started: the caller must either create an
+ /// aggregation job with, or call `mark_reports_unaggregated` on each returned report as part of
+ /// the same transaction.
///
/// This should only be used with VDAFs that have an aggregation parameter of the unit type. It
/// relies on this assumption to find relevant reports without consulting collection jobs. For
@@ -1035,11 +1038,22 @@ impl<C: Clock> Transaction<'_, C> {
.prepare_cached(
"UPDATE client_reports SET aggregation_started = TRUE
WHERE id IN (
- SELECT id FROM client_reports
- WHERE task_id = (SELECT id FROM tasks WHERE task_id = $1)
- AND aggregation_started = FALSE
- FOR UPDATE SKIP LOCKED
- LIMIT 5000
+ SELECT client_reports.id FROM client_reports LEFT JOIN collection_jobs
+ ON client_reports.task_id = collection_jobs.task_id
+ AND collection_jobs.batch_interval @> client_reports.client_timestamp
+ WHERE client_reports.task_id = (SELECT tasks.id FROM tasks WHERE tasks.task_id = $1)
+ AND client_reports.aggregation_started = FALSE
+ AND (
+ -- select reports that are not yet associated with a collect job
+ collection_jobs.state IS NULL
+ -- select reports whose collect job has been scheduled but never yet leased
+ OR (
@branlwyd (Contributor) commented on Apr 3, 2023:

I think this join is probably fine from a performance perspective.

From an implementation perspective, I think we can drop this particular clause (checking for START & unleased), if we want; I don't think we care that much about this relatively small window of time.

However, I don't think this solves a race condition. Specifically, I think the following is still possible with this PR:

  • Aggregation job creator triggers for a given task, runs this code to grab some client reports, and delays.
  • After this, a collection request arrives & results in the creation of a collection job for an interval overlapping with some of the client reports from the previous step.
  • The aggregation job creator from the first step continues, and creates an aggregation job with reports overlapping with the collection job's interval.

I think creating such an aggregation job is what we are trying to avoid.

I think a working solution might be something like: update the aggregation job driver to not drive (i.e. drop from its requests) any reports which have a conflicting collection job. Alternatively, just don't include such reports in the accumulator, though this might be harder to coordinate between the two aggregators.

The PR author replied:

Good point about the race condition. If we move this check to the top of stepping an aggregation job, then the downside is that the aggregation job creator would continue scheduling potentially un-runnable aggregation jobs, but they can be cheaply abandoned by the aggregation job driver.

So in the interests of reducing complexity, I think we should put this check in just one place, and have that be aggregation_job_driver.
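
As a rough illustration only (not part of this PR's diff), the check being discussed could amount to the aggregation job driver evaluating the same "conflicting collection job" predicate this PR adds to the report-acquisition query before it drives a report. Only the collection_jobs columns are taken from this PR; the parameters are hypothetical.

-- Rough sketch, not part of this PR: skip a report when a conflicting collection job exists.
-- $1 is the task's internal ID and $2 the report's client timestamp (both hypothetical).
SELECT NOT EXISTS (
    SELECT 1 FROM collection_jobs
    WHERE collection_jobs.task_id = $1
      AND collection_jobs.batch_interval @> $2::TIMESTAMP
      -- a collection job that was scheduled but never leased would not count as a conflict
      AND NOT (
          collection_jobs.state = 'START'
          AND collection_jobs.lease_expiry = TIMESTAMP '-infinity'
      )
) AS report_may_be_driven;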

A contributor replied:

> update the aggregation job driver to not drive (i.e. drop from its requests) any reports which have a conflicting collection job

I think this approach has a race condition too. Specifically:

  • Let some aggregation job exist & be one step away from completion.
  • Aggregation job driver picks up the job, begins stepping it, and delays.
  • Concurrently, a collection job is created & driven for a time interval overlapping the aggregation job. This succeeds.
  • The aggregation job driver finishes stepping the job, aggregating additional reports into a collection job that has been completed.

I'm going to think about this further, but I wanted to point out the flaw in this approach before any implementation work happens.

@divergentdave (Collaborator) replied on Apr 3, 2023:

> I think this approach has a race condition too. Specifically:
>
>   • Let some aggregation job exist & be one step away from completion.
>   • Aggregation job driver picks up the job, begins stepping it, and delays.
>   • Concurrently, a collection job is created & driven for a time interval overlapping the aggregation job. This succeeds.
>   • The aggregation job driver finishes stepping the job, aggregating additional reports into a collection job that has been completed.

The collection job acquisition query would not pick up a collection job if it sees a related aggregation job that is in progress. If we stipulate that the collection job acquisition query's transaction takes a long time to run, and the aggregation job from the first bullet point got created and advanced after the start of the collection job driver's transaction, then we could have this same issue thanks to a phantom read anomaly.

A contributor replied:

What about the following checks:

  • An aggregation job will not drive reports with a conflicting collect job.
  • A collect job will not step until all (potentially?) intersecting aggregation jobs reach a terminal state.

I think this works because, due to the way Repeatable Read works, one of these jobs must be committed first. The check on the other job will then keep it from creating a race condition.

Note that the aggregation job in particular "will not drive reports" rather than "will abandon the whole aggregation job" -- aggregation jobs are no longer necessarily within a single time interval/batch. We can't abandon the whole aggregation job or we'll potentially lose reports in other time intervals.
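
A sketch of what the second check might look like as a SQL guard, purely for illustration: the aggregation_jobs columns, the 'IN_PROGRESS' state value, and the $1 collection job ID parameter are assumptions that do not appear in this diff.

-- Rough sketch, not part of this PR: treat a collection job as steppable only once no
-- aggregation job intersecting its batch interval is still in progress.
SELECT NOT EXISTS (
    SELECT 1
    FROM aggregation_jobs
    JOIN collection_jobs
      ON collection_jobs.task_id = aggregation_jobs.task_id
      -- assumed range column spanning the aggregation job's client timestamps
      AND aggregation_jobs.client_timestamp_interval && collection_jobs.batch_interval
    WHERE collection_jobs.id = $1
      AND aggregation_jobs.state = 'IN_PROGRESS'
) AS collection_job_may_step;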

The PR author replied:

Yeah, I suspect that to make useful progress here, we need to clarify our state machine(s) for reports, aggregation jobs, leader collection jobs and helper aggregate share jobs. Then we can understand the synchronization points and figure out how to implement them properly. So I think this change should go on the backburner as I want to focus on production issues more immediately.

+ collection_jobs.state = 'START'
+ AND collection_jobs.lease_expiry = TIMESTAMP '-infinity'
+ )
+ )
+ FOR UPDATE OF client_reports SKIP LOCKED
+ LIMIT 5000
)
RETURNING report_id, client_timestamp",
)
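
For reference, the heart of the new filter is PostgreSQL range containment between a collection job's batch interval and a report's timestamp. A toy example of the operator's behavior, assuming batch_interval is a TSRANGE with the default inclusive-lower/exclusive-upper bounds:

-- Containment as used in collection_jobs.batch_interval @> client_reports.client_timestamp.
SELECT TSRANGE('2023-04-03 00:00', '2023-04-03 01:00') @> TIMESTAMP '2023-04-03 00:30';  -- true
SELECT TSRANGE('2023-04-03 00:00', '2023-04-03 01:00') @> TIMESTAMP '2023-04-03 01:00';  -- false: upper bound is exclusive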