chore: df 36 upgrade (#2853)
Still a few items to do:

- [x] get functions removed from `BuiltinScalarFunctions` added back in (Isnan, Encode, ...); see the sketch below
- [x] double-check `array_append` functionality
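
DataFusion 36 continues moving scalar functions out of the `BuiltinScalarFunction` enum, so anything the enum dropped has to come back as a `ScalarUDF`. Below is a minimal sketch of what re-adding `isnan` could look like via the `ScalarUDFImpl` trait; the struct, its exact semantics, and the registration call are illustrative assumptions, not the code this commit added.

```rust
use std::any::Any;
use std::sync::Arc;

use datafusion::arrow::array::{BooleanArray, Float64Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, Volatility};
use datafusion::prelude::SessionContext;
use datafusion::scalar::ScalarValue;

/// Hypothetical replacement for a removed builtin `isnan`.
#[derive(Debug)]
struct IsNan {
    signature: Signature,
}

impl IsNan {
    fn new() -> Self {
        Self {
            signature: Signature::exact(vec![DataType::Float64], Volatility::Immutable),
        }
    }
}

impl ScalarUDFImpl for IsNan {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn name(&self) -> &str {
        "isnan"
    }
    fn signature(&self) -> &Signature {
        &self.signature
    }
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Boolean)
    }
    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
        match &args[0] {
            ColumnarValue::Array(arr) => {
                let floats = arr.as_any().downcast_ref::<Float64Array>().ok_or_else(|| {
                    DataFusionError::Execution("isnan expects a Float64 column".into())
                })?;
                // NaN check per row, preserving nulls.
                let out: BooleanArray = floats.iter().map(|v| v.map(f64::is_nan)).collect();
                Ok(ColumnarValue::Array(Arc::new(out)))
            }
            ColumnarValue::Scalar(ScalarValue::Float64(v)) => Ok(ColumnarValue::Scalar(
                ScalarValue::Boolean(v.map(f64::is_nan)),
            )),
            other => Err(DataFusionError::Execution(format!(
                "isnan: unsupported argument {other:?}"
            ))),
        }
    }
}

fn register(ctx: &SessionContext) {
    // GlareDB would wire this through its own function registry; registering on a
    // SessionContext is just the simplest stand-in for this sketch.
    ctx.register_udf(ScalarUDF::new_from_impl(IsNan::new()));
}
```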
universalmind303 authored and tychoish committed Apr 9, 2024
1 parent b37802e commit e780412
Showing 21 changed files with 1,786 additions and 1,234 deletions.
2,392 changes: 1,301 additions & 1,091 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
@@ -18,9 +18,9 @@ wildcard_imports = "deny"

[workspace.dependencies]
clap = { version = "4.5.4", features = ["derive"] }
datafusion = { version = "35.0.0", features = ["avro"] }
datafusion = { version = "36.0.0", features = ["avro"] }
arrow-flight = { version = "50.0.0", features = ["flight-sql-experimental"] }
datafusion-proto = { version = "35.0.0" }
datafusion-proto = { version = "36.0.0" }
reqwest = { version = "0.11.27", default-features = false, features = [
"json",
"rustls-tls",
@@ -43,6 +43,6 @@ tracing = "0.1"
url = "2.5.0"

[workspace.dependencies.deltalake]
git = "https://github.com/delta-io/delta-rs.git"
rev = "993e2c202936719855f8831513bcbab1b9930b94"
git = "https://github.com/GlareDB/delta-rs.git"
rev = "94773cb304ebc5eaa48d7540eb01cdf08f8b401f"
features = ["s3", "gcs", "azure", "datafusion"]
7 changes: 6 additions & 1 deletion crates/datafusion_ext/src/planner/expr/mod.rs
@@ -792,8 +792,13 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
self.sql_expr_to_logical_expr(*right, schema, planner_context)
.await?,
);
let stride = Box::new(Expr::Literal(ScalarValue::Int64(Some(1))));

GetFieldAccess::ListRange { start, stop }
GetFieldAccess::ListRange {
start,
stop,
stride,
}
}
_ => GetFieldAccess::ListIndex {
key: Box::new(
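
For reference, DataFusion 36 adds a `stride` field to `GetFieldAccess::ListRange`, which is why the planner above now supplies a literal `1` for a plain `arr[start:stop]` slice, keeping the old behavior. A hedged sketch of the same expression built directly; the `GetIndexedField` wrapper and its `new` constructor are assumptions not shown in this diff.

```rust
use datafusion::logical_expr::expr::{GetFieldAccess, GetIndexedField};
use datafusion::logical_expr::Expr;
use datafusion::prelude::{col, lit};

/// Build `arr[start:stop]` the way it looks after this change: an explicit
/// stride of 1 preserves the pre-36 "take every element" slicing behavior.
fn slice_expr(start: i64, stop: i64) -> Expr {
    Expr::GetIndexedField(GetIndexedField::new(
        Box::new(col("arr")),
        GetFieldAccess::ListRange {
            start: Box::new(lit(start)),
            stop: Box::new(lit(stop)),
            stride: Box::new(lit(1i64)),
        },
    ))
}
```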
2 changes: 2 additions & 0 deletions crates/datafusion_ext/src/planner/mod.rs
@@ -324,6 +324,8 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
| SQLDataType::Int64
| SQLDataType::Float64
| SQLDataType::Struct(_)
| SQLDataType::JSONB
| SQLDataType::Unspecified
=> not_impl_err!(
"Unsupported SQL type {sql_type:?}"
),
2 changes: 1 addition & 1 deletion crates/datasources/Cargo.toml
@@ -70,7 +70,7 @@ tiberius = { version = "0.12.2", default-features = false, features = [
"rustls",
"chrono",
] }
lance = { git = "https://github.com/GlareDB/lance", rev = "de6df70d9c5d95a4818b8799c23e3d1ad649bc1d" }
lance = { git = "https://github.com/GlareDB/lance", branch = "df36" }
bson = "2.10.0"
scylla = { version = "0.12.0" }
glob = "0.3.1"
18 changes: 13 additions & 5 deletions crates/datasources/src/mongodb/mod.rs
@@ -86,7 +86,6 @@ impl ParseOptionValue<MongoDbProtocol> for OptionValue {
}
}


impl Display for MongoDbProtocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
@@ -462,15 +461,24 @@ fn df_to_bson(val: ScalarValue) -> Result<Bson, ExtensionError> {
ScalarValue::UInt64(v) => Ok(Bson::Int64(i64::try_from(v.unwrap_or_default()).unwrap())),
ScalarValue::Float32(v) => Ok(Bson::Double(f64::from(v.unwrap_or_default()))),
ScalarValue::Float64(v) => Ok(Bson::Double(v.unwrap_or_default())),
ScalarValue::Struct(v, f) => {
ScalarValue::Struct(sa) => {
let mut doc = RawDocumentBuf::new();
for (key, value) in f.into_iter().zip(v.unwrap_or_default().into_iter()) {
let fields = sa.fields();
let columns = sa.columns();
for (field, column) in fields.iter().zip(columns.iter()) {
if column.len() != 1 {
return Err(ExtensionError::String(
"Struct column should have only one row".to_string(),
));
}
let sv = ScalarValue::try_from_array(column, 0).unwrap();
doc.append(
key.name(),
RawBson::try_from(df_to_bson(value)?)
field.name(),
RawBson::try_from(df_to_bson(sv)?)
.map_err(|e| DataFusionError::External(Box::new(e)))?,
);
}

Ok(Bson::Document(
doc.to_document()
.map_err(|e| DataFusionError::External(Box::new(e)))?,
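
The `df_to_bson` change above tracks DataFusion 36's new representation of struct scalars: `ScalarValue::Struct` now carries a single-row `StructArray` rather than a vector of field values, hence the per-column `ScalarValue::try_from_array` calls. A small sketch of building such a value to feed the function; the field names are made up, and the value is constructed through `try_from_array` rather than by naming the variant's internals.

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int64Array, StringArray, StructArray};
use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::error::Result;
use datafusion::scalar::ScalarValue;

/// Build a one-row struct scalar of the kind `df_to_bson` now expects.
fn one_row_struct() -> Result<ScalarValue> {
    let arr = StructArray::from(vec![
        (
            Arc::new(Field::new("id", DataType::Int64, false)),
            Arc::new(Int64Array::from(vec![1])) as ArrayRef,
        ),
        (
            Arc::new(Field::new("name", DataType::Utf8, true)),
            Arc::new(StringArray::from(vec![Some("glaredb")])) as ArrayRef,
        ),
    ]);
    // Row 0 of the struct array becomes a ScalarValue::Struct.
    ScalarValue::try_from_array(&arr, 0)
}
```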
10 changes: 6 additions & 4 deletions crates/datasources/src/native/access.rs
@@ -16,6 +16,7 @@ use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::{ExecutionPlan, Statistics};
use datafusion::prelude::Expr;
use datafusion_ext::metrics::ReadOnlyDataSourceMetricsExecAdapter;
use deltalake::delta_datafusion::DataFusionMixins;
use deltalake::kernel::{ArrayType, DataType as DeltaDataType};
use deltalake::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
use deltalake::operations::create::CreateBuilder;
@@ -97,6 +98,7 @@ impl LogStoreFactory for FakeStoreFactory {
/// DeltaField represents data types as stored in Delta Lake, with additional
/// metadata for indicating the 'real' (original) type, for cases when
/// downcasting occurs.
#[derive(Debug)]
struct DeltaField {
data_type: DeltaDataType,
metadata: Option<HashMap<String, Value>>,
@@ -111,10 +113,9 @@ fn arrow_to_delta_safe(arrow_type: &DataType) -> DeltaResult<DeltaField> {
(&DataType::Timestamp(TimeUnit::Microsecond, tz.clone())).try_into()?;
let mut metadata = HashMap::new();
metadata.insert("arrow_type".to_string(), json!(dtype));

Ok(DeltaField {
data_type: delta_type,
metadata: None,
metadata: Some(metadata),
})
}
dtype @ DataType::FixedSizeList(fld, _) => {
@@ -216,7 +217,6 @@ impl NativeTableStorage {

for col in &opts.columns {
let delta_col = arrow_to_delta_safe(&col.arrow_type)?;

builder = builder.with_column(
col.name.clone(),
delta_col.data_type,
@@ -225,8 +225,9 @@
);
}

let delta_table = builder.await?;
// TODO: Partitioning
NativeTable::new(builder.await?)
NativeTable::new(delta_table)
};

Ok(tbl)
@@ -356,6 +357,7 @@ impl NativeTable {
} else {
SaveMode::Append
};

let store = self.delta.log_store();
let snapshot = self.delta.state.clone();
Arc::new(NativeTableInsertExec::new(
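
The `metadata: Some(metadata)` fix in `arrow_to_delta_safe` means the original Arrow type (for example a timezone-aware timestamp) is now persisted alongside the downcast Delta type instead of being built and then discarded. A hypothetical reader-side helper, not part of this commit, showing how that `arrow_type` entry could be recovered later; it assumes arrow's serde support, which the existing `json!(dtype)` call already relies on.

```rust
use std::collections::HashMap;

use datafusion::arrow::datatypes::DataType;
use serde_json::Value;

/// Recover the Arrow type stashed by arrow_to_delta_safe in column metadata.
/// Hypothetical helper for illustration only.
fn original_arrow_type(metadata: &HashMap<String, Value>) -> Option<DataType> {
    metadata
        .get("arrow_type")
        .and_then(|v| serde_json::from_value(v.clone()).ok())
}
```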
60 changes: 54 additions & 6 deletions crates/datasources/src/native/insert.rs
@@ -1,11 +1,14 @@
use std::any::Any;
use std::sync::Arc;

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::datatypes::{DataType, SchemaRef};
use datafusion::common::ToDFSchema;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::SessionState;
use datafusion::execution::TaskContext;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::logical_expr::{ident, Cast, Expr};
use datafusion::physical_expr::{create_physical_expr, PhysicalSortExpr};
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayAs,
@@ -16,6 +19,7 @@ use datafusion::physical_plan::{
SendableRecordBatchStream,
Statistics,
};
use deltalake::kernel::StructField;
use deltalake::logstore::LogStore;
use deltalake::operations::write::WriteBuilder;
use deltalake::protocol::SaveMode;
@@ -86,7 +90,7 @@ impl ExecutionPlan for NativeTableInsertExec {
input: children[0].clone(),
store: self.store.clone(),
snapshot: self.snapshot.clone(),
save_mode: self.save_mode.clone(),
save_mode: self.save_mode,
}))
}

@@ -107,16 +111,60 @@ impl ExecutionPlan for NativeTableInsertExec {
context.session_config().clone(),
context.runtime_env(),
);

let schema = self.input.schema();
let fields = schema.fields().clone();
let input_dfschema = schema.to_dfschema()?;
// delta-rs does not support all data types, so we need to check if the input schema
// contains any unsupported data types.
// If it does, we need to cast them to supported data types.
let mut contains_unsupported_fields = false;
let projections = fields
.iter()
.map(|field| {
let e = if StructField::try_from(field.as_ref()).is_ok() {
ident(field.name())
} else {
contains_unsupported_fields = true;

match field.data_type() {
DataType::Timestamp(_, _) => Expr::Cast(Cast {
expr: Box::new(ident(field.name())),
data_type: DataType::Timestamp(
datafusion::arrow::datatypes::TimeUnit::Microsecond,
None,
),
}),

dtype => {
return Err(DataFusionError::Execution(format!(
"Unsupported data type {:?} for field {}",
dtype,
field.name()
)))
}
}
};
let e = create_physical_expr(&e, &input_dfschema, state.execution_props()).unwrap();
Ok((e, field.name().clone()))
})
.collect::<DataFusionResult<Vec<_>>>()?;

let input = if contains_unsupported_fields {
Arc::new(ProjectionExec::try_new(projections, self.input.clone())?)
} else {
self.input.clone()
};

// Allows writing multiple output partitions from the input execution
// plan.
//
// TODO: Possibly try avoiding cloning the snapshot.
let builder = WriteBuilder::new(self.store.clone(), Some(self.snapshot.clone()))
.with_input_session_state(state)
.with_save_mode(self.save_mode.clone())
.with_input_execution_plan(self.input.clone());
.with_save_mode(self.save_mode)
.with_input_execution_plan(input.clone());

let input = self.input.clone();
let output = futures::stream::once(async move {
let _ = builder
.await
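
The projection added above exists because delta-rs cannot represent every Arrow type: any field that fails `StructField::try_from` is rewritten, and for now that means timestamps are cast to microsecond precision with no timezone before the Delta write. A sketch of the same idea at the DataFrame level, under the assumption that a plain cast is all that is needed; the commit itself does this physically with `create_physical_expr` and `ProjectionExec`.

```rust
use std::sync::Arc;

use datafusion::arrow::array::TimestampNanosecondArray;
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::logical_expr::{Cast, Expr};
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // One nanosecond-precision timestamp column, which delta-rs can't store as-is.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "ts",
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        true,
    )]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(TimestampNanosecondArray::from(vec![
            1_700_000_000_000_000_000,
        ]))],
    )?;

    let ctx = SessionContext::new();
    // Logical equivalent of the inserted ProjectionExec: cast to microseconds, no tz.
    let casted = Expr::Cast(Cast {
        expr: Box::new(col("ts")),
        data_type: DataType::Timestamp(TimeUnit::Microsecond, None),
    });
    let df = ctx.read_batch(batch)?.select(vec![casted.alias("ts")])?;
    df.show().await?;
    Ok(())
}
```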
3 changes: 2 additions & 1 deletion crates/datasources/src/object_store/mod.rs
@@ -324,7 +324,8 @@ impl TableProvider for ObjStoreTableProvider {
})
.boxed()
.buffered(ctx.config_options().execution.meta_fetch_concurrency);
let (files, statistics) = get_statistics_with_limit(files, self.schema(), limit).await?;
let (files, statistics) =
get_statistics_with_limit(files, self.schema(), limit, true).await?;

// If there are no files, return an empty exec plan.
if files.is_empty() {

