Skip to content

Commit

Permalink
feat: use instead forked rskafka to support limited retry (#1005)
Browse files Browse the repository at this point in the history
## Rationale
Part of #799 
We use `rskafka` as our kafka client.
However I found it will retry without limit even though kafka service is
unavailable...
(see
[https://github.com/influxdata/rskafka/issues/65](https://github.com/influxdata/rskafka/issues/65))
Worse, I found `rskafka` is almostis no longer maintained...

For quick fix, I forked it to support limited retry.

## Detailed Changes
+ Use the instead forked `rskafka`(supporting limited retry).
+ Add more logs in recovery path for better debugging.

## Test Plan
Test manually.
  • Loading branch information
Rachelint authored Jun 20, 2023
1 parent 41fe63a commit e3b4009
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 11 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 24 additions & 2 deletions analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
};

use common_types::table::ShardId;
use log::info;
use log::{error, info};
use object_store::ObjectStoreRef;
use snafu::ResultExt;
use table_engine::{engine::TableDef, table::TableId};
Expand Down Expand Up @@ -273,6 +273,11 @@ impl ShardOpener {

/// Recover table meta data from manifest based on shard.
async fn recover_table_metas(&mut self) -> Result<()> {
info!(
"ShardOpener recover table metas begin, shard_id:{}",
self.shard_id
);

for (table_id, state) in self.stages.iter_mut() {
match state {
// Only do the meta recovery work in `RecoverTableMeta` state.
Expand All @@ -288,7 +293,10 @@ impl ShardOpener {
let table_data = ctx.space.find_table_by_id(*table_id);
Ok(table_data.map(|data| (data, ctx.space.clone())))
}
Err(e) => Err(e),
Err(e) => {
error!("ShardOpener recover single table meta failed, table:{:?}, shard_id:{}", ctx.table_def, self.shard_id);
Err(e)
}
};

match result {
Expand All @@ -313,11 +321,20 @@ impl ShardOpener {
}
}

info!(
"ShardOpener recover table metas finish, shard_id:{}",
self.shard_id
);
Ok(())
}

/// Recover table data based on shard.
async fn recover_table_datas(&mut self) -> Result<()> {
info!(
"ShardOpener recover table datas begin, shard_id:{}",
self.shard_id
);

// Replay wal logs of tables.
let mut replay_table_datas = Vec::with_capacity(self.stages.len());
for (table_id, stage) in self.stages.iter_mut() {
Expand Down Expand Up @@ -370,6 +387,7 @@ impl ShardOpener {
}

(TableOpenStage::RecoverTableData(_), Some(e)) => {
error!("ShardOpener replay wals of single table failed, table:{}, table_id:{}, shard_id:{}", table_data.name, table_data.id, self.shard_id);
*stage = TableOpenStage::Failed(e);
}

Expand All @@ -381,6 +399,10 @@ impl ShardOpener {
}
}

info!(
"ShardOpener recover table datas finish, shard_id:{}",
self.shard_id
);
Ok(())
}

Expand Down
32 changes: 29 additions & 3 deletions analytic_engine/src/manifest/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ pub(crate) trait TableMetaSet: fmt::Debug + Send + Sync {
// `SnapshotReoverer`.
#[derive(Debug, Clone)]
struct SnapshotRecoverer<LogStore, SnapshotStore> {
table_id: TableId,
space_id: SpaceId,
log_store: LogStore,
snapshot_store: SnapshotStore,
}
Expand All @@ -217,6 +219,11 @@ where
}

async fn create_latest_snapshot_with_prev(&self, prev_snapshot: Snapshot) -> Result<Snapshot> {
debug!(
"Manifest recover with prev snapshot, snapshot:{:?}, table_id:{}, space_id:{}",
prev_snapshot, self.table_id, self.space_id
);

let log_start_boundary = ReadBoundary::Excluded(prev_snapshot.end_seq);
let mut reader = self.log_store.scan(log_start_boundary).await?;

Expand All @@ -239,6 +246,11 @@ where
}

async fn create_latest_snapshot_without_prev(&self) -> Result<Option<Snapshot>> {
debug!(
"Manifest recover without prev snapshot, table_id:{}, space_id:{}",
self.table_id, self.space_id
);

let mut reader = self.log_store.scan(ReadBoundary::Min).await?;

let mut latest_seq = SequenceNumber::MIN;
Expand All @@ -258,6 +270,10 @@ where
data: manifest_data_builder.build(),
}))
} else {
debug!(
"Manifest recover nothing, table_id:{}, space_id:{}",
self.table_id, self.space_id
);
Ok(None)
}
}
Expand Down Expand Up @@ -474,7 +490,7 @@ impl Manifest for ManifestImpl {
}

