From e06db1384ace258d90dfdb5959d6dce55063ce04 Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Thu, 5 Jan 2023 20:32:45 -0500 Subject: [PATCH] adapter: deoptionalize oracle_write_ts in Catalog::transact The conditions under which oracle_write_ts was allowed to be None in calls to Catalog::transact were not documented. Make the API more typesafe by removing the option, as an oracle-derived write timestamp is available in all production codepaths. --- src/adapter/benches/catalog.rs | 2 +- src/adapter/src/catalog.rs | 22 +++++---- src/adapter/src/catalog/storage.rs | 76 +++++++++++++++++++----------- src/adapter/src/coord/ddl.rs | 2 +- src/adapter/tests/sql.rs | 2 +- 5 files changed, 64 insertions(+), 40 deletions(-) diff --git a/src/adapter/benches/catalog.rs b/src/adapter/benches/catalog.rs index d8e31d30aa3a..ca6008f34674 100644 --- a/src/adapter/benches/catalog.rs +++ b/src/adapter/benches/catalog.rs @@ -125,7 +125,7 @@ fn bench_transact(c: &mut Criterion) { public_schema_oid: id, }]; catalog - .transact(Some(mz_repr::Timestamp::MIN), None, ops, |_| Ok(())) + .transact(mz_repr::Timestamp::MIN, None, ops, |_| Ok(())) .await .unwrap(); }) diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index febe9840695a..cd3d7748eae8 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -1251,7 +1251,7 @@ impl CatalogState { // passing tx, session, and builtin_table_updates? fn add_to_audit_log( &self, - oracle_write_ts: Option, + oracle_write_ts: mz_repr::Timestamp, session: Option<&Session>, tx: &mut storage::Transaction, builtin_table_updates: &mut Vec, @@ -1261,7 +1261,7 @@ impl CatalogState { details: EventDetails, ) -> Result<(), Error> { let user = session.map(|session| session.user().name.to_string()); - let occurred_at = oracle_write_ts.expect("must exist").into(); + let occurred_at = oracle_write_ts.into(); let id = tx.get_and_increment_id(storage::AUDIT_LOG_ID_ALLOC_KEY.to_string())?; let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at); builtin_table_updates.push(self.pack_audit_log_update(&event)?); @@ -2739,7 +2739,12 @@ impl Catalog { bootstrap_system_parameters: BTreeMap, system_parameter_frontend: Option>, ) -> Result<(), AdapterError> { - let system_config = self.storage().await.load_system_configuration().await?; + let (system_config, boot_ts) = { + let mut storage = self.storage().await; + let system_config = storage.load_system_configuration().await?; + let boot_ts = storage.boot_ts(); + (system_config, boot_ts) + }; for (name, value) in &bootstrap_system_parameters { if !system_config.contains_key(name) { self.state.insert_system_configuration(name, value)?; @@ -2810,8 +2815,7 @@ impl Catalog { Op::UpdateSystemConfiguration { name, value } })) .collect::>(); - - self.transact(None, None, ops, |_| Ok(())).await.unwrap(); + self.transact(boot_ts, None, ops, |_| Ok(())).await.unwrap(); tracing::info!("parameter sync on boot: end sync"); } else { tracing::info!("parameter sync on boot: skipping sync as config has synced once"); @@ -3850,7 +3854,7 @@ impl Catalog { #[tracing::instrument(level = "debug", skip_all)] pub async fn transact( &mut self, - oracle_write_ts: Option, + oracle_write_ts: mz_repr::Timestamp, session: Option<&Session>, ops: Vec, f: F, @@ -3915,7 +3919,7 @@ impl Catalog { } fn transact_inner( - oracle_write_ts: Option, + oracle_write_ts: mz_repr::Timestamp, session: Option<&Session>, ops: Vec, temporary_ids: Vec, @@ -6428,7 +6432,7 @@ mod tests { assert_eq!(catalog.transient_revision(), 1); catalog .transact( - Some(mz_repr::Timestamp::MIN), + mz_repr::Timestamp::MIN, None, vec![Op::CreateDatabase { name: "test".to_string(), @@ -6681,7 +6685,7 @@ mod tests { .clone(); catalog .transact( - Some(mz_repr::Timestamp::MIN), + mz_repr::Timestamp::MIN, None, vec![Op::CreateItem { id, diff --git a/src/adapter/src/catalog/storage.rs b/src/adapter/src/catalog/storage.rs index 9b8e9ff49f48..b7c22388785b 100644 --- a/src/adapter/src/catalog/storage.rs +++ b/src/adapter/src/catalog/storage.rs @@ -529,6 +529,7 @@ pub struct BootstrapArgs { pub struct Connection { stash: S, consolidations_tx: mpsc::UnboundedSender>, + boot_ts: mz_repr::Timestamp, } impl Connection { @@ -555,29 +556,46 @@ impl Connection { // Initialize connection. initialize_stash(&mut stash).await?; + + // Choose a time at which to boot. This is the time at which we will run + // internal migrations, and is also exposed upwards in case higher + // layers want to run their own migrations at the same timestamp. + // + // This time is usually the current system time, but with protection + // against backwards time jumps, even across restarts. + let previous_now_ts = try_get_persisted_timestamp(&mut stash, &Timeline::EpochMilliseconds) + .await? + .unwrap_or(mz_repr::Timestamp::MIN); + let boot_ts = timeline::monotonic_now(now, previous_now_ts); + let mut conn = Connection { stash, consolidations_tx, + boot_ts, }; if !conn.stash.is_readonly() { - // Choose a time at which to apply migrations. This is usually the - // current system time, but with protection against backwards time - // jumps, even across restarts. - let previous_now_ts = conn - .try_get_persisted_timestamp(&Timeline::EpochMilliseconds) - .await? - .unwrap_or(mz_repr::Timestamp::MIN); - let now_ts = timeline::monotonic_now(now, previous_now_ts); // IMPORTANT: we durably record the new timestamp before using it. - conn.persist_timestamp(&Timeline::EpochMilliseconds, now_ts) + conn.persist_timestamp(&Timeline::EpochMilliseconds, boot_ts) .await?; - - migrate(&mut conn.stash, skip, now_ts.into(), bootstrap_args).await?; + migrate(&mut conn.stash, skip, boot_ts.into(), bootstrap_args).await?; } Ok(conn) } + + /// Returns the timestamp at which the storage layer booted. + /// + /// This is the timestamp that will have been used to write any data during + /// migrations. It is exposed so that higher layers performing their own + /// migrations can write data at the same timestamp, if desired. + /// + /// The boot timestamp is derived from the durable timestamp oracle and is + /// guaranteed to never go backwards, even in the face of backwards time + /// jumps across restarts. + pub fn boot_ts(&self) -> mz_repr::Timestamp { + self.boot_ts + } } impl Connection { @@ -917,23 +935,6 @@ impl Connection { .collect()) } - /// Get a global timestamp for a timeline that has been persisted to disk. - /// - /// Returns `None` if no persisted timestamp for the specified timeline - /// exists. - pub async fn try_get_persisted_timestamp( - &mut self, - timeline: &Timeline, - ) -> Result, Error> { - let key = TimestampKey { - id: timeline.to_string(), - }; - Ok(COLLECTION_TIMESTAMP - .peek_key_one(&mut self.stash, &key) - .await? - .map(|v| v.ts)) - } - /// Persist new global timestamp for a timeline to disk. #[tracing::instrument(level = "debug", skip(self))] pub async fn persist_timestamp( @@ -971,6 +972,25 @@ impl Connection { } } +/// Gets a global timestamp for a timeline that has been persisted to disk. +/// +/// Returns `None` if no persisted timestamp for the specified timeline exists. +async fn try_get_persisted_timestamp( + stash: &mut S, + timeline: &Timeline, +) -> Result, Error> +where + S: Append, +{ + let key = TimestampKey { + id: timeline.to_string(), + }; + Ok(COLLECTION_TIMESTAMP + .peek_key_one(stash, &key) + .await? + .map(|v| v.ts)) +} + #[tracing::instrument(level = "trace", skip_all)] pub async fn transaction<'a, S: Append>(stash: &'a mut S) -> Result, Error> { let databases = COLLECTION_DATABASE.peek_one(stash).await?; diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index 8d62d26c771d..c895bf8aa9f5 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -234,7 +234,7 @@ impl Coordinator { result, } = self .catalog - .transact(Some(oracle_write_ts), session, ops, |catalog| { + .transact(oracle_write_ts, session, ops, |catalog| { f(CatalogTxn { dataflow_client: &self.controller, catalog, diff --git a/src/adapter/tests/sql.rs b/src/adapter/tests/sql.rs index be389355b190..67cc3b666f27 100644 --- a/src/adapter/tests/sql.rs +++ b/src/adapter/tests/sql.rs @@ -139,7 +139,7 @@ async fn datadriven() { .clone(); catalog .transact( - Some(mz_repr::Timestamp::MIN), + mz_repr::Timestamp::MIN, None, vec![Op::CreateItem { id,