Skip to content

Commit

Permalink
refactor: send region request in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Sep 19, 2023
1 parent dc21e96 commit 1958e1f
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 39 deletions.
8 changes: 3 additions & 5 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,9 @@ impl AlterTableProcedure {
let leaders = find_leaders(&region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());

let datanode_manager = self.context.datanode_manager.clone();

for datanode in leaders {
let requester = self.context.datanode_manager.datanode(&datanode).await;
let regions = find_leader_regions(&region_routes, &datanode);
let requester = datanode_manager.datanode(&datanode).await;

for region in regions {
let region_id = RegionId::new(table_id, region);
Expand All @@ -214,12 +212,12 @@ impl AlterTableProcedure {
};
debug!("Submitting {request:?} to {datanode}");

let datanode_clone = datanode.clone();
let datanode = datanode.clone();
let requester = requester.clone();

alter_region_tasks.push(async move {
if let Err(e) = requester.handle(request).await {
return Err(handle_operate_region_error(datanode_clone)(e));
return Err(handle_operate_region_error(datanode)(e));
}
Ok(())
});
Expand Down
32 changes: 17 additions & 15 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl CreateTableProcedure {
let mut create_region_tasks = Vec::with_capacity(leaders.len());

for datanode in leaders {
let manager = self.context.datanode_manager.clone();
let requester = self.context.datanode_manager.datanode(&datanode).await;

let regions = find_leader_regions(region_routes, &datanode);
let requests = regions
Expand All @@ -197,23 +197,25 @@ impl CreateTableProcedure {
})
.collect::<Vec<_>>();

create_region_tasks.push(async move {
for request in requests {
let requester = manager.datanode(&datanode).await;

let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(request),
};
for request in requests {
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(request),
};

let datanode = datanode.clone();
let requester = requester.clone();

create_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
return Err(handle_operate_region_error(datanode)(err));
}
}
Ok(())
});
Ok(())
});
}
}

join_all(create_region_tasks)
Expand Down
41 changes: 22 additions & 19 deletions src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,36 +142,39 @@ impl DropTableProcedure {
let mut drop_region_tasks = Vec::with_capacity(leaders.len());

for datanode in leaders {
let clients = self.context.datanode_manager.clone();
let requester = self.context.datanode_manager.datanode(&datanode).await;

let regions = find_leader_regions(region_routes, &datanode);
let region_ids = regions
.iter()
.map(|region_number| RegionId::new(table_id, *region_number))
.collect::<Vec<_>>();

drop_region_tasks.push(async move {
for region_id in region_ids {
debug!("Dropping region {region_id} on Datanode {datanode:?}");

let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(region_request::Body::Drop(PbDropRegionRequest {
region_id: region_id.as_u64(),
})),
};

if let Err(err) = clients.datanode(&datanode).await.handle(request).await {
for region_id in region_ids {
debug!("Dropping region {region_id} on Datanode {datanode:?}");

let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(region_request::Body::Drop(PbDropRegionRequest {
region_id: region_id.as_u64(),
})),
};

let datanode = datanode.clone();
let requester = requester.clone();

drop_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
if err.status_code() != StatusCode::RegionNotFound {
return Err(handle_operate_region_error(datanode)(err));
}
}
}
Ok(())
});
Ok(())
});
}
}

join_all(drop_region_tasks)
Expand Down

0 comments on commit 1958e1f

Please sign in to comment.