feat: update arrow to 51, datafusion to 37 #2240

Merged: 5 commits, Apr 23, 2024
Changes from 1 commit
38 changes: 21 additions & 17 deletions Cargo.toml
@@ -57,17 +57,17 @@
lance-test-macros = { version = "=0.10.15", path = "./rust/lance-test-macros" }
lance-testing = { version = "=0.10.15", path = "./rust/lance-testing" }
approx = "0.5.1"
# Note that this one does not include pyarrow
-arrow = { version = "50.0.0", optional = false, features = ["prettyprint"] }
-arrow-arith = "50.0"
-arrow-array = "50.0"
-arrow-buffer = "50.0"
-arrow-cast = "50.0"
-arrow-data = "50.0"
-arrow-ipc = { version = "50.0", features = ["zstd"] }
-arrow-ord = "50.0"
-arrow-row = "50.0"
-arrow-schema = "50.0"
-arrow-select = "50.0"
+arrow = { version = "51.0.0", optional = false, features = ["prettyprint"] }
+arrow-arith = "51.0"
+arrow-array = "51.0"
+arrow-buffer = "51.0"
+arrow-cast = "51.0"
+arrow-data = "51.0"
+arrow-ipc = { version = "51.0", features = ["zstd"] }
+arrow-ord = "51.0"
+arrow-row = "51.0"
+arrow-schema = "51.0"
+arrow-select = "51.0"
async-recursion = "1.0"
async-trait = "0.1"
aws-config = "0.56"
@@ -85,14 +85,18 @@
chrono = { version = "0.4.25", default-features = false, features = [
"now",
] }
criterion = { version = "0.5", features = ["async", "async_tokio"] }
-datafusion = { version = "36.0.0", default-features = false, features = [
+datafusion = { version = "37.0.0", default-features = false, features = [
"array_expressions",
"regex_expressions",
] }
+datafusion-common = "37.0"
Contributor:

Datafusion is on 37.1 https://docs.rs/datafusion/latest/datafusion/

Should we bump to datafusion 37.1?

Contributor Author:

Oh yea, maybe just 37 for simplicity.

Contributor Author:

Actually, probably best to do 37.1 just to avoid issues. It doesn't change that often.

Contributor Author:

Updated.

Contributor:

If we don't have a reason to require >=37.1, I feel like it would be nice to keep at 37.0. Then users can use any 37.X version.

Contributor Author:

Do you mean 37? Or does 37.0 allow 37.1?

Scanning through apache/datafusion#9904 I don't see any defects that would impact us.
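For reference, the answer to "does 37.0 allow 37.1?" is yes: a bare Cargo requirement like `"37.0"` is shorthand for `^37.0`, which (for major versions >= 1) accepts any release with the same major version at or above the requirement. A minimal sketch of that rule — an illustration only, not Cargo's actual resolver; `caret_matches` is a hypothetical helper:

```rust
// Hypothetical helper illustrating Cargo's default (caret) requirement
// semantics for major >= 1: "REQ_MAJOR.REQ_MINOR" accepts any version
// with the same major component that is at or above the requirement.
fn caret_matches(req_major: u64, req_minor: u64, version: (u64, u64, u64)) -> bool {
    let (major, minor, _patch) = version;
    major == req_major && minor >= req_minor
}

fn main() {
    // A "37.0" requirement accepts 37.1.0, so users can pick up 37.x patches.
    assert!(caret_matches(37, 0, (37, 1, 0)));
    // It also accepts 37.0.x itself.
    assert!(caret_matches(37, 0, (37, 0, 5)));
    // It does not accept the next major version.
    assert!(!caret_matches(37, 0, (38, 0, 0)));
}
```

So the `"37.0"` requirements in this diff already allow any `37.x`, which is what the reviewer was pointing out.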

+datafusion-functions = { version = "37.0", features = ["regex_expressions"] }
+datafusion-sql = "37.0"
+datafusion-expr = "37.0"
+datafusion-execution = "37.0"
+datafusion-physical-expr = { version = "37.0", features = [
+    "regex_expressions",
+] }
-datafusion-common = "36.0"
-datafusion-sql = "36.0"
-datafusion-expr = "36.0"
-datafusion-execution = "36.0"
-datafusion-physical-expr = "36.0"
either = "1.0"
futures = "0.3"
http = "0.2.9"
8 changes: 4 additions & 4 deletions python/Cargo.toml
@@ -12,10 +12,10 @@
name = "lance"
crate-type = ["cdylib"]

[dependencies]
-arrow = { version = "50.0.0", features = ["pyarrow"] }
-arrow-array = "50.0"
-arrow-data = "50.0"
-arrow-schema = "50.0"
+arrow = { version = "51.0.0", features = ["pyarrow"] }
+arrow-array = "51.0"
+arrow-data = "51.0"
+arrow-schema = "51.0"
object_store = "0.9.0"
async-trait = "0.1"
chrono = "0.4.31"
3 changes: 2 additions & 1 deletion rust/lance-datafusion/Cargo.toml
@@ -17,8 +17,9 @@
arrow-ord.workspace = true
async-trait.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
+datafusion-functions.workspace = true
datafusion-physical-expr.workspace = true
-datafusion-substrait = { version = "36.0", optional = true }
+datafusion-substrait = { version = "37.0", optional = true }
futures.workspace = true
lance-arrow.workspace = true
lance-core = { workspace = true, features = ["datafusion"] }
33 changes: 19 additions & 14 deletions rust/lance-datafusion/src/exec.rs
@@ -17,12 +17,12 @@
use datafusion::{
TaskContext,
},
physical_plan::{
-streaming::PartitionStream, DisplayAs, DisplayFormatType, ExecutionPlan,
+streaming::PartitionStream, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
SendableRecordBatchStream,
},
};
-use datafusion_common::DataFusionError;
-use datafusion_physical_expr::Partitioning;
+use datafusion_common::{DataFusionError, Statistics};
+use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

use lance_arrow::SchemaExt;
use lance_core::Result;
@@ -32,11 +32,15 @@
use log::{info, warn};
///
/// It can only be used once, and will return the stream. After that the node
is exhausted.
///
/// Note: the stream should be finite, otherwise we will report datafusion properties
/// incorrectly.
pub struct OneShotExec {
stream: Mutex<Option<SendableRecordBatchStream>>,
// We save off a copy of the schema to speed up formatting and so ExecutionPlan::schema & display_as
// can still function after being exhausted
schema: Arc<ArrowSchema>,
properties: PlanProperties,
}

impl OneShotExec {
@@ -45,7 +49,12 @@
impl OneShotExec {
let schema = stream.schema().clone();
Self {
stream: Mutex::new(Some(stream)),
-schema,
+schema: schema.clone(),
+properties: PlanProperties::new(
+EquivalenceProperties::new(schema),
+Partitioning::RoundRobinBatch(1),
+datafusion::physical_plan::ExecutionMode::Bounded,
+),
}
Contributor Author (on lines +52 to 58):

The ExecutionPlan interface changed: output_partitioning and output_ordering have gone away, and properties() now replaces both of them (along with a new field, "execution mode").

}
}
@@ -96,14 +105,6 @@
impl ExecutionPlan for OneShotExec {
self.schema.clone()
}

-fn output_partitioning(&self) -> datafusion_physical_expr::Partitioning {
-Partitioning::RoundRobinBatch(1)
-}
-
-fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> {
-None
-}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
@@ -135,7 +136,11 @@
}

fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
-todo!()
+Ok(Statistics::new_unknown(&self.schema))
}

fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
&self.properties
}
}

@@ -194,7 +199,7 @@
pub fn execute_plan(
let session_state = SessionState::new_with_config_rt(session_config, runtime_env);
// NOTE: we are only executing the first partition here. Therefore, if
// the plan has more than one partition, we will be missing data.
-assert_eq!(plan.output_partitioning().partition_count(), 1);
+assert_eq!(plan.properties().partitioning.partition_count(), 1);
Ok(plan.execute(0, session_state.task_ctx())?)
}
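The exec.rs changes above all follow the same DataFusion 37 pattern: instead of answering output_partitioning()/output_ordering() on demand, a node precomputes a PlanProperties value at construction and hands out a reference from properties(). The shape of that pattern, sketched in plain Rust — `Props` and `OneShotLike` are stand-ins invented here, not DataFusion types:

```rust
// Stand-ins for datafusion's PlanProperties and an ExecutionPlan node.
struct Props {
    partition_count: usize, // replaces the removed output_partitioning()
    bounded: bool,          // the new "execution mode": true for a finite stream
}

struct OneShotLike {
    props: Props,
}

impl OneShotLike {
    fn new() -> Self {
        // Computed once, up front, like OneShotExec::new in the diff:
        // a single round-robin partition over a bounded stream.
        Self {
            props: Props {
                partition_count: 1,
                bounded: true,
            },
        }
    }

    // properties() replaces the per-question accessors that went away.
    fn properties(&self) -> &Props {
        &self.props
    }
}

fn main() {
    let node = OneShotLike::new();
    assert_eq!(node.properties().partition_count, 1);
    assert!(node.properties().bounded);
}
```

This is also why execute_plan now reads the partition count through plan.properties() rather than plan.output_partitioning().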

2 changes: 2 additions & 0 deletions rust/lance-index/src/scalar/btree.rs
@@ -494,6 +494,8 @@
impl Ord for OrderableScalarValue {
(Dictionary(_k1, _v1), Dictionary(_k2, _v2)) => todo!(),
(Dictionary(_, v1), Null) => Self(*v1.clone()).cmp(&Self(ScalarValue::Null)),
(Dictionary(_, _), _) => panic!("Attempt to compare Dictionary with non-Dictionary"),
// What would a btree of unions even look like? May not be possible.
(Union(_, _, _), _) => todo!("Support for union scalars"),
(Null, Null) => Ordering::Equal,
(Null, _) => todo!(),
}
12 changes: 12 additions & 0 deletions rust/lance-index/src/scalar/expression.rs
@@ -549,6 +549,18 @@
mod tests {
fn options(&self) -> &ConfigOptions {
todo!()
}

fn udfs_names(&self) -> Vec<String> {
todo!()
}

fn udafs_names(&self) -> Vec<String> {
todo!()
}

fn udwfs_names(&self) -> Vec<String> {
todo!()
}
}

fn check(
1 change: 1 addition & 0 deletions rust/lance/Cargo.toml
@@ -57,6 +57,7 @@
arrow.workspace = true
num_cpus.workspace = true
# TODO: use datafusion sub-modules to reduce build size?
datafusion.workspace = true
+datafusion-functions.workspace = true
datafusion-physical-expr.workspace = true
lapack = { version = "0.19.0", optional = true }
lru_time_cache = "0.11"
140 changes: 93 additions & 47 deletions rust/lance/src/datafusion/logical_expr.rs
@@ -3,15 +3,19 @@

//! Extends logical expression.

use std::sync::Arc;

use arrow_schema::DataType;

use datafusion::logical_expr::ScalarFunctionDefinition;
use datafusion::logical_expr::ScalarUDF;
use datafusion::logical_expr::ScalarUDFImpl;
use datafusion::logical_expr::{
-expr::ScalarFunction, BinaryExpr, BuiltinScalarFunction, GetFieldAccess, GetIndexedField,
-Operator,
+expr::ScalarFunction, BinaryExpr, GetFieldAccess, GetIndexedField, Operator,
};
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
use datafusion_functions::core::getfield::GetFieldFunc;
use lance_arrow::DataTypeExt;
use lance_datafusion::expr::safe_coerce_scalar;

@@ -34,6 +38,45 @@
}
}

/// A simple helper function that interprets an Expr as a string scalar
/// or returns None if it is not.
pub fn get_as_string_scalar_opt(expr: &Expr) -> Option<&String> {
Contributor:

nit: &String in a signature is pretty much never desirable.

Suggested change:
-pub fn get_as_string_scalar_opt(expr: &Expr) -> Option<&String> {
+pub fn get_as_string_scalar_opt(expr: &Expr) -> Option<&str> {

Contributor Author:

Changed to &str.

match expr {
Expr::Literal(ScalarValue::Utf8(Some(s))) => Some(s),
_ => None,
}
}

// As part of the DF 37 release there are now two different ways to
// represent a nested field access in `Expr`. The old way is to use
// `Expr::field` which returns a `GetStructField` and the new way is
// to use `Expr::ScalarFunction` with a `GetFieldFunc` UDF.
//
// Currently, the old path leads to bugs in DF. This is probably a
// bug and will probably be fixed in a future version. In the meantime
// we need to make sure we are always using the new way to avoid this
// bug. This trait adds field_newstyle which lets us easily create
// logical `Expr` that use the new style.
pub trait ExprExt {
// Helper function to replace Expr::field in DF 37 since DF
// confuses itself with the GetStructField returned by Expr::field
fn field_newstyle(&self, name: &str) -> Expr;

Check warning on line 63 in rust/lance/src/datafusion/logical_expr.rs (GitHub Actions / linux-build (nightly)): method `field_newstyle` is never used
Contributor Author:

This was the most complicated change. The comment explains it ok. I expect this may eventually go away (e.g. when apache/datafusion#10181 is resolved)

}

impl ExprExt for Expr {
fn field_newstyle(&self, name: &str) -> Expr {
Expr::ScalarFunction(ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(Arc::new(ScalarUDF::new_from_impl(
GetFieldFunc::default(),
))),
args: vec![
self.clone(),
Expr::Literal(ScalarValue::Utf8(Some(name.to_string()))),
],
})
}
}
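To make the comment above concrete, here is the same idea against a hypothetical mini-AST (this `Expr` enum is invented for illustration and is not DataFusion's): nested field access becomes an ordinary scalar-function call `get_field(base, "name")`, and code that wants the dotted path — like the resolve_column_type change below — walks those calls back to the root column:

```rust
// Invented mini-AST standing in for DataFusion's Expr.
#[derive(Debug, PartialEq, Clone)]
enum Expr {
    Column(String),
    LitStr(String),
    ScalarFn { name: String, args: Vec<Expr> },
}

// Analogue of field_newstyle: build the "new style" access as a
// scalar-function node named "get_field" with a literal name argument.
fn field_newstyle(base: &Expr, name: &str) -> Expr {
    Expr::ScalarFn {
        name: "get_field".to_string(),
        args: vec![base.clone(), Expr::LitStr(name.to_string())],
    }
}

// Analogue of the resolve_column_type change: peel get_field calls,
// collecting field names until the root column is reached.
fn field_path(mut e: &Expr) -> Option<Vec<&str>> {
    let mut rev = Vec::new();
    loop {
        match e {
            Expr::Column(c) => {
                rev.push(c.as_str());
                break;
            }
            Expr::ScalarFn { name, args } if name.as_str() == "get_field" => {
                match &args[1] {
                    Expr::LitStr(s) => rev.push(s.as_str()),
                    _ => return None,
                }
                e = &args[0];
            }
            _ => return None,
        }
    }
    rev.reverse();
    Some(rev)
}

fn main() {
    let e = field_newstyle(&field_newstyle(&Expr::Column("a".to_string()), "b"), "c");
    assert_eq!(field_path(&e), Some(vec!["a", "b", "c"]));
}
```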

/// Given a Expr::Column or Expr::GetIndexedField, get the data type of referenced
/// field in the schema.
///
@@ -49,6 +92,15 @@
field_path.push(c.name.as_str());
break;
}
Expr::ScalarFunction(udf) => {
if udf.name() == GetFieldFunc::default().name() {
let name = get_as_string_scalar_opt(&udf.args[1])?;
field_path.push(&name);
current_expr = &udf.args[0];
} else {
return None;
}
}
Expr::GetIndexedField(GetIndexedField { expr, field }) => {
if let GetFieldAccess::NamedStructField {
name: ScalarValue::Utf8(Some(name)),
@@ -87,52 +139,41 @@
match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
if matches!(op, Operator::And | Operator::Or) {
return Ok(Expr::BinaryExpr(BinaryExpr {
Ok(Expr::BinaryExpr(BinaryExpr {
left: Box::new(resolve_expr(left.as_ref(), schema)?),
op: *op,
right: Box::new(resolve_expr(right.as_ref(), schema)?),
}));
}
match (left.as_ref(), right.as_ref()) {
(Expr::Column(_) | Expr::GetIndexedField(_), Expr::Literal(_)) => {
if let Some(resolved_type) = resolve_column_type(left.as_ref(), schema) {
Ok(Expr::BinaryExpr(BinaryExpr {
left: left.clone(),
op: *op,
right: Box::new(resolve_value(right.as_ref(), &resolved_type)?),
}))
} else {
Ok(expr.clone())
}
}
(Expr::Literal(_), Expr::Column(_) | Expr::GetIndexedField(_)) => {
if let Some(resolved_type) = resolve_column_type(right.as_ref(), schema) {
Ok(Expr::BinaryExpr(BinaryExpr {
left: Box::new(resolve_value(left.as_ref(), &resolved_type)?),
op: *op,
right: right.clone(),
}))
} else {
Ok(expr.clone())
}
}))
} else if let Some(left_type) = dbg!(resolve_column_type(left.as_ref(), schema)) {
Contributor:

Left over dbg!?

Suggested change:
-} else if let Some(left_type) = dbg!(resolve_column_type(left.as_ref(), schema)) {
+} else if let Some(left_type) = resolve_column_type(left.as_ref(), schema) {

Contributor Author:

Good catch. Removed.

match right.as_ref() {
Expr::Literal(_) => Ok(Expr::BinaryExpr(BinaryExpr {
left: left.clone(),
op: *op,
right: Box::new(resolve_value(right.as_ref(), &left_type)?),
})),
// For complex expressions (not just literals) on the right-hand side, like x = 1 + 1 + -2*2
Expr::BinaryExpr(r) => Ok(Expr::BinaryExpr(BinaryExpr {
left: left.clone(),
op: *op,
right: Box::new(Expr::BinaryExpr(BinaryExpr {
left: coerce_expr(&r.left, &left_type).map(Box::new)?,
op: r.op,
right: coerce_expr(&r.right, &left_type).map(Box::new)?,
})),
})),
_ => Ok(expr.clone()),
}
// For complex expressions (not just literals) on the right-hand side, like x = 1 + 1 + -2*2
(Expr::Column(_) | Expr::GetIndexedField(_), Expr::BinaryExpr(r)) => {
if let Some(resolved_type) = resolve_column_type(left.as_ref(), schema) {
Ok(Expr::BinaryExpr(BinaryExpr {
left: left.clone(),
op: *op,
right: Box::new(Expr::BinaryExpr(BinaryExpr {
left: coerce_expr(&r.left, &resolved_type).map(Box::new)?,
op: r.op,
right: coerce_expr(&r.right, &resolved_type).map(Box::new)?,
})),
}))
} else {
Ok(expr.clone())
}
} else if let Some(right_type) = dbg!(resolve_column_type(right.as_ref(), schema)) {
match left.as_ref() {
Expr::Literal(_) => Ok(Expr::BinaryExpr(BinaryExpr {
left: Box::new(resolve_value(left.as_ref(), &right_type)?),
op: *op,
right: right.clone(),
})),
_ => Ok(expr.clone()),
}
_ => Ok(expr.clone()),
} else {
Ok(expr.clone())
}
}
Expr::InList(in_list) => {
@@ -189,14 +230,19 @@
///
/// - *expr*: a datafusion logical expression
pub fn coerce_filter_type_to_boolean(expr: Expr) -> Result<Expr> {
-match expr {
+match &expr {
// TODO: consider making this dispatch more generic, i.e. fun.output_type -> coerce
// instead of hardcoding coerce method for each function
Expr::ScalarFunction(ScalarFunction {
-func_def: ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::RegexpMatch),
+func_def: ScalarFunctionDefinition::UDF(udf),
Contributor Author:

There are no more builtin functions. So we need to look for regexp_match as a UDF instead.

..
-}) => Ok(Expr::IsNotNull(Box::new(expr))),
+}) => {
+if udf.name() == "regexp_match" {
+Ok(Expr::IsNotNull(Box::new(expr)))
+} else {
+Ok(expr)
+}
+}
_ => Ok(expr),
}
}
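The resolve_expr restructuring earlier in this file boils down to: resolve the column's type once, then recursively coerce any literals in the opposite operand — including nested binary expressions like `1 + 1 + -2*2` — to that type. A sketch of that recursion over a hypothetical mini-AST (invented for illustration, not DataFusion's Expr; `coerce_to_float` stands in for resolve_value/coerce_expr against a resolved Float64 column):

```rust
// Invented mini-AST standing in for DataFusion's Expr.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(&'static str),
    Int(i64),
    Float(f64),
    Binary(Box<Expr>, char, Box<Expr>),
}

// Coerce integer literals to floats throughout an expression tree,
// descending into nested binary expressions, leaving everything else alone.
fn coerce_to_float(e: &Expr) -> Expr {
    match e {
        Expr::Int(v) => Expr::Float(*v as f64),
        Expr::Binary(l, op, r) => Expr::Binary(
            Box::new(coerce_to_float(l)),
            *op,
            Box::new(coerce_to_float(r)),
        ),
        other => other.clone(),
    }
}

fn main() {
    // x = 1 + 2  ==>  both literals are coerced to the column's float type.
    let rhs = Expr::Binary(Box::new(Expr::Int(1)), '+', Box::new(Expr::Int(2)));
    assert_eq!(
        coerce_to_float(&rhs),
        Expr::Binary(Box::new(Expr::Float(1.0)), '+', Box::new(Expr::Float(2.0)))
    );
}
```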
1 change: 1 addition & 0 deletions rust/lance/src/dataset/scanner.rs
@@ -702,6 +702,7 @@
impl Scanner {
&[],
&plan.schema(),
"",
false,
)?;
let plan_schema = plan.schema().clone();
let count_plan = Arc::new(AggregateExec::try_new(