async fn recover(&self, load_req: &LoadRequest) -> GenericResult<()> {
info!("Manifest recover, request:{:?}", load_req);
info!("Manifest recover begin, request:{load_req:?}");

// Load table meta snapshot from storage.
let location = WalLocation::new(load_req.shard_id as u64, load_req.table_id.as_u64());
Expand All @@ -490,6 +506,8 @@ impl Manifest for ManifestImpl {
self.store.clone(),
);
let reoverer = SnapshotRecoverer {
table_id: load_req.table_id,
space_id: load_req.space_id,
log_store,
snapshot_store,
};
Expand All @@ -505,6 +523,8 @@ impl Manifest for ManifestImpl {
self.table_meta_set.apply_edit_to_table(request)?;
}

info!("Manifest recover finish, request:{load_req:?}");

Ok(())
}

Expand Down Expand Up @@ -1386,7 +1406,8 @@ mod tests {
assert_eq!(snapshot.data, expect_table_manifest_data);
assert_eq!(snapshot.end_seq, log_store.next_seq() - 1);

let recovered_snapshot = recover_snapshot(&log_store, &snapshot_store).await;
let recovered_snapshot =
recover_snapshot(table_id, 0, &log_store, &snapshot_store).await;
assert_eq!(snapshot, recovered_snapshot.unwrap());
}
// The logs in the log store should be cleared after snapshot.
Expand Down Expand Up @@ -1418,7 +1439,8 @@ mod tests {
assert_eq!(snapshot.data, expect_table_manifest_data);
assert_eq!(snapshot.end_seq, log_store.next_seq() - 1);

let recovered_snapshot = recover_snapshot(&log_store, &snapshot_store).await;
let recovered_snapshot =
recover_snapshot(table_id, 0, &log_store, &snapshot_store).await;
assert_eq!(snapshot, recovered_snapshot.unwrap());
}
// The logs in the log store should be cleared after snapshot.
Expand Down Expand Up @@ -1446,10 +1468,14 @@ mod tests {
}

async fn recover_snapshot(
table_id: TableId,
space_id: SpaceId,
log_store: &MemLogStore,
snapshot_store: &MemSnapshotStore,
) -> Option<Snapshot> {
let recoverer = SnapshotRecoverer {
table_id,
space_id,
log_store: log_store.clone(),
snapshot_store: snapshot_store.clone(),
};
Expand Down
4 changes: 2 additions & 2 deletions components/message_queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ snafu = { workspace = true }
tokio = { workspace = true }

[dependencies.rskafka]
git = "https://github.com/influxdata/rskafka.git"
rev = "00988a564b1db0249d858065fc110476c075efad"
git = "https://github.com/Rachelint/rskafka.git"
rev = "f0fd8e278d8164cb0cfca5a80476361fc308ecc3"
default-features = false
features = ["compression-gzip", "compression-lz4", "compression-snappy"]

Expand Down
21 changes: 20 additions & 1 deletion components/message_queue/src/kafka/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,39 @@

//! Kafka implementation's config

use common_util::config::ReadableDuration;
use serde::{Deserialize, Serialize};

/// Generic client config that is used for consumers, producers as well as admin
/// operations (like "create topic").
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
pub client: ClientConfig,
pub topic_management: TopicManagementConfig,
pub consumer: ConsumerConfig,
pub retry_interval_factor: f64,
pub init_retry_interval: ReadableDuration,
pub max_retry_interval: ReadableDuration,
pub max_retry: usize,
// TODO: may need some config options for producer,
// but it seems nothing needed now.
}

impl Default for Config {
fn default() -> Self {
Self {
client: Default::default(),
topic_management: Default::default(),
consumer: Default::default(),
retry_interval_factor: 2.0,
init_retry_interval: ReadableDuration::secs(1),
max_retry_interval: ReadableDuration::secs(10),
max_retry: 10,
}
}
}

#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct ClientConfig {
Expand Down
10 changes: 9 additions & 1 deletion components/message_queue/src/kafka/kafka_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use rskafka::{
Client, ClientBuilder,
},
record::{Record, RecordAndOffset},
BackoffConfig,
};
use snafu::{Backtrace, ResultExt, Snafu};
use tokio::sync::RwLock;
Expand Down Expand Up @@ -141,7 +142,14 @@ impl KafkaImplInner {
panic!("The boost broker must be set");
}

let mut client_builder = ClientBuilder::new(config.client.boost_brokers.clone().unwrap());
let backoff_config = BackoffConfig {
init_backoff: config.init_retry_interval.0,
max_backoff: config.max_retry_interval.0,
base: config.retry_interval_factor,
max_retry: config.max_retry,
};
let mut client_builder = ClientBuilder::new(config.client.boost_brokers.clone().unwrap())
.backoff_config(backoff_config);
if let Some(max_message_size) = config.client.max_message_size {
client_builder = client_builder.max_message_size(max_message_size);
}
Expand Down

0 comments on commit e3b4009

Please sign in to comment.