diff --git a/src/mito2/src/read/dedup.rs b/src/mito2/src/read/dedup.rs index 8efc5961b671..52f6193541ec 100644 --- a/src/mito2/src/read/dedup.rs +++ b/src/mito2/src/read/dedup.rs @@ -265,6 +265,8 @@ impl LastFieldsBuilder { self.initialized = true; + common_telemetry::info!("Init by batch: {:?}", batch); + let last_idx = batch.num_rows() - 1; let fields = batch.fields(); // Safety: The last_idx is valid. @@ -326,52 +328,50 @@ impl LastFieldsBuilder { ) -> Result> { debug_assert!(self.initialized); - if self.last_fields.is_empty() { + let mut output = if self.last_fields.is_empty() { + common_telemetry::info!("Not need to overwrite"); + // No need to overwrite the last row. - let mut buffer = buffer; - if self.filter_deleted { - filter_deleted_from_batch(&mut buffer, metrics)?; + buffer + } else { + // Builds last fields. + for (builder, value) in self.builders.iter_mut().zip(&self.last_fields) { + // Safety: Vectors of the batch has the same type. + builder.push_value_ref(value.as_value_ref()); } - return Ok(Some(buffer)); - } - - // Builds last fields. - for (builder, value) in self.builders.iter_mut().zip(&self.last_fields) { - // Safety: Vectors of the batch has the same type. - builder.push_value_ref(value.as_value_ref()); - } - let fields = self - .builders - .iter_mut() - .zip(buffer.fields()) - .map(|(builder, col)| BatchColumn { - column_id: col.column_id, - data: builder.to_vector(), - }) - .collect(); - - // Resets itself. `self.builders` already reset in `to_vector()`. - self.clear(); + let fields = self + .builders + .iter_mut() + .zip(buffer.fields()) + .map(|(builder, col)| BatchColumn { + column_id: col.column_id, + data: builder.to_vector(), + }) + .collect(); - let mut merged = if buffer.num_rows() == 1 { - // Replaces the buffer directly if it only has one row. - buffer.with_fields(fields)? - } else { - // Replaces the last row of the buffer. - let front = buffer.slice(0, buffer.num_rows() - 1); - let last = buffer.slice(buffer.num_rows() - 1, 1); - let last = last.with_fields(fields)?; - Batch::concat(vec![front, last])? + if buffer.num_rows() == 1 { + // Replaces the buffer directly if it only has one row. + buffer.with_fields(fields)? + } else { + // Replaces the last row of the buffer. + let front = buffer.slice(0, buffer.num_rows() - 1); + let last = buffer.slice(buffer.num_rows() - 1, 1); + let last = last.with_fields(fields)?; + Batch::concat(vec![front, last])? + } }; if self.filter_deleted { - filter_deleted_from_batch(&mut merged, metrics)?; + filter_deleted_from_batch(&mut output, metrics)?; } - if merged.is_empty() { + // Resets itself. `self.builders` already reset in `to_vector()`. + self.clear(); + + if output.is_empty() { Ok(None) } else { - Ok(Some(merged)) + Ok(Some(output)) } } @@ -426,6 +426,11 @@ impl DedupStrategy for LastNotNull { if buffer.primary_key() != batch.primary_key() { // Next key is different. let buffer = std::mem::replace(buffer, batch); + common_telemetry::info!( + "different key, merge last not null, buffer: {:?}, self.buffer: {:?}", + buffer, + self.buffer + ); let merged = self.last_fields.merge_last_not_null(buffer, metrics)?; return Ok(merged); } @@ -433,6 +438,7 @@ impl DedupStrategy for LastNotNull { if buffer.last_timestamp() != batch.first_timestamp() { // The next batch has a different timestamp. let buffer = std::mem::replace(buffer, batch); + common_telemetry::info!("different ts, merge last not null, buffer: {:?}", buffer); let merged = self.last_fields.merge_last_not_null(buffer, metrics)?; return Ok(merged); } @@ -442,6 +448,11 @@ impl DedupStrategy for LastNotNull { metrics.num_unselected_rows += 1; // We assumes each batch doesn't contain duplicate rows so we only need to check the first row. if batch.num_rows() == 1 { + common_telemetry::info!( + "num rows 1, push first row, batch: {:?}, buffer: {:?}", + batch, + buffer + ); self.last_fields.push_first_row(&batch); return Ok(None); } @@ -449,10 +460,20 @@ impl DedupStrategy for LastNotNull { // The next batch has the same key and timestamp but contains multiple rows. // We can merge the first row and buffer the remaining rows. let first = batch.slice(0, 1); + common_telemetry::info!( + "more rows, push first row, first: {:?}, buffer: {:?}", + first, + buffer + ); self.last_fields.push_first_row(&first); // Moves the remaining rows to the buffer. let batch = batch.slice(1, batch.num_rows() - 1); let buffer = std::mem::replace(buffer, batch); + common_telemetry::info!( + "more rows, remaining rows, buffer: {:?}, self.buffer: {:?}", + buffer, + self.buffer + ); let merged = self.last_fields.merge_last_not_null(buffer, metrics)?; Ok(merged) @@ -466,6 +487,8 @@ impl DedupStrategy for LastNotNull { // Initializes last fields with the first buffer. self.last_fields.maybe_init(&buffer); + common_telemetry::info!("finish, buffer: {:?}", buffer); + let merged = self.last_fields.merge_last_not_null(buffer, metrics)?; Ok(merged) @@ -871,4 +894,127 @@ mod tests { assert_eq!(1, reader.metrics().num_unselected_rows); assert_eq!(0, reader.metrics().num_deleted_rows); } + + fn check_dedup_strategy(input: &[Batch], strategy: &mut dyn DedupStrategy, expect: &[Batch]) { + let mut actual = Vec::new(); + let mut metrics = DedupMetrics::default(); + for batch in input { + if let Some(out) = strategy.push_batch(batch.clone(), &mut metrics).unwrap() { + actual.push(out); + } + } + if let Some(out) = strategy.finish(&mut metrics).unwrap() { + actual.push(out); + } + + assert_eq!(expect, actual); + } + + #[test] + fn test_last_not_null_strategy_delete_last() { + common_telemetry::init_default_ut_logging(); + + let input = [ + new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]), + new_batch_multi_fields( + b"k1", + &[1, 2], + &[1, 7], + &[OpType::Put, OpType::Put], + &[(Some(1), None), (Some(22), Some(222))], + ), + new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]), + new_batch_multi_fields( + b"k2", + &[2, 3], + &[2, 5], + &[OpType::Put, OpType::Delete], + &[(None, None), (Some(13), None)], + ), + new_batch_multi_fields(b"k2", &[3], &[3], &[OpType::Put], &[(None, Some(3))]), + ]; + + let mut strategy = LastNotNull::new(true); + check_dedup_strategy( + &input, + &mut strategy, + &[ + new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]), + new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]), + new_batch_multi_fields(b"k2", &[2], &[2], &[OpType::Put], &[(None, None)]), + ], + ); + } + + #[test] + fn test_last_not_null_strategy_delete_one() { + common_telemetry::init_default_ut_logging(); + + let input = [ + new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]), + new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Put], &[(Some(11), None)]), + ]; + + let mut strategy = LastNotNull::new(true); + check_dedup_strategy( + &input, + &mut strategy, + &[new_batch_multi_fields( + b"k2", + &[1], + &[6], + &[OpType::Put], + &[(Some(11), None)], + )], + ); + } + + #[test] + fn test_last_not_null_strategy_delete_all() { + common_telemetry::init_default_ut_logging(); + + let input = [ + new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Delete], &[(None, None)]), + new_batch_multi_fields(b"k2", &[1], &[6], &[OpType::Delete], &[(Some(11), None)]), + ]; + + let mut strategy = LastNotNull::new(true); + check_dedup_strategy(&input, &mut strategy, &[]); + } + + #[test] + fn test_last_not_null_strategy_same_batch() { + common_telemetry::init_default_ut_logging(); + + let input = [ + new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]), + new_batch_multi_fields( + b"k1", + &[1, 2], + &[1, 7], + &[OpType::Put, OpType::Put], + &[(Some(1), None), (Some(22), Some(222))], + ), + new_batch_multi_fields(b"k1", &[2], &[4], &[OpType::Put], &[(Some(12), None)]), + new_batch_multi_fields( + b"k1", + &[2, 3], + &[2, 5], + &[OpType::Put, OpType::Put], + &[(None, None), (Some(13), None)], + ), + new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(None, Some(3))]), + ]; + + let mut strategy = LastNotNull::new(true); + check_dedup_strategy( + &input, + &mut strategy, + &[ + new_batch_multi_fields(b"k1", &[1], &[6], &[OpType::Put], &[(Some(11), None)]), + new_batch_multi_fields(b"k1", &[2], &[7], &[OpType::Put], &[(Some(22), Some(222))]), + new_batch_multi_fields(b"k1", &[3], &[5], &[OpType::Put], &[(Some(13), Some(3))]), + ], + ); + } }