feat: replay logs of different tables in parallel #1492

Merged · 10 commits · Mar 11, 2024
Changes from 5 commits
1 change: 1 addition & 0 deletions Cargo.toml
@@ -107,6 +107,7 @@ cluster = { path = "src/cluster" }
criterion = "0.5"
horaedb-client = "1.0.2"
common_types = { path = "src/common_types" }
dashmap = "5.5.3"
Contributor:
I wonder if this dependency is really required for this task?

If `HashMap` works, I'd prefer to stick with it first.

Contributor Author:

I'm trying to run `replay_table_log_entries` concurrently, but I ran into an issue with `serial_exec_ctxs`, which is a HashMap accessed through a mutable reference. I had to wrap it in `Arc` and `Mutex`, and then every time I grab a mutable reference to a value from the map, it locks the entire map.

DashMap allows concurrent access to different keys. I wonder if there's an approach that makes HashMap work in this case.
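For context, a minimal sketch of the difference (toy `u64`/`String` entries standing in for `TableId` and `SerialExecContext`, not the engine's actual types):

```rust
use std::{collections::HashMap, sync::Arc};

use dashmap::DashMap;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    // Arc<Mutex<HashMap>>: any mutable access takes the whole-map lock, so two
    // tasks that touch *different* keys still wait on each other.
    let coarse: Arc<Mutex<HashMap<u64, String>>> = Arc::new(Mutex::new(HashMap::new()));
    {
        let mut guard = coarse.lock().await;
        guard.insert(1, "table-1".to_string());
        guard.insert(2, "table-2".to_string());
    } // the whole map stays locked until this guard is dropped

    // DashMap: internally sharded, so get_mut only locks the shard holding the
    // key; work on keys in other shards can proceed in parallel.
    let sharded: Arc<DashMap<u64, String>> = Arc::new(DashMap::new());
    sharded.insert(1, "table-1".to_string());
    sharded.insert(2, "table-2".to_string());
    if let Some(mut entry) = sharded.get_mut(&1) {
        entry.push_str(" replayed"); // only key 1's shard is locked here
    }
}
```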

Contributor (@jiacai2050), Mar 8, 2024:

> then every time I grab a mutable reference to a value from the map, it locks the entire map.

I think it's fine to use a plain Mutex here, since it's not the bottleneck; `replay_table_log_entries` is the heaviest task in this path.

Also, there is a partitioned lock in our codebase that you can use if you want to optimize here:
https://github.com/apache/incubator-horaedb/blob/9f166f3daa9a02ef8af1e733c22f956ab97e7aaf/src/components/partitioned_lock/src/lib.rs#L130
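For illustration, the partitioned-lock idea sketched generically (this is not the actual API of the linked `partitioned_lock` component, just the routing principle): the key is hashed to one of N independently locked shards, so a mutable access only contends with other keys in the same shard.

```rust
use std::{
    collections::{hash_map::DefaultHasher, HashMap},
    hash::{Hash, Hasher},
    sync::Mutex,
};

/// A toy partitioned map: N independently locked shards, with each key routed
/// to a shard by hash, so contention is limited to one shard at a time.
struct PartitionedMap<K, V> {
    shards: Vec<Mutex<HashMap<K, V>>>,
}

impl<K: Hash + Eq, V> PartitionedMap<K, V> {
    fn new(shard_count: usize) -> Self {
        Self {
            shards: (0..shard_count).map(|_| Mutex::new(HashMap::new())).collect(),
        }
    }

    fn shard_of(&self, key: &K) -> usize {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) % self.shards.len()
    }

    fn insert(&self, key: K, value: V) {
        let idx = self.shard_of(&key);
        // Only this shard is locked; keys that hash elsewhere are unaffected.
        self.shards[idx].lock().unwrap().insert(key, value);
    }

    fn with_mut<R>(&self, key: &K, f: impl FnOnce(Option<&mut V>) -> R) -> R {
        let idx = self.shard_of(key);
        let mut shard = self.shards[idx].lock().unwrap();
        f(shard.get_mut(key))
    }
}

fn main() {
    let map = PartitionedMap::new(16);
    map.insert("table_1", 0u64);
    map.insert("table_2", 0u64);
    // Mutating table_1 only locks the shard that holds it.
    map.with_mut(&"table_1", |count| {
        if let Some(count) = count {
            *count += 1;
        }
    });
}
```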

Contributor Author (@Lethannn), Mar 8, 2024:

Awesome! I'll check it out. Thanks for the heads-up.

datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "e21b03154" }
datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "e21b03154" }
derive_builder = "0.12"
1 change: 1 addition & 0 deletions src/analytic_engine/Cargo.toml
@@ -48,6 +48,7 @@ base64 = { workspace = true }
bytes_ext = { workspace = true }
codec = { workspace = true }
common_types = { workspace = true }
dashmap = { workspace = true }
datafusion = { workspace = true }
future_ext = { workspace = true }
futures = { workspace = true }
73 changes: 49 additions & 24 deletions src/analytic_engine/src/instance/wal_replayer.rs
@@ -29,13 +29,15 @@ use common_types::{
schema::{IndexInWriterSchema, Schema},
table::ShardId,
};
use dashmap::{mapref::one::RefMut, DashMap};
use futures::StreamExt;
use generic_error::BoxError;
use lazy_static::lazy_static;
use logger::{debug, error, info, trace, warn};
use prometheus::{exponential_buckets, register_histogram, Histogram};
use snafu::ResultExt;
use table_engine::table::TableId;
use tokio::sync::MutexGuard;
use tokio::sync::{Mutex, MutexGuard};
use wal::{
log_batch::LogEntry,
manager::{
@@ -374,31 +376,54 @@ impl RegionBasedReplay {
        // TODO: No `group_by` method in `VecDeque`, so implement it manually here...
        Self::split_log_batch_by_table(log_batch, &mut table_batches);

        // TODO: Replay logs of different tables in parallel.
        for table_batch in table_batches {
            // Some tables may have failed in previous replay, ignore them.
            if failed_tables.contains_key(&table_batch.table_id) {
                continue;
            }

            // Replay all log entries of current table.
            // Some tables may have been moved to other shards or dropped, ignore such logs.
            if let Some(ctx) = serial_exec_ctxs.get_mut(&table_batch.table_id) {
                let result = replay_table_log_entries(
                    &context.flusher,
                    context.max_retry_flush_limit,
                    &mut ctx.serial_exec,
                    &ctx.table_data,
                    log_batch.range(table_batch.range),
                )
                .await;

                // If occur error, mark this table as failed and store the cause.
                if let Err(e) = result {
                    failed_tables.insert(table_batch.table_id, e);
        let alter_failed_tables = HashMap::new();
        let alter_failed_tables_ref = Arc::new(Mutex::new(alter_failed_tables));

        let mut serial_exec_ctxs_dash_map = DashMap::new();
Contributor (@jiacai2050), Mar 8, 2024:

This map seems unnecessary; what I have in mind is something like this:

modified   src/analytic_engine/src/instance/wal_replayer.rs
@@ -29,6 +29,7 @@ use common_types::{
     schema::{IndexInWriterSchema, Schema},
     table::ShardId,
 };
+use futures::StreamExt;
 use generic_error::BoxError;
 use lazy_static::lazy_static;
 use logger::{debug, error, info, trace, warn};
@@ -374,6 +375,7 @@ impl RegionBasedReplay {
         // TODO: No `group_by` method in `VecDeque`, so implement it manually here...
         Self::split_log_batch_by_table(log_batch, &mut table_batches);
 
+        let mut replay_tasks = Vec::with_capacity(table_batches.len());
         // TODO: Replay logs of different tables in parallel.
         for table_batch in table_batches {
             // Some tables may have failed in previous replay, ignore them.
@@ -384,22 +386,27 @@ impl RegionBasedReplay {
             // Replay all log entries of current table.
             // Some tables may have been moved to other shards or dropped, ignore such logs.
             if let Some(ctx) = serial_exec_ctxs.get_mut(&table_batch.table_id) {
-                let result = replay_table_log_entries(
+                replay_tasks.push(replay_table_log_entries(
                     &context.flusher,
                     context.max_retry_flush_limit,
                     &mut ctx.serial_exec,
                     &ctx.table_data,
                     log_batch.range(table_batch.range),
-                )
-                .await;
+                ));
 
-                // If occur error, mark this table as failed and store the cause.
-                if let Err(e) = result {
-                    failed_tables.insert(table_batch.table_id, e);
-                }
+                // If occur error, mark this table as failed and store the
+                // cause. if let Err(e) = result {
+                //     failed_tables.insert(table_batch.table_id, e);
+                // }
             }
         }
-
+        for ret in futures::stream::iter(replay_tasks)
+            .buffer_unordered(20)
+            .collect::<Vec<_>>()
+            .await
+        {
+            // insert to failed_tables in there are errors
+        }
         Ok(())
     }

But this fails to compile due to the mutable reference:

error[E0499]: cannot borrow `*serial_exec_ctxs` as mutable more than once at a time
   --> src/analytic_engine/src/instance/wal_replayer.rs:388:32
    |
388 |             if let Some(ctx) = serial_exec_ctxs.get_mut(&table_batch.table_id) {
    |                                ^^^^^^^^^^^^^^^^ `*serial_exec_ctxs` was mutably borrowed here in the previous iteration of the loop
...
403 |         for ret in futures::stream::iter(replay_tasks)
    |                                          ------------ first borrow used here, in later iteration of loop


So the first step of this task is to remove those mutable references.

The fix should be easy: just define `serial_exec_ctxs` with the `Arc<Mutex<HashMap>>` type.
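For readers hitting this for the first time, here is a stripped-down repro of the same E0499 pattern, independent of the project's types (the `Vec<&mut String>` plays the role of the vector of futures that each capture `&mut ctx.serial_exec`); it intentionally does not compile:

```rust
use std::collections::HashMap;

fn main() {
    let mut serial_exec_ctxs: HashMap<u32, String> = HashMap::new();
    serial_exec_ctxs.insert(1, "table-1".into());
    serial_exec_ctxs.insert(2, "table-2".into());

    let mut replay_tasks: Vec<&mut String> = Vec::new();
    for table_id in [1u32, 2] {
        // The `&mut` returned by get_mut is stored in `replay_tasks`, so it has
        // to stay alive past this iteration; the next iteration then needs a
        // second, overlapping mutable borrow of the map.
        if let Some(ctx) = serial_exec_ctxs.get_mut(&table_id) {
            // error[E0499]: cannot borrow `serial_exec_ctxs` as mutable more
            // than once at a time
            replay_tasks.push(ctx);
        }
    }

    for ctx in replay_tasks {
        // the first borrow is still needed here, which keeps it alive across
        // the loop iterations above
        ctx.push_str(" replayed");
    }
}
```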

Contributor Author:

async fn replay_single_batch(
    context: &ReplayContext,
    log_batch:  VecDeque<LogEntry<ReadPayload>>,
    serial_exec_ctxs: Arc<tokio::sync::Mutex<HashMap<TableId, SerialExecContext<'_>>>>,
    failed_tables: &mut FailedTables,
) -> Result<()> {
    let mut table_batches = Vec::new();
    // TODO: No `group_by` method in `VecDeque`, so implement it manually here...
    Self::split_log_batch_by_table(log_batch, &mut table_batches);

    // TODO: Replay logs of different tables in parallel.
    let mut replay_tasks = Vec::with_capacity(table_batches.len());

    for table_batch in table_batches {
        // Some tables may have failed in previous replay, ignore them.
        if failed_tables.contains_key(&table_batch.table_id) {
            continue;
        }

        let serial_exec_ctxs = serial_exec_ctxs.clone();
        replay_tasks.push(async move {
            if let Some(ctx) = serial_exec_ctxs.lock().await.get_mut(&table_batch.table_id) {
                let result = replay_table_log_entries(
                    &context.flusher,
                    context.max_retry_flush_limit,
                    &mut ctx.serial_exec,
                    &ctx.table_data,
                    log_batch.range(table_batch.range),
                )
                    .await;
                (table_batch.table_id, result)
            } else {
                (table_batch.table_id, Ok(()))
            }
        });
    }

    for (table_id, ret) in futures::stream::iter(replay_tasks)
        .buffer_unordered(20)
        .collect::<Vec<_>>()
        .await
    {
        // If occur error, mark this table as failed and store the cause.
        if let Err(e) = ret {
            failed_tables.insert(table_id, e);
        }
    }

    Ok(())
}

I ran into the same compile failure before; the code above is what I came up with. Is this what you were expecting? However, my concern is: wouldn't `serial_exec_ctxs.lock().await.get_mut` break concurrency?

Contributor:

> I ran into the same compile failure before.

I pushed my commits to your branch; it compiles OK.

> wouldn't `serial_exec_ctxs.lock().await.get_mut` break concurrency?

Yes, that step runs serially, but we make `replay_table_log_entries` concurrent, which is what we want.

        serial_exec_ctxs_dash_map.extend(serial_exec_ctxs);
        let serial_exec_ctxs_dash_map_ref = Arc::new(serial_exec_ctxs_dash_map);

        // Some tables may have failed in previous replay, ignore them.
        futures::stream::iter(
            table_batches
                .into_iter()
                .filter(|table_batch| !failed_tables.contains_key(&table_batch.table_id)),
        )
        .for_each_concurrent(None, |table_batch| {
            let alter_failed_tables_ref = Arc::clone(&alter_failed_tables_ref);
            let serial_exec_ctxs_dash_map_ref = Arc::clone(&serial_exec_ctxs_dash_map_ref);
            async move {
                // Replay all log entries of current table.
                // Some tables may have been moved to other shards or dropped, ignore such logs.
                if let Some(mut ctx) = serial_exec_ctxs_dash_map_ref.get_mut(&table_batch.table_id)
                {
                    let ctx = RefMut::value_mut(&mut ctx);

                    let result = replay_table_log_entries(
                        &context.flusher,
                        context.max_retry_flush_limit,
                        &mut ctx.serial_exec,
                        &ctx.table_data,
                        log_batch.range(table_batch.range),
                    )
                    .await;

                    // If occur error, mark this table as failed and store the cause.
                    if let Err(e) = result {
                        alter_failed_tables_ref
                            .lock()
                            .await
                            .insert(table_batch.table_id, e);
                    }
                }
            }
        })
        .await;

        let alter_failed_tables = Arc::try_unwrap(alter_failed_tables_ref)
            .unwrap()
            .into_inner();
        failed_tables.extend(alter_failed_tables);

        Ok(())
    }