feat: Implement delete for the storage engine (#777)
* docs: Fix incorrect comment of Vector::only_null

* feat: Add delete to WriteRequest and WriteBatch

* feat: Filter deleted rows

* fix: Fix panic after reopening engine

This was detected by adding a reopen step to the region delete test.

* fix: Fix OpType::min_type()

* test: Add delete absent key test

* chore: Address CR comments
evenyag authored Dec 30, 2022
1 parent 6fe205f commit 4d56d89
Showing 24 changed files with 413 additions and 55 deletions.
2 changes: 1 addition & 1 deletion src/datatypes/src/vectors.rs
```diff
@@ -111,7 +111,7 @@ pub trait Vector: Send + Sync + Serializable + Debug + VectorOp {
     /// Returns whether row is null.
     fn is_null(&self, row: usize) -> bool;
 
-    /// If the only value vector can contain is NULL.
+    /// If the vector only contains NULL.
     fn only_null(&self) -> bool {
         self.null_count() == self.len()
     }
```
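The corrected doc comment matches the default implementation: a vector is `only_null` when its null count equals its length. A minimal standalone sketch of that invariant, using a plain slice of options as a stand-in for the crate's `Vector` trait (the helper below is illustrative, not part of the API):

```rust
// Stand-in for Vector::only_null: true iff every element is null.
fn only_null(values: &[Option<i64>]) -> bool {
    let null_count = values.iter().filter(|v| v.is_none()).count();
    null_count == values.len()
}

fn main() {
    assert!(only_null(&[None, None])); // all nulls
    assert!(!only_null(&[None, Some(1)])); // mixed values
    assert!(only_null(&[])); // empty vector: 0 == 0, so it counts as only-null
}
```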
10 changes: 7 additions & 3 deletions src/mito/src/table/test_util/mock_engine.rs
```diff
@@ -27,8 +27,8 @@ use storage::metadata::{RegionMetaImpl, RegionMetadata};
 use storage::write_batch::WriteBatch;
 use store_api::storage::{
     AlterRequest, Chunk, ChunkReader, CreateOptions, EngineContext, GetRequest, GetResponse,
-    OpenOptions, ReadContext, Region, RegionDescriptor, RegionId, RegionMeta, ScanRequest,
-    ScanResponse, SchemaRef, Snapshot, StorageEngine, WriteContext, WriteResponse,
+    OpenOptions, ReadContext, Region, RegionDescriptor, RegionId, ScanRequest, ScanResponse,
+    SchemaRef, Snapshot, StorageEngine, WriteContext, WriteResponse,
 };
 
 pub type Result<T> = std::result::Result<T, MockError>;
@@ -173,7 +173,11 @@ impl Region for MockRegion {
     }
 
     fn write_request(&self) -> WriteBatch {
-        WriteBatch::new(self.in_memory_metadata().schema().clone())
+        let metadata = self.inner.metadata.load();
+        let user_schema = metadata.user_schema().clone();
+        let row_key_end = metadata.schema().store_schema().row_key_end();
+
+        WriteBatch::new(user_schema, row_key_end)
     }
 
     async fn alter(&self, request: AlterRequest) -> Result<()> {
```
1 change: 1 addition & 0 deletions src/storage/benches/wal/util/mod.rs
```diff
@@ -43,6 +43,7 @@ pub fn new_test_batch() -> WriteBatch {
             ("10", LogicalTypeId::String, false),
         ],
         Some(2),
+        3,
     )
 }
```
8 changes: 6 additions & 2 deletions src/storage/benches/wal/util/write_batch_util.rs
```diff
@@ -16,8 +16,12 @@ use storage::write_batch::WriteBatch;
 
 use crate::memtable::util::schema_util::{self, ColumnDef};
 
-pub fn new_write_batch(column_defs: &[ColumnDef], timestamp_index: Option<usize>) -> WriteBatch {
+pub fn new_write_batch(
+    column_defs: &[ColumnDef],
+    timestamp_index: Option<usize>,
+    row_key_end: usize,
+) -> WriteBatch {
     let schema = schema_util::new_schema_ref(column_defs, timestamp_index);
 
-    WriteBatch::new(schema)
+    WriteBatch::new(schema, row_key_end)
 }
```
8 changes: 6 additions & 2 deletions src/storage/src/error.rs
```diff
@@ -404,7 +404,7 @@ pub enum Error {
         expect,
         given
     ))]
-    LenNotEquals {
+    UnequalLengths {
        name: String,
        expect: usize,
        given: usize,
@@ -434,6 +434,9 @@ pub enum Error {
        backtrace: Backtrace,
        source: datatypes::error::Error,
    },
+
+    #[snafu(display("More columns than expected in the request"))]
+    MoreColumnThanExpected { backtrace: Backtrace },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -455,7 +458,8 @@ impl ErrorExt for Error {
             | RequestTooLarge { .. }
             | TypeMismatch { .. }
             | HasNull { .. }
-            | LenNotEquals { .. } => StatusCode::InvalidArguments,
+            | UnequalLengths { .. }
+            | MoreColumnThanExpected { .. } => StatusCode::InvalidArguments,
 
             Utf8 { .. }
             | EncodeJson { .. }
```
1 change: 1 addition & 0 deletions src/storage/src/memtable/inserter.rs
```diff
@@ -143,6 +143,7 @@ mod tests {
             ("value", LogicalTypeId::Int64, true),
         ],
         Some(0),
+        1,
     )
 }
```
2 changes: 1 addition & 1 deletion src/storage/src/memtable/tests.rs
```diff
@@ -583,7 +583,7 @@ fn test_memtable_projection() {
     let k1 = Arc::new(UInt64Vector::from_slice(&[0, 1, 2])) as VectorRef;
     let v0 = Arc::new(UInt64Vector::from_slice(&[10, 11, 12])) as VectorRef;
     let sequences = Arc::new(UInt64Vector::from_slice(&[9, 9, 9])) as VectorRef;
-    let op_types = Arc::new(UInt8Vector::from_slice(&[0, 0, 0])) as VectorRef;
+    let op_types = Arc::new(UInt8Vector::from_slice(&[1, 1, 1])) as VectorRef;
 
     assert_eq!(k0, *batch.column(0));
     assert_eq!(k1, *batch.column(1));
```
1 change: 1 addition & 0 deletions src/storage/src/proto/wal.rs
```diff
@@ -24,6 +24,7 @@ pub fn gen_mutation_types(payload: &Payload) -> Vec<i32> {
         .mutations
         .iter()
         .map(|m| match m.op_type {
+            OpType::Delete => MutationType::Delete.into(),
             OpType::Put => MutationType::Put.into(),
         })
         .collect::<Vec<_>>()
```
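Because the WAL encoder matches exhaustively on `OpType`, adding the `Delete` arm here is compiler-enforced: any new op type fails to build until every encoder maps it. A tiny self-contained sketch of that pattern (the enum definitions are assumed stand-ins, not the crate's proto types):

```rust
#[derive(Clone, Copy)]
enum OpType {
    Delete,
    Put,
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum MutationType {
    Delete,
    Put,
}

/// Exhaustive match: a new OpType variant is a compile error until it is mapped.
fn to_mutation_type(op: OpType) -> MutationType {
    match op {
        OpType::Delete => MutationType::Delete,
        OpType::Put => MutationType::Put,
    }
}

fn main() {
    assert_eq!(to_mutation_type(OpType::Delete), MutationType::Delete);
    assert_eq!(to_mutation_type(OpType::Put), MutationType::Put);
}
```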
8 changes: 8 additions & 0 deletions src/storage/src/read.rs
```diff
@@ -135,6 +135,14 @@ pub trait BatchOp {
     /// Note that nulls in `filter` are interpreted as `false`, which causes those
     /// elements to be masked out.
     fn filter(&self, batch: &Batch, filter: &BooleanVector) -> Result<Batch>;
+
+    /// Unselect deleted rows according to the [`OpType`](store_api::storage::OpType).
+    ///
+    /// # Panics
+    /// Panics if
+    /// - `batch` doesn't have a valid op type column.
+    /// - `selected.len()` is less than the number of rows.
+    fn unselect_deleted(&self, batch: &Batch, selected: &mut BitVec);
 }
 
 /// Reusable [Batch] builder.
```
4 changes: 2 additions & 2 deletions src/storage/src/read/dedup.rs
```diff
@@ -66,8 +66,8 @@ impl<R> DedupReader<R> {
             .get_or_insert_with(Batch::default)
             .clone_from(&batch); // Use `clone_from` to reuse allocated memory if possible.
 
-        // TODO(yingwen): To support `DELETE`, we could find all rows whose op_types are equal
-        // to `OpType::Delete`, mark their `selected` to false, then filter the batch.
+        // Find all rows whose op_types are `OpType::Delete`, mark their `selected` to false.
+        self.schema.unselect_deleted(&batch, &mut self.selected);
 
         let filter = BooleanVector::from_iterator(self.selected.iter().by_vals());
         // Filter duplicate rows.
```
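With this change, `DedupReader` clears the `selected` bit for every `Delete` row before materializing the filter, so tombstoned rows never reach the output. A self-contained sketch of the combined dedup-and-delete masking, using plain tuples and a `Vec<bool>` as stand-ins for `Batch` and `BitVec` (simplified types, not the crate's):

```rust
#[derive(Clone, Copy, PartialEq)]
enum OpType {
    Put,
    Delete,
}

/// Rows are (key, op_type), sorted by key with the newest entry first,
/// mirroring the order DedupReader sees them in.
fn visible_rows(rows: &[(i64, OpType)]) -> Vec<i64> {
    let mut selected = vec![true; rows.len()];
    // Dedup: keep only the first (newest) entry per key.
    for i in 1..rows.len() {
        if rows[i].0 == rows[i - 1].0 {
            selected[i] = false;
        }
    }
    // Unselect deleted rows, as the new `unselect_deleted` hook does.
    for (i, (_, op)) in rows.iter().enumerate() {
        if *op == OpType::Delete {
            selected[i] = false;
        }
    }
    rows.iter()
        .zip(selected)
        .filter(|(_, sel)| *sel)
        .map(|((key, _), _)| *key)
        .collect()
}

fn main() {
    // Key 2's newest entry is a tombstone, so the key disappears entirely.
    let rows = [
        (1, OpType::Put),
        (2, OpType::Delete),
        (2, OpType::Put),
        (3, OpType::Put),
    ];
    assert_eq!(visible_rows(&rows), vec![1, 3]);
}
```

Because the newest entry for a key sorts first, a key whose latest write is a tombstone vanishes from the scan, while the older `Put` under the same key was already masked out by dedup.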
10 changes: 7 additions & 3 deletions src/storage/src/region.rs
```diff
@@ -24,8 +24,8 @@ use snafu::ResultExt;
 use store_api::logstore::LogStore;
 use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
 use store_api::storage::{
-    AlterRequest, OpenOptions, ReadContext, Region, RegionId, RegionMeta, SequenceNumber,
-    WriteContext, WriteResponse,
+    AlterRequest, OpenOptions, ReadContext, Region, RegionId, SequenceNumber, WriteContext,
+    WriteResponse,
 };
 
 use crate::error::{self, Error, Result};
@@ -91,7 +91,11 @@ impl<S: LogStore> Region for RegionImpl<S> {
     }
 
     fn write_request(&self) -> Self::WriteRequest {
-        WriteBatch::new(self.in_memory_metadata().schema().clone())
+        let metadata = self.inner.version_control().metadata();
+        let user_schema = metadata.user_schema().clone();
+        let row_key_end = metadata.schema().store_schema().row_key_end();
+
+        WriteBatch::new(user_schema, row_key_end)
    }
 
     async fn alter(&self, request: AlterRequest) -> Result<()> {
```
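`write_request` now builds the batch from the user-visible schema plus `row_key_end`, which (judging from the call sites) is the index one past the last row-key column in the stored schema. A hedged sketch of why a delete request only needs the columns below that index, with a simplified stand-in `WriteBatch` (field layout and constructor are illustrative; the real one takes the crate's `SchemaRef`):

```rust
/// Simplified stand-in: column names, with the first `row_key_end`
/// columns forming the row key (assumption for illustration).
struct WriteBatch {
    columns: Vec<String>,
    row_key_end: usize,
}

impl WriteBatch {
    fn new(columns: Vec<String>, row_key_end: usize) -> Self {
        assert!(row_key_end <= columns.len());
        Self { columns, row_key_end }
    }

    /// A delete request only has to name the key columns; value
    /// columns are irrelevant when removing a row.
    fn key_columns(&self) -> &[String] {
        &self.columns[..self.row_key_end]
    }
}

fn main() {
    // Mirrors the test schema ("timestamp", "v0") with row_key_end = 1.
    let batch = WriteBatch::new(vec!["timestamp".into(), "v0".into()], 1);
    assert_eq!(batch.key_columns(), &["timestamp".to_string()]);
}
```

This lines up with the test helpers in `region/tests.rs` below, which pass `row_key_end = 2` when the version column is part of the key and `1` when it is not.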
29 changes: 28 additions & 1 deletion src/storage/src/region/tests.rs
```diff
@@ -31,7 +31,7 @@ use log_store::fs::noop::NoopLogStore;
 use object_store::backend::fs;
 use object_store::ObjectStore;
 use store_api::storage::{
-    consts, Chunk, ChunkReader, ScanRequest, SequenceNumber, Snapshot, WriteRequest,
+    consts, Chunk, ChunkReader, RegionMeta, ScanRequest, SequenceNumber, Snapshot, WriteRequest,
 };
 use tempdir::TempDir;
 
@@ -124,6 +124,17 @@ impl<S: LogStore> TesterBase<S> {
     pub fn committed_sequence(&self) -> SequenceNumber {
         self.region.committed_sequence()
     }
+
+    /// Delete by keys (timestamp).
+    pub async fn delete(&self, keys: &[i64]) -> WriteResponse {
+        let keys: Vec<TimestampMillisecond> = keys.iter().map(|v| (*v).into()).collect();
+        // Build a batch without version.
+        let mut batch = new_write_batch_for_test(false);
+        let keys = new_delete_data(&keys);
+        batch.delete(keys).unwrap();
+
+        self.region.write(&self.write_ctx, batch).await.unwrap()
+    }
 }
 
 pub type FileTesterBase = TesterBase<LocalFileLogStore>;
@@ -141,6 +152,7 @@ fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch {
                 ("v0", LogicalTypeId::Int64, true),
             ],
             Some(0),
+            2,
         )
     } else {
         write_batch_util::new_write_batch(
@@ -153,6 +165,7 @@ fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch {
                 ("v0", LogicalTypeId::Int64, true),
             ],
             Some(0),
+            1,
         )
     }
 }
@@ -173,6 +186,20 @@ fn new_put_data(data: &[(TimestampMillisecond, Option<i64>)]) -> HashMap<String, VectorRef> {
     put_data
 }
 
+fn new_delete_data(keys: &[TimestampMillisecond]) -> HashMap<String, VectorRef> {
+    let mut delete_data = HashMap::new();
+
+    let timestamps =
+        TimestampMillisecondVector::from_vec(keys.iter().map(|v| v.0.into()).collect());
+
+    delete_data.insert(
+        test_util::TIMESTAMP_NAME.to_string(),
+        Arc::new(timestamps) as VectorRef,
+    );
+
+    delete_data
+}
+
 fn append_chunk_to(chunk: &Chunk, dst: &mut Vec<(i64, Option<i64>)>) {
     assert_eq!(2, chunk.columns.len());
```
65 changes: 65 additions & 0 deletions src/storage/src/region/tests/basic.rs
```diff
@@ -106,6 +106,10 @@ impl Tester {
     fn committed_sequence(&self) -> SequenceNumber {
         self.base().committed_sequence()
     }
+
+    async fn delete(&self, keys: &[i64]) -> WriteResponse {
+        self.base().delete(keys).await
+    }
 }
 
 #[tokio::test]
@@ -202,3 +206,64 @@ async fn test_scan_different_batch() {
         assert_eq!(data, output);
     }
 }
+
+#[tokio::test]
+async fn test_put_delete_scan() {
+    let dir = TempDir::new("put-delete-scan").unwrap();
+    let store_dir = dir.path().to_str().unwrap();
+    let mut tester = Tester::new(REGION_NAME, store_dir).await;
+
+    let data = vec![
+        (1000, Some(100)),
+        (1001, Some(101)),
+        (1002, None),
+        (1003, None),
+        (1004, Some(104)),
+    ];
+
+    tester.put(&data).await;
+
+    let keys = [1001, 1003];
+
+    tester.delete(&keys).await;
+
+    let output = tester.full_scan().await;
+    let expect = vec![(1000, Some(100)), (1002, None), (1004, Some(104))];
+    assert_eq!(expect, output);
+
+    // Deletion is also persistent.
+    tester.try_reopen().await.unwrap();
+    let output = tester.full_scan().await;
+    assert_eq!(expect, output);
+}
+
+#[tokio::test]
+async fn test_put_delete_absent_key() {
+    let dir = TempDir::new("put-delete-scan").unwrap();
+    let store_dir = dir.path().to_str().unwrap();
+    let mut tester = Tester::new(REGION_NAME, store_dir).await;
+
+    let data = vec![
+        (1000, Some(100)),
+        (1001, Some(101)),
+        (1002, None),
+        (1003, None),
+        (1004, Some(104)),
+    ];
+
+    tester.put(&data).await;
+
+    // 999 and 1006 are absent.
+    let keys = [999, 1002, 1004, 1006];
+
+    tester.delete(&keys).await;
+
+    let output = tester.full_scan().await;
+    let expect = vec![(1000, Some(100)), (1001, Some(101)), (1003, None)];
+    assert_eq!(expect, output);
+
+    // Deletion is also persistent.
+    tester.try_reopen().await.unwrap();
+    let output = tester.full_scan().await;
+    assert_eq!(expect, output);
+}
```
1 change: 1 addition & 0 deletions src/storage/src/region/tests/projection.rs
```diff
@@ -49,6 +49,7 @@ fn new_write_batch_for_test() -> WriteBatch {
             ("v1", LogicalTypeId::Int64, true),
         ],
         Some(1),
+        2,
     )
 }
```
51 changes: 48 additions & 3 deletions src/storage/src/schema/projected.rs
```diff
@@ -18,9 +18,10 @@ use std::sync::Arc;
 
 use common_base::BitVec;
 use common_error::prelude::*;
+use datatypes::prelude::ScalarVector;
 use datatypes::schema::{SchemaBuilder, SchemaRef};
-use datatypes::vectors::BooleanVector;
-use store_api::storage::{Chunk, ColumnId};
+use datatypes::vectors::{BooleanVector, UInt8Vector};
+use store_api::storage::{Chunk, ColumnId, OpType};
 
 use crate::error;
 use crate::metadata::{self, Result};
@@ -167,7 +168,9 @@ impl ProjectedSchema {
     /// Convert [Batch] into [Chunk].
     ///
     /// This will remove all internal columns. The input `batch` should have the
-    /// same schema as `self.schema_to_read()`.
+    /// same schema as [`self.schema_to_read()`](ProjectedSchema::schema_to_read).
+    /// The output [Chunk] has the same schema as
+    /// [`self.projected_user_schema()`](ProjectedSchema::projected_user_schema).
     pub fn batch_to_chunk(&self, batch: &Batch) -> Chunk {
         let columns = match &self.projection {
             Some(projection) => projection
@@ -330,6 +333,29 @@ impl BatchOp for ProjectedSchema {
 
         Ok(Batch::new(columns))
     }
+
+    fn unselect_deleted(&self, batch: &Batch, selected: &mut BitVec) {
+        let op_types = batch.column(self.schema_to_read.op_type_index());
+        // Safety: We expect the batch has the same schema as `self.schema_to_read`. The
+        // read procedure should guarantee this, otherwise this is a critical bug and it
+        // should be fine to panic.
+        let op_types = op_types
+            .as_any()
+            .downcast_ref::<UInt8Vector>()
+            .unwrap_or_else(|| {
+                panic!(
+                    "Expect op_type (UInt8) column at index {}, given {:?}",
+                    self.schema_to_read.op_type_index(),
+                    op_types.data_type()
+                );
+            });
+
+        for (i, op_type) in op_types.iter_data().enumerate() {
+            if op_type == Some(OpType::Delete.as_u8()) {
+                selected.set(i, false);
+            }
+        }
+    }
 }
 
 #[cfg(test)]
@@ -526,4 +552,23 @@ mod tests {
         let expect: VectorRef = Arc::new(TimestampMillisecondVector::from_values([1000, 3000]));
         assert_eq!(expect, *res.column(0));
     }
+
+    #[test]
+    fn test_unselect_deleted() {
+        let schema = read_util::new_projected_schema();
+        let batch = read_util::new_full_kv_batch(&[
+            (100, 1, 1000, OpType::Put),
+            (101, 1, 999, OpType::Delete),
+            (102, 1, 1000, OpType::Put),
+            (103, 1, 999, OpType::Put),
+            (104, 1, 1000, OpType::Delete),
+        ]);
+
+        let mut selected = BitVec::repeat(true, batch.num_rows());
+        schema.unselect_deleted(&batch, &mut selected);
+        assert_eq!(
+            BitVec::from_iter([true, false, true, true, false]),
+            selected
+        );
+    }
 }
```
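A standalone sketch of the same masking loop, with a `Vec<bool>` standing in for `BitVec` and a plain `u8` column standing in for the `UInt8Vector` of op types (the numeric encoding of `OpType` below is an assumption for illustration; the real code compares against `OpType::Delete.as_u8()`):

```rust
#[derive(Clone, Copy)]
enum OpType {
    // Discriminants here are illustrative, not the engine's actual encoding.
    Delete,
    Put,
}

impl OpType {
    fn as_u8(self) -> u8 {
        self as u8
    }
}

/// Mirrors ProjectedSchema::unselect_deleted over a plain op_type column.
fn unselect_deleted(op_types: &[u8], selected: &mut [bool]) {
    for (i, op) in op_types.iter().enumerate() {
        if *op == OpType::Delete.as_u8() {
            selected[i] = false;
        }
    }
}

fn main() {
    // Same op sequence as the test above: Put, Delete, Put, Put, Delete.
    let op_types =
        [OpType::Put, OpType::Delete, OpType::Put, OpType::Put, OpType::Delete].map(OpType::as_u8);
    let mut selected = vec![true; op_types.len()];
    unselect_deleted(&op_types, &mut selected);
    assert_eq!(selected, [true, false, true, true, false]);
}
```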
[Diff truncated: 9 of the 24 changed files are not shown.]