Skip to content

Commit

Permalink
fix: fix alter table verification (#2437)
Browse files Browse the repository at this point in the history
* fix: fix verify alter

* refactor: move AlterTable UpdateMetadata to last step

* refactor: send region request in parallel

* Update src/table/src/metadata.rs

Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>

* Update src/table/src/metadata.rs

Co-authored-by: JeremyHi <jiachun_feng@proton.me>

---------

Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>
Co-authored-by: JeremyHi <jiachun_feng@proton.me>
  • Loading branch information
3 people committed Sep 19, 2023
1 parent 0f79cca commit 339e12c
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 72 deletions.
69 changes: 36 additions & 33 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,10 @@ impl AlterTableProcedure {
let catalog = &alter_expr.catalog_name;
let schema = &alter_expr.schema_name;

let alter_kind = self.alter_kind()?;
let manager = &self.context.table_metadata_manager;

if let Kind::RenameTable(RenameTable { new_table_name }) = self.alter_kind()? {
if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
let new_table_name_key = TableNameKey::new(catalog, schema, new_table_name);

let exist = manager
Expand All @@ -146,7 +147,11 @@ impl AlterTableProcedure {
}
);

self.data.state = AlterTableState::UpdateMetadata;
if matches!(alter_kind, Kind::RenameTable { .. }) {
self.data.state = AlterTableState::UpdateMetadata;
} else {
self.data.state = AlterTableState::SubmitAlterRegionRequests;
};

Ok(Status::executing(true))
}
Expand Down Expand Up @@ -174,7 +179,7 @@ impl AlterTableProcedure {
})
}

pub async fn submit_alter_region_requests(&self) -> Result<Status> {
pub async fn submit_alter_region_requests(&mut self) -> Result<Status> {
let table_id = self.data.table_id();
let table_ref = self.data.table_ref();

Expand All @@ -192,38 +197,41 @@ impl AlterTableProcedure {
let mut alter_region_tasks = Vec::with_capacity(leaders.len());

for datanode in leaders {
let datanode_manager = self.context.datanode_manager.clone();

let requester = self.context.datanode_manager.datanode(&datanode).await;
let regions = find_leader_regions(&region_routes, &datanode);

alter_region_tasks.push(async move {
for region in regions {
let region_id = RegionId::new(table_id, region);
let request = self.create_alter_region_request(region_id)?;
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: common_telemetry::trace_id().unwrap_or_default(),
..Default::default()
}),
body: Some(region_request::Body::Alter(request)),
};
debug!("Submitting {request:?} to {datanode}");

let requester = datanode_manager.datanode(&datanode).await;
for region in regions {
let region_id = RegionId::new(table_id, region);
let request = self.create_alter_region_request(region_id)?;
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: common_telemetry::trace_id().unwrap_or_default(),
..Default::default()
}),
body: Some(region_request::Body::Alter(request)),
};
debug!("Submitting {request:?} to {datanode}");

let datanode = datanode.clone();
let requester = requester.clone();

alter_region_tasks.push(async move {
if let Err(e) = requester.handle(request).await {
return Err(handle_operate_region_error(datanode)(e));
}
}
Ok(())
});
Ok(())
});
}
}

future::join_all(alter_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

Ok(Status::Done)
self.data.state = AlterTableState::UpdateMetadata;

Ok(Status::executing(true))
}

/// Update table metadata for rename table operation.
Expand Down Expand Up @@ -313,22 +321,17 @@ impl AlterTableProcedure {
let alter_kind = self.alter_kind()?;
let cache_invalidator = &self.context.cache_invalidator;

let status = if matches!(alter_kind, Kind::RenameTable { .. }) {
if matches!(alter_kind, Kind::RenameTable { .. }) {
cache_invalidator
.invalidate_table_name(&Context::default(), self.data.table_ref().into())
.await?;

Status::Done
} else {
cache_invalidator
.invalidate_table_id(&Context::default(), self.data.table_id())
.await?;

self.data.state = AlterTableState::SubmitAlterRegionRequests;

Status::executing(true)
};
Ok(status)

Ok(Status::Done)
}

fn lock_key_inner(&self) -> Vec<String> {
Expand Down Expand Up @@ -376,9 +379,9 @@ impl Procedure for AlterTableProcedure {

match state {
AlterTableState::Prepare => self.on_prepare().await,
AlterTableState::SubmitAlterRegionRequests => self.submit_alter_region_requests().await,
AlterTableState::UpdateMetadata => self.on_update_metadata().await,
AlterTableState::InvalidateTableCache => self.on_broadcast().await,
AlterTableState::SubmitAlterRegionRequests => self.submit_alter_region_requests().await,
}
.map_err(error_handler)
}
Expand All @@ -398,11 +401,11 @@ impl Procedure for AlterTableProcedure {
enum AlterTableState {
/// Prepares to alter the table
Prepare,
SubmitAlterRegionRequests,
/// Updates table metadata.
UpdateMetadata,
/// Broadcasts the invalidating table cache instruction.
InvalidateTableCache,
SubmitAlterRegionRequests,
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down
32 changes: 17 additions & 15 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl CreateTableProcedure {
let mut create_region_tasks = Vec::with_capacity(leaders.len());

for datanode in leaders {
let manager = self.context.datanode_manager.clone();
let requester = self.context.datanode_manager.datanode(&datanode).await;

let regions = find_leader_regions(region_routes, &datanode);
let requests = regions
Expand All @@ -197,23 +197,25 @@ impl CreateTableProcedure {
})
.collect::<Vec<_>>();

create_region_tasks.push(async move {
for request in requests {
let requester = manager.datanode(&datanode).await;

let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(request),
};
for request in requests {
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(request),
};

let datanode = datanode.clone();
let requester = requester.clone();

create_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
return Err(handle_operate_region_error(datanode)(err));
}
}
Ok(())
});
Ok(())
});
}
}

