Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flow): add eval_batch for ScalarExpr #4551

Merged
merged 17 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/flow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ workspace = true

[dependencies]
api.workspace = true
arrow.workspace = true
arrow-schema.workspace = true
async-recursion = "1.0"
async-trait.workspace = true
Expand Down
34 changes: 10 additions & 24 deletions src/flow/src/compute/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,21 @@
//!
//! And the [`Context`] is the environment for the render process, it contains all the necessary information for the render process

use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::ops::Range;
use std::rc::Rc;

use datatypes::data_type::ConcreteDataType;
use datatypes::value::{ListValue, Value};
use hydroflow::futures::SinkExt;
use hydroflow::lattices::cc_traits::Get;
use std::collections::BTreeMap;

use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::port::{PortCtx, SEND};
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
use snafu::OptionExt;

use super::state::Scheduler;
use crate::compute::state::DataflowState;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu};
use crate::expr::error::{DataTypeSnafu, InternalSnafu};
use crate::expr::{
self, EvalError, GlobalId, LocalId, MapFilterProject, MfpPlan, SafeMfpPlan, ScalarExpr,
};
use crate::plan::{AccumulablePlan, KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, Arrangement};
use crate::compute::types::{Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu};
use crate::expr::{self, GlobalId, LocalId};
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, DiffRow};

mod map;
mod reduce;
Expand Down Expand Up @@ -218,20 +207,17 @@ mod test {
use std::cell::RefCell;
use std::rc::Rc;

use common_time::DateTime;
use datatypes::data_type::ConcreteDataType;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::handoff::VecHandoff;
use pretty_assertions::{assert_eq, assert_ne};
use pretty_assertions::assert_eq;

use super::*;
use crate::expr::BinaryFunc;
use crate::repr::Row;
pub fn run_and_check(
state: &mut DataflowState,
df: &mut Hydroflow,
time_range: Range<i64>,
time_range: std::ops::Range<i64>,
expected: BTreeMap<i64, Vec<DiffRow>>,
output: Rc<RefCell<Vec<DiffRow>>>,
) {
Expand Down
5 changes: 2 additions & 3 deletions src/flow/src/compute/render/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::compute::state::Scheduler;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, PlanSnafu};
use crate::expr::{EvalError, MapFilterProject, MfpPlan, ScalarExpr};
use crate::plan::{Plan, TypedPlan};
use crate::plan::TypedPlan;
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::utils::ArrangeHandler;

Expand Down Expand Up @@ -206,8 +206,6 @@ fn eval_mfp_core(

#[cfg(test)]
mod test {
use std::cell::RefCell;
use std::rc::Rc;

use datatypes::data_type::ConcreteDataType;
use hydroflow::scheduled::graph::Hydroflow;
Expand All @@ -216,6 +214,7 @@ mod test {
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
use crate::compute::state::DataflowState;
use crate::expr::{self, BinaryFunc, GlobalId};
use crate::plan::Plan;
use crate::repr::{ColumnType, RelationType};

/// test if temporal filter works properly
Expand Down
13 changes: 6 additions & 7 deletions src/flow/src/compute/render/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@ use std::ops::Range;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::{ListValue, Value};
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::port::{PortCtx, SEND};
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};

use crate::compute::render::{Context, SubgraphArg};
use crate::compute::state::Scheduler;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, PlanSnafu};
use crate::expr::error::{DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu};
use crate::expr::{AggregateExpr, EvalError, ScalarExpr};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::expr::{EvalError, ScalarExpr};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan, TypedPlan};
use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, KeyExpiryManager};

