Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: move SessionStateDefaults into its own module #11566

Merged
merged 2 commits into from
Jul 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1038,8 +1038,8 @@ mod tests {
use crate::datasource::file_format::avro::AvroFormat;
use crate::datasource::file_format::csv::CsvFormat;
use crate::datasource::file_format::json::JsonFormat;
use crate::datasource::file_format::parquet::ParquetFormat;
#[cfg(feature = "parquet")]
use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::{provider_as_source, MemTable};
use crate::execution::options::ArrowReadOptions;
use crate::physical_plan::collect;
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ mod tests {
use crate::datasource::schema_adapter::{
SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
};
#[cfg(feature = "parquet")]
use parquet::arrow::ArrowWriter;
use tempfile::TempDir;

Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

pub mod context;
pub mod session_state;
mod session_state_defaults;

pub use session_state_defaults::SessionStateDefaults;

// backwards compatibility
pub use crate::datasource::file_format::options;
Expand Down
185 changes: 4 additions & 181 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,17 @@
//! [`SessionState`]: information required to run queries in a session

use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA};
use crate::catalog::listing_schema::ListingSchemaProvider;
use crate::catalog::schema::{MemorySchemaProvider, SchemaProvider};
use crate::catalog::{
CatalogProvider, CatalogProviderList, MemoryCatalogProvider,
MemoryCatalogProviderList,
};
use crate::catalog::schema::SchemaProvider;
use crate::catalog::{CatalogProviderList, MemoryCatalogProviderList};
use crate::datasource::cte_worktable::CteWorkTable;
use crate::datasource::file_format::arrow::ArrowFormatFactory;
use crate::datasource::file_format::avro::AvroFormatFactory;
use crate::datasource::file_format::csv::CsvFormatFactory;
use crate::datasource::file_format::json::JsonFormatFactory;
#[cfg(feature = "parquet")]
use crate::datasource::file_format::parquet::ParquetFormatFactory;
use crate::datasource::file_format::{format_as_file_type, FileFormatFactory};
use crate::datasource::function::{TableFunction, TableFunctionImpl};
use crate::datasource::provider::{DefaultTableFactory, TableProviderFactory};
use crate::datasource::provider::TableProviderFactory;
use crate::datasource::provider_as_source;
use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
#[cfg(feature = "array_expressions")]
use crate::functions_array;
use crate::execution::SessionStateDefaults;
use crate::physical_optimizer::optimizer::PhysicalOptimizer;
use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use crate::{functions, functions_aggregate};
use arrow_schema::{DataType, SchemaRef};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
Expand All @@ -54,7 +42,6 @@ use datafusion_common::{
ResolvedTableReference, TableReference,
};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_expr::execution_props::ExecutionProps;
Expand Down Expand Up @@ -85,7 +72,6 @@ use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Arc;
use url::Url;
use uuid::Uuid;

/// Execution context for registering data sources and executing queries.
Expand Down Expand Up @@ -1420,169 +1406,6 @@ impl From<SessionState> for SessionStateBuilder {
}
}

/// Defaults that are used as part of creating a SessionState such as table providers,
/// file formats, registering of builtin functions, etc.
///
/// This is a stateless namespace type: all methods are associated functions.
pub struct SessionStateDefaults {}

impl SessionStateDefaults {
/// Returns a map of the default [`TableProviderFactory`]s, keyed by the
/// upper-case format name (e.g. `"CSV"`, `"PARQUET"`)
pub fn default_table_factories() -> HashMap<String, Arc<dyn TableProviderFactory>> {
let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
HashMap::new();
// Parquet support is optional; only registered when the feature is enabled
#[cfg(feature = "parquet")]
table_factories.insert("PARQUET".into(), Arc::new(DefaultTableFactory::new()));
table_factories.insert("CSV".into(), Arc::new(DefaultTableFactory::new()));
table_factories.insert("JSON".into(), Arc::new(DefaultTableFactory::new()));
table_factories.insert("NDJSON".into(), Arc::new(DefaultTableFactory::new()));
table_factories.insert("AVRO".into(), Arc::new(DefaultTableFactory::new()));
table_factories.insert("ARROW".into(), Arc::new(DefaultTableFactory::new()));

table_factories
}

/// Returns the default [`MemoryCatalogProvider`], with the configured default
/// schema registered, plus (if configured) the default listing schema
/// (see [`Self::register_default_schema`])
pub fn default_catalog(
config: &SessionConfig,
table_factories: &HashMap<String, Arc<dyn TableProviderFactory>>,
runtime: &Arc<RuntimeEnv>,
) -> MemoryCatalogProvider {
let default_catalog = MemoryCatalogProvider::new();

// Registering into a fresh in-memory catalog is infallible in practice,
// so a failure here indicates a bug rather than a recoverable condition
default_catalog
.register_schema(
&config.options().catalog.default_schema,
Arc::new(MemorySchemaProvider::new()),
)
.expect("memory catalog provider can register schema");

Self::register_default_schema(config, table_factories, runtime, &default_catalog);

default_catalog
}

/// Returns the list of default [`ExprPlanner`]s
pub fn default_expr_planners() -> Vec<Arc<dyn ExprPlanner>> {
let expr_planners: Vec<Arc<dyn ExprPlanner>> = vec![
Arc::new(functions::core::planner::CoreFunctionPlanner::default()),
// register crate of array expressions (if enabled)
#[cfg(feature = "array_expressions")]
Arc::new(functions_array::planner::ArrayFunctionPlanner),
#[cfg(feature = "array_expressions")]
Arc::new(functions_array::planner::FieldAccessPlanner),
#[cfg(any(
feature = "datetime_expressions",
feature = "unicode_expressions"
))]
Arc::new(functions::planner::UserDefinedFunctionPlanner),
];

