Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement column limit for postgres #2950

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions crates/datasources/src/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use datafusion::arrow::array::{
use datafusion::arrow::datatypes::{
DataType,
Field,
FieldRef,
Fields,
Schema as ArrowSchema,
SchemaRef as ArrowSchemaRef,
Expand Down Expand Up @@ -587,6 +588,7 @@ pub struct PostgresTableProviderConfig {
pub access: PostgresAccess,
pub schema: String,
pub table: String,
pub fields: Option<Vec<FieldRef>>, // filter
}

impl TryFrom<protogen::sqlexec::table_provider::PostgresTableProviderConfig>
Expand All @@ -600,6 +602,7 @@ impl TryFrom<protogen::sqlexec::table_provider::PostgresTableProviderConfig>
access: value.access.required("postgres access")?,
schema: value.schema,
table: value.table,
fields: None,
})
}
}
Expand Down Expand Up @@ -637,16 +640,25 @@ impl PostgresTableProvider {
access,
schema,
table,
fields,
} = conf;

let state = Arc::new(access.connect().await?);
let (arrow_schema, pg_types) = state.get_table_schema(&schema, &table).await?;

let (observed_schema, pg_types) = state.get_table_schema(&schema, &table).await?;

// TODO: compare observed schema to the specified fields, and
// maybe error here if there's no overlap.

Ok(PostgresTableProvider {
schema,
table,
state,
arrow_schema: Arc::new(arrow_schema),
arrow_schema: fields
.map(ArrowSchema::new)
.or_else(|| Some(observed_schema))
.map(Arc::new)
.unwrap(),
pg_types: Arc::new(pg_types),
})
}
Expand Down
1 change: 1 addition & 0 deletions crates/sqlbuiltins/src/functions/table/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl TableFunc for ReadPostgres {
access,
schema,
table,
fields: None,
};
let prov = PostgresTableProvider::try_new(prov_conf)
.await
Expand Down
14 changes: 12 additions & 2 deletions crates/sqlexec/src/dispatch/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ impl<'a> ExternalDispatcher<'a> {
access,
schema: schema.to_owned(),
table: name.to_owned(),
fields: None,
};
let prov = PostgresTableProvider::try_new(prov_conf).await?;
Ok(Arc::new(prov))
Expand Down Expand Up @@ -428,6 +429,9 @@ impl<'a> ExternalDispatcher<'a> {
tunnel: Option<TunnelOptions>,
schema: Option<Schema>,
) -> Result<Arc<dyn TableProvider>> {
// TODO: if a schema is supplied for a provider that doesn't support
// it, the schema is possibly ignored silently; confirm.

match &opts {
TableOptionsV0::Debug(TableOptionsDebug { table_type }) => {
let provider = DebugTableType::from_str(table_type)?;
Expand All @@ -454,14 +458,20 @@ impl<'a> ExternalDispatcher<'a> {

TableOptionsV0::Postgres(TableOptionsPostgres {
connection_string,
schema,
schema: pg_schema,
table,
}) => {
let access = PostgresAccess::new_from_conn_str(connection_string, tunnel);
let prov_conf = PostgresTableProviderConfig {
access,
schema: schema.to_owned(),
schema: pg_schema.to_owned(),
table: table.to_owned(),
fields: schema.map(|s| {
s.all_fields()
.into_iter()
.map(|f| Arc::new(f.to_owned()))
.collect()
}),
};
let prov = PostgresTableProvider::try_new(prov_conf).await?;
Ok(Arc::new(prov))
Expand Down
17 changes: 16 additions & 1 deletion testdata/sqllogictests_postgres/data/setup-test-postgres-db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ INSERT INTO datatypes (
-- bikeshare_stations table for testing datasources.
CREATE TABLE IF NOT EXISTS bikeshare_stations (
station_id INT,
name TEXT,
name TEXT,
status TEXT,
address TEXT,
alternate_name TEXT,
Expand Down Expand Up @@ -94,3 +94,18 @@ CREATE TABLE IF NOT EXISTS bikeshare_trips (
);

\copy bikeshare_trips FROM './testdata/sqllogictests_datasources_common/data/gcs-artifacts/bikeshare_trips.csv' CSV HEADER;

-- minimal_test table: a small five-column fixture with six rows. It is the
-- table referenced by the external-table column-limit tests
-- (table = 'minimal_test').
CREATE TABLE IF NOT EXISTS minimal_test (
id INT,
ordinal TEXT,
fib INT,
power INT,
answer INT
);

-- Column order in each row: id, ordinal, fib, power, answer.
INSERT INTO minimal_test VALUES (1, 'first', 1, 2, 42);
INSERT INTO minimal_test VALUES (2, 'second', 1, 4, 84);
INSERT INTO minimal_test VALUES (3, 'third', 2, 8, 168);
INSERT INTO minimal_test VALUES (4, 'fourth', 3, 16, 336);
-- NOTE(review): this row repeats answer = 336 from row 4, while the other
-- rows double the previous value; confirm whether that is intentional.
INSERT INTO minimal_test VALUES (5, 'fifth', 5, 32, 336);
INSERT INTO minimal_test VALUES (6, 'sixth', 5, 64, 672);
71 changes: 71 additions & 0 deletions testdata/sqllogictests_postgres/external_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,74 @@ SELECT count(*) FROM external_table;

statement ok
DROP TABLE external_table;

statement ok
CREATE EXTERNAL TABLE external_table
FROM postgres
OPTIONS (
host = 'localhost',
port = '5433',
user = 'glaredb',
password = 'password',
database = 'glaredb_test',
schema = 'public',
table = 'bikeshare_stations',
)
COLUMNS (
station_id int,
);

query I
SELECT * FROM external_table ORDER BY station_id LIMIT 2;
----
0
11


statement ok
CREATE EXTERNAL TABLE magic_test
FROM postgres
OPTIONS (
host = 'localhost',
port = '5433',
user = 'glaredb',
password = 'password',
database = 'glaredb_test',
schema = 'public',
table = 'minimal_test',
)
COLUMNS (
id int,
power int,
);

# magic_test is declared with two integer columns (id, power), so SELECT *
# yields two columns and the type string needs one entry per column.
query II
SELECT * FROM magic_test ORDER BY id LIMIT 2;
----
1 2
2 4

statement ok
CREATE EXTERNAL TABLE magic_test_type_error
FROM postgres
OPTIONS (
host = 'localhost',
port = '5433',
user = 'glaredb',
password = 'password',
database = 'glaredb_test',
schema = 'public',
table = 'minimal_test',
)
COLUMNS (
id int,
ordinal int,
power text,
);

statement error
SELECT * FROM magic_test_type_error ORDER BY id LIMIT 2;

# The fixture rows have power values 2 through 64, none equal to 42, so the
# expected result set is empty. SELECT * still yields two integer columns
# (id, power), hence the two-entry type string.
query II
SELECT * FROM magic_test WHERE power = 42;
----
Loading