Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce the DynamicFileCatalog in datafusion-catalog #11035

Merged
merged 60 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 56 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
37b5526
early draft
goldmedal Jun 20, 2024
ad1a854
fmt
goldmedal Jun 20, 2024
97ea11c
add example for dynamic file query
goldmedal Jun 21, 2024
2729c49
add test and refactor
goldmedal Jun 21, 2024
6f86577
clippy and add doc
goldmedal Jun 21, 2024
c91cdc6
cargo fmt
goldmedal Jun 21, 2024
3306df6
extract substitute_tilde function
goldmedal Jun 21, 2024
d82b273
fix the error handling
goldmedal Jun 21, 2024
c0491d5
fmt and clippy
goldmedal Jun 21, 2024
a60eeea
fix test
goldmedal Jun 21, 2024
9fa01aa
fix sqllogictests
goldmedal Jun 22, 2024
2ab3639
ignore dirs for windows test
goldmedal Jun 22, 2024
a8ee733
enhance the test for every file format
goldmedal Jun 22, 2024
7faab9f
disable the test for windows
goldmedal Jun 22, 2024
e1f3908
make dynamic file query configurable
goldmedal Jul 1, 2024
cf73ba2
revert array_query.slt
goldmedal Jul 1, 2024
c641e6b
modified the test and add example
goldmedal Jul 1, 2024
0806263
make dirs be optional
goldmedal Jul 1, 2024
f4d24e6
enable dynamic file query in cli
goldmedal Jul 1, 2024
4b71e59
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Jul 1, 2024
9964150
cargo fmt
goldmedal Jul 1, 2024
da1e5d3
modified example
goldmedal Jul 1, 2024
ed670fe
fix test
goldmedal Jul 1, 2024
ea5816e
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Aug 13, 2024
fb8b9e0
fix merge conflict
goldmedal Aug 13, 2024
fa73ae7
tmp
goldmedal Aug 14, 2024
04cc155
tmp
goldmedal Aug 14, 2024
1ede35e
tmp
goldmedal Aug 14, 2024
51b1d41
fix the catalog and schema
goldmedal Aug 15, 2024
75b0b84
move dynamic file catalog to datafusion-catalog
goldmedal Aug 15, 2024
3e8d094
add copyright
goldmedal Aug 15, 2024
4eb8ca5
fix tests
goldmedal Aug 16, 2024
9913405
rename catalog in cli and update lock
goldmedal Aug 16, 2024
5d861b8
enable home_dir feature
goldmedal Aug 16, 2024
ea1c075
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Aug 16, 2024
16be2e7
update lock
goldmedal Aug 16, 2024
db90c28
fix compile
goldmedal Aug 16, 2024
9353123
fix clippy
goldmedal Aug 16, 2024
daa7ed8
fmt toml
goldmedal Aug 16, 2024
e4a2174
fix doc test and add more doc
goldmedal Aug 16, 2024
506d1d6
fix clippy
goldmedal Aug 16, 2024
72ce464
add home_dir feature doc
goldmedal Aug 16, 2024
fb1b6ce
rollback the unused changed
goldmedal Aug 16, 2024
8f0952d
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Aug 18, 2024
f062fec
update lock
goldmedal Aug 18, 2024
b1baa84
fix sqllogictest
goldmedal Aug 18, 2024
76d7fee
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Aug 20, 2024
f0f070b
separate dynamic file test to another slt
goldmedal Aug 20, 2024
6b77b6b
add test for querying url table but disabled this feature
goldmedal Aug 20, 2024
4e51a77
add dynamic_file.slt
goldmedal Aug 20, 2024
fafc9dc
remove home_dir feature
goldmedal Aug 20, 2024
a3a4f4d
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Aug 21, 2024
7dc238f
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Sep 4, 2024
f7b4b8c
update cli lock
goldmedal Sep 4, 2024
25d0ff6
fix msrv check
goldmedal Sep 4, 2024
a78bd3c
fix msrv check
goldmedal Sep 4, 2024
edeff33
rollback the lock change
goldmedal Sep 4, 2024
b1a922c
address review comment and enhance the doc
goldmedal Sep 7, 2024
e5ab14d
remove the legacy comment
goldmedal Sep 7, 2024
87d7503
add missing doc
goldmedal Sep 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 86 additions & 71 deletions datafusion-cli/Cargo.lock

Large diffs are not rendered by default.

76 changes: 35 additions & 41 deletions datafusion-cli/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ use std::sync::{Arc, Weak};
use crate::object_storage::{get_object_store, AwsOptions, GcpOptions};

use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider};

