Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix alter table verification #2437

Merged
merged 5 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 36 additions & 33 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,10 @@ impl AlterTableProcedure {
let catalog = &alter_expr.catalog_name;
let schema = &alter_expr.schema_name;

let alter_kind = self.alter_kind()?;
let manager = &self.context.table_metadata_manager;

if let Kind::RenameTable(RenameTable { new_table_name }) = self.alter_kind()? {
if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
let new_table_name_key = TableNameKey::new(catalog, schema, new_table_name);

let exist = manager
Expand All @@ -146,7 +147,11 @@ impl AlterTableProcedure {
}
);

self.data.state = AlterTableState::UpdateMetadata;
if matches!(alter_kind, Kind::RenameTable { .. }) {
self.data.state = AlterTableState::UpdateMetadata;
} else {
self.data.state = AlterTableState::SubmitAlterRegionRequests;
};

Ok(Status::executing(true))
}
Expand Down Expand Up @@ -174,7 +179,7 @@ impl AlterTableProcedure {
})
}

pub async fn submit_alter_region_requests(&self) -> Result<Status> {
pub async fn submit_alter_region_requests(&mut self) -> Result<Status> {
let table_id = self.data.table_id();
let table_ref = self.data.table_ref();

Expand All @@ -192,38 +197,41 @@ impl AlterTableProcedure {
let mut alter_region_tasks = Vec::with_capacity(leaders.len());

for datanode in leaders {
let datanode_manager = self.context.datanode_manager.clone();

let requester = self.context.datanode_manager.datanode(&datanode).await;
let regions = find_leader_regions(&region_routes, &datanode);

alter_region_tasks.push(async move {
for region in regions {
let region_id = RegionId::new(table_id, region);
let request = self.create_alter_region_request(region_id)?;
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: common_telemetry::trace_id().unwrap_or_default(),
..Default::default()
}),
body: Some(region_request::Body::Alter(request)),
};
debug!("Submitting {request:?} to {datanode}");

let requester = datanode_manager.datanode(&datanode).await;
for region in regions {
let region_id = RegionId::new(table_id, region);
let request = self.create_alter_region_request(region_id)?;
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: common_telemetry::trace_id().unwrap_or_default(),
..Default::default()
}),
body: Some(region_request::Body::Alter(request)),
};
debug!("Submitting {request:?} to {datanode}");

let datanode = datanode.clone();
let requester = requester.clone();

alter_region_tasks.push(async move {
if let Err(e) = requester.handle(request).await {
return Err(handle_operate_region_error(datanode)(e));
}
}
Ok(())
});
Ok(())
});
}
}

future::join_all(alter_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

Ok(Status::Done)
self.data.state = AlterTableState::UpdateMetadata;

Ok(Status::executing(true))
}

/// Update table metadata for rename table operation.
Expand Down Expand Up @@ -313,22 +321,17 @@ impl AlterTableProcedure {
let alter_kind = self.alter_kind()?;
let cache_invalidator = &self.context.cache_invalidator;

let status = if matches!(alter_kind, Kind::RenameTable { .. }) {
if matches!(alter_kind, Kind::RenameTable { .. }) {
cache_invalidator
.invalidate_table_name(&Context::default(), self.data.table_ref().into())
.await?;

Status::Done
} else {
cache_invalidator
.invalidate_table_id(&Context::default(), self.data.table_id())
.await?;

self.data.state = AlterTableState::SubmitAlterRegionRequests;

Status::executing(true)
};
Ok(status)

Ok(Status::Done)
}

fn lock_key_inner(&self) -> Vec<String> {
Expand Down Expand Up @@ -376,9 +379,9 @@ impl Procedure for AlterTableProcedure {

match state {
AlterTableState::Prepare => self.on_prepare().await,
AlterTableState::SubmitAlterRegionRequests => self.submit_alter_region_requests().await,
AlterTableState::UpdateMetadata => self.on_update_metadata().await,
AlterTableState::InvalidateTableCache => self.on_broadcast().await,
AlterTableState::SubmitAlterRegionRequests => self.submit_alter_region_requests().await,
}
.map_err(error_handler)
}
Expand All @@ -398,11 +401,11 @@ impl Procedure for AlterTableProcedure {
enum AlterTableState {
/// Prepares to alter the table
Prepare,
SubmitAlterRegionRequests,
/// Updates table metadata.
UpdateMetadata,
/// Broadcasts the invalidating table cache instruction.
InvalidateTableCache,
SubmitAlterRegionRequests,
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down
32 changes: 17 additions & 15 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl CreateTableProcedure {
let mut create_region_tasks = Vec::with_capacity(leaders.len());

for datanode in leaders {
let manager = self.context.datanode_manager.clone();
let requester = self.context.datanode_manager.datanode(&datanode).await;

let regions = find_leader_regions(region_routes, &datanode);
let requests = regions
Expand All @@ -197,23 +197,25 @@ impl CreateTableProcedure {
})
.collect::<Vec<_>>();

create_region_tasks.push(async move {
for request in requests {
let requester = manager.datanode(&datanode).await;

let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(request),
};
for request in requests {
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(request),
};

let datanode = datanode.clone();
let requester = requester.clone();

create_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
return Err(handle_operate_region_error(datanode)(err));
}
}
Ok(())
});
Ok(())
});
}
}

