Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(table options): backwards compat for catalog #2775

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/datasources/src/debug/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use datafusion_ext::functions::VirtualLister;
use errors::DebugError;
use futures::Stream;
use parser::options::StatementOptions;
use protogen::metastore::types::options::{CredentialsOptions, TableOptions, TunnelOptions};
use protogen::metastore::types::options::{CredentialsOptions, TableOptionsV1, TunnelOptions};

pub use self::options::{DebugTableType, TableOptionsDebug};
use crate::DatasourceError;
Expand Down Expand Up @@ -259,7 +259,7 @@ impl crate::Datasource for DebugDatasource {
opts: &mut StatementOptions,
creds: Option<CredentialsOptions>,
tunnel_opts: Option<TunnelOptions>,
) -> Result<TableOptions, DatasourceError> {
) -> Result<TableOptionsV1, DatasourceError> {
validate_tunnel_connections(tunnel_opts.as_ref()).unwrap();

let typ: Option<DebugTableType> = match creds {
Expand All @@ -276,7 +276,7 @@ impl crate::Datasource for DebugDatasource {

async fn create_table_provider(
&self,
options: &TableOptions,
options: &TableOptionsV1,
tunnel_opts: Option<&TunnelOptions>,
) -> Result<Arc<dyn TableProvider>, DatasourceError> {
let options: TableOptionsDebug = options.extract()?;
Expand Down
9 changes: 5 additions & 4 deletions crates/datasources/src/debug/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ use serde::{Deserialize, Serialize};
use super::errors::DebugError;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DebugTableType {
/// A table that will always return an error on the record batch stream.
ErrorDuringExecution,
/// A table that never stops sending record batches.
NeverEnding,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableOptionsDebug {
pub table_type: DebugTableType,
}

impl ParseOptionValue<DebugTableType> for SqlOptionValue {
fn parse_opt(self) -> Result<DebugTableType, ParserError> {
Expand Down Expand Up @@ -124,10 +129,6 @@ impl DebugTableType {
}


#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableOptionsDebug {
pub table_type: DebugTableType,
}
impl TableOptionsImpl for TableOptionsDebug {
const NAME: &'static str = "debug";
}
Expand Down
6 changes: 3 additions & 3 deletions crates/datasources/src/lance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use parser::options::StatementOptions;
use protogen::metastore::types::options::{
CredentialsOptions,
StorageOptions,
TableOptions,
TableOptionsObjectStore,
TableOptionsV1,
TunnelOptions,
};

Expand Down Expand Up @@ -93,7 +93,7 @@ impl Datasource for LanceDatasource {
opts: &mut StatementOptions,
creds: Option<CredentialsOptions>,
_tunnel_opts: Option<TunnelOptions>,
) -> Result<TableOptions, DatasourceError> {
) -> Result<TableOptionsV1, DatasourceError> {
let location: String = opts.remove_required("location")?;
let mut storage_options = StorageOptions::try_from(opts)?;
if let Some(creds) = creds {
Expand All @@ -113,7 +113,7 @@ impl Datasource for LanceDatasource {

async fn create_table_provider(
&self,
options: &TableOptions,
options: &TableOptionsV1,
_tunnel_opts: Option<&TunnelOptions>,
) -> Result<Arc<dyn TableProvider>, DatasourceError> {
let TableOptionsObjectStore {
Expand Down
6 changes: 3 additions & 3 deletions crates/datasources/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use datafusion::datasource::TableProvider;
use parser::options::StatementOptions;
use protogen::metastore::types::options::{CredentialsOptions, TableOptions, TunnelOptions};
use protogen::metastore::types::options::{CredentialsOptions, TableOptionsV1, TunnelOptions};
pub mod bigquery;
pub mod bson;
pub mod cassandra;
Expand Down Expand Up @@ -46,12 +46,12 @@ pub trait Datasource: Send + Sync {
opts: &mut StatementOptions,
creds: Option<CredentialsOptions>,
tunnel_opts: Option<TunnelOptions>,
) -> Result<TableOptions, DatasourceError>;
) -> Result<TableOptionsV1, DatasourceError>;


async fn create_table_provider(
&self,
tbl_options: &TableOptions,
tbl_options: &TableOptionsV1,
_tunnel_opts: Option<&TunnelOptions>,
) -> Result<Arc<dyn TableProvider>, DatasourceError>;
}
2 changes: 1 addition & 1 deletion crates/datasources/src/native/access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ impl NativeTableStorage {

fn opts_from_ent(table: &TableEntry) -> Result<TableOptionsInternal> {
let opts = match table.options.name.as_ref() {
"internal" => (&table.options).extract_unchecked(), // variant is already checked
"internal" => table.options.extract_unchecked(), // variant is already checked
_ => return Err(NativeError::NotNative(table.clone())),
};
Ok(opts)
Expand Down
4 changes: 2 additions & 2 deletions crates/datasources/src/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ use object_store::{ObjectMeta, ObjectStore};
use protogen::metastore::types::options::{
CredentialsOptions,
StorageOptions,
TableOptions,
TableOptionsGcs,
TableOptionsObjectStore,
TableOptionsS3,
TableOptionsV1,
};

use self::azure::AzureStoreAccess;
Expand Down Expand Up @@ -390,7 +390,7 @@ pub fn file_type_from_path(path: &ObjectStorePath) -> Result<FileType> {

pub fn init_session_registry<'a>(
runtime: &RuntimeEnv,
entries: impl Iterator<Item = &'a TableOptions>,
entries: impl Iterator<Item = &'a TableOptionsV1>,
) -> Result<()> {
for opts in entries {
let access: Arc<dyn ObjStoreAccess> = match opts.name.as_ref() {
Expand Down
37 changes: 37 additions & 0 deletions crates/glaredb/tests/catalog_compat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
mod setup;
use crate::setup::make_cli;


#[test]
/// Assert that we can still load the old catalog without panicking
fn test_catalog_backwards_compat() {
let mut cmd = make_cli();
let pwd = std::env::current_dir().unwrap();
let root_dir = pwd.parent().unwrap().parent().unwrap();
let old_catalog = root_dir.join("testdata/catalog_compat/v0");
cmd.args(["-l", old_catalog.to_str().unwrap()])
.assert()
.success();
}


#[test]
/// Make sure that we can read the table options from the old catalog
/// The v0 catalog has a table created from the following SQL:
/// ```sql
/// CREATE EXTERNAL TABLE debug_table
/// FROM debug OPTIONS (
/// table_type 'never_ending'
/// );
/// ```
fn test_catalog_backwards_compat_tbl_options() {
let mut cmd = make_cli();
let pwd = std::env::current_dir().unwrap();
let root_dir = pwd.parent().unwrap().parent().unwrap();
let old_catalog = root_dir.join("testdata/catalog_compat/v0");

let query = "SELECT * FROM debug_table LIMIT 1";
cmd.args(["-l", old_catalog.to_str().unwrap(), "-q", query])
.assert()
.success();
}
3 changes: 3 additions & 0 deletions crates/metastore/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use protogen::metastore::types::catalog::{
TableEntry,
TunnelEntry,
ViewEntry,
CURRENT_CATALOG_VERSION,
};
use protogen::metastore::types::options::{
DatabaseOptions,
Expand Down Expand Up @@ -183,6 +184,7 @@ impl DatabaseCatalog {
version: guard.version,
entries: guard.entries.as_ref().clone(),
deployment: guard.deployment.clone(),
catalog_version: CURRENT_CATALOG_VERSION,
}
}

Expand Down Expand Up @@ -530,6 +532,7 @@ impl State {
.into_iter()
.filter(|(_, ent)| !ent.get_meta().builtin)
.collect(),
catalog_version: CURRENT_CATALOG_VERSION,
},
extra: ExtraState {
oid_counter: self.oid_counter,
Expand Down
7 changes: 6 additions & 1 deletion crates/metastore/src/storage/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ use object_store::{Error as ObjectStoreError, ObjectStore};
use pgrepr::oid::FIRST_AVAILABLE_ID;
use prost::Message;
use protogen::gen::metastore::storage;
use protogen::metastore::types::catalog::{CatalogState, DeploymentMetadata};
use protogen::metastore::types::catalog::{
CatalogState,
DeploymentMetadata,
CURRENT_CATALOG_VERSION,
};
use protogen::metastore::types::storage::{CatalogMetadata, ExtraState, PersistedCatalog};
use tracing::{debug, error};
use uuid::Uuid;
Expand Down Expand Up @@ -75,6 +79,7 @@ impl Storage {
version: 0,
entries: HashMap::new(),
deployment: DeploymentMetadata { storage_size: 0 },
catalog_version: CURRENT_CATALOG_VERSION,
},
extra: ExtraState {
oid_counter: FIRST_AVAILABLE_ID,
Expand Down
15 changes: 12 additions & 3 deletions crates/protogen/proto/metastore/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,13 @@ message CatalogState {

// Metadata for the deployment.
DeploymentMetadata deployment = 3;

// next: 4

// The version of the actual catalog implementation.
// This is incremented when there are incompatible changes to the physical
// catalog layout.
// Purposely set to a high number to avoid accidental collisions
optional uint32 catalog_version = 999;
universalmind303 marked this conversation as resolved.
Show resolved Hide resolved
}

// Metadata for the deployment.
Expand Down Expand Up @@ -160,11 +165,15 @@ message TableEntry {

reserved 2; // Column fields

options.TableOptions options = 3;
// The old table options. This is deprecated and should not be used.
// It is only here for backwards compatibility.
options.TableOptionsV0 options_v0 = 3;
optional uint32 tunnel_id = 4;
SourceAccessMode access_mode = 5;
optional common.arrow.Schema schema = 6;
// next: 7
// The new table options.
options.TableOptionsV1 options = 7;
// next: 8
}

message ViewEntry {
Expand Down
4 changes: 2 additions & 2 deletions crates/protogen/proto/metastore/options.proto
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ message StorageOptions {
map<string, string> inner = 1;
}

message TableOptions {
message TableOptionsV1 {
string name = 1;
bytes options = 2;
}

// Table options

message TableOptionsOld {
message TableOptionsV0 {
oneof options {
TableOptionsInternal internal = 1;
TableOptionsDebug debug = 2;
Expand Down
2 changes: 1 addition & 1 deletion crates/protogen/proto/metastore/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ message CreateFunction {
message CreateExternalTable {
string schema = 1;
string name = 2;
options.TableOptions options = 3;
options.TableOptionsV1 options = 3;
bool if_not_exists = 4;
optional string tunnel = 5;
bool or_replace = 6;
Expand Down
29 changes: 25 additions & 4 deletions crates/protogen/src/metastore/types/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,28 @@ use super::options::{
CredentialsOptions,
DatabaseOptions,
InternalColumnDefinition,
TableOptions,
TableOptionsInternal,
TableOptionsV0,
TableOptionsV1,
TunnelOptions,
};
use crate::gen::common::arrow::ArrowType;
use crate::gen::metastore::catalog::{self, type_signature};
use crate::{FromOptionalField, ProtoConvError};

/// The current version of the catalog.
/// this is incremented every time there is a breaking change to the catalog.
pub const CURRENT_CATALOG_VERSION: u32 = 1;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CatalogState {
/// the version of the catalog state. This increments on every Mutation
pub version: u64,
pub entries: HashMap<u32, CatalogEntry>,
pub deployment: DeploymentMetadata,
/// This is the version of the catalog implementation.
/// This is incremented when the catalog changes in a way that is not backwards compatible.
universalmind303 marked this conversation as resolved.
Show resolved Hide resolved
pub catalog_version: u32,
}

impl TryFrom<catalog::CatalogState> for CatalogState {
Expand All @@ -47,6 +56,7 @@ impl TryFrom<catalog::CatalogState> for CatalogState {
version: value.version,
entries,
deployment,
catalog_version: value.catalog_version.unwrap_or(0),
})
}
}
Expand All @@ -65,6 +75,7 @@ impl TryFrom<CatalogState> for catalog::CatalogState {
})
.collect::<Result<_, _>>()?,
deployment: Some(value.deployment.try_into()?),
catalog_version: Some(value.catalog_version),
})
}
}
Expand Down Expand Up @@ -438,7 +449,7 @@ impl From<SchemaEntry> for catalog::SchemaEntry {
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableEntry {
pub meta: EntryMeta,
pub options: TableOptions,
pub options: TableOptionsV1,
pub tunnel_id: Option<u32>,
pub access_mode: SourceAccessMode,
pub schema: Option<Schema>,
Expand All @@ -464,11 +475,18 @@ impl TryFrom<catalog::TableEntry> for TableEntry {
type Error = ProtoConvError;
fn try_from(value: catalog::TableEntry) -> Result<Self, Self::Error> {
let meta: EntryMeta = value.meta.required("meta")?;

let options_old = value.options_v0;
let options: TableOptionsV1 = match options_old {
Some(options) => {
let options_v0: TableOptionsV0 = options.try_into()?;
options_v0.into()
}
None => value.options.required("options")?,
};

Ok(TableEntry {
meta,
options: value.options.required("options")?,
options,
tunnel_id: value.tunnel_id,
access_mode: value.access_mode.try_into()?,
schema: None,
Expand All @@ -484,6 +502,7 @@ impl From<TableEntry> for catalog::TableEntry {
tunnel_id: value.tunnel_id,
access_mode: value.access_mode.into(),
schema: None,
options_v0: None,
}
}
}
Expand Down Expand Up @@ -863,13 +882,15 @@ mod tests {
version: 4,
entries: HashMap::new(),
deployment: None,
catalog_version: None,
};

let converted: CatalogState = state.try_into().unwrap();
let expected = CatalogState {
version: 4,
entries: HashMap::new(),
deployment: DeploymentMetadata { storage_size: 0 },
catalog_version: 0,
};

assert_eq!(expected, converted);
Expand Down
Loading
Loading