Support writing UTC adjusted time arrays to parquet (#6278)
* check if time is adjusted to utc from metadata

* add test

* add roundtrip test

* cargo fmt

* Fix regression

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
aykut-bozkurt and alamb authored Aug 23, 2024
1 parent 6dd4a5f commit 8c956a9
Showing 2 changed files with 75 additions and 4 deletions.
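Before the diff, a minimal usage sketch (not part of the commit) based on the new round-trip test below: attaching the "adjusted_to_utc" metadata key to an Arrow Time64(Microsecond) field makes the writer annotate the Parquet column as TIME(MICROS, isAdjustedToUTC=true). Only the presence of the key matters; its value is ignored. The function name and the Box<dyn Error> plumbing are illustrative, not part of the change.

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{RecordBatch, Time64MicrosecondArray};
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use parquet::arrow::ArrowWriter;

fn write_utc_adjusted_time() -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    // The writer only checks for the presence of the "adjusted_to_utc" key;
    // the value is ignored, so an empty string is enough.
    let field = Field::new("time_micros", DataType::Time64(TimeUnit::Microsecond), true)
        .with_metadata(HashMap::from_iter(vec![(
            "adjusted_to_utc".to_string(),
            "".to_string(),
        )]));
    let schema = Arc::new(Schema::new(vec![field]));

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Time64MicrosecondArray::from(vec![
            Some(0),
            Some(86_399 * 1_000_000),
            None,
        ]))],
    )?;

    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;
    // `buf` now holds a Parquet file whose time column carries the
    // TIME(MICROS, isAdjustedToUTC=true) logical type.
    Ok(buf)
}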
69 changes: 67 additions & 2 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -932,7 +932,7 @@ pub(crate) fn evaluate_predicate(
#[cfg(test)]
mod tests {
use std::cmp::min;
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::fmt::Formatter;
use std::fs::File;
use std::io::Seek;
@@ -949,11 +949,14 @@ mod tests {
use arrow_array::cast::AsArray;
use arrow_array::types::{
Decimal128Type, Decimal256Type, DecimalType, Float16Type, Float32Type, Float64Type,
Time32MillisecondType, Time64MicrosecondType,
};
use arrow_array::*;
use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef};
use arrow_schema::{
ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
};
use arrow_select::concat::concat_batches;

use crate::arrow::arrow_reader::{
@@ -1223,6 +1226,68 @@ mod tests {
Ok(())
}

#[test]
fn test_time_utc_roundtrip() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new(
"time_millis",
ArrowDataType::Time32(TimeUnit::Millisecond),
true,
)
.with_metadata(HashMap::from_iter(vec![(
"adjusted_to_utc".to_string(),
"".to_string(),
)])),
Field::new(
"time_micros",
ArrowDataType::Time64(TimeUnit::Microsecond),
true,
)
.with_metadata(HashMap::from_iter(vec![(
"adjusted_to_utc".to_string(),
"".to_string(),
)])),
]));

let mut buf = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

let original = RecordBatch::try_new(
schema,
vec![
Arc::new(Time32MillisecondArray::from(vec![
Some(-1),
Some(0),
Some(86_399_000),
Some(86_400_000),
Some(86_401_000),
None,
])),
Arc::new(Time64MicrosecondArray::from(vec![
Some(-1),
Some(0),
Some(86_399 * 1_000_000),
Some(86_400 * 1_000_000),
Some(86_401 * 1_000_000),
None,
])),
],
)?;

writer.write(&original)?;
writer.close()?;

let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
let ret = reader.next().unwrap()?;
assert_eq!(ret, original);

// Ensure can be downcast to the correct type
ret.column(0).as_primitive::<Time32MillisecondType>();
ret.column(1).as_primitive::<Time64MicrosecondType>();

Ok(())
}

struct RandFixedLenGen {}

impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
10 changes: 8 additions & 2 deletions parquet/src/arrow/schema/mod.rs
@@ -427,7 +427,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
}
DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::Time {
is_adjusted_to_u_t_c: false,
is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
unit: match unit {
TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
u => unreachable!("Invalid unit for Time32: {:?}", u),
@@ -438,7 +438,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
.build(),
DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
.with_logical_type(Some(LogicalType::Time {
is_adjusted_to_u_t_c: false,
is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
unit: match unit {
TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
@@ -1430,7 +1430,9 @@ mod tests {
}
OPTIONAL INT32 date (DATE);
OPTIONAL INT32 time_milli (TIME(MILLIS,false));
OPTIONAL INT32 time_milli_utc (TIME(MILLIS,true));
OPTIONAL INT64 time_micro (TIME_MICROS);
OPTIONAL INT64 time_micro_utc (TIME(MICROS, true));
OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
REQUIRED INT64 ts_micro (TIMESTAMP(MICROS,false));
REQUIRED INT64 ts_seconds;
@@ -1481,7 +1483,11 @@ mod tests {
),
Field::new("date", DataType::Date32, true),
Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
Field::new("time_milli_utc", DataType::Time32(TimeUnit::Millisecond), true)
.with_metadata(HashMap::from_iter(vec![("adjusted_to_utc".to_string(), "".to_string())])),
Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
Field::new("time_micro_utc", DataType::Time64(TimeUnit::Microsecond), true)
.with_metadata(HashMap::from_iter(vec![("adjusted_to_utc".to_string(), "".to_string())])),
Field::new(
"ts_milli",
DataType::Timestamp(TimeUnit::Millisecond, None),
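To close, a hedged verification sketch (also not part of the commit): reading the written file's Parquet schema back and asserting that the TIME logical type is marked as adjusted to UTC. It assumes `buf` is the byte vector produced by the sketch above and that the column descriptor exposes `logical_type()`, as I believe the parquet crate does; the function name is illustrative.

use bytes::Bytes;
use parquet::basic::LogicalType;
use parquet::file::reader::{FileReader, SerializedFileReader};

fn assert_time_is_utc_adjusted(buf: Vec<u8>) -> Result<(), Box<dyn std::error::Error>> {
    let reader = SerializedFileReader::new(Bytes::from(buf))?;
    // Column 0 is the UTC-adjusted time column written in the sketch above.
    let column = reader.metadata().file_metadata().schema_descr().column(0);
    match column.logical_type() {
        Some(LogicalType::Time { is_adjusted_to_u_t_c, .. }) => {
            assert!(is_adjusted_to_u_t_c, "expected TIME annotated as adjusted to UTC");
        }
        other => panic!("unexpected logical type: {other:?}"),
    }
    Ok(())
}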