join_all(create_region_tasks)
Expand Down
41 changes: 22 additions & 19 deletions src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,36 +142,39 @@ impl DropTableProcedure {
let mut drop_region_tasks = Vec::with_capacity(leaders.len());

for datanode in leaders {
let clients = self.context.datanode_manager.clone();
let requester = self.context.datanode_manager.datanode(&datanode).await;

let regions = find_leader_regions(region_routes, &datanode);
let region_ids = regions
.iter()
.map(|region_number| RegionId::new(table_id, *region_number))
.collect::<Vec<_>>();

drop_region_tasks.push(async move {
for region_id in region_ids {
debug!("Dropping region {region_id} on Datanode {datanode:?}");

let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(region_request::Body::Drop(PbDropRegionRequest {
region_id: region_id.as_u64(),
})),
};

if let Err(err) = clients.datanode(&datanode).await.handle(request).await {
for region_id in region_ids {
debug!("Dropping region {region_id} on Datanode {datanode:?}");

let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(region_request::Body::Drop(PbDropRegionRequest {
region_id: region_id.as_u64(),
})),
};

let datanode = datanode.clone();
let requester = requester.clone();

drop_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
if err.status_code() != StatusCode::RegionNotFound {
return Err(handle_operate_region_error(datanode)(err));
}
}
}
Ok(())
});
Ok(())
});
}
}

join_all(drop_region_tasks)
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/procedure/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ async fn test_submit_alter_region_requests() {
.await
.unwrap();

let procedure = AlterTableProcedure::new(
let mut procedure = AlterTableProcedure::new(
1,
alter_table_task,
TableInfoValue::new(table_info),
Expand Down Expand Up @@ -393,7 +393,7 @@ async fn test_submit_alter_region_requests() {
});

let status = procedure.submit_alter_region_requests().await.unwrap();
assert!(matches!(status, Status::Done));
assert!(matches!(status, Status::Executing { persist: true }));

handle.await.unwrap();

NiwakaDev marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
13 changes: 10 additions & 3 deletions src/table/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid alter table({}) request: {}", table, err))]
InvalidAlterRequest {
table: String,
location: Location,
err: String,
},

#[snafu(display("Invalid table state: {}", table_id))]
InvalidTable {
table_id: TableId,
Expand All @@ -141,9 +148,9 @@ impl ErrorExt for Error {
Error::Datafusion { .. }
| Error::SchemaConversion { .. }
| Error::TableProjection { .. } => StatusCode::EngineExecuteQuery,
Error::RemoveColumnInIndex { .. } | Error::BuildColumnDescriptor { .. } => {
StatusCode::InvalidArguments
}
Error::RemoveColumnInIndex { .. }
| Error::BuildColumnDescriptor { .. }
| Error::InvalidAlterRequest { .. } => StatusCode::InvalidArguments,
Error::TablesRecordBatch { .. } | Error::DuplicatedExecuteCall { .. } => {
StatusCode::Unexpected
}
Expand Down
64 changes: 64 additions & 0 deletions src/table/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,40 @@ impl TableMeta {
let original_primary_key_indices: HashSet<&usize> =
self.primary_key_indices.iter().collect();

let mut names = HashSet::with_capacity(requests.len());

for col_to_add in requests {
ensure!(
names.insert(&col_to_add.column_schema.name),
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"add column {} more than once",
col_to_add.column_schema.name
),
}
);

ensure!(
!table_schema.contains_column(&col_to_add.column_schema.name),
error::ColumnExistsSnafu {
table_name,
column_name: col_to_add.column_schema.name.to_string()
},
);
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
ensure!(
col_to_add.column_schema.is_nullable()
|| col_to_add.column_schema.default_constraint().is_some(),
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"no default value for column {}",
col_to_add.column_schema.name
),
},
);
}

let SplitResult {
columns_at_first,
columns_at_after,
Expand Down Expand Up @@ -865,6 +899,36 @@ mod tests {
assert_eq!(StatusCode::TableColumnExists, err.status_code());
}

#[test]
fn test_add_invalid_column() {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();

let alter_kind = AlterKind::AddColumns {
columns: vec![AddColumnRequest {
column_schema: ColumnSchema::new(
"weny",
ConcreteDataType::string_datatype(),
false,
),
is_key: false,
location: None,
}],
};

let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
}

#[test]
fn test_remove_unknown_column() {
let schema = Arc::new(new_test_schema());
Expand Down
Loading