Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce the DynamicFileCatalog in datafusion-catalog #11035

Merged
merged 60 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
37b5526
early draft
goldmedal Jun 20, 2024
ad1a854
fmt
goldmedal Jun 20, 2024
97ea11c
add example for dynamic file query
goldmedal Jun 21, 2024
2729c49
add test and refactor
goldmedal Jun 21, 2024
6f86577
clippy and add doc
goldmedal Jun 21, 2024
c91cdc6
cargo fmt
goldmedal Jun 21, 2024
3306df6
extract substitute_tilde function
goldmedal Jun 21, 2024
d82b273
fix the error handling
goldmedal Jun 21, 2024
c0491d5
fmt and clippy
goldmedal Jun 21, 2024
a60eeea
fix test
goldmedal Jun 21, 2024
9fa01aa
fix sqllogictests
goldmedal Jun 22, 2024
2ab3639
ignore dirs for windows test
goldmedal Jun 22, 2024
a8ee733
enhance the test for every file format
goldmedal Jun 22, 2024
7faab9f
disable the test for windows
goldmedal Jun 22, 2024
e1f3908
make dynamic file query configurable
goldmedal Jul 1, 2024
cf73ba2
revert array_query.slt
goldmedal Jul 1, 2024
c641e6b
modified the test and add example
goldmedal Jul 1, 2024
0806263
make dirs be optional
goldmedal Jul 1, 2024
f4d24e6
enable dynamic file query in cli
goldmedal Jul 1, 2024
4b71e59
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Jul 1, 2024
9964150
cargo fmt
goldmedal Jul 1, 2024
da1e5d3
modified example
goldmedal Jul 1, 2024
ed670fe
fix test
goldmedal Jul 1, 2024
ea5816e
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Aug 13, 2024
fb8b9e0
fix merge conflict
goldmedal Aug 13, 2024
fa73ae7
tmp
goldmedal Aug 14, 2024
04cc155
tmp
goldmedal Aug 14, 2024
1ede35e
tmp
goldmedal Aug 14, 2024
51b1d41
fix the catalog and schema
goldmedal Aug 15, 2024
75b0b84
move dynamic file catalog to datafusion-catalog
goldmedal Aug 15, 2024
3e8d094
add copyright
goldmedal Aug 15, 2024
4eb8ca5
fix tests
goldmedal Aug 16, 2024
9913405
rename catalog in cli and update lock
goldmedal Aug 16, 2024
5d861b8
enable home_dir feature
goldmedal Aug 16, 2024
ea1c075
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Aug 16, 2024
16be2e7
update lock
goldmedal Aug 16, 2024
db90c28
fix compile
goldmedal Aug 16, 2024
9353123
fix clippy
goldmedal Aug 16, 2024
daa7ed8
fmt toml
goldmedal Aug 16, 2024
e4a2174
fix doc test and add more doc
goldmedal Aug 16, 2024
506d1d6
fix clippy
goldmedal Aug 16, 2024
72ce464
add home_dir feature doc
goldmedal Aug 16, 2024
fb1b6ce
rollback the unused changed
goldmedal Aug 16, 2024
8f0952d
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Aug 18, 2024
f062fec
update lock
goldmedal Aug 18, 2024
b1baa84
fix sqllogictest
goldmedal Aug 18, 2024
76d7fee
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Aug 20, 2024
f0f070b
separate dynamic file test to another slt
goldmedal Aug 20, 2024
6b77b6b
add test for querying url table but disabled this feature
goldmedal Aug 20, 2024
4e51a77
add dynamic_file.slt
goldmedal Aug 20, 2024
fafc9dc
remove home_dir feature
goldmedal Aug 20, 2024
a3a4f4d
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Aug 21, 2024
7dc238f
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Sep 4, 2024
f7b4b8c
update cli lock
goldmedal Sep 4, 2024
25d0ff6
fix msrv check
goldmedal Sep 4, 2024
a78bd3c
fix msrv check
goldmedal Sep 4, 2024
edeff33
rollback the lock change
goldmedal Sep 4, 2024
b1a922c
address review comment and enhance the doc
goldmedal Sep 7, 2024
e5ab14d
remove the legacy comment
goldmedal Sep 7, 2024
87d7503
add missing doc
goldmedal Sep 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions datafusion-examples/examples/dynamic_csv_sql.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::error::Result;
use datafusion::prelude::*;

/// This example demonstrates executing a simple query against an Arrow data source (CSV) and
/// fetching results
#[tokio::main]
async fn main() -> Result<()> {
// create local execution context
let ctx = SessionContext::new();

let testdata = datafusion::test_util::arrow_test_data();
let path = &format!("file:///{testdata}/csv/aggregate_test_100.csv");
// execute the query
let df = ctx
.sql(
format!(
r#"SELECT column_1, MIN(column_12), MAX(column_12)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The column is c1 actually. There're some issues about getting CSV header automatically.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I found that this is the default behavior for ListTable. The dynamic query in datafusion-cli is the same as this. I think we don't need to change it in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This behavior is controlled by the config option in the current session context. We can create the session context like

let cfg = SessionConfig::new().set_str("datafusion.catalog.has_header", "true");
let ctx = SessionContext::new_with_config(cfg);

to enable the header scanning.

FROM '{}'
WHERE column_11 > 0.1 AND column_11 < 0.9
GROUP BY column_1"#,
path
)
.as_str(),
)
.await?;

// print the results
df.show().await?;
Ok(())
}
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
datafusion-sql = { workspace = true }
dirs = "4.0.0"
goldmedal marked this conversation as resolved.
Show resolved Hide resolved
flate2 = { version = "1.0.24", optional = true }
futures = { workspace = true }
glob = "0.3.0"
Expand Down
118 changes: 118 additions & 0 deletions datafusion/core/src/catalog/dynamic_file_schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use std::any::Any;
use std::sync::{Arc, Weak};

