Commit 5056641
fix: fix type casting issue (GreptimeTeam#1652)
* fix: fix type casting issue

* chore: apply suggestion from CR
WenyXu authored and paomian committed Oct 19, 2023
1 parent 443b5c8 commit 5056641
Showing 3 changed files with 72 additions and 86 deletions.
6 changes: 6 additions & 0 deletions src/common/datasource/tests/csv/type_cast.csv
@@ -0,0 +1,6 @@
+hostname,environment,usage_user,usage_system,usage_idle,usage_nice,usage_iowait,usage_irq,usage_softirq,usage_steal,usage_guest,usage_guest_nice,ts
+host_0,test,32,58,36,72,61,21,53,12,59,72,2023-04-01T00:00:00+00:00
+host_1,staging,12,32,50,84,19,73,38,37,72,2,2023-04-01T00:00:00+00:00
+host_2,test,98,5,40,95,64,39,21,63,53,94,2023-04-01T00:00:00+00:00
+host_3,test,98,95,7,48,99,67,14,86,36,23,2023-04-01T00:00:00+00:00
+host_4,test,32,44,11,53,64,9,17,39,20,7,2023-04-01T00:00:00+00:00
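
The `usage_*` values in this fixture are integer literals even though the target table declares those columns as DOUBLE, so a CSV reader will infer an integer Arrow type for them. A minimal sketch of the cast this fixture exercises, written against the plain `arrow` crate (the crate path and the inferred `Int64` type are assumptions for illustration; GreptimeDB reaches Arrow through its `datatypes::arrow` re-export):

```rust
use arrow::array::{Array, Float64Array, Int64Array};
use arrow::compute::cast;
use arrow::datatypes::DataType;

fn main() {
    // A CSV reader typically infers a column like `usage_user` as Int64.
    let inferred = Int64Array::from(vec![32, 58, 98, 98, 32]);

    // Casting to the table's DOUBLE (Float64) type is what the COPY FROM
    // path relies on once the schemas are deemed compatible.
    let casted = cast(&inferred, &DataType::Float64).expect("Int64 -> Float64 is castable");
    let doubles = casted
        .as_any()
        .downcast_ref::<Float64Array>()
        .expect("cast produces a Float64Array");

    assert_eq!(doubles.value(0), 32.0);
}
```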
114 changes: 28 additions & 86 deletions src/frontend/src/statement/copy_table_from.rs
@@ -32,7 +32,8 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
 use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream};
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
-use datatypes::arrow::datatypes::{DataType, Schema, SchemaRef};
+use datatypes::arrow::compute::can_cast_types;
+use datatypes::arrow::datatypes::{Schema, SchemaRef};
 use datatypes::vectors::Helper;
 use futures_util::StreamExt;
 use object_store::{Entry, EntryMode, Metakey, ObjectStore};
@@ -245,11 +246,7 @@ impl StatementExecutor {
                 .context(error::ProjectSchemaSnafu)?,
             );
 
-            ensure_schema_matches_ignore_timezone(
-                &projected_file_schema,
-                &projected_table_schema,
-                true,
-            )?;
+            ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?;
 
             files.push((
                 Arc::new(compat_schema),
@@ -336,78 +333,27 @@ async fn batch_insert(
     Ok(res)
 }
 
-fn ensure_schema_matches_ignore_timezone(
-    left: &SchemaRef,
-    right: &SchemaRef,
-    ts_cast: bool,
-) -> Result<()> {
-    let not_match = left
+fn ensure_schema_compatible(from: &SchemaRef, to: &SchemaRef) -> Result<()> {
+    let not_match = from
         .fields
         .iter()
-        .zip(right.fields.iter())
+        .zip(to.fields.iter())
         .map(|(l, r)| (l.data_type(), r.data_type()))
         .enumerate()
-        .find(|(_, (l, r))| !data_type_equals_ignore_timezone_with_options(l, r, ts_cast));
+        .find(|(_, (l, r))| !can_cast_types(l, r));
 
     if let Some((index, _)) = not_match {
         error::InvalidSchemaSnafu {
             index,
-            table_schema: left.to_string(),
-            file_schema: right.to_string(),
+            table_schema: to.to_string(),
+            file_schema: from.to_string(),
         }
         .fail()
     } else {
         Ok(())
     }
 }
-
-fn data_type_equals_ignore_timezone_with_options(
-    l: &DataType,
-    r: &DataType,
-    ts_cast: bool,
-) -> bool {
-    match (l, r) {
-        (DataType::List(a), DataType::List(b))
-        | (DataType::LargeList(a), DataType::LargeList(b)) => {
-            a.is_nullable() == b.is_nullable()
-                && data_type_equals_ignore_timezone_with_options(
-                    a.data_type(),
-                    b.data_type(),
-                    ts_cast,
-                )
-        }
-        (DataType::FixedSizeList(a, a_size), DataType::FixedSizeList(b, b_size)) => {
-            a_size == b_size
-                && a.is_nullable() == b.is_nullable()
-                && data_type_equals_ignore_timezone_with_options(
-                    a.data_type(),
-                    b.data_type(),
-                    ts_cast,
-                )
-        }
-        (DataType::Struct(a), DataType::Struct(b)) => {
-            a.len() == b.len()
-                && a.iter().zip(b).all(|(a, b)| {
-                    a.is_nullable() == b.is_nullable()
-                        && data_type_equals_ignore_timezone_with_options(
-                            a.data_type(),
-                            b.data_type(),
-                            ts_cast,
-                        )
-                })
-        }
-        (DataType::Map(a_field, a_is_sorted), DataType::Map(b_field, b_is_sorted)) => {
-            a_field == b_field && a_is_sorted == b_is_sorted
-        }
-        (DataType::Timestamp(l_unit, _), DataType::Timestamp(r_unit, _)) => {
-            l_unit == r_unit || ts_cast
-        }
-        (&DataType::Utf8, DataType::Timestamp(_, _))
-        | (DataType::Timestamp(_, _), &DataType::Utf8) => ts_cast,
-        _ => l == r,
-    }
-}
 
 /// Allows the file schema is a subset of table
 fn generated_schema_projection_and_compatible_file_schema(
     file: &SchemaRef,
@@ -437,24 +383,23 @@
 mod tests {
     use std::sync::Arc;
 
-    use datatypes::arrow::datatypes::{Field, Schema};
+    use datatypes::arrow::datatypes::{DataType, Field, Schema};
 
     use super::*;
 
-    fn test_schema_matches(l: (DataType, bool), r: (DataType, bool), matches: bool) {
-        test_schema_matches_with_options(l, r, false, matches)
-    }
-
-    fn test_schema_matches_with_options(
-        l: (DataType, bool),
-        r: (DataType, bool),
-        ts_cast: bool,
-        matches: bool,
-    ) {
-        let s1 = Arc::new(Schema::new(vec![Field::new("col", l.0, l.1)]));
-        let s2 = Arc::new(Schema::new(vec![Field::new("col", r.0, r.1)]));
-        let res = ensure_schema_matches_ignore_timezone(&s1, &s2, ts_cast);
-        assert_eq!(matches, res.is_ok())
+    fn test_schema_matches(from: (DataType, bool), to: (DataType, bool), matches: bool) {
+        let s1 = Arc::new(Schema::new(vec![Field::new("col", from.0.clone(), from.1)]));
+        let s2 = Arc::new(Schema::new(vec![Field::new("col", to.0.clone(), to.1)]));
+        let res = ensure_schema_compatible(&s1, &s2);
+        assert_eq!(
+            matches,
+            res.is_ok(),
+            "from data type: {}, to data type: {}, expected: {}, but got: {}",
+            from.0,
+            to.0,
+            matches,
+            res.is_ok()
+        )
     }
 
     #[test]
Expand Down Expand Up @@ -519,17 +464,17 @@ mod tests {
),
true,
),
false,
true,
);

test_schema_matches((DataType::Int8, true), (DataType::Int8, true), true);

test_schema_matches((DataType::Int8, true), (DataType::Int16, true), false);
test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
}

#[test]
fn test_data_type_equals_ignore_timezone_with_options() {
test_schema_matches_with_options(
test_schema_matches(
(
DataType::Timestamp(
datatypes::arrow::datatypes::TimeUnit::Microsecond,
@@ -545,10 +490,9 @@
                 true,
             ),
-            true,
             true,
         );
 
-        test_schema_matches_with_options(
+        test_schema_matches(
             (DataType::Utf8, true),
             (
                 DataType::Timestamp(
@@ -558,10 +502,9 @@
                 true,
             ),
-            true,
             true,
         );
 
-        test_schema_matches_with_options(
+        test_schema_matches(
            (
                 DataType::Timestamp(
                     datatypes::arrow::datatypes::TimeUnit::Millisecond,
@@ -571,7 +514,6 @@
             ),
             (DataType::Utf8, true),
             true,
-            true,
         );
     }
 
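The rewrite above replaces roughly ninety lines of hand-rolled structural comparison with a single call per column pair to Arrow's `can_cast_types`. A standalone sketch of what that predicate accepts for the pairs exercised in the tests (assuming the plain `arrow` crate; the production code reaches it as `datatypes::arrow::compute::can_cast_types`):

```rust
use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, TimeUnit};

fn main() {
    // Numeric widening, rejected by the old equality-based check:
    assert!(can_cast_types(&DataType::Int8, &DataType::Int16));

    // Timestamps may change both unit and timezone:
    assert!(can_cast_types(
        &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
        &DataType::Timestamp(TimeUnit::Millisecond, None),
    ));

    // Strings parse into timestamps, and timestamps format into strings:
    assert!(can_cast_types(
        &DataType::Utf8,
        &DataType::Timestamp(TimeUnit::Millisecond, None),
    ));
    assert!(can_cast_types(
        &DataType::Timestamp(TimeUnit::Millisecond, None),
        &DataType::Utf8,
    ));
}
```

This deliberately widens what COPY FROM accepts, which is why the `Int8`/`Int16` and timestamp expectations in the tests flipped from `false` to `true`.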
38 changes: 38 additions & 0 deletions tests-integration/src/tests/instance_test.rs
@@ -1233,6 +1233,44 @@ async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) {
     }
 }
 
+#[apply(both_instances_cases)]
+async fn test_cast_type_issue_1594(instance: Arc<dyn MockInstance>) {
+    let instance = instance.frontend();
+
+    // setups
+    execute_sql(
+        &instance,
+        "create table tsbs_cpu(hostname STRING, environment STRING, usage_user DOUBLE, usage_system DOUBLE, usage_idle DOUBLE, usage_nice DOUBLE, usage_iowait DOUBLE, usage_irq DOUBLE, usage_softirq DOUBLE, usage_steal DOUBLE, usage_guest DOUBLE, usage_guest_nice DOUBLE, ts TIMESTAMP TIME INDEX, PRIMARY KEY(hostname));",
+    )
+    .await;
+    let filepath = get_data_dir("../src/common/datasource/tests/csv/type_cast.csv")
+        .canonicalize()
+        .unwrap()
+        .display()
+        .to_string();
+
+    let output = execute_sql(
+        &instance,
+        &format!("copy tsbs_cpu from '{}' WITH(FORMAT='csv');", &filepath),
+    )
+    .await;
+
+    assert!(matches!(output, Output::AffectedRows(5)));
+
+    let output = execute_sql(&instance, "select * from tsbs_cpu order by hostname;").await;
+    let expected = "\
++----------+-------------+------------+--------------+------------+------------+--------------+-----------+---------------+-------------+-------------+------------------+---------------------+
+| hostname | environment | usage_user | usage_system | usage_idle | usage_nice | usage_iowait | usage_irq | usage_softirq | usage_steal | usage_guest | usage_guest_nice | ts                  |
++----------+-------------+------------+--------------+------------+------------+--------------+-----------+---------------+-------------+-------------+------------------+---------------------+
+| host_0   | test        | 32.0       | 58.0         | 36.0       | 72.0       | 61.0         | 21.0      | 53.0          | 12.0        | 59.0        | 72.0             | 2023-04-01T00:00:00 |
+| host_1   | staging     | 12.0       | 32.0         | 50.0       | 84.0       | 19.0         | 73.0      | 38.0          | 37.0        | 72.0        | 2.0              | 2023-04-01T00:00:00 |
+| host_2   | test        | 98.0       | 5.0          | 40.0       | 95.0       | 64.0         | 39.0      | 21.0          | 63.0        | 53.0        | 94.0             | 2023-04-01T00:00:00 |
+| host_3   | test        | 98.0       | 95.0         | 7.0        | 48.0       | 99.0         | 67.0      | 14.0          | 86.0        | 36.0        | 23.0             | 2023-04-01T00:00:00 |
+| host_4   | test        | 32.0       | 44.0         | 11.0       | 53.0       | 64.0         | 9.0       | 17.0          | 39.0        | 20.0        | 7.0              | 2023-04-01T00:00:00 |
++----------+-------------+------------+--------------+------------+------------+--------------+-----------+---------------+-------------+-------------+------------------+---------------------+";
+    check_output_stream(output, expected).await;
+}
+
 #[apply(both_instances_cases)]
 async fn test_information_schema_dot_tables(instance: Arc<dyn MockInstance>) {
     let is_distributed_mode = instance.is_distributed_mode();
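Passing `ensure_schema_compatible` only establishes that each column cast can succeed; the COPY FROM pipeline still has to cast the file's columns to the table schema as batches are read. A hedged sketch of that per-batch step (the `cast_batch` helper and the direct `arrow` types are illustrative assumptions, not code from this commit):

```rust
use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::SchemaRef;
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

/// Casts every column of `batch` to the corresponding field type of
/// `table_schema`. Assumes the schemas were already checked pairwise
/// with `can_cast_types`, so each individual cast should succeed.
fn cast_batch(batch: &RecordBatch, table_schema: SchemaRef) -> Result<RecordBatch> {
    let columns = batch
        .columns()
        .iter()
        .zip(table_schema.fields().iter())
        .map(|(column, field)| cast(column, field.data_type()))
        .collect::<Result<Vec<ArrayRef>>>()?;
    RecordBatch::try_new(table_schema, columns)
}
```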
