From b8b1f4612d189f93483d491c58f7d7c726544e6e Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Thu, 25 Apr 2024 22:49:02 +0800 Subject: [PATCH 1/7] refactor: first step to refactor error --- Cargo.lock | 2 + Cargo.toml | 1 + src/analytic_engine/Cargo.toml | 2 + .../src/instance/wal_replayer.rs | 25 +++-- .../src/memtable/columnar/iter.rs | 46 ++++---- .../src/memtable/columnar/mod.rs | 46 +++----- src/analytic_engine/src/memtable/error.rs | 47 ++++++++ .../src/memtable/layered/iter.rs | 13 +-- .../src/memtable/layered/mod.rs | 32 +++--- src/analytic_engine/src/memtable/mod.rs | 101 ++---------------- .../src/memtable/reversed_iter.rs | 8 +- .../src/memtable/skiplist/iter.rs | 37 ++++--- .../src/memtable/skiplist/mod.rs | 48 +++++---- 13 files changed, 175 insertions(+), 233 deletions(-) create mode 100644 src/analytic_engine/src/memtable/error.rs diff --git a/Cargo.lock b/Cargo.lock index c8e331b0f7..8064a066d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,6 +79,7 @@ checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" name = "analytic_engine" version = "2.0.0" dependencies = [ + "anyhow", "arc-swap 1.6.0", "arena", "arrow 49.0.0", @@ -126,6 +127,7 @@ dependencies = [ "table_kv", "tempfile", "test_util", + "thiserror", "time_ext", "tokio", "trace_metric", diff --git a/Cargo.toml b/Cargo.toml index 77277e084f..4563acf5c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,6 +97,7 @@ async-trait = "0.1.72" atomic_enum = "0.2.0" base64 = "0.13" bytes = "1" +thiserror = "1" bytes_ext = { path = "src/components/bytes_ext" } catalog = { path = "src/catalog" } catalog_impls = { path = "src/catalog_impls" } diff --git a/src/analytic_engine/Cargo.toml b/src/analytic_engine/Cargo.toml index 2773420b46..3bfd00dc00 100644 --- a/src/analytic_engine/Cargo.toml +++ b/src/analytic_engine/Cargo.toml @@ -37,6 +37,7 @@ wal-message-queue = ["wal/wal-message-queue"] wal-rocksdb = ["wal/wal-rocksdb"] [dependencies] +anyhow = { workspace = true } # In alphabetical order arc-swap = "1.4.0" arena = { workspace = true } @@ -81,6 +82,7 @@ snafu = { workspace = true } table_engine = { workspace = true } table_kv = { workspace = true } tempfile = { workspace = true, optional = true } +thiserror = { workspace = true } time_ext = { workspace = true } tokio = { workspace = true } trace_metric = { workspace = true } diff --git a/src/analytic_engine/src/instance/wal_replayer.rs b/src/analytic_engine/src/instance/wal_replayer.rs index 6a996fe996..ecf8750de6 100644 --- a/src/analytic_engine/src/instance/wal_replayer.rs +++ b/src/analytic_engine/src/instance/wal_replayer.rs @@ -50,8 +50,9 @@ use crate::{ engine::{Error, ReplayWalWithCause, Result}, flush_compaction::{Flusher, TableFlushOptions}, serial_executor::TableOpSerialExecutor, - write::MemTableWriter, + write::{Error as WriteError, MemTableWriter}, }, + memtable::error::ErrorKind, payload::{ReadPayload, SingleSchemaProviderAdapter, TableSchemaProvider, WalDecoder}, table::data::TableDataRef, }; @@ -547,22 +548,20 @@ async fn replay_table_log_entries( let index_in_writer = IndexInWriterSchema::for_same_schema(row_group.schema().num_columns()); let memtable_writer = MemTableWriter::new(table_data.clone(), serial_exec); - let write_res = memtable_writer - .write(sequence, row_group, index_in_writer) - .box_err() - .context(ReplayWalWithCause { - msg: Some(format!( - "table_id:{}, table_name:{}, space_id:{}", - table_data.space_id, table_data.name, table_data.id - )), - }); + let write_res = memtable_writer.write(sequence, row_group, index_in_writer); if let Err(e) = write_res { - // TODO: find a better way to match this. - if e.to_string().contains(crate::memtable::TOO_LARGE_MESSAGE) { + if matches!(e, WriteError::UpdateMemTableSequence { ref source } if source.kind() == ErrorKind::KeyTooLarge ) + { // ignore this error warn!("Unable to insert memtable, err:{e}"); } else { - return Err(e); + return Err(Error::ReplayWalWithCause { + msg: Some(format!( + "table_id:{}, table_name:{}, space_id:{}", + table_data.space_id, table_data.name, table_data.id + )), + source: Box::new(e), + }); } } diff --git a/src/analytic_engine/src/memtable/columnar/iter.rs b/src/analytic_engine/src/memtable/columnar/iter.rs index 7abc24d2ce..e3d4c52863 100644 --- a/src/analytic_engine/src/memtable/columnar/iter.rs +++ b/src/analytic_engine/src/memtable/columnar/iter.rs @@ -23,6 +23,7 @@ use std::{ time::Instant, }; +use anyhow::Context; use arena::{Arena, BasicStats, MonoIncArena}; use bytes_ext::{ByteVec, Bytes}; use codec::{memcomparable::MemComparable, row, Encoder}; @@ -36,17 +37,16 @@ use common_types::{ schema::Schema, SequenceNumber, }; -use generic_error::BoxError; use logger::trace; use parquet::data_type::AsBytes; use skiplist::{ArenaSlice, BytewiseComparator, IterRef, Skiplist}; -use snafu::{OptionExt, ResultExt}; -use crate::memtable::{ - key, - key::{KeySequence, SequenceCodec}, - AppendRow, BuildRecordBatch, DecodeInternalKey, Internal, InternalNoCause, IterTimeout, - ProjectSchema, Result, ScanContext, ScanRequest, +use crate::{ + ensure, + memtable::{ + key::{self, KeySequence, SequenceCodec}, + Result, ScanContext, ScanRequest, + }, }; /// Iterator state @@ -106,7 +106,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { let row_projector = request .row_projector_builder .build(&schema) - .context(ProjectSchema)?; + .context("ProjectSchema")?; let mut columnar_iter = Self { memtable, row_num, @@ -147,15 +147,10 @@ impl + Clone + Sync + Send> ColumnarIterImpl { let column_schema = self.memtable_schema.column(*idx); let column = memtable .get(&column_schema.id) - .with_context(|| InternalNoCause { - msg: format!("column not found, column:{}", column_schema.name), - })?; + .with_context(|| format!("column not found, column:{}", column_schema.name))?; for (i, key) in key_vec.iter_mut().enumerate().take(self.row_num) { let datum = column.get_datum(i); - encoder - .encode(key, &datum) - .box_err() - .context(Internal { msg: "encode key" })?; + encoder.encode(key, &datum).context("encode key")?; } } @@ -163,10 +158,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { for (i, mut key) in key_vec.into_iter().enumerate() { SequenceCodec .encode(&mut key, &KeySequence::new(self.last_sequence, i as u32)) - .box_err() - .context(Internal { - msg: "encode key sequence", - })?; + .context("encode key sequence")?; self.skiplist.put(&key, (i as u32).to_le_bytes().as_slice()); } @@ -203,9 +195,10 @@ impl + Clone + Sync + Send> ColumnarIterImpl { if !rows.is_empty() { if let Some(deadline) = self.deadline { let now = Instant::now(); - if now >= deadline { - return IterTimeout { now, deadline }.fail(); - } + ensure!( + now < deadline, + "iter timeout, now:{now:?}, deadline:{deadline:?}" + ); } let fetched_schema = self.row_projector.fetched_schema().clone(); @@ -219,10 +212,10 @@ impl + Clone + Sync + Send> ColumnarIterImpl { self.batch_size, ); for row in rows.into_iter() { - builder.append_row(row).context(AppendRow)?; + builder.append_row(row).context("AppendRow")?; } - let batch = builder.build().context(BuildRecordBatch)?; + let batch = builder.build().context("BuildRecordBatch")?; trace!("column iterator send one batch:{:?}", batch); Ok(Some(batch)) } else { @@ -245,7 +238,8 @@ impl + Clone + Sync + Send> ColumnarIterImpl { while self.iter.valid() { // Fetch current entry let key = self.iter.key(); - let (user_key, _) = key::user_key_from_internal_key(key).context(DecodeInternalKey)?; + let (user_key, _) = + key::user_key_from_internal_key(key).context("DecodeInternalKey")?; // Check user key is still in range if self.is_after_end_bound(user_key) { @@ -262,7 +256,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { // be set as last_internal_key so maybe we can just // unwrap it? let (last_user_key, _) = key::user_key_from_internal_key(last_internal_key) - .context(DecodeInternalKey)?; + .context("DecodeInternalKey")?; user_key == last_user_key } // This is the first user key diff --git a/src/analytic_engine/src/memtable/columnar/mod.rs b/src/analytic_engine/src/memtable/columnar/mod.rs index 6cbe783fcf..5db282eb88 100644 --- a/src/analytic_engine/src/memtable/columnar/mod.rs +++ b/src/analytic_engine/src/memtable/columnar/mod.rs @@ -23,22 +23,23 @@ use std::{ }, }; +use anyhow::Context; use arena::MonoIncArena; use bytes_ext::Bytes; use common_types::{ column::Column, column_schema::ColumnId, datum::Datum, row::Row, schema::Schema, time::TimeRange, SequenceNumber, }; -use generic_error::BoxError; use logger::debug; use skiplist::{BytewiseComparator, Skiplist}; -use snafu::{ensure, OptionExt, ResultExt}; -use crate::memtable::{ - columnar::iter::ColumnarIterImpl, factory::Options, key::KeySequence, - reversed_iter::ReversedColumnarIterator, ColumnarIterPtr, Internal, InternalNoCause, - InvalidPutSequence, MemTable, Metrics as MemtableMetrics, PutContext, Result, ScanContext, - ScanRequest, +use crate::{ + ensure, + memtable::{ + columnar::iter::ColumnarIterImpl, factory::Options, key::KeySequence, + reversed_iter::ReversedColumnarIterator, ColumnarIterPtr, MemTable, + Metrics as MemtableMetrics, PutContext, Result, ScanContext, ScanRequest, + }, }; pub mod factory; @@ -108,16 +109,11 @@ impl MemTable for ColumnarMemTable { } else { // TODO: impl append() one row in column, avoid memory expansion. let column = Column::with_capacity(1, column_schema.data_type) - .box_err() - .context(Internal { - msg: "new column failed", - })?; + .context("new column failed")?; columns.insert(column_schema.id, column); columns .get_mut(&column_schema.id) - .context(InternalNoCause { - msg: "get column failed", - })? + .context("get column failed")? }; if let Some(writer_index) = ctx.index_in_writer.column_index_in_writer(i) { @@ -127,10 +123,7 @@ impl MemTable for ColumnarMemTable { } else { column .append_datum_ref(&row[writer_index]) - .box_err() - .context(Internal { - msg: "append datum failed", - })? + .context("append datum failed")? } } else { column.append_nulls(1); @@ -140,9 +133,7 @@ impl MemTable for ColumnarMemTable { let mut memtable = self.memtable.write().unwrap(); for (k, v) in columns { if let Some(column) = memtable.get_mut(&k) { - column.append_column(v).box_err().context(Internal { - msg: "append column", - })?; + column.append_column(v).context("append column")?; } else { memtable.insert(k, v); }; @@ -174,18 +165,14 @@ impl MemTable for ColumnarMemTable { .schema .columns() .get(self.schema.timestamp_index()) - .context(InternalNoCause { - msg: "timestamp column is missing", - })?; + .context("timestamp column is missing")?; let num_rows = self .memtable .read() .unwrap() .get(×tamp_column.id) - .context(InternalNoCause { - msg: "get timestamp column failed", - })? + .context("get timestamp column failed")? .len(); let (reverse, batch_size) = (request.reverse, ctx.batch_size); let arena = MonoIncArena::with_collector( @@ -219,10 +206,7 @@ impl MemTable for ColumnarMemTable { let last = self.last_sequence(); ensure!( sequence >= last, - InvalidPutSequence { - given: sequence, - last - } + "invalid sequence, given:{sequence}, last:{last}" ); self.last_sequence.store(sequence, Ordering::Relaxed); diff --git a/src/analytic_engine/src/memtable/error.rs b/src/analytic_engine/src/memtable/error.rs new file mode 100644 index 0000000000..0647696756 --- /dev/null +++ b/src/analytic_engine/src/memtable/error.rs @@ -0,0 +1,47 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +#[error(transparent)] +pub struct Error(#[from] InnerError); + +impl From for Error { + fn from(source: anyhow::Error) -> Self { + Self(InnerError::Other { source }) + } +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +pub enum ErrorKind { + KeyTooLarge, + Internal, +} + +impl Error { + pub fn kind(&self) -> ErrorKind { + match self.0 { + InnerError::KeyTooLarge { .. } => ErrorKind::KeyTooLarge, + InnerError::Other { .. } => ErrorKind::Internal, + } + } +} + +#[macro_export] +macro_rules! ensure { + ($cond:expr, $msg:expr) => { + if !$cond { + return Err(anyhow::anyhow!($msg).into()); + } + }; +} + +#[derive(Error, Debug)] +pub(crate) enum InnerError { + #[error("too large key, max:{max}, current:{current}")] + KeyTooLarge { current: usize, max: usize }, + + #[error(transparent)] + Other { + #[from] + source: anyhow::Error, + }, +} diff --git a/src/analytic_engine/src/memtable/layered/iter.rs b/src/analytic_engine/src/memtable/layered/iter.rs index be32b77e02..7b051966d1 100644 --- a/src/analytic_engine/src/memtable/layered/iter.rs +++ b/src/analytic_engine/src/memtable/layered/iter.rs @@ -17,13 +17,12 @@ //! Skiplist memtable iterator +use anyhow::Context; use common_types::{record_batch::FetchedRecordBatch, schema::Schema, time::TimeRange}; -use generic_error::BoxError; -use snafu::ResultExt; use crate::memtable::{ layered::{ImmutableSegment, MutableSegment}, - ColumnarIterPtr, Internal, ProjectSchema, Result, ScanContext, ScanRequest, + ColumnarIterPtr, Error, Result, ScanContext, ScanRequest, }; /// Columnar iterator for [LayeredMemTable] @@ -43,7 +42,7 @@ impl ColumnarIterImpl { let row_projector = request .row_projector_builder .build(memtable_schema) - .context(ProjectSchema)?; + .context("build row projector")?; let (maybe_mutable, selected_immutables) = Self::filter_by_time_range(mutable, immutables, request.time_range); @@ -63,13 +62,11 @@ impl ColumnarIterImpl { fetched_column_indexes, batch.clone(), ) - .box_err() - .with_context(|| Internal { - msg: format!("row_projector:{row_projector:?}",), - }) + .map_err(|e| Error::from(anyhow::Error::new(e))) }) }) .collect::>(); + let immutable_iter = immutable_batches.into_iter(); let maybe_mutable_iter = match maybe_mutable { diff --git a/src/analytic_engine/src/memtable/layered/mod.rs b/src/analytic_engine/src/memtable/layered/mod.rs index 35d0b4a10a..72e992bc43 100644 --- a/src/analytic_engine/src/memtable/layered/mod.rs +++ b/src/analytic_engine/src/memtable/layered/mod.rs @@ -29,6 +29,7 @@ use std::{ }, }; +use anyhow::Context; use arena::CollectorRef; use arrow::record_batch::RecordBatch as ArrowRecordBatch; use bytes_ext::Bytes; @@ -36,17 +37,15 @@ use common_types::{ projected_schema::RowProjectorBuilder, row::Row, schema::Schema, time::TimeRange, SequenceNumber, }; -use generic_error::BoxError; use logger::debug; use skiplist::{BytewiseComparator, KeyComparator}; -use snafu::{OptionExt, ResultExt}; use crate::memtable::{ factory::{FactoryRef, Options}, key::KeySequence, layered::iter::ColumnarIterImpl, - ColumnarIterPtr, Internal, InternalNoCause, MemTable, MemTableRef, Metrics as MemtableMetrics, - PutContext, Result, ScanContext, ScanRequest, + ColumnarIterPtr, MemTable, MemTableRef, Metrics as MemtableMetrics, PutContext, Result, + ScanContext, ScanRequest, }; /// MemTable implementation based on skiplist @@ -238,15 +237,15 @@ impl Inner { .map(|batch_res| batch_res.map(|batch| batch.into_arrow_record_batch())) .collect::>>()?; - let time_range = current_mutable.time_range().context(InternalNoCause { - msg: "failed to get time range from mutable segment", - })?; - let max_key = current_mutable.max_key().context(InternalNoCause { - msg: "failed to get max key from mutable segment", - })?; - let min_key = current_mutable.min_key().context(InternalNoCause { - msg: "failed to get min key from mutable segment", - })?; + let time_range = current_mutable + .time_range() + .context("failed to get time range from mutable segment")?; + let max_key = current_mutable + .max_key() + .context("failed to get max key from mutable segment")?; + let min_key = current_mutable + .min_key() + .context("failed to get min key from mutable segment")?; let immutable = ImmutableSegment::new(immutable_batches, time_range, min_key, max_key); self.immutable_segments.push(immutable); @@ -388,10 +387,7 @@ impl MutableSegmentBuilder { let memtable = self .memtable_factory .create_memtable(memtable_opts) - .box_err() - .context(Internal { - msg: "failed to build mutable segment", - })?; + .context("failed to build mutable segment")?; Ok(MutableSegment(memtable)) } @@ -409,7 +405,7 @@ struct MutableBuilderOptions { /// Immutable batch pub(crate) struct ImmutableSegment { - /// Record batch converted from `MutableBatch` + /// Record batch converted from `MutableBatch` record_batches: Vec, /// Min time of source `MutableBatch` diff --git a/src/analytic_engine/src/memtable/mod.rs b/src/analytic_engine/src/memtable/mod.rs index 7df963a2b0..7ef7876cef 100644 --- a/src/analytic_engine/src/memtable/mod.rs +++ b/src/analytic_engine/src/memtable/mod.rs @@ -18,6 +18,7 @@ //! MemTable pub mod columnar; +pub mod error; pub mod factory; pub mod key; pub mod layered; @@ -27,6 +28,7 @@ pub mod test_util; use std::{collections::HashMap, ops::Bound, sync::Arc, time::Instant}; +use anyhow::Context; use bytes_ext::{ByteVec, Bytes}; use common_types::{ projected_schema::RowProjectorBuilder, @@ -36,12 +38,11 @@ use common_types::{ time::TimeRange, SequenceNumber, MUTABLE_SEGMENT_SWITCH_THRESHOLD, }; -use generic_error::{BoxError, GenericError}; +pub use error::{Error, ErrorKind}; use horaedbproto::manifest; use macros::define_result; use serde::{Deserialize, Serialize}; use size_ext::ReadableSize; -use snafu::{Backtrace, ResultExt, Snafu}; use trace_metric::MetricsCollector; use crate::memtable::key::KeySequence; @@ -96,9 +97,9 @@ impl LayeredMemtableOptions { pub fn parse_from(opts: &HashMap) -> Result { let mut options = LayeredMemtableOptions::default(); if let Some(v) = opts.get(MUTABLE_SEGMENT_SWITCH_THRESHOLD) { - let threshold = v.parse::().box_err().context(Internal { - msg: format!("invalid mutable segment switch threshold:{v}"), - })?; + let threshold = v + .parse::() + .with_context(|| format!("invalid mutable segment switch threshold:{v}"))?; options.mutable_segment_switch_threshold = ReadableSize(threshold); } @@ -122,95 +123,7 @@ impl From for manifest::LayeredMemtableOptions { } } -#[derive(Debug, Snafu)] -#[snafu(visibility(pub(crate)))] -pub enum Error { - #[snafu(display("Failed to encode internal key, err:{}", source))] - EncodeInternalKey { source: crate::memtable::key::Error }, - - #[snafu(display("Failed to decode internal key, err:{}", source))] - DecodeInternalKey { source: crate::memtable::key::Error }, - - #[snafu(display("Failed to decode row, err:{}", source))] - DecodeRow { source: codec::row::Error }, - - #[snafu(display("Failed to append row to batch builder, err:{}", source))] - AppendRow { - source: common_types::record_batch::Error, - }, - - #[snafu(display("Failed to build record batch, err:{}", source,))] - BuildRecordBatch { - source: common_types::record_batch::Error, - }, - - #[snafu(display("Failed to decode continuous row, err:{}", source))] - DecodeContinuousRow { - source: common_types::row::contiguous::Error, - }, - - #[snafu(display("Failed to project memtable schema, err:{}", source))] - ProjectSchema { - source: common_types::projected_schema::Error, - }, - - #[snafu(display( - "Invalid sequence number to put, given:{}, last:{}.\nBacktrace:\n{}", - given, - last, - backtrace - ))] - InvalidPutSequence { - given: SequenceNumber, - last: SequenceNumber, - backtrace: Backtrace, - }, - - #[snafu(display("Invalid row, err:{}", source))] - InvalidRow { source: GenericError }, - - #[snafu(display("Fail to iter in reverse order, err:{}", source))] - IterReverse { source: GenericError }, - - #[snafu(display( - "Timeout when iter memtable, now:{:?}, deadline:{:?}.\nBacktrace:\n{}", - now, - deadline, - backtrace - ))] - IterTimeout { - now: Instant, - deadline: Instant, - backtrace: Backtrace, - }, - - #[snafu(display("msg:{msg}, err:{source}"))] - Internal { msg: String, source: GenericError }, - - #[snafu(display("msg:{msg}"))] - InternalNoCause { msg: String }, - - #[snafu(display("Timestamp is not found in row.\nBacktrace:\n{backtrace}"))] - TimestampNotFound { backtrace: Backtrace }, - - #[snafu(display( - "{TOO_LARGE_MESSAGE}, current:{current}, max:{max}.\nBacktrace:\n{backtrace}" - ))] - KeyTooLarge { - current: usize, - max: usize, - backtrace: Backtrace, - }, - #[snafu(display("Factory err, msg:{msg}, err:{source}"))] - Factory { msg: String, source: GenericError }, - - #[snafu(display("Factory err, msg:{msg}.\nBacktrace:\n{backtrace}"))] - FactoryNoCause { msg: String, backtrace: Backtrace }, -} - -pub const TOO_LARGE_MESSAGE: &str = "Memtable key length is too large"; - -define_result!(Error); +define_result!(error::Error); /// Options for put and context for tracing pub struct PutContext { diff --git a/src/analytic_engine/src/memtable/reversed_iter.rs b/src/analytic_engine/src/memtable/reversed_iter.rs index 10d424ddf7..e7052f422f 100644 --- a/src/analytic_engine/src/memtable/reversed_iter.rs +++ b/src/analytic_engine/src/memtable/reversed_iter.rs @@ -18,10 +18,9 @@ use std::iter::Rev; use common_types::record_batch::FetchedRecordBatch; -use generic_error::BoxError; -use snafu::ResultExt; -use crate::memtable::{IterReverse, Result}; +use super::Error; +use crate::memtable::Result; /// Reversed columnar iterator. // TODO(xikai): Now the implementation is not perfect: read all the entries @@ -74,8 +73,7 @@ where Ok(mut batch_with_key) => { batch_with_key .reverse_data() - .box_err() - .context(IterReverse)?; + .map_err(|e| Error::from(anyhow::Error::new(e)))?; Ok(batch_with_key) } diff --git a/src/analytic_engine/src/memtable/skiplist/iter.rs b/src/analytic_engine/src/memtable/skiplist/iter.rs index cce3913dea..15fe8d3d69 100644 --- a/src/analytic_engine/src/memtable/skiplist/iter.rs +++ b/src/analytic_engine/src/memtable/skiplist/iter.rs @@ -19,6 +19,7 @@ use std::{cmp::Ordering, ops::Bound, time::Instant}; +use anyhow::Context; use arena::{Arena, BasicStats}; use bytes_ext::{Bytes, BytesMut}; use codec::row; @@ -31,13 +32,14 @@ use common_types::{ }; use logger::trace; use skiplist::{ArenaSlice, BytewiseComparator, IterRef, Skiplist}; -use snafu::ResultExt; -use crate::memtable::{ - key::{self, KeySequence}, - skiplist::SkiplistMemTable, - AppendRow, BuildRecordBatch, DecodeContinuousRow, DecodeInternalKey, EncodeInternalKey, - IterTimeout, ProjectSchema, Result, ScanContext, ScanRequest, +use crate::{ + ensure, + memtable::{ + key::{self, KeySequence}, + skiplist::SkiplistMemTable, + Result, ScanContext, ScanRequest, + }, }; /// Iterator state @@ -91,7 +93,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { let row_projector = request .row_projector_builder .build(&memtable.schema) - .context(ProjectSchema)?; + .context("build projector")?; let iter = memtable.skiplist.iter(); let mut columnar_iter = Self { @@ -122,7 +124,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { // Construct seek key let mut key_buf = BytesMut::new(); let seek_key = key::internal_key_for_seek(user_key, self.sequence, &mut key_buf) - .context(EncodeInternalKey)?; + .context("encode internal key")?; // Seek the skiplist self.iter.seek(seek_key); @@ -135,7 +137,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { let mut key_buf = BytesMut::new(); let seek_key = key::internal_key_for_seek(&next_user_key, self.sequence, &mut key_buf) - .context(EncodeInternalKey)?; + .context("encode internal key")?; // Seek the skiplist self.iter.seek(seek_key); @@ -168,14 +170,14 @@ impl + Clone + Sync + Send> ColumnarIterImpl { while self.iter.valid() && num_rows < self.batch_size { if let Some(row) = self.fetch_next_row()? { let row_reader = ContiguousRowReader::try_new(&row, &self.memtable_schema) - .context(DecodeContinuousRow)?; + .context("decode continuous row")?; let projected_row = ProjectedContiguousRow::new(row_reader, &self.row_projector); trace!("Column iterator fetch next row, row:{:?}", projected_row); builder .append_projected_contiguous_row(&projected_row) - .context(AppendRow)?; + .context("append row")?; num_rows += 1; } else { // There is no more row to fetch @@ -191,12 +193,13 @@ impl + Clone + Sync + Send> ColumnarIterImpl { if num_rows > 0 { if let Some(deadline) = self.deadline { let now = Instant::now(); - if now >= deadline { - return IterTimeout { now, deadline }.fail(); - } + ensure!( + now < deadline, + "iter timeout, now:{now:?}, deadline:{deadline:?}" + ) } - let batch = builder.build().context(BuildRecordBatch)?; + let batch = builder.build().context("build record batch")?; trace!("column iterator send one batch:{:?}", batch); Ok(Some(batch)) @@ -221,7 +224,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { // Fetch current entry let key = self.iter.key(); let (user_key, sequence) = - key::user_key_from_internal_key(key).context(DecodeInternalKey)?; + key::user_key_from_internal_key(key).context("DecodeInternalKey")?; // Check user key is still in range if self.is_after_end_bound(user_key) { @@ -238,7 +241,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { // be set as last_internal_key so maybe we can just // unwrap it? let (last_user_key, _) = key::user_key_from_internal_key(last_internal_key) - .context(DecodeInternalKey)?; + .context("DecodeInternalKey")?; user_key == last_user_key } // This is the first user key diff --git a/src/analytic_engine/src/memtable/skiplist/mod.rs b/src/analytic_engine/src/memtable/skiplist/mod.rs index baa4d9c2bd..c234e91ab9 100644 --- a/src/analytic_engine/src/memtable/skiplist/mod.rs +++ b/src/analytic_engine/src/memtable/skiplist/mod.rs @@ -22,6 +22,7 @@ pub mod iter; use std::sync::atomic::{self, AtomicI64, AtomicU64, AtomicUsize}; +use anyhow::{anyhow, Context}; use arena::{Arena, BasicStats}; use bytes_ext::Bytes; use codec::Encoder; @@ -31,17 +32,19 @@ use common_types::{ time::TimeRange, SequenceNumber, }; -use generic_error::BoxError; use logger::{debug, trace}; use skiplist::{BytewiseComparator, Skiplist}; -use snafu::{ensure, OptionExt, ResultExt}; - -use crate::memtable::{ - key::{ComparableInternalKey, KeySequence}, - reversed_iter::ReversedColumnarIterator, - skiplist::iter::ColumnarIterImpl, - ColumnarIterPtr, EncodeInternalKey, InvalidPutSequence, InvalidRow, KeyTooLarge, MemTable, - Metrics as MemtableMetrics, PutContext, Result, ScanContext, ScanRequest, TimestampNotFound, + +use crate::{ + ensure, + memtable::{ + error::InnerError, + key::{ComparableInternalKey, KeySequence}, + reversed_iter::ReversedColumnarIterator, + skiplist::iter::ColumnarIterImpl, + ColumnarIterPtr, MemTable, Metrics as MemtableMetrics, PutContext, Result, ScanContext, + ScanRequest, + }, }; #[derive(Default, Debug)] @@ -143,27 +146,30 @@ impl + Clone + Sync + Send + 'static> MemTable // Encode key key_encoder .encode(internal_key, row) - .context(EncodeInternalKey)?; + .context("encode interval key")?; // TODO: we should check row's primary key size at the beginning of write // process, so WAL and memtable can keep in sync. - ensure!( - internal_key.len() <= skiplist::MAX_KEY_SIZE as usize, - KeyTooLarge { + if internal_key.len() > skiplist::MAX_KEY_SIZE as usize { + return Err(InnerError::KeyTooLarge { current: internal_key.len(), - max: skiplist::MAX_KEY_SIZE, + max: skiplist::MAX_KEY_SIZE as usize, } - ); + .into()); + } // Encode row value. The ContiguousRowWriter will clear the buf. let row_value = &mut ctx.value_buf; let mut row_writer = ContiguousRowWriter::new(row_value, schema, &ctx.index_in_writer); - row_writer.write_row(row).box_err().context(InvalidRow)?; + row_writer.write_row(row).context("invalid row")?; let encoded_size = internal_key.len() + row_value.len(); self.skiplist.put(internal_key, row_value); // Update min/max time - let timestamp = row.timestamp(schema).context(TimestampNotFound)?.as_i64(); + let timestamp = row + .timestamp(schema) + .ok_or(anyhow!("timestamp not found"))? + .as_i64(); _ = self .min_time .fetch_update(atomic::Ordering::Relaxed, atomic::Ordering::Relaxed, |v| { @@ -230,11 +236,11 @@ impl + Clone + Sync + Send + 'static> MemTable let last = self.last_sequence(); ensure!( sequence >= last, - InvalidPutSequence { - given: sequence, - last - } + "invalid sequence, given:{sequence}, last:{last}" ); + // if sequence < last { + // return Err(anyhow!("invalid sequence, given:{sequence}, + // last:{last}").into()); } self.last_sequence .store(sequence, atomic::Ordering::Relaxed); From f8ce06883a6681251d585d539ccc4ff4645c1e3b Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Thu, 25 Apr 2024 23:00:32 +0800 Subject: [PATCH 2/7] fix asf header --- src/analytic_engine/src/memtable/error.rs | 17 +++++++++++++++++ .../src/memtable/reversed_iter.rs | 3 +-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/analytic_engine/src/memtable/error.rs b/src/analytic_engine/src/memtable/error.rs index 0647696756..cae0e84a9d 100644 --- a/src/analytic_engine/src/memtable/error.rs +++ b/src/analytic_engine/src/memtable/error.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use thiserror::Error; #[derive(Debug, Error)] diff --git a/src/analytic_engine/src/memtable/reversed_iter.rs b/src/analytic_engine/src/memtable/reversed_iter.rs index e7052f422f..14b3851f43 100644 --- a/src/analytic_engine/src/memtable/reversed_iter.rs +++ b/src/analytic_engine/src/memtable/reversed_iter.rs @@ -19,8 +19,7 @@ use std::iter::Rev; use common_types::record_batch::FetchedRecordBatch; -use super::Error; -use crate::memtable::Result; +use crate::memtable::{Error, Result}; /// Reversed columnar iterator. // TODO(xikai): Now the implementation is not perfect: read all the entries From fc446e75b32e0f6994b8d270fee66d1aadb5c2c6 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Thu, 25 Apr 2024 23:05:05 +0800 Subject: [PATCH 3/7] remove ok_or --- src/analytic_engine/src/memtable/skiplist/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/analytic_engine/src/memtable/skiplist/mod.rs b/src/analytic_engine/src/memtable/skiplist/mod.rs index c234e91ab9..b72250ba08 100644 --- a/src/analytic_engine/src/memtable/skiplist/mod.rs +++ b/src/analytic_engine/src/memtable/skiplist/mod.rs @@ -22,7 +22,7 @@ pub mod iter; use std::sync::atomic::{self, AtomicI64, AtomicU64, AtomicUsize}; -use anyhow::{anyhow, Context}; +use anyhow::Context; use arena::{Arena, BasicStats}; use bytes_ext::Bytes; use codec::Encoder; @@ -168,7 +168,7 @@ impl + Clone + Sync + Send + 'static> MemTable // Update min/max time let timestamp = row .timestamp(schema) - .ok_or(anyhow!("timestamp not found"))? + .context("timestamp not found")? .as_i64(); _ = self .min_time From 123a63b044609896a143201842f162ae52091ccb Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Thu, 25 Apr 2024 23:10:22 +0800 Subject: [PATCH 4/7] remove comments --- src/analytic_engine/src/memtable/skiplist/mod.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/analytic_engine/src/memtable/skiplist/mod.rs b/src/analytic_engine/src/memtable/skiplist/mod.rs index b72250ba08..7c57163fef 100644 --- a/src/analytic_engine/src/memtable/skiplist/mod.rs +++ b/src/analytic_engine/src/memtable/skiplist/mod.rs @@ -238,9 +238,6 @@ impl + Clone + Sync + Send + 'static> MemTable sequence >= last, "invalid sequence, given:{sequence}, last:{last}" ); - // if sequence < last { - // return Err(anyhow!("invalid sequence, given:{sequence}, - // last:{last}").into()); } self.last_sequence .store(sequence, atomic::Ordering::Relaxed); From 04ea8dff08d89ade06d689527e203d81e9828845 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Fri, 26 Apr 2024 08:20:01 +0800 Subject: [PATCH 5/7] improve ensure --- .../src/memtable/columnar/iter.rs | 10 +++---- .../src/memtable/columnar/mod.rs | 12 ++++----- src/analytic_engine/src/memtable/error.rs | 9 ------- .../src/memtable/skiplist/iter.rs | 12 ++++----- .../src/memtable/skiplist/mod.rs | 26 +++++++++---------- src/components/macros/src/lib.rs | 22 ++++++++++++++++ 6 files changed, 48 insertions(+), 43 deletions(-) diff --git a/src/analytic_engine/src/memtable/columnar/iter.rs b/src/analytic_engine/src/memtable/columnar/iter.rs index e3d4c52863..5400fbd993 100644 --- a/src/analytic_engine/src/memtable/columnar/iter.rs +++ b/src/analytic_engine/src/memtable/columnar/iter.rs @@ -38,15 +38,13 @@ use common_types::{ SequenceNumber, }; use logger::trace; +use macros::ensure; use parquet::data_type::AsBytes; use skiplist::{ArenaSlice, BytewiseComparator, IterRef, Skiplist}; -use crate::{ - ensure, - memtable::{ - key::{self, KeySequence, SequenceCodec}, - Result, ScanContext, ScanRequest, - }, +use crate::memtable::{ + key::{self, KeySequence, SequenceCodec}, + Result, ScanContext, ScanRequest, }; /// Iterator state diff --git a/src/analytic_engine/src/memtable/columnar/mod.rs b/src/analytic_engine/src/memtable/columnar/mod.rs index 5db282eb88..33bd86bdff 100644 --- a/src/analytic_engine/src/memtable/columnar/mod.rs +++ b/src/analytic_engine/src/memtable/columnar/mod.rs @@ -31,15 +31,13 @@ use common_types::{ time::TimeRange, SequenceNumber, }; use logger::debug; +use macros::ensure; use skiplist::{BytewiseComparator, Skiplist}; -use crate::{ - ensure, - memtable::{ - columnar::iter::ColumnarIterImpl, factory::Options, key::KeySequence, - reversed_iter::ReversedColumnarIterator, ColumnarIterPtr, MemTable, - Metrics as MemtableMetrics, PutContext, Result, ScanContext, ScanRequest, - }, +use crate::memtable::{ + columnar::iter::ColumnarIterImpl, factory::Options, key::KeySequence, + reversed_iter::ReversedColumnarIterator, ColumnarIterPtr, MemTable, Metrics as MemtableMetrics, + PutContext, Result, ScanContext, ScanRequest, }; pub mod factory; diff --git a/src/analytic_engine/src/memtable/error.rs b/src/analytic_engine/src/memtable/error.rs index cae0e84a9d..4697984df2 100644 --- a/src/analytic_engine/src/memtable/error.rs +++ b/src/analytic_engine/src/memtable/error.rs @@ -42,15 +42,6 @@ impl Error { } } -#[macro_export] -macro_rules! ensure { - ($cond:expr, $msg:expr) => { - if !$cond { - return Err(anyhow::anyhow!($msg).into()); - } - }; -} - #[derive(Error, Debug)] pub(crate) enum InnerError { #[error("too large key, max:{max}, current:{current}")] diff --git a/src/analytic_engine/src/memtable/skiplist/iter.rs b/src/analytic_engine/src/memtable/skiplist/iter.rs index 15fe8d3d69..ab9b59da03 100644 --- a/src/analytic_engine/src/memtable/skiplist/iter.rs +++ b/src/analytic_engine/src/memtable/skiplist/iter.rs @@ -31,15 +31,13 @@ use common_types::{ SequenceNumber, }; use logger::trace; +use macros::ensure; use skiplist::{ArenaSlice, BytewiseComparator, IterRef, Skiplist}; -use crate::{ - ensure, - memtable::{ - key::{self, KeySequence}, - skiplist::SkiplistMemTable, - Result, ScanContext, ScanRequest, - }, +use crate::memtable::{ + key::{self, KeySequence}, + skiplist::SkiplistMemTable, + Result, ScanContext, ScanRequest, }; /// Iterator state diff --git a/src/analytic_engine/src/memtable/skiplist/mod.rs b/src/analytic_engine/src/memtable/skiplist/mod.rs index 7c57163fef..ae5b37f89a 100644 --- a/src/analytic_engine/src/memtable/skiplist/mod.rs +++ b/src/analytic_engine/src/memtable/skiplist/mod.rs @@ -33,18 +33,16 @@ use common_types::{ SequenceNumber, }; use logger::{debug, trace}; +use macros::ensure; use skiplist::{BytewiseComparator, Skiplist}; -use crate::{ - ensure, - memtable::{ - error::InnerError, - key::{ComparableInternalKey, KeySequence}, - reversed_iter::ReversedColumnarIterator, - skiplist::iter::ColumnarIterImpl, - ColumnarIterPtr, MemTable, Metrics as MemtableMetrics, PutContext, Result, ScanContext, - ScanRequest, - }, +use crate::memtable::{ + error::InnerError, + key::{ComparableInternalKey, KeySequence}, + reversed_iter::ReversedColumnarIterator, + skiplist::iter::ColumnarIterImpl, + ColumnarIterPtr, MemTable, Metrics as MemtableMetrics, PutContext, Result, ScanContext, + ScanRequest, }; #[derive(Default, Debug)] @@ -150,13 +148,13 @@ impl + Clone + Sync + Send + 'static> MemTable // TODO: we should check row's primary key size at the beginning of write // process, so WAL and memtable can keep in sync. - if internal_key.len() > skiplist::MAX_KEY_SIZE as usize { - return Err(InnerError::KeyTooLarge { + ensure!( + internal_key.len() <= skiplist::MAX_KEY_SIZE as usize, + InnerError::KeyTooLarge { current: internal_key.len(), max: skiplist::MAX_KEY_SIZE as usize, } - .into()); - } + ); // Encode row value. The ContiguousRowWriter will clear the buf. let row_value = &mut ctx.value_buf; diff --git a/src/components/macros/src/lib.rs b/src/components/macros/src/lib.rs index f3e134ae99..c627f15199 100644 --- a/src/components/macros/src/lib.rs +++ b/src/components/macros/src/lib.rs @@ -38,6 +38,28 @@ macro_rules! hash_map( }; ); +/// Util for working with anyhow + thiserror +/// Works like anyhow's [ensure](https://docs.rs/anyhow/latest/anyhow/macro.ensure.html) +/// But return `Return` +#[macro_export] +macro_rules! ensure { + ($cond:expr, $msg:literal) => { + if !$cond { + return Err(anyhow::anyhow!($msg).into()); + } + }; + ($cond:expr, $err:expr) => { + if !$cond { + return Err($err.into()); + } + }; + ($cond:expr, $fmt:expr, $($arg:tt)*) => { + if !$cond { + return Err(anyhow::anyhow!($fmt, $($arg)*).into()); + } + }; +} + #[cfg(test)] mod tests { #[test] From 3510a85782666816c9a0d0443bf0392a312a74d3 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Fri, 26 Apr 2024 11:43:55 +0800 Subject: [PATCH 6/7] move ErrorKind to crate level --- src/analytic_engine/src/error.rs | 23 +++++++++++++++++++ .../src/instance/wal_replayer.rs | 2 +- src/analytic_engine/src/lib.rs | 2 ++ src/analytic_engine/src/memtable/error.rs | 8 ++----- src/analytic_engine/src/memtable/mod.rs | 2 +- 5 files changed, 29 insertions(+), 8 deletions(-) create mode 100644 src/analytic_engine/src/error.rs diff --git a/src/analytic_engine/src/error.rs b/src/analytic_engine/src/error.rs new file mode 100644 index 0000000000..205ef1f0d0 --- /dev/null +++ b/src/analytic_engine/src/error.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Global Error type for analytic engine. +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +pub enum ErrorKind { + KeyTooLarge, + Internal, +} diff --git a/src/analytic_engine/src/instance/wal_replayer.rs b/src/analytic_engine/src/instance/wal_replayer.rs index ecf8750de6..ac1c1a8e5b 100644 --- a/src/analytic_engine/src/instance/wal_replayer.rs +++ b/src/analytic_engine/src/instance/wal_replayer.rs @@ -52,9 +52,9 @@ use crate::{ serial_executor::TableOpSerialExecutor, write::{Error as WriteError, MemTableWriter}, }, - memtable::error::ErrorKind, payload::{ReadPayload, SingleSchemaProviderAdapter, TableSchemaProvider, WalDecoder}, table::data::TableDataRef, + ErrorKind, }; // Metrics of wal replayer diff --git a/src/analytic_engine/src/lib.rs b/src/analytic_engine/src/lib.rs index e363e782cf..c1308d88a8 100644 --- a/src/analytic_engine/src/lib.rs +++ b/src/analytic_engine/src/lib.rs @@ -22,6 +22,7 @@ mod compaction; mod context; mod engine; +pub mod error; mod instance; mod manifest; pub mod memtable; @@ -39,6 +40,7 @@ pub mod table_meta_set_impl; #[cfg(any(test, feature = "test"))] pub mod tests; +use error::ErrorKind; use manifest::details::Options as ManifestOptions; use object_store::config::StorageOptions; use serde::{Deserialize, Serialize}; diff --git a/src/analytic_engine/src/memtable/error.rs b/src/analytic_engine/src/memtable/error.rs index 4697984df2..4389b0e41c 100644 --- a/src/analytic_engine/src/memtable/error.rs +++ b/src/analytic_engine/src/memtable/error.rs @@ -17,6 +17,8 @@ use thiserror::Error; +use crate::ErrorKind; + #[derive(Debug, Error)] #[error(transparent)] pub struct Error(#[from] InnerError); @@ -27,12 +29,6 @@ impl From for Error { } } -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] -pub enum ErrorKind { - KeyTooLarge, - Internal, -} - impl Error { pub fn kind(&self) -> ErrorKind { match self.0 { diff --git a/src/analytic_engine/src/memtable/mod.rs b/src/analytic_engine/src/memtable/mod.rs index 7ef7876cef..f53bff149f 100644 --- a/src/analytic_engine/src/memtable/mod.rs +++ b/src/analytic_engine/src/memtable/mod.rs @@ -38,7 +38,7 @@ use common_types::{ time::TimeRange, SequenceNumber, MUTABLE_SEGMENT_SWITCH_THRESHOLD, }; -pub use error::{Error, ErrorKind}; +pub use error::Error; use horaedbproto::manifest; use macros::define_result; use serde::{Deserialize, Serialize}; From bec79e42a8db7057f2dceec8d9d2b8f824450a83 Mon Sep 17 00:00:00 2001 From: Jiacai Liu Date: Fri, 26 Apr 2024 15:20:20 +0800 Subject: [PATCH 7/7] Update src/analytic_engine/Cargo.toml Co-authored-by: chunshao.rcs --- src/analytic_engine/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/analytic_engine/Cargo.toml b/src/analytic_engine/Cargo.toml index 3bfd00dc00..1e02406a0c 100644 --- a/src/analytic_engine/Cargo.toml +++ b/src/analytic_engine/Cargo.toml @@ -37,8 +37,8 @@ wal-message-queue = ["wal/wal-message-queue"] wal-rocksdb = ["wal/wal-rocksdb"] [dependencies] -anyhow = { workspace = true } # In alphabetical order +anyhow = { workspace = true } arc-swap = "1.4.0" arena = { workspace = true } arrow = { workspace = true }