fix: Use microsecond precision for postgres datasource (#2530)
```
> select * from read_postgres('postgresql://glaredb:password@0.0.0.0:5433/glaredb_test', 'public', 'abc');
┌─────────────────────┐
│ a                   │
│ ──                  │
│ Timestamp<µs, UTC>  │
╞═════════════════════╡
│ 0001-01-01T00:00:00 │
└─────────────────────┘
```

Fixes: #2438
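
The root cause: an Arrow timestamp is a single `i64` offset from the Unix epoch, and `0001-01-01 00:00:00` sits roughly 62.1 billion seconds before the epoch, which overflows `i64` at nanosecond precision. A minimal sketch of the failure mode (not part of this commit's diff; it assumes chrono 0.4, whose `timestamp_nanos_opt`/`timestamp_micros` the datasource already uses):

```rust
use chrono::{NaiveDate, NaiveDateTime};

fn main() {
    // Postgres' zero-value timestamp, ~62.1 billion seconds before the epoch.
    let dt: NaiveDateTime = NaiveDate::from_ymd_opt(1, 1, 1)
        .unwrap()
        .and_hms_opt(0, 0, 0)
        .unwrap();

    // At nanosecond precision the offset (~-6.2e19) overflows i64 (~±9.2e18),
    // so chrono signals overflow and the old code path had to error out.
    assert_eq!(dt.timestamp_nanos_opt(), None);

    // At microsecond precision (~-6.2e16) the same instant fits comfortably.
    assert_eq!(dt.timestamp_micros(), -62_135_596_800_000_000);
}
```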

---------

Signed-off-by: Vaibhav <vrongmeal@gmail.com>
vrongmeal authored Jan 31, 2024
1 parent b0e49a4 commit 2038cdf
Showing 3 changed files with 40 additions and 43 deletions.
crates/datasources/src/postgres/mod.rs (29 additions, 42 deletions)
```diff
@@ -13,7 +13,21 @@ use std::task::{Context, Poll};
 use async_trait::async_trait;
 use chrono::naive::{NaiveDateTime, NaiveTime};
 use chrono::{DateTime, NaiveDate, Timelike, Utc};
-use datafusion::arrow::array::Decimal128Builder;
+use datafusion::arrow::array::{
+    Array,
+    BinaryBuilder,
+    BooleanBuilder,
+    Date32Builder,
+    Decimal128Builder,
+    Float32Builder,
+    Float64Builder,
+    Int16Builder,
+    Int32Builder,
+    Int64Builder,
+    StringBuilder,
+    Time64MicrosecondBuilder,
+    TimestampMicrosecondBuilder,
+};
 use datafusion::arrow::datatypes::{
     DataType,
     Field,
```
```diff
@@ -1091,21 +1105,6 @@ fn binary_rows_to_record_batch<E: Into<PostgresError>>(
         return Ok(RecordBatch::try_new_with_options(schema, Vec::new(), &options).unwrap());
     }
 
-    use datafusion::arrow::array::{
-        Array,
-        BinaryBuilder,
-        BooleanBuilder,
-        Date32Builder,
-        Float32Builder,
-        Float64Builder,
-        Int16Builder,
-        Int32Builder,
-        Int64Builder,
-        StringBuilder,
-        Time64NanosecondBuilder,
-        TimestampNanosecondBuilder,
-    };
-
     let rows = rows
         .into_iter()
         .collect::<Result<Vec<_>, _>>()
```
```diff
@@ -1163,45 +1162,33 @@ fn binary_rows_to_record_batch<E: Into<PostgresError>>(
             }
             Arc::new(arr.finish())
         }
-        dt @ DataType::Timestamp(TimeUnit::Nanosecond, None) => {
-            let mut arr = TimestampNanosecondBuilder::with_capacity(rows.len());
+        DataType::Timestamp(TimeUnit::Microsecond, None) => {
+            let mut arr = TimestampMicrosecondBuilder::with_capacity(rows.len());
             for row in rows.iter() {
                 let val: Option<NaiveDateTime> = row.try_get(col_idx)?;
-                let val = val
-                    .map(|v| {
-                        v.timestamp_nanos_opt().ok_or_else(|| {
-                            PostgresError::DataOverflow(v.to_string(), dt.clone())
-                        })
-                    })
-                    .transpose()?;
+                let val = val.map(|v| v.timestamp_micros());
                 arr.append_option(val);
             }
             Arc::new(arr.finish())
         }
-        dt @ DataType::Timestamp(TimeUnit::Nanosecond, Some(_)) => {
-            let mut arr = TimestampNanosecondBuilder::with_capacity(rows.len())
-                .with_data_type(dt.clone());
+        DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => {
+            let mut arr = TimestampMicrosecondBuilder::with_capacity(rows.len())
+                .with_timezone(tz.clone());
             for row in rows.iter() {
                 let val: Option<DateTime<Utc>> = row.try_get(col_idx)?;
-                let val = val
-                    .map(|v| {
-                        v.timestamp_nanos_opt().ok_or_else(|| {
-                            PostgresError::DataOverflow(v.to_string(), dt.clone())
-                        })
-                    })
-                    .transpose()?;
+                let val = val.map(|v| v.timestamp_micros());
                 arr.append_option(val);
             }
             Arc::new(arr.finish())
         }
-        DataType::Time64(TimeUnit::Nanosecond) => {
-            let mut arr = Time64NanosecondBuilder::with_capacity(rows.len());
+        DataType::Time64(TimeUnit::Microsecond) => {
+            let mut arr = Time64MicrosecondBuilder::with_capacity(rows.len());
             for row in rows.iter() {
                 let val: Option<NaiveTime> = row.try_get(col_idx)?;
                 let val = val.map(|v| {
-                    let nanos = v.nanosecond() as i64;
+                    let sub_micros = (v.nanosecond() / 1_000) as i64;
                     let secs_since_midnight = v.num_seconds_from_midnight() as i64;
-                    (secs_since_midnight * 1_000_000_000) + nanos
+                    (secs_since_midnight * 1_000_000) + sub_micros
                 });
                 arr.append_option(val);
             }
```
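
To sanity-check the new time-of-day arithmetic in the `Time64` arm above: whole seconds are scaled to microseconds and the sub-second nanoseconds are truncated to microseconds. A standalone sketch (not from the commit) reusing the same chrono accessors; the 16:32:04.5 input is illustrative:

```rust
use chrono::{NaiveTime, Timelike};

fn main() {
    // Mirrors the Time64 conversion above for 16:32:04.5.
    let t = NaiveTime::from_hms_micro_opt(16, 32, 4, 500_000).unwrap();
    let sub_micros = (t.nanosecond() / 1_000) as i64; // 500_000
    let secs_since_midnight = t.num_seconds_from_midnight() as i64; // 59_524
    assert_eq!(secs_since_midnight * 1_000_000 + sub_micros, 59_524_500_000);
}
```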
```diff
@@ -1250,11 +1237,11 @@ fn try_create_arrow_schema(names: Vec<String>, types: &Vec<PostgresType>) -> Res
             // to specify the precision and scale for the column. Setting these
             // same as bigquery.
             &PostgresType::NUMERIC => DataType::Decimal128(38, 9),
-            &PostgresType::TIMESTAMP => DataType::Timestamp(TimeUnit::Nanosecond, None),
+            &PostgresType::TIMESTAMP => DataType::Timestamp(TimeUnit::Microsecond, None),
             &PostgresType::TIMESTAMPTZ => {
-                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
+                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
             }
-            &PostgresType::TIME => DataType::Time64(TimeUnit::Nanosecond),
+            &PostgresType::TIME => DataType::Time64(TimeUnit::Microsecond),
             &PostgresType::DATE => DataType::Date32,
             // TODO: Time with timezone and interval data types in postgres are
             // of 12 and 16 bytes respectively. This kind of size is not
```
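
A note on the trade-off in this schema change (a back-of-the-envelope sketch, not from the commit): Postgres itself stores timestamps at microsecond precision, so the unit change loses no fidelity, while widening the representable span of the `i64` offset from roughly ±292 years around the epoch to roughly ±292,000 years:

```rust
fn main() {
    // Nanosecond precision spans only ~±292 years around 1970
    // (roughly 1677-09-21 through 2262-04-11), excluding 0001-01-01.
    let years_at_nanos = i64::MAX as f64 / 1e9 / 86_400.0 / 365.25;
    assert!(years_at_nanos < 300.0);

    // Microsecond precision spans ~±292,000 years, which easily covers
    // Postgres' zero-value timestamps while matching its µs fidelity.
    let years_at_micros = i64::MAX as f64 / 1e6 / 86_400.0 / 365.25;
    assert!(years_at_micros > 290_000.0);
}
```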
testdata/sqllogictests_postgres/data/setup-test-postgres-db.sql (10 additions, 1 deletion)
```diff
@@ -48,7 +48,16 @@ VALUES (
     12345.67891234
 );
 
-INSERT INTO datatypes(c1) VALUES (NULL); -- inserts nulls
+INSERT INTO datatypes (c1) VALUES (NULL); -- inserts nulls
+
+-- Zero value timestamps: https://github.com/GlareDB/glaredb/issues/2438
+INSERT INTO datatypes (
+    c15,
+    c18
+) VALUES (
+    '0001-01-01 00:00:00',
+    '0001-01-01 00:00:00 UTC'
+);
 
 -- bikeshare_stations table for testing datasources.
 CREATE TABLE IF NOT EXISTS bikeshare_stations (
```
testdata/sqllogictests_postgres/datatypes.slt (1 addition)
```diff
@@ -16,6 +16,7 @@ SELECT * FROM datatypes;
 ----
 t 1 2 3 4.5 6.7 a b cde fghi {"a":[1,2]} [{"b":null},{"c":true}] 292a485f-a56a-4938-8f1a-bbbbbbbbbbb1 \x62696e 1999-09-30 16:32:04 16:32:04 1999-09-30 1999-09-30 14:32:04+00 12345.678900000 12346.000000000 12345.678910000
 NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL
+NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL 1-01-01 00:00:00 NULL NULL 1-01-01 00:00:00+00 NULL NULL NULL
 
 halt
 
```