Skip to content

Commit

Permalink
adapter: change audit log timestamp to use oracle
Browse files Browse the repository at this point in the history
Use the oracle timestamp instead of system clock for the audit events
table. The system clock was used because (at the time) the oracle ts
didn't have a firm bound on being off from the system clock, so using it
to determine when a thing happened was not safe. For example, we once
had a bug where the oracle ts was days ahead due to a fast loop. We've
improved the logic enough that we have enough confidence that won't
happen again (due to the use of write groups) that we'd now prefer to
have the guarantee of always goes up, and believe that the audit log
timestamps will be within a 10s margin of error.
  • Loading branch information
maddyblue authored and benesch committed Jan 7, 2023
1 parent d61bb8a commit c6a8756
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 5 deletions.
2 changes: 1 addition & 1 deletion doc/user/content/sql/system-catalog/mz_catalog.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Field | Type | Meaning
`object_type` | [`text`] | The type of the affected object: `cluster`, `cluster-replica`, `connection`, `database`, `function`, `index`, `materialized-view`, `role`, `schema`, `secret`, `sink`, `source`, `table`, `type`, or `view`.
`details` | [`jsonb`] | Additional details about the event. The shape of the details varies based on `event_type` and `object_type`.
`user` | [`text`] | The user who triggered the event, or `NULL` if triggered by the system.
`occurred_at` | [`timestamp with time zone`] | The time at which the event occurred.
`occurred_at` | [`timestamp with time zone`] | The time at which the event occurred. Guaranteed to be in order of event creation. Events created in the same transaction will have identical values.

### `mz_aws_privatelink_connections`

