Skip to content

Commit

Permalink
fix(state clean): state clean should not delete NULL (risingwavelabs#…
Browse files Browse the repository at this point in the history
  • Loading branch information
soundOfDestiny authored Mar 24, 2023
1 parent 4b03a93 commit be9723e
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,22 +808,28 @@ where
} else {
Some(self.pk_serde.prefix(1))
};
let range_end_suffix = watermark.map(|watermark| {
let watermark_suffix = watermark.map(|watermark| {
serialize_pk(
row::once(Some(watermark)),
prefix_serializer.as_ref().unwrap(),
)
});
if let Some(range_end_suffix) = range_end_suffix {
let range_begin_suffix = vec![];
trace!(table_id = %self.table_id, range_end = ?range_end_suffix, vnodes = ?{
if let Some(watermark_suffix) = watermark_suffix {
// We either serialize null into `0u8`, data into `(1u8 || scalar)`, or serialize null
// into `1u8`, data into `(0u8 || scalar)`. We do not want to delete null
// here, so `range_begin_suffix` cannot be `vec![]` when null is represented as `0u8`.
let range_begin_suffix = watermark_suffix
.first()
.map(|bit| vec![*bit])
.unwrap_or_default();
trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{
self.vnodes.iter_vnodes().collect_vec()
}, "delete range");
for vnode in self.vnodes.iter_vnodes() {
let mut range_begin = vnode.to_be_bytes().to_vec();
let mut range_end = range_begin.clone();
range_begin.extend(&range_begin_suffix);
range_end.extend(&range_end_suffix);
range_end.extend(&watermark_suffix);
delete_ranges.push((Bytes::from(range_begin), Bytes::from(range_end)));
}
}
Expand Down

0 comments on commit be9723e

Please sign in to comment.