From f6bafe8f9ee734d227817d34db3a0e107a1c10a4 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 14 Jun 2023 03:41:20 +0800 Subject: [PATCH] fix accordingly Signed-off-by: Richard Chien --- src/batch/src/executor/top_n.rs | 6 +++--- src/common/benches/bench_encoding.rs | 3 ++- .../src/executor/aggregation/agg_state_cache.rs | 5 +++-- src/stream/src/executor/aggregation/minput.rs | 2 +- src/stream/src/executor/over_window/eowc.rs | 12 ++++-------- src/stream/src/executor/over_window/mod.rs | 2 -- src/stream/src/executor/over_window/state/mod.rs | 2 +- src/stream/src/executor/sort_buffer.rs | 6 ++---- 8 files changed, 16 insertions(+), 22 deletions(-) diff --git a/src/batch/src/executor/top_n.rs b/src/batch/src/executor/top_n.rs index 4c19c4e563cf0..4eca84596c9a7 100644 --- a/src/batch/src/executor/top_n.rs +++ b/src/batch/src/executor/top_n.rs @@ -25,7 +25,7 @@ use risingwave_common::estimate_size::EstimateSize; use risingwave_common::memory::MemoryContext; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; -use risingwave_common::util::memcmp_encoding::encode_chunk; +use risingwave_common::util::memcmp_encoding::{encode_chunk, MemcmpEncoded}; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::batch_plan::plan_node::NodeBody; @@ -200,7 +200,7 @@ impl TopNHeap { #[derive(Clone, EstimateSize)] pub struct HeapElem { - encoded_row: Vec<u8>, + encoded_row: MemcmpEncoded, row: OwnedRow, } @@ -225,7 +225,7 @@ impl Ord for HeapElem { } impl HeapElem { - pub fn new(encoded_row: Vec<u8>, row: impl Row) -> Self { + pub fn new(encoded_row: MemcmpEncoded, row: impl Row) -> Self { Self { encoded_row, row: row.into_owned_row(), diff --git a/src/common/benches/bench_encoding.rs b/src/common/benches/bench_encoding.rs index 994b86dabd255..ffa4005cb8c32 100644 --- a/src/common/benches/bench_encoding.rs +++ b/src/common/benches/bench_encoding.rs @@ -19,6 +19,7 @@ use risingwave_common::array::{ListValue,
StructValue}; use risingwave_common::types::{ DataType, Date, Datum, Interval, ScalarImpl, StructType, Time, Timestamp, }; +use risingwave_common::util::memcmp_encoding::MemcmpEncoded; use risingwave_common::util::sort_util::OrderType; use risingwave_common::util::{memcmp_encoding, value_encoding}; @@ -42,7 +43,7 @@ impl Case { } } -fn key_serialization(datum: &Datum) -> Vec<u8> { +fn key_serialization(datum: &Datum) -> MemcmpEncoded { let result = memcmp_encoding::encode_value( datum.as_ref().map(ScalarImpl::as_scalar_ref_impl), OrderType::default(), diff --git a/src/stream/src/executor/aggregation/agg_state_cache.rs b/src/stream/src/executor/aggregation/agg_state_cache.rs index 22d120268f46d..0fa83143ab67c 100644 --- a/src/stream/src/executor/aggregation/agg_state_cache.rs +++ b/src/stream/src/executor/aggregation/agg_state_cache.rs @@ -17,6 +17,7 @@ use risingwave_common::array::{ArrayImpl, Op}; use risingwave_common::buffer::Bitmap; use risingwave_common::estimate_size::EstimateSize; use risingwave_common::types::{Datum, DatumRef}; +use risingwave_common::util::memcmp_encoding::MemcmpEncoded; use risingwave_common::util::row_serde::OrderedRowSerde; use smallvec::SmallVec; @@ -24,7 +25,7 @@ use super::minput_agg_impl::MInputAggregator; use crate::common::cache::{StateCache, StateCacheFiller}; /// Cache key type. -type CacheKey = Vec<u8>; +type CacheKey = MemcmpEncoded; // TODO(yuchao): May extract common logic here to `struct [Data/Stream]ChunkRef` if there's other // usage in the future.
https://github.com/risingwavelabs/risingwave/pull/5908#discussion_r1002896176 @@ -76,7 +77,7 @@ impl<'a> Iterator for StateCacheInputBatch<'a> { .map(|col_idx| self.columns[*col_idx].value_at(self.idx)), &mut key, ); - key + key.into() }; let value = self .arg_col_indices diff --git a/src/stream/src/executor/aggregation/minput.rs b/src/stream/src/executor/aggregation/minput.rs index fbb3bbd26393b..723f6eafb614e 100644 --- a/src/stream/src/executor/aggregation/minput.rs +++ b/src/stream/src/executor/aggregation/minput.rs @@ -204,7 +204,7 @@ impl MaterializedInputState { .project(&self.state_table_order_col_indices), &mut cache_key, ); - cache_key + cache_key.into() }; let cache_value = self .state_table_arg_col_indices diff --git a/src/stream/src/executor/over_window/eowc.rs b/src/stream/src/executor/over_window/eowc.rs index 317f76e3cd3e3..87cf3cbbde998 100644 --- a/src/stream/src/executor/over_window/eowc.rs +++ b/src/stream/src/executor/over_window/eowc.rs @@ -24,7 +24,7 @@ use risingwave_common::estimate_size::EstimateSize; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::types::{DataType, ToDatumRef, ToOwnedDatum}; use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast}; -use risingwave_common::util::memcmp_encoding; +use risingwave_common::util::memcmp_encoding::{self, MemcmpEncoded}; use risingwave_common::util::sort_util::OrderType; use risingwave_common::{must_match, row}; use risingwave_expr::function::window::WindowFuncCall; @@ -32,7 +32,6 @@ use risingwave_storage::store::PrefetchOptions; use risingwave_storage::StateStore; use super::state::{create_window_state, EstimatedVecDeque, WindowState}; -use super::MemcmpEncoded; use crate::cache::{new_unbounded, ManagedLruCache}; use crate::common::table::state_table::StateTable; use crate::executor::over_window::state::{StateEvictHint, StateKey}; @@ -241,8 +240,7 @@ impl EowcOverWindowExecutor { let encoded_pk = memcmp_encoding::encode_row( 
(&row).project(&this.input_pk_indices), &vec![OrderType::ascending(); this.input_pk_indices.len()], - )? - .into_boxed_slice(); + )?; let key = StateKey { order_key: order_key.into(), encoded_pk, @@ -292,8 +290,7 @@ impl EowcOverWindowExecutor { let encoded_partition_key = memcmp_encoding::encode_row( &partition_key, &vec![OrderType::ascending(); this.partition_key_indices.len()], - )? - .into_boxed_slice(); + )?; // Get the partition. Self::ensure_key_in_cache( @@ -316,8 +313,7 @@ impl EowcOverWindowExecutor { let encoded_pk = memcmp_encoding::encode_row( input_row.project(&this.input_pk_indices), &vec![OrderType::ascending(); this.input_pk_indices.len()], - )? - .into_boxed_slice(); + )?; let key = StateKey { order_key: order_key.into(), encoded_pk, diff --git a/src/stream/src/executor/over_window/mod.rs b/src/stream/src/executor/over_window/mod.rs index fc461bb70c204..d44415a84bf50 100644 --- a/src/stream/src/executor/over_window/mod.rs +++ b/src/stream/src/executor/over_window/mod.rs @@ -16,5 +16,3 @@ mod eowc; mod state; pub use eowc::{EowcOverWindowExecutor, EowcOverWindowExecutorArgs}; - -type MemcmpEncoded = Box<[u8]>; diff --git a/src/stream/src/executor/over_window/state/mod.rs b/src/stream/src/executor/over_window/state/mod.rs index 8bbc411f3ce43..aa2e7435fe8fd 100644 --- a/src/stream/src/executor/over_window/state/mod.rs +++ b/src/stream/src/executor/over_window/state/mod.rs @@ -17,10 +17,10 @@ use std::collections::{BTreeSet, VecDeque}; use educe::Educe; use risingwave_common::estimate_size::{EstimateSize, KvSize}; use risingwave_common::types::{Datum, DefaultOrdered, ScalarImpl}; +use risingwave_common::util::memcmp_encoding::MemcmpEncoded; use risingwave_expr::function::window::{WindowFuncCall, WindowFuncKind}; use smallvec::SmallVec; -use super::MemcmpEncoded; use crate::executor::{StreamExecutorError, StreamExecutorResult}; mod buffer; diff --git a/src/stream/src/executor/sort_buffer.rs b/src/stream/src/executor/sort_buffer.rs index 
17f2c9bf4a870..769d8f5c89785 100644 --- a/src/stream/src/executor/sort_buffer.rs +++ b/src/stream/src/executor/sort_buffer.rs @@ -27,6 +27,7 @@ use risingwave_common::row::{self, OwnedRow, Row, RowExt}; use risingwave_common::types::{ DefaultOrd, DefaultOrdered, ScalarImpl, ScalarRefImpl, ToOwnedDatum, }; +use risingwave_common::util::memcmp_encoding::MemcmpEncoded; use risingwave_storage::row_serde::row_serde_util::deserialize_pk_with_vnode; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::StateStore; @@ -35,9 +36,6 @@ use super::{StreamExecutorError, StreamExecutorResult}; use crate::common::cache::{OrderedStateCache, StateCache, StateCacheFiller}; use crate::common::table::state_table::StateTable; -// TODO(rc): This should be a struct in `memcmp_encoding` module. See #8606. -type MemcmpEncoded = Box<[u8]>; - type CacheKey = ( DefaultOrdered<ScalarImpl>, // sort (watermark) column value MemcmpEncoded, // memcmp-encoded pk @@ -56,7 +54,7 @@ fn row_to_cache_key( buffer_table .pk_serde() .serialize((&row).project(buffer_table.pk_indices()), &mut pk); - (timestamp_val.into(), pk.into()) } /// [`SortBuffer`] is a common component that consume an unordered stream and produce an ordered