Skip to content

Commit

Permalink
chore(common): move compact_chunk from common to stream (#16081)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <stdrc@outlook.com>
  • Loading branch information
stdrc authored Apr 2, 2024
1 parent bc4f766 commit 347f448
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 16 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ postgres-types = { version = "0.2.6", features = [
"with-chrono-0_4",
"with-serde_json-1",
] }
prehash = "1"
prometheus = { version = "0.13" }
prost = { workspace = true }
rand = "0.8"
Expand Down
2 changes: 0 additions & 2 deletions src/common/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ pub use arrow::{
mod bool_array;
pub mod bytes_array;
mod chrono_array;
pub mod compact_chunk;
mod data_chunk;
pub mod data_chunk_iter;
mod decimal_array;
Expand Down Expand Up @@ -52,7 +51,6 @@ pub use chrono_array::{
DateArray, DateArrayBuilder, TimeArray, TimeArrayBuilder, TimestampArray,
TimestampArrayBuilder, TimestamptzArray, TimestamptzArrayBuilder,
};
pub use compact_chunk::*;
pub use data_chunk::{DataChunk, DataChunkTestExt};
pub use data_chunk_iter::RowRef;
pub use decimal_array::{DecimalArray, DecimalArrayBuilder};
Expand Down
1 change: 1 addition & 0 deletions src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ memcomparable = "0.2"
multimap = "0.10"
parking_lot = "0.12"
pin-project = "1"
prehash = "1"
prometheus = { version = "0.13", features = ["process"] }
prost = { workspace = true }
rand = "0.8"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ use std::mem;

use itertools::Itertools;
use prehash::{new_prehashed_map_with_capacity, Passthru, Prehashed};
use risingwave_common::array::stream_chunk::{OpRowMutRef, StreamChunkMut};
use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{Op, RowRef, StreamChunk};
use risingwave_common::row::{Project, RowExt};
use risingwave_common::types::DataType;
use risingwave_common::util::hash_util::Crc32FastBuilder;

use super::stream_chunk::{OpRowMutRef, StreamChunkMut};
use super::stream_chunk_builder::StreamChunkBuilder;
use super::stream_record::Record;
use super::DataType;
use crate::array::{Op, RowRef, StreamChunk};
use crate::row::{Project, RowExt};
use crate::util::hash_util::Crc32FastBuilder;

/// A helper to compact the stream chunks with just modify the `Ops` and visibility of the chunk.
/// A helper to compact the stream chunks by modifying the `Ops` and visibility of the chunk.
pub struct StreamChunkCompactor {
chunks: Vec<StreamChunk>,
key: Vec<usize>,
Expand Down Expand Up @@ -292,9 +291,10 @@ pub fn merge_chunk_row(stream_chunk: StreamChunk, pk_indices: &[usize]) -> Strea

#[cfg(test)]
mod tests {
use risingwave_common::array::StreamChunk;
use risingwave_common::test_prelude::StreamChunkTestExt;

use super::*;
use crate::array::StreamChunk;
use crate::test_prelude::StreamChunkTestExt;

#[test]
fn test_merge_chunk_row() {
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;

pub mod cache;
mod column_mapping;
pub mod compact_chunk;
pub mod log_store_impl;
pub mod metrics;
pub mod table;
3 changes: 2 additions & 1 deletion src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use futures::{pin_mut, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::stream_chunk::StreamChunkMut;
use risingwave_common::array::{merge_chunk_row, Op, StreamChunk, StreamChunkCompactor};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::DataType;
Expand All @@ -36,6 +36,7 @@ use thiserror_ext::AsReport;

use super::error::{StreamExecutorError, StreamExecutorResult};
use super::{Execute, Executor, ExecutorInfo, Message, PkIndices};
use crate::common::compact_chunk::{merge_chunk_row, StreamChunkCompactor};
use crate::executor::{
expect_first_barrier, ActorContextRef, BoxedMessageStream, MessageStream, Mutation,
};
Expand Down

0 comments on commit 347f448

Please sign in to comment.