Skip to content

Commit

Permalink
chore: bump datafusion version to fix last_value regression
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelScofield committed Jun 19, 2024
1 parent 22d1268 commit c7dfea8
Show file tree
Hide file tree
Showing 8 changed files with 545 additions and 257 deletions.
684 changes: 488 additions & 196 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,15 @@ clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "5.4"
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-optimizer = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-substrait = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-optimizer = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-substrait = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
derive_builder = "0.12"
dotenv = "0.15"
# TODO(LFC): Wait for https://github.com/etcdv3/etcd-client/pull/76
Expand Down
6 changes: 3 additions & 3 deletions src/common/datasource/src/file_format/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::vec;

use common_test_util::find_workspace_path;
use datafusion::assert_batches_eq;
use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream, ParquetExec};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
Expand Down Expand Up @@ -167,8 +166,9 @@ async fn test_parquet_exec() {
.to_string();
let base_config = scan_config(schema.clone(), None, path);

let exec = ParquetExec::new(base_config, None, None, TableParquetOptions::default())
.with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(store)));
let exec = ParquetExec::builder(base_config)
.with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(store)))
.build();

let ctx = SessionContext::new();

Expand Down
7 changes: 1 addition & 6 deletions src/datatypes/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -877,12 +877,7 @@ impl TryFrom<ScalarValue> for Value {
ScalarValue::Decimal128(v, p, s) => v
.map(|v| Value::Decimal128(Decimal128::new(v, p, s)))
.unwrap_or(Value::Null),
ScalarValue::Decimal256(_, _, _)
| ScalarValue::Struct(_)
| ScalarValue::FixedSizeList(_)
| ScalarValue::LargeList(_)
| ScalarValue::Dictionary(_, _)
| ScalarValue::Union(_, _, _) => {
_ => {
return error::UnsupportedArrowTypeSnafu {
arrow_type: v.data_type(),
}
Expand Down
7 changes: 1 addition & 6 deletions src/datatypes/src/vectors/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,7 @@ impl Helper {
let vector = Decimal128Vector::from(vec![v]).with_precision_and_scale(p, s)?;
ConstantVector::new(Arc::new(vector), length)
}
ScalarValue::Decimal256(_, _, _)
| ScalarValue::Struct(_)
| ScalarValue::FixedSizeList(_)
| ScalarValue::LargeList(_)
| ScalarValue::Dictionary(_, _)
| ScalarValue::Union(_, _, _) => {
_ => {
return error::ConversionSnafu {
from: format!("Unsupported scalar value: {value}"),
}
Expand Down
15 changes: 10 additions & 5 deletions src/file-engine/src/query/file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use common_datasource::file_format::Format;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::common::{Statistics, ToDFSchema};
use datafusion::config::TableParquetOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream, ParquetExec};
Expand Down Expand Up @@ -196,12 +195,18 @@ fn new_parquet_stream_with_exec_plan(
None
};

// TODO(ruihang): get this from upper layer
let task_ctx = SessionContext::default().task_ctx();
let parquet_exec = ParquetExec::new(scan_config, filters, None, TableParquetOptions::default())
let mut builder = ParquetExec::builder(scan_config);
if let Some(filters) = filters {
builder = builder.with_predicate(filters);
}
let parquet_exec = builder
.with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(
store.clone(),
)));
)))
.build();

