Explicitly store last prepare steps for replay. (#1253)
This fixes some buggy behavior in corner cases of replay. For
example, if a round that ended up dropping a report was replayed, the
Helper would respond differently the second time: no prepare step at
all on the original request, but a failed prepare step with error
ReportDropped on replay.
[There may be more such cases; I didn't do a close reading of every
code path.]
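
For context, here is a minimal, self-contained sketch of the idea (simplified
stand-in types, not the actual janus types or API): each report aggregation
explicitly records the last prepare step result it produced, success or
failure, and the response to an aggregation job request is built purely from
those stored steps, so a replayed request reproduces the original response.
The names `last_prep_step`, `with_last_prep_step`, and
`aggregation_job_resp_for` mirror identifiers in the diff below but are
illustrative only.

```rust
#[derive(Clone, Debug, PartialEq)]
enum PrepareStepResult {
    Continued(Vec<u8>),
    Failed(&'static str), // e.g. "ReportDropped", "ReportReplayed", "BatchCollected"
}

#[derive(Clone, Debug)]
struct ReportAggregation {
    report_id: [u8; 16],
    last_prep_step: Option<PrepareStepResult>,
}

impl ReportAggregation {
    fn new(report_id: [u8; 16]) -> Self {
        Self { report_id, last_prep_step: None }
    }

    // Persist the prepare step produced for this report, success or failure,
    // as part of the report aggregation's stored state.
    fn with_last_prep_step(mut self, step: PrepareStepResult) -> Self {
        self.last_prep_step = Some(step);
        self
    }
}

// Build the response purely from stored state: every report aggregation that
// recorded a prepare step contributes exactly that step, so the first request
// and any replay of it produce identical per-report results.
fn aggregation_job_resp_for(
    report_aggregations: &[ReportAggregation],
) -> Vec<([u8; 16], PrepareStepResult)> {
    report_aggregations
        .iter()
        .filter_map(|ra| ra.last_prep_step.clone().map(|step| (ra.report_id, step)))
        .collect()
}

fn main() {
    // One report continues to the next round, one is dropped with an error;
    // both outcomes are recorded on the report aggregations.
    let stored = vec![
        ReportAggregation::new([1; 16])
            .with_last_prep_step(PrepareStepResult::Continued(b"prep share".to_vec())),
        ReportAggregation::new([2; 16])
            .with_last_prep_step(PrepareStepResult::Failed("ReportDropped")),
    ];

    // A replayed aggregation job request is answered from the stored steps,
    // so it matches the original response exactly.
    let first_response = aggregation_job_resp_for(&stored);
    let replayed_response = aggregation_job_resp_for(&stored);
    assert_eq!(first_response, replayed_response);
    println!("{replayed_response:?}");
}
```

Recording the step alongside the report aggregation state means failure
outcomes (such as a dropped report) are replayed exactly instead of being
recomputed or silently omitted.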
branlwyd authored Apr 17, 2023
1 parent 1136ece commit 26a25be
Showing 13 changed files with 455 additions and 521 deletions.
191 changes: 85 additions & 106 deletions aggregator/src/aggregator.rs
@@ -22,8 +22,8 @@ use janus_aggregator_core::{
self,
models::{
AggregateShareJob, AggregationJob, AggregationJobState, BatchAggregation,
CollectionJob, CollectionJobState, LeaderStoredReport, PrepareMessageOrShare,
ReportAggregation, ReportAggregationState,
CollectionJob, CollectionJobState, LeaderStoredReport, ReportAggregation,
ReportAggregationState,
},
Datastore, Transaction,
},
@@ -112,6 +112,7 @@ pub(crate) fn aggregate_step_failure_counter(meter: &Meter) -> Counter<u64> {
// Initialize counters with desired status labels. This causes Prometheus to see the first
// non-zero value we record.
for failure_type in [
"missing_prepare_message",
"missing_leader_input_share",
"missing_helper_input_share",
"prepare_init_failure",
@@ -1106,26 +1107,16 @@ where
{
report_share: ReportShare,
report_aggregation: ReportAggregation<SEED_SIZE, A>,
prep_result: PrepareStepResult,
existing_report_aggregation: bool,
conflicting_aggregate_share: bool,
}

impl<const SEED_SIZE: usize, A: vdaf::Aggregator<SEED_SIZE, 16>> ReportShareData<SEED_SIZE, A>
where
A: vdaf::Aggregator<SEED_SIZE, 16>,
{
fn new(
report_share: ReportShare,
report_aggregation: ReportAggregation<SEED_SIZE, A>,
prep_result: PrepareStepResult,
) -> Self {
fn new(report_share: ReportShare, report_aggregation: ReportAggregation<SEED_SIZE, A>) -> Self {
Self {
report_share,
report_aggregation,
prep_result,
existing_report_aggregation: false,
conflicting_aggregate_share: false,
}
}
}
@@ -1178,16 +1169,6 @@ impl VdafOps {
)
.await?;

// Filter out any report shares in the incoming message that wouldn't get written out: we
// don't expect to see those in the datastore.
let incoming_report_share_data: Vec<_> = incoming_report_share_data
.iter()
.filter(|report_share_data| {
!report_share_data.existing_report_aggregation
&& !report_share_data.conflicting_aggregate_share
})
.collect();

if existing_report_aggregations.len() != incoming_report_share_data.len() {
return Ok(false);
}
@@ -1199,12 +1180,15 @@ impl VdafOps {
if incoming_report_share_data
.iter()
.zip(existing_report_aggregations)
.any(|(incoming_report_share, existing_report_share)| {
!existing_report_share
.report_metadata()
.eq(incoming_report_share.report_share.metadata())
|| !existing_report_share.eq(&incoming_report_share.report_aggregation)
})
.any(
|(incoming_report_share_data, existing_report_aggregation)| {
!existing_report_aggregation
.report_metadata()
.eq(incoming_report_share_data.report_share.metadata())
|| !existing_report_aggregation
.eq(&incoming_report_share_data.report_aggregation)
},
)
{
return Ok(false);
}
@@ -1370,12 +1354,12 @@ impl VdafOps {
*report_share.metadata().id(),
*report_share.metadata().time(),
ord.try_into()?,
ReportAggregationState::<SEED_SIZE, A>::Waiting(
prep_state,
PrepareMessageOrShare::Helper(prep_share),
),
Some(PrepareStep::new(
*report_share.metadata().id(),
PrepareStepResult::Continued(encoded_prep_share),
)),
ReportAggregationState::<SEED_SIZE, A>::Waiting(prep_state, None),
),
PrepareStepResult::Continued(encoded_prep_share),
)
}

@@ -1387,9 +1371,12 @@ impl VdafOps {
*report_share.metadata().id(),
*report_share.metadata().time(),
ord.try_into()?,
Some(PrepareStep::new(
*report_share.metadata().id(),
PrepareStepResult::Failed(err),
)),
ReportAggregationState::<SEED_SIZE, A>::Failed(err),
),
PrepareStepResult::Failed(err),
),
});
}
@@ -1428,7 +1415,7 @@ impl VdafOps {
AggregationJobRound::from(0),
));

let prep_steps = datastore
Ok(datastore
.run_tx_with_name("aggregate_init", |tx| {
let (vdaf, task, req, aggregation_job, mut report_share_data) = (
vdaf.clone(),
@@ -1439,14 +1426,14 @@ impl VdafOps {
);

Box::pin(async move {
for mut share_data in report_share_data.iter_mut() {
for mut report_share_data in &mut report_share_data {
// Verify that we haven't seen this report ID and aggregation parameter
// before in another aggregation job, and that the report isn't for a batch
// interval that has already started collection.
let (report_aggregation_exists, conflicting_aggregate_share_jobs) = try_join!(
tx.check_other_report_aggregation_exists::<SEED_SIZE, A>(
task.id(),
share_data.report_share.metadata().id(),
report_share_data.report_share.metadata().id(),
aggregation_job.aggregation_parameter(),
aggregation_job.id(),
),
@@ -1455,12 +1442,31 @@ impl VdafOps {
&vdaf,
task.id(),
req.batch_selector().batch_identifier(),
share_data.report_share.metadata()
report_share_data.report_share.metadata()
),
)?;

share_data.existing_report_aggregation = report_aggregation_exists;
share_data.conflicting_aggregate_share = !conflicting_aggregate_share_jobs.is_empty();
if report_aggregation_exists {
report_share_data.report_aggregation =
report_share_data.report_aggregation
.clone()
.with_state(ReportAggregationState::Failed(
ReportShareError::ReportReplayed))
.with_last_prep_step(Some(PrepareStep::new(
*report_share_data.report_share.metadata().id(),
PrepareStepResult::Failed(ReportShareError::ReportReplayed))
));
} else if !conflicting_aggregate_share_jobs.is_empty() {
report_share_data.report_aggregation =
report_share_data.report_aggregation
.clone()
.with_state(ReportAggregationState::Failed(
ReportShareError::BatchCollected))
.with_last_prep_step(Some(PrepareStep::new(
*report_share_data.report_share.metadata().id(),
PrepareStepResult::Failed(ReportShareError::BatchCollected))
));
}
}

// Write aggregation job.
@@ -1493,77 +1499,52 @@ impl VdafOps {

// Construct a response and write any new report shares and report aggregations
// as we go.
let mut accumulator = Accumulator::<SEED_SIZE, Q, A>::new(
Arc::clone(&task),
batch_aggregation_shard_count,
aggregation_job.aggregation_parameter().clone(),
);

let mut prep_steps = Vec::new();
for report_share_data in report_share_data
{
if report_share_data.existing_report_aggregation {
prep_steps.push(PrepareStep::new(
*report_share_data.report_share.metadata().id(),
PrepareStepResult::Failed(ReportShareError::ReportReplayed),
));
continue;
}
if report_share_data.conflicting_aggregate_share {
prep_steps.push(PrepareStep::new(
*report_share_data.report_share.metadata().id(),
PrepareStepResult::Failed(ReportShareError::BatchCollected),
));
continue;
}
if !replayed_request {
let mut accumulator = Accumulator::<SEED_SIZE, Q, A>::new(
Arc::clone(&task),
batch_aggregation_shard_count,
aggregation_job.aggregation_parameter().clone(),
);

if !replayed_request {
for report_share_data in &mut report_share_data
{
// Write client report & report aggregation.
if let Err(error) = tx.put_report_share(
task.id(),
&report_share_data.report_share
).await {
match error {
if let Err(err) = tx.put_report_share(task.id(), &report_share_data.report_share).await {
match err {
datastore::Error::MutationTargetAlreadyExists => {
prep_steps.push(PrepareStep::new(
*report_share_data.report_share.metadata().id(),
PrepareStepResult::Failed(ReportShareError::ReportReplayed),
));
continue;
}
e => return Err(e),
report_share_data.report_aggregation =
report_share_data.report_aggregation
.clone()
.with_state(ReportAggregationState::Failed(
ReportShareError::ReportReplayed))
.with_last_prep_step(Some(PrepareStep::new(
*report_share_data.report_share.metadata().id(),
PrepareStepResult::Failed(ReportShareError::ReportReplayed))
));
},
err => return Err(err),
}
}
tx.put_report_aggregation(&report_share_data.report_aggregation).await?;
}

if let ReportAggregationState::<SEED_SIZE, A>::Finished(output_share) =
report_share_data.report_aggregation.state()
{
accumulator.update(
aggregation_job.partial_batch_identifier(),
report_share_data.report_share.metadata().id(),
report_share_data.report_share.metadata().time(),
output_share,
)?;
if let ReportAggregationState::Finished(output_share) = report_share_data.report_aggregation.state()
{
accumulator.update(
aggregation_job.partial_batch_identifier(),
report_share_data.report_share.metadata().id(),
report_share_data.report_share.metadata().time(),
output_share,
)?;
}
}

prep_steps.push(PrepareStep::new(
*report_share_data.report_share.metadata().id(),
report_share_data.prep_result.clone(),
));
}

if !replayed_request {
accumulator.flush_to_datastore(tx, &vdaf).await?;
}
Ok(prep_steps)

Ok(Self::aggregation_job_resp_for(report_share_data.into_iter().map(|data| data.report_aggregation)))
})
})
.await?;

// Construct response and return.
Ok(AggregationJobResp::new(prep_steps))
.await?)
}

async fn handle_aggregate_continue_generic<
@@ -1662,9 +1643,7 @@ impl VdafOps {
}
}
}
return Self::replay_aggregation_job_round::<C, SEED_SIZE, Q, A>(
report_aggregations,
);
return Ok(Self::aggregation_job_resp_for(report_aggregations));
} else if helper_aggregation_job.round().increment()
!= leader_aggregation_job.round()
{
@@ -1685,14 +1664,14 @@ impl VdafOps {
// compute the next round of prepare messages and state.
Self::step_aggregation_job(
tx,
&task,
&vdaf,
task,
vdaf,
batch_aggregation_shard_count,
helper_aggregation_job,
report_aggregations,
&leader_aggregation_job,
leader_aggregation_job,
request_hash,
&aggregate_step_failure_counter,
aggregate_step_failure_counter,
)
.await
})
