Skip to content

Commit

Permalink
Feat concurrent replay wal (#1505)
Browse files Browse the repository at this point in the history
## Rationale
Close #1498

## Detailed Changes
Recover table logs in parallel.

## Test Plan
CI

---------

Co-authored-by: chenmian.cm <chenmian.cm@antfin.com>
  • Loading branch information
zealchen and chenmian.cm authored Mar 25, 2024
1 parent 54a5db7 commit 44caa99
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions src/analytic_engine/src/instance/wal_replayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,24 @@ impl Replay for TableBasedReplay {
batch_size: context.wal_replay_batch_size,
..Default::default()
};
for table_data in table_datas {
let table_id = table_data.id;
if let Err(e) = Self::recover_table_logs(context, table_data, &read_ctx).await {

let mut tasks = futures::stream::iter(
table_datas
.iter()
.map(|table_data| {
let table_id = table_data.id;
let read_ctx = &read_ctx;
async move {
let ret = Self::recover_table_logs(context, table_data, read_ctx).await;
(table_id, ret)
}
})
.collect::<Vec<_>>(),
)
.buffer_unordered(20);
while let Some((table_id, ret)) = tasks.next().await {
if let Err(e) = ret {
// If occur error, mark this table as failed and store the cause.
failed_tables.insert(table_id, e);
}
}
Expand Down

0 comments on commit 44caa99

Please sign in to comment.