use datafusion::common::plan_datafusion_err;
use datafusion::datasource::listing::{
ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
Expand All @@ -34,14 +33,13 @@ use async_trait::async_trait;
use dirs::home_dir;
use parking_lot::RwLock;

/// Wraps another catalog, automatically creating table providers
/// for local files if needed
pub struct DynamicFileCatalog {
/// Wraps another catalog, automatically register require object stores for the file locations
pub struct DynamicObjectStoreCatalog {
inner: Arc<dyn CatalogProviderList>,
state: Weak<RwLock<SessionState>>,
}

impl DynamicFileCatalog {
impl DynamicObjectStoreCatalog {
pub fn new(
inner: Arc<dyn CatalogProviderList>,
state: Weak<RwLock<SessionState>>,
Expand All @@ -50,7 +48,7 @@ impl DynamicFileCatalog {
}
}

impl CatalogProviderList for DynamicFileCatalog {
impl CatalogProviderList for DynamicObjectStoreCatalog {
fn as_any(&self) -> &dyn Any {
self
}
Expand All @@ -69,19 +67,19 @@ impl CatalogProviderList for DynamicFileCatalog {

fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
let state = self.state.clone();
self.inner
.catalog(name)
.map(|catalog| Arc::new(DynamicFileCatalogProvider::new(catalog, state)) as _)
self.inner.catalog(name).map(|catalog| {
Arc::new(DynamicObjectStoreCatalogProvider::new(catalog, state)) as _
})
}
}

/// Wraps another catalog provider
struct DynamicFileCatalogProvider {
struct DynamicObjectStoreCatalogProvider {
inner: Arc<dyn CatalogProvider>,
state: Weak<RwLock<SessionState>>,
}

impl DynamicFileCatalogProvider {
impl DynamicObjectStoreCatalogProvider {
pub fn new(
inner: Arc<dyn CatalogProvider>,
state: Weak<RwLock<SessionState>>,
Expand All @@ -90,7 +88,7 @@ impl DynamicFileCatalogProvider {
}
}

impl CatalogProvider for DynamicFileCatalogProvider {
impl CatalogProvider for DynamicObjectStoreCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}
Expand All @@ -101,9 +99,9 @@ impl CatalogProvider for DynamicFileCatalogProvider {

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
let state = self.state.clone();
self.inner
.schema(name)
.map(|schema| Arc::new(DynamicFileSchemaProvider::new(schema, state)) as _)
self.inner.schema(name).map(|schema| {
Arc::new(DynamicObjectStoreSchemaProvider::new(schema, state)) as _
})
}

fn register_schema(
Expand All @@ -115,13 +113,14 @@ impl CatalogProvider for DynamicFileCatalogProvider {
}
}

/// Wraps another schema provider
struct DynamicFileSchemaProvider {
/// Wraps another schema provider. [DynamicObjectStoreSchemaProvider] is responsible for registering the required
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am trying to understand why datafusion-cli can't simply use DynamicFileCatalog -- why does it need another layer of wrapping?

Is it the need to dynamically create ObjectStores?

I have found that dynamic creation to be one of the more complicated things about datafusion-cli (and what users of DataFusion would have to figure out to recreate it). Maybe we can figure out some simpler API for that too (as another PR)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can provide something like installing extensions for datafusion-cli? There're some similar use cases in DuckDB httpfs. Before scanning the remote file, user should load the corresponding extensions (BTW, they also do some autolaoding for specific extensions, e.g. httpfs)

I think it maybe related to the dfdb purpose #11979 . It's a nice way to provide more feature for the user.

/// object stores for the file locations.
struct DynamicObjectStoreSchemaProvider {
inner: Arc<dyn SchemaProvider>,
state: Weak<RwLock<SessionState>>,
}

impl DynamicFileSchemaProvider {
impl DynamicObjectStoreSchemaProvider {
pub fn new(
inner: Arc<dyn SchemaProvider>,
state: Weak<RwLock<SessionState>>,
Expand All @@ -131,7 +130,7 @@ impl DynamicFileSchemaProvider {
}

#[async_trait]
impl SchemaProvider for DynamicFileSchemaProvider {
impl SchemaProvider for DynamicObjectStoreSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
Expand All @@ -149,9 +148,11 @@ impl SchemaProvider for DynamicFileSchemaProvider {
}

async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
let inner_table = self.inner.table(name).await?;
if inner_table.is_some() {
return Ok(inner_table);
let inner_table = self.inner.table(name).await;
if inner_table.is_ok() {
if let Some(inner_table) = inner_table? {
return Ok(Some(inner_table));
}
}

// if the inner schema provider didn't have a table by
Expand Down Expand Up @@ -201,16 +202,7 @@ impl SchemaProvider for DynamicFileSchemaProvider {
state.runtime_env().register_object_store(url, store);
}
}

let config = match ListingTableConfig::new(table_url).infer(&state).await {
Ok(cfg) => cfg,
Err(_) => {
// treat as non-existing
return Ok(None);
}
};

Ok(Some(Arc::new(ListingTable::try_new(config)?)))
self.inner.table(name).await
}

fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
Expand All @@ -221,7 +213,8 @@ impl SchemaProvider for DynamicFileSchemaProvider {
self.inner.table_exist(name)
}
}
fn substitute_tilde(cur: String) -> String {

pub fn substitute_tilde(cur: String) -> String {
if let Some(usr_dir_path) = home_dir() {
if let Some(usr_dir) = usr_dir_path.to_str() {
if cur.starts_with('~') && !usr_dir.is_empty() {
Expand All @@ -231,22 +224,22 @@ fn substitute_tilde(cur: String) -> String {
}
cur
}

#[cfg(test)]
mod tests {

use super::*;

use datafusion::catalog::SchemaProvider;
use datafusion::prelude::SessionContext;

fn setup_context() -> (SessionContext, Arc<dyn SchemaProvider>) {
let ctx = SessionContext::new();
ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new(
ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new(
ctx.state().catalog_list().clone(),
ctx.state_weak_ref(),
)));

let provider = &DynamicFileCatalog::new(
let provider = &DynamicObjectStoreCatalog::new(
ctx.state().catalog_list().clone(),
ctx.state_weak_ref(),
) as &dyn CatalogProviderList;
Expand All @@ -269,7 +262,7 @@ mod tests {
let (ctx, schema) = setup_context();

// That's a non registered table so expecting None here
let table = schema.table(&location).await.unwrap();
let table = schema.table(&location).await?;
assert!(table.is_none());

// It should still create an object store for the location in the SessionState
Expand All @@ -293,7 +286,7 @@ mod tests {

let (ctx, schema) = setup_context();

let table = schema.table(&location).await.unwrap();
let table = schema.table(&location).await?;
assert!(table.is_none());

let store = ctx
Expand All @@ -315,7 +308,7 @@ mod tests {

let (ctx, schema) = setup_context();

let table = schema.table(&location).await.unwrap();
let table = schema.table(&location).await?;
assert!(table.is_none());

let store = ctx
Expand All @@ -337,6 +330,7 @@ mod tests {

assert!(schema.table(location).await.is_err());
}

#[cfg(not(target_os = "windows"))]
#[test]
fn test_substitute_tilde() {
Expand Down
10 changes: 6 additions & 4 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use datafusion::execution::context::SessionConfig;
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicFileCatalog;
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
use datafusion_cli::functions::ParquetMetadataFunc;
use datafusion_cli::{
exec,
Expand Down Expand Up @@ -173,11 +173,13 @@ async fn main_inner() -> Result<()> {

let runtime_env = create_runtime_env(rt_config.clone())?;

// enable dynamic file query
let ctx =
SessionContext::new_with_config_rt(session_config.clone(), Arc::new(runtime_env));
SessionContext::new_with_config_rt(session_config.clone(), Arc::new(runtime_env))
.enable_url_table();
ctx.refresh_catalogs().await?;
// install dynamic catalog provider that knows how to open files
ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new(
// install dynamic catalog provider that can register required object stores
ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new(
ctx.state().catalog_list().clone(),
ctx.state_weak_ref(),
)));
Expand Down
6 changes: 6 additions & 0 deletions datafusion-examples/examples/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ async fn main() -> Result<()> {
.await?;
parquet_df.describe().await.unwrap().show().await?;

let dyn_ctx = ctx.enable_url_table();
let df = dyn_ctx
.sql(&format!("SELECT * FROM '{}'", file_path.to_str().unwrap()))
.await?;
df.show().await?;

Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,14 @@ async fn main() -> Result<()> {
// print the results
df.show().await?;

// dynamic query by the file path
ctx.enable_url_table();
let df = ctx
.sql(format!(r#"SELECT * FROM '{}' LIMIT 10"#, &path).as_str())
.await?;

// print the results
df.show().await?;
Comment on lines +66 to +73
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @alamb. I added a simple s3 example here. I hope it is what you want or that it could inspire you for a new example.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No actually this is perfect -- thank you @goldmedal


Ok(())
}
1 change: 1 addition & 0 deletions datafusion/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
parking_lot = { workspace = true }

[lints]
workspace = true
Loading
Loading