From c6a87569d21da49ee65d01780c8455f379272cd0 Mon Sep 17 00:00:00 2001
From: Matt Jibson
Date: Wed, 4 Jan 2023 11:12:23 -0700
Subject: [PATCH] adapter: change audit log timestamp to use oracle

Use the oracle timestamp instead of the system clock for the audit
events table. The system clock was originally used because, at the
time, the oracle timestamp had no firm bound on how far it could drift
from the system clock, so it was not safe to use it to record when an
event happened. For example, we once had a bug where the oracle
timestamp ran days ahead due to a fast loop. The logic has since
improved enough (due to the use of write groups) that we are confident
that won't happen again. We now prefer the guarantee that the timestamp
always goes up, and believe the audit log timestamps will stay within
roughly a 10s margin of error of the system clock.
---
 .../content/sql/system-catalog/mz_catalog.md |  2 +-
 src/adapter/benches/catalog.rs               |  5 +++-
 src/adapter/src/catalog.rs                   | 26 +++++++++++++++++--
 src/adapter/src/coord/ddl.rs                 | 12 ++++++++-
 src/adapter/tests/sql.rs                     |  1 +
 5 files changed, 41 insertions(+), 5 deletions(-)

diff --git a/doc/user/content/sql/system-catalog/mz_catalog.md b/doc/user/content/sql/system-catalog/mz_catalog.md
index eec605eca5e4..e3cf743d48bf 100644
--- a/doc/user/content/sql/system-catalog/mz_catalog.md
+++ b/doc/user/content/sql/system-catalog/mz_catalog.md
@@ -33,7 +33,7 @@ Field | Type | Meaning
 `object_type` | [`text`] | The type of the affected object: `cluster`, `cluster-replica`, `connection`, `database`, `function`, `index`, `materialized-view`, `role`, `schema`, `secret`, `sink`, `source`, `table`, `type`, or `view`.
 `details` | [`jsonb`] | Additional details about the event. The shape of the details varies based on `event_type` and `object_type`.
 `user` | [`text`] | The user who triggered the event, or `NULL` if triggered by the system.
-`occurred_at` | [`timestamp with time zone`] | The time at which the event occurred.
+`occurred_at` | [`timestamp with time zone`] | The time at which the event occurred. Guaranteed to be in order of event creation. Events created in the same transaction will have identical values.
 
 ### `mz_aws_privatelink_connections`
 
diff --git a/src/adapter/benches/catalog.rs b/src/adapter/benches/catalog.rs
index 2ec90fd99215..d8e31d30aa3a 100644
--- a/src/adapter/benches/catalog.rs
+++ b/src/adapter/benches/catalog.rs
@@ -124,7 +124,10 @@ fn bench_transact(c: &mut Criterion) {
                     oid: id,
                     public_schema_oid: id,
                 }];
-                catalog.transact(None, ops, |_| Ok(())).await.unwrap();
+                catalog
+                    .transact(Some(mz_repr::Timestamp::MIN), None, ops, |_| Ok(()))
+                    .await
+                    .unwrap();
             })
         })
     });
diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs
index 7d7a3c2d4d32..febe9840695a 100644
--- a/src/adapter/src/catalog.rs
+++ b/src/adapter/src/catalog.rs
@@ -1251,6 +1251,7 @@ impl CatalogState {
     // passing tx, session, and builtin_table_updates?
     fn add_to_audit_log(
         &self,
+        oracle_write_ts: Option<mz_repr::Timestamp>,
         session: Option<&Session>,
         tx: &mut storage::Transaction,
         builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
@@ -1260,7 +1261,7 @@ impl CatalogState {
         details: EventDetails,
     ) -> Result<(), Error> {
         let user = session.map(|session| session.user().name.to_string());
-        let occurred_at = (self.config.now)();
+        let occurred_at = oracle_write_ts.expect("must exist").into();
         let id = tx.get_and_increment_id(storage::AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
         let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
         builtin_table_updates.push(self.pack_audit_log_update(&event)?);
@@ -2810,7 +2811,7 @@ impl Catalog {
                 }))
                 .collect::<Vec<_>>();
-            self.transact(None, ops, |_| Ok(())).await.unwrap();
+            self.transact(None, None, ops, |_| Ok(())).await.unwrap();
             tracing::info!("parameter sync on boot: end sync");
         } else {
             tracing::info!("parameter sync on boot: skipping sync as config has synced once");
         }
@@ -3849,6 +3850,7 @@ impl Catalog {
     #[tracing::instrument(level = "debug", skip_all)]
     pub async fn transact(
         &mut self,
+        oracle_write_ts: Option<mz_repr::Timestamp>,
         session: Option<&Session>,
         ops: Vec<Op>,
         f: F,
@@ -3884,6 +3886,7 @@ impl Catalog {
         let mut state = self.state.clone();
         Self::transact_inner(
+            oracle_write_ts,
             session,
             ops,
             temporary_ids,
@@ -3912,6 +3915,7 @@ impl Catalog {
     }
 
     fn transact_inner(
+        oracle_write_ts: Option<mz_repr::Timestamp>,
         session: Option<&Session>,
         ops: Vec<Op>,
         temporary_ids: Vec<GlobalId>,
@@ -4090,6 +4094,7 @@ impl Catalog {
                 builtin_table_updates.extend(state.pack_item_update(id, -1));
                 state.add_to_audit_log(
+                    oracle_write_ts,
                     session,
                     tx,
                     builtin_table_updates,
@@ -4219,6 +4224,7 @@ impl Catalog {
                 builtin_table_updates.extend(state.pack_item_update(id, -1));
                 state.add_to_audit_log(
+                    oracle_write_ts,
                     session,
                     tx,
                     builtin_table_updates,
@@ -4256,6 +4262,7 @@ impl Catalog {
                 let database_id = tx.insert_database(&name)?;
                 let schema_id = tx.insert_schema(database_id, DEFAULT_SCHEMA)?;
                 state.add_to_audit_log(
+                    oracle_write_ts,
                     session,
                     tx,
                     builtin_table_updates,
@@ -4277,6 +4284,7 @@ impl Catalog {
                     },
                 )?;
                 state.add_to_audit_log(
+                    oracle_write_ts,
                     session,
                     tx,
                     builtin_table_updates,
@@ -4320,6 +4328,7 @@ impl Catalog {
                 };
                 let schema_id = tx.insert_schema(database_id, &schema_name)?;
                 state.add_to_audit_log(
+                    oracle_write_ts,
                     session,
                     tx,
                     builtin_table_updates,
@@ -4351,6 +4360,7 @@ impl Catalog {
                 }
                 let role_id = tx.insert_user_role(&name)?;
                 state.add_to_audit_log(
+                    oracle_write_ts,
                     session,
                     tx,
                     builtin_table_updates,
@@ -4384,6 +4394,7 @@ impl Catalog {
                 let id = tx.insert_user_compute_instance(&name, &arranged_introspection_sources)?;
                 state.add_to_audit_log(
+                    oracle_write_ts,
                     session,
                     tx,
                     builtin_table_updates,
@@ -4438,6 +4449,7 @@ impl Catalog {
                     },
                 );
                 state.add_to_audit_log(
+                    oracle_write_ts,
                     session,
                     tx,
                     builtin_table_updates,
@@ -4548,6 +4560,7 @@ impl Catalog {
                     _ => EventDetails::IdFullNameV1(IdFullNameV1 { id, name }),
                 };
                 state.add_to_audit_log(
+                    oracle_write_ts,
                     session,
                     tx,
                     builtin_table_updates,
@@ -4574,6 +4587,7 @@ impl Catalog {
                 tx.remove_database(&id)?;
                 builtin_table_updates.push(state.pack_database_update(database, -1));
                 state.add_to_audit_log(
+                    oracle_write_ts,
                     session,
                     tx,
                     builtin_table_updates,
@@ -4599,6 +4613,7 @@ impl Catalog {
                     -1,
                 ));
                 state.add_to_audit_log(
+                    oracle_write_ts,
                     session,
                     tx,
                     builtin_table_updates,
@@ -4630,6 +4645,7 @@ impl Catalog {
                 let role = &state.roles[&name];
                 builtin_table_updates.push(state.pack_role_update(role, -1));
                 state.add_to_audit_log(
+                    oracle_write_ts,
                     session,
                     tx,
                     builtin_table_updates,
@@ -4656,6 +4672,7 @@ impl Catalog {
                     builtin_table_updates.extend(state.pack_item_update(*id, -1));
                 }
                 state.add_to_audit_log(
+                    oracle_write_ts,
                     session,
                     tx,
                     builtin_table_updates,
@@ -4720,6 +4737,7 @@ impl Catalog {
                     replica_name: name.clone(),
                 });
                 state.add_to_audit_log(
+                    oracle_write_ts,
                     session,
                     tx,
                     builtin_table_updates,
@@ -4750,6 +4768,7 @@ impl Catalog {
                 builtin_table_updates.extend(state.pack_item_update(id, -1));
                 if Self::should_audit_log_item(&entry.item) {
                     state.add_to_audit_log(
+                        oracle_write_ts,
                         session,
                         tx,
                         builtin_table_updates,
@@ -4809,6 +4828,7 @@ impl Catalog {
                 });
                 if Self::should_audit_log_item(&entry.item) {
                     state.add_to_audit_log(
+                        oracle_write_ts,
                         session,
                         tx,
                         builtin_table_updates,
@@ -6408,6 +6428,7 @@ mod tests {
         assert_eq!(catalog.transient_revision(), 1);
         catalog
             .transact(
+                Some(mz_repr::Timestamp::MIN),
                 None,
                 vec![Op::CreateDatabase {
                     name: "test".to_string(),
@@ -6660,6 +6681,7 @@ mod tests {
             .clone();
         catalog
             .transact(
+                Some(mz_repr::Timestamp::MIN),
                 None,
                 vec![Op::CreateItem {
                     id,
diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs
index 6165f7d206ae..8d62d26c771d 100644
--- a/src/adapter/src/coord/ddl.rs
+++ b/src/adapter/src/coord/ddl.rs
@@ -217,6 +217,16 @@ impl Coordinator {
                 .unwrap_or(SYSTEM_CONN_ID),
         )?;
 
+        // This will produce timestamps that are guaranteed to increase on each
+        // call, and also never be behind the system clock. If the system clock
+        // hasn't advanced (or has gone backward), it will increment by 1. For
+        // the audit log, we need to balance "close (within 10s or so) to the
+        // system clock" and "always goes up". We've chosen here to prioritize
+        // always going up, and believe we will always be close to the system
+        // clock because it is well configured (chrony) and so may only rarely
+        // regress or pause for 10s.
+        let oracle_write_ts = self.get_local_write_ts().await.timestamp;
+
         let TransactionResult {
             builtin_table_updates,
             audit_events,
             result,
         } = self
             .catalog
-            .transact(session, ops, |catalog| {
+            .transact(Some(oracle_write_ts), session, ops, |catalog| {
                 f(CatalogTxn {
                     dataflow_client: &self.controller,
                     catalog,
diff --git a/src/adapter/tests/sql.rs b/src/adapter/tests/sql.rs
index 22c37e3daa1d..be389355b190 100644
--- a/src/adapter/tests/sql.rs
+++ b/src/adapter/tests/sql.rs
@@ -139,6 +139,7 @@ async fn datadriven() {
         .clone();
     catalog
         .transact(
+            Some(mz_repr::Timestamp::MIN),
             None,
             vec![Op::CreateItem {
                 id,
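
Note on the timestamp rule described by the new comment in
src/adapter/src/coord/ddl.rs: "never behind the system clock, and always
increasing (by at least 1) even when the clock stalls or goes backward"
boils down to taking the maximum of the current system clock and the
previous timestamp plus one. Because a single oracle_write_ts is fetched
once per catalog.transact call and handed to every add_to_audit_log within
it, all audit events from one transaction share the same occurred_at, which
is the behavior the mz_catalog.md wording documents. The sketch below is
only an illustration of that rule under these assumptions; next_write_ts
and the sample values are hypothetical and are not the actual timestamp
oracle, which additionally coordinates with write groups.

    // Minimal sketch of the "always goes up, never behind the system clock"
    // rule. Hypothetical helper; not Materialize's oracle implementation.
    fn next_write_ts(last: u64, system_clock_ms: u64) -> u64 {
        // Take the system clock when it is ahead; otherwise advance by 1 so
        // consecutive calls still return strictly increasing timestamps.
        std::cmp::max(system_clock_ms, last + 1)
    }

    fn main() {
        let mut last: u64 = 0;
        // Simulated clock readings: advancing, stalled, then stepping backward.
        for clock in [1_000, 1_005, 1_005, 990, 2_000] {
            last = next_write_ts(last, clock);
            println!("clock = {clock:>5} ms -> write ts = {last}");
        }
        // Prints 1000, 1005, 1006, 1007, 2000: strictly increasing, and never
        // behind the clock once the clock catches back up.
    }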