Expand Down Expand Up @@ -790,8 +788,6 @@ fn from_val_to_slice_idx(
// TODO(discord9): add tests for accum ser/de
#[cfg(test)]
mod test {
use std::cell::RefCell;
use std::rc::Rc;

use common_time::{DateTime, Interval, Timestamp};
use datatypes::data_type::{ConcreteDataType, ConcreteDataType as CDT};
Expand All @@ -800,7 +796,10 @@ mod test {
use super::*;
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
use crate::compute::state::DataflowState;
use crate::expr::{self, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject, UnaryFunc};
use crate::expr::{
self, AggregateExpr, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject, UnaryFunc,
};
use crate::plan::Plan;
use crate::repr::{ColumnType, RelationType};

/// SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00')
Expand Down
4 changes: 2 additions & 2 deletions src/flow/src/compute/render/src_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use std::collections::{BTreeMap, VecDeque};

use common_telemetry::{debug, info};
use common_telemetry::debug;
use hydroflow::scheduled::graph_ext::GraphExt;
use itertools::Itertools;
use snafu::OptionExt;
Expand All @@ -27,7 +27,7 @@ use crate::compute::render::Context;
use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
use crate::error::{Error, PlanSnafu};
use crate::expr::error::InternalSnafu;
use crate::expr::{EvalError, GlobalId};
use crate::expr::EvalError;
use crate::repr::{DiffRow, Row, BROADCAST_CAP};

#[allow(clippy::mutable_key_type)]
Expand Down
2 changes: 1 addition & 1 deletion src/flow/src/compute/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::collections::{BTreeMap, VecDeque};
use std::rc::Rc;

use hydroflow::scheduled::graph::Hydroflow;
Expand Down
5 changes: 2 additions & 3 deletions src/flow/src/compute/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ use hydroflow::scheduled::handoff::TeeingHandoff;
use hydroflow::scheduled::port::RecvPort;
use hydroflow::scheduled::SubgraphId;
use itertools::Itertools;
use tokio::sync::{Mutex, RwLock};
use tokio::sync::Mutex;

use crate::compute::render::Context;
use crate::expr::{EvalError, ScalarExpr};
use crate::repr::DiffRow;
use crate::utils::{ArrangeHandler, Arrangement};
use crate::utils::ArrangeHandler;

pub type Toff<T = DiffRow> = TeeingHandoff<T>;

Expand Down
88 changes: 86 additions & 2 deletions src/flow/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

//! for declare Expression in dataflow, including map, reduce, id and join(TODO!) etc.

mod df_func;
pub(crate) mod error;
mod func;
mod id;
Expand All @@ -22,9 +23,92 @@ mod relation;
mod scalar;
mod signature;

pub(crate) use error::{EvalError, InvalidArgumentSnafu, OptimizeSnafu};
use datatypes::prelude::DataType;
use datatypes::vectors::VectorRef;
pub(crate) use df_func::{DfScalarFunction, RawDfScalarFn};
pub(crate) use error::{EvalError, InvalidArgumentSnafu};
pub(crate) use func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
pub(crate) use id::{GlobalId, Id, LocalId};
use itertools::Itertools;
pub(crate) use linear::{MapFilterProject, MfpPlan, SafeMfpPlan};
pub(crate) use relation::{AggregateExpr, AggregateFunc};
pub(crate) use scalar::{DfScalarFunction, RawDfScalarFn, ScalarExpr, TypedExpr};
pub(crate) use scalar::{ScalarExpr, TypedExpr};
use snafu::{ensure, ResultExt};

use crate::expr::error::DataTypeSnafu;

/// A batch of vectors with the same length but without schema, only useful in dataflow
pub struct Batch {
batch: Vec<VectorRef>,
row_count: usize,
}

impl Batch {
pub fn new(batch: Vec<VectorRef>, row_count: usize) -> Self {
Self { batch, row_count }
}

pub fn batch(&self) -> &[VectorRef] {
&self.batch
}

pub fn row_count(&self) -> usize {
self.row_count
}

/// Slices the `Batch`, returning a new `Batch`.
///
/// # Panics
/// This function panics if `offset + length > self.row_count()`.
pub fn slice(&self, offset: usize, length: usize) -> Batch {
let batch = self
.batch()
.iter()
.map(|v| v.slice(offset, length))
.collect_vec();
Batch::new(batch, length)
}

/// append another batch to self
pub fn append_batch(&mut self, other: Batch) -> Result<(), EvalError> {
ensure!(
self.batch.len() == other.batch.len(),
InvalidArgumentSnafu {
reason: format!(
"Expect two batch to have same numbers of column, found {} and {} columns",
self.batch.len(),
other.batch.len()
)
}
);

let batch_builders = self
.batch
.iter()
.map(|v| {
v.data_type()
.create_mutable_vector(self.row_count() + other.row_count())
})
.collect_vec();

let mut result = vec![];
let zelf_row_count = self.row_count();
let other_row_count = other.row_count();
for (idx, mut builder) in batch_builders.into_iter().enumerate() {
builder
.extend_slice_of(self.batch()[idx].as_ref(), 0, zelf_row_count)
.context(DataTypeSnafu {
msg: "Failed to extend vector",
})?;
builder
.extend_slice_of(other.batch()[idx].as_ref(), 0, other_row_count)
.context(DataTypeSnafu {
msg: "Failed to extend vector",
})?;
result.push(builder.to_vector());
}
self.batch = result;
self.row_count = zelf_row_count + other_row_count;
Ok(())
}
}
Loading