use async_trait::async_trait;
use dirs::home_dir;
use parking_lot::{Mutex, RwLock};

use datafusion_common::plan_datafusion_err;

use crate::catalog::schema::SchemaProvider;
use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
use crate::datasource::TableProvider;
use crate::error::Result;
use crate::execution::context::SessionState;

/// Wraps another schema provider
pub struct DynamicFileSchemaProvider {
inner: Arc<dyn SchemaProvider>,
state_store: StateStore,
}

impl DynamicFileSchemaProvider {
pub fn new(inner: Arc<dyn SchemaProvider>) -> Self {
Self {
inner,
state_store: StateStore::new(),
}
}

pub fn with_state(&self, state: Weak<RwLock<SessionState>>) {
self.state_store.with_state(state);
}
}

#[async_trait]
impl SchemaProvider for DynamicFileSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn table_names(&self) -> Vec<String> {
self.inner.table_names()
}

fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> Result<Option<Arc<dyn TableProvider>>> {
self.inner.register_table(name, table)
}

async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
let inner_table = self.inner.table(name).await?;
if inner_table.is_some() {
return Ok(inner_table);
}
let optimized_url = substitute_tilde(name.to_owned());
let table_url = ListingTableUrl::parse(optimized_url.as_str())?;
let state = &self
.state_store
.get_state()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SchemaProvider::table won't have SessionState by default. We should get it from StateStore.

.upgrade()
.ok_or_else(|| plan_datafusion_err!("locking error"))?
.read()
.clone();
let cfg = ListingTableConfig::new(table_url.clone())
.infer(state)
.await?;

Ok(Some(Arc::new(ListingTable::try_new(cfg)?)))
}

fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
self.inner.deregister_table(name)
}

fn table_exist(&self, name: &str) -> bool {
self.inner.table_exist(name)
}
}
fn substitute_tilde(cur: String) -> String {
if let Some(usr_dir_path) = home_dir() {
if let Some(usr_dir) = usr_dir_path.to_str() {
if cur.starts_with('~') && !usr_dir.is_empty() {
return cur.replacen('~', usr_dir, 1);
}
}
}
cur
}

pub struct StateStore {
state: Arc<Mutex<Option<Weak<RwLock<SessionState>>>>>,
}

impl StateStore {
pub fn new() -> Self {
Self {
state: Arc::new(Mutex::new(None)),
}
}

pub fn with_state(&self, state: Weak<RwLock<SessionState>>) {
let mut lock = self.state.lock();
*lock = Some(state);
}

pub fn get_state(&self) -> Weak<RwLock<SessionState>> {
self.state.lock().clone().unwrap()
}
}

impl Default for StateStore {
fn default() -> Self {
Self::new()
}
}
1 change: 1 addition & 0 deletions datafusion/core/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Interfaces and default implementations of catalogs and schemas.

pub mod dynamic_file_schema;
pub mod information_schema;
pub mod listing_schema;
pub mod schema;
Expand Down
13 changes: 11 additions & 2 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ use object_store::ObjectStore;
use parking_lot::RwLock;
use url::Url;

use crate::catalog::dynamic_file_schema::DynamicFileSchemaProvider;
pub use datafusion_execution::config::SessionConfig;
pub use datafusion_execution::TaskContext;
pub use datafusion_expr::execution_props::ExecutionProps;
Expand Down Expand Up @@ -305,10 +306,18 @@ impl SessionContext {

/// Creates a new `SessionContext` using the provided [`SessionState`]
pub fn new_with_state(state: SessionState) -> Self {
let state_ref = Arc::new(RwLock::new(state.clone()));
state
.schema_for_ref("datafusion.public.xx")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just a workaround to get DynamicFileSchemaProvider. I'll refactor it later.

.unwrap()
.as_any()
.downcast_ref::<DynamicFileSchemaProvider>()
.unwrap()
.with_state(Arc::downgrade(&state_ref));
Self {
session_id: state.session_id().to_string(),
session_id: state_ref.clone().read().session_id().to_string(),
session_start_time: Utc::now(),
state: Arc::new(RwLock::new(state)),
state: state_ref,
}
}

Expand Down
5 changes: 4 additions & 1 deletion datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! [`SessionState`]: information required to run queries in a session

use crate::catalog::dynamic_file_schema::DynamicFileSchemaProvider;
use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA};
use crate::catalog::listing_schema::ListingSchemaProvider;
use crate::catalog::schema::{MemorySchemaProvider, SchemaProvider};
Expand Down Expand Up @@ -196,11 +197,13 @@ impl SessionState {

if config.create_default_catalog_and_schema() {
let default_catalog = MemoryCatalogProvider::new();
let schema =
DynamicFileSchemaProvider::new(Arc::new(MemorySchemaProvider::new()));

default_catalog
.register_schema(
&config.options().catalog.default_schema,
Arc::new(MemorySchemaProvider::new()),
Arc::new(schema),
)
.expect("memory catalog provider can register schema");

Expand Down
Loading