Skip to content

Commit

Permalink
adapter: deoptionalize oracle_write_ts in Catalog::transact
Browse files Browse the repository at this point in the history
The conditions under which oracle_write_ts was allowed to be None in
calls to Catalog::transact were not documented. Make the API more
typesafe by removing the option, as an oracle-derived write timestamp is
available in all production codepaths.
  • Loading branch information
benesch committed Jan 7, 2023
1 parent c6a8756 commit e06db13
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 40 deletions.
2 changes: 1 addition & 1 deletion src/adapter/benches/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ fn bench_transact(c: &mut Criterion) {
public_schema_oid: id,
}];
catalog
.transact(Some(mz_repr::Timestamp::MIN), None, ops, |_| Ok(()))
.transact(mz_repr::Timestamp::MIN, None, ops, |_| Ok(()))
.await
.unwrap();
})
Expand Down
22 changes: 13 additions & 9 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,7 @@ impl CatalogState {
// passing tx, session, and builtin_table_updates?
fn add_to_audit_log<S: Append>(
&self,
oracle_write_ts: Option<mz_repr::Timestamp>,
oracle_write_ts: mz_repr::Timestamp,
session: Option<&Session>,
tx: &mut storage::Transaction<S>,
builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
Expand All @@ -1261,7 +1261,7 @@ impl CatalogState {
details: EventDetails,
) -> Result<(), Error> {
let user = session.map(|session| session.user().name.to_string());
let occurred_at = oracle_write_ts.expect("must exist").into();
let occurred_at = oracle_write_ts.into();
let id = tx.get_and_increment_id(storage::AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
builtin_table_updates.push(self.pack_audit_log_update(&event)?);
Expand Down Expand Up @@ -2739,7 +2739,12 @@ impl<S: Append> Catalog<S> {
bootstrap_system_parameters: BTreeMap<String, String>,
system_parameter_frontend: Option<Arc<SystemParameterFrontend>>,
) -> Result<(), AdapterError> {
let system_config = self.storage().await.load_system_configuration().await?;
let (system_config, boot_ts) = {
let mut storage = self.storage().await;
let system_config = storage.load_system_configuration().await?;
let boot_ts = storage.boot_ts();
(system_config, boot_ts)
};
for (name, value) in &bootstrap_system_parameters {
if !system_config.contains_key(name) {
self.state.insert_system_configuration(name, value)?;
Expand Down Expand Up @@ -2810,8 +2815,7 @@ impl<S: Append> Catalog<S> {
Op::UpdateSystemConfiguration { name, value }
}))
.collect::<Vec<_>>();

self.transact(None, None, ops, |_| Ok(())).await.unwrap();
self.transact(boot_ts, None, ops, |_| Ok(())).await.unwrap();
tracing::info!("parameter sync on boot: end sync");
} else {
tracing::info!("parameter sync on boot: skipping sync as config has synced once");
Expand Down Expand Up @@ -3850,7 +3854,7 @@ impl<S: Append> Catalog<S> {
#[tracing::instrument(level = "debug", skip_all)]
pub async fn transact<F, R>(
&mut self,
oracle_write_ts: Option<mz_repr::Timestamp>,
oracle_write_ts: mz_repr::Timestamp,
session: Option<&Session>,
ops: Vec<Op>,
f: F,
Expand Down Expand Up @@ -3915,7 +3919,7 @@ impl<S: Append> Catalog<S> {
}

fn transact_inner(
oracle_write_ts: Option<mz_repr::Timestamp>,
oracle_write_ts: mz_repr::Timestamp,
session: Option<&Session>,
ops: Vec<Op>,
temporary_ids: Vec<GlobalId>,
Expand Down Expand Up @@ -6428,7 +6432,7 @@ mod tests {
assert_eq!(catalog.transient_revision(), 1);
catalog
.transact(
Some(mz_repr::Timestamp::MIN),
mz_repr::Timestamp::MIN,
None,
vec![Op::CreateDatabase {
name: "test".to_string(),
Expand Down Expand Up @@ -6681,7 +6685,7 @@ mod tests {
.clone();
catalog
.transact(
Some(mz_repr::Timestamp::MIN),
mz_repr::Timestamp::MIN,
None,
vec![Op::CreateItem {
id,
Expand Down
76 changes: 48 additions & 28 deletions src/adapter/src/catalog/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ pub struct BootstrapArgs {
pub struct Connection<S> {
stash: S,
consolidations_tx: mpsc::UnboundedSender<Vec<mz_stash::Id>>,
boot_ts: mz_repr::Timestamp,
}

impl<S: Append> Connection<S> {
Expand All @@ -555,29 +556,46 @@ impl<S: Append> Connection<S> {

// Initialize connection.
initialize_stash(&mut stash).await?;

// Choose a time at which to boot. This is the time at which we will run
// internal migrations, and is also exposed upwards in case higher
// layers want to run their own migrations at the same timestamp.
//
// This time is usually the current system time, but with protection
// against backwards time jumps, even across restarts.
let previous_now_ts = try_get_persisted_timestamp(&mut stash, &Timeline::EpochMilliseconds)
.await?
.unwrap_or(mz_repr::Timestamp::MIN);
let boot_ts = timeline::monotonic_now(now, previous_now_ts);

let mut conn = Connection {
stash,
consolidations_tx,
boot_ts,
};

if !conn.stash.is_readonly() {
// Choose a time at which to apply migrations. This is usually the
// current system time, but with protection against backwards time
// jumps, even across restarts.
let previous_now_ts = conn
.try_get_persisted_timestamp(&Timeline::EpochMilliseconds)
.await?
.unwrap_or(mz_repr::Timestamp::MIN);
let now_ts = timeline::monotonic_now(now, previous_now_ts);
// IMPORTANT: we durably record the new timestamp before using it.
conn.persist_timestamp(&Timeline::EpochMilliseconds, now_ts)
conn.persist_timestamp(&Timeline::EpochMilliseconds, boot_ts)
.await?;

migrate(&mut conn.stash, skip, now_ts.into(), bootstrap_args).await?;
migrate(&mut conn.stash, skip, boot_ts.into(), bootstrap_args).await?;
}

Ok(conn)
}

/// Returns the timestamp at which the storage layer booted.
///
/// This is the timestamp that will have been used to write any data during
/// migrations. It is exposed so that higher layers performing their own
/// migrations can write data at the same timestamp, if desired.
///
/// The boot timestamp is derived from the durable timestamp oracle and is
/// guaranteed to never go backwards, even in the face of backwards time
/// jumps across restarts.
pub fn boot_ts(&self) -> mz_repr::Timestamp {
self.boot_ts
}
}

impl<S: Append> Connection<S> {
Expand Down Expand Up @@ -917,23 +935,6 @@ impl<S: Append> Connection<S> {
.collect())
}

/// Get a global timestamp for a timeline that has been persisted to disk.
///
/// Returns `None` if no persisted timestamp for the specified timeline
/// exists.
pub async fn try_get_persisted_timestamp(
&mut self,
timeline: &Timeline,
) -> Result<Option<mz_repr::Timestamp>, Error> {
let key = TimestampKey {
id: timeline.to_string(),
};
Ok(COLLECTION_TIMESTAMP
.peek_key_one(&mut self.stash, &key)
.await?
.map(|v| v.ts))
}

/// Persist new global timestamp for a timeline to disk.
#[tracing::instrument(level = "debug", skip(self))]
pub async fn persist_timestamp(
Expand Down Expand Up @@ -971,6 +972,25 @@ impl<S: Append> Connection<S> {
}
}

/// Gets a global timestamp for a timeline that has been persisted to disk.
///
/// Returns `None` if no persisted timestamp for the specified timeline exists.
async fn try_get_persisted_timestamp<S>(
stash: &mut S,
timeline: &Timeline,
) -> Result<Option<mz_repr::Timestamp>, Error>
where
S: Append,
{
let key = TimestampKey {
id: timeline.to_string(),
};
Ok(COLLECTION_TIMESTAMP
.peek_key_one(stash, &key)
.await?
.map(|v| v.ts))
}

#[tracing::instrument(level = "trace", skip_all)]
pub async fn transaction<'a, S: Append>(stash: &'a mut S) -> Result<Transaction<'a, S>, Error> {
let databases = COLLECTION_DATABASE.peek_one(stash).await?;
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl<S: Append + 'static> Coordinator<S> {
result,
} = self
.catalog
.transact(Some(oracle_write_ts), session, ops, |catalog| {
.transact(oracle_write_ts, session, ops, |catalog| {
f(CatalogTxn {
dataflow_client: &self.controller,
catalog,
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async fn datadriven() {
.clone();
catalog
.transact(
Some(mz_repr::Timestamp::MIN),
mz_repr::Timestamp::MIN,
None,
vec![Op::CreateItem {
id,
Expand Down

0 comments on commit e06db13

Please sign in to comment.