diff --git a/Cargo.lock b/Cargo.lock index 1e3172019..8c0296cca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6137,6 +6137,8 @@ dependencies = [ "prost", "prost-build", "prost-types", + "serde", + "serde_json", "thiserror", "tonic", "tonic-build 0.11.0", @@ -7902,6 +7904,7 @@ dependencies = [ "num-traits", "object_store", "once_cell", + "parser", "pgrepr", "protogen", "reqwest", diff --git a/crates/catalog/src/session_catalog.rs b/crates/catalog/src/session_catalog.rs index 4886f68c8..19b78ab90 100644 --- a/crates/catalog/src/session_catalog.rs +++ b/crates/catalog/src/session_catalog.rs @@ -18,11 +18,7 @@ use protogen::metastore::types::catalog::{ TableEntry, TunnelEntry, }; -use protogen::metastore::types::options::{ - InternalColumnDefinition, - TableOptions, - TableOptionsInternal, -}; +use protogen::metastore::types::options::{InternalColumnDefinition, TableOptionsInternal}; use tracing::debug; use super::client::MetastoreClientHandle; @@ -482,12 +478,13 @@ impl TempCatalog { external: false, is_temp: true, }, - options: TableOptions::Internal(TableOptionsInternal { - columns: columns.to_owned(), - }), + options: TableOptionsInternal { + columns: columns.clone(), + } + .into(), tunnel_id: None, access_mode: SourceAccessMode::ReadWrite, - columns: Some(columns.to_owned()), + columns: Some(columns), } }) } @@ -526,12 +523,13 @@ impl TempCatalog { external: false, is_temp: true, }, - options: TableOptions::Internal(TableOptionsInternal { + options: TableOptionsInternal { columns: Vec::new(), - }), + } + .into(), tunnel_id: None, access_mode: SourceAccessMode::ReadWrite, - columns: Some(Vec::new()), + columns: None, }); } diff --git a/crates/datasources/src/bson/table.rs b/crates/datasources/src/bson/table.rs index ff17e02e3..ed72032e0 100644 --- a/crates/datasources/src/bson/table.rs +++ b/crates/datasources/src/bson/table.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use bson::RawDocumentBuf; use bytes::BytesMut; -use datafusion::arrow::datatypes::{FieldRef, Schema}; +use datafusion::arrow::datatypes::Schema; use datafusion::datasource::streaming::StreamingTable; use datafusion::datasource::TableProvider; use datafusion::parquet::data_type::AsBytes; @@ -20,12 +20,12 @@ use crate::object_store::{ObjStoreAccess, ObjStoreAccessor}; pub async fn bson_streaming_table( store_access: Arc, source_url: DatasourceUrl, - fields: Option>, + schema: Option, schema_inference_sample_size: Option, ) -> Result, BsonError> { // TODO: set a maximum (1024?) or have an adaptive mode // (at least n but stop after n the same) or skip documents - let sample_size = if fields.is_some() { + let sample_size = if schema.is_some() { 0 } else { schema_inference_sample_size.unwrap_or(100) @@ -93,8 +93,8 @@ pub async fn bson_streaming_table( let mut streams = Vec::>::with_capacity(readers.len() + 1); // get the schema; if provided as an argument, just use that, otherwise, sample. - let schema = if let Some(fields) = fields { - Arc::new(Schema::new(fields)) + let schema = if let Some(schema) = schema { + Arc::new(schema) } else { // iterate through the readers and build up a sample of the first // documents to be used to infer the schema. 
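A caller-side sketch of the `bson_streaming_table` change above: instead of an `Option<Vec<FieldRef>>`, callers that want to skip sampling now pass a complete Arrow `Schema`. This is only an illustration under assumptions — the column names below are hypothetical — but it reflects the behavior shown in the hunk, where `Some(schema)` drives the sample size to 0 so no documents are read just for inference.

```rust
use datafusion::arrow::datatypes::{DataType, Field, Schema};

// Hypothetical explicit schema for a BSON source; with `Some(schema)` the
// function skips schema inference entirely (sample_size becomes 0).
fn explicit_bson_schema() -> Option<Schema> {
    Some(Schema::new(vec![
        Field::new("_id", DataType::Utf8, true),
        Field::new("value", DataType::Int64, true),
    ]))
}
```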
diff --git a/crates/datasources/src/debug/errors.rs b/crates/datasources/src/debug/errors.rs index 932cdec48..39730367c 100644 --- a/crates/datasources/src/debug/errors.rs +++ b/crates/datasources/src/debug/errors.rs @@ -8,4 +8,5 @@ pub enum DebugError { InvalidTunnel(String), } + pub type Result = std::result::Result; diff --git a/crates/datasources/src/lib.rs b/crates/datasources/src/lib.rs index d84a248ec..185b5ca52 100644 --- a/crates/datasources/src/lib.rs +++ b/crates/datasources/src/lib.rs @@ -1,4 +1,5 @@ //! Data source implementations. + pub mod bigquery; pub mod bson; pub mod cassandra; diff --git a/crates/datasources/src/native/access.rs b/crates/datasources/src/native/access.rs index 26b65340c..4a41e4340 100644 --- a/crates/datasources/src/native/access.rs +++ b/crates/datasources/src/native/access.rs @@ -30,7 +30,7 @@ use object_store::prefix::PrefixStore; use object_store::ObjectStore; use object_store_util::shared::SharedObjectStore; use protogen::metastore::types::catalog::TableEntry; -use protogen::metastore::types::options::{TableOptions, TableOptionsInternal}; +use protogen::metastore::types::options::{TableOptionsInternal, TableOptionsV0}; use serde_json::{json, Value}; use url::Url; use uuid::Uuid; @@ -262,12 +262,11 @@ impl NativeTableStorage { Ok(x.next().await.is_some()) } - fn opts_from_ent(table: &TableEntry) -> Result<&TableOptionsInternal> { - let opts = match &table.options { - TableOptions::Internal(opts) => opts, - _ => return Err(NativeError::NotNative(table.clone())), - }; - Ok(opts) + fn opts_from_ent(table: &TableEntry) -> Result { + match table.options { + TableOptionsV0::Internal(ref opts) => Ok(opts.clone()), + _ => Err(NativeError::NotNative(table.clone())), + } } fn create_delta_store_for_table(&self, table: &TableEntry) -> Arc { @@ -498,11 +497,7 @@ mod tests { use deltalake::protocol::SaveMode; use object_store_util::conf::StorageConfig; use protogen::metastore::types::catalog::{EntryMeta, EntryType, SourceAccessMode, TableEntry}; - use protogen::metastore::types::options::{ - InternalColumnDefinition, - TableOptions, - TableOptionsInternal, - }; + use protogen::metastore::types::options::{InternalColumnDefinition, TableOptionsInternal}; use tempfile::tempdir; use url::Url; use uuid::Uuid; @@ -533,13 +528,14 @@ mod tests { external: false, is_temp: false, }, - options: TableOptions::Internal(TableOptionsInternal { + options: TableOptionsInternal { columns: vec![InternalColumnDefinition { name: "id".to_string(), nullable: true, arrow_type: DataType::Int32, }], - }), + } + .into(), tunnel_id: None, access_mode: SourceAccessMode::ReadOnly, columns: None, diff --git a/crates/datasources/src/object_store/mod.rs b/crates/datasources/src/object_store/mod.rs index 527298a6a..b80e590e1 100644 --- a/crates/datasources/src/object_store/mod.rs +++ b/crates/datasources/src/object_store/mod.rs @@ -25,7 +25,7 @@ use futures::StreamExt; use glob::{MatchOptions, Pattern}; use object_store::path::Path as ObjectStorePath; use object_store::{ObjectMeta, ObjectStore}; -use protogen::metastore::types::options::{TableOptions, TableOptionsObjectStore}; +use protogen::metastore::types::options::{TableOptionsObjectStore, TableOptionsV0}; use self::azure::AzureStoreAccess; use self::glob_util::{get_resolved_patterns, ResolvedPattern}; @@ -380,26 +380,26 @@ pub fn file_type_from_path(path: &ObjectStorePath) -> Result { pub fn init_session_registry<'a>( runtime: &RuntimeEnv, - entries: impl Iterator, + entries: impl Iterator, ) -> Result<()> { for opts in entries { let access: 
Arc = match opts { - TableOptions::Local(_) => Arc::new(LocalStoreAccess), + TableOptionsV0::Local(_) => Arc::new(LocalStoreAccess), // TODO: Consider consolidating Gcs, S3 and Delta and Iceberg `TableOptions` and // `ObjStoreAccess` since they largely overlap - TableOptions::Gcs(opts) => Arc::new(GcsStoreAccess { + TableOptionsV0::Gcs(opts) => Arc::new(GcsStoreAccess { bucket: opts.bucket.clone(), service_account_key: opts.service_account_key.clone(), opts: HashMap::new(), }), - TableOptions::S3(opts) => Arc::new(S3StoreAccess { + TableOptionsV0::S3(opts) => Arc::new(S3StoreAccess { bucket: opts.bucket.clone(), region: Some(opts.region.clone()), access_key_id: opts.access_key_id.clone(), secret_access_key: opts.secret_access_key.clone(), opts: HashMap::new(), }), - TableOptions::Azure(TableOptionsObjectStore { + TableOptionsV0::Azure(TableOptionsObjectStore { location, storage_options, .. @@ -407,22 +407,22 @@ pub fn init_session_registry<'a>( let uri = DatasourceUrl::try_new(location)?; Arc::new(AzureStoreAccess::try_from_uri(&uri, storage_options)?) } - TableOptions::Delta(TableOptionsObjectStore { + TableOptionsV0::Delta(TableOptionsObjectStore { location, storage_options, .. }) - | TableOptions::Iceberg(TableOptionsObjectStore { + | TableOptionsV0::Iceberg(TableOptionsObjectStore { location, storage_options, .. }) - | TableOptions::Lance(TableOptionsObjectStore { + | TableOptionsV0::Lance(TableOptionsObjectStore { location, storage_options, .. }) - | TableOptions::Bson(TableOptionsObjectStore { + | TableOptionsV0::Bson(TableOptionsObjectStore { location, storage_options, .. @@ -434,18 +434,18 @@ pub fn init_session_registry<'a>( // Continue on all others. Explicitly mentioning all the left // over options so we don't forget adding object stores that are // supported in the future (like azure). 
- TableOptions::Internal(_) - | TableOptions::Debug(_) - | TableOptions::Postgres(_) - | TableOptions::BigQuery(_) - | TableOptions::Mysql(_) - | TableOptions::MongoDb(_) - | TableOptions::Snowflake(_) - | TableOptions::SqlServer(_) - | TableOptions::Clickhouse(_) - | TableOptions::Cassandra(_) - | TableOptions::Excel(_) - | TableOptions::Sqlite(_) => continue, + TableOptionsV0::Internal(_) + | TableOptionsV0::Debug(_) + | TableOptionsV0::Postgres(_) + | TableOptionsV0::BigQuery(_) + | TableOptionsV0::Mysql(_) + | TableOptionsV0::MongoDb(_) + | TableOptionsV0::Snowflake(_) + | TableOptionsV0::SqlServer(_) + | TableOptionsV0::Clickhouse(_) + | TableOptionsV0::Cassandra(_) + | TableOptionsV0::Excel(_) + | TableOptionsV0::Sqlite(_) => continue, }; let base_url = access.base_url()?; diff --git a/crates/glaredb/tests/catalog_compat.rs b/crates/glaredb/tests/catalog_compat.rs new file mode 100644 index 000000000..c9cba7800 --- /dev/null +++ b/crates/glaredb/tests/catalog_compat.rs @@ -0,0 +1,37 @@ +mod setup; +use crate::setup::make_cli; + + +#[test] +/// Assert that we can still load the old catalog without panicking +fn test_catalog_backwards_compat() { + let mut cmd = make_cli(); + let pwd = std::env::current_dir().unwrap(); + let root_dir = pwd.parent().unwrap().parent().unwrap(); + let old_catalog = root_dir.join("testdata/catalog_compat/v0"); + cmd.args(["-l", old_catalog.to_str().unwrap()]) + .assert() + .success(); +} + + +#[test] +/// Make sure that we can read the table options from the old catalog +/// The v0 catalog has a table created from the following SQL: +/// ```sql +/// CREATE EXTERNAL TABLE debug_table +/// FROM debug OPTIONS ( +/// table_type 'never_ending' +/// ); +/// ``` +fn test_catalog_backwards_compat_tbl_options() { + let mut cmd = make_cli(); + let pwd = std::env::current_dir().unwrap(); + let root_dir = pwd.parent().unwrap().parent().unwrap(); + let old_catalog = root_dir.join("testdata/catalog_compat/v0"); + + let query = "SELECT * FROM debug_table LIMIT 1"; + cmd.args(["-l", old_catalog.to_str().unwrap(), "-q", query]) + .assert() + .success(); +} diff --git a/crates/metastore/src/client.rs b/crates/metastore/src/client.rs index c1c9a3fd8..ee9a482e5 100644 --- a/crates/metastore/src/client.rs +++ b/crates/metastore/src/client.rs @@ -359,7 +359,6 @@ impl StatefulWorker { .map_err(CatalogError::from), Err(e) => Err(CatalogError::new(e.to_string())), }; - let result = match result { Ok(resp) => { let resp = resp.into_inner(); diff --git a/crates/metastore/src/database.rs b/crates/metastore/src/database.rs index a3579d153..a01663503 100644 --- a/crates/metastore/src/database.rs +++ b/crates/metastore/src/database.rs @@ -19,11 +19,12 @@ use protogen::metastore::types::catalog::{ TableEntry, TunnelEntry, ViewEntry, + CURRENT_CATALOG_VERSION, }; use protogen::metastore::types::options::{ DatabaseOptions, DatabaseOptionsInternal, - TableOptions, + TableOptionsInternal, TunnelOptions, }; use protogen::metastore::types::service::{AlterDatabaseOperation, AlterTableOperation, Mutation}; @@ -71,6 +72,7 @@ static BUILTIN_CATALOG: Lazy = Lazy::new(|| BuiltinCatalog::new( /// 2. Persistence is managed via leases in object storage. /// /// The source of truth for a database catalog is always what's in object store. 
+#[derive(Debug)] pub struct DatabaseCatalog { db_id: Uuid, @@ -182,6 +184,7 @@ impl DatabaseCatalog { version: guard.version, entries: guard.entries.as_ref().clone(), deployment: guard.deployment.clone(), + catalog_version: CURRENT_CATALOG_VERSION, } } @@ -529,6 +532,7 @@ impl State { .into_iter() .filter(|(_, ent)| !ent.get_meta().builtin) .collect(), + catalog_version: CURRENT_CATALOG_VERSION, }, extra: ExtraState { oid_counter: self.oid_counter, @@ -853,7 +857,7 @@ impl State { external: false, is_temp: false, }, - options: TableOptions::Internal(create_table.options), + options: create_table.options.into(), tunnel_id: None, access_mode: SourceAccessMode::ReadWrite, columns: None, @@ -894,10 +898,10 @@ impl State { external: true, is_temp: false, }, - options: create_ext.options, + options: create_ext.options.clone(), tunnel_id, access_mode: SourceAccessMode::ReadOnly, - columns: None, + columns: create_ext.columns, }; let policy = CreatePolicy::new(create_ext.if_not_exists, create_ext.or_replace)?; @@ -1266,7 +1270,10 @@ impl BuiltinCatalog { external: false, is_temp: false, }, - options: TableOptions::new_internal(table.columns.clone()), + options: TableOptionsInternal { + columns: table.columns.clone(), + } + .into(), tunnel_id: None, access_mode: SourceAccessMode::ReadOnly, columns: None, @@ -1841,12 +1848,11 @@ mod tests { let mutation = Mutation::CreateExternalTable(CreateExternalTable { schema: "mushroom".to_string(), name: "bowser".to_string(), - options: TableOptions::Debug(TableOptionsDebug { - table_type: String::new(), - }), + options: TableOptionsDebug::default().into(), if_not_exists: true, or_replace: false, tunnel: None, + columns: None, }); let _ = db .try_mutate(state.version, vec![mutation.clone(), mutation]) @@ -1998,12 +2004,11 @@ mod tests { vec![Mutation::CreateExternalTable(CreateExternalTable { schema: "public".to_string(), name: "read_postgres".to_string(), - options: TableOptions::Debug(TableOptionsDebug { - table_type: String::new(), - }), + options: TableOptionsDebug::default().into(), if_not_exists: true, or_replace: false, tunnel: None, + columns: None, })], ) .await diff --git a/crates/metastore/src/srv.rs b/crates/metastore/src/srv.rs index 559fe2c96..5a19d80d5 100644 --- a/crates/metastore/src/srv.rs +++ b/crates/metastore/src/srv.rs @@ -19,7 +19,6 @@ use uuid::Uuid; use crate::database::DatabaseCatalog; use crate::errors::MetastoreError; use crate::storage::persist::Storage; - /// Metastore GRPC service. pub struct Service { /// Reference to underlying object storage. @@ -96,17 +95,17 @@ impl MetastoreService for Service { request: Request, ) -> Result, Status> { let req = request.into_inner(); - debug!(?req, "mutate catalog"); + let id = Uuid::from_slice(&req.db_id) .map_err(|_| MetastoreError::InvalidDatabaseId(req.db_id))?; let catalog = self.get_or_load_catalog(id).await?; + let mutations = req .mutations .into_iter() .map(|m| Mutation::try_from(m).map_err(MetastoreError::from)) .collect::>()?; - // TODO: Catch error and return status. 
let updated = catalog.try_mutate(req.catalog_version, mutations).await?; @@ -118,6 +117,7 @@ impl MetastoreService for Service { } } + #[cfg(test)] mod tests { use object_store::memory::InMemory; diff --git a/crates/metastore/src/storage/persist.rs b/crates/metastore/src/storage/persist.rs index 9e009838f..70b83a11b 100644 --- a/crates/metastore/src/storage/persist.rs +++ b/crates/metastore/src/storage/persist.rs @@ -6,7 +6,11 @@ use object_store::{Error as ObjectStoreError, ObjectStore}; use pgrepr::oid::FIRST_AVAILABLE_ID; use prost::Message; use protogen::gen::metastore::storage; -use protogen::metastore::types::catalog::{CatalogState, DeploymentMetadata}; +use protogen::metastore::types::catalog::{ + CatalogState, + DeploymentMetadata, + CURRENT_CATALOG_VERSION, +}; use protogen::metastore::types::storage::{CatalogMetadata, ExtraState, PersistedCatalog}; use tracing::{debug, error}; use uuid::Uuid; @@ -75,6 +79,7 @@ impl Storage { version: 0, entries: HashMap::new(), deployment: DeploymentMetadata { storage_size: 0 }, + catalog_version: CURRENT_CATALOG_VERSION, }, extra: ExtraState { oid_counter: FIRST_AVAILABLE_ID, diff --git a/crates/protogen/Cargo.toml b/crates/protogen/Cargo.toml index ed5b4f5c3..614a60d1b 100644 --- a/crates/protogen/Cargo.toml +++ b/crates/protogen/Cargo.toml @@ -7,7 +7,7 @@ edition.workspace = true workspace = true [dependencies] -datafusion = { workspace = true } +datafusion = { workspace = true, features = ["serde"] } datafusion-proto = { workspace = true } thiserror.workspace = true tonic = { workspace = true } @@ -15,7 +15,8 @@ prost = { workspace = true } prost-types = { workspace = true } uuid = { version = "1.8.0", features = ["v4", "fast-rng", "macro-diagnostics"] } tracing = { workspace = true } - +serde_json = { workspace = true } +serde = { workspace = true } [build-dependencies] tonic-build = "0.11" # Only needed to handle custom btree mapping; can be removed when we bump the tonic version which will have this too diff --git a/crates/protogen/proto/common/arrow.proto b/crates/protogen/proto/common/arrow.proto index a41e0c853..0b3bdfc4a 100644 --- a/crates/protogen/proto/common/arrow.proto +++ b/crates/protogen/proto/common/arrow.proto @@ -10,6 +10,7 @@ option go_package = "github.com/glaredb/cloud/pkg/protogen/common"; message Schema { repeated Field columns = 1; + map metadata = 2; } message Field { diff --git a/crates/protogen/proto/metastore/catalog.proto b/crates/protogen/proto/metastore/catalog.proto index 3f43c512b..2172aaa83 100644 --- a/crates/protogen/proto/metastore/catalog.proto +++ b/crates/protogen/proto/metastore/catalog.proto @@ -39,6 +39,12 @@ import "common/arrow.proto"; // The state of the catalog at some version. message CatalogState { + // The version of the actual catalog implementation. + // This is incremented when there are incompatible changes to the physical + // catalog layout. + // Purposely set to a high number to avoid accidental collisions + optional uint32 catalog_version = 2047; + // Version of this catalog. Increments on every mutation. uint64 version = 1; @@ -49,7 +55,6 @@ message CatalogState { // Metadata for the deployment. DeploymentMetadata deployment = 3; - // next: 4 } @@ -160,11 +165,15 @@ message TableEntry { reserved 2; // Column fields - options.TableOptions options = 3; + // The old table options. This is deprecated and should not be used. + // It is only here for backwards compatibility. 
+ options.TableOptionsV0 options_v0 = 3; optional uint32 tunnel_id = 4; SourceAccessMode access_mode = 5; repeated options.InternalColumnDefinition columns = 6; - // next: 7 + // The new table options. + options.TableOptionsV1 options = 7; + // next: 8 } message ViewEntry { diff --git a/crates/protogen/proto/metastore/options.proto b/crates/protogen/proto/metastore/options.proto index 95a971f33..54cb4ffc4 100644 --- a/crates/protogen/proto/metastore/options.proto +++ b/crates/protogen/proto/metastore/options.proto @@ -72,7 +72,6 @@ message DatabaseOptionsMysql { message DatabaseOptionsMongoDb { string connection_string = 1; - repeated InternalColumnDefinition columns = 2; } message DatabaseOptionsSqlServer { @@ -121,9 +120,15 @@ message StorageOptions { map inner = 1; } +message TableOptionsV1 { + string name = 1; + bytes options = 2; + uint32 version = 3; +} + // Table options -message TableOptions { +message TableOptionsV0 { oneof options { TableOptionsInternal internal = 1; TableOptionsDebug debug = 2; @@ -205,10 +210,9 @@ message TableOptionsMongo { string connection_string = 1; string database = 2; string collection = 3; - repeated InternalColumnDefinition columns = 4; } -message TableOptionsExcel{ +message TableOptionsExcel { string location = 1; StorageOptions storage_options = 2; optional string file_type = 3; @@ -244,8 +248,8 @@ message TableOptionsObjectStore { // Optional: number of records to sample for formats with inferred schema. optional int64 schema_sample_size = 5; - // (optional) explit schema - repeated InternalColumnDefinition columns = 6; + // explit schema (not used anymore) + reserved 6; optional string name = 7; } diff --git a/crates/protogen/proto/metastore/service.proto b/crates/protogen/proto/metastore/service.proto index f4cf2ae99..d60214c78 100644 --- a/crates/protogen/proto/metastore/service.proto +++ b/crates/protogen/proto/metastore/service.proto @@ -8,6 +8,7 @@ option go_package = "github.com/glaredb/cloud/pkg/protogen/metastore"; import "metastore/catalog.proto"; import "metastore/options.proto"; +import "common/arrow.proto"; message FetchCatalogRequest { // ID of the database catalog to fetch. 
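The `TableOptionsV1` message introduced above is the wire form of the envelope defined later in `options.rs`: a `name` tag, a format `version`, and the options themselves as serialized JSON bytes. A hedged usage sketch using the types this diff introduces (`TableOptionsDebug`, `TableOptionsImpl`, `TableOptionsV1`); it is a minimal illustration, not part of the change itself:

```rust
use protogen::metastore::types::options::{TableOptionsDebug, TableOptionsImpl, TableOptionsV1};

fn envelope_roundtrip() {
    let opts = TableOptionsDebug { table_type: "never_ending".to_string() };

    // Wrap: `name` comes from `TableOptionsImpl::NAME` ("debug"), `options`
    // holds the JSON bytes, `version` is `TableOptionsImpl::VERSION` (1).
    let wrapped = TableOptionsV1::new(&opts);
    assert_eq!(wrapped.as_str(), TableOptionsDebug::NAME);

    // Unwrap: `extract` checks the name tag and deserializes the JSON payload.
    let back: TableOptionsDebug = wrapped.extract().expect("tag matches");
    assert_eq!(back.table_type, "never_ending");
}
```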
@@ -91,11 +92,12 @@ message CreateFunction { message CreateExternalTable { string schema = 1; string name = 2; - options.TableOptions options = 3; + options.TableOptionsV0 options = 3; bool if_not_exists = 4; optional string tunnel = 5; bool or_replace = 6; - // next: 7 + repeated options.InternalColumnDefinition columns = 7; + // next: 8 } message CreateExternalDatabase { diff --git a/crates/protogen/src/common/arrow.rs b/crates/protogen/src/common/arrow.rs index 091eff993..de4287e21 100644 --- a/crates/protogen/src/common/arrow.rs +++ b/crates/protogen/src/common/arrow.rs @@ -7,6 +7,7 @@ use datafusion::arrow::datatypes::{ DataType, Field, IntervalUnit, + Schema, TimeUnit, UnionFields, UnionMode, @@ -252,6 +253,35 @@ impl TryFrom<&Field> for arrow::Field { } } + +impl TryFrom<&arrow::Schema> for Schema { + type Error = ProtoConvError; + + fn try_from(schema: &arrow::Schema) -> Result { + let fields = schema + .columns + .iter() + .map(Field::try_from) + .collect::, _>>()?; + Ok(Self::new_with_metadata(fields, schema.metadata.clone())) + } +} + +impl TryFrom<&Schema> for arrow::Schema { + type Error = ProtoConvError; + + fn try_from(schema: &Schema) -> Result { + Ok(Self { + columns: schema + .fields() + .iter() + .map(|f| f.as_ref().try_into()) + .collect::, ProtoConvError>>()?, + metadata: schema.metadata.clone(), + }) + } +} + impl From for TimeUnit { fn from(time_unit: arrow::TimeUnit) -> Self { match time_unit { diff --git a/crates/protogen/src/lib.rs b/crates/protogen/src/lib.rs index 172a01c3f..ec79008a0 100644 --- a/crates/protogen/src/lib.rs +++ b/crates/protogen/src/lib.rs @@ -8,9 +8,8 @@ pub mod common; pub mod metastore; -pub mod sqlexec; - pub mod rpcsrv; +pub mod sqlexec; pub use errors::ProtoConvError; pub mod errors { @@ -23,6 +22,9 @@ pub mod errors { #[error("Unknown enum variant for '{0}': {1}")] UnknownEnumVariant(&'static str, i32), + #[error("Unknown variant for '{0}'")] + UnknownVariant(String), + #[error("Received zero-value enum variant for '{0}'")] ZeroValueEnumVariant(&'static str), diff --git a/crates/protogen/src/metastore/types/catalog.rs b/crates/protogen/src/metastore/types/catalog.rs index 2ed19388c..1002d7079 100644 --- a/crates/protogen/src/metastore/types/catalog.rs +++ b/crates/protogen/src/metastore/types/catalog.rs @@ -10,19 +10,31 @@ use super::options::{ CredentialsOptions, DatabaseOptions, InternalColumnDefinition, - TableOptions, - TableOptionsInternal, + TableOptionsV0, TunnelOptions, }; use crate::gen::common::arrow::ArrowType; use crate::gen::metastore::catalog::{self, type_signature}; -use crate::{FromOptionalField, ProtoConvError}; +use crate::{gen, FromOptionalField, ProtoConvError}; + +/// The current version of the catalog IMPLEMENTATION +/// this is incremented every time there is a breaking change physical representation of the catalog. +pub const CURRENT_CATALOG_VERSION: u32 = 1; #[derive(Debug, Clone, PartialEq, Eq)] pub struct CatalogState { + /// the version of the catalog STATE. + /// This version corresponds the state of the catalog and is incremented every time there is a change to the catalog. + /// It is associated with the "data" in the catalog pub version: u64, pub entries: HashMap, pub deployment: DeploymentMetadata, + /// This is the version of the catalog IMPLEMENTATION. + /// any new code paths should generally use [`CURRENT_CATALOG_VERSION`] unless it is specifically + /// for handling older versions of the catalog. 
+ /// unlike the `version` field, this is only incremented when the physical representation of the catalog changes. + /// it is associated with the "code" that represents the catalog. + pub catalog_version: u32, } impl TryFrom for CatalogState { @@ -47,6 +59,7 @@ impl TryFrom for CatalogState { version: value.version, entries, deployment, + catalog_version: value.catalog_version.unwrap_or(0), }) } } @@ -65,6 +78,7 @@ impl TryFrom for catalog::CatalogState { }) .collect::>()?, deployment: Some(value.deployment.try_into()?), + catalog_version: Some(value.catalog_version), }) } } @@ -177,7 +191,7 @@ impl TryFrom for catalog::CatalogEntry { CatalogEntry::Database(v) => catalog::catalog_entry::Entry::Database(v.into()), CatalogEntry::Schema(v) => catalog::catalog_entry::Entry::Schema(v.into()), CatalogEntry::View(v) => catalog::catalog_entry::Entry::View(v.into()), - CatalogEntry::Table(v) => catalog::catalog_entry::Entry::Table(v.try_into()?), + CatalogEntry::Table(v) => catalog::catalog_entry::Entry::Table(v.into()), CatalogEntry::Tunnel(v) => catalog::catalog_entry::Entry::Tunnel(v.into()), CatalogEntry::Function(v) => catalog::catalog_entry::Entry::Function(v.into()), CatalogEntry::Credentials(v) => catalog::catalog_entry::Entry::Credentials(v.into()), @@ -438,7 +452,7 @@ impl From for catalog::SchemaEntry { #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct TableEntry { pub meta: EntryMeta, - pub options: TableOptions, + pub options: TableOptionsV0, pub tunnel_id: Option, pub access_mode: SourceAccessMode, pub columns: Option>, @@ -447,21 +461,12 @@ pub struct TableEntry { impl TableEntry { /// Try to get the columns for this table if available. pub fn get_internal_columns(&self) -> Option> { - match &self.options { - TableOptions::Internal(TableOptionsInternal { columns, .. 
}) => { - Some(columns.to_owned()) - } - _ => { - if let Some(columns) = self.columns.as_ref() { - let mut out = Vec::with_capacity(columns.len()); - out.extend_from_slice(columns.as_slice()); - Some(out) - } else { - None - } - } + match self.options { + TableOptionsV0::Internal(ref options) => Some(options.columns.clone()), + _ => None, } } + pub fn get_columns(&self) -> Option> { self.get_internal_columns().map(|val| { val.iter() @@ -472,15 +477,22 @@ impl TableEntry { }) } } - impl TryFrom for TableEntry { type Error = ProtoConvError; fn try_from(value: catalog::TableEntry) -> Result { let meta: EntryMeta = value.meta.required("meta")?; - let mut columns = Vec::with_capacity(value.columns.len()); - for col in value.columns { - columns.push(col.try_into()?); + if value.options.is_some() { + return Err(ProtoConvError::UnsupportedSerialization( + "new table options not suppported", + )); } + + let columns: Vec = value + .columns + .into_iter() + .map(|c| c.try_into()) + .collect::>()?; + let columns = if columns.is_empty() { None } else { @@ -489,7 +501,7 @@ impl TryFrom for TableEntry { Ok(TableEntry { meta, - options: value.options.required("options".to_string())?, + options: value.options_v0.required("options_v0")?, tunnel_id: value.tunnel_id, access_mode: value.access_mode.try_into()?, columns, @@ -497,25 +509,23 @@ impl TryFrom for TableEntry { } } -impl TryFrom for catalog::TableEntry { - type Error = ProtoConvError; - fn try_from(value: TableEntry) -> Result { - let columns = if let Some(columns) = value.columns { - let mut out = Vec::with_capacity(columns.len()); - for col in columns { - out.push(col.try_into()?); - } - out - } else { - Vec::new() - }; - Ok(catalog::TableEntry { +impl From for catalog::TableEntry { + fn from(value: TableEntry) -> Self { + let columns: Vec = value + .columns + .unwrap_or_default() + .into_iter() + .map(Into::into) + .collect::>(); + + catalog::TableEntry { meta: Some(value.meta.into()), - options: Some(value.options.try_into()?), + options_v0: Some(value.options.try_into().unwrap()), tunnel_id: value.tunnel_id, access_mode: value.access_mode.into(), + options: None, columns, - }) + } } } @@ -894,6 +904,7 @@ mod tests { version: 4, entries: HashMap::new(), deployment: None, + catalog_version: None, }; let converted: CatalogState = state.try_into().unwrap(); @@ -901,6 +912,7 @@ mod tests { version: 4, entries: HashMap::new(), deployment: DeploymentMetadata { storage_size: 0 }, + catalog_version: 0, }; assert_eq!(expected, converted); diff --git a/crates/protogen/src/metastore/types/options.rs b/crates/protogen/src/metastore/types/options.rs index a91e5ebd4..003d69840 100644 --- a/crates/protogen/src/metastore/types/options.rs +++ b/crates/protogen/src/metastore/types/options.rs @@ -1,15 +1,18 @@ use std::collections::BTreeMap; -use std::fmt; +use std::fmt::{self, Display}; +use std::hash::Hash; use std::sync::Arc; use datafusion::arrow::datatypes::{DataType, Field, Fields, SchemaRef}; use datafusion::common::DFSchemaRef; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; use crate::gen::common::arrow; use crate::gen::metastore::options; use crate::{FromOptionalField, ProtoConvError}; -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct InternalColumnDefinition { pub name: String, pub nullable: bool, @@ -35,18 +38,15 @@ impl InternalColumnDefinition { .collect() } - /// Create a vec of column definitions from arrow fields. 
- pub fn from_arrow_fields(cols: C) -> Vec - where - C: IntoIterator>, - { - cols.into_iter() - .map(|field| InternalColumnDefinition { - name: field.name().clone(), - nullable: field.is_nullable(), - arrow_type: field.data_type().clone(), - }) - .collect() + /// Create an iterator of column definitions from arrow fields. + pub fn from_arrow_fields( + fields: &Fields, + ) -> impl Iterator + '_ { + fields.into_iter().map(|field| InternalColumnDefinition { + name: field.name().clone(), + nullable: field.is_nullable(), + arrow_type: field.data_type().clone(), + }) } /// Create a vec of column definitions from arrow fields. @@ -54,9 +54,13 @@ impl InternalColumnDefinition { where C: IntoIterator, { - cols.into_iter() - .map(|col| Arc::new(Field::new(col.name, col.arrow_type, col.nullable))) - .collect() + cols.into_iter().map(|col| Arc::new(col.into())).collect() + } +} + +impl From for Field { + fn from(value: InternalColumnDefinition) -> Self { + Field::new(value.name, value.arrow_type, value.nullable) } } @@ -64,6 +68,7 @@ impl TryFrom for InternalColumnDefinition { type Error = ProtoConvError; fn try_from(value: options::InternalColumnDefinition) -> Result { let arrow_type: DataType = value.arrow_type.as_ref().required("arrow_type")?; + Ok(InternalColumnDefinition { name: value.name, nullable: value.nullable, @@ -72,21 +77,23 @@ impl TryFrom for InternalColumnDefinition { } } -impl TryFrom for options::InternalColumnDefinition { - type Error = ProtoConvError; - fn try_from(value: InternalColumnDefinition) -> Result { - let arrow_type = arrow::ArrowType::try_from(&value.arrow_type)?; - Ok(options::InternalColumnDefinition { +impl From for options::InternalColumnDefinition { + fn from(value: InternalColumnDefinition) -> Self { + // We don't support any of the unserializable arrow types + // So this should always be infallible + let arrow_type = + arrow::ArrowType::try_from(&value.arrow_type).expect("Arrow type must be serializable"); + options::InternalColumnDefinition { name: value.name, nullable: value.nullable, arrow_type: Some(arrow_type), - }) + } } } // Database options -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum DatabaseOptions { Internal(DatabaseOptionsInternal), Debug(DatabaseOptionsDebug), @@ -218,7 +225,7 @@ impl From for options::DatabaseOptions { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct DatabaseOptionsInternal {} impl TryFrom for DatabaseOptionsInternal { @@ -234,7 +241,7 @@ impl From for options::DatabaseOptionsInternal { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct DatabaseOptionsDebug {} impl TryFrom for DatabaseOptionsDebug { @@ -250,7 +257,7 @@ impl From for options::DatabaseOptionsDebug { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct DatabaseOptionsPostgres { pub connection_string: String, } @@ -272,7 +279,7 @@ impl From for options::DatabaseOptionsPostgres { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct DatabaseOptionsBigQuery { pub service_account_key: String, pub project_id: String, @@ -297,7 +304,7 @@ impl From for options::DatabaseOptionsBigQuery { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, 
Serialize, Deserialize)] pub struct DatabaseOptionsMysql { pub connection_string: String, } @@ -319,10 +326,9 @@ impl From for options::DatabaseOptionsMysql { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct DatabaseOptionsMongoDb { pub connection_string: String, - pub columns: Vec, } impl TryFrom for DatabaseOptionsMongoDb { @@ -330,11 +336,6 @@ impl TryFrom for DatabaseOptionsMongoDb { fn try_from(value: options::DatabaseOptionsMongoDb) -> Result { Ok(DatabaseOptionsMongoDb { connection_string: value.connection_string, - columns: value - .columns - .iter() - .map(|i| self::InternalColumnDefinition::try_from(i.to_owned())) - .collect::>()?, }) } } @@ -343,17 +344,11 @@ impl From for options::DatabaseOptionsMongoDb { fn from(value: DatabaseOptionsMongoDb) -> Self { options::DatabaseOptionsMongoDb { connection_string: value.connection_string, - columns: value - .columns - .into_iter() - .map(|v| v.try_into()) - .collect::>() - .unwrap_or_else(|_| Vec::new()), } } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct DatabaseOptionsSqlServer { pub connection_string: String, } @@ -375,7 +370,7 @@ impl From for options::DatabaseOptionsSqlServer { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct DatabaseOptionsClickhouse { pub connection_string: String, } @@ -396,7 +391,7 @@ impl From for options::DatabaseOptionsClickhouse { } } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct DatabaseOptionsCassandra { pub host: String, pub username: Option, @@ -424,7 +419,7 @@ impl From for options::DatabaseOptionsCassandra { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct DatabaseOptionsSnowflake { pub account_name: String, pub login_name: String, @@ -461,7 +456,7 @@ impl From for options::DatabaseOptionsSnowflake { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct DatabaseOptionsDeltaLake { pub catalog: DeltaLakeCatalog, pub storage_options: StorageOptions, @@ -488,7 +483,7 @@ impl From for options::DatabaseOptionsDeltaLake { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum DeltaLakeCatalog { Unity(DeltaLakeUnityCatalog), } @@ -514,7 +509,7 @@ impl From for options::database_options_delta_lake::Catalog { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct DeltaLakeUnityCatalog { pub catalog_id: String, pub databricks_access_token: String, @@ -554,8 +549,9 @@ impl From for options::DeltaLakeUnityCatalog { /// - [Azure options](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants) /// - [S3 options](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants) /// - [Google options](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html#variants) -#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct StorageOptions { + #[serde(flatten)] pub inner: BTreeMap, } @@ -585,13 +581,97 @@ impl From for 
options::StorageOptions { } } +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct TableOptionsV1 { + pub name: String, + pub options: Vec, + pub version: u32, +} -// Table options +pub trait TableOptionsImpl: Serialize + DeserializeOwned { + const NAME: &'static str; + const VERSION: u32 = 1; +} -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub enum TableOptions { - Internal(TableOptionsInternal), +impl Display for TableOptionsV1 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.name) + } +} + +impl TableOptionsV1 { + pub fn as_str(&self) -> &str { + &self.name + } + + pub fn new(options: &S) -> Self { + let options = serde_json::to_vec(options).expect("options must serialize"); + + TableOptionsV1 { + name: S::NAME.to_string(), + options, + version: S::VERSION, + } + } + + pub fn extract(&self) -> Result { + if self.name != T::NAME { + return Err(ProtoConvError::ParseError(format!( + "Expected table options of type {}, got {}", + T::NAME, + self.name + ))); + } + serde_json::from_slice(&self.options).map_err(|e| ProtoConvError::ParseError(e.to_string())) + } + + /// Extract the table options to a specific type. + /// This does not check the variant of the table options, so this will panic if the variant is not the expected one. + /// Use `extract` if you want to handle the error case. + /// This should only be used if the variant is checked elsewhere. + pub fn extract_unchecked(&self) -> T { + serde_json::from_slice(&self.options) + .expect("Options should infallibly deserialize. This indicates a programming error.") + } +} + +impl From for TableOptionsV1 +where + T: TableOptionsImpl, +{ + fn from(value: T) -> Self { + TableOptionsV1::new(&value) + } +} + +impl TryFrom for TableOptionsV1 { + type Error = ProtoConvError; + fn try_from(value: options::TableOptionsV1) -> Result { + let name = value.name; + let values = value.options; + + Ok(TableOptionsV1 { + options: values, + name, + version: value.version, + }) + } +} + +impl From for options::TableOptionsV1 { + fn from(value: TableOptionsV1) -> Self { + options::TableOptionsV1 { + name: value.name, + options: value.options, + version: value.version, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum TableOptionsV0 { Debug(TableOptionsDebug), + Internal(TableOptionsInternal), Postgres(TableOptionsPostgres), BigQuery(TableOptionsBigQuery), Mysql(TableOptionsMysql), @@ -612,9 +692,9 @@ pub enum TableOptions { Sqlite(TableOptionsObjectStore), } -impl TableOptions { - pub const INTERNAL: &'static str = "internal"; +impl TableOptionsV0 { pub const DEBUG: &'static str = "debug"; + pub const INTERNAL: &'static str = "internal"; pub const POSTGRES: &'static str = "postgres"; pub const BIGQUERY: &'static str = "bigquery"; pub const MYSQL: &'static str = "mysql"; @@ -634,121 +714,299 @@ impl TableOptions { pub const EXCEL: &'static str = "excel"; pub const SQLITE: &'static str = "sqlite"; - pub const fn new_internal(columns: Vec) -> TableOptions { - TableOptions::Internal(TableOptionsInternal { columns }) + pub const fn new_internal(columns: Vec) -> TableOptionsV0 { + TableOptionsV0::Internal(TableOptionsInternal { columns }) } pub fn as_str(&self) -> &'static str { match self { - TableOptions::Internal(_) => Self::INTERNAL, - TableOptions::Debug(_) => Self::DEBUG, - TableOptions::Postgres(_) => Self::POSTGRES, - TableOptions::BigQuery(_) => Self::BIGQUERY, - TableOptions::Mysql(_) => Self::MYSQL, - TableOptions::Local(_) => Self::LOCAL, - 
TableOptions::Gcs(_) => Self::GCS, - TableOptions::S3(_) => Self::S3_STORAGE, - TableOptions::MongoDb(_) => Self::MONGODB, - TableOptions::Snowflake(_) => Self::SNOWFLAKE, - TableOptions::Delta(_) => Self::DELTA, - TableOptions::Iceberg(_) => Self::ICEBERG, - TableOptions::Azure(_) => Self::AZURE, - TableOptions::SqlServer(_) => Self::SQL_SERVER, - TableOptions::Lance(_) => Self::LANCE, - TableOptions::Bson(_) => Self::BSON, - TableOptions::Clickhouse(_) => Self::CLICKHOUSE, - TableOptions::Cassandra(_) => Self::CASSANDRA, - TableOptions::Excel(_) => Self::EXCEL, - TableOptions::Sqlite(_) => Self::SQLITE, + TableOptionsV0::Debug(_) => Self::DEBUG, + TableOptionsV0::Internal(_) => Self::INTERNAL, + TableOptionsV0::Postgres(_) => Self::POSTGRES, + TableOptionsV0::BigQuery(_) => Self::BIGQUERY, + TableOptionsV0::Mysql(_) => Self::MYSQL, + TableOptionsV0::Local(_) => Self::LOCAL, + TableOptionsV0::Gcs(_) => Self::GCS, + TableOptionsV0::S3(_) => Self::S3_STORAGE, + TableOptionsV0::MongoDb(_) => Self::MONGODB, + TableOptionsV0::Snowflake(_) => Self::SNOWFLAKE, + TableOptionsV0::Delta(_) => Self::DELTA, + TableOptionsV0::Iceberg(_) => Self::ICEBERG, + TableOptionsV0::Azure(_) => Self::AZURE, + TableOptionsV0::SqlServer(_) => Self::SQL_SERVER, + TableOptionsV0::Lance(_) => Self::LANCE, + TableOptionsV0::Bson(_) => Self::BSON, + TableOptionsV0::Clickhouse(_) => Self::CLICKHOUSE, + TableOptionsV0::Cassandra(_) => Self::CASSANDRA, + TableOptionsV0::Excel(_) => Self::EXCEL, + TableOptionsV0::Sqlite(_) => Self::SQLITE, + } + } +} + +impl From for TableOptionsV1 { + fn from(value: TableOptionsV0) -> Self { + match value { + TableOptionsV0::Debug(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::Internal(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::Postgres(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::BigQuery(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::Mysql(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::Local(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::Gcs(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::S3(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::MongoDb(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::Snowflake(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::Delta(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::Iceberg(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::Azure(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::SqlServer(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::Lance(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::Bson(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::Clickhouse(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::Cassandra(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::Excel(opts) => TableOptionsV1::new(&opts), + TableOptionsV0::Sqlite(opts) => TableOptionsV1::new(&opts), } } } -impl fmt::Display for TableOptions { +impl TryFrom<&TableOptionsV1> for TableOptionsV0 { + type Error = ProtoConvError; + + fn try_from(value: &TableOptionsV1) -> Result { + let _v: serde_json::Value = serde_json::from_slice(&value.options).unwrap(); + + match value.name.as_ref() { + TableOptionsObjectStore::NAME => { + let obj_store: TableOptionsObjectStore = value.extract()?; + let file_type = obj_store.file_type.clone().unwrap_or_default(); + match file_type.as_ref() { + Self::DELTA => Ok(TableOptionsV0::Delta(obj_store)), + Self::ICEBERG => Ok(TableOptionsV0::Iceberg(obj_store)), + Self::AZURE => Ok(TableOptionsV0::Azure(obj_store)), + Self::LANCE => 
Ok(TableOptionsV0::Lance(obj_store)), + Self::BSON => Ok(TableOptionsV0::Bson(obj_store)), + _ => Err(ProtoConvError::UnknownVariant(value.name.to_string())), + } + } + Self::INTERNAL => { + let internal: TableOptionsInternal = value.extract()?; + Ok(TableOptionsV0::Internal(internal)) + } + Self::POSTGRES => { + let postgres: TableOptionsPostgres = value.extract()?; + Ok(TableOptionsV0::Postgres(postgres)) + } + Self::BIGQUERY => { + let bigquery: TableOptionsBigQuery = value.extract()?; + Ok(TableOptionsV0::BigQuery(bigquery)) + } + Self::MYSQL => { + let mysql: TableOptionsMysql = value.extract()?; + Ok(TableOptionsV0::Mysql(mysql)) + } + Self::LOCAL => { + let local: TableOptionsLocal = value.extract()?; + Ok(TableOptionsV0::Local(local)) + } + Self::GCS => { + let gcs: TableOptionsGcs = value.extract()?; + Ok(TableOptionsV0::Gcs(gcs)) + } + Self::S3_STORAGE => { + let s3: TableOptionsS3 = value.extract()?; + Ok(TableOptionsV0::S3(s3)) + } + Self::MONGODB => { + let mongo: TableOptionsMongoDb = value.extract()?; + Ok(TableOptionsV0::MongoDb(mongo)) + } + Self::SNOWFLAKE => { + let snowflake: TableOptionsSnowflake = value.extract()?; + Ok(TableOptionsV0::Snowflake(snowflake)) + } + Self::SQL_SERVER => { + let sql_server: TableOptionsSqlServer = value.extract()?; + Ok(TableOptionsV0::SqlServer(sql_server)) + } + Self::CLICKHOUSE => { + let clickhouse: TableOptionsClickhouse = value.extract()?; + Ok(TableOptionsV0::Clickhouse(clickhouse)) + } + Self::CASSANDRA => { + let cassandra: TableOptionsCassandra = value.extract()?; + Ok(TableOptionsV0::Cassandra(cassandra)) + } + Self::EXCEL => { + let excel: TableOptionsExcel = value.extract()?; + Ok(TableOptionsV0::Excel(excel)) + } + Self::SQLITE => { + let sqlite: TableOptionsObjectStore = value.extract()?; + Ok(TableOptionsV0::Sqlite(sqlite)) + } + Self::DEBUG => { + let debug: TableOptionsDebug = value.extract()?; + Ok(TableOptionsV0::Debug(debug)) + } + _ => Err(ProtoConvError::UnknownVariant(value.name.to_string())), + } + } +} + +impl fmt::Display for TableOptionsV0 { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.as_str()) } } -impl TryFrom for TableOptions { +impl TryFrom for options::table_options_v0::Options { + type Error = ProtoConvError; + fn try_from(value: TableOptionsV0) -> Result { + Ok(match value { + TableOptionsV0::Debug(v) => options::table_options_v0::Options::Debug(v.into()), + TableOptionsV0::Internal(v) => { + options::table_options_v0::Options::Internal(v.try_into()?) 
+ } + TableOptionsV0::Postgres(v) => options::table_options_v0::Options::Postgres(v.into()), + TableOptionsV0::BigQuery(v) => options::table_options_v0::Options::Bigquery(v.into()), + TableOptionsV0::Mysql(v) => options::table_options_v0::Options::Mysql(v.into()), + TableOptionsV0::Local(v) => options::table_options_v0::Options::Local(v.into()), + TableOptionsV0::Gcs(v) => options::table_options_v0::Options::Gcs(v.into()), + TableOptionsV0::S3(v) => options::table_options_v0::Options::S3(v.into()), + TableOptionsV0::MongoDb(v) => options::table_options_v0::Options::Mongo(v.into()), + TableOptionsV0::Snowflake(v) => options::table_options_v0::Options::Snowflake(v.into()), + TableOptionsV0::Delta(v) => options::table_options_v0::Options::Delta(v.into()), + TableOptionsV0::Iceberg(v) => options::table_options_v0::Options::Iceberg(v.into()), + TableOptionsV0::Azure(v) => options::table_options_v0::Options::Azure(v.into()), + TableOptionsV0::SqlServer(v) => options::table_options_v0::Options::SqlServer(v.into()), + TableOptionsV0::Lance(v) => options::table_options_v0::Options::Lance(v.into()), + TableOptionsV0::Bson(v) => options::table_options_v0::Options::Bson(v.into()), + TableOptionsV0::Clickhouse(v) => { + options::table_options_v0::Options::Clickhouse(v.into()) + } + TableOptionsV0::Cassandra(v) => options::table_options_v0::Options::Cassandra(v.into()), + TableOptionsV0::Excel(v) => options::table_options_v0::Options::Excel(v.into()), + TableOptionsV0::Sqlite(v) => options::table_options_v0::Options::Sqlite(v.into()), + }) + } +} + +impl TryFrom for TableOptionsV0 { type Error = ProtoConvError; - fn try_from(value: options::table_options::Options) -> Result { + fn try_from(value: options::table_options_v0::Options) -> Result { Ok(match value { - options::table_options::Options::Internal(v) => TableOptions::Internal(v.try_into()?), - options::table_options::Options::Debug(v) => TableOptions::Debug(v.try_into()?), - options::table_options::Options::Postgres(v) => TableOptions::Postgres(v.try_into()?), - options::table_options::Options::Bigquery(v) => TableOptions::BigQuery(v.try_into()?), - options::table_options::Options::Mysql(v) => TableOptions::Mysql(v.try_into()?), - options::table_options::Options::Local(v) => TableOptions::Local(v.try_into()?), - options::table_options::Options::Gcs(v) => TableOptions::Gcs(v.try_into()?), - options::table_options::Options::S3(v) => TableOptions::S3(v.try_into()?), - options::table_options::Options::Mongo(v) => TableOptions::MongoDb(v.try_into()?), - options::table_options::Options::Snowflake(v) => TableOptions::Snowflake(v.try_into()?), - options::table_options::Options::Delta(v) => TableOptions::Delta(v.try_into()?), - options::table_options::Options::Iceberg(v) => TableOptions::Iceberg(v.try_into()?), - options::table_options::Options::Azure(v) => TableOptions::Azure(v.try_into()?), - options::table_options::Options::SqlServer(v) => TableOptions::SqlServer(v.try_into()?), - options::table_options::Options::Lance(v) => TableOptions::Lance(v.try_into()?), - options::table_options::Options::Bson(v) => TableOptions::Bson(v.try_into()?), - options::table_options::Options::Clickhouse(v) => { - TableOptions::Clickhouse(v.try_into()?) + options::table_options_v0::Options::Internal(v) => { + TableOptionsV0::Internal(v.try_into()?) + } + options::table_options_v0::Options::Debug(v) => TableOptionsV0::Debug(v.try_into()?), + options::table_options_v0::Options::Postgres(v) => { + TableOptionsV0::Postgres(v.try_into()?) 
+ } + options::table_options_v0::Options::Bigquery(v) => { + TableOptionsV0::BigQuery(v.try_into()?) + } + options::table_options_v0::Options::Mysql(v) => TableOptionsV0::Mysql(v.try_into()?), + options::table_options_v0::Options::Local(v) => TableOptionsV0::Local(v.try_into()?), + options::table_options_v0::Options::Gcs(v) => TableOptionsV0::Gcs(v.try_into()?), + options::table_options_v0::Options::S3(v) => TableOptionsV0::S3(v.try_into()?), + options::table_options_v0::Options::Mongo(v) => TableOptionsV0::MongoDb(v.try_into()?), + options::table_options_v0::Options::Snowflake(v) => { + TableOptionsV0::Snowflake(v.try_into()?) + } + options::table_options_v0::Options::Delta(v) => TableOptionsV0::Delta(v.try_into()?), + options::table_options_v0::Options::Iceberg(v) => { + TableOptionsV0::Iceberg(v.try_into()?) + } + options::table_options_v0::Options::Azure(v) => TableOptionsV0::Azure(v.try_into()?), + options::table_options_v0::Options::SqlServer(v) => { + TableOptionsV0::SqlServer(v.try_into()?) + } + options::table_options_v0::Options::Lance(v) => TableOptionsV0::Lance(v.try_into()?), + options::table_options_v0::Options::Bson(v) => TableOptionsV0::Bson(v.try_into()?), + options::table_options_v0::Options::Clickhouse(v) => { + TableOptionsV0::Clickhouse(v.try_into()?) } - options::table_options::Options::Cassandra(v) => TableOptions::Cassandra(v.try_into()?), - options::table_options::Options::Excel(v) => TableOptions::Excel(v.try_into()?), - options::table_options::Options::Sqlite(v) => TableOptions::Sqlite(v.try_into()?), + options::table_options_v0::Options::Cassandra(v) => { + TableOptionsV0::Cassandra(v.try_into()?) + } + options::table_options_v0::Options::Excel(v) => TableOptionsV0::Excel(v.try_into()?), + options::table_options_v0::Options::Sqlite(v) => TableOptionsV0::Sqlite(v.try_into()?), }) } } -impl TryFrom for TableOptions { +impl TryFrom for TableOptionsV0 { type Error = ProtoConvError; - fn try_from(value: options::TableOptions) -> Result { + fn try_from(value: options::TableOptionsV0) -> Result { value.options.required("options") } } -impl TryFrom for options::table_options::Options { +impl TryFrom for options::TableOptionsV0 { type Error = ProtoConvError; - fn try_from(value: TableOptions) -> Result { - Ok(match value { - TableOptions::Internal(v) => options::table_options::Options::Internal(v.try_into()?), - TableOptions::Debug(v) => options::table_options::Options::Debug(v.into()), - TableOptions::Postgres(v) => options::table_options::Options::Postgres(v.into()), - TableOptions::BigQuery(v) => options::table_options::Options::Bigquery(v.into()), - TableOptions::Mysql(v) => options::table_options::Options::Mysql(v.into()), - TableOptions::Local(v) => options::table_options::Options::Local(v.into()), - TableOptions::Gcs(v) => options::table_options::Options::Gcs(v.into()), - TableOptions::S3(v) => options::table_options::Options::S3(v.into()), - TableOptions::MongoDb(v) => options::table_options::Options::Mongo(v.into()), - TableOptions::Snowflake(v) => options::table_options::Options::Snowflake(v.into()), - TableOptions::Delta(v) => options::table_options::Options::Delta(v.into()), - TableOptions::Iceberg(v) => options::table_options::Options::Iceberg(v.into()), - TableOptions::Azure(v) => options::table_options::Options::Azure(v.into()), - TableOptions::SqlServer(v) => options::table_options::Options::SqlServer(v.into()), - TableOptions::Lance(v) => options::table_options::Options::Lance(v.into()), - TableOptions::Bson(v) => 
options::table_options::Options::Bson(v.into()), - TableOptions::Clickhouse(v) => options::table_options::Options::Clickhouse(v.into()), - TableOptions::Cassandra(v) => options::table_options::Options::Cassandra(v.into()), - TableOptions::Excel(v) => options::table_options::Options::Excel(v.into()), - TableOptions::Sqlite(v) => options::table_options::Options::Sqlite(v.into()), + fn try_from(value: TableOptionsV0) -> Result { + Ok(options::TableOptionsV0 { + options: Some(value.try_into()?), }) } } -impl TryFrom for options::TableOptions { +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct TableOptionsDebug { + pub table_type: String, +} + +impl From for TableOptionsV0 { + fn from(value: TableOptionsDebug) -> Self { + TableOptionsV0::Debug(value) + } +} + +impl Default for TableOptionsDebug { + fn default() -> Self { + TableOptionsDebug { + table_type: "never_ending".to_string(), + } + } +} + +impl TableOptionsImpl for TableOptionsDebug { + const NAME: &'static str = "debug"; +} + +impl TryFrom for TableOptionsDebug { type Error = ProtoConvError; - fn try_from(value: TableOptions) -> Result { - Ok(options::TableOptions { - options: Some(value.try_into()?), + fn try_from(value: options::TableOptionsDebug) -> Result { + Ok(TableOptionsDebug { + table_type: value.table_type, }) } } +impl From for options::TableOptionsDebug { + fn from(value: TableOptionsDebug) -> Self { + options::TableOptionsDebug { + table_type: value.table_type, + } + } +} -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct TableOptionsInternal { pub columns: Vec, } +impl From for TableOptionsV0 { + fn from(value: TableOptionsInternal) -> Self { + TableOptionsV0::Internal(value) + } +} + +impl TableOptionsImpl for TableOptionsInternal { + const NAME: &'static str = "internal"; +} + impl From for TableOptionsInternal { fn from(value: DFSchemaRef) -> Self { TableOptionsInternal { @@ -798,42 +1056,26 @@ impl TryFrom for options::TableOptionsInternal { type Error = ProtoConvError; fn try_from(value: TableOptionsInternal) -> Result { Ok(options::TableOptionsInternal { - columns: value - .columns - .into_iter() - .map(|col| col.try_into()) - .collect::>()?, + columns: value.columns.into_iter().map(Into::into).collect(), }) } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct TableOptionsDebug { - pub table_type: String, -} - -impl TryFrom for TableOptionsDebug { - type Error = ProtoConvError; - fn try_from(value: options::TableOptionsDebug) -> Result { - Ok(TableOptionsDebug { - table_type: value.table_type, - }) - } +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct TableOptionsPostgres { + pub connection_string: String, + pub schema: String, + pub table: String, } -impl From for options::TableOptionsDebug { - fn from(value: TableOptionsDebug) -> Self { - options::TableOptionsDebug { - table_type: value.table_type, - } +impl From for TableOptionsV0 { + fn from(value: TableOptionsPostgres) -> Self { + TableOptionsV0::Postgres(value) } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct TableOptionsPostgres { - pub connection_string: String, - pub schema: String, - pub table: String, +impl TableOptionsImpl for TableOptionsPostgres { + const NAME: &'static str = "postgres"; } impl TryFrom for TableOptionsPostgres { @@ -857,7 +1099,7 @@ impl From for options::TableOptionsPostgres { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, 
Serialize, Deserialize)] pub struct TableOptionsBigQuery { pub service_account_key: String, pub project_id: String, @@ -865,6 +1107,16 @@ pub struct TableOptionsBigQuery { pub table_id: String, } +impl From for TableOptionsV0 { + fn from(value: TableOptionsBigQuery) -> Self { + TableOptionsV0::BigQuery(value) + } +} + +impl TableOptionsImpl for TableOptionsBigQuery { + const NAME: &'static str = "bigquery"; +} + impl TryFrom for TableOptionsBigQuery { type Error = ProtoConvError; fn try_from(value: options::TableOptionsBigQuery) -> Result { @@ -888,13 +1140,23 @@ impl From for options::TableOptionsBigQuery { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct TableOptionsMysql { pub connection_string: String, pub schema: String, pub table: String, } +impl From for TableOptionsV0 { + fn from(value: TableOptionsMysql) -> Self { + TableOptionsV0::Mysql(value) + } +} + +impl TableOptionsImpl for TableOptionsMysql { + const NAME: &'static str = "mysql"; +} + impl TryFrom for TableOptionsMysql { type Error = ProtoConvError; fn try_from(value: options::TableOptionsMysql) -> Result { @@ -916,13 +1178,23 @@ impl From for options::TableOptionsMysql { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct TableOptionsLocal { pub location: String, pub file_type: String, pub compression: Option, } +impl From for TableOptionsV0 { + fn from(value: TableOptionsLocal) -> Self { + TableOptionsV0::Local(value) + } +} + +impl TableOptionsImpl for TableOptionsLocal { + const NAME: &'static str = "local"; +} + impl TryFrom for TableOptionsLocal { type Error = ProtoConvError; fn try_from(value: options::TableOptionsLocal) -> Result { @@ -944,7 +1216,7 @@ impl From for options::TableOptionsLocal { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct TableOptionsGcs { pub service_account_key: Option, pub bucket: String, @@ -953,6 +1225,16 @@ pub struct TableOptionsGcs { pub compression: Option, } +impl From for TableOptionsV0 { + fn from(value: TableOptionsGcs) -> Self { + TableOptionsV0::Gcs(value) + } +} + +impl TableOptionsImpl for TableOptionsGcs { + const NAME: &'static str = "gcs"; +} + impl TryFrom for TableOptionsGcs { type Error = ProtoConvError; fn try_from(value: options::TableOptionsGcs) -> Result { @@ -978,7 +1260,7 @@ impl From for options::TableOptionsGcs { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct TableOptionsS3 { pub access_key_id: Option, pub secret_access_key: Option, @@ -989,6 +1271,16 @@ pub struct TableOptionsS3 { pub compression: Option, } +impl From for TableOptionsV0 { + fn from(value: TableOptionsS3) -> Self { + TableOptionsV0::S3(value) + } +} + +impl TableOptionsImpl for TableOptionsS3 { + const NAME: &'static str = "s3"; +} + impl TryFrom for TableOptionsS3 { type Error = ProtoConvError; fn try_from(value: options::TableOptionsS3) -> Result { @@ -1017,67 +1309,45 @@ impl From for options::TableOptionsS3 { } } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct TableOptionsMongoDb { pub connection_string: String, pub database: String, pub collection: String, - pub columns: Option>, +} + +impl From for TableOptionsV0 { + fn from(value: TableOptionsMongoDb) -> Self { + 
TableOptionsV0::MongoDb(value) + } +} + +impl TableOptionsImpl for TableOptionsMongoDb { + const NAME: &'static str = "mongo"; } impl TryFrom for TableOptionsMongoDb { type Error = ProtoConvError; fn try_from(value: options::TableOptionsMongo) -> Result { - let columns = if value.columns.is_empty() { - None - } else { - Some( - value - .columns - .iter() - .map(|i| { - self::InternalColumnDefinition::try_from(i.to_owned()).map(|mut v| { - v.nullable = true; - v - }) - }) - .collect::>()?, - ) - }; - Ok(TableOptionsMongoDb { connection_string: value.connection_string, database: value.database, collection: value.collection, - columns, }) } } impl From for options::TableOptionsMongo { fn from(value: TableOptionsMongoDb) -> Self { - let columns = if value.columns.is_none() { - Vec::new() - } else { - value - .columns - .unwrap() - .into_iter() - .map(|v| v.try_into()) - .collect::>() - .unwrap_or_else(|_| Vec::new()) - }; - options::TableOptionsMongo { connection_string: value.connection_string, database: value.database, collection: value.collection, - columns, } } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct TableOptionsExcel { pub location: String, pub storage_options: StorageOptions, @@ -1087,6 +1357,16 @@ pub struct TableOptionsExcel { pub has_header: bool, } +impl From for TableOptionsV0 { + fn from(value: TableOptionsExcel) -> Self { + TableOptionsV0::Excel(value) + } +} + +impl TableOptionsImpl for TableOptionsExcel { + const NAME: &'static str = "excel"; +} + impl TryFrom for TableOptionsExcel { type Error = ProtoConvError; fn try_from(value: options::TableOptionsExcel) -> Result { @@ -1114,13 +1394,23 @@ impl From for options::TableOptionsExcel { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct TableOptionsSqlServer { pub connection_string: String, pub schema: String, pub table: String, } +impl From for TableOptionsV0 { + fn from(value: TableOptionsSqlServer) -> Self { + TableOptionsV0::SqlServer(value) + } +} + +impl TableOptionsImpl for TableOptionsSqlServer { + const NAME: &'static str = "sql_server"; +} + impl TryFrom for TableOptionsSqlServer { type Error = ProtoConvError; fn try_from(value: options::TableOptionsSqlServer) -> Result { @@ -1142,13 +1432,23 @@ impl From for options::TableOptionsSqlServer { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct TableOptionsClickhouse { pub connection_string: String, pub table: String, pub database: Option, } +impl From for TableOptionsV0 { + fn from(value: TableOptionsClickhouse) -> Self { + TableOptionsV0::Clickhouse(value) + } +} + +impl TableOptionsImpl for TableOptionsClickhouse { + const NAME: &'static str = "clickhouse"; +} + impl TryFrom for TableOptionsClickhouse { type Error = ProtoConvError; fn try_from(value: options::TableOptionsClickhouse) -> Result { @@ -1170,7 +1470,7 @@ impl From for options::TableOptionsClickhouse { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct TableOptionsCassandra { pub host: String, pub keyspace: String, @@ -1179,6 +1479,16 @@ pub struct TableOptionsCassandra { pub password: Option, } +impl From for TableOptionsV0 { + fn from(value: TableOptionsCassandra) -> Self { + TableOptionsV0::Cassandra(value) + } +} + +impl TableOptionsImpl for TableOptionsCassandra { + const NAME: 
&'static str = "cassandra"; +} + impl TryFrom for TableOptionsCassandra { type Error = ProtoConvError; fn try_from(value: options::TableOptionsCassandra) -> Result { @@ -1204,7 +1514,7 @@ impl From for options::TableOptionsCassandra { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct TableOptionsSnowflake { pub account_name: String, pub login_name: String, @@ -1216,6 +1526,16 @@ pub struct TableOptionsSnowflake { pub table_name: String, } +impl From for TableOptionsV0 { + fn from(value: TableOptionsSnowflake) -> Self { + TableOptionsV0::Snowflake(value) + } +} + +impl TableOptionsImpl for TableOptionsSnowflake { + const NAME: &'static str = "snowflake"; +} + impl TryFrom for TableOptionsSnowflake { type Error = ProtoConvError; fn try_from(value: options::TableOptionsSnowflake) -> Result { @@ -1248,7 +1568,7 @@ impl From for options::TableOptionsSnowflake { } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct DatabaseOptionsSqlite { pub location: String, pub storage_options: Option, @@ -1273,7 +1593,7 @@ impl From for options::DatabaseOptionsSqlite { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct TableOptionsObjectStore { pub location: String, pub name: Option, @@ -1281,7 +1601,10 @@ pub struct TableOptionsObjectStore { pub file_type: Option, pub compression: Option, pub schema_sample_size: Option, - pub columns: Vec, +} + +impl TableOptionsImpl for TableOptionsObjectStore { + const NAME: &'static str = "object_store"; } impl TryFrom for TableOptionsObjectStore { @@ -1294,16 +1617,6 @@ impl TryFrom for TableOptionsObjectStore { file_type: value.file_type, compression: value.compression, schema_sample_size: value.schema_sample_size, - columns: value - .columns - .iter() - .map(|i| { - self::InternalColumnDefinition::try_from(i.to_owned()).map(|mut v| { - v.nullable = true; - v - }) - }) - .collect::>()?, }) } } @@ -1317,17 +1630,11 @@ impl From for options::TableOptionsObjectStore { file_type: value.file_type, compression: value.compression, schema_sample_size: value.schema_sample_size, - columns: value - .columns - .into_iter() - .map(|v| v.try_into()) - .collect::>() - .unwrap_or_else(|_| Vec::new()), } } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum TunnelOptions { Internal(TunnelOptionsInternal), Debug(TunnelOptionsDebug), @@ -1390,7 +1697,7 @@ impl From for options::TunnelOptions { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct TunnelOptionsInternal {} impl TryFrom for TunnelOptionsInternal { @@ -1406,7 +1713,7 @@ impl From for options::TunnelOptionsInternal { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct TunnelOptionsDebug {} impl TryFrom for TunnelOptionsDebug { @@ -1422,7 +1729,7 @@ impl From for options::TunnelOptionsDebug { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct TunnelOptionsSsh { pub connection_string: String, pub ssh_key: Vec, @@ -1446,7 +1753,7 @@ impl From for options::TunnelOptionsSsh { } } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, 
Deserialize)] pub enum CredentialsOptions { Debug(CredentialsOptionsDebug), Gcp(CredentialsOptionsGcp), @@ -1521,7 +1828,7 @@ impl From for options::CredentialsOptions { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct CredentialsOptionsDebug { pub table_type: String, } @@ -1543,7 +1850,7 @@ impl From for options::CredentialsOptionsDebug { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct CredentialsOptionsGcp { pub service_account_key: String, } @@ -1565,7 +1872,7 @@ impl From for options::CredentialsOptionsGcp { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct CredentialsOptionsAws { pub access_key_id: String, pub secret_access_key: String, @@ -1590,7 +1897,7 @@ impl From for options::CredentialsOptionsAws { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct CredentialsOptionsAzure { pub account_name: String, pub access_key: String, @@ -1615,7 +1922,7 @@ impl From for options::CredentialsOptionsAzure { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct CredentialsOptionsOpenAI { pub api_key: String, pub api_base: Option, @@ -1786,7 +2093,6 @@ pub struct CopyToFormatOptionsJson { #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub struct CopyToFormatOptionsBson {} - #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub struct CopyToFormatOptionsLance { pub max_rows_per_file: Option, diff --git a/crates/protogen/src/metastore/types/service.rs b/crates/protogen/src/metastore/types/service.rs index 97b93e438..d08458d51 100644 --- a/crates/protogen/src/metastore/types/service.rs +++ b/crates/protogen/src/metastore/types/service.rs @@ -4,12 +4,13 @@ use super::catalog::{FunctionType, SourceAccessMode}; use super::options::{ CredentialsOptions, DatabaseOptions, - TableOptions, + InternalColumnDefinition, TableOptionsInternal, + TableOptionsV0, TunnelOptions, }; use crate::gen::metastore::service; -use crate::{FromOptionalField, ProtoConvError}; +use crate::{gen, FromOptionalField, ProtoConvError}; #[derive(Debug, Clone, PartialEq, Eq)] pub enum Mutation { @@ -78,7 +79,6 @@ impl TryFrom for Mutation { }) } } - impl TryFrom for service::mutation::Mutation { type Error = ProtoConvError; fn try_from(value: Mutation) -> Result { @@ -345,15 +345,32 @@ impl TryFrom for service::CreateFunction { pub struct CreateExternalTable { pub schema: String, pub name: String, - pub options: TableOptions, + pub options: TableOptionsV0, pub or_replace: bool, pub if_not_exists: bool, pub tunnel: Option, + pub columns: Option>, } + impl TryFrom for CreateExternalTable { type Error = ProtoConvError; fn try_from(value: service::CreateExternalTable) -> Result { + let columns: Vec = value + .columns + .into_iter() + .map(|column| { + let col: InternalColumnDefinition = column.try_into()?; + Ok::<_, ProtoConvError>(col) + }) + .collect::>()?; + + let columns = if columns.is_empty() { + None + } else { + Some(columns) + }; + // TODO: Check if string are zero value. 
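As an aside on the columns handling just above: the protobuf message carries columns as a plain repeated field, so "no columns supplied" arrives as an empty list and is normalized to None, and None is flattened back to an empty Vec on the way out. A minimal, self-contained sketch of that round-trip (i32 stands in for the real column type; names here are illustrative, not from the change itself):

fn decode_columns(cols: Vec<i32>) -> Option<Vec<i32>> {
    // Repeated protobuf fields cannot be optional, so an empty list is
    // treated as "not provided" and mapped to None.
    if cols.is_empty() {
        None
    } else {
        Some(cols)
    }
}

fn encode_columns(cols: Option<Vec<i32>>) -> Vec<i32> {
    // The reverse direction flattens None back into an empty list.
    cols.unwrap_or_default()
}

fn main() {
    assert_eq!(decode_columns(Vec::new()), None);
    assert_eq!(encode_columns(None), Vec::<i32>::new());
    assert_eq!(
        decode_columns(encode_columns(Some(vec![1, 2]))),
        Some(vec![1, 2])
    );
}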
Ok(CreateExternalTable { schema: value.schema, @@ -362,6 +379,7 @@ impl TryFrom for CreateExternalTable { or_replace: value.or_replace, if_not_exists: value.if_not_exists, tunnel: value.tunnel, + columns, }) } } @@ -369,6 +387,11 @@ impl TryFrom for CreateExternalTable { impl TryFrom for service::CreateExternalTable { type Error = ProtoConvError; fn try_from(value: CreateExternalTable) -> Result { + let columns: Vec = value + .columns + .map(|s| s.into_iter().map(Into::into).collect()) + .unwrap_or_default(); + Ok(service::CreateExternalTable { schema: value.schema, name: value.name, @@ -376,6 +399,7 @@ impl TryFrom for service::CreateExternalTable { or_replace: value.or_replace, if_not_exists: value.if_not_exists, tunnel: value.tunnel, + columns, }) } } diff --git a/crates/protogen/src/sqlexec/physical_plan.rs b/crates/protogen/src/sqlexec/physical_plan.rs index a4807f562..522703382 100644 --- a/crates/protogen/src/sqlexec/physical_plan.rs +++ b/crates/protogen/src/sqlexec/physical_plan.rs @@ -198,11 +198,13 @@ pub struct CreateExternalTableExec { #[prost(bool, tag = "3")] pub if_not_exists: bool, #[prost(message, tag = "4")] - pub table_options: Option, + pub table_options: Option, #[prost(message, optional, tag = "5")] pub tunnel: Option, #[prost(bool, tag = "6")] pub or_replace: bool, + #[prost(message, optional, tag = "7")] + pub table_schema: Option, } #[derive(Clone, PartialEq, Message)] diff --git a/crates/sqlbuiltins/Cargo.toml b/crates/sqlbuiltins/Cargo.toml index 4f2a4a08f..e055d5318 100644 --- a/crates/sqlbuiltins/Cargo.toml +++ b/crates/sqlbuiltins/Cargo.toml @@ -15,6 +15,7 @@ datafusion_ext = { path = "../datafusion_ext" } catalog = { path = "../catalog" } telemetry = { path = "../telemetry" } datasources = { path = "../datasources" } +parser = { path = "../parser" } decimal = { path = "../decimal" } async-trait = { workspace = true } datafusion = { workspace = true } diff --git a/crates/sqlbuiltins/src/functions/table/mod.rs b/crates/sqlbuiltins/src/functions/table/mod.rs index ce6018e7f..ac7b3d291 100644 --- a/crates/sqlbuiltins/src/functions/table/mod.rs +++ b/crates/sqlbuiltins/src/functions/table/mod.rs @@ -15,6 +15,7 @@ mod object_store; mod parquet_metadata; mod postgres; mod snowflake; + mod sqlite; mod sqlserver; pub mod system; diff --git a/crates/sqlbuiltins/src/lib.rs b/crates/sqlbuiltins/src/lib.rs index ca2b95640..a0ecea3d2 100644 --- a/crates/sqlbuiltins/src/lib.rs +++ b/crates/sqlbuiltins/src/lib.rs @@ -7,7 +7,6 @@ //! This crate provides the implementation of various builtin sql objects //! (particularly functions). These live outside the `sqlexec` crate to allow //! it to be imported into both `sqlexec` and `metastore`. 
- pub mod builtins; pub mod errors; pub mod functions; diff --git a/crates/sqlbuiltins/src/validation.rs b/crates/sqlbuiltins/src/validation.rs index 816e8ebaf..c4e2f6a2b 100644 --- a/crates/sqlbuiltins/src/validation.rs +++ b/crates/sqlbuiltins/src/validation.rs @@ -2,7 +2,7 @@ use protogen::metastore::types::options::{ CopyToDestinationOptions, CredentialsOptions, DatabaseOptions, - TableOptions, + TableOptionsV0, TunnelOptions, }; @@ -65,11 +65,11 @@ pub fn validate_table_tunnel_support(table: &str, tunnel: &str) -> Result<()> { if matches!( (table, tunnel), // Debug - (TableOptions::DEBUG, TunnelOptions::DEBUG) + (TableOptionsV0::DEBUG, TunnelOptions::DEBUG) // Postgres - | (TableOptions::POSTGRES, TunnelOptions::SSH) + | (TableOptionsV0::POSTGRES, TunnelOptions::SSH) // MySQL - | (TableOptions::MYSQL, TunnelOptions::SSH) + | (TableOptionsV0::MYSQL, TunnelOptions::SSH) ) { Ok(()) } else { @@ -103,16 +103,16 @@ pub fn validate_table_creds_support(table: &str, creds: &str) -> Result<()> { if matches!( (table, creds), // Debug - (TableOptions::DEBUG, CredentialsOptions::DEBUG) | + (TableOptionsV0::DEBUG, CredentialsOptions::DEBUG) | // Google cloud - (TableOptions::GCS, CredentialsOptions::GCP) | - (TableOptions::BIGQUERY, CredentialsOptions::GCP) | + (TableOptionsV0::GCS, CredentialsOptions::GCP) | + (TableOptionsV0::BIGQUERY, CredentialsOptions::GCP) | // AWS - (TableOptions::S3_STORAGE, CredentialsOptions::AWS) | + (TableOptionsV0::S3_STORAGE, CredentialsOptions::AWS) | // Azure - (TableOptions::AZURE, CredentialsOptions::AZURE) | + (TableOptionsV0::AZURE, CredentialsOptions::AZURE) | // Delta & Iceberg & Lance - (TableOptions::DELTA | TableOptions::ICEBERG | TableOptions::LANCE, CredentialsOptions::GCP | CredentialsOptions::AWS | CredentialsOptions::AZURE ) + (TableOptionsV0::DELTA | TableOptionsV0::ICEBERG | TableOptionsV0::LANCE, CredentialsOptions::GCP | CredentialsOptions::AWS | CredentialsOptions::AZURE ) ) { Ok(()) } else { diff --git a/crates/sqlexec/src/dispatch/external.rs b/crates/sqlexec/src/dispatch/external.rs index a9ab02179..af64e7a2d 100644 --- a/crates/sqlexec/src/dispatch/external.rs +++ b/crates/sqlexec/src/dispatch/external.rs @@ -3,6 +3,7 @@ use std::str::FromStr; use std::sync::Arc; use catalog::session_catalog::SessionCatalog; +use datafusion::arrow::datatypes::{Field, Schema}; use datafusion::datasource::file_format::csv::CsvFormat; use datafusion::datasource::file_format::file_compression_type::FileCompressionType; use datafusion::datasource::file_format::json::JsonFormat; @@ -52,8 +53,6 @@ use protogen::metastore::types::options::{ DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, DatabaseOptionsSqlite, - InternalColumnDefinition, - TableOptions, TableOptionsBigQuery, TableOptionsCassandra, TableOptionsClickhouse, @@ -69,6 +68,7 @@ use protogen::metastore::types::options::{ TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer, + TableOptionsV0, TunnelOptions, }; use sqlbuiltins::builtins::DEFAULT_CATALOG; @@ -148,14 +148,14 @@ impl<'a> ExternalDispatcher<'a> { name: &str, ) -> Result> { let tunnel = self.get_tunnel_opts(db.tunnel_id)?; - + // TODO: use the DatasourceRegistry to dispatch instead match &db.options { - DatabaseOptions::Internal(_) => unimplemented!(), DatabaseOptions::Debug(DatabaseOptionsDebug {}) => { // Use name of the table as table type here. 
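For context on the validation.rs hunk above: only the type that owns the datasource name constants changes (TableOptions to TableOptionsV0); the check itself remains a matches! over (datasource, tunnel) string pairs. A reduced standalone sketch of that pattern, assuming the constants are plain &'static str values as in the diff (the structs and values here are illustrative):

struct TableOptionsV0;
struct TunnelOptions;

impl TableOptionsV0 {
    const DEBUG: &'static str = "debug";
    const POSTGRES: &'static str = "postgres";
    const MYSQL: &'static str = "mysql";
}

impl TunnelOptions {
    const DEBUG: &'static str = "debug";
    const SSH: &'static str = "ssh";
}

// Returns true only for (datasource, tunnel) combinations that are allowed.
fn tunnel_supported(table: &str, tunnel: &str) -> bool {
    matches!(
        (table, tunnel),
        (TableOptionsV0::DEBUG, TunnelOptions::DEBUG)
            | (TableOptionsV0::POSTGRES, TunnelOptions::SSH)
            | (TableOptionsV0::MYSQL, TunnelOptions::SSH)
    )
}

fn main() {
    assert!(tunnel_supported("postgres", "ssh"));
    assert!(!tunnel_supported("postgres", "debug"));
}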
let provider = DebugTableType::from_str(name)?; Ok(provider.into_table_provider(tunnel.as_ref())) } + DatabaseOptions::Internal(_) => unimplemented!(), DatabaseOptions::Postgres(DatabaseOptionsPostgres { connection_string }) => { let access = PostgresAccess::new_from_conn_str(connection_string, tunnel); let prov_conf = PostgresTableProviderConfig { @@ -191,11 +191,9 @@ impl<'a> ExternalDispatcher<'a> { let provider = accessor.into_table_provider(table_access, true).await?; Ok(Arc::new(provider)) } - DatabaseOptions::MongoDb(DatabaseOptionsMongoDb { - connection_string, .. - }) => { + DatabaseOptions::MongoDb(DatabaseOptionsMongoDb { connection_string }) => { let table_info = MongoDbTableAccessInfo { - database: schema.to_string(), + database: schema.to_string(), // A mongodb database is pretty much a schema. collection: name.to_string(), fields: None, }; @@ -294,19 +292,149 @@ impl<'a> ExternalDispatcher<'a> { } } + pub async fn dispatch_external_table( &self, table: &TableEntry, ) -> Result> { let tunnel = self.get_tunnel_opts(table.tunnel_id)?; + let columns_opt = table.columns.clone(); + let optional_schema = columns_opt.map(|cols| { + let fields: Vec = cols + .into_iter() + .map(|col| { + let fld: Field = col.into(); + fld + }) + .collect(); + Schema::new(fields) + }); + + + self.dispatch_table_options_v0(&table.options, tunnel, optional_schema) + .await + } + + async fn create_obj_store_table_provider( + &self, + access: Arc, + path: impl AsRef, + file_type: &str, + compression: Option<&String>, + ) -> Result> { + let path = path.as_ref(); + // TODO: only parquet/ndjson/csv actually support compression, + // so we'll end up attempting to handle compression for some + // types and not others. + let compression = compression + .map(|c| c.parse::()) + .transpose()? + .unwrap_or(FileCompressionType::UNCOMPRESSED); + + let accessor = ObjStoreAccessor::new(access.clone())?; + + match file_type { + "csv" => Ok(accessor + .clone() + .into_table_provider( + &self.df_ctx.state(), + Arc::new( + CsvFormat::default() + .with_file_compression_type(compression) + .with_schema_infer_max_rec(Some(20480)), + ), + accessor.clone().list_globbed(path).await?, + ) + .await?), + "parquet" => Ok(accessor + .clone() + .into_table_provider( + &self.df_ctx.state(), + Arc::new(ParquetFormat::default()), + accessor.clone().list_globbed(path).await?, + ) + .await?), + "bson" => Ok(bson_streaming_table( + access.clone(), + DatasourceUrl::try_new(path)?, + None, + Some(128), + ) + .await?), + "json" => Ok( + json_streaming_table(access.clone(), DatasourceUrl::try_new(path)?, None).await?, + ), + "ndjson" | "jsonl" => Ok(accessor + .clone() + .into_table_provider( + &self.df_ctx.state(), + Arc::new(JsonFormat::default().with_file_compression_type(compression)), + accessor.clone().list_globbed(path).await?, + ) + .await?), + _ => Err(DispatchError::String( + format!("Unsupported file type: '{}', for '{}'", file_type, path,).to_string(), + )), + } + } + + pub async fn dispatch_function( + &self, + func: &FunctionEntry, + args: Option>, + opts: Option>, + ) -> Result> { + let args = args.unwrap_or_default(); + let opts = opts.unwrap_or_default(); + let resolve_func = if func.meta.builtin { + self.function_registry.get_table_func(&func.meta.name) + } else { + // We only have builtin functions right now. 
+ None + }; + let prov = resolve_func + .unwrap() + .create_provider( + &DefaultTableContextProvider::new(self.catalog, self.df_ctx), + args, + opts, + ) + .await?; + Ok(prov) + } + + fn get_tunnel_opts(&self, tunnel_id: Option) -> Result> { + let tunnel_options = if let Some(tunnel_id) = tunnel_id { + let ent = self + .catalog + .get_by_oid(tunnel_id) + .ok_or(DispatchError::MissingTunnel(tunnel_id))?; + + let ent = match ent { + CatalogEntry::Tunnel(ent) => ent, + _ => return Err(DispatchError::MissingTunnel(tunnel_id)), + }; + Some(ent.options.clone()) + } else { + None + }; + Ok(tunnel_options) + } - match &table.options { - TableOptions::Internal(TableOptionsInternal { .. }) => unimplemented!(), // Purposely unimplemented. - TableOptions::Debug(TableOptionsDebug { table_type }) => { + // TODO: Remove this function once everything is using the new table options + async fn dispatch_table_options_v0( + &self, + opts: &TableOptionsV0, + tunnel: Option, + schema: Option, + ) -> Result> { + match &opts { + TableOptionsV0::Debug(TableOptionsDebug { table_type }) => { let provider = DebugTableType::from_str(table_type)?; Ok(provider.into_table_provider(tunnel.as_ref())) } - TableOptions::Excel(TableOptionsExcel { + TableOptionsV0::Internal(TableOptionsInternal { .. }) => unimplemented!(), // Purposely unimplemented. + TableOptionsV0::Excel(TableOptionsExcel { location, storage_options, has_header, @@ -324,7 +452,7 @@ impl<'a> ExternalDispatcher<'a> { Ok(Arc::new(provider)) } - TableOptions::Postgres(TableOptionsPostgres { + TableOptionsV0::Postgres(TableOptionsPostgres { connection_string, schema, table, @@ -338,7 +466,7 @@ impl<'a> ExternalDispatcher<'a> { let prov = PostgresTableProvider::try_new(prov_conf).await?; Ok(Arc::new(prov)) } - TableOptions::BigQuery(TableOptionsBigQuery { + TableOptionsV0::BigQuery(TableOptionsBigQuery { service_account_key, project_id, dataset_id, @@ -355,7 +483,7 @@ impl<'a> ExternalDispatcher<'a> { let provider = accessor.into_table_provider(table_access, true).await?; Ok(Arc::new(provider)) } - TableOptions::Mysql(TableOptionsMysql { + TableOptionsV0::Mysql(TableOptionsMysql { connection_string, schema, table, @@ -369,25 +497,22 @@ impl<'a> ExternalDispatcher<'a> { let provider = accessor.into_table_provider(table_access, true).await?; Ok(Arc::new(provider)) } - TableOptions::MongoDb(TableOptionsMongoDb { + TableOptionsV0::MongoDb(TableOptionsMongoDb { connection_string, database, collection, - columns, }) => { let table_info = MongoDbTableAccessInfo { database: database.to_string(), collection: collection.to_string(), - fields: columns - .to_owned() - .map(|cs| InternalColumnDefinition::to_arrow_fields(cs.into_iter())), + fields: None, }; let accessor = MongoDbAccessor::connect(connection_string).await?; let table_accessor = accessor.into_table_accessor(table_info); let provider = table_accessor.into_table_provider().await?; Ok(Arc::new(provider)) } - TableOptions::Snowflake(TableOptionsSnowflake { + TableOptionsV0::Snowflake(TableOptionsSnowflake { account_name, login_name, password, @@ -421,7 +546,7 @@ impl<'a> ExternalDispatcher<'a> { .await?; Ok(Arc::new(provider)) } - TableOptions::Local(TableOptionsLocal { + TableOptionsV0::Local(TableOptionsLocal { location, file_type, compression, @@ -440,7 +565,7 @@ impl<'a> ExternalDispatcher<'a> { ) .await } - TableOptions::Gcs(TableOptionsGcs { + TableOptionsV0::Gcs(TableOptionsGcs { service_account_key, bucket, location, @@ -460,7 +585,7 @@ impl<'a> ExternalDispatcher<'a> { ) .await } - 
TableOptions::S3(TableOptionsS3 { + TableOptionsV0::S3(TableOptionsS3 { access_key_id, secret_access_key, region, @@ -484,7 +609,7 @@ impl<'a> ExternalDispatcher<'a> { ) .await } - TableOptions::Azure(TableOptionsObjectStore { + TableOptionsV0::Azure(TableOptionsObjectStore { location, storage_options, file_type, @@ -514,7 +639,7 @@ impl<'a> ExternalDispatcher<'a> { ) .await } - TableOptions::Delta(TableOptionsObjectStore { + TableOptionsV0::Delta(TableOptionsObjectStore { location, storage_options, .. @@ -523,7 +648,7 @@ impl<'a> ExternalDispatcher<'a> { Arc::new(load_table_direct(location, storage_options.clone()).await?); Ok(provider) } - TableOptions::Iceberg(TableOptionsObjectStore { + TableOptionsV0::Iceberg(TableOptionsObjectStore { location, storage_options, .. @@ -534,7 +659,7 @@ impl<'a> ExternalDispatcher<'a> { let reader = table.table_reader().await?; Ok(reader) } - TableOptions::SqlServer(TableOptionsSqlServer { + TableOptionsV0::SqlServer(TableOptionsSqlServer { connection_string, schema, table, @@ -548,7 +673,7 @@ impl<'a> ExternalDispatcher<'a> { .await?; Ok(Arc::new(table)) } - TableOptions::Clickhouse(TableOptionsClickhouse { + TableOptionsV0::Clickhouse(TableOptionsClickhouse { connection_string, database, table, @@ -559,40 +684,30 @@ impl<'a> ExternalDispatcher<'a> { let table = ClickhouseTableProvider::try_new(access, table_ref).await?; Ok(Arc::new(table)) } - TableOptions::Lance(TableOptionsObjectStore { + TableOptionsV0::Lance(TableOptionsObjectStore { location, storage_options, .. }) => Ok(Arc::new( LanceTable::new(location, storage_options.clone()).await?, )), - TableOptions::Bson(TableOptionsObjectStore { + TableOptionsV0::Bson(TableOptionsObjectStore { location, storage_options, schema_sample_size, - columns, .. }) => { - let columns = if columns.is_empty() { - None - } else { - Some(InternalColumnDefinition::to_arrow_fields( - columns.to_owned(), - )) - }; - let source_url = DatasourceUrl::try_new(location)?; let store_access = storage_options_into_store_access(&source_url, storage_options)?; - Ok(bson_streaming_table( store_access, source_url, - columns, + schema, schema_sample_size.to_owned(), ) .await?) } - TableOptions::Cassandra(TableOptionsCassandra { + TableOptionsV0::Cassandra(TableOptionsCassandra { host, keyspace, table, @@ -610,7 +725,7 @@ impl<'a> ExternalDispatcher<'a> { Ok(Arc::new(table)) } - TableOptions::Sqlite(TableOptionsObjectStore { + TableOptionsV0::Sqlite(TableOptionsObjectStore { location, storage_options, name, @@ -633,110 +748,4 @@ impl<'a> ExternalDispatcher<'a> { } } } - - async fn create_obj_store_table_provider( - &self, - access: Arc, - path: impl AsRef, - file_type: &str, - compression: Option<&String>, - ) -> Result> { - let path = path.as_ref(); - // TODO: only parquet/ndjson/csv actually support compression, - // so we'll end up attempting to handle compression for some - // types and not others. - let compression = compression - .map(|c| c.parse::()) - .transpose()? 
- .unwrap_or(FileCompressionType::UNCOMPRESSED); - - let accessor = ObjStoreAccessor::new(access.clone())?; - - match file_type { - "csv" => Ok(accessor - .clone() - .into_table_provider( - &self.df_ctx.state(), - Arc::new( - CsvFormat::default() - .with_file_compression_type(compression) - .with_schema_infer_max_rec(Some(20480)), - ), - accessor.clone().list_globbed(path).await?, - ) - .await?), - "parquet" => Ok(accessor - .clone() - .into_table_provider( - &self.df_ctx.state(), - Arc::new(ParquetFormat::default()), - accessor.clone().list_globbed(path).await?, - ) - .await?), - "bson" => Ok(bson_streaming_table( - access.clone(), - DatasourceUrl::try_new(path)?, - None, - Some(128), - ) - .await?), - "json" => Ok( - json_streaming_table(access.clone(), DatasourceUrl::try_new(path)?, None).await?, - ), - "ndjson" | "jsonl" => Ok(accessor - .clone() - .into_table_provider( - &self.df_ctx.state(), - Arc::new(JsonFormat::default().with_file_compression_type(compression)), - accessor.clone().list_globbed(path).await?, - ) - .await?), - _ => Err(DispatchError::String( - format!("Unsupported file type: '{}', for '{}'", file_type, path,).to_string(), - )), - } - } - - pub async fn dispatch_function( - &self, - func: &FunctionEntry, - args: Option>, - opts: Option>, - ) -> Result> { - let args = args.unwrap_or_default(); - let opts = opts.unwrap_or_default(); - let resolve_func = if func.meta.builtin { - self.function_registry.get_table_func(&func.meta.name) - } else { - // We only have builtin functions right now. - None - }; - let prov = resolve_func - .unwrap() - .create_provider( - &DefaultTableContextProvider::new(self.catalog, self.df_ctx), - args, - opts, - ) - .await?; - Ok(prov) - } - - fn get_tunnel_opts(&self, tunnel_id: Option) -> Result> { - let tunnel_options = if let Some(tunnel_id) = tunnel_id { - let ent = self - .catalog - .get_by_oid(tunnel_id) - .ok_or(DispatchError::MissingTunnel(tunnel_id))?; - - let ent = match ent { - CatalogEntry::Tunnel(ent) => ent, - _ => return Err(DispatchError::MissingTunnel(tunnel_id)), - }; - Some(ent.options.clone()) - } else { - None - }; - Ok(tunnel_options) - } } diff --git a/crates/sqlexec/src/dispatch/mod.rs b/crates/sqlexec/src/dispatch/mod.rs index e73bdd962..68ee6b7b6 100644 --- a/crates/sqlexec/src/dispatch/mod.rs +++ b/crates/sqlexec/src/dispatch/mod.rs @@ -100,8 +100,14 @@ pub enum DispatchError { #[error("{0}")] String(String), + + #[error(transparent)] + Builtin(#[from] sqlbuiltins::errors::BuiltinError), + #[error(transparent)] + Datasource(#[from] Box), } + /// Trait for planning views. 
/// /// Currently views aren't that sophisticated as we're only storing the SQL diff --git a/crates/sqlexec/src/extension_codec.rs b/crates/sqlexec/src/extension_codec.rs index 0b47a5c9d..ea761fcea 100644 --- a/crates/sqlexec/src/extension_codec.rs +++ b/crates/sqlexec/src/extension_codec.rs @@ -307,6 +307,10 @@ impl<'a> PhysicalExtensionCodec for GlareDBExtensionCodec<'a> { if_not_exists: ext.if_not_exists, table_options: table_options.try_into()?, tunnel: ext.tunnel, + table_schema: ext + .table_schema + .map(|ref schema| schema.try_into()) + .transpose()?, }) } proto::ExecutionPlanExtensionType::CreateTunnelExec(ext) => { @@ -633,6 +637,11 @@ impl<'a> PhysicalExtensionCodec for GlareDBExtensionCodec<'a> { if_not_exists: exec.if_not_exists, table_options: Some(exec.table_options.clone().try_into()?), tunnel: exec.tunnel.clone(), + table_schema: exec + .table_schema + .as_ref() + .map(|schema| schema.try_into()) + .transpose()?, }, ) } else if let Some(exec) = node.as_any().downcast_ref::() { @@ -652,7 +661,7 @@ impl<'a> PhysicalExtensionCodec for GlareDBExtensionCodec<'a> { }) } else if let Some(exec) = node.as_any().downcast_ref::() { proto::ExecutionPlanExtensionType::DescribeTable(proto::DescribeTableExec { - entry: Some(exec.entry.clone().try_into()?), + entry: Some(exec.entry.clone().into()), }) } else if let Some(exec) = node.as_any().downcast_ref::() { proto::ExecutionPlanExtensionType::DropCredentialsExec(proto::DropCredentialsExec { @@ -673,9 +682,8 @@ impl<'a> PhysicalExtensionCodec for GlareDBExtensionCodec<'a> { .tbl_entries .clone() .into_iter() - .map(|r| r.try_into()) - .collect::>() - .expect("failed to encode table entries"), + .map(|r| r.into()) + .collect(), if_exists: exec.if_exists, }) } else if let Some(exec) = node.as_any().downcast_ref::() { @@ -697,7 +705,7 @@ impl<'a> PhysicalExtensionCodec for GlareDBExtensionCodec<'a> { } proto::ExecutionPlanExtensionType::UpdateExec(proto::UpdateExec { - table: Some(exec.table.clone().try_into()?), + table: Some(exec.table.clone().into()), updates, where_expr: exec .where_expr @@ -720,7 +728,7 @@ impl<'a> PhysicalExtensionCodec for GlareDBExtensionCodec<'a> { }) } else if let Some(exec) = node.as_any().downcast_ref::() { proto::ExecutionPlanExtensionType::DeleteExec(proto::DeleteExec { - table: Some(exec.table.clone().try_into()?), + table: Some(exec.table.clone().into()), where_expr: exec .where_expr .as_ref() diff --git a/crates/sqlexec/src/planner/context_builder.rs b/crates/sqlexec/src/planner/context_builder.rs index 967d1963c..194b1373e 100644 --- a/crates/sqlexec/src/planner/context_builder.rs +++ b/crates/sqlexec/src/planner/context_builder.rs @@ -17,7 +17,7 @@ use datafusion_ext::planner::AsyncContextProvider; use datafusion_ext::runtime::table_provider::RuntimeAwareTableProvider; use datafusion_ext::vars::CredentialsVarProvider; use protogen::metastore::types::catalog::{CatalogEntry, RuntimePreference}; -use protogen::metastore::types::options::TableOptions; +use protogen::metastore::types::options::TableOptionsV0; use protogen::rpcsrv::types::service::ResolvedTableReference; use crate::context::local::LocalSessionContext; @@ -196,8 +196,8 @@ impl<'a> PartialContextProvider<'a> { let run_local = table.meta.is_temp || table.meta.builtin || matches!( - &table.options, - TableOptions::Debug(_) | TableOptions::Local(_) + table.options, + TableOptionsV0::Debug(_) | TableOptionsV0::Local(_) ); if run_local { diff --git a/crates/sqlexec/src/planner/errors.rs b/crates/sqlexec/src/planner/errors.rs index 3a2484791..167134dda 
100644 --- a/crates/sqlexec/src/planner/errors.rs +++ b/crates/sqlexec/src/planner/errors.rs @@ -103,6 +103,9 @@ pub enum PlanError { #[error("{0}")] String(String), + + #[error(transparent)] + Builtin(#[from] sqlbuiltins::errors::BuiltinError), } impl From for datafusion::error::DataFusionError { diff --git a/crates/sqlexec/src/planner/logical_plan/create_external_table.rs b/crates/sqlexec/src/planner/logical_plan/create_external_table.rs index 4238cad1d..aa9e01802 100644 --- a/crates/sqlexec/src/planner/logical_plan/create_external_table.rs +++ b/crates/sqlexec/src/planner/logical_plan/create_external_table.rs @@ -1,10 +1,10 @@ -use datafusion::arrow::datatypes::FieldRef; +use datafusion::arrow::datatypes::Schema; +use protogen::metastore::types::options::TableOptionsV0; use super::{ DfLogicalPlan, ExtensionNode, OwnedFullObjectReference, - TableOptions, UserDefinedLogicalNodeCore, GENERIC_OPERATION_LOGICAL_SCHEMA, }; @@ -14,9 +14,9 @@ pub struct CreateExternalTable { pub tbl_reference: OwnedFullObjectReference, pub or_replace: bool, pub if_not_exists: bool, - pub table_options: TableOptions, + pub table_options: TableOptionsV0, pub tunnel: Option, - pub columns: Option>, + pub schema: Option, } impl UserDefinedLogicalNodeCore for CreateExternalTable { diff --git a/crates/sqlexec/src/planner/logical_plan/mod.rs b/crates/sqlexec/src/planner/logical_plan/mod.rs index 171fb7f34..e2702fde1 100644 --- a/crates/sqlexec/src/planner/logical_plan/mod.rs +++ b/crates/sqlexec/src/planner/logical_plan/mod.rs @@ -66,7 +66,6 @@ use protogen::metastore::types::options::{ CopyToFormatOptions, CredentialsOptions, DatabaseOptions, - TableOptions, TunnelOptions, }; pub use set_variable::*; diff --git a/crates/sqlexec/src/planner/physical_plan/create_external_table.rs b/crates/sqlexec/src/planner/physical_plan/create_external_table.rs index ce27065a6..c64298a0b 100644 --- a/crates/sqlexec/src/planner/physical_plan/create_external_table.rs +++ b/crates/sqlexec/src/planner/physical_plan/create_external_table.rs @@ -18,7 +18,7 @@ use datafusion::physical_plan::{ Statistics, }; use futures::stream; -use protogen::metastore::types::options::TableOptions; +use protogen::metastore::types::options::{InternalColumnDefinition, TableOptionsV0}; use protogen::metastore::types::service::{self, Mutation}; use super::{new_operation_batch, GENERIC_OPERATION_PHYSICAL_SCHEMA}; @@ -30,8 +30,9 @@ pub struct CreateExternalTableExec { pub tbl_reference: OwnedFullObjectReference, pub or_replace: bool, pub if_not_exists: bool, - pub table_options: TableOptions, + pub table_options: TableOptionsV0, pub tunnel: Option, + pub table_schema: Option, } impl ExecutionPlan for CreateExternalTableExec { @@ -107,6 +108,12 @@ async fn create_external_table( mutator: Arc, plan: CreateExternalTableExec, ) -> DataFusionResult { + // We dont want to tightly couple the metastore types with the arrow types + // so we convert the arrow schema to metastore schema + let columns = plan.table_schema.map(|schema| { + InternalColumnDefinition::from_arrow_fields(schema.fields()).collect::>() + }); + mutator .mutate( plan.catalog_version, @@ -118,6 +125,7 @@ async fn create_external_table( or_replace: plan.or_replace, if_not_exists: plan.if_not_exists, tunnel: plan.tunnel, + columns, }, )], ) diff --git a/crates/sqlexec/src/planner/session_planner.rs b/crates/sqlexec/src/planner/session_planner.rs index f800e2714..5989d5d5f 100644 --- a/crates/sqlexec/src/planner/session_planner.rs +++ b/crates/sqlexec/src/planner/session_planner.rs @@ -15,7 +15,7 @@ use 
datafusion::common::parsers::CompressionTypeVariant; use datafusion::common::{OwnedSchemaReference, OwnedTableReference, ToDFSchema}; use datafusion::logical_expr::{cast, col, LogicalPlanBuilder}; use datafusion::sql::planner::{object_name_to_table_reference, IdentNormalizer, PlannerContext}; -use datafusion::sql::sqlparser::ast::{self, Ident, ObjectName, ObjectType}; +use datafusion::sql::sqlparser::ast::{self, ColumnOption, Ident, ObjectName, ObjectType}; use datafusion::sql::TableReference; use datafusion_ext::planner::SqlQueryPlanner; use datafusion_ext::AsyncContextProvider; @@ -103,9 +103,7 @@ use protogen::metastore::types::options::{ DatabaseOptionsSqlite, DeltaLakeCatalog, DeltaLakeUnityCatalog, - InternalColumnDefinition, StorageOptions, - TableOptions, TableOptionsBigQuery, TableOptionsCassandra, TableOptionsClickhouse, @@ -120,6 +118,7 @@ use protogen::metastore::types::options::{ TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer, + TableOptionsV0, TunnelOptions, TunnelOptionsDebug, TunnelOptionsInternal, @@ -335,10 +334,7 @@ impl<'a> SessionPlanner<'a> { .map_err(|e| PlanError::InvalidExternalDatabase { source: Box::new(e), })?; - DatabaseOptions::MongoDb(DatabaseOptionsMongoDb { - connection_string, - columns: Vec::new(), - }) + DatabaseOptions::MongoDb(DatabaseOptionsMongoDb { connection_string }) } DatabaseOptions::SNOWFLAKE => { let account_name: String = m.remove_required("account")?; @@ -458,54 +454,33 @@ impl<'a> SessionPlanner<'a> { Ok(plan.into_logical_plan()) } - async fn plan_create_external_table( + /// TODO: This is a temporary implementation. + /// The datasource should resolve it's own table options + /// This is mostly for compatibility with the old table options implementation. + /// Once the datasources are updated to use the new table options, this should be removed. 
+ async fn get_tbl_opts_from_v0( &self, - mut stmt: CreateExternalTableStmt, - ) -> Result { - let datasource = normalize_ident(stmt.datasource); - - let tunnel = stmt.tunnel.map(normalize_ident); - let tunnel_options = self.get_tunnel_opts(&tunnel)?; - if let Some(tunnel_options) = &tunnel_options { - // Validate if the tunnel type is supported by the datasource - validate_table_tunnel_support(&datasource, tunnel_options.as_str()).map_err(|e| { - PlanError::InvalidExternalTable { - source: Box::new(e), - } - })?; - } + datasource: &str, + m: &mut StatementOptions, + creds_options: Option, + tunnel_options: Option, + ) -> Result { + Ok(match datasource { + TableOptionsV0::DEBUG => { + datasources::debug::validate_tunnel_connections(tunnel_options.as_ref())?; - let creds = stmt.credentials.map(normalize_ident); - let creds_options = self.get_credentials_opts(&creds)?; - if let Some(creds_options) = &creds_options { - validate_table_creds_support(&datasource, creds_options.as_str()).map_err(|e| { - PlanError::InvalidExternalTable { - source: Box::new(e), + let typ: Option = match creds_options { + Some(CredentialsOptions::Debug(c)) => Some(c.table_type.parse()?), + Some(other) => unreachable!("invalid credentials {other} for debug datasource"), + None => None, + }; + let table_type: DebugTableType = m.remove_required_or("table_type", typ)?; + TableOptionsDebug { + table_type: table_type.as_str().to_string(), } - })?; - } - - let m = &mut stmt.options; - - let columns = if let Some(columns) = stmt.columns.map(|coll_def| { - coll_def - .into_iter() - .map(|coll| -> Result, PlanError> { - Ok(Arc::new(Field::new( - coll.name.to_string(), - convert_data_type(&coll.data_type)?, - false, - ))) - }) - .collect::, PlanError>>() - }) { - Some(columns?) - } else { - None - }; - - let external_table_options = match datasource.as_str() { - TableOptions::POSTGRES => { + .into() + } + TableOptionsV0::POSTGRES => { let connection_string = get_pg_conn_str(m)?; let schema: String = m.remove_required("schema")?; let table: String = m.remove_required("table")?; @@ -519,13 +494,14 @@ impl<'a> SessionPlanner<'a> { source: Box::new(e), })?; - TableOptions::Postgres(TableOptionsPostgres { + TableOptionsPostgres { connection_string, schema, table, - }) + } + .into() } - TableOptions::BIGQUERY => { + TableOptionsV0::BIGQUERY => { let service_account_key = creds_options.as_ref().map(|c| match c { CredentialsOptions::Gcp(c) => c.service_account_key.clone(), other => unreachable!("invalid credentials {other} for bigquery"), @@ -549,14 +525,15 @@ impl<'a> SessionPlanner<'a> { source: Box::new(e), })?; - TableOptions::BigQuery(TableOptionsBigQuery { + TableOptionsBigQuery { service_account_key, project_id, dataset_id: access.dataset_id, table_id: access.table_id, - }) + } + .into() } - TableOptions::MYSQL => { + TableOptionsV0::MYSQL => { let connection_string = get_mysql_conn_str(m)?; let schema = m.remove_required("schema")?; let table = m.remove_required("table")?; @@ -572,30 +549,26 @@ impl<'a> SessionPlanner<'a> { source: Box::new(e), })?; - TableOptions::Mysql(TableOptionsMysql { + TableOptionsMysql { connection_string, schema: access.schema, table: access.name, - }) + } + .into() } - TableOptions::MONGODB => { + TableOptionsV0::MONGODB => { let connection_string = get_mongodb_conn_str(m)?; let database = m.remove_required("database")?; let collection = m.remove_required("collection")?; - // TODO: Validate - let cols = columns - .clone() - .map(InternalColumnDefinition::from_arrow_fields); - - 
TableOptions::MongoDb(TableOptionsMongoDb { + TableOptionsMongoDb { connection_string, database, collection, - columns: cols, - }) + } + .into() } - TableOptions::SNOWFLAKE => { + TableOptionsV0::SNOWFLAKE => { let account_name: String = m.remove_required("account")?; let login_name: String = m.remove_required("username")?; let password: String = m.remove_required("password")?; @@ -625,7 +598,7 @@ impl<'a> SessionPlanner<'a> { source: Box::new(e), })?; - TableOptions::Snowflake(TableOptionsSnowflake { + TableOptionsSnowflake { account_name, login_name, password, @@ -634,9 +607,10 @@ impl<'a> SessionPlanner<'a> { role_name: role_name.unwrap_or_default(), schema_name: access_info.schema_name, table_name: access_info.table_name, - }) + } + .into() } - TableOptions::SQL_SERVER => { + TableOptionsV0::SQL_SERVER => { let connection_string: String = m.remove_required("connection_string")?; let schema_name: String = m.remove_required("schema")?; let table_name: String = m.remove_required("table")?; @@ -647,13 +621,14 @@ impl<'a> SessionPlanner<'a> { .validate_table_access(&schema_name, &table_name) .await?; - TableOptions::SqlServer(TableOptionsSqlServer { + TableOptionsSqlServer { connection_string, schema: schema_name, table: table_name, - }) + } + .into() } - TableOptions::CLICKHOUSE => { + TableOptionsV0::CLICKHOUSE => { let connection_string: String = m.remove_required("connection_string")?; let table_name: String = m.remove_required("table")?; @@ -670,13 +645,14 @@ impl<'a> SessionPlanner<'a> { access.validate_table_access(table_ref.as_ref()).await?; - TableOptions::Clickhouse(TableOptionsClickhouse { + TableOptionsClickhouse { connection_string, table: table_name, database: database_name, - }) + } + .into() } - TableOptions::CASSANDRA => { + TableOptionsV0::CASSANDRA => { let host: String = m.remove_required("host")?; let keyspace: String = m.remove_required("keyspace")?; let table: String = m.remove_required("table")?; @@ -687,15 +663,16 @@ impl<'a> SessionPlanner<'a> { .await?; access.validate_table_access(&keyspace, &table).await?; - TableOptions::Cassandra(TableOptionsCassandra { + TableOptionsCassandra { host, keyspace, table, username, password, - }) + } + .into() } - TableOptions::SQLITE => { + TableOptionsV0::SQLITE => { let location: String = m.remove_required("location")?; let table: String = m.remove_required("table")?; let mut storage_options = StorageOptions::try_from(m)?; @@ -703,30 +680,30 @@ impl<'a> SessionPlanner<'a> { storage_options_with_credentials(&mut storage_options, creds); } - TableOptions::Sqlite(TableOptionsObjectStore { + TableOptionsV0::Sqlite(TableOptionsObjectStore { location, storage_options, name: table.into(), file_type: None, compression: None, schema_sample_size: None, - columns: Vec::new(), }) } - TableOptions::LOCAL => { + TableOptionsV0::LOCAL => { let location: String = m.remove_required("location")?; let access = Arc::new(LocalStoreAccess); let (file_type, compression) = validate_and_get_file_type_and_compression(access, &location, m).await?; - TableOptions::Local(TableOptionsLocal { + TableOptionsLocal { location, file_type: file_type.to_string().to_lowercase(), compression: compression.map(|c| c.to_string()), - }) + } + .into() } - TableOptions::GCS => { + TableOptionsV0::GCS => { let service_account_key = creds_options.as_ref().map(|c| match c { CredentialsOptions::Gcp(c) => c.service_account_key.clone(), other => unreachable!("invalid credentials {other} for google cloud storage"), @@ -746,15 +723,16 @@ impl<'a> SessionPlanner<'a> { let 
(file_type, compression) = validate_and_get_file_type_and_compression(access, &location, m).await?; - TableOptions::Gcs(TableOptionsGcs { + TableOptionsGcs { bucket, service_account_key, location, file_type, compression: compression.map(|c| c.to_string()), - }) + } + .into() } - TableOptions::S3_STORAGE => { + TableOptionsV0::S3_STORAGE => { let creds = creds_options.as_ref().map(|c| match c { CredentialsOptions::Aws(c) => c, other => unreachable!("invalid credentials {other} for aws s3"), @@ -787,7 +765,7 @@ impl<'a> SessionPlanner<'a> { let (file_type, compression) = validate_and_get_file_type_and_compression(access, &location, m).await?; - TableOptions::S3(TableOptionsS3 { + TableOptionsS3 { region, bucket, access_key_id, @@ -795,9 +773,10 @@ impl<'a> SessionPlanner<'a> { location, file_type: file_type.to_string(), compression: compression.map(|c| c.to_string()), - }) + } + .into() } - TableOptions::AZURE => { + TableOptionsV0::AZURE => { let (account, access_key) = match creds_options { Some(CredentialsOptions::Azure(c)) => { (Some(c.account_name.clone()), Some(c.access_key.clone())) @@ -841,17 +820,17 @@ impl<'a> SessionPlanner<'a> { opts.inner .insert(AzureConfigKey::AccessKey.as_ref().to_string(), access_key); - TableOptions::Azure(TableOptionsObjectStore { + + TableOptionsV0::Azure(TableOptionsObjectStore { name: None, location: source_url, storage_options: opts, file_type: Some(file_type.to_string()), compression: compression.map(|c| c.to_string()), schema_sample_size: None, - columns: Vec::new(), }) } - TableOptions::DELTA | TableOptions::ICEBERG => { + TableOptionsV0::DELTA | TableOptionsV0::ICEBERG => { let location: String = m.remove_required("location")?; let mut storage_options = StorageOptions::try_from(m)?; @@ -859,16 +838,15 @@ impl<'a> SessionPlanner<'a> { storage_options_with_credentials(&mut storage_options, creds); } - if datasource.as_str() == TableOptions::DELTA { + if datasource == TableOptionsV0::DELTA { let _table = load_table_direct(&location, storage_options.clone()).await?; - TableOptions::Delta(TableOptionsObjectStore { + TableOptionsV0::Delta(TableOptionsObjectStore { location, storage_options, name: None, file_type: None, compression: None, - columns: Vec::new(), schema_sample_size: None, }) } else { @@ -876,32 +854,17 @@ impl<'a> SessionPlanner<'a> { let store = storage_options_into_object_store(&url, &storage_options)?; let _table = IcebergTable::open(url, store).await?; - TableOptions::Iceberg(TableOptionsObjectStore { + TableOptionsV0::Iceberg(TableOptionsObjectStore { location, storage_options, name: None, file_type: None, compression: None, schema_sample_size: None, - columns: Vec::new(), }) } } - TableOptions::DEBUG => { - datasources::debug::validate_tunnel_connections(tunnel_options.as_ref())?; - - let typ: Option = match creds_options { - Some(CredentialsOptions::Debug(c)) => Some(c.table_type.parse()?), - Some(other) => unreachable!("invalid credentials {other} for debug datasource"), - None => None, - }; - let typ: DebugTableType = m.remove_required_or("table_type", typ)?; - - TableOptions::Debug(TableOptionsDebug { - table_type: typ.to_string(), - }) - } - TableOptions::LANCE => { + TableOptionsV0::LANCE => { let location: String = m.remove_required("location")?; let mut storage_options = StorageOptions::try_from(m)?; if let Some(creds) = creds_options { @@ -909,17 +872,16 @@ impl<'a> SessionPlanner<'a> { } // Validate that the table exists. 
let _table = LanceTable::new(&location, storage_options.clone()).await?; - TableOptions::Lance(TableOptionsObjectStore { + TableOptionsV0::Lance(TableOptionsObjectStore { location, storage_options, name: None, file_type: None, compression: None, schema_sample_size: None, - columns: Vec::new(), }) } - TableOptions::BSON => { + TableOptionsV0::BSON => { let location: String = m.remove_required("location")?; let mut storage_options = StorageOptions::try_from(m)?; if let Some(creds) = creds_options { @@ -932,21 +894,16 @@ impl<'a> SessionPlanner<'a> { .map(|strint| strint.parse()) .unwrap_or(Ok(100))?, ); - - TableOptions::Bson(TableOptionsObjectStore { + TableOptionsV0::Bson(TableOptionsObjectStore { location, storage_options, name: None, file_type: None, compression: None, schema_sample_size, - columns: columns - .clone() - .map(InternalColumnDefinition::from_arrow_fields) - .unwrap_or(Vec::new()), }) } - TableOptions::EXCEL => { + TableOptionsV0::EXCEL => { let location: String = m.remove_required("location")?; let mut storage_options = StorageOptions::try_from(m)?; if let Some(creds) = creds_options { @@ -971,17 +928,84 @@ impl<'a> SessionPlanner<'a> { } }; - TableOptions::Excel(TableOptionsExcel { + TableOptionsExcel { location, storage_options, file_type: None, compression: None, sheet_name, has_header, - }) + } + .into() } other => return Err(internal!("unsupported datasource: {}", other)), - }; + }) + } + + async fn plan_create_external_table( + &self, + mut stmt: CreateExternalTableStmt, + ) -> Result { + let datasource = normalize_ident(stmt.datasource); + + let tunnel = stmt.tunnel.map(normalize_ident); + let tunnel_options = self.get_tunnel_opts(&tunnel)?; + if let Some(tunnel_options) = &tunnel_options { + // Validate if the tunnel type is supported by the datasource + validate_table_tunnel_support(&datasource, tunnel_options.as_str()).map_err(|e| { + PlanError::InvalidExternalTable { + source: Box::new(e), + } + })?; + } + + let creds = stmt.credentials.map(normalize_ident); + let creds_options = self.get_credentials_opts(&creds)?; + if let Some(creds_options) = &creds_options { + validate_table_creds_support(&datasource, creds_options.as_str()).map_err(|e| { + PlanError::InvalidExternalTable { + source: Box::new(e), + } + })?; + } + + + let schema = stmt + .columns + .map(|columns| { + let fields = columns + .into_iter() + .map(|coll| -> Result { + // check if there is a NOT NULL constraint + let has_not_null_constraint = coll + .options + .into_iter() + .any(|k| matches!(k.option, ColumnOption::NotNull)); + + if has_not_null_constraint { + Err(PlanError::String( + "'NOT NULL' constraint is not supported".to_string(), + )) + } else { + Ok(Field::new( + coll.name.to_string(), + convert_data_type(&coll.data_type)?, + true, + )) + } + }) + .collect::, PlanError>>()?; + Ok::<_, PlanError>(Schema::new(fields)) + }) + .transpose()?; + let m = &mut stmt.options; + + // The mutator uses the new table options, but the catalog uses the old ones. + // so we need to convert the old options to the new ones. 
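A note on the planner change just above: user-declared columns are turned into an Arrow Schema in which every field is nullable, and a NOT NULL constraint is rejected outright rather than silently dropped. A standalone sketch of building such a schema with the arrow types re-exported by DataFusion (field names and types are illustrative):

use datafusion::arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // Externally declared columns are forced nullable, mirroring the planner
    // behavior above.
    let fields = vec![
        Field::new("id", DataType::Int32, true),
        Field::new("name", DataType::Utf8, true),
    ];
    let schema = Schema::new(fields);

    assert_eq!(schema.fields().len(), 2);
    assert!(schema.fields().iter().all(|f| f.is_nullable()));
}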
+ let external_table_options = self + .get_tbl_opts_from_v0(datasource.as_str(), m, creds_options, tunnel_options) + .await?; + let table_name = object_name_to_table_ref(stmt.name)?; @@ -991,7 +1015,7 @@ impl<'a> SessionPlanner<'a> { if_not_exists: stmt.if_not_exists, table_options: external_table_options, tunnel, - columns: columns.clone(), + schema, }; Ok(plan.into_logical_plan()) diff --git a/crates/sqlexec/src/remote/planner.rs b/crates/sqlexec/src/remote/planner.rs index c098802c1..b1abadc2e 100644 --- a/crates/sqlexec/src/remote/planner.rs +++ b/crates/sqlexec/src/remote/planner.rs @@ -160,6 +160,7 @@ impl ExtensionPlanner for DDLExtensionPlanner { if_not_exists: lp.if_not_exists, tunnel: lp.tunnel.clone(), table_options: lp.table_options.clone(), + table_schema: lp.schema.clone(), }; RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } diff --git a/external_schema.bson b/external_schema.bson new file mode 100644 index 000000000..5d7f693d1 Binary files /dev/null and b/external_schema.bson differ diff --git a/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/tables/20000/_delta_log/00000000000000000000.json b/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/tables/20000/_delta_log/00000000000000000000.json new file mode 100644 index 000000000..a3cce5a32 --- /dev/null +++ b/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/tables/20000/_delta_log/00000000000000000000.json @@ -0,0 +1,3 @@ +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"metaData":{"id":"5d6a594d-1221-4f48-8b24-aba01bcb6828","name":"test","description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"f\",\"type\":{\"type\":\"array\",\"elementType\":\"long\",\"containsNull\":true},\"nullable\":true,\"metadata\":{\"arrow_type\":\"{\\\"FixedSizeList\\\":[{\\\"name\\\":\\\"item\\\",\\\"data_type\\\":\\\"Int64\\\",\\\"nullable\\\":true,\\\"dict_id\\\":0,\\\"dict_is_ordered\\\":false,\\\"metadata\\\":{}},2]}\"}}]}","partitionColumns":[],"createdTime":1710266706730,"configuration":{}}} +{"commitInfo":{"timestamp":1710266706731,"operation":"CREATE TABLE","operationParameters":{"protocol":"{\"minReaderVersion\":1,\"minWriterVersion\":2}","metadata":"{\"id\":\"5d6a594d-1221-4f48-8b24-aba01bcb6828\",\"name\":\"test\",\"description\":null,\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"f\\\",\\\"type\\\":{\\\"type\\\":\\\"array\\\",\\\"elementType\\\":\\\"long\\\",\\\"containsNull\\\":true},\\\"nullable\\\":true,\\\"metadata\\\":{\\\"arrow_type\\\":\\\"{\\\\\\\"FixedSizeList\\\\\\\":[{\\\\\\\"name\\\\\\\":\\\\\\\"item\\\\\\\",\\\\\\\"data_type\\\\\\\":\\\\\\\"Int64\\\\\\\",\\\\\\\"nullable\\\\\\\":true,\\\\\\\"dict_id\\\\\\\":0,\\\\\\\"dict_is_ordered\\\\\\\":false,\\\\\\\"metadata\\\\\\\":{}},2]}\\\"}}]}\",\"partitionColumns\":[],\"createdTime\":1710266706730,\"configuration\":{}}","location":"file:///Users/corygrinstead/Development/glaredb/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/tables/20000","mode":"ErrorIfExists"},"clientVersion":"delta-rs.0.17.0"}} \ No newline at end of file diff --git a/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/tables/20000/_delta_log/00000000000000000001.json b/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/tables/20000/_delta_log/00000000000000000001.json new file mode 
100644 index 000000000..c3f8b3ab9 --- /dev/null +++ b/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/tables/20000/_delta_log/00000000000000000001.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-0c38537e-ad13-48ff-a7cc-583720c2fd0a-c000.snappy.parquet","partitionValues":{},"size":971,"modificationTime":1710266706735,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{},\"maxValues\":{},\"nullCount\":{\"f\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1710266706735,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.17.0"}} \ No newline at end of file diff --git a/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/tables/20000/part-00001-0c38537e-ad13-48ff-a7cc-583720c2fd0a-c000.snappy.parquet b/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/tables/20000/part-00001-0c38537e-ad13-48ff-a7cc-583720c2fd0a-c000.snappy.parquet new file mode 100644 index 000000000..91c9f692c Binary files /dev/null and b/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/tables/20000/part-00001-0c38537e-ad13-48ff-a7cc-583720c2fd0a-c000.snappy.parquet differ diff --git a/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/visible/catalog.0 b/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/visible/catalog.0 new file mode 100644 index 000000000..7ba9a8777 Binary files /dev/null and b/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/visible/catalog.0 differ diff --git a/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/visible/catalog.1 b/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/visible/catalog.1 new file mode 100644 index 000000000..f5040047b Binary files /dev/null and b/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/visible/catalog.1 differ diff --git a/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/visible/catalog.2 b/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/visible/catalog.2 new file mode 100644 index 000000000..6e73d1989 Binary files /dev/null and b/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/visible/catalog.2 differ diff --git a/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/visible/catalog.3 b/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/visible/catalog.3 new file mode 100644 index 000000000..de4db3675 Binary files /dev/null and b/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/visible/catalog.3 differ diff --git a/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/visible/lease b/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/visible/lease new file mode 100644 index 000000000..a1a13e133 --- /dev/null +++ b/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/visible/lease @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/visible/metadata b/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/visible/metadata new file mode 100644 index 000000000..7f0113fc2 --- /dev/null +++ b/testdata/catalog_compat/v0/databases/00000000-0000-0000-0000-000000000000/visible/metadata @@ -0,0 +1 @@ 
+dr%<ª£J¦®ß—$vé \ No newline at end of file diff --git a/testdata/sqllogictests/external_schema.slt b/testdata/sqllogictests/external_schema.slt index bd4fa2e56..a261a9d36 100644 --- a/testdata/sqllogictests/external_schema.slt +++ b/testdata/sqllogictests/external_schema.slt @@ -1,3 +1,5 @@ + + statement ok create table src (x int, y text, z int); diff --git a/testdata/sqllogictests_bigquery/basic.slt b/testdata/sqllogictests_bigquery/basic.slt index f07f49013..7e5237924 100644 --- a/testdata/sqllogictests_bigquery/basic.slt +++ b/testdata/sqllogictests_bigquery/basic.slt @@ -1,8 +1,8 @@ # Basic tests for bigquery external tables statement ok -CREATE EXTERNAL TABLE basic - FROM bigquery +CREATE EXTERNAL TABLE basic +FROM bigquery OPTIONS ( service_account_key = '${GCP_SERVICE_ACCOUNT_KEY}', project_id = '${GCP_PROJECT_ID}', diff --git a/testdata/sqllogictests_clickhouse/datatypes.slt b/testdata/sqllogictests_clickhouse/datatypes.slt index 90ed612d8..19cc9b20a 100644 --- a/testdata/sqllogictests_clickhouse/datatypes.slt +++ b/testdata/sqllogictests_clickhouse/datatypes.slt @@ -2,12 +2,12 @@ # Create an external table that connects to the datatypes table. statement ok -CREATE EXTERNAL TABLE datatypes - FROM clickhouse - OPTIONS ( - connection_string = '${CLICKHOUSE_CONN_STRING}', - table = 'datatypes' - ); +CREATE EXTERNAL TABLE datatypes +FROM clickhouse +OPTIONS ( + connection_string = '${CLICKHOUSE_CONN_STRING}', + table = 'datatypes' +); # Check if we can fetch contents of the datatype table. # diff --git a/testdata/sqllogictests_object_store/local/basic.slt b/testdata/sqllogictests_object_store/local/basic.slt index b51a0a0f7..d99dba26a 100644 --- a/testdata/sqllogictests_object_store/local/basic.slt +++ b/testdata/sqllogictests_object_store/local/basic.slt @@ -7,11 +7,11 @@ statement ok set search_path = local_object_store; statement ok -CREATE EXTERNAL TABLE basic - FROM local - OPTIONS ( - location = '${PWD}/testdata/sqllogictests_datasources_common/data/bikeshare_stations.csv' - ); +CREATE EXTERNAL TABLE basic +FROM local +OPTIONS ( + location = '${PWD}/testdata/sqllogictests_datasources_common/data/bikeshare_stations.csv' +); # TODO re-enable when CSV related querying is improved, (timestamp issue) halt
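Stepping back from the diff: one thing the Serialize/Deserialize derives added across the options structs enable is a generic serde round-trip for any datasource's options (the Cargo.lock change pulls serde and serde_json into protogen). A small self-contained sketch of that round-trip, using a local struct that mirrors the TableOptionsPostgres fields shown earlier; how the serialized form is actually stored or shipped is not shown here and is left as an assumption:

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct TableOptionsPostgres {
    connection_string: String,
    schema: String,
    table: String,
}

fn main() -> serde_json::Result<()> {
    let opts = TableOptionsPostgres {
        connection_string: "postgresql://user@localhost:5432/db".to_string(),
        schema: "public".to_string(),
        table: "t1".to_string(),
    };

    // Round-trip through JSON; the derived impls make this lossless for
    // these fields.
    let raw = serde_json::to_string(&opts)?;
    let back: TableOptionsPostgres = serde_json::from_str(&raw)?;
    assert_eq!(opts, back);

    println!("{raw}");
    Ok(())
}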