From 9c1457a7c8e694d9564aca63daa7d1ff0b393558 Mon Sep 17 00:00:00 2001 From: Tim Geoghegan Date: Mon, 17 Apr 2023 17:21:18 -0700 Subject: [PATCH] Check DB schema version at startup Creating a `Datastore` now checks the version of the most recently applied migration script against an array of supported versions compiled into Janus. If the current schema is not supported, `Datastore::new` returns an error. Also adds a configuration option to disable this new feature. This is needed because the Janus interop binaries apply schema migrations at startup, after being handed control by `janus_main`. Resolves #1241 --- aggregator/src/aggregator.rs | 2 +- .../src/aggregator/aggregate_init_tests.rs | 2 +- .../aggregator/aggregation_job_continue.rs | 2 +- .../src/aggregator/aggregation_job_creator.rs | 10 +- .../src/aggregator/aggregation_job_driver.rs | 14 +- .../src/aggregator/collection_job_driver.rs | 8 +- .../src/aggregator/collection_job_tests.rs | 2 +- .../src/aggregator/garbage_collector.rs | 8 +- aggregator/src/aggregator/http_handlers.rs | 42 ++--- aggregator/src/bin/janus_cli.rs | 24 ++- aggregator/src/binary_utils.rs | 13 +- aggregator/src/config.rs | 11 +- aggregator/tests/graceful_shutdown.rs | 2 +- aggregator_api/src/lib.rs | 12 +- aggregator_core/src/datastore.rs | 161 +++++++++++++----- aggregator_core/src/datastore/test_util.rs | 16 +- integration_tests/src/janus.rs | 5 +- .../config/aggregation_job_creator.yaml | 1 + .../config/aggregation_job_driver.yaml | 1 + .../config/collection_job_driver.yaml | 1 + .../config/janus_interop_aggregator.yaml | 1 + 21 files changed, 230 insertions(+), 108 deletions(-) diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index 36d962451..e2a4d8a49 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -2548,7 +2548,7 @@ mod tests { .build(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); datastore.put_task(&task).await.unwrap(); diff --git a/aggregator/src/aggregator/aggregate_init_tests.rs b/aggregator/src/aggregator/aggregate_init_tests.rs index be7a73532..eb559b314 100644 --- a/aggregator/src/aggregator/aggregate_init_tests.rs +++ b/aggregator/src/aggregator/aggregate_init_tests.rs @@ -99,7 +99,7 @@ pub(super) async fn setup_aggregate_init_test() -> AggregationJobInitTestCase { let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Helper).build(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); datastore.put_task(&task).await.unwrap(); diff --git a/aggregator/src/aggregator/aggregation_job_continue.rs b/aggregator/src/aggregator/aggregation_job_continue.rs index 72a433c92..82f3f433f 100644 --- a/aggregator/src/aggregator/aggregation_job_continue.rs +++ b/aggregator/src/aggregator/aggregation_job_continue.rs @@ -417,7 +417,7 @@ mod tests { TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Helper).build(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); let report_generator = ReportShareGenerator::new( clock.clone(), diff --git a/aggregator/src/aggregator/aggregation_job_creator.rs b/aggregator/src/aggregator/aggregation_job_creator.rs index 1c4913a59..79320381e 100644 --- a/aggregator/src/aggregator/aggregation_job_creator.rs +++ b/aggregator/src/aggregator/aggregation_job_creator.rs @@ -812,7 +812,7 @@ mod tests { install_test_trace_subscriber(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; let vdaf = dummy_vdaf::Vdaf::new(); // TODO(#234): consider using tokio::time::pause() to make time deterministic, and allow @@ -913,7 +913,7 @@ mod tests { install_test_trace_subscriber(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; let vdaf = dummy_vdaf::Vdaf::new(); const MIN_AGGREGATION_JOB_SIZE: usize = 50; const MAX_AGGREGATION_JOB_SIZE: usize = 60; @@ -1009,7 +1009,7 @@ mod tests { install_test_trace_subscriber(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; let vdaf = dummy_vdaf::Vdaf::new(); let task = Arc::new( TaskBuilder::new( @@ -1121,7 +1121,7 @@ mod tests { install_test_trace_subscriber(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; let vdaf = dummy_vdaf::Vdaf::new(); const MIN_AGGREGATION_JOB_SIZE: usize = 50; @@ -1240,7 +1240,7 @@ mod tests { install_test_trace_subscriber(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; const MAX_AGGREGATION_JOB_SIZE: usize = 10; diff --git a/aggregator/src/aggregator/aggregation_job_driver.rs b/aggregator/src/aggregator/aggregation_job_driver.rs index e0be308a5..ec2a940b6 100644 --- a/aggregator/src/aggregator/aggregation_job_driver.rs +++ b/aggregator/src/aggregator/aggregation_job_driver.rs @@ -925,7 +925,7 @@ mod tests { let clock = MockClock::default(); let mut runtime_manager = TestRuntimeManager::new(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); let vdaf = Arc::new(Prio3::new_count(2).unwrap()); let task = TaskBuilder::new( QueryType::TimeInterval, @@ -1149,7 +1149,7 @@ mod tests { let mut server = mockito::Server::new_async().await; let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); let vdaf = Arc::new(Prio3::new_count(2).unwrap()); let task = TaskBuilder::new( @@ -1443,7 +1443,7 @@ mod tests { let mut server = mockito::Server::new_async().await; let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); let vdaf = Arc::new(Prio3::new_count(2).unwrap()); let task = TaskBuilder::new( @@ -1675,7 +1675,7 @@ mod tests { let mut server = mockito::Server::new_async().await; let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); let vdaf = Arc::new(Prio3::new_count(2).unwrap()); let task = TaskBuilder::new( @@ -1964,7 +1964,7 @@ mod tests { let mut server = mockito::Server::new_async().await; let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); let vdaf = Arc::new(Prio3::new_count(2).unwrap()); let task = TaskBuilder::new( @@ -2235,7 +2235,7 @@ mod tests { install_test_trace_subscriber(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); let vdaf = Arc::new(Prio3::new_count(2).unwrap()); let task = TaskBuilder::new( @@ -2421,7 +2421,7 @@ mod tests { let clock = MockClock::default(); let mut runtime_manager = TestRuntimeManager::new(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); let task = TaskBuilder::new( QueryType::TimeInterval, diff --git a/aggregator/src/aggregator/collection_job_driver.rs b/aggregator/src/aggregator/collection_job_driver.rs index 21ac91332..2c082e60a 100644 --- a/aggregator/src/aggregator/collection_job_driver.rs +++ b/aggregator/src/aggregator/collection_job_driver.rs @@ -637,7 +637,7 @@ mod tests { let mut server = mockito::Server::new_async().await; let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); let time_precision = Duration::from_seconds(500); let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Leader) @@ -904,7 +904,7 @@ mod tests { let mut server = mockito::Server::new_async().await; let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); let (_, lease, collection_job) = setup_collection_job_test_case(&mut server, clock, Arc::clone(&ds), true).await; @@ -959,7 +959,7 @@ mod tests { let clock = MockClock::default(); let mut runtime_manager = TestRuntimeManager::new(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); let (task, _, collection_job) = setup_collection_job_test_case(&mut server, clock.clone(), Arc::clone(&ds), false) @@ -1052,7 +1052,7 @@ mod tests { let mut server = mockito::Server::new_async().await; let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); let (task, lease, collection_job) = setup_collection_job_test_case(&mut server, clock, Arc::clone(&ds), true).await; diff --git a/aggregator/src/aggregator/collection_job_tests.rs b/aggregator/src/aggregator/collection_job_tests.rs index 242a8bcc2..3f68782eb 100644 --- a/aggregator/src/aggregator/collection_job_tests.rs +++ b/aggregator/src/aggregator/collection_job_tests.rs @@ -124,7 +124,7 @@ pub(crate) async fn setup_collection_job_test_case( .build(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); datastore.put_task(&task).await.unwrap(); diff --git a/aggregator/src/aggregator/garbage_collector.rs b/aggregator/src/aggregator/garbage_collector.rs index b90e5fb26..8bc7d0a8b 100644 --- a/aggregator/src/aggregator/garbage_collector.rs +++ b/aggregator/src/aggregator/garbage_collector.rs @@ -113,7 +113,7 @@ mod tests { let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); let vdaf = dummy_vdaf::Vdaf::new(); // Setup. @@ -267,7 +267,7 @@ mod tests { let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); let vdaf = dummy_vdaf::Vdaf::new(); // Setup. @@ -434,7 +434,7 @@ mod tests { let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); let vdaf = dummy_vdaf::Vdaf::new(); // Setup. @@ -592,7 +592,7 @@ mod tests { let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let ds = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); let vdaf = dummy_vdaf::Vdaf::new(); // Setup. diff --git a/aggregator/src/aggregator/http_handlers.rs b/aggregator/src/aggregator/http_handlers.rs index 89028f4e0..a421c31c0 100644 --- a/aggregator/src/aggregator/http_handlers.rs +++ b/aggregator/src/aggregator/http_handlers.rs @@ -605,7 +605,7 @@ mod tests { let unknown_task_id: TaskId = random(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = ephemeral_datastore.datastore(clock.clone()); + let datastore = ephemeral_datastore.datastore(clock.clone()).await; datastore.put_task(&task).await.unwrap(); @@ -720,7 +720,7 @@ mod tests { .build(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = ephemeral_datastore.datastore(clock.clone()); + let datastore = ephemeral_datastore.datastore(clock.clone()).await; datastore.put_task(&task).await.unwrap(); @@ -799,7 +799,7 @@ mod tests { .build(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); datastore.put_task(&task).await.unwrap(); let report = create_report(&task, clock.now()); @@ -1028,7 +1028,7 @@ mod tests { let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = ephemeral_datastore.datastore(clock.clone()); + let datastore = ephemeral_datastore.datastore(clock.clone()).await; let task = TaskBuilder::new( QueryType::TimeInterval, @@ -1089,7 +1089,7 @@ mod tests { .build(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = ephemeral_datastore.datastore(clock.clone()); + let datastore = ephemeral_datastore.datastore(clock.clone()).await; datastore.put_task(&task).await.unwrap(); @@ -1177,7 +1177,7 @@ mod tests { .build(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = ephemeral_datastore.datastore(clock.clone()); + let datastore = ephemeral_datastore.datastore(clock.clone()).await; datastore.put_task(&task).await.unwrap(); @@ -1274,7 +1274,7 @@ mod tests { TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Helper).build(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); let vdaf = dummy_vdaf::Vdaf::new(); let verify_key: VerifyKey<0> = task.primary_vdaf_verify_key().unwrap(); @@ -1845,7 +1845,7 @@ mod tests { .build(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = ephemeral_datastore.datastore(clock.clone()); + let datastore = ephemeral_datastore.datastore(clock.clone()).await; let hpke_key = task.current_hpke_key(); datastore.put_task(&task).await.unwrap(); @@ -1914,7 +1914,7 @@ mod tests { .build(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = ephemeral_datastore.datastore(clock.clone()); + let datastore = ephemeral_datastore.datastore(clock.clone()).await; let hpke_key = task.current_hpke_key(); datastore.put_task(&task).await.unwrap(); @@ -1982,7 +1982,7 @@ mod tests { .build(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = ephemeral_datastore.datastore(clock.clone()); + let datastore = ephemeral_datastore.datastore(clock.clone()).await; datastore.put_task(&task).await.unwrap(); @@ -2049,7 +2049,7 @@ mod tests { .build(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); let vdaf = Arc::new(Prio3::new_count(2).unwrap()); let verify_key: VerifyKey = @@ -2365,7 +2365,7 @@ mod tests { let aggregation_job_id_0 = random(); let aggregation_job_id_1 = random(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = Arc::new(ephemeral_datastore.datastore(MockClock::default())); + let datastore = Arc::new(ephemeral_datastore.datastore(MockClock::default()).await); let first_batch_interval_clock = MockClock::default(); let second_batch_interval_clock = MockClock::new( first_batch_interval_clock @@ -2994,7 +2994,7 @@ mod tests { ); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); // Setup datastore. datastore @@ -3092,7 +3092,7 @@ mod tests { ); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); // Setup datastore. datastore @@ -3241,7 +3241,7 @@ mod tests { ); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = ephemeral_datastore.datastore(clock.clone()); + let datastore = ephemeral_datastore.datastore(clock.clone()).await; // Setup datastore. datastore @@ -3342,7 +3342,7 @@ mod tests { let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = ephemeral_datastore.datastore(clock.clone()); + let datastore = ephemeral_datastore.datastore(clock.clone()).await; // Setup datastore. datastore @@ -3477,7 +3477,7 @@ mod tests { let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = ephemeral_datastore.datastore(clock.clone()); + let datastore = ephemeral_datastore.datastore(clock.clone()).await; // Setup datastore. datastore @@ -3691,7 +3691,7 @@ mod tests { .build(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = ephemeral_datastore.datastore(clock.clone()); + let datastore = ephemeral_datastore.datastore(clock.clone()).await; datastore.put_task(&task).await.unwrap(); @@ -4390,7 +4390,7 @@ mod tests { TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Leader).build(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = ephemeral_datastore.datastore(clock.clone()); + let datastore = ephemeral_datastore.datastore(clock.clone()).await; datastore.put_task(&task).await.unwrap(); @@ -4451,7 +4451,7 @@ mod tests { .build(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = ephemeral_datastore.datastore(clock.clone()); + let datastore = ephemeral_datastore.datastore(clock.clone()).await; datastore.put_task(&task).await.unwrap(); @@ -4553,7 +4553,7 @@ mod tests { let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone())); + let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); datastore.put_task(&task).await.unwrap(); diff --git a/aggregator/src/bin/janus_cli.rs b/aggregator/src/bin/janus_cli.rs index 0a8a3a162..88fa1446e 100644 --- a/aggregator/src/bin/janus_cli.rs +++ b/aggregator/src/bin/janus_cli.rs @@ -341,7 +341,9 @@ async fn datastore_from_opts( &kubernetes_secret_options .datastore_keys(&command_line_options.common_options, kube_client) .await?, + true, ) + .await } #[derive(Debug, Parser)] @@ -586,7 +588,12 @@ mod tests { #[tokio::test] async fn write_schema() { let ephemeral_datastore = ephemeral_datastore_no_schema().await; - let ds = ephemeral_datastore.datastore(RealClock::default()); + let ds = Datastore::new_without_supported_versions( + ephemeral_datastore.pool(), + ephemeral_datastore.crypter(), + RealClock::default(), + ) + .await; // Verify that the query we will run later returns an error if there is no database schema written. ds.run_tx(|tx| Box::pin(async move { tx.get_tasks().await })) @@ -607,7 +614,12 @@ mod tests { #[tokio::test] async fn write_schema_dry_run() { let ephemeral_datastore = ephemeral_datastore_no_schema().await; - let ds = ephemeral_datastore.datastore(RealClock::default()); + let ds = Datastore::new_without_supported_versions( + ephemeral_datastore.pool(), + ephemeral_datastore.crypter(), + RealClock::default(), + ) + .await; ds.run_tx(|tx| Box::pin(async move { tx.get_tasks().await })) .await @@ -648,7 +660,7 @@ mod tests { #[tokio::test] async fn provision_tasks() { let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(RealClock::default()); + let ds = ephemeral_datastore.datastore(RealClock::default()).await; let tasks = Vec::from([ TaskBuilder::new( @@ -682,7 +694,7 @@ mod tests { #[tokio::test] async fn provision_task_dry_run() { let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(RealClock::default()); + let ds = ephemeral_datastore.datastore(RealClock::default()).await; let tasks = Vec::from([TaskBuilder::new( QueryType::TimeInterval, @@ -722,7 +734,7 @@ mod tests { ]); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(RealClock::default()); + let ds = ephemeral_datastore.datastore(RealClock::default()).await; let mut tasks_file = NamedTempFile::new().unwrap(); tasks_file @@ -828,7 +840,7 @@ mod tests { "#; let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(RealClock::default()); + let ds = ephemeral_datastore.datastore(RealClock::default()).await; let mut tasks_file = NamedTempFile::new().unwrap(); tasks_file diff --git a/aggregator/src/binary_utils.rs b/aggregator/src/binary_utils.rs index c2f7bd4f3..61444a845 100644 --- a/aggregator/src/binary_utils.rs +++ b/aggregator/src/binary_utils.rs @@ -126,10 +126,11 @@ pub async fn database_pool(db_config: &DbConfig, db_password: Option<&str>) -> R /// Connects to a datastore, given a connection pool to the underlying database. `datastore_keys` /// is a list of AES-128-GCM keys, encoded in base64 with no padding, used to protect secret values /// stored in the datastore; it must not be empty. -pub fn datastore( +pub async fn datastore( pool: Pool, clock: C, datastore_keys: &[String], + check_schema_version: bool, ) -> Result> { let datastore_keys = datastore_keys .iter() @@ -150,7 +151,13 @@ pub fn datastore( return Err(anyhow!("datastore_keys is empty")); } - Ok(Datastore::new(pool, Crypter::new(datastore_keys), clock)) + let datastore = if check_schema_version { + Datastore::new(pool, Crypter::new(datastore_keys), clock).await? + } else { + Datastore::new_without_supported_versions(pool, Crypter::new(datastore_keys), clock).await + }; + + Ok(datastore) } /// Options for Janus binaries. @@ -303,7 +310,9 @@ where pool, clock.clone(), &options.common_options().datastore_keys, + config.common_config().database.check_schema_version, ) + .await .context("couldn't create datastore")?; let logging_config = config.common_config().logging_config.clone(); diff --git a/aggregator/src/config.rs b/aggregator/src/config.rs index 3eb54bcc0..b3a2e09d1 100644 --- a/aggregator/src/config.rs +++ b/aggregator/src/config.rs @@ -67,6 +67,10 @@ pub struct DbConfig { /// `deadpool_postgres::Timeouts` value. #[serde(default = "DbConfig::default_connection_pool_timeout")] pub connection_pool_timeouts_secs: u64, + /// If false, the program will not check whether the database's current + /// schema version is supported. + #[serde(default = "DbConfig::default_check_schema_version")] + pub check_schema_version: bool, // TODO(#231): add option for connecting to database over TLS, if necessary } @@ -74,6 +78,10 @@ impl DbConfig { fn default_connection_pool_timeout() -> u64 { 60 } + + fn default_check_schema_version() -> bool { + true + } } /// Non-secret configuration options for Janus Job Driver jobs. @@ -136,7 +144,8 @@ pub mod test_util { pub fn generate_db_config() -> DbConfig { DbConfig { url: Url::parse("postgres://postgres:postgres@localhost:5432/postgres").unwrap(), - connection_pool_timeouts_secs: 60, + connection_pool_timeouts_secs: DbConfig::default_connection_pool_timeout(), + check_schema_version: DbConfig::default_check_schema_version(), } } diff --git a/aggregator/tests/graceful_shutdown.rs b/aggregator/tests/graceful_shutdown.rs index 90f1d4d59..0f02f0aa0 100644 --- a/aggregator/tests/graceful_shutdown.rs +++ b/aggregator/tests/graceful_shutdown.rs @@ -110,7 +110,7 @@ async fn graceful_shutdown(binary: &Path, mut config: Mapping) { // This datastore will be used indirectly by the child process, which // will connect to its backing database separately. let ephemeral_datastore = ephemeral_datastore().await; - let datastore = ephemeral_datastore.datastore(RealClock::default()); + let datastore = ephemeral_datastore.datastore(RealClock::default()).await; let health_check_port = select_open_port().await.unwrap(); let health_check_listen_address = SocketAddr::from((Ipv4Addr::LOCALHOST, health_check_port)); diff --git a/aggregator_api/src/lib.rs b/aggregator_api/src/lib.rs index 737922810..16ef9e093 100644 --- a/aggregator_api/src/lib.rs +++ b/aggregator_api/src/lib.rs @@ -406,7 +406,7 @@ mod tests { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; let handler = aggregator_api_handler( - ephemeral_datastore.datastore(MockClock::default()), + ephemeral_datastore.datastore(MockClock::default()).await, Config { auth_tokens: Vec::from([SecretBytes::new(AUTH_TOKEN.as_bytes().to_vec())]), }, @@ -419,7 +419,7 @@ mod tests { async fn get_task_ids() { // Setup: write a few tasks to the datastore. let (handler, ephemeral_datastore) = setup_api_test().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; let mut task_ids: Vec<_> = ds .run_tx(|tx| { @@ -497,7 +497,7 @@ mod tests { async fn post_task() { // Setup: create a datastore & handler. let (handler, ephemeral_datastore) = setup_api_test().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; // Verify: posting a task creates a new task which matches the request. let req = PostTaskReq { @@ -581,7 +581,7 @@ mod tests { async fn get_task() { // Setup: write a task to the datastore. let (handler, ephemeral_datastore) = setup_api_test().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Leader).build(); @@ -638,7 +638,7 @@ mod tests { async fn delete_task() { // Setup: write a task to the datastore. let (handler, ephemeral_datastore) = setup_api_test().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; let task_id = ds .run_tx(|tx| { @@ -711,7 +711,7 @@ mod tests { const REPORT_AGGREGATION_COUNT: usize = 4; let (handler, ephemeral_datastore) = setup_api_test().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; let task_id = ds .run_tx(|tx| { Box::pin(async move { diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index 03c237d98..81be1b816 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -41,7 +41,7 @@ use ring::aead::{self, LessSafeKey, AES_128_GCM}; use std::{ collections::HashMap, convert::TryFrom, - fmt::Display, + fmt::{Debug, Display}, future::Future, io::Cursor, mem::size_of, @@ -61,6 +61,10 @@ pub mod test_util; // TODO(#196): retry network-related & other transient failures once we know what they look like +/// List of schema versions that this version of Janus can safely run on. If any other schema +/// version is seen, [`Datastore::new`] fails. +const SUPPORTED_SCHEMA_VERSIONS: &[i64] = &[20230405185602, 20230417204528]; + /// Datastore represents a datastore for Janus, with support for transactional reads and writes. /// In practice, Datastore instances are currently backed by a PostgreSQL database. pub struct Datastore { @@ -72,10 +76,52 @@ pub struct Datastore { transaction_duration_histogram: Histogram, } +impl Debug for Datastore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Datastore") + } +} + impl Datastore { - /// new creates a new Datastore using the given Client for backing storage. It is assumed that - /// the Client is connected to a database with a compatible version of the Janus database schema. - pub fn new(pool: deadpool_postgres::Pool, crypter: Crypter, clock: C) -> Datastore { + /// `new` creates a new Datastore using the provided connection pool. An error is returned if + /// the current database migration version is not supported by this version of Janus. + pub async fn new( + pool: deadpool_postgres::Pool, + crypter: Crypter, + clock: C, + ) -> Result, Error> { + Self::new_with_supported_versions(pool, crypter, clock, SUPPORTED_SCHEMA_VERSIONS).await + } + + async fn new_with_supported_versions( + pool: deadpool_postgres::Pool, + crypter: Crypter, + clock: C, + supported_schema_versions: &[i64], + ) -> Result, Error> { + let datastore = Self::new_without_supported_versions(pool, crypter, clock).await; + + let (current_version, migration_description) = datastore + .run_tx_with_name("check schema version", |tx| { + Box::pin(async move { tx.get_current_schema_migration_version().await }) + }) + .await?; + + if !supported_schema_versions.contains(¤t_version) { + return Err(Error::DbState(format!( + "unsupported schema version {current_version} / {migration_description}" + ))); + } + + Ok(datastore) + } + + /// Creates a new datastore using the provided connection pool. + pub async fn new_without_supported_versions( + pool: deadpool_postgres::Pool, + crypter: Crypter, + clock: C, + ) -> Datastore { let meter = opentelemetry::global::meter("janus_aggregator"); let transaction_status_counter = meter .u64_counter("janus_database_transactions_total") @@ -312,6 +358,23 @@ impl Transaction<'_, C> { self.retry.store(true, Ordering::Relaxed); } + /// Returns the current schema version of the datastore and the description of the migration + /// script that applied it. + async fn get_current_schema_migration_version(&self) -> Result<(i64, String), Error> { + let stmt = self + .prepare_cached( + "SELECT version, description FROM _sqlx_migrations + WHERE success = TRUE ORDER BY version DESC LIMIT(1)", + ) + .await?; + let row = self.query_one(&stmt, &[]).await?; + + let version = row.try_get("version")?; + let description = row.try_get("description")?; + + Ok((version, description)) + } + /// Writes a task into the datastore. #[tracing::instrument(skip(self, task), fields(task_id = ?task.id()), err)] pub async fn put_task(&self, task: &Task) -> Result<(), Error> { @@ -5384,11 +5447,27 @@ mod tests { time::Duration as StdDuration, }; + #[tokio::test] + async fn reject_unsupported_schema_version() { + install_test_trace_subscriber(); + let ephemeral_datastore = ephemeral_datastore().await; + let error = Datastore::new_with_supported_versions( + ephemeral_datastore.pool(), + ephemeral_datastore.crypter(), + MockClock::default(), + &[0], + ) + .await + .unwrap_err(); + + assert_matches!(error, Error::DbState(_)); + } + #[tokio::test] async fn roundtrip_task() { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; // Insert tasks, check that they can be retrieved by ID. let mut want_tasks = HashMap::new(); @@ -5501,7 +5580,7 @@ mod tests { async fn get_task_metrics() { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; ds.run_tx(|tx| { Box::pin(async move { @@ -5636,7 +5715,7 @@ mod tests { async fn get_task_ids() { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; ds.run_tx(|tx| { Box::pin(async move { @@ -5676,7 +5755,7 @@ mod tests { async fn roundtrip_report() { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; let task = TaskBuilder::new( task::QueryType::TimeInterval, @@ -5777,7 +5856,7 @@ mod tests { async fn report_not_found() { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; let rslt = ds .run_tx(|tx| { @@ -5800,7 +5879,7 @@ mod tests { async fn get_unaggregated_client_report_ids_for_task() { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; let time_precision = Duration::from_seconds(1000); let when = MockClock::default() @@ -5958,7 +6037,7 @@ mod tests { async fn get_unaggregated_client_report_ids_with_agg_param_for_task() { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; let task = TaskBuilder::new( task::QueryType::TimeInterval, @@ -6220,7 +6299,7 @@ mod tests { async fn count_client_reports_for_interval() { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; let task = TaskBuilder::new( task::QueryType::TimeInterval, @@ -6331,7 +6410,7 @@ mod tests { async fn count_client_reports_for_batch_id() { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; let task = TaskBuilder::new( task::QueryType::FixedSize { max_batch_size: 10 }, @@ -6469,7 +6548,7 @@ mod tests { async fn roundtrip_report_share() { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; let task = TaskBuilder::new( task::QueryType::TimeInterval, @@ -6565,7 +6644,7 @@ mod tests { async fn roundtrip_aggregation_job() { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; // We use a dummy VDAF & fixed-size task for this test, to better exercise the // serialization/deserialization roundtrip of the batch_identifier & aggregation_param. @@ -6746,7 +6825,7 @@ mod tests { install_test_trace_subscriber(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; const AGGREGATION_JOB_COUNT: usize = 10; let task = TaskBuilder::new( @@ -7032,7 +7111,7 @@ mod tests { async fn aggregation_job_not_found() { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; let rslt = ds .run_tx(|tx| { @@ -7078,7 +7157,7 @@ mod tests { // Setup. install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; // We use a dummy VDAF & fixed-size task for this test, to better exercise the // serialization/deserialization roundtrip of the batch_identifier & aggregation_param. @@ -7178,7 +7257,7 @@ mod tests { async fn roundtrip_report_aggregation() { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; let report_id = ReportId::from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]); let vdaf = Arc::new(Prio3::new_count(2).unwrap()); @@ -7329,7 +7408,7 @@ mod tests { async fn check_report_aggregation_exists() { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; let task = TaskBuilder::new( task::QueryType::TimeInterval, VdafInstance::Fake, @@ -7454,7 +7533,7 @@ mod tests { async fn report_aggregation_not_found() { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; let vdaf = Arc::new(dummy_vdaf::Vdaf::default()); @@ -7499,7 +7578,7 @@ mod tests { async fn get_report_aggregations_for_aggregation_job() { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; let report_id = ReportId::from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]); let vdaf = Arc::new(Prio3::new_count(2).unwrap()); @@ -7666,7 +7745,7 @@ mod tests { let aggregation_param = AggregationParam(13); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; ds.run_tx(|tx| { let task = task.clone(); @@ -7764,7 +7843,7 @@ mod tests { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; let task = TaskBuilder::new( task::QueryType::TimeInterval, @@ -8026,7 +8105,7 @@ mod tests { install_test_trace_subscriber(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; let task_id = random(); let reports = Vec::from([LeaderStoredReport::new_dummy( @@ -8163,7 +8242,7 @@ mod tests { install_test_trace_subscriber(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; let task_id = random(); let reports = Vec::from([LeaderStoredReport::new_dummy( @@ -8289,7 +8368,7 @@ mod tests { install_test_trace_subscriber(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; let task_id = random(); let other_task_id = random(); @@ -8340,7 +8419,7 @@ mod tests { install_test_trace_subscriber(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; let task_id = random(); let reports = Vec::from([LeaderStoredReport::new_dummy( @@ -8394,7 +8473,7 @@ mod tests { install_test_trace_subscriber(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; let task_id = random(); let reports = Vec::from([LeaderStoredReport::new_dummy( @@ -8458,7 +8537,7 @@ mod tests { install_test_trace_subscriber(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; let task_id = random(); let reports = Vec::from([LeaderStoredReport::new_dummy( @@ -8522,7 +8601,7 @@ mod tests { install_test_trace_subscriber(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; let task_id = random(); let reports = Vec::from([ @@ -8609,7 +8688,7 @@ mod tests { install_test_trace_subscriber(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; let task_id = random(); let reports = Vec::from([LeaderStoredReport::new_dummy( @@ -8764,7 +8843,7 @@ mod tests { install_test_trace_subscriber(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; let task_id = random(); let reports = Vec::from([LeaderStoredReport::new_dummy( @@ -8902,7 +8981,7 @@ mod tests { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; ds.run_tx(|tx| { Box::pin(async move { @@ -9119,7 +9198,7 @@ mod tests { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; ds.run_tx(|tx| { Box::pin(async move { @@ -9249,7 +9328,7 @@ mod tests { install_test_trace_subscriber(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(MockClock::default()); + let ds = ephemeral_datastore.datastore(MockClock::default()).await; ds.run_tx(|tx| { Box::pin(async move { @@ -9359,7 +9438,7 @@ mod tests { let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; let (task_id, batch_id) = ds .run_tx(|tx| { @@ -9647,7 +9726,7 @@ mod tests { let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; let vdaf = dummy_vdaf::Vdaf::new(); // Setup. @@ -9778,7 +9857,7 @@ mod tests { let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; let vdaf = dummy_vdaf::Vdaf::new(); // Setup. @@ -10522,7 +10601,7 @@ mod tests { let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; - let ds = ephemeral_datastore.datastore(clock.clone()); + let ds = ephemeral_datastore.datastore(clock.clone()).await; // Setup. async fn write_collect_artifacts( @@ -11074,7 +11153,7 @@ mod tests { #[tokio::test] async fn roundtrip_interval_sql() { let ephemeral_datastore = ephemeral_datastore().await; - let datastore = ephemeral_datastore.datastore(MockClock::default()); + let datastore = ephemeral_datastore.datastore(MockClock::default()).await; datastore .run_tx(|tx| { diff --git a/aggregator_core/src/datastore/test_util.rs b/aggregator_core/src/datastore/test_util.rs index 54c25c4fd..fd28858f3 100644 --- a/aggregator_core/src/datastore/test_util.rs +++ b/aggregator_core/src/datastore/test_util.rs @@ -102,11 +102,10 @@ pub struct EphemeralDatastore { impl EphemeralDatastore { /// Creates a Datastore instance based on this EphemeralDatastore. All returned Datastore /// instances will refer to the same underlying durable state. - pub fn datastore(&self, clock: C) -> Datastore { - let datastore_key = - LessSafeKey::new(UnboundKey::new(&AES_128_GCM, &self.datastore_key_bytes).unwrap()); - let crypter = Crypter::new(Vec::from([datastore_key])); - Datastore::new(self.pool(), crypter, clock) + pub async fn datastore(&self, clock: C) -> Datastore { + Datastore::new(self.pool(), self.crypter(), clock) + .await + .unwrap() } /// Retrieves the connection pool used for this EphemeralDatastore. Typically, this would be @@ -124,6 +123,13 @@ impl EphemeralDatastore { pub fn datastore_key_bytes(&self) -> &[u8] { &self.datastore_key_bytes } + + /// Construct a [`Crypter`] for managing encrypted values in this datastore. + pub fn crypter(&self) -> Crypter { + let datastore_key = + LessSafeKey::new(UnboundKey::new(&AES_128_GCM, &self.datastore_key_bytes).unwrap()); + Crypter::new(Vec::from([datastore_key])) + } } /// Creates a new, empty EphemeralDatastore with no schema applied. Almost all uses will want to diff --git a/integration_tests/src/janus.rs b/integration_tests/src/janus.rs index 4ef64b442..08637037d 100644 --- a/integration_tests/src/janus.rs +++ b/integration_tests/src/janus.rs @@ -144,6 +144,7 @@ impl Janus<'static> { )) .unwrap(), connection_pool_timeouts_secs: 60, + check_schema_version: true, }, None, ) @@ -155,7 +156,9 @@ impl Janus<'static> { // depends on this task being defined will likely time out or otherwise fail. // This should become more robust in the future when we implement dynamic task provisioning // (#44). - let datastore = datastore(pool, RealClock::default(), &[datastore_key]).unwrap(); + let datastore = datastore(pool, RealClock::default(), &[datastore_key], true) + .await + .unwrap(); datastore.put_task(task).await.unwrap(); let aggregator_port_forward = cluster diff --git a/interop_binaries/config/aggregation_job_creator.yaml b/interop_binaries/config/aggregation_job_creator.yaml index b007de147..a5d724e75 100644 --- a/interop_binaries/config/aggregation_job_creator.yaml +++ b/interop_binaries/config/aggregation_job_creator.yaml @@ -3,6 +3,7 @@ # tests, and is not intended for production use. database: url: postgres://postgres@127.0.0.1:5432/postgres + check_schema_version: false health_check_listen_address: 0.0.0.0:8001 logging_config: force_json_output: true diff --git a/interop_binaries/config/aggregation_job_driver.yaml b/interop_binaries/config/aggregation_job_driver.yaml index 431b28d25..ed87b743f 100644 --- a/interop_binaries/config/aggregation_job_driver.yaml +++ b/interop_binaries/config/aggregation_job_driver.yaml @@ -3,6 +3,7 @@ # tests, and is not intended for production use. database: url: postgres://postgres@127.0.0.1:5432/postgres + check_schema_version: false health_check_listen_address: 0.0.0.0:8002 logging_config: force_json_output: true diff --git a/interop_binaries/config/collection_job_driver.yaml b/interop_binaries/config/collection_job_driver.yaml index bd10804b3..7d3cffc6d 100644 --- a/interop_binaries/config/collection_job_driver.yaml +++ b/interop_binaries/config/collection_job_driver.yaml @@ -3,6 +3,7 @@ # tests, and is not intended for production use. database: url: postgres://postgres@127.0.0.1:5432/postgres + check_schema_version: false health_check_listen_address: 0.0.0.0:8003 logging_config: force_json_output: true diff --git a/interop_binaries/config/janus_interop_aggregator.yaml b/interop_binaries/config/janus_interop_aggregator.yaml index 6a882bcd6..27ceeeb38 100644 --- a/interop_binaries/config/janus_interop_aggregator.yaml +++ b/interop_binaries/config/janus_interop_aggregator.yaml @@ -2,6 +2,7 @@ # image. database: url: postgres://postgres@127.0.0.1:5432/postgres + check_schema_version: false health_check_listen_address: 0.0.0.0:8000 logging_config: force_json_output: true