Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: first step to refactor error #1524

Merged
merged 7 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ async-trait = "0.1.72"
atomic_enum = "0.2.0"
base64 = "0.13"
bytes = "1"
thiserror = "1"
bytes_ext = { path = "src/components/bytes_ext" }
catalog = { path = "src/catalog" }
catalog_impls = { path = "src/catalog_impls" }
Expand Down
2 changes: 2 additions & 0 deletions src/analytic_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ wal-rocksdb = ["wal/wal-rocksdb"]

[dependencies]
# In alphabetical order
anyhow = { workspace = true }
arc-swap = "1.4.0"
arena = { workspace = true }
arrow = { workspace = true }
Expand Down Expand Up @@ -81,6 +82,7 @@ snafu = { workspace = true }
table_engine = { workspace = true }
table_kv = { workspace = true }
tempfile = { workspace = true, optional = true }
thiserror = { workspace = true }
time_ext = { workspace = true }
tokio = { workspace = true }
trace_metric = { workspace = true }
Expand Down
23 changes: 23 additions & 0 deletions src/analytic_engine/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/// Global Error type for analytic engine.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum ErrorKind {
KeyTooLarge,
Internal,
}
25 changes: 12 additions & 13 deletions src/analytic_engine/src/instance/wal_replayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ use crate::{
engine::{Error, ReplayWalWithCause, Result},
flush_compaction::{Flusher, TableFlushOptions},
serial_executor::TableOpSerialExecutor,
write::MemTableWriter,
write::{Error as WriteError, MemTableWriter},
},
payload::{ReadPayload, SingleSchemaProviderAdapter, TableSchemaProvider, WalDecoder},
table::data::TableDataRef,
ErrorKind,
};