expr_planners
}

/// Returns the list of default [`ScalarUDF`]s
pub fn default_scalar_functions() -> Vec<Arc<ScalarUDF>> {
let mut functions: Vec<Arc<ScalarUDF>> = functions::all_default_functions();
// array functions are feature-gated and appended to the core set
#[cfg(feature = "array_expressions")]
functions.append(&mut functions_array::all_default_array_functions());

functions
}

/// Returns the list of default [`AggregateUDF`]s
pub fn default_aggregate_functions() -> Vec<Arc<AggregateUDF>> {
functions_aggregate::all_default_aggregate_functions()
}

/// Returns the list of default [`FileFormatFactory`]s
pub fn default_file_formats() -> Vec<Arc<dyn FileFormatFactory>> {
let file_formats: Vec<Arc<dyn FileFormatFactory>> = vec![
// Parquet support is optional; only included when the feature is enabled
#[cfg(feature = "parquet")]
Arc::new(ParquetFormatFactory::new()),
Arc::new(JsonFormatFactory::new()),
Arc::new(CsvFormatFactory::new()),
Arc::new(ArrowFormatFactory::new()),
Arc::new(AvroFormatFactory::new()),
];

file_formats
}

/// registers all builtin functions - scalar, array and aggregate
pub fn register_builtin_functions(state: &mut SessionState) {
Self::register_scalar_functions(state);
Self::register_array_functions(state);
Self::register_aggregate_functions(state);
}

/// registers all the builtin scalar functions
pub fn register_scalar_functions(state: &mut SessionState) {
functions::register_all(state).expect("can not register built in functions");
}

/// registers all the builtin array functions (no-op unless the
/// `array_expressions` feature is enabled)
pub fn register_array_functions(state: &mut SessionState) {
// register crate of array expressions (if enabled)
#[cfg(feature = "array_expressions")]
functions_array::register_all(state).expect("can not register array expressions");
}

/// registers all the builtin aggregate functions
pub fn register_aggregate_functions(state: &mut SessionState) {
functions_aggregate::register_all(state)
.expect("can not register aggregate functions");
}

/// Registers a `"default"` [`ListingSchemaProvider`] on `default_catalog`,
/// backed by the catalog location/format from `config`.
///
/// Silently does nothing unless BOTH `catalog.location` and `catalog.format`
/// are configured, the location's object store is registered with `runtime`,
/// and a factory for the format exists in `table_factories`.
pub fn register_default_schema(
config: &SessionConfig,
table_factories: &HashMap<String, Arc<dyn TableProviderFactory>>,
runtime: &Arc<RuntimeEnv>,
default_catalog: &MemoryCatalogProvider,
) {
let url = config.options().catalog.location.as_ref();
let format = config.options().catalog.format.as_ref();
// Both a location and a format must be configured; otherwise nothing to do
let (url, format) = match (url, format) {
(Some(url), Some(format)) => (url, format),
_ => return,
};
let url = url.to_string();
let format = format.to_string();

// Split the location into the object-store "authority" (scheme://host)
// and the path within that store
let url = Url::parse(url.as_str()).expect("Invalid default catalog location!");
let authority = match url.host_str() {
Some(host) => format!("{}://{}", url.scheme(), host),
None => format!("{}://", url.scheme()),
};
let path = &url.as_str()[authority.len()..];
let path = object_store::path::Path::parse(path).expect("Can't parse path");
let store = ObjectStoreUrl::parse(authority.as_str())
.expect("Invalid default catalog url");
// If no object store is registered for this URL, skip registration
let store = match runtime.object_store(store) {
Ok(store) => store,
_ => return,
};
// If no table factory is registered for this format, skip registration
let factory = match table_factories.get(format.as_str()) {
Some(factory) => factory,
_ => return,
};
let schema =
ListingSchemaProvider::new(authority, path, factory.clone(), store, format);
// Any provider previously registered under "default" is intentionally discarded
let _ = default_catalog
.register_schema("default", Arc::new(schema))
.expect("Failed to register default schema");
}

/// Registers the default [`FileFormatFactory`]s on `state`.
///
/// Registration failures (e.g. a format already registered — TODO confirm)
/// are logged at info level and do not abort the remaining registrations.
pub fn register_default_file_formats(state: &mut SessionState) {
let formats = SessionStateDefaults::default_file_formats();
for format in formats {
if let Err(e) = state.register_file_format(format, false) {
log::info!("Unable to register default file format: {e}")
};
}
}
}

/// Adapter that implements the [`ContextProvider`] trait for a [`SessionState`]
///
/// This is used so the SQL planner can access the state of the session without
Expand Down
Loading