Skip to content

Commit

Permalink
feat: introduce bulk memtable encoder/decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r committed Jun 24, 2024
1 parent 5566dd7 commit b9a2a0d
Show file tree
Hide file tree
Showing 11 changed files with 922 additions and 65 deletions.
8 changes: 8 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,13 @@ pub enum Error {
location: Location,
source: Arc<Error>,
},

#[snafu(display("Operation is not supported: {}", err_msg))]
UnsupportedOperation {
err_msg: String,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -871,6 +878,7 @@ impl ErrorExt for Error {
RegionStopped { .. } => StatusCode::RegionNotReady,
TimeRangePredicateOverflow { .. } => StatusCode::InvalidArguments,
BuildTimeRangeFilter { .. } => StatusCode::Unexpected,
UnsupportedOperation { .. } => StatusCode::InvalidArguments,
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

pub use bulk::part::BulkPart;
use common_time::Timestamp;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
Expand All @@ -34,7 +35,7 @@ use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::region::options::MemtableOptions;

pub mod bulk;
pub mod key_values;
pub mod partition_tree;
pub mod time_partition;
Expand Down Expand Up @@ -101,6 +102,9 @@ pub trait Memtable: Send + Sync + fmt::Debug {
/// Writes one key value pair into the memtable.
fn write_one(&self, key_value: KeyValue) -> Result<()>;

/// Writes an encoded batch into the memtable.
fn write_bulk(&self, part: BulkPart) -> Result<()>;

/// Scans the memtable.
/// `projection` selects columns to read, `None` means reading all columns.
/// `filters` are the predicates to be pushed down to memtable.
Expand Down
84 changes: 84 additions & 0 deletions src/mito2/src/memtable/bulk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtable implementation for bulk load

use std::sync::{Arc, RwLock};

use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::error::Result;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::{
BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRef, MemtableStats,
};

#[allow(unused)]
pub(crate) mod part;

/// Memtable that stores the [`BulkPart`]s handed to `write_bulk`.
///
/// Parts are kept in insertion order inside a lock-protected vector.
#[derive(Debug)]
pub struct BulkMemtable {
/// Identifier of this memtable, returned by `Memtable::id`.
id: MemtableId,
/// Parts appended so far; the `RwLock` allows mutation through `&self`.
parts: RwLock<Vec<BulkPart>>,
}

impl Memtable for BulkMemtable {
    /// Returns the id this memtable was created with.
    fn id(&self) -> MemtableId {
        self.id
    }

    /// Row-oriented writes are not implemented for the bulk memtable.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!()`.
    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        unimplemented!()
    }

    /// Single key-value writes are not implemented for the bulk memtable.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!()`.
    fn write_one(&self, _key_value: KeyValue) -> Result<()> {
        unimplemented!()
    }

    /// Appends `part` to the end of the stored parts.
    ///
    /// # Panics
    ///
    /// Panics if the `parts` lock is poisoned.
    fn write_bulk(&self, part: BulkPart) -> Result<()> {
        // The write guard is a temporary and is released at the end of this statement.
        self.parts.write().unwrap().push(part);
        Ok(())
    }

    /// Scanning is not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn iter(
        &self,
        _projection: Option<&[ColumnId]>,
        _predicate: Option<Predicate>,
    ) -> Result<BoxedBatchIterator> {
        todo!()
    }

    /// Returns `true` when no part has been written.
    ///
    /// # Panics
    ///
    /// Panics if the `parts` lock is poisoned.
    fn is_empty(&self) -> bool {
        let stored = self.parts.read().unwrap();
        stored.is_empty()
    }

    /// Does nothing and always succeeds.
    fn freeze(&self) -> Result<()> {
        Ok(())
    }

    /// Statistics are not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`.
    fn stats(&self) -> MemtableStats {
        todo!()
    }

    /// Creates a new, empty `BulkMemtable` carrying `id`.
    ///
    /// The region metadata is ignored and none of the existing parts are copied.
    fn fork(&self, id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef {
        let forked = Self {
            id,
            parts: RwLock::new(Vec::new()),
        };
        Arc::new(forked)
    }
}
Loading

0 comments on commit b9a2a0d

Please sign in to comment.