diff --git a/Cargo.lock b/Cargo.lock index c8e331b0f7..8064a066d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,6 +79,7 @@ checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" name = "analytic_engine" version = "2.0.0" dependencies = [ + "anyhow", "arc-swap 1.6.0", "arena", "arrow 49.0.0", @@ -126,6 +127,7 @@ dependencies = [ "table_kv", "tempfile", "test_util", + "thiserror", "time_ext", "tokio", "trace_metric", diff --git a/Cargo.toml b/Cargo.toml index 77277e084f..4563acf5c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,6 +97,7 @@ async-trait = "0.1.72" atomic_enum = "0.2.0" base64 = "0.13" bytes = "1" +thiserror = "1" bytes_ext = { path = "src/components/bytes_ext" } catalog = { path = "src/catalog" } catalog_impls = { path = "src/catalog_impls" } diff --git a/src/analytic_engine/Cargo.toml b/src/analytic_engine/Cargo.toml index 2773420b46..1e02406a0c 100644 --- a/src/analytic_engine/Cargo.toml +++ b/src/analytic_engine/Cargo.toml @@ -38,6 +38,7 @@ wal-rocksdb = ["wal/wal-rocksdb"] [dependencies] # In alphabetical order +anyhow = { workspace = true } arc-swap = "1.4.0" arena = { workspace = true } arrow = { workspace = true } @@ -81,6 +82,7 @@ snafu = { workspace = true } table_engine = { workspace = true } table_kv = { workspace = true } tempfile = { workspace = true, optional = true } +thiserror = { workspace = true } time_ext = { workspace = true } tokio = { workspace = true } trace_metric = { workspace = true } diff --git a/src/analytic_engine/src/error.rs b/src/analytic_engine/src/error.rs new file mode 100644 index 0000000000..205ef1f0d0 --- /dev/null +++ b/src/analytic_engine/src/error.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Global Error type for analytic engine. 
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +pub enum ErrorKind { + KeyTooLarge, + Internal, +} diff --git a/src/analytic_engine/src/instance/wal_replayer.rs b/src/analytic_engine/src/instance/wal_replayer.rs index 6a996fe996..ac1c1a8e5b 100644 --- a/src/analytic_engine/src/instance/wal_replayer.rs +++ b/src/analytic_engine/src/instance/wal_replayer.rs @@ -50,10 +50,11 @@ use crate::{ engine::{Error, ReplayWalWithCause, Result}, flush_compaction::{Flusher, TableFlushOptions}, serial_executor::TableOpSerialExecutor, - write::MemTableWriter, + write::{Error as WriteError, MemTableWriter}, }, payload::{ReadPayload, SingleSchemaProviderAdapter, TableSchemaProvider, WalDecoder}, table::data::TableDataRef, + ErrorKind, }; // Metrics of wal replayer @@ -547,22 +548,20 @@ async fn replay_table_log_entries( let index_in_writer = IndexInWriterSchema::for_same_schema(row_group.schema().num_columns()); let memtable_writer = MemTableWriter::new(table_data.clone(), serial_exec); - let write_res = memtable_writer - .write(sequence, row_group, index_in_writer) - .box_err() - .context(ReplayWalWithCause { - msg: Some(format!( - "table_id:{}, table_name:{}, space_id:{}", - table_data.space_id, table_data.name, table_data.id - )), - }); + let write_res = memtable_writer.write(sequence, row_group, index_in_writer); if let Err(e) = write_res { - // TODO: find a better way to match this. - if e.to_string().contains(crate::memtable::TOO_LARGE_MESSAGE) { + if matches!(e, WriteError::UpdateMemTableSequence { ref source } if source.kind() == ErrorKind::KeyTooLarge ) + { // ignore this error warn!("Unable to insert memtable, err:{e}"); } else { - return Err(e); + return Err(Error::ReplayWalWithCause { + msg: Some(format!( + "table_id:{}, table_name:{}, space_id:{}", + table_data.space_id, table_data.name, table_data.id + )), + source: Box::new(e), + }); } } diff --git a/src/analytic_engine/src/lib.rs b/src/analytic_engine/src/lib.rs index e363e782cf..c1308d88a8 100644 --- a/src/analytic_engine/src/lib.rs +++ b/src/analytic_engine/src/lib.rs @@ -22,6 +22,7 @@ mod compaction; mod context; mod engine; +pub mod error; mod instance; mod manifest; pub mod memtable; @@ -39,6 +40,7 @@ pub mod table_meta_set_impl; #[cfg(any(test, feature = "test"))] pub mod tests; +use error::ErrorKind; use manifest::details::Options as ManifestOptions; use object_store::config::StorageOptions; use serde::{Deserialize, Serialize}; diff --git a/src/analytic_engine/src/memtable/columnar/iter.rs b/src/analytic_engine/src/memtable/columnar/iter.rs index 7abc24d2ce..5400fbd993 100644 --- a/src/analytic_engine/src/memtable/columnar/iter.rs +++ b/src/analytic_engine/src/memtable/columnar/iter.rs @@ -23,6 +23,7 @@ use std::{ time::Instant, }; +use anyhow::Context; use arena::{Arena, BasicStats, MonoIncArena}; use bytes_ext::{ByteVec, Bytes}; use codec::{memcomparable::MemComparable, row, Encoder}; @@ -36,17 +37,14 @@ use common_types::{ schema::Schema, SequenceNumber, }; -use generic_error::BoxError; use logger::trace; +use macros::ensure; use parquet::data_type::AsBytes; use skiplist::{ArenaSlice, BytewiseComparator, IterRef, Skiplist}; -use snafu::{OptionExt, ResultExt}; use crate::memtable::{ - key, - key::{KeySequence, SequenceCodec}, - AppendRow, BuildRecordBatch, DecodeInternalKey, Internal, InternalNoCause, IterTimeout, - ProjectSchema, Result, ScanContext, ScanRequest, + key::{self, KeySequence, SequenceCodec}, + Result, ScanContext, ScanRequest, }; /// Iterator state @@ -106,7 +104,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { 
let row_projector = request .row_projector_builder .build(&schema) - .context(ProjectSchema)?; + .context("ProjectSchema")?; let mut columnar_iter = Self { memtable, row_num, @@ -147,15 +145,10 @@ impl + Clone + Sync + Send> ColumnarIterImpl { let column_schema = self.memtable_schema.column(*idx); let column = memtable .get(&column_schema.id) - .with_context(|| InternalNoCause { - msg: format!("column not found, column:{}", column_schema.name), - })?; + .with_context(|| format!("column not found, column:{}", column_schema.name))?; for (i, key) in key_vec.iter_mut().enumerate().take(self.row_num) { let datum = column.get_datum(i); - encoder - .encode(key, &datum) - .box_err() - .context(Internal { msg: "encode key" })?; + encoder.encode(key, &datum).context("encode key")?; } } @@ -163,10 +156,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { for (i, mut key) in key_vec.into_iter().enumerate() { SequenceCodec .encode(&mut key, &KeySequence::new(self.last_sequence, i as u32)) - .box_err() - .context(Internal { - msg: "encode key sequence", - })?; + .context("encode key sequence")?; self.skiplist.put(&key, (i as u32).to_le_bytes().as_slice()); } @@ -203,9 +193,10 @@ impl + Clone + Sync + Send> ColumnarIterImpl { if !rows.is_empty() { if let Some(deadline) = self.deadline { let now = Instant::now(); - if now >= deadline { - return IterTimeout { now, deadline }.fail(); - } + ensure!( + now < deadline, + "iter timeout, now:{now:?}, deadline:{deadline:?}" + ); } let fetched_schema = self.row_projector.fetched_schema().clone(); @@ -219,10 +210,10 @@ impl + Clone + Sync + Send> ColumnarIterImpl { self.batch_size, ); for row in rows.into_iter() { - builder.append_row(row).context(AppendRow)?; + builder.append_row(row).context("AppendRow")?; } - let batch = builder.build().context(BuildRecordBatch)?; + let batch = builder.build().context("BuildRecordBatch")?; trace!("column iterator send one batch:{:?}", batch); Ok(Some(batch)) } else { @@ -245,7 +236,8 @@ impl + Clone + Sync + Send> ColumnarIterImpl { while self.iter.valid() { // Fetch current entry let key = self.iter.key(); - let (user_key, _) = key::user_key_from_internal_key(key).context(DecodeInternalKey)?; + let (user_key, _) = + key::user_key_from_internal_key(key).context("DecodeInternalKey")?; // Check user key is still in range if self.is_after_end_bound(user_key) { @@ -262,7 +254,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { // be set as last_internal_key so maybe we can just // unwrap it? 
let (last_user_key, _) = key::user_key_from_internal_key(last_internal_key) - .context(DecodeInternalKey)?; + .context("DecodeInternalKey")?; user_key == last_user_key } // This is the first user key diff --git a/src/analytic_engine/src/memtable/columnar/mod.rs b/src/analytic_engine/src/memtable/columnar/mod.rs index 6cbe783fcf..33bd86bdff 100644 --- a/src/analytic_engine/src/memtable/columnar/mod.rs +++ b/src/analytic_engine/src/memtable/columnar/mod.rs @@ -23,22 +23,21 @@ use std::{ }, }; +use anyhow::Context; use arena::MonoIncArena; use bytes_ext::Bytes; use common_types::{ column::Column, column_schema::ColumnId, datum::Datum, row::Row, schema::Schema, time::TimeRange, SequenceNumber, }; -use generic_error::BoxError; use logger::debug; +use macros::ensure; use skiplist::{BytewiseComparator, Skiplist}; -use snafu::{ensure, OptionExt, ResultExt}; use crate::memtable::{ columnar::iter::ColumnarIterImpl, factory::Options, key::KeySequence, - reversed_iter::ReversedColumnarIterator, ColumnarIterPtr, Internal, InternalNoCause, - InvalidPutSequence, MemTable, Metrics as MemtableMetrics, PutContext, Result, ScanContext, - ScanRequest, + reversed_iter::ReversedColumnarIterator, ColumnarIterPtr, MemTable, Metrics as MemtableMetrics, + PutContext, Result, ScanContext, ScanRequest, }; pub mod factory; @@ -108,16 +107,11 @@ impl MemTable for ColumnarMemTable { } else { // TODO: impl append() one row in column, avoid memory expansion. let column = Column::with_capacity(1, column_schema.data_type) - .box_err() - .context(Internal { - msg: "new column failed", - })?; + .context("new column failed")?; columns.insert(column_schema.id, column); columns .get_mut(&column_schema.id) - .context(InternalNoCause { - msg: "get column failed", - })? + .context("get column failed")? }; if let Some(writer_index) = ctx.index_in_writer.column_index_in_writer(i) { @@ -127,10 +121,7 @@ impl MemTable for ColumnarMemTable { } else { column .append_datum_ref(&row[writer_index]) - .box_err() - .context(Internal { - msg: "append datum failed", - })? + .context("append datum failed")? } } else { column.append_nulls(1); @@ -140,9 +131,7 @@ impl MemTable for ColumnarMemTable { let mut memtable = self.memtable.write().unwrap(); for (k, v) in columns { if let Some(column) = memtable.get_mut(&k) { - column.append_column(v).box_err().context(Internal { - msg: "append column", - })?; + column.append_column(v).context("append column")?; } else { memtable.insert(k, v); }; @@ -174,18 +163,14 @@ impl MemTable for ColumnarMemTable { .schema .columns() .get(self.schema.timestamp_index()) - .context(InternalNoCause { - msg: "timestamp column is missing", - })?; + .context("timestamp column is missing")?; let num_rows = self .memtable .read() .unwrap() .get(×tamp_column.id) - .context(InternalNoCause { - msg: "get timestamp column failed", - })? + .context("get timestamp column failed")? 
            .len();
         let (reverse, batch_size) = (request.reverse, ctx.batch_size);
         let arena = MonoIncArena::with_collector(
@@ -219,10 +204,7 @@ impl MemTable for ColumnarMemTable {
         let last = self.last_sequence();
         ensure!(
             sequence >= last,
-            InvalidPutSequence {
-                given: sequence,
-                last
-            }
+            "invalid sequence, given:{sequence}, last:{last}"
         );
 
         self.last_sequence.store(sequence, Ordering::Relaxed);
diff --git a/src/analytic_engine/src/memtable/error.rs b/src/analytic_engine/src/memtable/error.rs
new file mode 100644
index 0000000000..4389b0e41c
--- /dev/null
+++ b/src/analytic_engine/src/memtable/error.rs
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use thiserror::Error;
+
+use crate::ErrorKind;
+
+#[derive(Debug, Error)]
+#[error(transparent)]
+pub struct Error(#[from] InnerError);
+
+impl From<anyhow::Error> for Error {
+    fn from(source: anyhow::Error) -> Self {
+        Self(InnerError::Other { source })
+    }
+}
+
+impl Error {
+    pub fn kind(&self) -> ErrorKind {
+        match self.0 {
+            InnerError::KeyTooLarge { .. } => ErrorKind::KeyTooLarge,
+            InnerError::Other { .. } => ErrorKind::Internal,
+        }
+    }
+}
+
+#[derive(Error, Debug)]
+pub(crate) enum InnerError {
+    #[error("too large key, max:{max}, current:{current}")]
+    KeyTooLarge { current: usize, max: usize },
+
+    #[error(transparent)]
+    Other {
+        #[from]
+        source: anyhow::Error,
+    },
+}
diff --git a/src/analytic_engine/src/memtable/layered/iter.rs b/src/analytic_engine/src/memtable/layered/iter.rs
index be32b77e02..7b051966d1 100644
--- a/src/analytic_engine/src/memtable/layered/iter.rs
+++ b/src/analytic_engine/src/memtable/layered/iter.rs
@@ -17,13 +17,12 @@
 //!
Skiplist memtable iterator +use anyhow::Context; use common_types::{record_batch::FetchedRecordBatch, schema::Schema, time::TimeRange}; -use generic_error::BoxError; -use snafu::ResultExt; use crate::memtable::{ layered::{ImmutableSegment, MutableSegment}, - ColumnarIterPtr, Internal, ProjectSchema, Result, ScanContext, ScanRequest, + ColumnarIterPtr, Error, Result, ScanContext, ScanRequest, }; /// Columnar iterator for [LayeredMemTable] @@ -43,7 +42,7 @@ impl ColumnarIterImpl { let row_projector = request .row_projector_builder .build(memtable_schema) - .context(ProjectSchema)?; + .context("build row projector")?; let (maybe_mutable, selected_immutables) = Self::filter_by_time_range(mutable, immutables, request.time_range); @@ -63,13 +62,11 @@ impl ColumnarIterImpl { fetched_column_indexes, batch.clone(), ) - .box_err() - .with_context(|| Internal { - msg: format!("row_projector:{row_projector:?}",), - }) + .map_err(|e| Error::from(anyhow::Error::new(e))) }) }) .collect::>(); + let immutable_iter = immutable_batches.into_iter(); let maybe_mutable_iter = match maybe_mutable { diff --git a/src/analytic_engine/src/memtable/layered/mod.rs b/src/analytic_engine/src/memtable/layered/mod.rs index 35d0b4a10a..72e992bc43 100644 --- a/src/analytic_engine/src/memtable/layered/mod.rs +++ b/src/analytic_engine/src/memtable/layered/mod.rs @@ -29,6 +29,7 @@ use std::{ }, }; +use anyhow::Context; use arena::CollectorRef; use arrow::record_batch::RecordBatch as ArrowRecordBatch; use bytes_ext::Bytes; @@ -36,17 +37,15 @@ use common_types::{ projected_schema::RowProjectorBuilder, row::Row, schema::Schema, time::TimeRange, SequenceNumber, }; -use generic_error::BoxError; use logger::debug; use skiplist::{BytewiseComparator, KeyComparator}; -use snafu::{OptionExt, ResultExt}; use crate::memtable::{ factory::{FactoryRef, Options}, key::KeySequence, layered::iter::ColumnarIterImpl, - ColumnarIterPtr, Internal, InternalNoCause, MemTable, MemTableRef, Metrics as MemtableMetrics, - PutContext, Result, ScanContext, ScanRequest, + ColumnarIterPtr, MemTable, MemTableRef, Metrics as MemtableMetrics, PutContext, Result, + ScanContext, ScanRequest, }; /// MemTable implementation based on skiplist @@ -238,15 +237,15 @@ impl Inner { .map(|batch_res| batch_res.map(|batch| batch.into_arrow_record_batch())) .collect::>>()?; - let time_range = current_mutable.time_range().context(InternalNoCause { - msg: "failed to get time range from mutable segment", - })?; - let max_key = current_mutable.max_key().context(InternalNoCause { - msg: "failed to get max key from mutable segment", - })?; - let min_key = current_mutable.min_key().context(InternalNoCause { - msg: "failed to get min key from mutable segment", - })?; + let time_range = current_mutable + .time_range() + .context("failed to get time range from mutable segment")?; + let max_key = current_mutable + .max_key() + .context("failed to get max key from mutable segment")?; + let min_key = current_mutable + .min_key() + .context("failed to get min key from mutable segment")?; let immutable = ImmutableSegment::new(immutable_batches, time_range, min_key, max_key); self.immutable_segments.push(immutable); @@ -388,10 +387,7 @@ impl MutableSegmentBuilder { let memtable = self .memtable_factory .create_memtable(memtable_opts) - .box_err() - .context(Internal { - msg: "failed to build mutable segment", - })?; + .context("failed to build mutable segment")?; Ok(MutableSegment(memtable)) } @@ -409,7 +405,7 @@ struct MutableBuilderOptions { /// Immutable batch pub(crate) struct 
ImmutableSegment {
-    /// Record batch converted from `MutableBatch`
+    /// Record batch converted from `MutableBatch`
     record_batches: Vec<ArrowRecordBatch>,
 
     /// Min time of source `MutableBatch`
diff --git a/src/analytic_engine/src/memtable/mod.rs b/src/analytic_engine/src/memtable/mod.rs
index 7df963a2b0..f53bff149f 100644
--- a/src/analytic_engine/src/memtable/mod.rs
+++ b/src/analytic_engine/src/memtable/mod.rs
@@ -18,6 +18,7 @@
 //! MemTable
 
 pub mod columnar;
+pub mod error;
 pub mod factory;
 pub mod key;
 pub mod layered;
@@ -27,6 +28,7 @@ pub mod test_util;
 
 use std::{collections::HashMap, ops::Bound, sync::Arc, time::Instant};
 
+use anyhow::Context;
 use bytes_ext::{ByteVec, Bytes};
 use common_types::{
     projected_schema::RowProjectorBuilder,
@@ -36,12 +38,11 @@ use common_types::{
     time::TimeRange,
     SequenceNumber, MUTABLE_SEGMENT_SWITCH_THRESHOLD,
 };
-use generic_error::{BoxError, GenericError};
+pub use error::Error;
 use horaedbproto::manifest;
 use macros::define_result;
 use serde::{Deserialize, Serialize};
 use size_ext::ReadableSize;
-use snafu::{Backtrace, ResultExt, Snafu};
 use trace_metric::MetricsCollector;
 
 use crate::memtable::key::KeySequence;
@@ -96,9 +97,9 @@ impl LayeredMemtableOptions {
     pub fn parse_from(opts: &HashMap<String, String>) -> Result<Self> {
         let mut options = LayeredMemtableOptions::default();
         if let Some(v) = opts.get(MUTABLE_SEGMENT_SWITCH_THRESHOLD) {
-            let threshold = v.parse::<u64>().box_err().context(Internal {
-                msg: format!("invalid mutable segment switch threshold:{v}"),
-            })?;
+            let threshold = v
+                .parse::<u64>()
+                .with_context(|| format!("invalid mutable segment switch threshold:{v}"))?;
             options.mutable_segment_switch_threshold = ReadableSize(threshold);
         }
 
@@ -122,95 +123,7 @@ impl From<LayeredMemtableOptions> for manifest::LayeredMemtableOptions {
     }
 }
 
-#[derive(Debug, Snafu)]
-#[snafu(visibility(pub(crate)))]
-pub enum Error {
-    #[snafu(display("Failed to encode internal key, err:{}", source))]
-    EncodeInternalKey { source: crate::memtable::key::Error },
-
-    #[snafu(display("Failed to decode internal key, err:{}", source))]
-    DecodeInternalKey { source: crate::memtable::key::Error },
-
-    #[snafu(display("Failed to decode row, err:{}", source))]
-    DecodeRow { source: codec::row::Error },
-
-    #[snafu(display("Failed to append row to batch builder, err:{}", source))]
-    AppendRow {
-        source: common_types::record_batch::Error,
-    },
-
-    #[snafu(display("Failed to build record batch, err:{}", source,))]
-    BuildRecordBatch {
-        source: common_types::record_batch::Error,
-    },
-
-    #[snafu(display("Failed to decode continuous row, err:{}", source))]
-    DecodeContinuousRow {
-        source: common_types::row::contiguous::Error,
-    },
-
-    #[snafu(display("Failed to project memtable schema, err:{}", source))]
-    ProjectSchema {
-        source: common_types::projected_schema::Error,
-    },
-
-    #[snafu(display(
-        "Invalid sequence number to put, given:{}, last:{}.\nBacktrace:\n{}",
-        given,
-        last,
-        backtrace
-    ))]
-    InvalidPutSequence {
-        given: SequenceNumber,
-        last: SequenceNumber,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display("Invalid row, err:{}", source))]
-    InvalidRow { source: GenericError },
-
-    #[snafu(display("Fail to iter in reverse order, err:{}", source))]
-    IterReverse { source: GenericError },
-
-    #[snafu(display(
-        "Timeout when iter memtable, now:{:?}, deadline:{:?}.\nBacktrace:\n{}",
-        now,
-        deadline,
-        backtrace
-    ))]
-    IterTimeout {
-        now: Instant,
-        deadline: Instant,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display("msg:{msg}, err:{source}"))]
-    Internal { msg: String, source: GenericError },
-
-    #[snafu(display("msg:{msg}"))]
-
InternalNoCause { msg: String }, - - #[snafu(display("Timestamp is not found in row.\nBacktrace:\n{backtrace}"))] - TimestampNotFound { backtrace: Backtrace }, - - #[snafu(display( - "{TOO_LARGE_MESSAGE}, current:{current}, max:{max}.\nBacktrace:\n{backtrace}" - ))] - KeyTooLarge { - current: usize, - max: usize, - backtrace: Backtrace, - }, - #[snafu(display("Factory err, msg:{msg}, err:{source}"))] - Factory { msg: String, source: GenericError }, - - #[snafu(display("Factory err, msg:{msg}.\nBacktrace:\n{backtrace}"))] - FactoryNoCause { msg: String, backtrace: Backtrace }, -} - -pub const TOO_LARGE_MESSAGE: &str = "Memtable key length is too large"; - -define_result!(Error); +define_result!(error::Error); /// Options for put and context for tracing pub struct PutContext { diff --git a/src/analytic_engine/src/memtable/reversed_iter.rs b/src/analytic_engine/src/memtable/reversed_iter.rs index 10d424ddf7..14b3851f43 100644 --- a/src/analytic_engine/src/memtable/reversed_iter.rs +++ b/src/analytic_engine/src/memtable/reversed_iter.rs @@ -18,10 +18,8 @@ use std::iter::Rev; use common_types::record_batch::FetchedRecordBatch; -use generic_error::BoxError; -use snafu::ResultExt; -use crate::memtable::{IterReverse, Result}; +use crate::memtable::{Error, Result}; /// Reversed columnar iterator. // TODO(xikai): Now the implementation is not perfect: read all the entries @@ -74,8 +72,7 @@ where Ok(mut batch_with_key) => { batch_with_key .reverse_data() - .box_err() - .context(IterReverse)?; + .map_err(|e| Error::from(anyhow::Error::new(e)))?; Ok(batch_with_key) } diff --git a/src/analytic_engine/src/memtable/skiplist/iter.rs b/src/analytic_engine/src/memtable/skiplist/iter.rs index cce3913dea..ab9b59da03 100644 --- a/src/analytic_engine/src/memtable/skiplist/iter.rs +++ b/src/analytic_engine/src/memtable/skiplist/iter.rs @@ -19,6 +19,7 @@ use std::{cmp::Ordering, ops::Bound, time::Instant}; +use anyhow::Context; use arena::{Arena, BasicStats}; use bytes_ext::{Bytes, BytesMut}; use codec::row; @@ -30,14 +31,13 @@ use common_types::{ SequenceNumber, }; use logger::trace; +use macros::ensure; use skiplist::{ArenaSlice, BytewiseComparator, IterRef, Skiplist}; -use snafu::ResultExt; use crate::memtable::{ key::{self, KeySequence}, skiplist::SkiplistMemTable, - AppendRow, BuildRecordBatch, DecodeContinuousRow, DecodeInternalKey, EncodeInternalKey, - IterTimeout, ProjectSchema, Result, ScanContext, ScanRequest, + Result, ScanContext, ScanRequest, }; /// Iterator state @@ -91,7 +91,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { let row_projector = request .row_projector_builder .build(&memtable.schema) - .context(ProjectSchema)?; + .context("build projector")?; let iter = memtable.skiplist.iter(); let mut columnar_iter = Self { @@ -122,7 +122,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { // Construct seek key let mut key_buf = BytesMut::new(); let seek_key = key::internal_key_for_seek(user_key, self.sequence, &mut key_buf) - .context(EncodeInternalKey)?; + .context("encode internal key")?; // Seek the skiplist self.iter.seek(seek_key); @@ -135,7 +135,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { let mut key_buf = BytesMut::new(); let seek_key = key::internal_key_for_seek(&next_user_key, self.sequence, &mut key_buf) - .context(EncodeInternalKey)?; + .context("encode internal key")?; // Seek the skiplist self.iter.seek(seek_key); @@ -168,14 +168,14 @@ impl + Clone + Sync + Send> ColumnarIterImpl { while self.iter.valid() && num_rows < self.batch_size { if let Some(row) = 
self.fetch_next_row()? {
                 let row_reader = ContiguousRowReader::try_new(&row, &self.memtable_schema)
-                    .context(DecodeContinuousRow)?;
+                    .context("decode continuous row")?;
                 let projected_row = ProjectedContiguousRow::new(row_reader, &self.row_projector);
 
                 trace!("Column iterator fetch next row, row:{:?}", projected_row);
 
                 builder
                     .append_projected_contiguous_row(&projected_row)
-                    .context(AppendRow)?;
+                    .context("append row")?;
                 num_rows += 1;
             } else {
                 // There is no more row to fetch
@@ -191,12 +191,13 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> ColumnarIterImpl<A> {
         if num_rows > 0 {
             if let Some(deadline) = self.deadline {
                 let now = Instant::now();
-                if now >= deadline {
-                    return IterTimeout { now, deadline }.fail();
-                }
+                ensure!(
+                    now < deadline,
+                    "iter timeout, now:{now:?}, deadline:{deadline:?}"
+                )
             }
 
-            let batch = builder.build().context(BuildRecordBatch)?;
+            let batch = builder.build().context("build record batch")?;
             trace!("column iterator send one batch:{:?}", batch);
 
             Ok(Some(batch))
@@ -221,7 +222,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> ColumnarIterImpl<A> {
             // Fetch current entry
             let key = self.iter.key();
             let (user_key, sequence) =
-                key::user_key_from_internal_key(key).context(DecodeInternalKey)?;
+                key::user_key_from_internal_key(key).context("DecodeInternalKey")?;
 
             // Check user key is still in range
             if self.is_after_end_bound(user_key) {
@@ -238,7 +239,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> ColumnarIterImpl<A> {
                     // be set as last_internal_key so maybe we can just
                     // unwrap it?
                     let (last_user_key, _) = key::user_key_from_internal_key(last_internal_key)
-                        .context(DecodeInternalKey)?;
+                        .context("DecodeInternalKey")?;
                     user_key == last_user_key
                 }
                 // This is the first user key
diff --git a/src/analytic_engine/src/memtable/skiplist/mod.rs b/src/analytic_engine/src/memtable/skiplist/mod.rs
index baa4d9c2bd..ae5b37f89a 100644
--- a/src/analytic_engine/src/memtable/skiplist/mod.rs
+++ b/src/analytic_engine/src/memtable/skiplist/mod.rs
@@ -22,6 +22,7 @@ pub mod iter;
 
 use std::sync::atomic::{self, AtomicI64, AtomicU64, AtomicUsize};
 
+use anyhow::Context;
 use arena::{Arena, BasicStats};
 use bytes_ext::Bytes;
 use codec::Encoder;
@@ -31,17 +32,17 @@ use common_types::{
     time::TimeRange,
     SequenceNumber,
 };
-use generic_error::BoxError;
 use logger::{debug, trace};
+use macros::ensure;
 use skiplist::{BytewiseComparator, Skiplist};
-use snafu::{ensure, OptionExt, ResultExt};
 
 use crate::memtable::{
+    error::InnerError,
     key::{ComparableInternalKey, KeySequence},
     reversed_iter::ReversedColumnarIterator,
     skiplist::iter::ColumnarIterImpl,
-    ColumnarIterPtr, EncodeInternalKey, InvalidPutSequence, InvalidRow, KeyTooLarge, MemTable,
-    Metrics as MemtableMetrics, PutContext, Result, ScanContext, ScanRequest, TimestampNotFound,
+    ColumnarIterPtr, MemTable, Metrics as MemtableMetrics, PutContext, Result, ScanContext,
+    ScanRequest,
 };
 
 #[derive(Default, Debug)]
@@ -143,27 +144,30 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send + 'static> MemTable
         // Encode key
         key_encoder
             .encode(internal_key, row)
-            .context(EncodeInternalKey)?;
+            .context("encode internal key")?;
 
         // TODO: we should check row's primary key size at the beginning of write
         // process, so WAL and memtable can keep in sync.
         ensure!(
             internal_key.len() <= skiplist::MAX_KEY_SIZE as usize,
-            KeyTooLarge {
+            InnerError::KeyTooLarge {
                 current: internal_key.len(),
-                max: skiplist::MAX_KEY_SIZE,
+                max: skiplist::MAX_KEY_SIZE as usize,
             }
         );
 
         // Encode row value. The ContiguousRowWriter will clear the buf.
         let row_value = &mut ctx.value_buf;
         let mut row_writer = ContiguousRowWriter::new(row_value, schema, &ctx.index_in_writer);
-        row_writer.write_row(row).box_err().context(InvalidRow)?;
+        row_writer.write_row(row).context("invalid row")?;
         let encoded_size = internal_key.len() + row_value.len();
         self.skiplist.put(internal_key, row_value);
 
         // Update min/max time
-        let timestamp = row.timestamp(schema).context(TimestampNotFound)?.as_i64();
+        let timestamp = row
+            .timestamp(schema)
+            .context("timestamp not found")?
+            .as_i64();
         _ = self
             .min_time
             .fetch_update(atomic::Ordering::Relaxed, atomic::Ordering::Relaxed, |v| {
@@ -230,10 +234,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send + 'static> MemTable
         let last = self.last_sequence();
         ensure!(
             sequence >= last,
-            InvalidPutSequence {
-                given: sequence,
-                last
-            }
+            "invalid sequence, given:{sequence}, last:{last}"
        );
 
         self.last_sequence
diff --git a/src/components/macros/src/lib.rs b/src/components/macros/src/lib.rs
index f3e134ae99..c627f15199 100644
--- a/src/components/macros/src/lib.rs
+++ b/src/components/macros/src/lib.rs
@@ -38,6 +38,28 @@ macro_rules! hash_map(
     };
 );
 
+/// Util for working with anyhow + thiserror.
+/// Works like anyhow's [ensure](https://docs.rs/anyhow/latest/anyhow/macro.ensure.html),
+/// but converts the error into the caller's error type via `.into()` before returning early.
+#[macro_export]
+macro_rules! ensure {
+    ($cond:expr, $msg:literal) => {
+        if !$cond {
+            return Err(anyhow::anyhow!($msg).into());
+        }
+    };
+    ($cond:expr, $err:expr) => {
+        if !$cond {
+            return Err($err.into());
+        }
+    };
+    ($cond:expr, $fmt:expr, $($arg:tt)*) => {
+        if !$cond {
+            return Err(anyhow::anyhow!($fmt, $($arg)*).into());
+        }
+    };
+}
+
 #[cfg(test)]
 mod tests {
     #[test]
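The pieces above fit together as follows. This is a minimal, standalone sketch of the pattern the diff introduces (a `thiserror` wrapper with a `kind()` accessor plus an anyhow-style `ensure!` macro); all item names are illustrative stand-ins rather than the crate's actual paths, and it assumes only the `anyhow` and `thiserror` crates as dependencies.

```rust
use thiserror::Error;

// Stable, public classification of errors, mirroring `analytic_engine::error::ErrorKind`.
#[derive(Debug, PartialEq, Eq)]
pub enum ErrorKind {
    KeyTooLarge,
    Internal,
}

// Opaque public error that forwards Display/source to a private inner enum.
#[derive(Debug, Error)]
#[error(transparent)]
pub struct Error(#[from] InnerError);

#[derive(Debug, Error)]
enum InnerError {
    #[error("too large key, max:{max}, current:{current}")]
    KeyTooLarge { current: usize, max: usize },

    #[error(transparent)]
    Other {
        #[from]
        source: anyhow::Error,
    },
}

impl Error {
    pub fn kind(&self) -> ErrorKind {
        match self.0 {
            InnerError::KeyTooLarge { .. } => ErrorKind::KeyTooLarge,
            InnerError::Other { .. } => ErrorKind::Internal,
        }
    }
}

// Trimmed-down version of the `ensure!` added to src/components/macros (expression arm
// only): on failure the given error is converted into the caller's error type via `.into()`.
macro_rules! ensure {
    ($cond:expr, $err:expr) => {
        if !$cond {
            return Err($err.into());
        }
    };
}

// Stand-in for skiplist::MAX_KEY_SIZE, chosen small so the example trips the check.
const MAX_KEY_SIZE: usize = 8;

fn put(key: &[u8]) -> Result<(), Error> {
    ensure!(
        key.len() <= MAX_KEY_SIZE,
        InnerError::KeyTooLarge {
            current: key.len(),
            max: MAX_KEY_SIZE,
        }
    );
    Ok(())
}

fn main() {
    // Callers branch on the structured kind instead of string-matching the message.
    if let Err(e) = put(b"this key is far too long") {
        if e.kind() == ErrorKind::KeyTooLarge {
            println!("ignoring oversized key: {e}");
        }
    }
}
```

The point of the indirection is that the inner enum stays private to the memtable module while callers get a stable `ErrorKind` to match on, which is what lets `wal_replayer.rs` drop the `TOO_LARGE_MESSAGE` string comparison in favor of `source.kind() == ErrorKind::KeyTooLarge`.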