diff --git a/Cargo.lock b/Cargo.lock
index e9e56c3b8a..50744b4e51 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4214,7 +4214,9 @@ dependencies = [
 "bytes",
 "common_util",
 "datafusion",
+ "futures 0.3.28",
 "log",
+ "object_store 1.2.2",
 "parquet",
 "tokio",
 ]
@@ -5350,8 +5352,8 @@ dependencies = [
 
 [[package]]
 name = "rskafka"
-version = "0.3.0"
-source = "git+https://github.com/influxdata/rskafka.git?rev=00988a564b1db0249d858065fc110476c075efad#00988a564b1db0249d858065fc110476c075efad"
+version = "0.4.0"
+source = "git+https://github.com/Rachelint/rskafka.git?rev=f0fd8e278d8164cb0cfca5a80476361fc308ecc3#f0fd8e278d8164cb0cfca5a80476361fc308ecc3"
 dependencies = [
 "async-trait",
 "bytes",
diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs
index 70e9d75fee..fd2c19d62d 100644
--- a/analytic_engine/src/instance/open.rs
+++ b/analytic_engine/src/instance/open.rs
@@ -8,7 +8,7 @@ use std::{
 };
 
 use common_types::table::ShardId;
-use log::info;
+use log::{error, info};
 use object_store::ObjectStoreRef;
 use snafu::ResultExt;
 use table_engine::{engine::TableDef, table::TableId};
@@ -273,6 +273,11 @@ impl ShardOpener {
 
     /// Recover table meta data from manifest based on shard.
     async fn recover_table_metas(&mut self) -> Result<()> {
+        info!(
+            "ShardOpener recover table metas begin, shard_id:{}",
+            self.shard_id
+        );
+
         for (table_id, state) in self.stages.iter_mut() {
             match state {
                 // Only do the meta recovery work in `RecoverTableMeta` state.
@@ -288,7 +293,10 @@ impl ShardOpener {
                            let table_data = ctx.space.find_table_by_id(*table_id);
                            Ok(table_data.map(|data| (data, ctx.space.clone())))
                        }
-                        Err(e) => Err(e),
+                        Err(e) => {
+                            error!("ShardOpener recover single table meta failed, table:{:?}, shard_id:{}", ctx.table_def, self.shard_id);
+                            Err(e)
+                        }
                    };
 
                    match result {
@@ -313,11 +321,20 @@ impl ShardOpener {
            }
        }
 
+        info!(
+            "ShardOpener recover table metas finish, shard_id:{}",
+            self.shard_id
+        );
        Ok(())
    }
 
    /// Recover table data based on shard.
    async fn recover_table_datas(&mut self) -> Result<()> {
+        info!(
+            "ShardOpener recover table datas begin, shard_id:{}",
+            self.shard_id
+        );
+
        // Replay wal logs of tables.
        let mut replay_table_datas = Vec::with_capacity(self.stages.len());
        for (table_id, stage) in self.stages.iter_mut() {
@@ -370,6 +387,7 @@ impl ShardOpener {
                }
 
                (TableOpenStage::RecoverTableData(_), Some(e)) => {
+                    error!("ShardOpener replay wals of single table failed, table:{}, table_id:{}, shard_id:{}", table_data.name, table_data.id, self.shard_id);
                    *stage = TableOpenStage::Failed(e);
                }
 
@@ -381,6 +399,10 @@ impl ShardOpener {
            }
        }
 
+        info!(
+            "ShardOpener recover table datas finish, shard_id:{}",
+            self.shard_id
+        );
        Ok(())
    }
 
diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs
index 0144833ec6..a5c9b6ba08 100644
--- a/analytic_engine/src/instance/wal_replayer.rs
+++ b/analytic_engine/src/instance/wal_replayer.rs
@@ -11,8 +11,11 @@ use std::{
 };
 
 use async_trait::async_trait;
 use common_types::{schema::IndexInWriterSchema, table::ShardId};
 use common_util::error::BoxError;
+use lazy_static::lazy_static;
 use log::{debug, error, info, trace};
-use snafu::{OptionExt, ResultExt};
+use prometheus::{exponential_buckets, register_histogram, Histogram};
+use snafu::ResultExt;
 use table_engine::table::TableId;
 use tokio::sync::MutexGuard;
 use wal::{
@@ -34,6 +37,22 @@ use crate::{
    table::data::{SerialExecContext, TableDataRef},
 };
 
+// Metrics of wal replayer
+lazy_static! {
+    static ref PULL_LOGS_DURATION_HISTOGRAM: Histogram = register_histogram!(
+        "wal_replay_pull_logs_duration",
+        "Histogram for pull logs duration in wal replay in seconds",
+        exponential_buckets(0.01, 2.0, 13).unwrap()
+    )
+    .unwrap();
+    static ref APPLY_LOGS_DURATION_HISTOGRAM: Histogram = register_histogram!(
+        "wal_replay_apply_logs_duration",
+        "Histogram for apply logs duration in wal replay in seconds",
+        exponential_buckets(0.01, 2.0, 13).unwrap()
+    )
+    .unwrap();
+}
+
 /// Wal replayer supporting both table based and region based
 // TODO: limit the memory usage in `RegionBased` mode.
 pub struct WalReplayer<'a> {
@@ -189,18 +208,21 @@ impl TableBasedReplay {
        let mut log_entry_buf = VecDeque::with_capacity(context.wal_replay_batch_size);
        loop {
            // fetch entries to log_entry_buf
+            let timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
            let decoder = WalDecoder::default();
            log_entry_buf = log_iter
                .next_log_entries(decoder, log_entry_buf)
                .await
                .box_err()
                .context(ReplayWalWithCause { msg: None })?;
+            drop(timer);
 
            if log_entry_buf.is_empty() {
                break;
            }
 
            // Replay all log entries of current table
+            let timer = APPLY_LOGS_DURATION_HISTOGRAM.start_timer();
            replay_table_log_entries(
                &context.flusher,
                context.max_retry_flush_limit,
@@ -209,6 +231,7 @@ impl TableBasedReplay {
                log_entry_buf.iter(),
            )
            .await?;
+            drop(timer);
        }
 
        Ok(())
@@ -282,19 +305,23 @@
 
        // Split and replay logs.
        loop {
+            let timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
            let decoder = WalDecoder::default();
            log_entry_buf = log_iter
                .next_log_entries(decoder, log_entry_buf)
                .await
                .box_err()
                .context(ReplayWalWithCause { msg: None })?;
+            drop(timer);
 
            if log_entry_buf.is_empty() {
                break;
            }
 
-            Self::replay_single_batch(context, &log_entry_buf, &mut replay_table_ctxs, faileds)
+            let timer = APPLY_LOGS_DURATION_HISTOGRAM.start_timer();
+            Self::replay_single_batch(context, &log_entry_buf, &mut serial_exec_ctxs, faileds)
                .await?;
+            drop(timer);
        }
 
        Ok(())
diff --git a/analytic_engine/src/manifest/details.rs b/analytic_engine/src/manifest/details.rs
index 9e902086b7..5ac94c9b82 100644
--- a/analytic_engine/src/manifest/details.rs
+++ b/analytic_engine/src/manifest/details.rs
@@ -199,6 +199,8 @@ pub(crate) trait TableMetaSet: fmt::Debug + Send + Sync {
 // `SnapshotReoverer`.
 #[derive(Debug, Clone)]
 struct SnapshotRecoverer<LogStore, SnapshotStore> {
+    table_id: TableId,
+    space_id: SpaceId,
    log_store: LogStore,
    snapshot_store: SnapshotStore,
 }
@@ -217,6 +219,11 @@ where
    }
 
    async fn create_latest_snapshot_with_prev(&self, prev_snapshot: Snapshot) -> Result<Snapshot> {
+        debug!(
+            "Manifest recover with prev snapshot, snapshot:{:?}, table_id:{}, space_id:{}",
+            prev_snapshot, self.table_id, self.space_id
+        );
+
        let log_start_boundary = ReadBoundary::Excluded(prev_snapshot.end_seq);
        let mut reader = self.log_store.scan(log_start_boundary).await?;
 
@@ -239,6 +246,11 @@ where
    }
 
    async fn create_latest_snapshot_without_prev(&self) -> Result<Option<Snapshot>> {
+        debug!(
+            "Manifest recover without prev snapshot, table_id:{}, space_id:{}",
+            self.table_id, self.space_id
+        );
+
        let mut reader = self.log_store.scan(ReadBoundary::Min).await?;
 
        let mut latest_seq = SequenceNumber::MIN;
@@ -258,6 +270,10 @@ where
                data: manifest_data_builder.build(),
            }))
        } else {
+            debug!(
+                "Manifest recover nothing, table_id:{}, space_id:{}",
+                self.table_id, self.space_id
+            );
            Ok(None)
        }
    }
@@ -474,7 +490,7 @@ impl Manifest for ManifestImpl {
    }
 
    async fn recover(&self, load_req: &LoadRequest) -> GenericResult<()> {
-        info!("Manifest recover, request:{:?}", load_req);
+        info!("Manifest recover begin, request:{load_req:?}");
 
        // Load table meta snapshot from storage.
        let location = WalLocation::new(load_req.shard_id as u64, load_req.table_id.as_u64());
@@ -490,6 +506,8 @@ impl Manifest for ManifestImpl {
            self.store.clone(),
        );
        let reoverer = SnapshotRecoverer {
+            table_id: load_req.table_id,
+            space_id: load_req.space_id,
            log_store,
            snapshot_store,
        };
@@ -505,6 +523,8 @@ impl Manifest for ManifestImpl {
            self.table_meta_set.apply_edit_to_table(request)?;
        }
 
+        info!("Manifest recover finish, request:{load_req:?}");
+
        Ok(())
    }
 
@@ -1386,7 +1406,8 @@ mod tests {
            assert_eq!(snapshot.data, expect_table_manifest_data);
            assert_eq!(snapshot.end_seq, log_store.next_seq() - 1);
 
-            let recovered_snapshot = recover_snapshot(&log_store, &snapshot_store).await;
+            let recovered_snapshot =
+                recover_snapshot(table_id, 0, &log_store, &snapshot_store).await;
            assert_eq!(snapshot, recovered_snapshot.unwrap());
        }
        // The logs in the log store should be cleared after snapshot.
@@ -1418,7 +1439,8 @@ mod tests {
            assert_eq!(snapshot.data, expect_table_manifest_data);
            assert_eq!(snapshot.end_seq, log_store.next_seq() - 1);
 
-            let recovered_snapshot = recover_snapshot(&log_store, &snapshot_store).await;
+            let recovered_snapshot =
+                recover_snapshot(table_id, 0, &log_store, &snapshot_store).await;
            assert_eq!(snapshot, recovered_snapshot.unwrap());
        }
        // The logs in the log store should be cleared after snapshot.
@@ -1446,10 +1468,14 @@ mod tests {
    }
 
    async fn recover_snapshot(
+        table_id: TableId,
+        space_id: SpaceId,
        log_store: &MemLogStore,
        snapshot_store: &MemSnapshotStore,
    ) -> Option<Snapshot> {
        let recoverer = SnapshotRecoverer {
+            table_id,
+            space_id,
            log_store: log_store.clone(),
            snapshot_store: snapshot_store.clone(),
        };
diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs
index 47478eca6a..b2181f727b 100644
--- a/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/analytic_engine/src/sst/parquet/async_reader.rs
@@ -30,17 +30,14 @@ use datafusion::{
        metrics::ExecutionPlanMetricsSet,
    },
 };
-use futures::{future::BoxFuture, FutureExt, Stream, StreamExt, TryFutureExt};
+use futures::{Stream, StreamExt};
 use log::{debug, error};
 use object_store::{ObjectStoreRef, Path};
 use parquet::{
-    arrow::{
-        arrow_reader::RowSelection, async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder,
-        ProjectionMask,
-    },
+    arrow::{arrow_reader::RowSelection, ParquetRecordBatchStreamBuilder, ProjectionMask},
    file::metadata::RowGroupMetaData,
 };
-use parquet_ext::meta_data::ChunkReader;
+use parquet_ext::{meta_data::ChunkReader, reader::ObjectStoreReader};
 use snafu::ResultExt;
 use table_engine::predicate::PredicateRef;
 use tokio::sync::mpsc::{self, Receiver, Sender};
@@ -281,13 +278,23 @@ impl<'a> Reader<'a> {
        let mut streams = Vec::with_capacity(target_row_group_chunks.len());
        for chunk in target_row_group_chunks {
-            let object_store_reader =
-                ObjectStoreReader::new(self.store.clone(), self.path.clone(), meta_data.clone());
+            let object_store_reader = ObjectStoreReader::new(
+                self.store.clone(),
+                self.path.clone(),
+                parquet_metadata.clone(),
+            );
            let mut builder = ParquetRecordBatchStreamBuilder::new(object_store_reader)
                .await
                .with_context(|| ParquetError)?;
+
            let row_selection =
                self.build_row_selection(arrow_schema.clone(), &chunk, parquet_metadata)?;
+
+            debug!(
+                "Build row selection for file path:{}, result:{row_selection:?}, page indexes:{}",
+                self.path,
+                parquet_metadata.page_indexes().is_some()
+            );
            if let Some(selection) = row_selection {
                builder = builder.with_row_selection(selection);
            };
@@ -340,18 +347,32 @@ impl<'a> Reader<'a> {
        Ok(file_size)
    }
 
-    async fn load_meta_data_from_storage(&self) -> Result<ParquetMetaData> {
+    async fn load_meta_data_from_storage(&self, ignore_sst_filter: bool) -> Result<MetaData> {
        let file_size = self.load_file_size().await?;
        let chunk_reader_adapter = ChunkReaderAdapter::new(self.path, self.store);
 
-        let (meta_data, _) =
+        let (parquet_meta_data, _) =
            parquet_ext::meta_data::fetch_parquet_metadata(file_size, &chunk_reader_adapter)
                .await
                .with_context(|| FetchAndDecodeSstMeta {
                    file_path: self.path.to_string(),
                })?;
 
-        Ok(meta_data)
+        let object_store_reader = parquet_ext::reader::ObjectStoreReader::new(
+            self.store.clone(),
+            self.path.clone(),
+            Arc::new(parquet_meta_data),
+        );
+
+        let parquet_meta_data = parquet_ext::meta_data::meta_with_page_indexes(object_store_reader)
+            .await
+            .with_context(|| DecodePageIndexes {
+                file_path: self.path.to_string(),
+            })?;
+
+        MetaData::try_new(&parquet_meta_data, ignore_sst_filter)
+            .box_err()
+            .context(DecodeSstMeta)
    }
 
    fn need_update_cache(&self) -> bool {
@@ -375,12 +396,8 @@ impl<'a> Reader<'a> {
        let empty_predicate = self.predicate.exprs().is_empty();
 
        let meta_data = {
-            let parquet_meta_data = self.load_meta_data_from_storage().await?;
-
            let ignore_sst_filter = avoid_update_cache && empty_predicate;
-            MetaData::try_new(&parquet_meta_data, ignore_sst_filter)
-                .box_err()
-                .context(DecodeSstMeta)?
+            self.load_meta_data_from_storage(ignore_sst_filter).await?
        };
 
        if avoid_update_cache || self.meta_cache.is_none() {
@@ -413,71 +430,6 @@ impl<'a> Drop for Reader<'a> {
    }
 }
 
-#[derive(Clone)]
-struct ObjectStoreReader {
-    storage: ObjectStoreRef,
-    path: Path,
-    meta_data: MetaData,
-    begin: Instant,
-}
-
-impl ObjectStoreReader {
-    fn new(storage: ObjectStoreRef, path: Path, meta_data: MetaData) -> Self {
-        Self {
-            storage,
-            path,
-            meta_data,
-            begin: Instant::now(),
-        }
-    }
-}
-
-impl Drop for ObjectStoreReader {
-    fn drop(&mut self) {
-        debug!(
-            "ObjectStoreReader dropped, path:{}, elapsed:{:?}",
-            &self.path,
-            self.begin.elapsed()
-        );
-    }
-}
-
-impl AsyncFileReader for ObjectStoreReader {
-    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-        self.storage
-            .get_range(&self.path, range)
-            .map_err(|e| {
-                parquet::errors::ParquetError::General(format!(
-                    "Failed to fetch range from object store, err:{e}"
-                ))
-            })
-            .boxed()
-    }
-
-    fn get_byte_ranges(
-        &mut self,
-        ranges: Vec<Range<usize>>,
-    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
-        async move {
-            self.storage
-                .get_ranges(&self.path, &ranges)
-                .map_err(|e| {
-                    parquet::errors::ParquetError::General(format!(
-                        "Failed to fetch ranges from object store, err:{e}"
-                    ))
-                })
-                .await
-        }
-        .boxed()
-    }
-
-    fn get_metadata(
-        &mut self,
-    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
-        Box::pin(async move { Ok(self.meta_data.parquet().clone()) })
-    }
-}
-
 pub struct ChunkReaderAdapter<'a> {
    path: &'a Path,
    store: &'a ObjectStoreRef,
diff --git a/analytic_engine/src/sst/reader.rs b/analytic_engine/src/sst/reader.rs
index 99872d448a..915190a935 100644
--- a/analytic_engine/src/sst/reader.rs
+++ b/analytic_engine/src/sst/reader.rs
@@ -15,20 +15,17 @@ pub mod error {
    #[derive(Debug, Snafu)]
    #[snafu(visibility(pub))]
    pub enum Error {
-        #[snafu(display("Try to read again, path:{}.\nBacktrace:\n{}", path, backtrace))]
+        #[snafu(display("Try to read again, path:{path}.\nBacktrace:\n{backtrace}"))]
        ReadAgain { backtrace: Backtrace, path: String },
 
-        #[snafu(display("Fail to read persisted file, path:{}, err:{}", path, source))]
+        #[snafu(display("Fail to read persisted file, path:{path}, err:{source}"))]
        ReadPersist { path: String, source: GenericError },
 
-        #[snafu(display("Failed to decode record batch, err:{}", source))]
+        #[snafu(display("Failed to decode record batch, err:{source}"))]
        DecodeRecordBatch { source: GenericError },
 
        #[snafu(display(
-            "Failed to decode sst meta data, file_path:{}, err:{}.\nBacktrace:\n{:?}",
-            file_path,
-            source,
-            backtrace
+            "Failed to decode sst meta data, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}",
        ))]
        FetchAndDecodeSstMeta {
            file_path: String,
@@ -36,43 +33,52 @@ pub mod error {
            backtrace: Backtrace,
        },
 
-        #[snafu(display("Failed to decode sst meta data, err:{}", source))]
+        #[snafu(display(
+            "Failed to decode page indexes for meta data, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}",
+        ))]
+        DecodePageIndexes {
+            file_path: String,
+            source: parquet::errors::ParquetError,
+            backtrace: Backtrace,
+        },
+
+        #[snafu(display("Failed to decode sst meta data, err:{source}"))]
        DecodeSstMeta { source: GenericError },
 
-        #[snafu(display("Sst meta data is not found.\nBacktrace:\n{}", backtrace))]
+        #[snafu(display("Sst meta data is not found.\nBacktrace:\n{backtrace}"))]
        SstMetaNotFound { backtrace: Backtrace },
 
-        #[snafu(display("Fail to projection, err:{}", source))]
+        #[snafu(display("Fail to projection, err:{source}"))]
err:{source}"))] Projection { source: GenericError }, - #[snafu(display("Sst meta data is empty.\nBacktrace:\n{}", backtrace))] + #[snafu(display("Sst meta data is empty.\nBacktrace:\n{backtrace}"))] EmptySstMeta { backtrace: Backtrace }, - #[snafu(display("Invalid schema, err:{}", source))] + #[snafu(display("Invalid schema, err:{source}"))] InvalidSchema { source: common_types::schema::Error }, - #[snafu(display("Meet a datafusion error, err:{}\nBacktrace:\n{}", source, backtrace))] + #[snafu(display("Meet a datafusion error, err:{source}\nBacktrace:\n{backtrace}"))] DataFusionError { source: datafusion::error::DataFusionError, backtrace: Backtrace, }, - #[snafu(display("Meet a object store error, err:{}\nBacktrace:\n{}", source, backtrace))] + #[snafu(display("Meet a object store error, err:{source}\nBacktrace:\n{backtrace}"))] ObjectStoreError { source: object_store::ObjectStoreError, backtrace: Backtrace, }, - #[snafu(display("Meet a parquet error, err:{}\nBacktrace:\n{}", source, backtrace))] + #[snafu(display("Meet a parquet error, err:{source}\nBacktrace:\n{backtrace}"))] ParquetError { source: parquet::errors::ParquetError, backtrace: Backtrace, }, - #[snafu(display("Other kind of error:{}", source))] + #[snafu(display("Other kind of error:{source}"))] Other { source: GenericError }, - #[snafu(display("Other kind of error, msg:{}.\nBacktrace:\n{}", msg, backtrace))] + #[snafu(display("Other kind of error, msg:{msg}.\nBacktrace:\n{backtrace}"))] OtherNoCause { msg: String, backtrace: Backtrace }, } diff --git a/components/message_queue/Cargo.toml b/components/message_queue/Cargo.toml index e820d49bca..4766330828 100644 --- a/components/message_queue/Cargo.toml +++ b/components/message_queue/Cargo.toml @@ -16,8 +16,8 @@ snafu = { workspace = true } tokio = { workspace = true } [dependencies.rskafka] -git = "https://github.com/influxdata/rskafka.git" -rev = "00988a564b1db0249d858065fc110476c075efad" +git = "https://github.com/Rachelint/rskafka.git" +rev = "f0fd8e278d8164cb0cfca5a80476361fc308ecc3" default-features = false features = ["compression-gzip", "compression-lz4", "compression-snappy"] diff --git a/components/message_queue/src/kafka/config.rs b/components/message_queue/src/kafka/config.rs index 880b7f4f94..24629442ed 100644 --- a/components/message_queue/src/kafka/config.rs +++ b/components/message_queue/src/kafka/config.rs @@ -2,20 +2,39 @@ //! Kafka implementation's config +use common_util::config::ReadableDuration; use serde::{Deserialize, Serialize}; /// Generic client config that is used for consumers, producers as well as admin /// operations (like "create topic"). -#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(default)] pub struct Config { pub client: ClientConfig, pub topic_management: TopicManagementConfig, pub consumer: ConsumerConfig, + pub retry_interval_factor: f64, + pub init_retry_interval: ReadableDuration, + pub max_retry_interval: ReadableDuration, + pub max_retry: usize, // TODO: may need some config options for producer, // but it seems nothing needed now. 
 }
 
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            client: Default::default(),
+            topic_management: Default::default(),
+            consumer: Default::default(),
+            retry_interval_factor: 2.0,
+            init_retry_interval: ReadableDuration::secs(1),
+            max_retry_interval: ReadableDuration::secs(10),
+            max_retry: 10,
+        }
+    }
+}
+
 #[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
 #[serde(default)]
 pub struct ClientConfig {
diff --git a/components/message_queue/src/kafka/kafka_impl.rs b/components/message_queue/src/kafka/kafka_impl.rs
index 4937d2efee..0d69d5e382 100644
--- a/components/message_queue/src/kafka/kafka_impl.rs
+++ b/components/message_queue/src/kafka/kafka_impl.rs
@@ -21,6 +21,7 @@ use rskafka::{
        Client, ClientBuilder,
    },
    record::{Record, RecordAndOffset},
+    BackoffConfig,
 };
 use snafu::{Backtrace, ResultExt, Snafu};
 use tokio::sync::RwLock;
