Skip to content

Commit

Permalink
fix: skip insert when creating table that exists (#2901)
Browse files Browse the repository at this point in the history
Co-authored-by: Tal Gluck <talagluck@gmail.com>
  • Loading branch information
tychoish and talagluck authored Apr 17, 2024
1 parent 2b7035a commit 50b3428
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 25 deletions.
58 changes: 33 additions & 25 deletions crates/sqlexec/src/planner/physical_plan/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,37 +188,45 @@ impl CreateTableExec {
}
};

let table = storage.create_table(ent, save_mode).await.map_err(|e| {
DataFusionError::Execution(format!("failed to create table in storage: {e}"))
})?;
let table_existed = storage
.table_exists(ent)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let insert_res = match (source, or_replace) {
(Some(input), overwrite) => insert(&table, input, overwrite, context).await,
if !table_existed || !if_not_exists {
let table = storage.create_table(ent, save_mode).await.map_err(|e| {
DataFusionError::Execution(format!("failed to create table in storage: {e}"))
})?;

// if it's a 'replace' and there is no insert, we overwrite with an empty table
(None, true) => {
let input = Arc::new(EmptyExec::new(TableProvider::schema(&table)));
insert(&table, input, true, context).await
let insert_res = match (source, or_replace) {
(Some(input), overwrite) => insert(&table, input, overwrite, context).await,

// if it's a 'replace' and there is no insert, we overwrite with an empty table
(None, true) => {
let input = Arc::new(EmptyExec::new(TableProvider::schema(&table)));
insert(&table, input, true, context).await
}
(None, false) => Ok(()),
};

if let Err(e) = insert_res {
storage.delete_table(ent).await.map_err(|e| {
DataFusionError::Execution(format!("failed to clean up table: {e}"))
})?;
return Err(e);
}
(None, false) => Ok(()),
};

if let Err(e) = insert_res {
storage.delete_table(ent).await.map_err(|e| {
DataFusionError::Execution(format!("failed to clean up table: {e}"))
})?;
return Err(e);
}
mutator
.commit_state(catalog_version, state.as_ref().clone())
.await
.map_err(|e| {
DataFusionError::Execution(format!("failed to commit catalog state: {e}"))
})?;

mutator
.commit_state(catalog_version, state.as_ref().clone())
.await
.map_err(|e| {
DataFusionError::Execution(format!("failed to commit catalog state: {e}"))
})?;
debug!(loc = %table.storage_location(), "native table created");
debug!(loc = %table.storage_location(), "native table created");

// TODO: Add storage tracking job.
// TODO: Add storage tracking job.
}

Ok(new_operation_batch("create_table"))
}
Expand Down
16 changes: 16 additions & 0 deletions testdata/sqllogictests/create_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,19 @@ create table test as select vector, point as "Point" from lance_scan('./testdata

statement ok
select * from test;

statement ok
create table if not exists foo as select 1;

query I
select * from foo;
----
1

statement ok
create table if not exists foo as select 'a'

query I
select * from foo;
----
1

0 comments on commit 50b3428

Please sign in to comment.