Skip to content

Commit

Permalink
fix: merge last fields may not reset builder
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Jun 24, 2024
1 parent 799f11f commit 211a371
Showing 1 changed file with 182 additions and 36 deletions.
218 changes: 182 additions & 36 deletions src/mito2/src/read/dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ impl LastFieldsBuilder {

self.initialized = true;

common_telemetry::info!("Init by batch: {:?}", batch);

let last_idx = batch.num_rows() - 1;
let fields = batch.fields();
// Safety: The last_idx is valid.
Expand Down Expand Up @@ -326,52 +328,50 @@ impl LastFieldsBuilder {
) -> Result<Option<Batch>> {
debug_assert!(self.initialized);

if self.last_fields.is_empty() {
let mut output = if self.last_fields.is_empty() {
common_telemetry::info!("Not need to overwrite");

// No need to overwrite the last row.
let mut buffer = buffer;
if self.filter_deleted {
filter_deleted_from_batch(&mut buffer, metrics)?;
buffer
} else {
// Builds last fields.
for (builder, value) in self.builders.iter_mut().zip(&self.last_fields) {
// Safety: Vectors of the batch has the same type.
builder.push_value_ref(value.as_value_ref());
}
return Ok(Some(buffer));
}

// Builds last fields.
for (builder, value) in self.builders.iter_mut().zip(&self.last_fields) {
// Safety: Vectors of the batch has the same type.
builder.push_value_ref(value.as_value_ref());
}
let fields = self
.builders
.iter_mut()
.zip(buffer.fields())
.map(|(builder, col)| BatchColumn {
column_id: col.column_id,
data: builder.to_vector(),
})
.collect();

// Resets itself. `self.builders` already reset in `to_vector()`.
self.clear();
let fields = self
.builders
.iter_mut()
.zip(buffer.fields())
.map(|(builder, col)| BatchColumn {
column_id: col.column_id,
data: builder.to_vector(),
})
.collect();

let mut merged = if buffer.num_rows() == 1 {
// Replaces the buffer directly if it only has one row.
buffer.with_fields(fields)?
} else {
// Replaces the last row of the buffer.
let front = buffer.slice(0, buffer.num_rows() - 1);
let last = buffer.slice(buffer.num_rows() - 1, 1);
let last = last.with_fields(fields)?;
Batch::concat(vec![front, last])?
if buffer.num_rows() == 1 {
// Replaces the buffer directly if it only has one row.
buffer.with_fields(fields)?
} else {
// Replaces the last row of the buffer.
let front = buffer.slice(0, buffer.num_rows() - 1);
let last = buffer.slice(buffer.num_rows() - 1, 1);
let last = last.with_fields(fields)?;
Batch::concat(vec![front, last])?
}
};

if self.filter_deleted {
filter_deleted_from_batch(&mut merged, metrics)?;
filter_deleted_from_batch(&mut output, metrics)?;
}

if merged.is_empty() {
// Resets itself. `self.builders` already reset in `to_vector()`.
self.clear();

if output.is_empty() {
Ok(None)
} else {
Ok(Some(merged))
Ok(Some(output))
}
}

Expand Down Expand Up @@ -426,13 +426,19 @@ impl DedupStrategy for LastNotNull {
if buffer.primary_key() != batch.primary_key() {
// Next key is different.
let buffer = std::mem::replace(buffer, batch);
common_telemetry::info!(
"different key, merge last not null, buffer: {:?}, self.buffer: {:?}",
buffer,
self.buffer
);
let merged = self.last_fields.merge_last_not_null(buffer, metrics)?;
return Ok(merged);
}

if buffer.last_timestamp() != batch.first_timestamp() {
// The next batch has a different timestamp.
let buffer = std::mem::replace(buffer, batch);
common_telemetry::info!("different ts, merge last not null, buffer: {:?}", buffer);
let merged = self.last_fields.merge_last_not_null(buffer, metrics)?;
return Ok(merged);
}
Expand All @@ -442,17 +448,32 @@ impl DedupStrategy for LastNotNull {
metrics.num_unselected_rows += 1;
// We assumes each batch doesn't contain duplicate rows so we only need to check the first row.
if batch.num_rows() == 1 {
common_telemetry::info!(
"num rows 1, push first row, batch: {:?}, buffer: {:?}",
batch,
buffer
);
self.last_fields.push_first_row(&batch);
return Ok(None);
}

// The next batch has the same key and timestamp but contains multiple rows.
// We can merge the first row and buffer the remaining rows.
let first = batch.slice(0, 1);
common_telemetry::info!(
"more rows, push first row, first: {:?}, buffer: {:?}",
first,
buffer
);
self.last_fields.push_first_row(&first);
// Moves the remaining rows to the buffer.
let batch = batch.slice(1, batch.num_rows() - 1);
let buffer = std::mem::replace(buffer, batch);
common_telemetry::info!(
"more rows, remaining rows, buffer: {:?}, self.buffer: {:?}",
buffer,
self.buffer
);
let merged = self.last_fields.merge_last_not_null(buffer, metrics)?;

Ok(merged)
Expand All @@ -466,6 +487,8 @@ impl DedupStrategy for LastNotNull {
// Initializes last fields with the first buffer.
self.last_fields.maybe_init(&buffer);

common_telemetry::info!("finish, buffer: {:?}", buffer);

let merged = self.last_fields.merge_last_not_null(buffer, metrics)?;

Ok(merged)
Expand Down Expand Up @@ -871,4 +894,127 @@ mod tests {
assert_eq!(1, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}

fn check_dedup_strategy(input: &[Batch], strategy: &mut dyn DedupStrategy, expect: &[Batch]) {
let mut actual = Vec::new();
let mut metrics = DedupMetrics::default();
for batch in input {
if let Some(out) = strategy.push_batch(batch.clone(), &mut metrics).unwrap() {
actual.push(out);
}
}
if let Some(out) = strategy.finish(&mut metrics).unwrap() {
actual.push(out);
}

assert_eq!(expect, actual);
}

#[test]
fn test_last_not_null_strategy_delete_last() {
common_telemetry::init_default_ut_logging();

let input = [
new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
new_batch_multi_fields(
b"k1",
&[1, 2],
&[1, 7],
&[OpType::Put, OpType::Put],
&[(Some(1), None), (Some(22), Some(222))],
),
new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
new_batch_multi_fields(
b"k2",
&[2, 3],
&[2, 5],
&[OpType::Put, OpType::Delete],
&[(None, None), (Some(13), None)],
),
new_batch_multi_fields(b"k2", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
];

let mut strategy = LastNotNull::new(true);
check_dedup_strategy(
&input,
&mut strategy,
&[
new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]),
new_batch_multi_fields(b"k2", &[2], &[2], &[OpType::Put], &[(None, None)]),
],
);
}

#[test]
fn test_last_not_null_strategy_delete_one() {
common_telemetry::init_default_ut_logging();

let input = [
new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
];

let mut strategy = LastNotNull::new(true);
check_dedup_strategy(
&input,
&mut strategy,
&[new_batch_multi_fields(
b"k2",
&[1],
&[6],
&[OpType::Put],
&[(Some(11), None)],
)],
);
}

#[test]
fn test_last_not_null_strategy_delete_all() {
common_telemetry::init_default_ut_logging();

let input = [
new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]),
new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Delete], &[(Some(11), None)]),
];

let mut strategy = LastNotNull::new(true);
check_dedup_strategy(&input, &mut strategy, &[]);
}

#[test]
fn test_last_not_null_strategy_same_batch() {
common_telemetry::init_default_ut_logging();

let input = [
new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
new_batch_multi_fields(
b"k1",
&[1, 2],
&[1, 7],
&[OpType::Put, OpType::Put],
&[(Some(1), None), (Some(22), Some(222))],
),
new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]),
new_batch_multi_fields(
b"k1",
&[2, 3],
&[2, 5],
&[OpType::Put, OpType::Put],
&[(None, None), (Some(13), None)],
),
new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(None, Some(3))]),
];

let mut strategy = LastNotNull::new(true);
check_dedup_strategy(
&input,
&mut strategy,
&[
new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]),
new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]),
new_batch_multi_fields(b"k1", &[3], &[5], &[OpType::Put], &[(Some(13), Some(3))]),
],
);
}
}

0 comments on commit 211a371

Please sign in to comment.