@@ -141,7 +142,14 @@ impl KafkaImplInner {
            panic!("The boost broker must be set");
        }
 
-        let mut client_builder = ClientBuilder::new(config.client.boost_brokers.clone().unwrap());
+        let backoff_config = BackoffConfig {
+            init_backoff: config.init_retry_interval.0,
+            max_backoff: config.max_retry_interval.0,
+            base: config.retry_interval_factor,
+            max_retry: config.max_retry,
+        };
+        let mut client_builder = ClientBuilder::new(config.client.boost_brokers.clone().unwrap())
+            .backoff_config(backoff_config);
        if let Some(max_message_size) = config.client.max_message_size {
            client_builder = client_builder.max_message_size(max_message_size);
        }
diff --git a/components/parquet_ext/Cargo.toml b/components/parquet_ext/Cargo.toml
index 1b4b4b23c6..ba31703d18 100644
--- a/components/parquet_ext/Cargo.toml
+++ b/components/parquet_ext/Cargo.toml
@@ -17,6 +17,8 @@ async-trait = { workspace = true }
 bytes = { workspace = true }
 common_util = { workspace = true }
 datafusion = { workspace = true }
+futures = { workspace = true }
 log = { workspace = true }
+object_store = { workspace = true }
 parquet = { workspace = true }
 tokio = { workspace = true }
