Skip to content

Commit

Permalink
feat: support region based wal replay (#976)
Browse files Browse the repository at this point in the history
## Rationale
Part of #799 

## Detailed Changes
- Define `WalReplayer` to carry out replay work.
- Support both `TableBased`(original) and `RegionBased` replay mode in
`WalReplayer`.
- Expose related configs.

## Test Plan
- Modify exist unit tests to cover the `RegionBased` wal replay.
- Refactor the integration test to cover recovery logic(TODO).
  • Loading branch information
Rachelint authored Jun 15, 2023
1 parent 8c001a8 commit 1fd17f3
Show file tree
Hide file tree
Showing 13 changed files with 979 additions and 303 deletions.
16 changes: 15 additions & 1 deletion analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,18 @@ pub enum Error {

#[snafu(display("Failed to open shard, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
OpenTablesOfShard { msg: String, backtrace: Backtrace },

#[snafu(display("Failed to replay wal, msg:{:?}, err:{}", msg, source))]
ReplayWalWithCause {
msg: Option<String>,
source: GenericError,
},

#[snafu(display("Failed to replay wal, msg:{:?}.\nBacktrace:\n{}", msg, backtrace))]
ReplayWalNoCause {
msg: Option<String>,
backtrace: Backtrace,
},
}

define_result!(Error);
Expand Down Expand Up @@ -250,7 +262,9 @@ impl From<Error> for table_engine::engine::Error {
| Error::DoManifestSnapshot { .. }
| Error::OpenManifest { .. }
| Error::TableNotExist { .. }
| Error::OpenTablesOfShard { .. } => Self::Unexpected {
| Error::OpenTablesOfShard { .. }
| Error::ReplayWalNoCause { .. }
| Error::ReplayWalWithCause { .. } => Self::Unexpected {
source: Box::new(err),
},
}
Expand Down
23 changes: 22 additions & 1 deletion analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ pub enum Error {

#[snafu(display("Other failure, msg:{}.\nBacktrace:\n{:?}", msg, backtrace))]
Other { msg: String, backtrace: Backtrace },

#[snafu(display("Failed to run flush job, msg:{:?}, err:{}", msg, source))]
FlushJobWithCause {
msg: Option<String>,
source: GenericError,
},

#[snafu(display("Failed to run flush job, msg:{:?}.\nBacktrace:\n{}", msg, backtrace))]
FlushJobNoCause {
msg: Option<String>,
backtrace: Backtrace,
},
}

define_result!(Error);
Expand Down Expand Up @@ -163,6 +175,7 @@ pub struct TableFlushRequest {
pub max_sequence: SequenceNumber,
}

#[derive(Clone)]
pub struct Flusher {
pub space_store: SpaceStoreRef,

Expand Down Expand Up @@ -311,7 +324,15 @@ impl FlushTask {
// Start flush duration timer.
let local_metrics = self.table_data.metrics.local_flush_metrics();
let _timer = local_metrics.start_flush_timer();
self.dump_memtables(request_id, &mems_to_flush).await?;
self.dump_memtables(request_id, &mems_to_flush)
.await
.box_err()
.context(FlushJobWithCause {
msg: Some(format!(
"table:{}, table_id:{}, request_id:{request_id}",
self.table_data.name, self.table_data.id
)),
})?;

self.table_data
.set_last_flush_time(time::current_time_millis());
Expand Down
4 changes: 3 additions & 1 deletion analytic_engine/src/instance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub(crate) mod mem_collector;
pub mod open;
mod read;
pub(crate) mod serial_executor;
pub mod wal_replayer;
pub(crate) mod write;

use std::sync::Arc;
Expand Down Expand Up @@ -44,7 +45,7 @@ use crate::{
meta_data::cache::MetaCacheRef,
},
table::data::{TableDataRef, TableShardInfo},
TableOptions,
RecoverMode, TableOptions,
};

#[allow(clippy::enum_variant_names)]
Expand Down Expand Up @@ -159,6 +160,7 @@ pub struct Instance {
/// Options for scanning sst
pub(crate) scan_options: ScanOptions,
pub(crate) iter_options: Option<IterOptions>,
pub(crate) recover_mode: RecoverMode,
}

impl Instance {
Expand Down
Loading

0 comments on commit 1fd17f3

Please sign in to comment.