
chore(rust): bump arrow v51 and datafusion v37.1 #2395

Merged
merged 11 commits on Apr 26, 2024
39 changes: 20 additions & 19 deletions Cargo.toml
@@ -31,28 +31,29 @@ debug = "line-tables-only"

[workspace.dependencies]
# arrow
arrow = { version = "50" }
arrow-arith = { version = "50" }
arrow-array = { version = "50", features = ["chrono-tz"]}
arrow-buffer = { version = "50" }
arrow-cast = { version = "50" }
arrow-ipc = { version = "50" }
arrow-json = { version = "50" }
arrow-ord = { version = "50" }
arrow-row = { version = "50" }
arrow-schema = { version = "50" }
arrow-select = { version = "50" }
arrow = { version = "51" }
arrow-arith = { version = "51" }
arrow-array = { version = "51", features = ["chrono-tz"] }
arrow-buffer = { version = "51" }
arrow-cast = { version = "51" }
arrow-ipc = { version = "51" }
arrow-json = { version = "51" }
arrow-ord = { version = "51" }
arrow-row = { version = "51" }
arrow-schema = { version = "51" }
arrow-select = { version = "51" }
object_store = { version = "0.9" }
parquet = { version = "50" }
parquet = { version = "51" }

# datafusion
datafusion = { version = "36" }
datafusion-expr = { version = "36" }
datafusion-common = { version = "36" }
datafusion-proto = { version = "36" }
datafusion-sql = { version = "36" }
datafusion-physical-expr = { version = "36" }
datafusion-functions = { version = "36" }
datafusion = { version = "37" }
datafusion-expr = { version = "37" }
datafusion-common = { version = "37" }
datafusion-proto = { version = "37" }
datafusion-sql = { version = "37" }
datafusion-physical-expr = { version = "37" }
datafusion-functions = { version = "37" }
datafusion-functions-array = { version = "37" }

# serde
serde = { version = "1.0.194", features = ["derive"] }
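Because every member crate references these dependencies with `workspace = true`, the pins above are the only place a version number changes. A minimal sketch of how a member crate's Cargo.toml consumes the workspace pins (the exact dependency list varies per crate):

    [dependencies]
    arrow = { workspace = true }
    parquet = { workspace = true }
    datafusion = { workspace = true, optional = true }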
2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
@@ -41,6 +41,7 @@ datafusion-proto = { workspace = true, optional = true }
datafusion-sql = { workspace = true, optional = true }
datafusion-physical-expr = { workspace = true, optional = true }
datafusion-functions = { workspace = true, optional = true }
+datafusion-functions-array = { workspace = true, optional = true }

# serde
serde = { workspace = true, features = ["derive"] }
@@ -123,6 +124,7 @@ datafusion = [
"datafusion-physical-expr",
"datafusion-sql",
"datafusion-functions",
"datafusion-functions-array",
"sqlparser",
]
datafusion-ext = ["datafusion"]
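The new crate is both declared as an optional dependency and listed under the `datafusion` feature, so it is only compiled when DataFusion support is requested, e.g. (assuming the crate's package name is deltalake-core):

    cargo build -p deltalake-core --features datafusion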
13 changes: 7 additions & 6 deletions crates/core/src/data_catalog/storage/mod.rs
@@ -110,12 +110,13 @@ impl SchemaProvider for ListingSchemaProvider {
self.tables.iter().map(|t| t.key().clone()).collect()
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
let location = self.tables.get(name).map(|t| t.clone())?;
let provider = open_table_with_storage_options(location, self.storage_options.0.clone())
.await
.ok()?;
Some(Arc::new(provider) as Arc<dyn TableProvider>)
async fn table(&self, name: &str) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
let Some(location) = self.tables.get(name).map(|t| t.clone()) else {
return Ok(None);
};
let provider =
open_table_with_storage_options(location, self.storage_options.0.clone()).await?;
Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
}

fn register_table(
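Context for this change: DataFusion 37 made `SchemaProvider::table` fallible, returning `Result<Option<Arc<dyn TableProvider>>>` instead of `Option<Arc<dyn TableProvider>>`, so load failures now propagate to the query engine rather than being swallowed by `.ok()?`. A caller-side sketch (`schema` and the table name are hypothetical):

    // DataFusion 36: a load error and a missing table both came back as None.
    // DataFusion 37: errors propagate; absence is still expressed as None.
    let maybe_table = schema.table("my_table").await?;
    if let Some(table) = maybe_table {
        // register or scan the provider
    }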
12 changes: 6 additions & 6 deletions crates/core/src/data_catalog/unity/datafusion.rs
@@ -8,6 +8,7 @@ use dashmap::DashMap;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
use datafusion::datasource::TableProvider;
+use datafusion_common::DataFusionError;
use tracing::error;

use super::models::{GetTableResponse, ListCatalogsResponse, ListTableSummariesResponse};
@@ -180,25 +181,24 @@ impl SchemaProvider for UnitySchemaProvider {
self.table_names.clone()
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
async fn table(&self, name: &str) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
let maybe_table = self
.client
.get_table(&self.catalog_name, &self.schema_name, name)
.await
-.ok()?;
+.map_err(|err| DataFusionError::External(Box::new(err)))?;

match maybe_table {
GetTableResponse::Success(table) => {
let table = DeltaTableBuilder::from_uri(table.storage_location)
.with_storage_options(self.storage_options.clone())
.load()
-.await
-.ok()?;
-Some(Arc::new(table))
+.await?;
+Ok(Some(Arc::new(table)))
}
GetTableResponse::Error(err) => {
error!("failed to fetch table from unity catalog: {}", err.message);
-None
+Err(DataFusionError::External(Box::new(err)))
}
}
}
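`DataFusionError::External` wraps any `Box<dyn std::error::Error + Send + Sync>`, which is what makes the `map_err` conversions above possible. The pattern, extracted into a hypothetical standalone helper:

    use datafusion_common::DataFusionError;

    // Wrap any standard error type so it can flow through DataFusion's Result.
    fn to_df_error<E>(err: E) -> DataFusionError
    where
        E: std::error::Error + Send + Sync + 'static,
    {
        DataFusionError::External(Box::new(err))
    }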
9 changes: 8 additions & 1 deletion crates/core/src/data_catalog/unity/models.rs
@@ -1,17 +1,24 @@
//! Api models for databricks unity catalog APIs

+use core::fmt;
use std::collections::HashMap;

use serde::Deserialize;

/// Error response from unity API
-#[derive(Deserialize)]
+#[derive(Debug, Deserialize)]
pub struct ErrorResponse {
/// The error code
pub error_code: String,
/// The error message
pub message: String,
}
+impl fmt::Display for ErrorResponse {
+fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+writeln!(f, "[{}] {}", self.error_code, self.message)
+}
+}
+impl std::error::Error for ErrorResponse {}

/// List catalogs response
#[derive(Deserialize)]
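`std::error::Error` requires both `Debug` and `Display`, so the added derive and `Display` impl are exactly what lets `ErrorResponse` be boxed into `DataFusionError::External` in the Unity schema provider above. Illustrative construction (field values invented; note the trailing newline comes from `writeln!`):

    let err = ErrorResponse {
        error_code: "TABLE_DOES_NOT_EXIST".to_string(),
        message: "table not found".to_string(),
    };
    let boxed: Box<dyn std::error::Error + Send + Sync> = Box::new(err);
    assert_eq!(boxed.to_string(), "[TABLE_DOES_NOT_EXIST] table not found\n");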
19 changes: 16 additions & 3 deletions crates/core/src/delta_datafusion/expr.rs
@@ -22,12 +22,12 @@
//! Utility functions for Datafusion's Expressions

use std::{
-fmt::{self, format, Display, Error, Formatter, Write},
+fmt::{self, Display, Error, Formatter, Write},
sync::Arc,
};

use arrow_schema::DataType;
-use chrono::{Date, NaiveDate, NaiveDateTime, TimeZone};
+use chrono::{NaiveDate, NaiveDateTime};
use datafusion::execution::context::SessionState;
use datafusion_common::Result as DFResult;
use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
@@ -76,6 +76,18 @@ impl<'a> ContextProvider for DeltaContextProvider<'a> {
fn get_table_source(&self, _name: TableReference) -> DFResult<Arc<dyn TableSource>> {
unimplemented!()
}

fn udfs_names(&self) -> Vec<String> {
unimplemented!()
}

fn udafs_names(&self) -> Vec<String> {
unimplemented!()
}

fn udwfs_names(&self) -> Vec<String> {
unimplemented!()
}
}

/// Parse a string predicate into an `Expr`
@@ -417,8 +429,9 @@ mod test {
use arrow_schema::DataType as ArrowDataType;
use datafusion::prelude::SessionContext;
use datafusion_common::{Column, ScalarValue, ToDFSchema};
-use datafusion_expr::{cardinality, col, lit, substring, Cast, Expr, ExprSchemable};
+use datafusion_expr::{col, lit, substring, Cast, Expr, ExprSchemable};
use datafusion_functions::encoding::expr_fn::decode;
+use datafusion_functions_array::expr_fn::cardinality;

use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext};
use crate::kernel::{ArrayType, DataType, PrimitiveType, StructField, StructType};
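Two DataFusion 37 changes show up in this file: `ContextProvider` grew `udfs_names`/`udafs_names`/`udwfs_names` (stubbed here, since this provider only parses predicates), and the array functions moved out of `datafusion-expr` into the new `datafusion-functions-array` crate. The relocated import in use (column name hypothetical):

    use datafusion_expr::col;
    use datafusion_functions_array::expr_fn::cardinality;

    // Builds an expression counting the elements of a list-typed column.
    let expr = cardinality(col("tags"));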
20 changes: 12 additions & 8 deletions crates/core/src/delta_datafusion/find_files/physical.rs
@@ -9,11 +9,13 @@ use arrow_schema::SchemaRef;
use datafusion::error::Result;
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::memory::MemoryStream;
-use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
+use datafusion::physical_plan::{
+DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
+};
use datafusion::prelude::SessionContext;
use datafusion_common::tree_node::TreeNode;
use datafusion_expr::Expr;
-use datafusion_physical_expr::{Partitioning, PhysicalSortExpr};
+use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use futures::stream::BoxStream;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};

@@ -28,6 +30,7 @@ pub struct FindFilesExec {
predicate: Expr,
state: DeltaTableState,
log_store: LogStoreRef,
+plan_properties: PlanProperties,
}

impl FindFilesExec {
@@ -36,6 +39,11 @@ impl FindFilesExec {
predicate,
log_store,
state,
+plan_properties: PlanProperties::new(
+EquivalenceProperties::new(ONLY_FILES_SCHEMA.clone()),
+Partitioning::RoundRobinBatch(num_cpus::get()),
+ExecutionMode::Bounded,
+),
})
}
}
@@ -85,12 +93,8 @@ impl ExecutionPlan for FindFilesExec {
ONLY_FILES_SCHEMA.clone()
}

fn output_partitioning(&self) -> Partitioning {
Partitioning::RoundRobinBatch(num_cpus::get())
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
fn properties(&self) -> &PlanProperties {
&self.plan_properties
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
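DataFusion 37 folds partitioning, ordering, and boundedness into a single `PlanProperties` value that each operator builds once (here: round-robin partitioning over `num_cpus::get()` partitions, bounded execution) and serves by reference from `properties()`. Callers read these attributes through the accessor; a sketch, for any `plan: Arc<dyn ExecutionPlan>`:

    // Replaces the plan.output_partitioning() call removed from DataFusion 36.
    let partitions = plan.properties().output_partitioning().partition_count();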
38 changes: 19 additions & 19 deletions crates/core/src/delta_datafusion/mod.rs
@@ -49,16 +49,15 @@ use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider,
use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::FunctionRegistry;
-use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::limit::LocalLimitExec;
use datafusion::physical_plan::{
-DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
Statistics,
};
use datafusion_common::scalar::ScalarValue;
-use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
+use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::{
Column, DFSchema, DataFusionError, Result as DataFusionResult, ToDFSchema,
};
@@ -819,12 +818,8 @@ impl ExecutionPlan for DeltaScan {
self.parquet_scan.schema()
}

fn output_partitioning(&self) -> Partitioning {
self.parquet_scan.output_partitioning()
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.parquet_scan.output_ordering()
fn properties(&self) -> &PlanProperties {
self.parquet_scan.properties()
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -916,6 +911,10 @@ pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarVal
| ArrowDataType::Duration(_)
| ArrowDataType::Interval(_)
| ArrowDataType::RunEndEncoded(_, _)
+| ArrowDataType::BinaryView
+| ArrowDataType::Utf8View
+| ArrowDataType::LargeListView(_)
+| ArrowDataType::ListView(_)
| ArrowDataType::Map(_, _) => Err(DeltaTableError::Generic(format!(
"Unsupported data type for Delta Lake {}",
t
@@ -1033,8 +1032,8 @@ pub(crate) async fn execute_plan_to_batch(
state: &SessionState,
plan: Arc<dyn ExecutionPlan>,
) -> DeltaResult<arrow::record_batch::RecordBatch> {
-let data =
-futures::future::try_join_all((0..plan.output_partitioning().partition_count()).map(|p| {
+let data = futures::future::try_join_all(
+(0..plan.properties().output_partitioning().partition_count()).map(|p| {
let plan_copy = plan.clone();
let task_context = state.task_ctx().clone();
async move {
@@ -1046,8 +1045,9 @@

DataFusionResult::<_>::Ok(arrow::compute::concat_batches(&schema, batches.iter())?)
}
-}))
-.await?;
+}),
+)
+.await?;

let batch = arrow::compute::concat_batches(&plan.schema(), data.iter())?;

@@ -1297,9 +1297,9 @@ pub(crate) struct FindFilesExprProperties {
/// non-deterministic functions, and determine if the expression only contains
/// partition columns
impl TreeNodeVisitor for FindFilesExprProperties {
-type N = Expr;
+type Node = Expr;

fn pre_visit(&mut self, expr: &Self::N) -> datafusion_common::Result<VisitRecursion> {
fn f_down(&mut self, expr: &Self::Node) -> datafusion_common::Result<TreeNodeRecursion> {
// TODO: We can likely relax the volatility to STABLE. Would require further
// research to confirm the same value is generated during the scan and
// rewrite phases.
@@ -1340,27 +1340,27 @@ impl TreeNodeVisitor for FindFilesExprProperties {
self.result = Err(DeltaTableError::Generic(format!(
"Cannot determine volatility of find files predicate function {n}",
)));
-return Ok(VisitRecursion::Stop);
+return Ok(TreeNodeRecursion::Stop);
}
};
if v > Volatility::Immutable {
self.result = Err(DeltaTableError::Generic(format!(
"Find files predicate contains nondeterministic function {}",
func_def.name()
)));
-return Ok(VisitRecursion::Stop);
+return Ok(TreeNodeRecursion::Stop);
}
}
_ => {
self.result = Err(DeltaTableError::Generic(format!(
"Find files predicate contains unsupported expression {}",
expr
)));
-return Ok(VisitRecursion::Stop);
+return Ok(TreeNodeRecursion::Stop);
}
}

-Ok(VisitRecursion::Continue)
+Ok(TreeNodeRecursion::Continue)
}
}

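The visitor edits in this file track DataFusion 37's `TreeNode` rework. The renames as applied in this hunk, summarized (the `post_visit` → `f_up` pairing is an assumption based on the same pattern; this code only implements the pre-order hook):

    // DataFusion 36                           DataFusion 37
    // type N = Expr;                      →   type Node = Expr;
    // fn pre_visit(&mut self, &Self::N)   →   fn f_down(&mut self, &Self::Node)
    // VisitRecursion::{Continue, Stop}    →   TreeNodeRecursion::{Continue, Stop}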
8 changes: 2 additions & 6 deletions crates/core/src/delta_datafusion/physical.rs
@@ -82,12 +82,8 @@ impl ExecutionPlan for MetricObserverExec {
self.parent.schema()
}

fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
self.parent.output_partitioning()
}

fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> {
self.parent.output_ordering()
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
self.parent.properties()
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
4 changes: 4 additions & 0 deletions crates/core/src/kernel/expressions/scalars.rs
@@ -273,6 +273,10 @@ impl Scalar {
| Dictionary(_, _)
| RunEndEncoded(_, _)
| Union(_, _)
+| Utf8View
+| BinaryView
+| ListView(_)
+| LargeListView(_)
| Null => None,
}
}
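Arrow 51 introduced the `Utf8View`, `BinaryView`, `ListView`, and `LargeListView` data types, and `DataType` is an exhaustive enum, so every match over it must gain the new arms; both this match and `get_null_of_arrow_type` above classify them as unsupported for Delta Lake. A minimal sketch of matching the new variants:

    use arrow_schema::DataType;

    // Returns true only for the variants added in arrow 51.
    fn is_view_type(dt: &DataType) -> bool {
        matches!(
            dt,
            DataType::Utf8View
                | DataType::BinaryView
                | DataType::ListView(_)
                | DataType::LargeListView(_)
        )
    }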
2 changes: 1 addition & 1 deletion crates/core/src/operations/constraints.rs
@@ -126,7 +126,7 @@ impl std::future::IntoFuture for ConstraintBuilder {

let plan: Arc<dyn ExecutionPlan> = Arc::new(scan);
let mut tasks = vec![];
-for p in 0..plan.output_partitioning().partition_count() {
+for p in 0..plan.properties().output_partitioning().partition_count() {
let inner_plan = plan.clone();
let inner_checker = checker.clone();
let task_ctx = Arc::new(TaskContext::from(&state));