diff --git a/components/parquet_ext/src/lib.rs b/components/parquet_ext/src/lib.rs
index cd413c0afc..7264b38dd6 100644
--- a/components/parquet_ext/src/lib.rs
+++ b/components/parquet_ext/src/lib.rs
@@ -2,6 +2,7 @@
 
 pub mod meta_data;
 pub mod prune;
+pub mod reader;
 pub mod reverse_reader;
 #[cfg(test)]
 pub mod tests;
diff --git a/components/parquet_ext/src/meta_data.rs b/components/parquet_ext/src/meta_data.rs
index e796244c16..1a95bb4f7f 100644
--- a/components/parquet_ext/src/meta_data.rs
+++ b/components/parquet_ext/src/meta_data.rs
@@ -1,15 +1,18 @@
 // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
 
-use std::ops::Range;
+use std::{ops::Range, sync::Arc};
 
 use async_trait::async_trait;
 use bytes::Bytes;
 use common_util::error::GenericResult;
 use parquet::{
+    arrow::{arrow_reader::ArrowReaderOptions, ParquetRecordBatchStreamBuilder},
    errors::{ParquetError, Result},
    file::{footer, metadata::ParquetMetaData},
 };
 
+use crate::reader::ObjectStoreReader;
+
 #[async_trait]
 pub trait ChunkReader: Sync + Send {
    async fn get_bytes(&self, range: Range<usize>) -> GenericResult<Bytes>;
@@ -65,3 +68,21 @@ pub async fn fetch_parquet_metadata(
 
    footer::decode_metadata(&metadata_bytes).map(|v| (v, metadata_len))
 }
+
+/// Build page indexes for meta data
+///
+/// TODO: Currently there is no method to build page indexes for meta data in
+/// `parquet`, maybe we can write a issue in `arrow-rs` .
+pub async fn meta_with_page_indexes(
+    object_store_reader: ObjectStoreReader,
+) -> Result<Arc<ParquetMetaData>> {
+    let read_options = ArrowReaderOptions::new().with_page_index(true);
+    let builder =
+        ParquetRecordBatchStreamBuilder::new_with_options(object_store_reader, read_options)
+            .await
+            .map_err(|e| {
+                let err_msg = format!("failed to build page indexes in metadata, err:{e}");
+                ParquetError::General(err_msg)
+            })?;
+    Ok(builder.metadata().clone())
+}
diff --git a/components/parquet_ext/src/reader.rs b/components/parquet_ext/src/reader.rs
new file mode 100644
index 0000000000..3a5cd5f170
--- /dev/null
+++ b/components/parquet_ext/src/reader.rs
@@ -0,0 +1,81 @@
+// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
+
+use std::{ops::Range, sync::Arc, time::Instant};
+
+use bytes::Bytes;
+use futures::{
+    future::{BoxFuture, FutureExt},
+    TryFutureExt,
+};
+use log::debug;
+use object_store::{ObjectStoreRef, Path};
+use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData};
+
+/// Implemention AsyncFileReader based on `ObjectStore`
+///
+/// TODO: Perhaps we should avoid importing `object_store` in `parquet_ext` to
+/// keep the crate `parquet_ext` more pure.
+#[derive(Clone)]
+pub struct ObjectStoreReader {
+    storage: ObjectStoreRef,
+    path: Path,
+    meta_data: Arc<ParquetMetaData>,
+    begin: Instant,
+}
+
+impl ObjectStoreReader {
+    pub fn new(storage: ObjectStoreRef, path: Path, meta_data: Arc<ParquetMetaData>) -> Self {
+        Self {
+            storage,
+            path,
+            meta_data,
+            begin: Instant::now(),
+        }
+    }
+}
+
+impl Drop for ObjectStoreReader {
+    fn drop(&mut self) {
+        debug!(
+            "ObjectStoreReader dropped, path:{}, elapsed:{:?}",
+            &self.path,
+            self.begin.elapsed()
+        );
+    }
+}
+
+impl AsyncFileReader for ObjectStoreReader {
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        self.storage
+            .get_range(&self.path, range)
+            .map_err(|e| {
+                parquet::errors::ParquetError::General(format!(
+                    "Failed to fetch range from object store, err:{e}"
+                ))
+            })
+            .boxed()
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
+        async move {
+            self.storage
+                .get_ranges(&self.path, &ranges)
+                .map_err(|e| {
+                    parquet::errors::ParquetError::General(format!(
+                        "Failed to fetch ranges from object store, err:{e}"
+                    ))
+                })
+                .await
+        }
+        .boxed()
+    }
+
+    fn get_metadata(
+        &mut self,
+    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        Box::pin(async move { Ok(self.meta_data.clone()) })
+    }
+}
diff --git a/tools/src/bin/sst-metadata.rs b/tools/src/bin/sst-metadata.rs
index 9eb81422bb..16ae26ab94 100644
--- a/tools/src/bin/sst-metadata.rs
+++ b/tools/src/bin/sst-metadata.rs
@@ -13,7 +13,7 @@ use common_util::{
 };
 use futures::StreamExt;
 use object_store::{LocalFileSystem, ObjectMeta, ObjectStoreRef, Path};