Expand Down
5 changes: 4 additions & 1 deletion src/adapter/benches/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ fn bench_transact(c: &mut Criterion) {
oid: id,
public_schema_oid: id,
}];
catalog.transact(None, ops, |_| Ok(())).await.unwrap();
catalog
.transact(Some(mz_repr::Timestamp::MIN), None, ops, |_| Ok(()))
.await
.unwrap();
})
})
});
Expand Down
26 changes: 24 additions & 2 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1251,6 +1251,7 @@ impl CatalogState {
// passing tx, session, and builtin_table_updates?
fn add_to_audit_log<S: Append>(
&self,
oracle_write_ts: Option<mz_repr::Timestamp>,
session: Option<&Session>,
tx: &mut storage::Transaction<S>,
builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
Expand All @@ -1260,7 +1261,7 @@ impl CatalogState {
details: EventDetails,
) -> Result<(), Error> {
let user = session.map(|session| session.user().name.to_string());
let occurred_at = (self.config.now)();
let occurred_at = oracle_write_ts.expect("must exist").into();
let id = tx.get_and_increment_id(storage::AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
builtin_table_updates.push(self.pack_audit_log_update(&event)?);
Expand Down Expand Up @@ -2810,7 +2811,7 @@ impl<S: Append> Catalog<S> {
}))
.collect::<Vec<_>>();

self.transact(None, ops, |_| Ok(())).await.unwrap();
self.transact(None, None, ops, |_| Ok(())).await.unwrap();
tracing::info!("parameter sync on boot: end sync");
} else {
tracing::info!("parameter sync on boot: skipping sync as config has synced once");
Expand Down Expand Up @@ -3849,6 +3850,7 @@ impl<S: Append> Catalog<S> {
#[tracing::instrument(level = "debug", skip_all)]
pub async fn transact<F, R>(
&mut self,
oracle_write_ts: Option<mz_repr::Timestamp>,
session: Option<&Session>,
ops: Vec<Op>,
f: F,
Expand Down Expand Up @@ -3884,6 +3886,7 @@ impl<S: Append> Catalog<S> {
let mut state = self.state.clone();

Self::transact_inner(
oracle_write_ts,
session,
ops,
temporary_ids,
Expand Down Expand Up @@ -3912,6 +3915,7 @@ impl<S: Append> Catalog<S> {
}

fn transact_inner(
oracle_write_ts: Option<mz_repr::Timestamp>,
session: Option<&Session>,
ops: Vec<Op>,
temporary_ids: Vec<GlobalId>,
Expand Down Expand Up @@ -4090,6 +4094,7 @@ impl<S: Append> Catalog<S> {
builtin_table_updates.extend(state.pack_item_update(id, -1));

state.add_to_audit_log(
oracle_write_ts,
session,
tx,
builtin_table_updates,
Expand Down Expand Up @@ -4219,6 +4224,7 @@ impl<S: Append> Catalog<S> {
builtin_table_updates.extend(state.pack_item_update(id, -1));

state.add_to_audit_log(
oracle_write_ts,
session,
tx,
builtin_table_updates,
Expand Down Expand Up @@ -4256,6 +4262,7 @@ impl<S: Append> Catalog<S> {
let database_id = tx.insert_database(&name)?;
let schema_id = tx.insert_schema(database_id, DEFAULT_SCHEMA)?;
state.add_to_audit_log(
oracle_write_ts,
session,
tx,
builtin_table_updates,
Expand All @@ -4277,6 +4284,7 @@ impl<S: Append> Catalog<S> {
},
)?;
state.add_to_audit_log(
oracle_write_ts,
session,
tx,
builtin_table_updates,
Expand Down Expand Up @@ -4320,6 +4328,7 @@ impl<S: Append> Catalog<S> {
};
let schema_id = tx.insert_schema(database_id, &schema_name)?;
state.add_to_audit_log(
oracle_write_ts,
session,
tx,
builtin_table_updates,
Expand Down Expand Up @@ -4351,6 +4360,7 @@ impl<S: Append> Catalog<S> {
}
let role_id = tx.insert_user_role(&name)?;
state.add_to_audit_log(
oracle_write_ts,
session,
tx,
builtin_table_updates,
Expand Down Expand Up @@ -4384,6 +4394,7 @@ impl<S: Append> Catalog<S> {
let id =
tx.insert_user_compute_instance(&name, &arranged_introspection_sources)?;
state.add_to_audit_log(
oracle_write_ts,
session,
tx,
builtin_table_updates,
Expand Down Expand Up @@ -4438,6 +4449,7 @@ impl<S: Append> Catalog<S> {
},
);
state.add_to_audit_log(
oracle_write_ts,
session,
tx,
builtin_table_updates,
Expand Down Expand Up @@ -4548,6 +4560,7 @@ impl<S: Append> Catalog<S> {
_ => EventDetails::IdFullNameV1(IdFullNameV1 { id, name }),
};
state.add_to_audit_log(
oracle_write_ts,
session,
tx,
builtin_table_updates,
Expand All @@ -4574,6 +4587,7 @@ impl<S: Append> Catalog<S> {
tx.remove_database(&id)?;
builtin_table_updates.push(state.pack_database_update(database, -1));
state.add_to_audit_log(
oracle_write_ts,
session,
tx,
builtin_table_updates,
Expand All @@ -4599,6 +4613,7 @@ impl<S: Append> Catalog<S> {
-1,
));
state.add_to_audit_log(
oracle_write_ts,
session,
tx,
builtin_table_updates,
Expand Down Expand Up @@ -4630,6 +4645,7 @@ impl<S: Append> Catalog<S> {
let role = &state.roles[&name];
builtin_table_updates.push(state.pack_role_update(role, -1));
state.add_to_audit_log(
oracle_write_ts,
session,
tx,
builtin_table_updates,
Expand All @@ -4656,6 +4672,7 @@ impl<S: Append> Catalog<S> {
builtin_table_updates.extend(state.pack_item_update(*id, -1));
}
state.add_to_audit_log(
oracle_write_ts,
session,
tx,
builtin_table_updates,
Expand Down Expand Up @@ -4720,6 +4737,7 @@ impl<S: Append> Catalog<S> {
replica_name: name.clone(),
});
state.add_to_audit_log(
oracle_write_ts,
session,
tx,
builtin_table_updates,
Expand Down Expand Up @@ -4750,6 +4768,7 @@ impl<S: Append> Catalog<S> {
builtin_table_updates.extend(state.pack_item_update(id, -1));
if Self::should_audit_log_item(&entry.item) {
state.add_to_audit_log(
oracle_write_ts,
session,
tx,
builtin_table_updates,
Expand Down Expand Up @@ -4809,6 +4828,7 @@ impl<S: Append> Catalog<S> {
});
if Self::should_audit_log_item(&entry.item) {
state.add_to_audit_log(
oracle_write_ts,
session,
tx,
builtin_table_updates,
Expand Down Expand Up @@ -6408,6 +6428,7 @@ mod tests {
assert_eq!(catalog.transient_revision(), 1);
catalog
.transact(
Some(mz_repr::Timestamp::MIN),
None,
vec![Op::CreateDatabase {
name: "test".to_string(),
Expand Down Expand Up @@ -6660,6 +6681,7 @@ mod tests {
.clone();
catalog
.transact(
Some(mz_repr::Timestamp::MIN),
None,
vec![Op::CreateItem {
id,
Expand Down
12 changes: 11 additions & 1 deletion src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,24 @@ impl<S: Append + 'static> Coordinator<S> {
.unwrap_or(SYSTEM_CONN_ID),
)?;

// This will produce timestamps that are guaranteed to increase on each
// call, and also never be behind the system clock. If the system clock
// hasn't advanced (or has gone backward), it will increment by 1. For
// the audit log, we need to balance "close (within 10s or so) to the
// system clock" and "always goes up". We've chosen here to prioritize
// always going up, and believe we will always be close to the system
// clock because it is well configured (chrony) and so may only rarely
// regress or pause for 10s.
let oracle_write_ts = self.get_local_write_ts().await.timestamp;

let TransactionResult {
builtin_table_updates,
audit_events,
collections,
result,
} = self
.catalog
.transact(session, ops, |catalog| {
.transact(Some(oracle_write_ts), session, ops, |catalog| {
f(CatalogTxn {
dataflow_client: &self.controller,
catalog,
Expand Down
1 change: 1 addition & 0 deletions src/adapter/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ async fn datadriven() {
.clone();
catalog
.transact(
Some(mz_repr::Timestamp::MIN),
None,
vec![Op::CreateItem {
id,
Expand Down

0 comments on commit c6a8756

Please sign in to comment.