Skip to content

Commit

Permalink
refactor: break mutations_to_record_batch
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r committed Jun 24, 2024
1 parent 9f6ce37 commit ffd788a
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 42 deletions.
8 changes: 8 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,13 @@ pub enum Error {
location: Location,
source: Arc<Error>,
},

#[snafu(display("Operation is not supported: {}", err_msg))]
UnsupportedOperation {
err_msg: String,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -862,6 +869,7 @@ impl ErrorExt for Error {
RegionStopped { .. } => StatusCode::RegionNotReady,
TimeRangePredicateOverflow { .. } => StatusCode::InvalidArguments,
BuildTimeRangeFilter { .. } => StatusCode::Unexpected,
UnsupportedOperation { .. } => StatusCode::InvalidArguments,
}
}

Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
/// Writes one key value pair into the memtable.
fn write_one(&self, key_value: KeyValue) -> Result<()>;

/// Writes an encoded batch into the memtable.
fn write_bulk(&self, part: BulkPart) -> Result<()>;

/// Scans the memtable.
Expand Down
3 changes: 2 additions & 1 deletion src/mito2/src/memtable/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ impl Memtable for BulkMemtable {
}

fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
self.parts.write().unwrap().push(fragment);
let mut parts = self.parts.write().unwrap();
parts.push(fragment);
Ok(())
}

Expand Down
80 changes: 43 additions & 37 deletions src/mito2/src/memtable/bulk/part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,41 +189,8 @@ fn sort_arrays_to_record_batch(
schema: SchemaRef,
dedup: bool,
) -> Result<(RecordBatch, i64, i64)> {
let timestamp_iter = match metadata
.time_index_column()
.column_schema
.data_type
.as_timestamp()
.unwrap()
{
// safety: timestamp column must be valid.
TimestampType::Second(_) => timestamp
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.values()
.iter(),
TimestampType::Millisecond(_) => timestamp
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.values()
.iter(),
TimestampType::Microsecond(_) => timestamp
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.values()
.iter(),
TimestampType::Nanosecond(_) => timestamp
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.values()
.iter(),
};
let mut max_timestamp = i64::MIN;
let mut min_timestamp = i64::MAX;
let timestamp_iter = timestamp_array_to_iter(&metadata, &timestamp);
let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
let mut to_sort = pk
.iter()
.zip(timestamp_iter)
Expand Down Expand Up @@ -287,9 +254,7 @@ fn sort_arrays_to_record_batch(
.context(ComputeArrowSnafu)?;

arrays.push(timestamp);

arrays.push(pk_dictionary);

arrays.push(
arrow::compute::take(
&sequence,
Expand All @@ -316,6 +281,47 @@ fn sort_arrays_to_record_batch(
Ok((batch, min_timestamp, max_timestamp))
}

/// Returns an iterator over the raw `i64` values of a timestamp array.
///
/// The concrete array type used for the downcast is selected from the
/// timestamp unit of the region's time index column.
///
/// The iterator walks the array's value buffer directly, so the column is
/// expected to contain no null entries.
///
/// # Panics
///
/// Panics if the time index column's data type is not a timestamp type, or
/// if `timestamp` cannot be downcast to the array type matching that unit.
fn timestamp_array_to_iter<'a>(
    metadata: &RegionMetadataRef,
    timestamp: &'a ArrayRef,
) -> impl Iterator<Item = &'a i64> {
    // The time index column must have a timestamp data type.
    let time_unit = metadata
        .time_index_column()
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap();
    let array = timestamp.as_any();

    // Each arm downcasts to the array type for its unit; the array must
    // actually be of that type, otherwise the `unwrap` panics.
    match time_unit {
        TimestampType::Second(_) => {
            let seconds = array.downcast_ref::<TimestampSecondArray>().unwrap();
            seconds.values().iter()
        }
        TimestampType::Millisecond(_) => {
            let millis = array.downcast_ref::<TimestampMillisecondArray>().unwrap();
            millis.values().iter()
        }
        TimestampType::Microsecond(_) => {
            let micros = array.downcast_ref::<TimestampMicrosecondArray>().unwrap();
            micros.values().iter()
        }
        TimestampType::Nanosecond(_) => {
            let nanos = array.downcast_ref::<TimestampNanosecondArray>().unwrap();
            nanos.values().iter()
        }
    }
}

/// Converts a **sorted** [BinaryArray] to [DictionaryArray].
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<DictionaryArray<UInt16Type>> {
if input.is_empty() {
Expand Down
7 changes: 5 additions & 2 deletions src/mito2/src/memtable/partition_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::error::Result;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::metrics::WriteMetrics;
Expand Down Expand Up @@ -149,7 +149,10 @@ impl Memtable for PartitionTreeMemtable {
}

fn write_bulk(&self, _part: BulkPart) -> Result<()> {
unimplemented!("PartitionTreeMemtable does not support write_bulk")
UnsupportedOperationSnafu {
err_msg: "PartitionTreeMemtable does not support write_bulk",
}
.fail()
}

fn iter(
Expand Down
10 changes: 8 additions & 2 deletions src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
use crate::error::{
ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result,
UnsupportedOperationSnafu,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::{
Expand Down Expand Up @@ -225,7 +228,10 @@ impl Memtable for TimeSeriesMemtable {
}

fn write_bulk(&self, _part: BulkPart) -> Result<()> {
unimplemented!("TimeSeriesMemtable does not support write_bulk")
UnsupportedOperationSnafu {
err_msg: "TimeSeriesMemtable does not support write_bulk",
}
.fail()
}

fn iter(
Expand Down

0 comments on commit ffd788a

Please sign in to comment.