Skip to content

Commit

Permalink
refactor: move AlterTable UpdateMetadata to last step
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Sep 19, 2023
1 parent f254664 commit dc21e96
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 37 deletions.
75 changes: 40 additions & 35 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,10 @@ impl AlterTableProcedure {
let catalog = &alter_expr.catalog_name;
let schema = &alter_expr.schema_name;

let alter_kind = self.alter_kind()?;
let manager = &self.context.table_metadata_manager;

if let Kind::RenameTable(RenameTable { new_table_name }) = self.alter_kind()? {
if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
let new_table_name_key = TableNameKey::new(catalog, schema, new_table_name);

let exist = manager
Expand All @@ -146,7 +147,11 @@ impl AlterTableProcedure {
}
);

self.data.state = AlterTableState::UpdateMetadata;
if matches!(alter_kind, Kind::RenameTable { .. }) {
self.data.state = AlterTableState::UpdateMetadata;
} else {
self.data.state = AlterTableState::SubmitAlterRegionRequests;
};

Ok(Status::executing(true))
}
Expand Down Expand Up @@ -174,7 +179,7 @@ impl AlterTableProcedure {
})
}

pub async fn submit_alter_region_requests(&self) -> Result<Status> {
pub async fn submit_alter_region_requests(&mut self) -> Result<Status> {
let table_id = self.data.table_id();
let table_ref = self.data.table_ref();

Expand All @@ -191,39 +196,44 @@ impl AlterTableProcedure {
let leaders = find_leaders(&region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());

for datanode in leaders {
let datanode_manager = self.context.datanode_manager.clone();
let datanode_manager = self.context.datanode_manager.clone();

for datanode in leaders {
let regions = find_leader_regions(&region_routes, &datanode);

alter_region_tasks.push(async move {
for region in regions {
let region_id = RegionId::new(table_id, region);
let request = self.create_alter_region_request(region_id)?;
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: common_telemetry::trace_id().unwrap_or_default(),
..Default::default()
}),
body: Some(region_request::Body::Alter(request)),
};
debug!("Submitting {request:?} to {datanode}");

let requester = datanode_manager.datanode(&datanode).await;
let requester = datanode_manager.datanode(&datanode).await;

for region in regions {
let region_id = RegionId::new(table_id, region);
let request = self.create_alter_region_request(region_id)?;
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: common_telemetry::trace_id().unwrap_or_default(),
..Default::default()
}),
body: Some(region_request::Body::Alter(request)),
};
debug!("Submitting {request:?} to {datanode}");

let datanode_clone = datanode.clone();
let requester = requester.clone();

alter_region_tasks.push(async move {
if let Err(e) = requester.handle(request).await {
return Err(handle_operate_region_error(datanode)(e));
return Err(handle_operate_region_error(datanode_clone)(e));
}
}
Ok(())
});
Ok(())
});
}
}

future::join_all(alter_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

Ok(Status::Done)
self.data.state = AlterTableState::UpdateMetadata;

Ok(Status::executing(true))
}

/// Update table metadata for rename table operation.
Expand Down Expand Up @@ -313,22 +323,17 @@ impl AlterTableProcedure {
let alter_kind = self.alter_kind()?;
let cache_invalidator = &self.context.cache_invalidator;

let status = if matches!(alter_kind, Kind::RenameTable { .. }) {
if matches!(alter_kind, Kind::RenameTable { .. }) {
cache_invalidator
.invalidate_table_name(&Context::default(), self.data.table_ref().into())
.await?;

Status::Done
} else {
cache_invalidator
.invalidate_table_id(&Context::default(), self.data.table_id())
.await?;

self.data.state = AlterTableState::SubmitAlterRegionRequests;

Status::executing(true)
};
Ok(status)

Ok(Status::Done)
}

fn lock_key_inner(&self) -> Vec<String> {
Expand Down Expand Up @@ -376,9 +381,9 @@ impl Procedure for AlterTableProcedure {

match state {
AlterTableState::Prepare => self.on_prepare().await,
AlterTableState::SubmitAlterRegionRequests => self.submit_alter_region_requests().await,
AlterTableState::UpdateMetadata => self.on_update_metadata().await,
AlterTableState::InvalidateTableCache => self.on_broadcast().await,
AlterTableState::SubmitAlterRegionRequests => self.submit_alter_region_requests().await,
}
.map_err(error_handler)
}
Expand All @@ -398,11 +403,11 @@ impl Procedure for AlterTableProcedure {
enum AlterTableState {
/// Prepares to alter the table
Prepare,
SubmitAlterRegionRequests,
/// Updates table metadata.
UpdateMetadata,
/// Broadcasts the invalidating table cache instruction.
InvalidateTableCache,
SubmitAlterRegionRequests,
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/procedure/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ async fn test_submit_alter_region_requests() {
.await
.unwrap();

let procedure = AlterTableProcedure::new(
let mut procedure = AlterTableProcedure::new(
1,
alter_table_task,
TableInfoValue::new(table_info),
Expand Down Expand Up @@ -393,7 +393,7 @@ async fn test_submit_alter_region_requests() {
});

let status = procedure.submit_alter_region_requests().await.unwrap();
assert!(matches!(status, Status::Done));
assert!(matches!(status, Status::Executing { persist: true }));

handle.await.unwrap();

Expand Down

0 comments on commit dc21e96

Please sign in to comment.