diff --git a/crates/datasources/src/postgres/mod.rs b/crates/datasources/src/postgres/mod.rs index b2c01097c..07fafb0f6 100644 --- a/crates/datasources/src/postgres/mod.rs +++ b/crates/datasources/src/postgres/mod.rs @@ -30,6 +30,7 @@ use datafusion::arrow::array::{ use datafusion::arrow::datatypes::{ DataType, Field, + FieldRef, Fields, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, @@ -587,6 +588,7 @@ pub struct PostgresTableProviderConfig { pub access: PostgresAccess, pub schema: String, pub table: String, + pub fields: Option>, // filter } impl TryFrom @@ -600,6 +602,7 @@ impl TryFrom access: value.access.required("postgres access")?, schema: value.schema, table: value.table, + fields: None, }) } } @@ -637,16 +640,25 @@ impl PostgresTableProvider { access, schema, table, + fields, } = conf; let state = Arc::new(access.connect().await?); - let (arrow_schema, pg_types) = state.get_table_schema(&schema, &table).await?; + + let (observed_schema, pg_types) = state.get_table_schema(&schema, &table).await?; + + // TODO: compare observed schema to the specified fields, and + // maybe error here if there's no overlap. Ok(PostgresTableProvider { schema, table, state, - arrow_schema: Arc::new(arrow_schema), + arrow_schema: fields + .map(ArrowSchema::new) + .or_else(|| Some(observed_schema)) + .map(Arc::new) + .unwrap(), pg_types: Arc::new(pg_types), }) } diff --git a/crates/sqlbuiltins/src/functions/table/postgres.rs b/crates/sqlbuiltins/src/functions/table/postgres.rs index 34f676c8d..e75fc21b3 100644 --- a/crates/sqlbuiltins/src/functions/table/postgres.rs +++ b/crates/sqlbuiltins/src/functions/table/postgres.rs @@ -59,6 +59,7 @@ impl TableFunc for ReadPostgres { access, schema, table, + fields: None, }; let prov = PostgresTableProvider::try_new(prov_conf) .await diff --git a/crates/sqlexec/src/dispatch/external.rs b/crates/sqlexec/src/dispatch/external.rs index af64e7a2d..240d20868 100644 --- a/crates/sqlexec/src/dispatch/external.rs +++ b/crates/sqlexec/src/dispatch/external.rs @@ -162,6 +162,7 @@ impl<'a> ExternalDispatcher<'a> { access, schema: schema.to_owned(), table: name.to_owned(), + fields: None, }; let prov = PostgresTableProvider::try_new(prov_conf).await?; Ok(Arc::new(prov)) @@ -428,6 +429,9 @@ impl<'a> ExternalDispatcher<'a> { tunnel: Option, schema: Option, ) -> Result> { + // TODO: possibly true that if you have a schema for provider + // that doesn't support it, the schema would be ignored. + match &opts { TableOptionsV0::Debug(TableOptionsDebug { table_type }) => { let provider = DebugTableType::from_str(table_type)?; @@ -454,14 +458,20 @@ impl<'a> ExternalDispatcher<'a> { TableOptionsV0::Postgres(TableOptionsPostgres { connection_string, - schema, + schema: pg_schema, table, }) => { let access = PostgresAccess::new_from_conn_str(connection_string, tunnel); let prov_conf = PostgresTableProviderConfig { access, - schema: schema.to_owned(), + schema: pg_schema.to_owned(), table: table.to_owned(), + fields: schema.map(|s| { + s.all_fields() + .into_iter() + .map(|f| Arc::new(f.to_owned())) + .collect() + }), }; let prov = PostgresTableProvider::try_new(prov_conf).await?; Ok(Arc::new(prov)) diff --git a/testdata/sqllogictests_postgres/data/setup-test-postgres-db.sql b/testdata/sqllogictests_postgres/data/setup-test-postgres-db.sql index 7bd0f5500..1992c6428 100644 --- a/testdata/sqllogictests_postgres/data/setup-test-postgres-db.sql +++ b/testdata/sqllogictests_postgres/data/setup-test-postgres-db.sql @@ -62,7 +62,7 @@ INSERT INTO datatypes ( -- bikeshare_stations table for testing datasources. CREATE TABLE IF NOT EXISTS bikeshare_stations ( station_id INT, - name TEXT, + name TEXT, status TEXT, address TEXT, alternate_name TEXT, @@ -94,3 +94,18 @@ CREATE TABLE IF NOT EXISTS bikeshare_trips ( ); \copy bikeshare_trips FROM './testdata/sqllogictests_datasources_common/data/gcs-artifacts/bikeshare_trips.csv' CSV HEADER; + +CREATE TABLE IF NOT EXISTS minimal_test ( + id INT, + ordinal TEXT, + fib INT, + power INT, + answer INT +); + +INSERT INTO minimal_test VALUES (1, 'first', 1, 2, 42); +INSERT INTO minimal_test VALUES (2, 'second', 1, 4, 84); +INSERT INTO minimal_test VALUES (3, 'third', 2, 8, 168); +INSERT INTO minimal_test VALUES (4, 'fourth', 3, 16, 336); +INSERT INTO minimal_test VALUES (5, 'fifth', 5, 32, 336); +INSERT INTO minimal_test VALUES (6, 'sixth', 5, 64, 672); diff --git a/testdata/sqllogictests_postgres/external_table.slt b/testdata/sqllogictests_postgres/external_table.slt index 03a4ad270..7f5290886 100644 --- a/testdata/sqllogictests_postgres/external_table.slt +++ b/testdata/sqllogictests_postgres/external_table.slt @@ -40,3 +40,74 @@ SELECT count(*) FROM external_table; statement ok DROP TABLE external_table; + +statement ok +CREATE EXTERNAL TABLE external_table + FROM postgres + OPTIONS ( + host = 'localhost', + port = '5433', + user = 'glaredb', + password = 'password', + database = 'glaredb_test', + schema = 'public', + table = 'bikeshare_stations', + ) + COLUMNS ( + station_id int, + ); + +query I +SELECT * FROM external_table ORDER BY station_id LIMIT 2; +---- +0 +11 + + +statement ok +CREATE EXTERNAL TABLE magic_test + FROM postgres + OPTIONS ( + host = 'localhost', + port = '5433', + user = 'glaredb', + password = 'password', + database = 'glaredb_test', + schema = 'public', + table = 'minimal_test', + ) + COLUMNS ( + id int, + power int, + ); + +query I +SELECT * FROM magic_test ORDER BY id LIMIT 2; +---- +1 2 +2 4 + +statement ok +CREATE EXTERNAL TABLE magic_test_type_error + FROM postgres + OPTIONS ( + host = 'localhost', + port = '5433', + user = 'glaredb', + password = 'password', + database = 'glaredb_test', + schema = 'public', + table = 'minimal_test', + ) + COLUMNS ( + id int, + ordinal int, + power text, + ); + +statement error +SELECT * FROM magic_test_type_error ORDER BY id LIMIT 2; + +query I +SELECT * FROM magic_test WHERE power = 42; +----