From ffd788a8af1aa4b789d4372e518b8d1b2e5b8201 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 24 Jun 2024 15:56:30 +0800 Subject: [PATCH] refactor: break mutations_to_record_batch --- src/mito2/src/error.rs | 8 +++ src/mito2/src/memtable.rs | 1 + src/mito2/src/memtable/bulk.rs | 3 +- src/mito2/src/memtable/bulk/part.rs | 80 +++++++++++++----------- src/mito2/src/memtable/partition_tree.rs | 7 ++- src/mito2/src/memtable/time_series.rs | 10 ++- 6 files changed, 67 insertions(+), 42 deletions(-) diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index c9320f75e03a..8ea31ff46fde 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -745,6 +745,13 @@ pub enum Error { location: Location, source: Arc, }, + + #[snafu(display("Operation is not supported: {}", err_msg))] + UnsupportedOperation { + err_msg: String, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -862,6 +869,7 @@ impl ErrorExt for Error { RegionStopped { .. } => StatusCode::RegionNotReady, TimeRangePredicateOverflow { .. } => StatusCode::InvalidArguments, BuildTimeRangeFilter { .. } => StatusCode::Unexpected, + UnsupportedOperation { .. } => StatusCode::InvalidArguments, } } diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index ed9483d506cf..c5afff3a67b1 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -102,6 +102,7 @@ pub trait Memtable: Send + Sync + fmt::Debug { /// Writes one key value pair into the memtable. fn write_one(&self, key_value: KeyValue) -> Result<()>; + /// Writes an encoded batch into the memtable. fn write_bulk(&self, part: BulkPart) -> Result<()>; /// Scans the memtable. 
diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index 923d32329ccd..312d9b809e73 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -50,7 +50,8 @@ impl Memtable for BulkMemtable { } fn write_bulk(&self, fragment: BulkPart) -> Result<()> { - self.parts.write().unwrap().push(fragment); + let mut parts = self.parts.write().unwrap(); + parts.push(fragment); Ok(()) } diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index e67596e24602..fe9db60707c9 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -189,41 +189,8 @@ fn sort_arrays_to_record_batch( schema: SchemaRef, dedup: bool, ) -> Result<(RecordBatch, i64, i64)> { - let timestamp_iter = match metadata - .time_index_column() - .column_schema - .data_type - .as_timestamp() - .unwrap() - { - // safety: timestamp column must be valid. - TimestampType::Second(_) => timestamp - .as_any() - .downcast_ref::() - .unwrap() - .values() - .iter(), - TimestampType::Millisecond(_) => timestamp - .as_any() - .downcast_ref::() - .unwrap() - .values() - .iter(), - TimestampType::Microsecond(_) => timestamp - .as_any() - .downcast_ref::() - .unwrap() - .values() - .iter(), - TimestampType::Nanosecond(_) => timestamp - .as_any() - .downcast_ref::() - .unwrap() - .values() - .iter(), - }; - let mut max_timestamp = i64::MIN; - let mut min_timestamp = i64::MAX; + let timestamp_iter = timestamp_array_to_iter(&metadata, &timestamp); + let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN); let mut to_sort = pk .iter() .zip(timestamp_iter) @@ -287,9 +254,7 @@ fn sort_arrays_to_record_batch( .context(ComputeArrowSnafu)?; arrays.push(timestamp); - arrays.push(pk_dictionary); - arrays.push( arrow::compute::take( &sequence, @@ -316,6 +281,47 @@ fn sort_arrays_to_record_batch( Ok((batch, min_timestamp, max_timestamp)) } +/// Converts timestamp array to an iter of i64 values. 
+fn timestamp_array_to_iter<'a>( + metadata: &RegionMetadataRef, + timestamp: &'a ArrayRef, +) -> impl Iterator<Item = &'a i64> { + // safety: timestamp column must be valid, and values must not be None. + match metadata + .time_index_column() + .column_schema + .data_type + .as_timestamp() + .unwrap() + { + // safety: timestamp column must be valid. + TimestampType::Second(_) => timestamp + .as_any() + .downcast_ref::<TimestampSecondArray>() + .unwrap() + .values() + .iter(), + TimestampType::Millisecond(_) => timestamp + .as_any() + .downcast_ref::<TimestampMillisecondArray>() + .unwrap() + .values() + .iter(), + TimestampType::Microsecond(_) => timestamp + .as_any() + .downcast_ref::<TimestampMicrosecondArray>() + .unwrap() + .values() + .iter(), + TimestampType::Nanosecond(_) => timestamp + .as_any() + .downcast_ref::<TimestampNanosecondArray>() + .unwrap() + .values() + .iter(), + } +} + /// Converts a **sorted** [BinaryArray] to [DictionaryArray]. fn binary_array_to_dictionary(input: &BinaryArray) -> Result> { if input.is_empty() { diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index 569eacd73d4c..e3f608caca53 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -34,7 +34,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use table::predicate::Predicate; -use crate::error::Result; +use crate::error::{Result, UnsupportedOperationSnafu}; use crate::flush::WriteBufferManagerRef; use crate::memtable::key_values::KeyValue; use crate::memtable::partition_tree::metrics::WriteMetrics; @@ -149,7 +149,10 @@ impl Memtable for PartitionTreeMemtable { } fn write_bulk(&self, _part: BulkPart) -> Result<()> { - unimplemented!("PartitionTreeMemtable does not support write_bulk") + UnsupportedOperationSnafu { + err_msg: "PartitionTreeMemtable does not support write_bulk", + } + .fail() } fn iter( diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index b86038470a5f..b2304758295c 100644 --- 
a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -36,7 +36,10 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use table::predicate::Predicate; -use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result}; +use crate::error::{ + ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result, + UnsupportedOperationSnafu, +}; use crate::flush::WriteBufferManagerRef; use crate::memtable::key_values::KeyValue; use crate::memtable::{ @@ -225,7 +228,10 @@ impl Memtable for TimeSeriesMemtable { } fn write_bulk(&self, _part: BulkPart) -> Result<()> { - unimplemented!("TimeSeriesMemtable does not support write_bulk") + UnsupportedOperationSnafu { + err_msg: "TimeSeriesMemtable does not support write_bulk", + } + .fail() } fn iter(