Explicitly store last prepare steps for replay. #1253

Merged · 4 commits · Apr 17, 2023
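This change makes the helper record, on each ReportAggregation row, the last PrepareStep it sent to the leader, instead of recomputing prepare results on every request; a replayed aggregation request can then be answered directly from the datastore. Below is a minimal sketch of the storage pattern, assuming simplified stand-in types: ReportAggregation, PrepareStep, PrepareStepResult, with_last_prep_step, and aggregation_job_resp_for are names taken from the diff, but the fields and return types here are reduced for illustration (the real aggregation_job_resp_for builds an AggregationJobResp, and the real ReportAggregation also carries task ID, aggregation job ID, ordinal, and VDAF state).

// Minimal sketch of the "store the last prepare step" pattern, with
// simplified stand-in types.

#[derive(Clone, Debug, PartialEq, Eq)]
enum PrepareStepResult {
    Continued(Vec<u8>), // encoded prepare share
    Finished,
    Failed(u8), // stand-in for ReportShareError
}

#[derive(Clone, Debug, PartialEq, Eq)]
struct PrepareStep {
    report_id: [u8; 16],
    result: PrepareStepResult,
}

#[derive(Clone, Debug)]
struct ReportAggregation {
    report_id: [u8; 16],
    // Persisted with the row, so a replayed request can be answered from
    // storage instead of re-running VDAF preparation.
    last_prep_step: Option<PrepareStep>,
}

impl ReportAggregation {
    fn with_last_prep_step(mut self, step: Option<PrepareStep>) -> Self {
        self.last_prep_step = step;
        self
    }
}

// Response construction becomes a pure projection of stored state.
fn aggregation_job_resp_for(
    report_aggregations: impl IntoIterator<Item = ReportAggregation>,
) -> Vec<PrepareStep> {
    report_aggregations
        .into_iter()
        .filter_map(|ra| ra.last_prep_step)
        .collect()
}

fn main() {
    let ra = ReportAggregation { report_id: [0; 16], last_prep_step: None }
        .with_last_prep_step(Some(PrepareStep {
            report_id: [0; 16],
            result: PrepareStepResult::Continued(Vec::new()),
        }));
    // Replaying the request re-reads the stored step.
    assert_eq!(aggregation_job_resp_for([ra]).len(), 1);
}

With the prepare step persisted next to the aggregation state, building a response is a pure read, which is what lets both the init and continue paths below answer replays idempotently.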
191 changes: 85 additions & 106 deletions aggregator/src/aggregator.rs
@@ -22,8 +22,8 @@ use janus_aggregator_core::{
         self,
         models::{
             AggregateShareJob, AggregationJob, AggregationJobState, BatchAggregation,
-            CollectionJob, CollectionJobState, LeaderStoredReport, PrepareMessageOrShare,
-            ReportAggregation, ReportAggregationState,
+            CollectionJob, CollectionJobState, LeaderStoredReport, ReportAggregation,
+            ReportAggregationState,
        },
        Datastore, Transaction,
    },
@@ -112,6 +112,7 @@ pub(crate) fn aggregate_step_failure_counter(meter: &Meter) -> Counter<u64> {
     // Initialize counters with desired status labels. This causes Prometheus to see the first
     // non-zero value we record.
     for failure_type in [
+        "missing_prepare_message",
         "missing_leader_input_share",
         "missing_helper_input_share",
         "prepare_init_failure",
@@ -1106,26 +1107,16 @@ where
 {
     report_share: ReportShare,
     report_aggregation: ReportAggregation<SEED_SIZE, A>,
-    prep_result: PrepareStepResult,
-    existing_report_aggregation: bool,
-    conflicting_aggregate_share: bool,
 }
 
 impl<const SEED_SIZE: usize, A: vdaf::Aggregator<SEED_SIZE, 16>> ReportShareData<SEED_SIZE, A>
 where
     A: vdaf::Aggregator<SEED_SIZE, 16>,
 {
-    fn new(
-        report_share: ReportShare,
-        report_aggregation: ReportAggregation<SEED_SIZE, A>,
-        prep_result: PrepareStepResult,
-    ) -> Self {
+    fn new(report_share: ReportShare, report_aggregation: ReportAggregation<SEED_SIZE, A>) -> Self {
         Self {
             report_share,
             report_aggregation,
-            prep_result,
-            existing_report_aggregation: false,
-            conflicting_aggregate_share: false,
         }
     }
 }
@@ -1178,16 +1169,6 @@ impl VdafOps {
         )
         .await?;
 
-        // Filter out any report shares in the incoming message that wouldn't get written out: we
-        // don't expect to see those in the datastore.
-        let incoming_report_share_data: Vec<_> = incoming_report_share_data
-            .iter()
-            .filter(|report_share_data| {
-                !report_share_data.existing_report_aggregation
-                    && !report_share_data.conflicting_aggregate_share
-            })
-            .collect();
-
         if existing_report_aggregations.len() != incoming_report_share_data.len() {
             return Ok(false);
         }
@@ -1199,12 +1180,15 @@ impl VdafOps {
         if incoming_report_share_data
             .iter()
             .zip(existing_report_aggregations)
-            .any(|(incoming_report_share, existing_report_share)| {
-                !existing_report_share
-                    .report_metadata()
-                    .eq(incoming_report_share.report_share.metadata())
-                    || !existing_report_share.eq(&incoming_report_share.report_aggregation)
-            })
+            .any(
+                |(incoming_report_share_data, existing_report_aggregation)| {
+                    !existing_report_aggregation
+                        .report_metadata()
+                        .eq(incoming_report_share_data.report_share.metadata())
+                        || !existing_report_aggregation
+                            .eq(&incoming_report_share_data.report_aggregation)
+                },
+            )
         {
             return Ok(false);
         }
@@ -1370,12 +1354,12 @@ impl VdafOps {
                         *report_share.metadata().id(),
                         *report_share.metadata().time(),
                         ord.try_into()?,
-                        ReportAggregationState::<SEED_SIZE, A>::Waiting(
-                            prep_state,
-                            PrepareMessageOrShare::Helper(prep_share),
-                        ),
+                        Some(PrepareStep::new(
+                            *report_share.metadata().id(),
+                            PrepareStepResult::Continued(encoded_prep_share),
+                        )),
+                        ReportAggregationState::<SEED_SIZE, A>::Waiting(prep_state, None),
                     ),
-                    PrepareStepResult::Continued(encoded_prep_share),
                 )
             }
 
@@ -1387,9 +1371,12 @@ impl VdafOps {
                         *report_share.metadata().id(),
                         *report_share.metadata().time(),
                         ord.try_into()?,
+                        Some(PrepareStep::new(
+                            *report_share.metadata().id(),
+                            PrepareStepResult::Failed(err),
+                        )),
                         ReportAggregationState::<SEED_SIZE, A>::Failed(err),
                     ),
-                    PrepareStepResult::Failed(err),
                 ),
            });
        }
@@ -1428,7 +1415,7 @@ impl VdafOps {
             AggregationJobRound::from(0),
         ));
 
-        let prep_steps = datastore
+        Ok(datastore
            .run_tx_with_name("aggregate_init", |tx| {
                let (vdaf, task, req, aggregation_job, mut report_share_data) = (
                    vdaf.clone(),
@@ -1439,14 +1426,14 @@ impl VdafOps {
                 );
 
                 Box::pin(async move {
-                    for mut share_data in report_share_data.iter_mut() {
+                    for mut report_share_data in &mut report_share_data {
                         // Verify that we haven't seen this report ID and aggregation parameter
                         // before in another aggregation job, and that the report isn't for a batch
                         // interval that has already started collection.
                         let (report_aggregation_exists, conflicting_aggregate_share_jobs) = try_join!(
                             tx.check_other_report_aggregation_exists::<SEED_SIZE, A>(
                                 task.id(),
-                                share_data.report_share.metadata().id(),
+                                report_share_data.report_share.metadata().id(),
                                 aggregation_job.aggregation_parameter(),
                                 aggregation_job.id(),
                             ),
@@ -1455,12 +1442,31 @@ impl VdafOps {
                                 &vdaf,
                                 task.id(),
                                 req.batch_selector().batch_identifier(),
-                                share_data.report_share.metadata()
+                                report_share_data.report_share.metadata()
                             ),
                         )?;
 
-                        share_data.existing_report_aggregation = report_aggregation_exists;
-                        share_data.conflicting_aggregate_share = !conflicting_aggregate_share_jobs.is_empty();
+                        if report_aggregation_exists {
+                            report_share_data.report_aggregation =
+                                report_share_data.report_aggregation
+                                    .clone()
+                                    .with_state(ReportAggregationState::Failed(
+                                        ReportShareError::ReportReplayed))
+                                    .with_last_prep_step(Some(PrepareStep::new(
+                                        *report_share_data.report_share.metadata().id(),
+                                        PrepareStepResult::Failed(ReportShareError::ReportReplayed))
+                                    ));
+                        } else if !conflicting_aggregate_share_jobs.is_empty() {
+                            report_share_data.report_aggregation =
+                                report_share_data.report_aggregation
+                                    .clone()
+                                    .with_state(ReportAggregationState::Failed(
+                                        ReportShareError::BatchCollected))
+                                    .with_last_prep_step(Some(PrepareStep::new(
+                                        *report_share_data.report_share.metadata().id(),
+                                        PrepareStepResult::Failed(ReportShareError::BatchCollected))
+                                    ));
+                        }
                     }
 
                     // Write aggregation job.
Expand Down Expand Up @@ -1493,77 +1499,52 @@ impl VdafOps {

// Construct a response and write any new report shares and report aggregations
// as we go.
let mut accumulator = Accumulator::<SEED_SIZE, Q, A>::new(
Arc::clone(&task),
batch_aggregation_shard_count,
aggregation_job.aggregation_parameter().clone(),
);

let mut prep_steps = Vec::new();
for report_share_data in report_share_data
{
if report_share_data.existing_report_aggregation {
prep_steps.push(PrepareStep::new(
*report_share_data.report_share.metadata().id(),
PrepareStepResult::Failed(ReportShareError::ReportReplayed),
));
continue;
}
if report_share_data.conflicting_aggregate_share {
prep_steps.push(PrepareStep::new(
*report_share_data.report_share.metadata().id(),
PrepareStepResult::Failed(ReportShareError::BatchCollected),
));
continue;
}
if !replayed_request {
let mut accumulator = Accumulator::<SEED_SIZE, Q, A>::new(
Arc::clone(&task),
batch_aggregation_shard_count,
aggregation_job.aggregation_parameter().clone(),
);

if !replayed_request {
for report_share_data in &mut report_share_data
{
// Write client report & report aggregation.
if let Err(error) = tx.put_report_share(
task.id(),
&report_share_data.report_share
).await {
match error {
if let Err(err) = tx.put_report_share(task.id(), &report_share_data.report_share).await {
match err {
datastore::Error::MutationTargetAlreadyExists => {
prep_steps.push(PrepareStep::new(
*report_share_data.report_share.metadata().id(),
PrepareStepResult::Failed(ReportShareError::ReportReplayed),
));
continue;
}
e => return Err(e),
report_share_data.report_aggregation =
report_share_data.report_aggregation
.clone()
.with_state(ReportAggregationState::Failed(
ReportShareError::ReportReplayed))
.with_last_prep_step(Some(PrepareStep::new(
*report_share_data.report_share.metadata().id(),
PrepareStepResult::Failed(ReportShareError::ReportReplayed))
));
},
err => return Err(err),
}
}
tx.put_report_aggregation(&report_share_data.report_aggregation).await?;
}

if let ReportAggregationState::<SEED_SIZE, A>::Finished(output_share) =
report_share_data.report_aggregation.state()
{
accumulator.update(
aggregation_job.partial_batch_identifier(),
report_share_data.report_share.metadata().id(),
report_share_data.report_share.metadata().time(),
output_share,
)?;
if let ReportAggregationState::Finished(output_share) = report_share_data.report_aggregation.state()
{
accumulator.update(
aggregation_job.partial_batch_identifier(),
report_share_data.report_share.metadata().id(),
report_share_data.report_share.metadata().time(),
output_share,
)?;
}
}

prep_steps.push(PrepareStep::new(
*report_share_data.report_share.metadata().id(),
report_share_data.prep_result.clone(),
));
}

if !replayed_request {
accumulator.flush_to_datastore(tx, &vdaf).await?;
}
Ok(prep_steps)

Ok(Self::aggregation_job_resp_for(report_share_data.into_iter().map(|data| data.report_aggregation)))
})
})
.await?;

// Construct response and return.
Ok(AggregationJobResp::new(prep_steps))
.await?)
}

async fn handle_aggregate_continue_generic<
Expand Down Expand Up @@ -1662,9 +1643,7 @@ impl VdafOps {
}
}
}
return Self::replay_aggregation_job_round::<C, SEED_SIZE, Q, A>(
report_aggregations,
);
return Ok(Self::aggregation_job_resp_for(report_aggregations));
} else if helper_aggregation_job.round().increment()
!= leader_aggregation_job.round()
{
@@ -1685,14 +1664,14 @@ impl VdafOps {
                     // compute the next round of prepare messages and state.
                     Self::step_aggregation_job(
                         tx,
-                        &task,
-                        &vdaf,
+                        task,
+                        vdaf,
                         batch_aggregation_shard_count,
                         helper_aggregation_job,
                         report_aggregations,
-                        &leader_aggregation_job,
+                        leader_aggregation_job,
                         request_hash,
-                        &aggregate_step_failure_counter,
+                        aggregate_step_failure_counter,
                    )
                    .await
                })
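The continue path above relies on the same stored state: when the leader re-sends the request for a round the helper has already completed, the helper returns Ok(Self::aggregation_job_resp_for(report_aggregations)) built from the persisted last prepare steps. Below is a hedged sketch of that round check, with invented field and function names and a simplified request-hash comparison standing in for the real handle_aggregate_continue_generic logic.

// Hypothetical helper-side replay check; round bookkeeping and names are
// simplified from the real handle_aggregate_continue_generic.
fn handle_continue_round(
    helper_round: u64,
    stored_request_hash: Option<[u8; 32]>,
    stored_steps: Vec<Vec<u8>>, // encoded prepare steps rebuilt from last_prep_step
    req_round: u64,
    req_hash: [u8; 32],
) -> Result<Vec<Vec<u8>>, &'static str> {
    if req_round == helper_round {
        // The leader re-sent the round we already completed: answer from the
        // stored last prepare steps, but only for a byte-identical request.
        if stored_request_hash == Some(req_hash) {
            Ok(stored_steps)
        } else {
            Err("same round but request content differs")
        }
    } else if req_round == helper_round + 1 {
        // Normal case (not shown): step each report aggregation, then persist
        // the new round's prepare steps as the next last_prep_step values.
        Ok(Vec::new())
    } else {
        Err("round mismatch: neither a replay nor the next round")
    }
}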