Support extract on intervals and durations
Signed-off-by: Nick Cameron <nrc@ncameron.org>
nrc committed Aug 23, 2024
1 parent b8b76bc commit 905550d
Showing 7 changed files with 95 additions and 53 deletions.
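The user-visible change: extract/date_part now plan and execute against interval and duration arguments. Below is a minimal sketch of the kind of query this enables, assuming a DataFusion build containing this commit; the literal syntax and the arrow_cast usage are illustrative assumptions, not taken from the diff:

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Extract a field from an interval literal; before this change,
        // date_part had no signature accepting Interval or Duration inputs.
        ctx.sql("SELECT extract(hour FROM interval '3 hours 30 minutes')")
            .await?
            .show()
            .await?;
        // A duration value can be produced via arrow_cast and queried the same way.
        ctx.sql("SELECT date_part('second', arrow_cast(90, 'Duration(Second)'))")
            .await?
            .show()
            .await?;
        Ok(())
    }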
34 changes: 23 additions & 11 deletions Cargo.toml
@@ -69,22 +69,22 @@ version = "41.0.0"
ahash = { version = "0.8", default-features = false, features = [
"runtime-rng",
] }
-arrow = { version = "52.2.0", features = [
+arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94", features = [
"prettyprint",
] }
-arrow-array = { version = "52.2.0", default-features = false, features = [
+arrow-array = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94", default-features = false, features = [
"chrono-tz",
] }
-arrow-buffer = { version = "52.2.0", default-features = false }
-arrow-flight = { version = "52.2.0", features = [
+arrow-buffer = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94", default-features = false }
+arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94", features = [
"flight-sql-experimental",
] }
-arrow-ipc = { version = "52.2.0", default-features = false, features = [
+arrow-ipc = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94", default-features = false, features = [
"lz4",
] }
-arrow-ord = { version = "52.2.0", default-features = false }
-arrow-schema = { version = "52.2.0", default-features = false }
-arrow-string = { version = "52.2.0", default-features = false }
+arrow-ord = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94", default-features = false }
+arrow-schema = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94", default-features = false }
+arrow-string = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94", default-features = false }
async-trait = "0.1.73"
bigdecimal = "=0.4.1"
bytes = "1.4"
@@ -123,9 +123,9 @@ indexmap = "2.0.0"
itertools = "0.13"
log = "^0.4"
num_cpus = "1.13.0"
-object_store = { version = "0.10.2", default-features = false }
+object_store = { version = "0.11", default-features = false }
parking_lot = "0.12"
-parquet = { version = "52.2.0", default-features = false, features = [
+parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94", default-features = false, features = [
"arrow",
"async",
"object_store",
@@ -134,7 +134,7 @@ rand = "0.8"
regex = "1.8"
rstest = "0.22.0"
serde_json = "1"
-sqlparser = { version = "0.50.0", features = ["visitor"] }
+sqlparser = { git = "https://github.com/sqlparser-rs/sqlparser-rs.git", rev = "fab834d", features = ["visitor"] }
tempfile = "3"
thiserror = "1.0.44"
tokio = { version = "1.36", features = ["macros", "rt", "sync"] }
@@ -165,3 +165,15 @@ large_futures = "warn"
[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)"] }
unused_imports = "deny"

+[patch.crates-io]
+arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94" }
+arrow-array = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94" }
+arrow-buffer = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94" }
+arrow-data = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94" }
+arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94" }
+arrow-ipc = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94" }
+arrow-ord = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94" }
+arrow-schema = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94" }
+arrow-select = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94" }
+arrow-string = { git = "https://github.com/apache/arrow-rs.git", rev = "2795b94" }
60 changes: 28 additions & 32 deletions datafusion-cli/Cargo.lock

(Generated lockfile; diff not rendered.)

12 changes: 12 additions & 0 deletions datafusion-cli/Cargo.toml
@@ -62,3 +62,15 @@ assert_cmd = "2.0"
ctor = "0.2.0"
predicates = "3.0"
rstest = "0.17"

+[patch.crates-io]
+arrow = { path = "../../arrow-rs/arrow" }
+arrow-array = { path = "../../arrow-rs/arrow-array" }
+arrow-buffer = { path = "../../arrow-rs/arrow-buffer" }
+arrow-data = { path = "../../arrow-rs/arrow-data" }
+arrow-flight = { path = "../../arrow-rs/arrow-flight" }
+arrow-ipc = { path = "../../arrow-rs/arrow-ipc" }
+arrow-ord = { path = "../../arrow-rs/arrow-ord" }
+arrow-schema = { path = "../../arrow-rs/arrow-schema" }
+arrow-select = { path = "../../arrow-rs/arrow-select" }
+arrow-string = { path = "../../arrow-rs/arrow-string" }
2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
@@ -61,7 +61,7 @@ num_cpus = { workspace = true }
object_store = { workspace = true, optional = true }
parquet = { workspace = true, optional = true, default-features = true }
paste = "1.0.15"
-pyo3 = { version = "0.21.0", optional = true }
+pyo3 = { version = "0.22.2", optional = true }
sqlparser = { workspace = true }

[target.'cfg(target_family = "wasm")'.dependencies]
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
@@ -2002,7 +2002,7 @@ mod tests {
let int_col_offset = offset_index.get(4).unwrap();

// 325 pages in int_col
-assert_eq!(int_col_offset.len(), 325);
+assert_eq!(int_col_offset.page_locations.len(), 325);
match int_col_index {
Index::INT32(index) => {
assert_eq!(index.indexes.len(), 325);
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -28,7 +28,7 @@ use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
use log::{debug, trace};
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
-use parquet::format::PageLocation;
+use parquet::file::page_index::offset_index::OffsetIndexMetaData;
use parquet::schema::types::SchemaDescriptor;
use parquet::{
arrow::arrow_reader::{RowSelection, RowSelector},
@@ -362,7 +362,7 @@ struct PagesPruningStatistics<'a> {
converter: StatisticsConverter<'a>,
column_index: &'a ParquetColumnIndex,
offset_index: &'a ParquetOffsetIndex,
-page_offsets: &'a Vec<PageLocation>,
+page_offsets: &'a OffsetIndexMetaData,
}

impl<'a> PagesPruningStatistics<'a> {
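For downstream code tracking this arrow-rs revision: the per-column offset index entry is now an OffsetIndexMetaData value rather than a bare Vec<PageLocation>, with the locations moved into its page_locations field (the test change in parquet.rs above shows the same migration). A hedged sketch of the new access pattern; the helper name is hypothetical:

    use parquet::file::page_index::offset_index::OffsetIndexMetaData;

    // Hypothetical helper: count the data pages recorded for one column.
    // The PageLocation list now lives in the page_locations field of
    // OffsetIndexMetaData instead of being the index entry itself.
    fn page_count(meta: &OffsetIndexMetaData) -> usize {
        meta.page_locations.len()
    }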
34 changes: 28 additions & 6 deletions datafusion/functions/src/datetime/date_part.rs
@@ -18,13 +18,14 @@
use std::any::Any;
use std::sync::Arc;

-use arrow::array::{Array, ArrayRef, Float64Array};
+use arrow::array::{Array, ArrayRef, Float64Array, PrimitiveArray};
use arrow::compute::{binary, cast, date_part, DatePart};
use arrow::datatypes::DataType::{
-Date32, Date64, Float64, Time32, Time64, Timestamp, Utf8, Utf8View,
+Date32, Date64, Duration, Float64, Interval, Time32, Time64, Timestamp, Utf8, Utf8View,
};
+use arrow::datatypes::IntervalUnit::{YearMonth, DayTime, MonthDayNano};
use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
-use arrow::datatypes::{DataType, TimeUnit};
+use arrow::datatypes::{DataType, Int32Type, TimeUnit};

use datafusion_common::cast::{
as_date32_array, as_date64_array, as_int32_array, as_time32_millisecond_array,
@@ -107,6 +108,20 @@ impl DatePartFunc {
Exact(vec![Utf8View, Time64(Microsecond)]),
Exact(vec![Utf8, Time64(Nanosecond)]),
Exact(vec![Utf8View, Time64(Nanosecond)]),
+Exact(vec![Utf8, Interval(YearMonth)]),
+Exact(vec![Utf8View, Interval(YearMonth)]),
+Exact(vec![Utf8, Interval(DayTime)]),
+Exact(vec![Utf8View, Interval(DayTime)]),
+Exact(vec![Utf8, Interval(MonthDayNano)]),
+Exact(vec![Utf8View, Interval(MonthDayNano)]),
+Exact(vec![Utf8, Duration(Second)]),
+Exact(vec![Utf8View, Duration(Second)]),
+Exact(vec![Utf8, Duration(Millisecond)]),
+Exact(vec![Utf8View, Duration(Millisecond)]),
+Exact(vec![Utf8, Duration(Microsecond)]),
+Exact(vec![Utf8View, Duration(Microsecond)]),
+Exact(vec![Utf8, Duration(Nanosecond)]),
+Exact(vec![Utf8View, Duration(Nanosecond)]),
],
Volatility::Immutable,
),
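Each added Exact entry declares one concrete argument-type tuple that the planner will accept for date_part, and Signature::one_of unions them into a single overload set. A minimal sketch of the pattern in isolation; this is a hedged illustration, not code from the commit:

    use arrow::datatypes::DataType::{Interval, Utf8};
    use arrow::datatypes::IntervalUnit::YearMonth;
    use datafusion_expr::{Signature, TypeSignature::Exact, Volatility};

    // One accepted call shape: date_part(Utf8 part name, Interval(YearMonth)).
    fn example_signature() -> Signature {
        Signature::one_of(
            vec![Exact(vec![Utf8, Interval(YearMonth)])],
            Volatility::Immutable,
        )
    }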
@@ -211,9 +226,16 @@ fn seconds(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
let secs = as_int32_array(secs.as_ref())?;
let subsecs = date_part(array, DatePart::Nanosecond)?;
let subsecs = as_int32_array(subsecs.as_ref())?;
+// REVIEW: there has got to be a better way to do this: I want to treat null as 0, I was expecting
+// some kind of map function (or even better map_if_null) on PrimitiveArray. Instead I have just
+// discarded the null mask, which feels bad because if I was meant to be able to do this, I'd
+// expect there to be a function to do it. I think it is also possible that a null-masked value in
+// the array will not be 0 (I don't think that happens right now, but I think we're one optimisation
+// away from disaster).
+let subsecs: PrimitiveArray<Int32Type> = PrimitiveArray::new(subsecs.values().clone(), None);

-let r: Float64Array = binary(secs, subsecs, |secs, subsecs| {
-(secs as f64 + (subsecs as f64 / 1_000_000_000_f64)) * sf
+let r: Float64Array = binary(secs, &subsecs, |secs, subsecs| {
+(secs as f64 + ((subsecs % 1_000_000_000) as f64 / 1_000_000_000_f64)) * sf
})?;
Ok(Arc::new(r))
}
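On the REVIEW question above: one way to make the null-as-zero choice explicit, instead of reusing the values buffer with the mask dropped, is to rebuild the array from its logical (Option) values. A hedged sketch under that assumption; nulls_to_zero is a hypothetical helper, not part of the commit:

    use arrow::array::Int32Array;

    // Substitute 0 for each null slot; the result carries no null mask and
    // every physical value is well-defined, at the cost of one copy.
    fn nulls_to_zero(a: &Int32Array) -> Int32Array {
        Int32Array::from_iter_values(a.iter().map(|v| v.unwrap_or(0)))
    }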
@@ -244,7 +266,7 @@ fn epoch(array: &dyn Array) -> Result<ArrayRef> {
Time64(Nanosecond) => {
as_time64_nanosecond_array(array)?.unary(|x| x as f64 / 1_000_000_000_f64)
}
-d => return exec_err!("Can not convert {d:?} to epoch"),
+d => return exec_err!("Cannot convert {d:?} to epoch"),
};
Ok(Arc::new(f))
}