// TODO(ruihang): get this from upper layer
let task_ctx = SessionContext::default().task_ctx();
let stream = parquet_exec
.execute(0, task_ctx)
.context(error::ParquetScanPlanSnafu)?;
Expand Down
7 changes: 4 additions & 3 deletions src/flow/src/transform/aggr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ use hydroflow::futures::future::Map;
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use substrait::variation_const::{
DATE_32_TYPE_REF, DATE_64_TYPE_REF, DEFAULT_TYPE_REF, TIMESTAMP_MICRO_TYPE_REF,
TIMESTAMP_MILLI_TYPE_REF, TIMESTAMP_NANO_TYPE_REF, TIMESTAMP_SECOND_TYPE_REF,
UNSIGNED_INTEGER_TYPE_REF,
DATE_32_TYPE_VARIATION_REF, DATE_64_TYPE_VARIATION_REF, DEFAULT_TYPE_VARIATION_REF,
TIMESTAMP_MICRO_TYPE_VARIATION_REF, TIMESTAMP_MILLI_TYPE_VARIATION_REF,
TIMESTAMP_NANO_TYPE_VARIATION_REF, TIMESTAMP_SECOND_TYPE_VARIATION_REF,
UNSIGNED_INTEGER_TYPE_VARIATION_REF,
};
use substrait_proto::proto::aggregate_function::AggregationInvocation;
use substrait_proto::proto::aggregate_rel::{Grouping, Measure};
Expand Down
58 changes: 29 additions & 29 deletions src/flow/src/transform/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ use common_time::{Date, Timestamp};
use datatypes::data_type::ConcreteDataType as CDT;
use datatypes::value::Value;
use substrait::variation_const::{
DATE_32_TYPE_REF, DATE_64_TYPE_REF, DEFAULT_TYPE_REF, TIMESTAMP_MICRO_TYPE_REF,
TIMESTAMP_MILLI_TYPE_REF, TIMESTAMP_NANO_TYPE_REF, TIMESTAMP_SECOND_TYPE_REF,
UNSIGNED_INTEGER_TYPE_REF,
DATE_32_TYPE_VARIATION_REF, DATE_64_TYPE_VARIATION_REF, DEFAULT_TYPE_VARIATION_REF,
TIMESTAMP_MICRO_TYPE_VARIATION_REF, TIMESTAMP_MILLI_TYPE_VARIATION_REF,
TIMESTAMP_NANO_TYPE_VARIATION_REF, TIMESTAMP_SECOND_TYPE_VARIATION_REF,
UNSIGNED_INTEGER_TYPE_VARIATION_REF,
};
use substrait_proto::proto::expression::literal::LiteralType;
use substrait_proto::proto::expression::Literal;
Expand All @@ -33,41 +34,41 @@ pub(crate) fn from_substrait_literal(lit: &Literal) -> Result<(Value, CDT), Erro
let scalar_value = match &lit.literal_type {
Some(LiteralType::Boolean(b)) => (Value::from(*b), CDT::boolean_datatype()),
Some(LiteralType::I8(n)) => match lit.type_variation_reference {
DEFAULT_TYPE_REF => (Value::from(*n as i8), CDT::int8_datatype()),
UNSIGNED_INTEGER_TYPE_REF => (Value::from(*n as u8), CDT::uint8_datatype()),
DEFAULT_TYPE_VARIATION_REF => (Value::from(*n as i8), CDT::int8_datatype()),
UNSIGNED_INTEGER_TYPE_VARIATION_REF => (Value::from(*n as u8), CDT::uint8_datatype()),
others => not_impl_err!("Unknown type variation reference {others}",)?,
},
Some(LiteralType::I16(n)) => match lit.type_variation_reference {
DEFAULT_TYPE_REF => (Value::from(*n as i16), CDT::int16_datatype()),
UNSIGNED_INTEGER_TYPE_REF => (Value::from(*n as u16), CDT::uint16_datatype()),
DEFAULT_TYPE_VARIATION_REF => (Value::from(*n as i16), CDT::int16_datatype()),
UNSIGNED_INTEGER_TYPE_VARIATION_REF => (Value::from(*n as u16), CDT::uint16_datatype()),
others => not_impl_err!("Unknown type variation reference {others}",)?,
},
Some(LiteralType::I32(n)) => match lit.type_variation_reference {
DEFAULT_TYPE_REF => (Value::from(*n), CDT::int32_datatype()),
UNSIGNED_INTEGER_TYPE_REF => (Value::from(*n as u32), CDT::uint32_datatype()),
DEFAULT_TYPE_VARIATION_REF => (Value::from(*n), CDT::int32_datatype()),
UNSIGNED_INTEGER_TYPE_VARIATION_REF => (Value::from(*n as u32), CDT::uint32_datatype()),
others => not_impl_err!("Unknown type variation reference {others}",)?,
},
Some(LiteralType::I64(n)) => match lit.type_variation_reference {
DEFAULT_TYPE_REF => (Value::from(*n), CDT::int64_datatype()),
UNSIGNED_INTEGER_TYPE_REF => (Value::from(*n as u64), CDT::uint64_datatype()),
DEFAULT_TYPE_VARIATION_REF => (Value::from(*n), CDT::int64_datatype()),
UNSIGNED_INTEGER_TYPE_VARIATION_REF => (Value::from(*n as u64), CDT::uint64_datatype()),
others => not_impl_err!("Unknown type variation reference {others}",)?,
},
Some(LiteralType::Fp32(f)) => (Value::from(*f), CDT::float32_datatype()),
Some(LiteralType::Fp64(f)) => (Value::from(*f), CDT::float64_datatype()),
Some(LiteralType::Timestamp(t)) => match lit.type_variation_reference {
TIMESTAMP_SECOND_TYPE_REF => (
TIMESTAMP_SECOND_TYPE_VARIATION_REF => (
Value::from(Timestamp::new_second(*t)),
CDT::timestamp_second_datatype(),
),
TIMESTAMP_MILLI_TYPE_REF => (
TIMESTAMP_MILLI_TYPE_VARIATION_REF => (
Value::from(Timestamp::new_millisecond(*t)),
CDT::timestamp_millisecond_datatype(),
),
TIMESTAMP_MICRO_TYPE_REF => (
TIMESTAMP_MICRO_TYPE_VARIATION_REF => (
Value::from(Timestamp::new_microsecond(*t)),
CDT::timestamp_microsecond_datatype(),
),
TIMESTAMP_NANO_TYPE_REF => (
TIMESTAMP_NANO_TYPE_VARIATION_REF => (
Value::from(Timestamp::new_nanosecond(*t)),
CDT::timestamp_nanosecond_datatype(),
),
Expand Down Expand Up @@ -115,37 +116,36 @@ pub fn from_substrait_type(null_type: &substrait_proto::proto::Type) -> Result<C
match kind {
Kind::Bool(_) => Ok(CDT::boolean_datatype()),
Kind::I8(integer) => match integer.type_variation_reference {
DEFAULT_TYPE_REF => Ok(CDT::int8_datatype()),
UNSIGNED_INTEGER_TYPE_REF => Ok(CDT::uint8_datatype()),
DEFAULT_TYPE_VARIATION_REF => Ok(CDT::int8_datatype()),
UNSIGNED_INTEGER_TYPE_VARIATION_REF => Ok(CDT::uint8_datatype()),
v => not_impl_err!("Unsupported Substrait type variation {v} of type {kind:?}"),
},
Kind::I16(integer) => match integer.type_variation_reference {
DEFAULT_TYPE_REF => Ok(CDT::int16_datatype()),
UNSIGNED_INTEGER_TYPE_REF => Ok(CDT::uint16_datatype()),
DEFAULT_TYPE_VARIATION_REF => Ok(CDT::int16_datatype()),
UNSIGNED_INTEGER_TYPE_VARIATION_REF => Ok(CDT::uint16_datatype()),
v => not_impl_err!("Unsupported Substrait type variation {v} of type {kind:?}"),
},
Kind::I32(integer) => match integer.type_variation_reference {
DEFAULT_TYPE_REF => Ok(CDT::int32_datatype()),
UNSIGNED_INTEGER_TYPE_REF => Ok(CDT::uint32_datatype()),
DEFAULT_TYPE_VARIATION_REF => Ok(CDT::int32_datatype()),
UNSIGNED_INTEGER_TYPE_VARIATION_REF => Ok(CDT::uint32_datatype()),
v => not_impl_err!("Unsupported Substrait type variation {v} of type {kind:?}"),
},
Kind::I64(integer) => match integer.type_variation_reference {
DEFAULT_TYPE_REF => Ok(CDT::int64_datatype()),
UNSIGNED_INTEGER_TYPE_REF => Ok(CDT::uint64_datatype()),
DEFAULT_TYPE_VARIATION_REF => Ok(CDT::int64_datatype()),
UNSIGNED_INTEGER_TYPE_VARIATION_REF => Ok(CDT::uint64_datatype()),
v => not_impl_err!("Unsupported Substrait type variation {v} of type {kind:?}"),
},
Kind::Fp32(_) => Ok(CDT::float32_datatype()),
Kind::Fp64(_) => Ok(CDT::float64_datatype()),
Kind::Timestamp(ts) => match ts.type_variation_reference {
TIMESTAMP_SECOND_TYPE_REF => Ok(CDT::timestamp_second_datatype()),
TIMESTAMP_MILLI_TYPE_REF => Ok(CDT::timestamp_millisecond_datatype()),
TIMESTAMP_MICRO_TYPE_REF => Ok(CDT::timestamp_microsecond_datatype()),
TIMESTAMP_NANO_TYPE_REF => Ok(CDT::timestamp_nanosecond_datatype()),
TIMESTAMP_SECOND_TYPE_VARIATION_REF => Ok(CDT::timestamp_second_datatype()),
TIMESTAMP_MILLI_TYPE_VARIATION_REF => Ok(CDT::timestamp_millisecond_datatype()),
TIMESTAMP_MICRO_TYPE_VARIATION_REF => Ok(CDT::timestamp_microsecond_datatype()),
TIMESTAMP_NANO_TYPE_VARIATION_REF => Ok(CDT::timestamp_nanosecond_datatype()),
v => not_impl_err!("Unsupported Substrait type variation {v} of type {kind:?}"),
},
Kind::Date(date) => match date.type_variation_reference {
DATE_32_TYPE_REF => Ok(CDT::date_datatype()),
DATE_64_TYPE_REF => Ok(CDT::date_datatype()),
DATE_32_TYPE_VARIATION_REF | DATE_64_TYPE_VARIATION_REF => Ok(CDT::date_datatype()),
v => not_impl_err!("Unsupported Substrait type variation {v} of type {kind:?}"),
},
Kind::Binary(_) => Ok(CDT::binary_datatype()),
Expand Down

0 comments on commit c7dfea8

Please sign in to comment.