feat: setup skeleton (pola-rs#16900)
ritchie46 authored Jun 12, 2024
1 parent cd68100 commit debcade
Showing 13 changed files with 173 additions and 0 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

The generated Cargo.lock diff is not rendered.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -108,6 +108,7 @@ polars-pipe = { version = "0.40.0", path = "crates/polars-pipe", default-feature
polars-plan = { version = "0.40.0", path = "crates/polars-plan", default-features = false }
polars-row = { version = "0.40.0", path = "crates/polars-row", default-features = false }
polars-sql = { version = "0.40.0", path = "crates/polars-sql", default-features = false }
polars-stream = { version = "0.40.0", path = "crates/polars-stream", default-features = false }
polars-time = { version = "0.40.0", path = "crates/polars-time", default-features = false }
polars-utils = { version = "0.40.0", path = "crates/polars-utils", default-features = false }

20 changes: 20 additions & 0 deletions crates/polars-expr/src/planner.rs
@@ -161,6 +161,26 @@ impl ExpressionConversionState {
}
}

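/// Variant of `create_physical_expr` that always plans in the default context
/// and, if the expression carries an alias, wraps the result in an `AliasExpr`
/// so the output name is preserved.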
pub fn create_physical_expr_streaming(
expr_ir: &ExprIR,
expr_arena: &Arena<AExpr>,
schema: Option<&SchemaRef>,
state: &mut ExpressionConversionState,
) -> PolarsResult<Arc<dyn PhysicalExpr>> {
let phys_expr =
create_physical_expr_inner(expr_ir.node(), Context::Default, expr_arena, schema, state)?;

if let Some(name) = expr_ir.get_alias() {
Ok(Arc::new(AliasExpr::new(
phys_expr,
name.clone(),
node_to_expr(expr_ir.node(), expr_arena),
)))
} else {
Ok(phys_expr)
}
}

pub fn create_physical_expr(
expr_ir: &ExprIR,
ctxt: Context,
2 changes: 2 additions & 0 deletions crates/polars-lazy/Cargo.toml
@@ -18,6 +18,7 @@ polars-json = { workspace = true, optional = true }
polars-ops = { workspace = true, features = ["chunked_ids"] }
polars-pipe = { workspace = true, optional = true }
polars-plan = { workspace = true }
polars-stream = { workspace = true, optional = true }
polars-time = { workspace = true, optional = true }
polars-utils = { workspace = true }

@@ -39,6 +40,7 @@ version_check = { workspace = true }
[features]
nightly = ["polars-core/nightly", "polars-pipe?/nightly", "polars-plan/nightly"]
streaming = ["polars-pipe", "polars-plan/streaming", "polars-ops/chunked_ids", "polars-expr/streaming"]
new-streaming = ["polars-stream"]
parquet = ["polars-io/parquet", "polars-plan/parquet", "polars-pipe?/parquet", "polars-expr/parquet"]
async = [
"polars-plan/async",
17 changes: 17 additions & 0 deletions crates/polars-lazy/src/frame/mod.rs
@@ -151,6 +151,7 @@ impl LazyFrame {
eager: false,
fast_projection: false,
row_estimate: false,
new_streaming: false,
})
}

@@ -210,6 +211,11 @@ impl LazyFrame {
self
}

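/// Opt in to the new streaming engine when collecting this query.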
pub fn with_new_streaming(mut self, toggle: bool) -> Self {
self.opt_state.new_streaming = toggle;
self
}

/// Try to estimate the number of rows so that joins can determine which side to keep in memory.
pub fn with_row_estimate(mut self, toggle: bool) -> Self {
self.opt_state.row_estimate = toggle;
@@ -681,6 +687,17 @@ impl LazyFrame {
/// }
/// ```
pub fn collect(self) -> PolarsResult<DataFrame> {
#[cfg(feature = "new-streaming")]
{
if self.opt_state.new_streaming {
let alp_plan = self.to_alp_optimized()?;
return polars_stream::run_query(
alp_plan.lp_top,
alp_plan.lp_arena,
alp_plan.expr_arena,
);
}
}
self._collect_post_opt(|_, _, _| Ok(()))
}
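
Putting the pieces together, opting in from Rust would look roughly like the sketch below. This is a hedged example rather than part of the diff: it assumes a build with the `new-streaming` feature enabled and the usual `polars-core`/`polars-lazy` preludes, and note that `polars_stream::run_query` still ends in `todo!()` at this point, so the call only exercises the new wiring.

use polars_core::df;
use polars_core::prelude::*;
use polars_lazy::prelude::*;

fn main() -> PolarsResult<()> {
    let df = df!("a" => &[1i64, 2, 3])?;

    let out = df
        .lazy()
        .filter(col("a").gt(lit(1)))
        // Route `collect` through `polars_stream::run_query` instead of the
        // default in-memory engine.
        .with_new_streaming(true)
        .collect()?;

    println!("{out}");
    Ok(())
}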

2 changes: 2 additions & 0 deletions crates/polars-plan/src/frame/opt_state.rs
@@ -31,6 +31,7 @@ pub struct OptState {
pub fast_projection: bool,
/// Try to estimate the number of rows so that joins can determine which side to keep in memory.
pub row_estimate: bool,
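/// Run the query on the new streaming engine.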
pub new_streaming: bool,
}

impl Default for OptState {
@@ -52,6 +53,7 @@
fast_projection: true,
eager: false,
row_estimate: true,
new_streaming: false,
}
}
}
6 changes: 6 additions & 0 deletions crates/polars-stream/Cargo.toml
@@ -16,8 +16,14 @@ parking_lot = { workspace = true }
pin-project-lite = { workspace = true }
polars-utils = { workspace = true }
rand = { workspace = true }
recursive = { workspace = true }
slotmap = { workspace = true }

polars-core = { workspace = true }
polars-error = { workspace = true }
polars-expr = { workspace = true }
polars-plan = { workspace = true }

[build-dependencies]
version_check = { workspace = true }

3 changes: 3 additions & 0 deletions crates/polars-stream/src/lib.rs
@@ -2,6 +2,9 @@
mod async_primitives;
#[allow(unused)]
mod executor;
mod skeleton;

pub use skeleton::run_query;

pub async fn dummy() {
let num_threads = 8;
98 changes: 98 additions & 0 deletions crates/polars-stream/src/skeleton.rs
@@ -0,0 +1,98 @@
#![allow(warnings, unused)] // TODO: remove me
use polars_core::prelude::*;
use polars_core::POOL;
use polars_expr::planner::{create_physical_expr, get_expr_depth_limit, ExpressionConversionState};
use polars_plan::logical_plan::{Context, IR};
use polars_plan::prelude::expr_ir::ExprIR;
use polars_plan::prelude::AExpr;
use polars_utils::arena::{Arena, Node};

fn is_streamable(node: Node, arena: &Arena<AExpr>) -> bool {
polars_plan::logical_plan::is_streamable(node, arena, Context::Default)
}

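/// Entry point of the new streaming engine skeleton: lower the optimized IR
/// into the minimal `LogicalPlan` below; execution itself is still `todo!()`.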
pub fn run_query(
node: Node,
mut ir_arena: Arena<IR>,
mut expr_arena: Arena<AExpr>,
) -> PolarsResult<DataFrame> {
let mut lir_arena = Arena::with_capacity(ir_arena.len());
let root = lower_ir(node, &mut ir_arena, &mut expr_arena, &mut lir_arena)?;
let expr_depth_limit = get_expr_depth_limit()?;
let mut expr_conversion_state = ExpressionConversionState::new(false, expr_depth_limit);
let max_threads = POOL.current_num_threads();

match lir_arena.take(root) {
LogicalPlan::Filter { input, predicate } => {
let phys_expr = create_physical_expr(
&predicate,
Context::Default,
&expr_arena,
None,
&mut expr_conversion_state,
)?;
todo!()
},
_ => todo!(),
}
// TODO: execute the lowered plan and return the resulting DataFrame.
}

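/// Lower an `IR` node into the streaming `LogicalPlan`. Only streamable
/// filters and in-memory `DataFrameScan`s are lowered; everything else becomes
/// a `FallBack` node handled by the in-memory engine.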
#[recursive::recursive]
fn lower_ir(
node: Node,
ir_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
lir_arena: &mut Arena<LogicalPlan>,
) -> PolarsResult<Node> {
let node = match ir_arena.get(node) {
IR::Filter { input, predicate } if is_streamable(predicate.node(), expr_arena) => {
let predicate = predicate.clone();
let input = lower_ir(*input, ir_arena, expr_arena, lir_arena)?;
lir_arena.add(LogicalPlan::Filter { input, predicate })
},
IR::DataFrameScan {
df,
schema,
output_schema,
projection,
filter,
} => {
if let Some(filter) = filter {
if !is_streamable(filter.node(), expr_arena) {
return Ok(lir_arena.add(LogicalPlan::FallBack(node)));
}
}
lir_arena.add(LogicalPlan::DataFrameScan {
df: df.clone(),
schema: schema.clone(),
output_schema: output_schema.clone(),
projection: projection.clone(),
filter: filter.clone(),
})
},
_ => return Ok(lir_arena.add(LogicalPlan::FallBack(node))),
};
Ok(node)
}

/// Invariant: all expressions are elementwise.
#[derive(Clone, Default)]
enum LogicalPlan {
DataFrameScan {
df: Arc<DataFrame>,
schema: SchemaRef,
// schema of the projected file
output_schema: Option<SchemaRef>,
projection: Option<Arc<[String]>>,
filter: Option<ExprIR>,
},
Filter {
input: Node,
predicate: ExprIR,
},
// Fallback to in-memory engine
FallBack(Node),
#[default]
Unreachable,
}
1 change: 1 addition & 0 deletions py-polars/Cargo.toml
@@ -157,6 +157,7 @@ cloud = ["polars/cloud", "polars/aws", "polars/gcp", "polars/azure", "polars/htt
peaks = ["polars/peaks"]
hist = ["polars/hist"]
find_many = ["polars/find_many"]
new_streaming = ["polars-lazy/new-streaming"]

dtype-i8 = []
dtype-i16 = []
2 changes: 2 additions & 0 deletions py-polars/polars/functions/lazy.py
@@ -1693,6 +1693,7 @@ def collect_all(
cluster_with_columns,
streaming,
_eager=False,
new_streaming=False,
)
prepared.append(ldf)

@@ -1850,6 +1851,7 @@ def collect_all_async(
cluster_with_columns,
streaming,
_eager=False,
new_streaming=False,
)
prepared.append(ldf)

9 changes: 9 additions & 0 deletions py-polars/polars/lazyframe/frame.py
@@ -1011,6 +1011,7 @@ def explain(
cluster_with_columns,
streaming,
_eager=False,
new_streaming=False,
)
if format == "tree":
return ldf.describe_optimized_plan_tree()
@@ -1101,6 +1102,7 @@ def show_graph(
cluster_with_columns,
streaming,
_eager=False,
new_streaming=False,
)

dot = _ldf.to_dot(optimized)
@@ -1628,6 +1630,7 @@ def profile(
cluster_with_columns,
streaming,
_eager=False,
new_streaming=False,
)
df, timings = ldf.profile()
(df, timings) = wrap_df(df), wrap_df(timings)
@@ -1815,6 +1818,8 @@ def collect(
│ c ┆ 6 ┆ 1 │
└─────┴─────┴─────┘
"""
new_streaming = _kwargs.get("new_streaming", False)

if no_optimization or _eager:
predicate_pushdown = False
projection_pushdown = False
@@ -1838,6 +1843,7 @@
cluster_with_columns,
streaming,
_eager,
new_streaming,
)
if background:
return InProcessQuery(ldf.collect_concurrently())
@@ -2013,6 +2019,7 @@ def collect_async(
cluster_with_columns,
streaming,
_eager=False,
new_streaming=False,
)

result = _GeventDataFrameResult() if gevent else _AioDataFrameResult()
@@ -2448,6 +2455,7 @@ def _set_sink_optimizations(
cluster_with_columns=False,
streaming=True,
_eager=False,
new_streaming=False,
)

def fetch(
@@ -2550,6 +2558,7 @@ def fetch(
cluster_with_columns,
streaming,
_eager=False,
new_streaming=False,
)
return wrap_df(lf.fetch(n_rows))

6 changes: 6 additions & 0 deletions py-polars/src/lazyframe/mod.rs
@@ -491,6 +491,7 @@ impl PyLazyFrame {
cluster_with_columns: bool,
streaming: bool,
_eager: bool,
#[allow(unused_variables)] new_streaming: bool,
) -> Self {
let ldf = self.ldf.clone();
let mut ldf = ldf
@@ -503,6 +504,11 @@
._with_eager(_eager)
.with_projection_pushdown(projection_pushdown);

#[cfg(feature = "new_streaming")]
{
ldf = ldf.with_new_streaming(new_streaming);
}

#[cfg(feature = "cse")]
{
ldf = ldf.with_comm_subplan_elim(comm_subplan_elim);