Skip to content

Commit

Permalink
feat: ALTER TABLE .. RENAME COLUMN ..
Browse files Browse the repository at this point in the history
This doesn't work since we need to update the delta metadata as well with
the updated schema which I haven't been able to figure out how...

Fixes: #2900

Signed-off-by: Vaibhav <vrongmeal@gmail.com>
  • Loading branch information
vrongmeal committed Apr 18, 2024
1 parent 50b3428 commit 347df7d
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 8 deletions.
35 changes: 35 additions & 0 deletions crates/metastore/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,41 @@ impl State {
other => unreachable!("unexpected entry type: {:?}", other),
};
}
AlterTableOperation::RenameColumn {
old_column_name,
new_column_name,
} => {
let oid = match objs.tables.get(&alter_table.name) {
None => {
return Err(MetastoreError::MissingNamedObject {
schema: alter_table.schema,
name: alter_table.name,
})
}
Some(id) => id,
};

match self.entries.get_mut(oid)?.unwrap() {
CatalogEntry::Table(ent) => {
let columns = ent.get_internal_columns_mut().ok_or_else(|| {
MetastoreError::MissingColumnDefinition {
table: alter_table.name.to_string(),
}
})?;

let column = columns
.iter_mut()
.find(|c| c.name == old_column_name)
.ok_or_else(|| MetastoreError::MissingColumnFromTable {
column: old_column_name,
table: alter_table.name.to_string(),
})?;

column.name = new_column_name;
}
other => unreachable!("unexpected entry type: {:?}", other),
};
}
};
}
Mutation::AlterDatabase(alter_database) => {
Expand Down
6 changes: 6 additions & 0 deletions crates/metastore/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ pub enum MetastoreError {
#[error("Missing entry: {0}")]
MissingEntry(u32),

#[error("Missing columns definition for table '{table}'")]
MissingColumnDefinition { table: String },

#[error("Missing column '{column}' from table '{table}'")]
MissingColumnFromTable { column: String, table: String },

#[error("Tunnel '{tunnel} not supported for {action}'")]
TunnelNotSupportedForAction {
tunnel: String,
Expand Down
6 changes: 6 additions & 0 deletions crates/protogen/proto/metastore/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,16 @@ message AlterTableOperationSetAccessMode {
catalog.SourceAccessMode access_mode = 1;
}

message AlterTableOperationRenameColumn {
string old_column_name = 1;
string new_column_name = 2;
}

message AlterTableOperation {
oneof operation {
AlterTableOperationRename alter_table_operation_rename = 1;
AlterTableOperationSetAccessMode alter_table_operation_set_access_mode = 2;
AlterTableOperationRenameColumn alter_table_operation_rename_column = 3;
};
}

Expand Down
8 changes: 8 additions & 0 deletions crates/protogen/src/metastore/types/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,14 @@ impl TableEntry {
}
}

/// Try to get the mutable ref to columns for this table if available.
pub fn get_internal_columns_mut(&mut self) -> Option<&mut Vec<InternalColumnDefinition>> {
match &mut self.options {
TableOptionsV0::Internal(options) => Some(&mut options.columns),
_ => None,
}
}

pub fn get_columns(&self) -> Option<Vec<FieldRef>> {
self.get_internal_columns().map(|val| {
val.iter()
Expand Down
30 changes: 28 additions & 2 deletions crates/protogen/src/metastore/types/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,16 @@ impl From<CreateExternalDatabase> for service::CreateExternalDatabase {

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterTableOperation {
RenameTable { new_name: String },
SetAccessMode { access_mode: SourceAccessMode },
RenameTable {
new_name: String,
},
SetAccessMode {
access_mode: SourceAccessMode,
},
RenameColumn {
old_column_name: String,
new_column_name: String,
},
}

impl TryFrom<service::alter_table_operation::Operation> for AlterTableOperation {
Expand All @@ -453,6 +461,15 @@ impl TryFrom<service::alter_table_operation::Operation> for AlterTableOperation
) => Self::SetAccessMode {
access_mode: access_mode.try_into()?,
},
service::alter_table_operation::Operation::AlterTableOperationRenameColumn(
service::AlterTableOperationRenameColumn {
old_column_name,
new_column_name,
},
) => Self::RenameColumn {
old_column_name,
new_column_name,
},
})
}
}
Expand All @@ -472,6 +489,15 @@ impl From<AlterTableOperation> for service::alter_table_operation::Operation {
},
)
}
AlterTableOperation::RenameColumn {
old_column_name,
new_column_name,
} => service::alter_table_operation::Operation::AlterTableOperationRenameColumn(
service::AlterTableOperationRenameColumn {
old_column_name,
new_column_name,
},
),
}
}
}
Expand Down
50 changes: 44 additions & 6 deletions crates/sqlexec/src/planner/physical_plan/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use datafusion::physical_plan::{
SendableRecordBatchStream,
Statistics,
};
use datasources::native::access::NativeTableStorage;
use futures::stream;
use protogen::metastore::types::catalog::CatalogEntry;
use protogen::metastore::types::service::{self, AlterTableOperation, Mutation};