// Metrics of wal replayer
Expand Down Expand Up @@ -547,22 +548,20 @@ async fn replay_table_log_entries(
let index_in_writer =
IndexInWriterSchema::for_same_schema(row_group.schema().num_columns());
let memtable_writer = MemTableWriter::new(table_data.clone(), serial_exec);
let write_res = memtable_writer
.write(sequence, row_group, index_in_writer)
.box_err()
.context(ReplayWalWithCause {
msg: Some(format!(
"table_id:{}, table_name:{}, space_id:{}",
table_data.space_id, table_data.name, table_data.id
)),
});
let write_res = memtable_writer.write(sequence, row_group, index_in_writer);
if let Err(e) = write_res {
// TODO: find a better way to match this.
if e.to_string().contains(crate::memtable::TOO_LARGE_MESSAGE) {
if matches!(e, WriteError::UpdateMemTableSequence { ref source } if source.kind() == ErrorKind::KeyTooLarge )
{
// ignore this error
warn!("Unable to insert memtable, err:{e}");
} else {
return Err(e);
return Err(Error::ReplayWalWithCause {
msg: Some(format!(
"table_id:{}, table_name:{}, space_id:{}",
table_data.space_id, table_data.name, table_data.id
)),
source: Box::new(e),
});
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/analytic_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
mod compaction;
mod context;
mod engine;
pub mod error;
mod instance;
mod manifest;
pub mod memtable;
Expand All @@ -39,6 +40,7 @@ pub mod table_meta_set_impl;
#[cfg(any(test, feature = "test"))]
pub mod tests;

use error::ErrorKind;
use manifest::details::Options as ManifestOptions;
use object_store::config::StorageOptions;
use serde::{Deserialize, Serialize};
Expand Down
42 changes: 17 additions & 25 deletions src/analytic_engine/src/memtable/columnar/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::{
time::Instant,
};

use anyhow::Context;
use arena::{Arena, BasicStats, MonoIncArena};
use bytes_ext::{ByteVec, Bytes};
use codec::{memcomparable::MemComparable, row, Encoder};
Expand All @@ -36,17 +37,14 @@ use common_types::{
schema::Schema,
SequenceNumber,
};
use generic_error::BoxError;
use logger::trace;
use macros::ensure;
use parquet::data_type::AsBytes;
use skiplist::{ArenaSlice, BytewiseComparator, IterRef, Skiplist};
use snafu::{OptionExt, ResultExt};

use crate::memtable::{
key,
key::{KeySequence, SequenceCodec},
AppendRow, BuildRecordBatch, DecodeInternalKey, Internal, InternalNoCause, IterTimeout,
ProjectSchema, Result, ScanContext, ScanRequest,
key::{self, KeySequence, SequenceCodec},
Result, ScanContext, ScanRequest,
};

/// Iterator state
Expand Down Expand Up @@ -106,7 +104,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> ColumnarIterImpl<A> {
let row_projector = request
.row_projector_builder
.build(&schema)
.context(ProjectSchema)?;
.context("ProjectSchema")?;
let mut columnar_iter = Self {
memtable,
row_num,
Expand Down Expand Up @@ -147,26 +145,18 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> ColumnarIterImpl<A> {
let column_schema = self.memtable_schema.column(*idx);
let column = memtable
.get(&column_schema.id)
.with_context(|| InternalNoCause {
msg: format!("column not found, column:{}", column_schema.name),
})?;
.with_context(|| format!("column not found, column:{}", column_schema.name))?;
for (i, key) in key_vec.iter_mut().enumerate().take(self.row_num) {
let datum = column.get_datum(i);
encoder
.encode(key, &datum)
.box_err()
.context(Internal { msg: "encode key" })?;
encoder.encode(key, &datum).context("encode key")?;
}
}

// TODO: Persist the skiplist.
for (i, mut key) in key_vec.into_iter().enumerate() {
SequenceCodec
.encode(&mut key, &KeySequence::new(self.last_sequence, i as u32))
.box_err()
.context(Internal {
msg: "encode key sequence",
})?;
.context("encode key sequence")?;
self.skiplist.put(&key, (i as u32).to_le_bytes().as_slice());
}

Expand Down Expand Up @@ -203,9 +193,10 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> ColumnarIterImpl<A> {
if !rows.is_empty() {
if let Some(deadline) = self.deadline {
let now = Instant::now();
if now >= deadline {
return IterTimeout { now, deadline }.fail();
}
ensure!(
now < deadline,
"iter timeout, now:{now:?}, deadline:{deadline:?}"
);
}

let fetched_schema = self.row_projector.fetched_schema().clone();
Expand All @@ -219,10 +210,10 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> ColumnarIterImpl<A> {
self.batch_size,
);
for row in rows.into_iter() {
builder.append_row(row).context(AppendRow)?;
builder.append_row(row).context("AppendRow")?;
}

let batch = builder.build().context(BuildRecordBatch)?;
let batch = builder.build().context("BuildRecordBatch")?;
trace!("column iterator send one batch:{:?}", batch);
Ok(Some(batch))
} else {
Expand All @@ -245,7 +236,8 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> ColumnarIterImpl<A> {
while self.iter.valid() {
// Fetch current entry
let key = self.iter.key();
let (user_key, _) = key::user_key_from_internal_key(key).context(DecodeInternalKey)?;
let (user_key, _) =
key::user_key_from_internal_key(key).context("DecodeInternalKey")?;

// Check user key is still in range
if self.is_after_end_bound(user_key) {
Expand All @@ -262,7 +254,7 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> ColumnarIterImpl<A> {
// be set as last_internal_key so maybe we can just
// unwrap it?
let (last_user_key, _) = key::user_key_from_internal_key(last_internal_key)
.context(DecodeInternalKey)?;
.context("DecodeInternalKey")?;
user_key == last_user_key
}
// This is the first user key
Expand Down
40 changes: 11 additions & 29 deletions src/analytic_engine/src/memtable/columnar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,21 @@ use std::{
},
};

use anyhow::Context;
use arena::MonoIncArena;
use bytes_ext::Bytes;
use common_types::{
column::Column, column_schema::ColumnId, datum::Datum, row::Row, schema::Schema,
time::TimeRange, SequenceNumber,
};
use generic_error::BoxError;
use logger::debug;
use macros::ensure;
use skiplist::{BytewiseComparator, Skiplist};
use snafu::{ensure, OptionExt, ResultExt};

use crate::memtable::{
columnar::iter::ColumnarIterImpl, factory::Options, key::KeySequence,
reversed_iter::ReversedColumnarIterator, ColumnarIterPtr, Internal, InternalNoCause,
InvalidPutSequence, MemTable, Metrics as MemtableMetrics, PutContext, Result, ScanContext,
ScanRequest,
reversed_iter::ReversedColumnarIterator, ColumnarIterPtr, MemTable, Metrics as MemtableMetrics,
PutContext, Result, ScanContext, ScanRequest,
};

pub mod factory;
Expand Down Expand Up @@ -108,16 +107,11 @@ impl MemTable for ColumnarMemTable {
} else {
// TODO: impl append() one row in column, avoid memory expansion.
let column = Column::with_capacity(1, column_schema.data_type)
.box_err()
.context(Internal {
msg: "new column failed",
})?;
.context("new column failed")?;
columns.insert(column_schema.id, column);
columns
.get_mut(&column_schema.id)
.context(InternalNoCause {
msg: "get column failed",
})?
.context("get column failed")?
};

if let Some(writer_index) = ctx.index_in_writer.column_index_in_writer(i) {
Expand All @@ -127,10 +121,7 @@ impl MemTable for ColumnarMemTable {
} else {
column
.append_datum_ref(&row[writer_index])
.box_err()
.context(Internal {
msg: "append datum failed",
})?
.context("append datum failed")?
}
} else {
column.append_nulls(1);
Expand All @@ -140,9 +131,7 @@ impl MemTable for ColumnarMemTable {
let mut memtable = self.memtable.write().unwrap();
for (k, v) in columns {
if let Some(column) = memtable.get_mut(&k) {
column.append_column(v).box_err().context(Internal {
msg: "append column",
})?;
column.append_column(v).context("append column")?;
} else {
memtable.insert(k, v);
};
Expand Down Expand Up @@ -174,18 +163,14 @@ impl MemTable for ColumnarMemTable {
.schema
.columns()
.get(self.schema.timestamp_index())
.context(InternalNoCause {
msg: "timestamp column is missing",
})?;
.context("timestamp column is missing")?;

let num_rows = self
.memtable
.read()
.unwrap()
.get(&timestamp_column.id)
.context(InternalNoCause {
msg: "get timestamp column failed",
})?
.context("get timestamp column failed")?
.len();
let (reverse, batch_size) = (request.reverse, ctx.batch_size);
let arena = MonoIncArena::with_collector(
Expand Down Expand Up @@ -219,10 +204,7 @@ impl MemTable for ColumnarMemTable {
let last = self.last_sequence();
ensure!(
sequence >= last,
InvalidPutSequence {
given: sequence,
last
}
"invalid sequence, given:{sequence}, last:{last}"
);

self.last_sequence.store(sequence, Ordering::Relaxed);
Expand Down
51 changes: 51 additions & 0 deletions src/analytic_engine/src/memtable/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use thiserror::Error;

use crate::ErrorKind;

#[derive(Debug, Error)]
#[error(transparent)]
pub struct Error(#[from] InnerError);

impl From<anyhow::Error> for Error {
fn from(source: anyhow::Error) -> Self {
Self(InnerError::Other { source })
}
}

impl Error {
pub fn kind(&self) -> ErrorKind {
match self.0 {
InnerError::KeyTooLarge { .. } => ErrorKind::KeyTooLarge,
InnerError::Other { .. } => ErrorKind::Internal,
}
}
}

#[derive(Error, Debug)]
pub(crate) enum InnerError {
#[error("too large key, max:{max}, current:{current}")]
KeyTooLarge { current: usize, max: usize },

#[error(transparent)]
Other {
#[from]
source: anyhow::Error,
chunshao90 marked this conversation as resolved.
Show resolved Hide resolved
},
}
Loading
Loading