fix: Use actual precision for clickhouse data types #2528

Merged: 4 commits, Jan 31, 2024
Changes from 1 commit
1 change: 1 addition & 0 deletions Cargo.lock

110 changes: 83 additions & 27 deletions crates/datasources/src/clickhouse/convert.rs
@@ -5,20 +5,9 @@ use std::task::{Context, Poll};
 use chrono::{DateTime, NaiveDate};
 use chrono_tz::Tz;
 use datafusion::arrow::array::{
-    Array,
-    BooleanBuilder,
-    Date32Builder,
-    Float32Array,
-    Float64Array,
-    Int16Array,
-    Int32Array,
-    Int64Array,
-    Int8Array,
-    StringBuilder,
-    TimestampNanosecondBuilder,
-    UInt16Array,
-    UInt32Array,
-    UInt64Array,
+    Array, BooleanBuilder, Date32Builder, Float32Array, Float64Array, Int16Array, Int32Array,
+    Int64Array, Int8Array, StringBuilder, TimestampMicrosecondBuilder, TimestampMillisecondBuilder,
+    TimestampNanosecondBuilder, TimestampSecondBuilder, UInt16Array, UInt32Array, UInt64Array,
     UInt8Array,
 };
 use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit};
@@ -121,9 +110,7 @@ fn block_to_batch(schema: Arc<Schema>, block: Block) -> Result<RecordBatch> {
         arrs.push(arr);
     }

-    let batch = RecordBatch::try_new(schema, arrs)?;
-    let batch = crate::common::util::normalize_batch(&batch)?;
-    Ok(batch)
+    Ok(RecordBatch::try_new(schema, arrs)?)
 }

 /// Converts a column from a block into an arrow array.
@@ -228,6 +215,78 @@ fn column_to_array(
             }
             Arc::new(vals.finish())
         }
+        DataType::Timestamp(TimeUnit::Second, tz) => {
+            let mut vals = TimestampSecondBuilder::with_capacity(column.len());
+            for val in column {
+                match val {
+                    Value::DateTime(v) => vals.append_value(
+                        DateTime::<Tz>::try_from(v)
+                            .map_err(ClickhouseError::DateTimeConvert)?
+                            .timestamp(),
+                    ),
+                    Value::DateTime64(v) => vals.append_value(
+                        DateTime::<Tz>::try_from(v)
+                            .map_err(ClickhouseError::DateTimeConvert)?
+                            .timestamp(),
+                    ),
+                    Value::Null if nullable => vals.append_null(),
+                    other => {
+                        return Err(ClickhouseError::String(format!(
+                            "unexpected value type: {other}"
+                        )))
+                    }
+                }
+            }
+            Arc::new(vals.finish().with_timezone_opt(tz))
+        }
+        DataType::Timestamp(TimeUnit::Millisecond, tz) => {
+            let mut vals = TimestampMillisecondBuilder::with_capacity(column.len());
+            for val in column {
+                match val {
+                    Value::DateTime(v) => vals.append_value(
+                        DateTime::<Tz>::try_from(v)
+                            .map_err(ClickhouseError::DateTimeConvert)?
+                            .timestamp_millis(),
+                    ),
+                    Value::DateTime64(v) => vals.append_value(
+                        DateTime::<Tz>::try_from(v)
+                            .map_err(ClickhouseError::DateTimeConvert)?
+                            .timestamp_millis(),
+                    ),
+                    Value::Null if nullable => vals.append_null(),
+                    other => {
+                        return Err(ClickhouseError::String(format!(
+                            "unexpected value type: {other}"
+                        )))
+                    }
+                }
+            }
+            Arc::new(vals.finish().with_timezone_opt(tz))
+        }
+        DataType::Timestamp(TimeUnit::Microsecond, tz) => {
+            let mut vals = TimestampMicrosecondBuilder::with_capacity(column.len());
+            for val in column {
+                match val {
+                    Value::DateTime(v) => vals.append_value(
+                        DateTime::<Tz>::try_from(v)

Contributor:
Do these conversions work across resolutions too? Maybe good to have a comment about their rounding/translation strategy.

Contributor Author:
What do you mean by resolutions?

Contributor:
I think what I'm asking is: what are the cases where the try_from fails? Is it out-of-bounds issues only?

Contributor Author:
Yup, it's only out-of-bound cases.

+                            .map_err(ClickhouseError::DateTimeConvert)?
+                            .timestamp_micros(),
+                    ),
+                    Value::DateTime64(v) => vals.append_value(
+                        DateTime::<Tz>::try_from(v)
+                            .map_err(ClickhouseError::DateTimeConvert)?
+                            .timestamp_micros(),
+                    ),
+                    Value::Null if nullable => vals.append_null(),
+                    other => {
+                        return Err(ClickhouseError::String(format!(
+                            "unexpected value type: {other}"
+                        )))
+                    }
+                }
+            }
+            Arc::new(vals.finish().with_timezone_opt(tz))
+        }
         DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
             let mut vals = TimestampNanosecondBuilder::with_capacity(column.len());
             for val in column {
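
The three new arms differ only in the builder type and the chrono accessor they call (`timestamp`, `timestamp_millis`, `timestamp_micros`). As a rough illustration of what each target resolution keeps, the sketch below scales a nanosecond epoch value down to a coarser unit using only the standard library. `Unit`, `nanos_per_tick`, and `epoch_nanos_to_unit` are hypothetical names that do not appear in this PR, and flooring for pre-epoch values is an assumption about how the chrono accessors behave, not something the diff states.

```rust
/// Hypothetical stand-in for Arrow's `TimeUnit`, declared locally so the
/// sketch needs no external crates.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Unit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

impl Unit {
    /// Number of nanoseconds in one tick of this unit.
    pub const fn nanos_per_tick(self) -> i64 {
        match self {
            Unit::Second => 1_000_000_000,
            Unit::Millisecond => 1_000_000,
            Unit::Microsecond => 1_000,
            Unit::Nanosecond => 1,
        }
    }
}

/// Converts nanoseconds since the Unix epoch into ticks of `unit`.
/// `div_euclid` with a positive divisor floors toward negative infinity,
/// so digits finer than `unit` are dropped rather than rounded.
/// (Assumption: this mirrors the truncation of the chrono accessors.)
pub fn epoch_nanos_to_unit(nanos: i64, unit: Unit) -> i64 {
    nanos.div_euclid(unit.nanos_per_tick())
}
```

For example, `epoch_nanos_to_unit(1_706_659_200_123_456_789, Unit::Millisecond)` keeps the `123` milliseconds and discards the remaining `456_789` nanoseconds.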
@@ -305,7 +364,7 @@ pub fn clickhouse_type_to_arrow_type(
         let mut out = vec![];
         let mut in_parens = 0usize;
         let mut last_start = 0;
-        // todo: handle parens in enum strings?
+        // TODO: handle parens in enum strings?
         for (i, c) in input.char_indices() {
             match c {
                 ',' => {
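
The hunk above touches the loop that walks a type's argument list character by character while tracking parenthesis depth. A minimal sketch of that idea follows, assuming the goal is to split only on commas at depth zero. `split_top_level_args` is a hypothetical helper written for illustration, not the function in `convert.rs`, and like the TODO it does not account for parentheses inside quoted enum strings.

```rust
/// Splits `input` on commas that are not nested inside parentheses.
/// Hypothetical helper; the name and exact behaviour are assumptions.
pub fn split_top_level_args(input: &str) -> Vec<&str> {
    let mut out = Vec::new();
    let mut depth = 0usize; // current parenthesis nesting level
    let mut last_start = 0; // byte offset where the current argument begins

    for (i, c) in input.char_indices() {
        match c {
            // Only a comma at depth zero ends an argument.
            ',' if depth == 0 => {
                out.push(input[last_start..i].trim());
                last_start = i + 1; // ',' is one byte, so this stays on a char boundary
            }
            '(' => depth += 1,
            // Saturate so a stray ')' cannot underflow the counter.
            ')' => depth = depth.saturating_sub(1),
            _ => {}
        }
    }

    // Whatever follows the last top-level comma is the final argument.
    let tail = input[last_start..].trim();
    if !tail.is_empty() {
        out.push(tail);
    }
    out
}
```

With this sketch, `"3, 'UTC'"` yields two arguments, while `"String, Array(Tuple(UInt8, String))"` also yields two because the inner comma sits at depth two.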
@@ -438,8 +497,7 @@ pub fn clickhouse_type_to_arrow_type(
                 )));
             }
             let tz = &args[0][1..args[0].len() - 1];
-            // TODO: This is technically "second" precision.
-            DataType::Timestamp(TimeUnit::Nanosecond, Some(tz.into())).into()
+            DataType::Timestamp(TimeUnit::Second, Some(tz.into())).into()
         }
         "DateTime64" => {
             if args.len() == 2 {
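
The `tz` expression in this hunk slices off the first and last byte of the argument, which assumes the argument is a quoted string such as `'UTC'`. As a hedged alternative, shown only as a sketch, the quotes can be checked explicitly so that short or unquoted input yields `None` instead of a slicing panic. `unquote_tz` is a hypothetical helper; nothing in this PR defines it.

```rust
/// Returns the text between a leading and trailing single quote, or `None`
/// if either quote is missing. Hypothetical helper for illustration only.
pub fn unquote_tz(arg: &str) -> Option<&str> {
    arg.strip_prefix('\'')
        .and_then(|rest| rest.strip_suffix('\''))
}
```

A caller could then map `None` to a type-parse error rather than relying on the argument always being well formed.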
@@ -450,8 +508,7 @@ pub fn clickhouse_type_to_arrow_type(
                     )));
                 }
                 let p = parse_precision(args[0])?;
-                // TODO: Use the actual precision.
-                let _tu = if p < 3 {
+                let tu = if p < 3 {
                     TimeUnit::Second
                 } else if p < 6 {
                     TimeUnit::Millisecond
@@ -461,11 +518,10 @@ pub fn clickhouse_type_to_arrow_type(
                     TimeUnit::Nanosecond
                 };
                 let tz = &args[1][1..args[1].len() - 1];
-                DataType::Timestamp(TimeUnit::Nanosecond, Some(tz.into())).into()
+                DataType::Timestamp(tu, Some(tz.into())).into()
             } else if args.len() == 1 {
                 let p = parse_precision(args[0])?;
-                // TODO: Use the actual precision.
-                let _tu = if p < 3 {
+                let tu = if p < 3 {
                     TimeUnit::Second
                 } else if p < 6 {
                     TimeUnit::Millisecond
@@ -474,7 +530,7 @@ pub fn clickhouse_type_to_arrow_type(
                 } else {
                     TimeUnit::Nanosecond
                 };
-                DataType::Timestamp(TimeUnit::Nanosecond, None).into()
+                DataType::Timestamp(tu, None).into()
             } else {
                 return Err(KlickhouseError::TypeParseError(format!(
                     "bad arg count for DateTime64, expected 1 or 2 and got {}",
@@ -606,7 +662,7 @@ pub fn clickhouse_type_to_arrow_type(
                 "unsupported Date32 type".to_string(),
             ));
         }
-        "DateTime" => DataType::Timestamp(TimeUnit::Nanosecond, None).into(),
+        "DateTime" => DataType::Timestamp(TimeUnit::Second, None).into(),
         "IPv4" => {
             return Err(KlickhouseError::TypeParseError(
                 "unsupported IPv4 type".to_string(),
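
In `convert.rs`, the `DateTime64` branches now pick the Arrow time unit from the parsed precision instead of always using nanoseconds. That mapping can be sketched on its own as below. `Unit` and `unit_for_precision` are hypothetical stand-ins (the real code uses Arrow's `TimeUnit`), the `u32` parameter type is an assumption, and the `p < 9` bound for microseconds is a guess because that line falls outside the visible hunks.

```rust
/// Hypothetical stand-in for Arrow's `TimeUnit`, declared locally so the
/// sketch compiles without external crates.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Unit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

/// Maps a ClickHouse `DateTime64(p)` precision to a time unit, following
/// the thresholds visible in the diff (`p < 3`, `p < 6`).
pub fn unit_for_precision(p: u32) -> Unit {
    if p < 3 {
        Unit::Second
    } else if p < 6 {
        Unit::Millisecond
    } else if p < 9 {
        // Assumption: this bound is not shown in the diff.
        Unit::Microsecond
    } else {
        Unit::Nanosecond
    }
}
```

Under this mapping a precision of 1 or 2 resolves to seconds, so as far as the visible code shows, the fractional digits of such columns would not survive the `.timestamp()` conversion.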
1 change: 1 addition & 0 deletions crates/pgrepr/Cargo.toml
@@ -19,3 +19,4 @@ dtoa = "1.0.9"
 chrono-tz = "0.8.5"
 bytes = "1.4.0"
 const_format = "0.2.32"
+once_cell = "1.19.0"