
Commit

Merge branch 'main' into base
tanruixiang committed Jun 21, 2023
2 parents a3391bd + 9a9c0f7 commit 64ecd85
Showing 15 changed files with 299 additions and 115 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock


26 changes: 24 additions & 2 deletions analytic_engine/src/instance/open.rs
@@ -8,7 +8,7 @@ use std::{
};

use common_types::table::ShardId;
use log::info;
use log::{error, info};
use object_store::ObjectStoreRef;
use snafu::ResultExt;
use table_engine::{engine::TableDef, table::TableId};
@@ -273,6 +273,11 @@ impl ShardOpener

/// Recover table meta data from manifest based on shard.
async fn recover_table_metas(&mut self) -> Result<()> {
info!(
"ShardOpener recover table metas begin, shard_id:{}",
self.shard_id
);

for (table_id, state) in self.stages.iter_mut() {
match state {
// Only do the meta recovery work in `RecoverTableMeta` state.
@@ -288,7 +293,10 @@ impl ShardOpener
let table_data = ctx.space.find_table_by_id(*table_id);
Ok(table_data.map(|data| (data, ctx.space.clone())))
}
Err(e) => Err(e),
Err(e) => {
error!("ShardOpener recover single table meta failed, table:{:?}, shard_id:{}", ctx.table_def, self.shard_id);
Err(e)
}
};

match result {
@@ -313,11 +321,20 @@ impl ShardOpener
}
}

info!(
"ShardOpener recover table metas finish, shard_id:{}",
self.shard_id
);
Ok(())
}

/// Recover table data based on shard.
async fn recover_table_datas(&mut self) -> Result<()> {
info!(
"ShardOpener recover table datas begin, shard_id:{}",
self.shard_id
);

// Replay wal logs of tables.
let mut replay_table_datas = Vec::with_capacity(self.stages.len());
for (table_id, stage) in self.stages.iter_mut() {
@@ -370,6 +387,7 @@ impl ShardOpener
}

(TableOpenStage::RecoverTableData(_), Some(e)) => {
error!("ShardOpener replay wals of single table failed, table:{}, table_id:{}, shard_id:{}", table_data.name, table_data.id, self.shard_id);
*stage = TableOpenStage::Failed(e);
}

@@ -381,6 +399,10 @@ impl ShardOpener
}
}

info!(
"ShardOpener recover table datas finish, shard_id:{}",
self.shard_id
);
Ok(())
}

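The hunks above wrap each shard recovery phase in begin/finish info logs keyed by shard_id, and add an error log naming the failing table alongside the existing error handling. A minimal, self-contained sketch of that logging pattern — using simplified stand-in types rather than the engine's real ShardOpener and open-stage machinery — might look like:

```rust
use log::{error, info};

// Hypothetical simplified types for illustration only; the real ShardOpener
// tracks a per-table TableOpenStage instead of a flat Vec of ids.
type ShardId = u32;
type TableId = u64;

struct ShardRecoverySketch {
    shard_id: ShardId,
    tables: Vec<TableId>,
}

impl ShardRecoverySketch {
    fn recover_table_metas(&self) {
        // Phase boundaries are logged with the shard id so that concurrent
        // shard recoveries can be told apart in the logs.
        info!(
            "ShardOpener recover table metas begin, shard_id:{}",
            self.shard_id
        );

        for table_id in &self.tables {
            if let Err(e) = recover_single_table_meta(*table_id) {
                // Failures are logged per table and the loop keeps going; the
                // real code records the error in that table's open stage.
                error!(
                    "recover single table meta failed, table_id:{}, shard_id:{}, err:{}",
                    table_id, self.shard_id, e
                );
            }
        }

        info!(
            "ShardOpener recover table metas finish, shard_id:{}",
            self.shard_id
        );
    }
}

// Stand-in for recovering a single table's metadata from the manifest.
fn recover_single_table_meta(_table_id: TableId) -> Result<(), String> {
    Ok(())
}

fn main() {
    let sketch = ShardRecoverySketch {
        shard_id: 0,
        tables: vec![1, 2, 3],
    };
    sketch.recover_table_metas();
}
```

The begin/finish pair makes it possible to bracket how long a shard's recovery took from the logs alone, even before the histogram metrics added in wal_replayer.rs below.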
29 changes: 28 additions & 1 deletion analytic_engine/src/instance/wal_replayer.rs
@@ -11,8 +11,11 @@ use std::{
use async_trait::async_trait;
use common_types::{schema::IndexInWriterSchema, table::ShardId};
use common_util::error::BoxError;
use lazy_static::lazy_static;
use log::{debug, error, info, trace};
use snafu::{OptionExt, ResultExt};
use prometheus::{exponential_buckets, register_histogram, Histogram};
use snafu::ResultExt;
use table_engine::table::TableId;
use tokio::sync::MutexGuard;
use wal::{
@@ -34,6 +37,22 @@ use crate::{
table::data::{SerialExecContext, TableDataRef},
};

// Metrics of wal replayer
lazy_static! {
static ref PULL_LOGS_DURATION_HISTOGRAM: Histogram = register_histogram!(
"wal_replay_pull_logs_duration",
"Histogram for pull logs duration in wal replay in seconds",
exponential_buckets(0.01, 2.0, 13).unwrap()
)
.unwrap();
static ref APPLY_LOGS_DURATION_HISTOGRAM: Histogram = register_histogram!(
"wal_replay_apply_logs_duration",
"Histogram for apply logs duration in wal replay in seconds",
exponential_buckets(0.01, 2.0, 13).unwrap()
)
.unwrap();
}

/// Wal replayer supporting both table based and region based
// TODO: limit the memory usage in `RegionBased` mode.
pub struct WalReplayer<'a> {
@@ -189,18 +208,21 @@ impl TableBasedReplay
let mut log_entry_buf = VecDeque::with_capacity(context.wal_replay_batch_size);
loop {
// fetch entries to log_entry_buf
let timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
let decoder = WalDecoder::default();
log_entry_buf = log_iter
.next_log_entries(decoder, log_entry_buf)
.await
.box_err()
.context(ReplayWalWithCause { msg: None })?;
drop(timer);

if log_entry_buf.is_empty() {
break;
}

// Replay all log entries of current table
let timer = APPLY_LOGS_DURATION_HISTOGRAM.start_timer();
replay_table_log_entries(
&context.flusher,
context.max_retry_flush_limit,
@@ -209,6 +231,7 @@ impl TableBasedReplay
log_entry_buf.iter(),
)
.await?;
drop(timer);
}

Ok(())
@@ -282,19 +305,23 @@ impl RegionBasedReplay

// Split and replay logs.
loop {
let timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
let decoder = WalDecoder::default();
log_entry_buf = log_iter
.next_log_entries(decoder, log_entry_buf)
.await
.box_err()
.context(ReplayWalWithCause { msg: None })?;
drop(timer);

if log_entry_buf.is_empty() {
break;
}

Self::replay_single_batch(context, &log_entry_buf, &mut replay_table_ctxs, faileds)
let timer = APPLY_LOGS_DURATION_HISTOGRAM.start_timer();
Self::replay_single_batch(context, &log_entry_buf, &mut serial_exec_ctxs, faileds)
.await?;
drop(timer);
}

Ok(())
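wal_replayer.rs now times the two phases of replay — pulling the next batch of WAL entries and applying them to the table — with Prometheus histograms registered through lazy_static. A small, self-contained sketch of that timer pattern, using the same crate APIs but a hypothetical standalone function instead of the real replay loop:

```rust
use lazy_static::lazy_static;
use prometheus::{exponential_buckets, register_histogram, Histogram};

lazy_static! {
    // 13 exponential buckets starting at 10ms and doubling each step,
    // matching the bucket layout chosen in this commit.
    static ref PULL_LOGS_DURATION_HISTOGRAM: Histogram = register_histogram!(
        "wal_replay_pull_logs_duration",
        "Histogram for pull logs duration in wal replay in seconds",
        exponential_buckets(0.01, 2.0, 13).unwrap()
    )
    .unwrap();
}

fn pull_logs_sketch() {
    // start_timer() returns a guard; the elapsed time is observed into the
    // histogram when the guard is dropped, so the explicit drop(timer) marks
    // the end of the measured phase.
    let timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
    // ... fetch the next batch of WAL entries here ...
    drop(timer);
}

fn main() {
    pull_logs_sketch();
    // The observation is now recorded in the default Prometheus registry and
    // is exported wherever the process exposes its metrics.
}
```

Because the timer guard observes on drop, wrapping only the `next_log_entries` await and the `replay_table_log_entries` call keeps the pull and apply measurements from overlapping.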
32 changes: 29 additions & 3 deletions analytic_engine/src/manifest/details.rs
@@ -199,6 +199,8 @@ pub(crate) trait TableMetaSet: fmt::Debug + Send + Sync {
// `SnapshotReoverer`.
#[derive(Debug, Clone)]
struct SnapshotRecoverer<LogStore, SnapshotStore> {
table_id: TableId,
space_id: SpaceId,
log_store: LogStore,
snapshot_store: SnapshotStore,
}
@@ -217,6 +219,11 @@ where
}

async fn create_latest_snapshot_with_prev(&self, prev_snapshot: Snapshot) -> Result<Snapshot> {
debug!(
"Manifest recover with prev snapshot, snapshot:{:?}, table_id:{}, space_id:{}",
prev_snapshot, self.table_id, self.space_id
);

let log_start_boundary = ReadBoundary::Excluded(prev_snapshot.end_seq);
let mut reader = self.log_store.scan(log_start_boundary).await?;

@@ -239,6 +246,11 @@ where
}

async fn create_latest_snapshot_without_prev(&self) -> Result<Option<Snapshot>> {
debug!(
"Manifest recover without prev snapshot, table_id:{}, space_id:{}",
self.table_id, self.space_id
);

let mut reader = self.log_store.scan(ReadBoundary::Min).await?;

let mut latest_seq = SequenceNumber::MIN;
@@ -258,6 +270,10 @@ where
data: manifest_data_builder.build(),
}))
} else {
debug!(
"Manifest recover nothing, table_id:{}, space_id:{}",
self.table_id, self.space_id
);
Ok(None)
}
}
@@ -474,7 +490,7 @@ impl Manifest for ManifestImpl {
}

async fn recover(&self, load_req: &LoadRequest) -> GenericResult<()> {
info!("Manifest recover, request:{:?}", load_req);
info!("Manifest recover begin, request:{load_req:?}");

// Load table meta snapshot from storage.
let location = WalLocation::new(load_req.shard_id as u64, load_req.table_id.as_u64());
@@ -490,6 +506,8 @@ impl Manifest for ManifestImpl {
self.store.clone(),
);
let reoverer = SnapshotRecoverer {
table_id: load_req.table_id,
space_id: load_req.space_id,
log_store,
snapshot_store,
};
@@ -505,6 +523,8 @@ impl Manifest for ManifestImpl {
self.table_meta_set.apply_edit_to_table(request)?;
}

info!("Manifest recover finish, request:{load_req:?}");

Ok(())
}

@@ -1386,7 +1406,8 @@ mod tests {
assert_eq!(snapshot.data, expect_table_manifest_data);
assert_eq!(snapshot.end_seq, log_store.next_seq() - 1);

let recovered_snapshot = recover_snapshot(&log_store, &snapshot_store).await;
let recovered_snapshot =
recover_snapshot(table_id, 0, &log_store, &snapshot_store).await;
assert_eq!(snapshot, recovered_snapshot.unwrap());
}
// The logs in the log store should be cleared after snapshot.
@@ -1418,7 +1439,8 @@ mod tests {
assert_eq!(snapshot.data, expect_table_manifest_data);
assert_eq!(snapshot.end_seq, log_store.next_seq() - 1);

let recovered_snapshot = recover_snapshot(&log_store, &snapshot_store).await;
let recovered_snapshot =
recover_snapshot(table_id, 0, &log_store, &snapshot_store).await;
assert_eq!(snapshot, recovered_snapshot.unwrap());
}
// The logs in the log store should be cleared after snapshot.
@@ -1446,10 +1468,14 @@
}

async fn recover_snapshot(
table_id: TableId,
space_id: SpaceId,
log_store: &MemLogStore,
snapshot_store: &MemSnapshotStore,
) -> Option<Snapshot> {
let recoverer = SnapshotRecoverer {
table_id,
space_id,
log_store: log_store.clone(),
snapshot_store: snapshot_store.clone(),
};
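SnapshotRecoverer now carries the table_id and space_id of the table whose manifest is being recovered, so the new debug/info logs can say which table they refer to; the test helper recover_snapshot gains the same two parameters. A simplified sketch of the idea, with stand-in types instead of the real recoverer (which is generic over its log and snapshot stores):

```rust
use log::debug;

// Stand-in aliases for illustration; the engine defines its own id types.
type TableId = u64;
type SpaceId = u32;

#[derive(Debug, Clone)]
struct SnapshotRecovererSketch {
    table_id: TableId,
    space_id: SpaceId,
}

impl SnapshotRecovererSketch {
    fn create_latest_snapshot_without_prev(&self) {
        // With the ids stored on the recoverer, every recovery path can emit
        // a log line that identifies exactly which table is being recovered.
        debug!(
            "Manifest recover without prev snapshot, table_id:{}, space_id:{}",
            self.table_id, self.space_id
        );
        // ... scan the log store from the beginning and rebuild the snapshot ...
    }
}

fn main() {
    let recoverer = SnapshotRecovererSketch {
        table_id: 1,
        space_id: 0,
    };
    recoverer.create_latest_snapshot_without_prev();
}
```

Threading the ids through the struct rather than into each call site keeps the recovery paths (with a previous snapshot, without one, and the empty case) consistent in what they report.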