-use parquet_ext::meta_data::fetch_parquet_metadata;
+use parquet_ext::{meta_data::fetch_parquet_metadata, reader::ObjectStoreReader};
 use tokio::{runtime::Handle, task::JoinSet};
 
 #[derive(Parser, Debug)]
@@ -30,6 +30,10 @@ struct Args {
    /// Thread num, 0 means cpu num
    #[clap(short, long, default_value_t = 0)]
    threads: usize,
+
+    /// Print page indexes
+    #[clap(short, long, required(false))]
+    page_indexes: bool,
 }
 
 fn new_runtime(thread_num: usize) -> Runtime {
@@ -64,6 +68,7 @@ async fn run(args: Args) -> Result<()> {
    let mut join_set = JoinSet::new();
    let mut ssts = storage.list(None).await?;
    let verbose = args.verbose;
+    let page_indexes = args.page_indexes;
    while let Some(object_meta) = ssts.next().await {
        let object_meta = object_meta?;
        let storage = storage.clone();
@@ -71,7 +76,8 @@ async fn run(args: Args) -> Result<()> {
        join_set.spawn_on(
            async move {
                let (metadata, metadata_size, kv_size) =
-                    parse_metadata(storage, location, object_meta.size, verbose).await?;
+                    parse_metadata(storage, location, object_meta.size, verbose, page_indexes)
+                        .await?;
                Ok::<_, anyhow::Error>((object_meta, metadata, metadata_size, kv_size))
            },
            &handle,
@@ -133,9 +139,11 @@ async fn parse_metadata(
    path: Path,
    size: usize,
    verbose: bool,
+    page_indexes: bool,
 ) -> Result<(MetaData, usize, usize)> {
    let reader = ChunkReaderAdapter::new(&path, &storage);
    let (parquet_metadata, metadata_size) = fetch_parquet_metadata(size, &reader).await?;
+
    let kv_metadata = parquet_metadata.file_metadata().key_value_metadata();
    let kv_size = kv_metadata
        .map(|kvs| {
@@ -155,6 +163,15 @@ async fn parse_metadata(
        })
        .unwrap_or(0);
 
-    let md = MetaData::try_new(&parquet_metadata, false)?;
+    let md = if page_indexes {
+        let object_store_reader =
+            ObjectStoreReader::new(storage, path.clone(), Arc::new(parquet_metadata));
+        let parquet_metadata =
+            parquet_ext::meta_data::meta_with_page_indexes(object_store_reader).await?;
+        MetaData::try_new(&parquet_metadata, false)?
+    } else {
+        MetaData::try_new(&parquet_metadata, false)?
+    };
+
    Ok((md, metadata_size, kv_size))
 }
diff --git a/wal/src/message_queue_impl/region.rs b/wal/src/message_queue_impl/region.rs
index c38f87b4bb..dc156bb54c 100644
--- a/wal/src/message_queue_impl/region.rs
+++ b/wal/src/message_queue_impl/region.rs
@@ -579,7 +579,7 @@ impl Region {
        let (snapshot, synchronizer) = {
            let inner = self.inner.write().await;
 
-            debug!(
+            info!(
                "Mark deleted entries to sequence num:{}, region id:{}, table id:{}",
                sequence_num,
                inner.region_context.region_id(),