Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat concurrent replay wal #1505

Merged
merged 10 commits into from
Mar 25, 2024
Merged

Conversation

zealchen
Copy link
Contributor

@zealchen zealchen commented Mar 23, 2024

Rationale

Close #1498

Detailed Changes

Recover table logs in parallel.

Test Plan

CI

@zealchen zealchen force-pushed the feat_concurrent_replay_wal branch from bc25819 to b1ff6b6 Compare March 25, 2024 06:28
@@ -182,13 +182,28 @@ impl Replay for TableBasedReplay {
debug!("Replay wal logs on table mode, context:{context}, tables:{table_datas:?}",);

let mut failed_tables = HashMap::new();
let read_ctx = ReadContext {
let read_ctx = Arc::new(ReadContext {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Arc is not required.

-        for table_data in table_datas {
-            let table_id = table_data.id;
-            if let Err(e) = Self::recover_table_logs(context, table_data, &read_ctx).await {
+
+        let mut tasks = futures::stream::iter(
+            table_datas
+                .iter()
+                .map(|table_data| {
+                    let table_id = table_data.id;
+                    let read_ctx = &read_ctx;
+                    async move {
+                        let ret = Self::recover_table_logs(context, table_data, read_ctx).await;
+                        (table_id, ret)
+                    }
+                })
+                .collect::<Vec<_>>(),
+        )
+        .buffer_unordered(20);
+        while let Some((table_id, ret)) = tasks.next().await {
+            if let Err(e) = ret {
+                // If occur error, mark this table as failed and store the cause.
                 failed_tables.insert(table_id, e);
             }
         }

This compile success for me.

Copy link
Contributor

@jiacai2050 jiacai2050 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@jiacai2050 jiacai2050 merged commit 44caa99 into apache:main Mar 25, 2024
10 checks passed
zealchen added a commit to zealchen/incubator-horaedb that referenced this pull request Apr 9, 2024
## Rationale
Close apache#1498

## Detailed Changes
Recover table logs in parallel.

## Test Plan
CI

---------

Co-authored-by: chenmian.cm <chenmian.cm@antfin.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Replay WAL of different tables concurrently for TableBasedReplay
2 participants