Skip to content

Commit

Permalink
chore: fix logs and style (#1011)
Browse files Browse the repository at this point in the history
## Rationale
See title.

## Detailed Changes
See title.

## Test Plan
None.
  • Loading branch information
Rachelint authored Jun 25, 2023
1 parent 9a9c0f7 commit 3e4eb78
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 13 deletions.
4 changes: 2 additions & 2 deletions analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ impl ShardOpener {
Ok(table_data.map(|data| (data, ctx.space.clone())))
}
Err(e) => {
error!("ShardOpener recover single table meta failed, table:{:?}, shard_id:{}", ctx.table_def, self.shard_id);
error!("ShardOpener recover single table meta failed, table:{:?}, shard_id:{}, err:{e}", ctx.table_def, self.shard_id);
Err(e)
}
};
Expand Down Expand Up @@ -387,7 +387,7 @@ impl ShardOpener {
}

(TableOpenStage::RecoverTableData(_), Some(e)) => {
error!("ShardOpener replay wals of single table failed, table:{}, table_id:{}, shard_id:{}", table_data.name, table_data.id, self.shard_id);
error!("ShardOpener replay wals of single table failed, table:{}, table_id:{}, shard_id:{}, err:{e}", table_data.name, table_data.id, self.shard_id);
*stage = TableOpenStage::Failed(e);
}

Expand Down
12 changes: 4 additions & 8 deletions analytic_engine/src/instance/wal_replayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,21 +204,20 @@ impl TableBasedReplay {
let mut log_entry_buf = VecDeque::with_capacity(context.wal_replay_batch_size);
loop {
// fetch entries to log_entry_buf
let timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
let _timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
let decoder = WalDecoder::default();
log_entry_buf = log_iter
.next_log_entries(decoder, log_entry_buf)
.await
.box_err()
.context(ReplayWalWithCause { msg: None })?;
drop(timer);

if log_entry_buf.is_empty() {
break;
}

// Replay all log entries of current table
let timer = APPLY_LOGS_DURATION_HISTOGRAM.start_timer();
let _timer = APPLY_LOGS_DURATION_HISTOGRAM.start_timer();
replay_table_log_entries(
&context.flusher,
context.max_retry_flush_limit,
Expand All @@ -227,7 +226,6 @@ impl TableBasedReplay {
log_entry_buf.iter(),
)
.await?;
drop(timer);
}

Ok(())
Expand Down Expand Up @@ -298,23 +296,21 @@ impl RegionBasedReplay {

// Split and replay logs.
loop {
let timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
let _timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
let decoder = WalDecoder::default();
log_entry_buf = log_iter
.next_log_entries(decoder, log_entry_buf)
.await
.box_err()
.context(ReplayWalWithCause { msg: None })?;
drop(timer);

if log_entry_buf.is_empty() {
break;
}

let timer = APPLY_LOGS_DURATION_HISTOGRAM.start_timer();
let _timer = APPLY_LOGS_DURATION_HISTOGRAM.start_timer();
Self::replay_single_batch(context, &log_entry_buf, &mut serial_exec_ctxs, faileds)
.await?;
drop(timer);
}

Ok(())
Expand Down
6 changes: 3 additions & 3 deletions wal/src/message_queue_impl/log_cleaner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Log cleaner

Expand Down Expand Up @@ -68,7 +68,7 @@ impl<M: MessageQueue> LogCleaner<M> {

pub async fn maybe_clean_logs(&mut self, safe_delete_offset: Offset) -> Result<()> {
info!(
"Begin to check and clean logs, region id:{}, topic:{}, safe delete offset:{:?}",
"Region clean logs begin, region id:{}, topic:{}, safe delete offset:{:?}",
self.region_id, self.log_topic, safe_delete_offset
);

Expand Down Expand Up @@ -102,7 +102,7 @@ impl<M: MessageQueue> LogCleaner<M> {
}

info!(
"Finished to check and clean logs, do clean:{}, region id:{}, topic:{}, prepare delete to offset:{:?}",
"Region clean logs finish, do clean:{}, region id:{}, topic:{}, prepare delete to offset:{:?}",
do_clean, self.region_id, self.log_topic, safe_delete_offset
);

Expand Down
2 changes: 2 additions & 0 deletions wal/src/message_queue_impl/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,8 @@ impl<M: MessageQueue> Region<M> {
};

let safe_delete_offset = snapshot.safe_delete_offset();
info!("Region clean logs, snapshot:{snapshot:?}, safe_delete_offset:{safe_delete_offset}");

// Sync snapshot first.
synchronizer
.sync(snapshot)
Expand Down

0 comments on commit 3e4eb78

Please sign in to comment.