join_all(create_region_tasks)
Expand Down
41 changes: 22 additions & 19 deletions src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,36 +142,39 @@ impl DropTableProcedure {
let mut drop_region_tasks = Vec::with_capacity(leaders.len());

for datanode in leaders {
let clients = self.context.datanode_manager.clone();
let requester = self.context.datanode_manager.datanode(&datanode).await;

let regions = find_leader_regions(region_routes, &datanode);
let region_ids = regions
.iter()
.map(|region_number| RegionId::new(table_id, *region_number))
.collect::<Vec<_>>();

drop_region_tasks.push(async move {
for region_id in region_ids {
debug!("Dropping region {region_id} on Datanode {datanode:?}");

let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(region_request::Body::Drop(PbDropRegionRequest {
region_id: region_id.as_u64(),
})),
};

if let Err(err) = clients.datanode(&datanode).await.handle(request).await {
for region_id in region_ids {
debug!("Dropping region {region_id} on Datanode {datanode:?}");

let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(region_request::Body::Drop(PbDropRegionRequest {
region_id: region_id.as_u64(),
})),
};

let datanode = datanode.clone();
let requester = requester.clone();

drop_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
if err.status_code() != StatusCode::RegionNotFound {
return Err(handle_operate_region_error(datanode)(err));
}
}
}
Ok(())
});
Ok(())
});
}
}

join_all(drop_region_tasks)
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/procedure/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ async fn test_submit_alter_region_requests() {
.await
.unwrap();

let procedure = AlterTableProcedure::new(
let mut procedure = AlterTableProcedure::new(
1,
alter_table_task,
TableInfoValue::new(table_info),
Expand Down Expand Up @@ -393,7 +393,7 @@ async fn test_submit_alter_region_requests() {
});

let status = procedure.submit_alter_region_requests().await.unwrap();
assert!(matches!(status, Status::Done));
assert!(matches!(status, Status::Executing { persist: true }));

handle.await.unwrap();

Expand Down
13 changes: 10 additions & 3 deletions src/table/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid alter table({}) request: {}", table, err))]
InvalidAlterRequest {
table: String,
location: Location,
err: String,
},

#[snafu(display("Invalid table state: {}", table_id))]
InvalidTable {
table_id: TableId,
Expand All @@ -141,9 +148,9 @@ impl ErrorExt for Error {
Error::Datafusion { .. }
| Error::SchemaConversion { .. }
| Error::TableProjection { .. } => StatusCode::EngineExecuteQuery,
Error::RemoveColumnInIndex { .. } | Error::BuildColumnDescriptor { .. } => {
StatusCode::InvalidArguments
}
Error::RemoveColumnInIndex { .. }
| Error::BuildColumnDescriptor { .. }
| Error::InvalidAlterRequest { .. } => StatusCode::InvalidArguments,
Error::TablesRecordBatch { .. } | Error::DuplicatedExecuteCall { .. } => {
StatusCode::Unexpected
}
Expand Down
65 changes: 65 additions & 0 deletions src/table/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,41 @@ impl TableMeta {
let original_primary_key_indices: HashSet<&usize> =
self.primary_key_indices.iter().collect();

let mut names = HashSet::with_capacity(requests.len());

for col_to_add in requests {
ensure!(
names.insert(&col_to_add.column_schema.name),
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"add column {} more than once",
col_to_add.column_schema.name
),
}
);

ensure!(
!table_schema.contains_column(&col_to_add.column_schema.name),
error::ColumnExistsSnafu {
table_name,
column_name: col_to_add.column_schema.name.to_string()
},
);

ensure!(
col_to_add.column_schema.is_nullable()
|| col_to_add.column_schema.default_constraint().is_some(),
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"no default value for column {}",
col_to_add.column_schema.name
),
},
);
}

let SplitResult {
columns_at_first,
columns_at_after,
Expand Down Expand Up @@ -858,6 +893,36 @@ mod tests {
assert_eq!(StatusCode::TableColumnExists, err.status_code());
}

#[test]
fn test_add_invalid_column() {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();

let alter_kind = AlterKind::AddColumns {
columns: vec![AddColumnRequest {
column_schema: ColumnSchema::new(
"weny",
ConcreteDataType::string_datatype(),
false,
),
is_key: false,
location: None,
}],
};

let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
}

#[test]
fn test_remove_unknown_column() {
let schema = Arc::new(new_test_schema());
Expand Down

0 comments on commit 339e12c

Please sign in to comment.