use super::{new_operation_batch, GENERIC_OPERATION_PHYSICAL_SCHEMA};
Expand Down Expand Up @@ -80,7 +82,12 @@ impl ExecutionPlan for AlterTableExec {
.get_extension::<CatalogMutator>()
.expect("context should have catalog mutator");

let stream = stream::once(alter_table_rename(mutator, self.clone()));
let storage = context
.session_config()
.get_extension::<NativeTableStorage>()
.unwrap();

let stream = stream::once(alter_table(mutator, self.clone(), storage));

Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
Expand All @@ -99,22 +106,53 @@ impl DisplayAs for AlterTableExec {
}
}

async fn alter_table_rename(
async fn alter_table(
mutator: Arc<CatalogMutator>,
plan: AlterTableExec,
storage: Arc<NativeTableStorage>,
) -> DataFusionResult<RecordBatch> {
// TODO: Error if schemas between references differ.
mutator
.mutate_and_commit(
let new_state = mutator
.mutate(
plan.catalog_version,
[Mutation::AlterTable(service::AlterTable {
schema: plan.schema,
name: plan.name,
operation: plan.operation,
name: plan.name.clone(),
operation: plan.operation.clone(),
})],
)
.await
.map_err(|e| DataFusionError::Execution(format!("failed to alter table: {e}")))?;

// Table was successfully altered. We can re-name delta table now.
if let AlterTableOperation::RenameColumn { .. } = plan.operation {
let updated_table = new_state
.entries
.iter()
.find_map(|(_, e)| {
if let CatalogEntry::Table(t) = e {
if t.meta.name == plan.name {
Some(t)
} else {
None
}
} else {
None
}
})
.unwrap();

let _updated_table = storage.load_table(updated_table).await.map_err(|e| {
DataFusionError::Execution(format!("unable to load table '{}': {}", plan.name, e))
})?;

todo!("actually update the table with new schema...");
}

mutator
.commit_state(plan.catalog_version, new_state.as_ref().clone())
.await
.map_err(|e| DataFusionError::Execution(format!("failed to commit state: {e}")))?;

Ok(new_operation_batch("alter_table"))
}
27 changes: 27 additions & 0 deletions crates/sqlexec/src/planner/session_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,33 @@ impl<'a> SessionPlanner<'a> {
}
.into_logical_plan())
}
ast::AlterTableOperation::RenameColumn {
old_column_name,
new_column_name,
} => {
validate_object_name(&name)?;
let name = object_name_to_table_ref(name)?;
let name = self.ctx.resolve_table_ref(name)?;

let schema = name.schema.into_owned();
let name = name.name.into_owned();

validate_ident(&old_column_name)?;
let old_column_name = normalize_ident(old_column_name);

validate_ident(&new_column_name)?;
let new_column_name = normalize_ident(new_column_name);

Ok(AlterTable {
schema,
name,
operation: AlterTableOperation::RenameColumn {
old_column_name,
new_column_name,
},
}
.into_logical_plan())
}
other => Err(PlanError::UnsupportedSQLStatement(other.to_string())),
}
}
Expand Down

0 comments on commit 347df7d

Please sign in to comment.