fix: inserts clean up on failure (#2875)
This fixes a bug introduced by DataFusion 36.

Some operations that previously failed during optimization, such as
invalid casts, now fail at runtime.

Because they now fail at runtime, we would still update the catalog and
create the table, and only fail afterwards during the insert. This left
both the catalog and the storage in a bad state that didn't accurately
reflect the operation. For example:

```sql
create table invalid_ctas as (select cast('test' as int) as 'bad_cast');
```

This updated the catalog and created a table for `invalid_ctas`, but
querying that table returned an error.

This PR makes sure that the operation is successful before committing
the changes. It does so by exposing new methods on the catalog client:
`commit_state`, `mutate_and_commit`, and `mutate`, which replace the
previous single `mutate`.

The existing code was refactored to use `mutate_and_commit`, which
behaves the same as the old `mutate`. The code that requires the commit
semantics (create table) now uses `mutate` to first get an uncommitted
catalog state with those changes, then performs its other actions:

- creates the "native" table
- _optionally_ inserts into the table
- commits the catalog state

If any of the operations before the commit fail, then the catalog
mutations are never committed.
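
The ordering above can be illustrated with a minimal, synchronous sketch. It is not the code from this PR: `Catalog`, the simplified `CatalogState`, and `create_table_as` are hypothetical stand-ins, the real methods are async and go through the metastore client, and an integer parse stands in for the native table creation and the insert.

```rust
use std::collections::BTreeSet;

/// Hypothetical, simplified stand-in for the real `CatalogState`.
#[derive(Debug, Clone)]
struct CatalogState {
    version: u64,
    tables: BTreeSet<String>,
}

/// Hypothetical in-memory catalog; the real one talks to a metastore.
struct Catalog {
    committed: CatalogState,
}

impl Catalog {
    fn new() -> Self {
        Catalog {
            committed: CatalogState { version: 0, tables: BTreeSet::new() },
        }
    }

    /// Stage a "create table" mutation and return the new state without persisting it.
    fn mutate(&self, current_version: u64, table: &str) -> Result<CatalogState, String> {
        if current_version != self.committed.version {
            return Err("catalog version out of date".to_string());
        }
        let mut staged = self.committed.clone();
        if !staged.tables.insert(table.to_string()) {
            return Err(format!("table {table} already exists"));
        }
        staged.version += 1;
        Ok(staged)
    }

    /// Persist a staged state; it must be newer than the version it was built on.
    fn commit_state(&mut self, current_version: u64, state: CatalogState) -> Result<(), String> {
        if current_version != self.committed.version || state.version <= current_version {
            return Err("stale catalog state".to_string());
        }
        self.committed = state;
        Ok(())
    }
}

/// Create a table from a single value, committing the catalog only as the last step.
fn create_table_as(catalog: &mut Catalog, name: &str, value: &str) -> Result<(), String> {
    let version = catalog.committed.version;
    // 1. Stage the catalog change; nothing is persisted yet.
    let staged = catalog.mutate(version, name)?;
    // 2. Stand-in for creating the native table and running the insert.
    //    An invalid cast fails here, at runtime, and returns early.
    let _row: i32 = value.parse().map_err(|e| format!("invalid cast: {e}"))?;
    // 3. Only reached if every earlier step succeeded.
    catalog.commit_state(version, staged)
}

fn main() {
    let mut catalog = Catalog::new();

    // The failing insert leaves the committed catalog untouched.
    assert!(create_table_as(&mut catalog, "invalid_ctas", "test").is_err());
    assert!(catalog.committed.tables.is_empty());
    assert_eq!(catalog.committed.version, 0);

    // A successful insert commits the staged state.
    assert!(create_table_as(&mut catalog, "valid_ctas", "42").is_ok());
    assert!(catalog.committed.tables.contains("valid_ctas"));
    assert_eq!(catalog.committed.version, 1);
}
```

Because the commit is the final step, any earlier `?` returns before the staged state is persisted. The sketch only models the catalog side; cleaning up storage written before the failure is outside its scope.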

---------

Co-authored-by: Sean Smith <scsmithr@gmail.com>
2 people authored and tychoish committed Apr 17, 2024
1 parent 3916f91 commit 1f5d400
Showing 25 changed files with 583 additions and 129 deletions.
51 changes: 48 additions & 3 deletions crates/catalog/src/client.rs
@@ -20,7 +20,6 @@ pub struct MetastoreClientConfig {
pub max_ticks_before_exit: usize,
}


/// Handle to a metastore client.
#[derive(Debug, Clone)]
pub struct MetastoreClientHandle {
@@ -65,8 +64,48 @@ impl MetastoreClientHandle {
.and_then(std::convert::identity) // Flatten
}

/// Try to run mutations against the Metastore catalog.
///
pub async fn commit_state(
&self,
version: u64,
state: CatalogState,
) -> Result<Arc<CatalogState>> {
let (tx, rx) = oneshot::channel();
self.send(
ClientRequest::Commit {
version,
state: Arc::new(state),
response: tx,
},
rx,
)
.await
.and_then(std::convert::identity) // Flatten
}

/// Try to run mutations against the Metastore catalog and commit them.
pub async fn try_mutate_and_commit(
&self,
current_version: u64,
mutations: Vec<Mutation>,
) -> Result<Arc<CatalogState>> {
let (tx, rx) = oneshot::channel();
let state = self
.send(
ClientRequest::ExecMutations {
version: current_version,
mutations,
response: tx,
},
rx,
)
.await??;

self.commit_state(current_version, state.as_ref().clone())
.await
}

/// Try to run mutations against the Metastore catalog
/// IMPORTANT: This method does not commit the mutations to the catalog. see `try_mutate_and_commit`
/// The version provided should be the version of the catalog state that the
/// session currently has.
pub async fn try_mutate(
@@ -122,6 +161,11 @@ pub enum ClientRequest {
GetCachedState {
response: oneshot::Sender<Result<Arc<CatalogState>>>,
},
Commit {
version: u64,
state: Arc<CatalogState>,
response: oneshot::Sender<Result<Arc<CatalogState>>>,
},

/// Execute mutations against a catalog.
ExecMutations {
@@ -140,6 +184,7 @@ impl ClientRequest {
fn tag(&self) -> &'static str {
match self {
ClientRequest::Ping { .. } => "ping",
ClientRequest::Commit { .. } => "commit",
ClientRequest::GetCachedState { .. } => "get_cached_state",
ClientRequest::ExecMutations { .. } => "exec_mutations",
ClientRequest::RefreshCachedState { .. } => "refresh_cached_state",
42 changes: 40 additions & 2 deletions crates/catalog/src/mutator.rs
@@ -30,12 +30,34 @@ impl CatalogMutator {
self.client.as_ref()
}

/// Commit the catalog state.
/// This persists the state to the metastore.
/// The `current_catalog_version` is the version of the catalog prior to the state being committed.
/// the 'state.version' should always be greater than 'current_catalog_version'.
/// If not, the commit will not succeed.
pub async fn commit_state(
&self,
current_catalog_version: u64,
state: CatalogState,
) -> Result<Arc<CatalogState>> {
let client = match &self.client {
Some(client) => client,
None => return Err(CatalogError::new("metastore client not configured")),
};

client
.commit_state(current_catalog_version, state.clone())
.await
}

/// Mutate the catalog if possible.
/// This returns the catalog state with the mutations reflected.
/// IMPORTANT: these changes are not yet persisted and must be 'committed' manually via `commit_state`.
/// If you wish to mutate and immediately commit, use `mutate_and_commit`
///
/// Errors if the metastore client isn't configured.
///
/// This will retry mutations if we were working with an out of date catalog.
pub async fn mutate(
&self,
catalog_version: u64,
@@ -74,6 +96,22 @@ impl CatalogMutator {

Ok(state)
}

/// Mutate the catalog if possible and immediately commit the changes.
///
/// Errors if the metastore client isn't configured.
///
/// This will retry mutations if we were working with an out of date
/// catalog.
pub async fn mutate_and_commit(
&self,
catalog_version: u64,
mutations: impl IntoIterator<Item = Mutation>,
) -> Result<Arc<CatalogState>> {
let state = self.mutate(catalog_version, mutations).await?;
self.commit_state(catalog_version, state.as_ref().clone())
.await
}
}

impl From<MetastoreClientHandle> for CatalogMutator {