Don't aggregate collected time intervals

In time interval tasks, the leader should not schedule aggregation jobs
that include reports whose timestamps fall into the time interval of a
collected batch. With this commit, `aggregation_job_creator` now checks
whether an unaggregated report falls into the time interval of a
collection job that is in the finished, deleted, or abandoned state, or
of a collection job in the start state whose lease has been acquired
(which is to say, it is currently being stepped, or has previously been
stepped, by `collection_job_driver`).
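
As a rough illustration of the new rule, here is a minimal, self-contained
sketch (the types are illustrative stand-ins, not Janus' actual API): a
report is eligible for a new aggregation job only if every collection job
covering its timestamp is still in the start state and has never been
leased.

```rust
// Illustrative types only; Janus' real CollectionJobState and interval
// types differ. "Never leased" corresponds to the lease-expiry sentinel
// checked by the datastore query in this commit.
enum CollectionJobState {
    Start { ever_leased: bool },
    Finished,
    Abandoned,
    Deleted,
}

struct CollectionJob {
    // Half-open interval of report timestamps, in seconds since the epoch.
    batch_interval: std::ops::Range<u64>,
    state: CollectionJobState,
}

/// Returns true if a report with the given timestamp may still be included
/// in a new aggregation job.
fn report_is_aggregatable(report_timestamp: u64, collection_jobs: &[CollectionJob]) -> bool {
    collection_jobs.iter().all(|job| {
        !job.batch_interval.contains(&report_timestamp)
            || matches!(job.state, CollectionJobState::Start { ever_leased: false })
    })
}
```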

Solving this requires us to be clear on when Janus commits a set of
reports to a collection. Perhaps surprisingly, this doesn't happen when
the collection job is created (i.e., when handling `PUT
/tasks/{task-id}/collection_jobs/{collection-job-id}`), but rather the
first time that `collection_job_driver` tries to step the job, because
that is when it queries the database to see which report aggregations
matching the query are finished. It is from that point on that the Janus
leader will refuse to aggregate more reports for the time interval.
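
To make the commit point concrete: in the datastore query below, a
start-state collection job counts as uncommitted only while its
`lease_expiry` column still holds the `TIMESTAMP '-infinity'` sentinel,
i.e. it has never been acquired by `collection_job_driver`. A sketch of
that test, assuming this sentinel-based representation (with `None`
standing in for the sentinel; this is not Janus' actual schema type):

```rust
use std::time::SystemTime;

// A collection job past the start state (finished/abandoned/deleted) is
// committed; a start-state job is committed once it has ever been leased,
// i.e. once its lease expiry has moved off the "-infinity" sentinel.
fn collection_job_is_committed(state_is_start: bool, lease_expiry: Option<SystemTime>) -> bool {
    !state_is_start || lease_expiry.is_some()
}
```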

This doesn't quite square with what DAP says. DAP-04 section 4.5.2 says
([1]):

> Once an AggregateShareReq has been issued for the batch determined by
> a given query, it is an error for the Leader to issue any more
> aggregation jobs for additional reports that satisfy the query.

Additionally, section 4.4.1.4 tells us ([2]):

> If the report pertains to a batch that was previously collected, then
> make sure the report was already included in all previous collections
> for the batch. If not, the input share MUST be marked as invalid with
> error "batch_collected". [...] *The Leader considers a batch to be
> collected once it has completed a collection job for a CollectionReq
> message from the Collector*; the Helper considers a batch to be
> collected once it has responded to an AggregateShareReq message from
> the Leader.

(emphasis mine)

A strict reading of 4.4.1.4 suggests that the leader could admit new
reports into a collection time interval right up until it finishes a
collection job and sends its results to the collector, but in fact
sending `AggregateShareReq` to the helper commits a set of reports to a
collection.

[1]: https://datatracker.ietf.org/doc/html/draft-ietf-ppm-dap-04#section-4.5.2-16
[2]: https://datatracker.ietf.org/doc/html/draft-ietf-ppm-dap-04#section-4.4.1.4-3.7.1
tgeoghegan committed Mar 31, 2023
1 parent 50c67bb commit c03a715
Showing 2 changed files with 258 additions and 17 deletions.
243 changes: 235 additions & 8 deletions aggregator/src/aggregator/aggregation_job_creator.rs
@@ -764,30 +764,35 @@ mod tests {
use futures::{future::try_join_all, TryFutureExt};
use janus_aggregator_core::{
datastore::{
models::{AggregationJob, CollectionJob, CollectionJobState, LeaderStoredReport},
test_util::ephemeral_datastore,
Transaction,
models::{
AggregationJob, CollectionJob, CollectionJobState, LeaderStoredReport,
ReportAggregation, ReportAggregationState,
},
test_util::{ephemeral_datastore, EphemeralDatastore},
Datastore, Transaction,
},
query_type::AccumulableQueryType,
task::{test_util::TaskBuilder, QueryType as TaskQueryType},
task::{test_util::TaskBuilder, QueryType as TaskQueryType, Task},
};
use janus_core::{
task::{VdafInstance, PRIO3_VERIFY_KEY_LENGTH},
test_util::{
dummy_vdaf::{self, AggregationParam},
install_test_trace_subscriber,
install_test_trace_subscriber, run_vdaf,
},
time::{Clock, MockClock, TimeExt},
time::{Clock, IntervalExt, MockClock, TimeExt},
};
use janus_messages::{
query_type::{FixedSize, TimeInterval},
AggregationJobId, AggregationJobRound, Interval, ReportId, Role, TaskId, Time,
AggregationJobId, AggregationJobRound, HpkeCiphertext, HpkeConfigId, Interval, ReportId,
ReportMetadata, Role, TaskId, Time,
};
use prio::{
codec::ParameterizedDecode,
field::Field64,
vdaf::{
prio3::{Prio3, Prio3Count},
Aggregator,
AggregateShare, Aggregator, OutputShare,
},
};
use rand::random;
@@ -1000,6 +1005,228 @@ mod tests {
assert_eq!(all_report_ids, seen_report_ids);
}

struct CollectedIntervalTestCase {
task: Arc<Task>,
reports: Vec<LeaderStoredReport<PRIO3_VERIFY_KEY_LENGTH, Prio3Count>>,
_ephemeral_datastore: EphemeralDatastore,
datastore: Datastore<MockClock>,
}

impl CollectedIntervalTestCase {
const MIN_AGGREGATION_JOB_SIZE: usize = 50;
const MAX_AGGREGATION_JOB_SIZE: usize = 60;

async fn setup(
collection_job_state: CollectionJobState<PRIO3_VERIFY_KEY_LENGTH, Prio3Count>,
) -> Self {
// Setup.
install_test_trace_subscriber();
let clock = MockClock::default();
let ephemeral_datastore = ephemeral_datastore().await;
let datastore = ephemeral_datastore.datastore(clock.clone());
let task = Arc::new(
TaskBuilder::new(
TaskQueryType::TimeInterval,
VdafInstance::Prio3Count,
Role::Leader,
)
.build(),
);
let vdaf = Prio3Count::new_count(2).unwrap();
let verify_key = task.primary_vdaf_verify_key().unwrap();

// Create enough reports for an aggregation job
let report_time = clock.now();
let reports: Vec<LeaderStoredReport<PRIO3_VERIFY_KEY_LENGTH, Prio3Count>> =
iter::repeat_with(|| {
let report_id = random();
let transcript = run_vdaf(&vdaf, verify_key.as_bytes(), &(), &report_id, &1);
LeaderStoredReport::new(
*task.id(),
ReportMetadata::new(report_id, report_time),
transcript.public_share,
Vec::new(),
transcript.input_shares[0].clone(),
HpkeCiphertext::new(HpkeConfigId::from(13), Vec::new(), Vec::new()),
)
})
.take(Self::MIN_AGGREGATION_JOB_SIZE)
.collect();

// Create a collection job whose time interval includes all the reports
let collection_job =
CollectionJob::<PRIO3_VERIFY_KEY_LENGTH, TimeInterval, Prio3Count>::new(
*task.id(),
random(),
Interval::from_time(&report_time).unwrap(),
(),
collection_job_state,
);

datastore
.run_tx(|tx| {
let (vdaf, task, reports, collection_job) = (
vdaf.clone(),
task.clone(),
reports.clone(),
collection_job.clone(),
);
Box::pin(async move {
tx.put_task(&task).await.unwrap();
for report in reports {
tx.put_client_report(&vdaf, &report).await.unwrap();
}
tx.put_collection_job(&collection_job).await.unwrap();

Ok(())
})
})
.await
.unwrap();

Self {
task,
_ephemeral_datastore: ephemeral_datastore,
reports,
datastore,
}
}

async fn run(
self,
) -> Vec<AggregationJob<PRIO3_VERIFY_KEY_LENGTH, TimeInterval, Prio3Count>> {
// Run.
let job_creator = Arc::new(AggregationJobCreator {
datastore: self.datastore,
tasks_update_frequency: Duration::from_secs(3600),
aggregation_job_creation_interval: Duration::from_secs(1),
min_aggregation_job_size: Self::MIN_AGGREGATION_JOB_SIZE,
max_aggregation_job_size: Self::MAX_AGGREGATION_JOB_SIZE,
});
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&self.task))
.await
.unwrap();

// Verify.
job_creator
.datastore
.run_tx(|tx| {
let task_id = *self.task.id();
Box::pin(async move {
Ok(tx.get_aggregation_jobs_for_task(&task_id).await.unwrap())
})
})
.await
.unwrap()
}
}

#[tokio::test]
async fn create_aggregation_jobs_for_time_interval_task_collected_interval_started_no_lease() {
// If the collection job has been created but never leased, then we can still run new
// aggregation jobs.
let test_case = CollectedIntervalTestCase::setup(CollectionJobState::Start).await;
assert!(!test_case.run().await.is_empty());
}

#[tokio::test]
async fn create_aggregation_jobs_for_time_interval_task_collected_interval_started_with_lease()
{
// If the collection job has been leased, then we've committed to a set of reports and may
// create no additional aggregation jobs for the interval.
let test_case = CollectedIntervalTestCase::setup(CollectionJobState::Start).await;

// Populate datastore with necessary report aggregations and aggregation job so that the
// collection job will be acquirable.
let aggregation_job_id = test_case
.datastore
.run_tx(|tx| {
let (task_id, reports) = (*test_case.task.id(), test_case.reports.clone());
Box::pin(async move {
let aggregation_job =
AggregationJob::<PRIO3_VERIFY_KEY_LENGTH, TimeInterval, Prio3Count>::new(
task_id,
random(),
(),
(),
Interval::from_time(reports[0].metadata().time()).unwrap(),
janus_aggregator_core::datastore::models::AggregationJobState::Finished,
AggregationJobRound::from(1),
);

tx.put_aggregation_job(&aggregation_job).await.unwrap();

for (ord, report) in reports.iter().enumerate() {
let report_aggregation =
ReportAggregation::<PRIO3_VERIFY_KEY_LENGTH, Prio3Count>::new(
task_id,
*aggregation_job.id(),
*report.metadata().id(),
*report.metadata().time(),
ord as u64,
ReportAggregationState::Start,
);
tx.put_report_aggregation(&report_aggregation)
.await
.unwrap();
}

let leases = tx
.acquire_incomplete_time_interval_collection_jobs(
&Duration::from_secs(60),
10,
)
.await
.unwrap();
assert!(!leases.is_empty());

Ok(*aggregation_job.id())
})
})
.await
.unwrap();

let aggregation_jobs = test_case.run().await;
assert_eq!(aggregation_jobs.len(), 1);
assert_eq!(aggregation_jobs[0].id(), &aggregation_job_id);
}

#[tokio::test]
async fn create_aggregation_jobs_for_time_interval_task_collected_interval_finished() {
// If the collection job is finished, we may create no additional aggregation jobs for the
// interval.
let test_case = CollectedIntervalTestCase::setup(CollectionJobState::Finished {
report_count: 1,
encrypted_helper_aggregate_share: HpkeCiphertext::new(
HpkeConfigId::from(12),
Vec::new(),
Vec::new(),
),
leader_aggregate_share: AggregateShare::from(OutputShare::from(Vec::from([
Field64::from(7),
]))),
})
.await;
assert!(test_case.run().await.is_empty())
}

#[tokio::test]
async fn create_aggregation_jobs_for_time_interval_task_collected_interval_abandoned() {
// If the collection job is abandoned, we may create no additional aggregation jobs for the
// interval.
let test_case = CollectedIntervalTestCase::setup(CollectionJobState::Abandoned).await;
assert!(test_case.run().await.is_empty())
}

#[tokio::test]
async fn create_aggregation_jobs_for_time_interval_task_collected_interval_deleted() {
// If the collection job is deleted, we may create no additional aggregation jobs for the
// interval.
let test_case = CollectedIntervalTestCase::setup(CollectionJobState::Deleted).await;
assert!(test_case.run().await.is_empty())
}

#[tokio::test]
async fn create_aggregation_jobs_for_time_interval_task_not_enough_reports() {
// Setup.
32 changes: 23 additions & 9 deletions aggregator_core/src/datastore.rs
@@ -1016,10 +1016,13 @@ impl<C: Clock> Transaction<'_, C> {
}

/// `get_unaggregated_client_report_ids_for_task` returns some report IDs corresponding to
/// unaggregated client reports for the task identified by the given task ID. Returned client
/// reports are marked as aggregation-started: the caller must either create an aggregation job
/// with, or call `mark_reports_unaggregated` on each returned report as part of the same
/// transaction.
/// unaggregated client reports for the task identified by the given task ID. Reports whose
/// timestamps fall into an interval covered by a collection job that has been acquired or is
/// finished, abandoned or deleted are not returned.
///
/// Returned client reports are marked as aggregation-started: the caller must either create an
/// aggregation job with, or call `mark_reports_unaggregated` on each returned report as part of
/// the same transaction.
///
/// This should only be used with VDAFs that have an aggregation parameter of the unit type. It
/// relies on this assumption to find relevant reports without consulting collection jobs. For
@@ -1035,11 +1038,22 @@ impl<C: Clock> Transaction<'_, C> {
.prepare_cached(
"UPDATE client_reports SET aggregation_started = TRUE
WHERE id IN (
SELECT id FROM client_reports
WHERE task_id = (SELECT id FROM tasks WHERE task_id = $1)
AND aggregation_started = FALSE
FOR UPDATE SKIP LOCKED
LIMIT 5000
SELECT client_reports.id FROM client_reports LEFT JOIN collection_jobs
ON client_reports.task_id = collection_jobs.task_id
AND collection_jobs.batch_interval @> client_reports.client_timestamp
WHERE client_reports.task_id = (SELECT tasks.id FROM tasks WHERE tasks.task_id = $1)
AND client_reports.aggregation_started = FALSE
AND (
-- select reports that are not yet associated with a collect job
collection_jobs.state IS NULL
-- select reports whose collect job has been scheduled but never yet leased
OR (
collection_jobs.state = 'START'
AND collection_jobs.lease_expiry = TIMESTAMP '-infinity'
)
)
FOR UPDATE OF client_reports SKIP LOCKED
LIMIT 5000
)
RETURNING report_id, client_timestamp",
)
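
To make the caller contract in the doc comment above concrete, here is a
hedged sketch of how these two datastore methods are meant to be used
together. The method names come from the diff, but the signatures and
surrounding logic are simplified assumptions, not Janus' actual API:

```rust
// Simplified stand-ins for the transaction methods named in the diff.
trait UnaggregatedReports {
    fn get_unaggregated_client_report_ids_for_task(&mut self, task_id: u64) -> Vec<u128>;
    fn mark_reports_unaggregated(&mut self, task_id: u64, report_ids: &[u128]);
}

// Within a single transaction, every returned report must either join a
// new aggregation job or be explicitly handed back via
// mark_reports_unaggregated, since the query has already flagged it as
// aggregation-started.
fn create_jobs_for_task<T: UnaggregatedReports>(tx: &mut T, task_id: u64, min_job_size: usize) {
    let report_ids = tx.get_unaggregated_client_report_ids_for_task(task_id);
    if report_ids.len() >= min_job_size {
        // ... create an aggregation job covering report_ids here ...
    } else {
        // Too few reports for a job: release them so a later run sees them.
        tx.mark_reports_unaggregated(task_id, &report_ids);
    }
}
```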
