Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Use microsecond precision for postgres datasource #2530

Merged
merged 6 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

96 changes: 83 additions & 13 deletions crates/datasources/src/clickhouse/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use datafusion::arrow::array::{
Int64Array,
Int8Array,
StringBuilder,
TimestampMicrosecondBuilder,
TimestampMillisecondBuilder,
TimestampNanosecondBuilder,
TimestampSecondBuilder,
UInt16Array,
UInt32Array,
UInt64Array,
Expand Down Expand Up @@ -121,9 +124,7 @@ fn block_to_batch(schema: Arc<Schema>, block: Block) -> Result<RecordBatch> {
arrs.push(arr);
}

let batch = RecordBatch::try_new(schema, arrs)?;
let batch = crate::common::util::normalize_batch(&batch)?;
Ok(batch)
Ok(RecordBatch::try_new(schema, arrs)?)
}

/// Converts a column from a block into an arrow array.
Expand Down Expand Up @@ -228,6 +229,78 @@ fn column_to_array(
}
Arc::new(vals.finish())
}
DataType::Timestamp(TimeUnit::Second, tz) => {
let mut vals = TimestampSecondBuilder::with_capacity(column.len());
for val in column {
match val {
Value::DateTime(v) => vals.append_value(
DateTime::<Tz>::try_from(v)
.map_err(ClickhouseError::DateTimeConvert)?
.timestamp(),
),
Value::DateTime64(v) => vals.append_value(
DateTime::<Tz>::try_from(v)
.map_err(ClickhouseError::DateTimeConvert)?
.timestamp(),
),
Value::Null if nullable => vals.append_null(),
other => {
return Err(ClickhouseError::String(format!(
"unexpected value type: {other}"
)))
}
}
}
Arc::new(vals.finish().with_timezone_opt(tz))
}
DataType::Timestamp(TimeUnit::Millisecond, tz) => {
let mut vals = TimestampMillisecondBuilder::with_capacity(column.len());
for val in column {
match val {
Value::DateTime(v) => vals.append_value(
DateTime::<Tz>::try_from(v)
.map_err(ClickhouseError::DateTimeConvert)?
.timestamp_millis(),
),
Value::DateTime64(v) => vals.append_value(
DateTime::<Tz>::try_from(v)
.map_err(ClickhouseError::DateTimeConvert)?
.timestamp_millis(),
),
Value::Null if nullable => vals.append_null(),
other => {
return Err(ClickhouseError::String(format!(
"unexpected value type: {other}"
)))
}
}
}
Arc::new(vals.finish().with_timezone_opt(tz))
}
DataType::Timestamp(TimeUnit::Microsecond, tz) => {
let mut vals = TimestampMicrosecondBuilder::with_capacity(column.len());
for val in column {
match val {
Value::DateTime(v) => vals.append_value(
DateTime::<Tz>::try_from(v)
.map_err(ClickhouseError::DateTimeConvert)?
.timestamp_micros(),
),
Value::DateTime64(v) => vals.append_value(
DateTime::<Tz>::try_from(v)
.map_err(ClickhouseError::DateTimeConvert)?
.timestamp_micros(),
),
Value::Null if nullable => vals.append_null(),
other => {
return Err(ClickhouseError::String(format!(
"unexpected value type: {other}"
)))
}
}
}
Arc::new(vals.finish().with_timezone_opt(tz))
}
DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
let mut vals = TimestampNanosecondBuilder::with_capacity(column.len());
for val in column {
Expand Down Expand Up @@ -305,7 +378,7 @@ pub fn clickhouse_type_to_arrow_type(
let mut out = vec![];
let mut in_parens = 0usize;
let mut last_start = 0;
// todo: handle parens in enum strings?
// TODO: handle parens in enum strings?
for (i, c) in input.char_indices() {
match c {
',' => {
Expand Down Expand Up @@ -438,8 +511,7 @@ pub fn clickhouse_type_to_arrow_type(
)));
}
let tz = &args[0][1..args[0].len() - 1];
// TODO: This is technically "second" precision.
DataType::Timestamp(TimeUnit::Nanosecond, Some(tz.into())).into()
DataType::Timestamp(TimeUnit::Second, Some(tz.into())).into()
}
"DateTime64" => {
if args.len() == 2 {
Expand All @@ -450,8 +522,7 @@ pub fn clickhouse_type_to_arrow_type(
)));
}
let p = parse_precision(args[0])?;
// TODO: Use the actual precision.
let _tu = if p < 3 {
let tu = if p < 3 {
TimeUnit::Second
} else if p < 6 {
TimeUnit::Millisecond
Expand All @@ -461,11 +532,10 @@ pub fn clickhouse_type_to_arrow_type(
TimeUnit::Nanosecond
};
let tz = &args[1][1..args[1].len() - 1];
DataType::Timestamp(TimeUnit::Nanosecond, Some(tz.into())).into()
DataType::Timestamp(tu, Some(tz.into())).into()
} else if args.len() == 1 {
let p = parse_precision(args[0])?;
// TODO: Use the actual precision.
let _tu = if p < 3 {
let tu = if p < 3 {
TimeUnit::Second
} else if p < 6 {
TimeUnit::Millisecond
Expand All @@ -474,7 +544,7 @@ pub fn clickhouse_type_to_arrow_type(
} else {
TimeUnit::Nanosecond
};
DataType::Timestamp(TimeUnit::Nanosecond, None).into()
DataType::Timestamp(tu, None).into()
} else {
return Err(KlickhouseError::TypeParseError(format!(
"bad arg count for DateTime64, expected 1 or 2 and got {}",
Expand Down Expand Up @@ -606,7 +676,7 @@ pub fn clickhouse_type_to_arrow_type(
"unsupported Date32 type".to_string(),
));
}
"DateTime" => DataType::Timestamp(TimeUnit::Nanosecond, None).into(),
"DateTime" => DataType::Timestamp(TimeUnit::Second, None).into(),
"IPv4" => {
return Err(KlickhouseError::TypeParseError(
"unsupported IPv4 type".to_string(),
Expand Down
71 changes: 29 additions & 42 deletions crates/datasources/src/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,21 @@ use std::task::{Context, Poll};
use async_trait::async_trait;
use chrono::naive::{NaiveDateTime, NaiveTime};
use chrono::{DateTime, NaiveDate, Timelike, Utc};
use datafusion::arrow::array::Decimal128Builder;
use datafusion::arrow::array::{
Array,
BinaryBuilder,
BooleanBuilder,
Date32Builder,
Decimal128Builder,
Float32Builder,
Float64Builder,
Int16Builder,
Int32Builder,
Int64Builder,
StringBuilder,
Time64MicrosecondBuilder,
TimestampMicrosecondBuilder,
};
use datafusion::arrow::datatypes::{
DataType,
Field,
Expand Down Expand Up @@ -1091,21 +1105,6 @@ fn binary_rows_to_record_batch<E: Into<PostgresError>>(
return Ok(RecordBatch::try_new_with_options(schema, Vec::new(), &options).unwrap());
}

use datafusion::arrow::array::{
Array,
BinaryBuilder,
BooleanBuilder,
Date32Builder,
Float32Builder,
Float64Builder,
Int16Builder,
Int32Builder,
Int64Builder,
StringBuilder,
Time64NanosecondBuilder,
TimestampNanosecondBuilder,
};

let rows = rows
.into_iter()
.collect::<Result<Vec<_>, _>>()
Expand Down Expand Up @@ -1163,45 +1162,33 @@ fn binary_rows_to_record_batch<E: Into<PostgresError>>(
}
Arc::new(arr.finish())
}
dt @ DataType::Timestamp(TimeUnit::Nanosecond, None) => {
let mut arr = TimestampNanosecondBuilder::with_capacity(rows.len());
DataType::Timestamp(TimeUnit::Microsecond, None) => {
let mut arr = TimestampMicrosecondBuilder::with_capacity(rows.len());
for row in rows.iter() {
let val: Option<NaiveDateTime> = row.try_get(col_idx)?;
let val = val
.map(|v| {
v.timestamp_nanos_opt().ok_or_else(|| {
PostgresError::DataOverflow(v.to_string(), dt.clone())
})
})
.transpose()?;
let val = val.map(|v| v.timestamp_micros());
arr.append_option(val);
}
Arc::new(arr.finish())
}
dt @ DataType::Timestamp(TimeUnit::Nanosecond, Some(_)) => {
let mut arr = TimestampNanosecondBuilder::with_capacity(rows.len())
.with_data_type(dt.clone());
DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => {
let mut arr = TimestampMicrosecondBuilder::with_capacity(rows.len())
.with_timezone(tz.clone());
for row in rows.iter() {
let val: Option<DateTime<Utc>> = row.try_get(col_idx)?;
let val = val
.map(|v| {
v.timestamp_nanos_opt().ok_or_else(|| {
PostgresError::DataOverflow(v.to_string(), dt.clone())
})
})
.transpose()?;
let val = val.map(|v| v.timestamp_micros());
arr.append_option(val);
}
Arc::new(arr.finish())
}
DataType::Time64(TimeUnit::Nanosecond) => {
let mut arr = Time64NanosecondBuilder::with_capacity(rows.len());
DataType::Time64(TimeUnit::Microsecond) => {
let mut arr = Time64MicrosecondBuilder::with_capacity(rows.len());
for row in rows.iter() {
let val: Option<NaiveTime> = row.try_get(col_idx)?;
let val = val.map(|v| {
let nanos = v.nanosecond() as i64;
let sub_micros = (v.nanosecond() / 1_000) as i64;
let secs_since_midnight = v.num_seconds_from_midnight() as i64;
(secs_since_midnight * 1_000_000_000) + nanos
(secs_since_midnight * 1_000_000) + sub_micros
});
arr.append_option(val);
}
Expand Down Expand Up @@ -1250,11 +1237,11 @@ fn try_create_arrow_schema(names: Vec<String>, types: &Vec<PostgresType>) -> Res
// to specify the precision and scale for the column. Setting these
// same as bigquery.
&PostgresType::NUMERIC => DataType::Decimal128(38, 9),
&PostgresType::TIMESTAMP => DataType::Timestamp(TimeUnit::Nanosecond, None),
&PostgresType::TIMESTAMP => DataType::Timestamp(TimeUnit::Microsecond, None),
&PostgresType::TIMESTAMPTZ => {
DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
}
&PostgresType::TIME => DataType::Time64(TimeUnit::Nanosecond),
&PostgresType::TIME => DataType::Time64(TimeUnit::Microsecond),
&PostgresType::DATE => DataType::Date32,
// TODO: Time with timezone and interval data types in postgres are
// of 12 and 16 bytes respectively. This kind of size is not
Expand Down
1 change: 1 addition & 0 deletions crates/pgrepr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ dtoa = "1.0.9"
chrono-tz = "0.8.5"
bytes = "1.4.0"
const_format = "0.2.32"
once_cell = "1.19.0"
Loading
Loading