diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs index a3ff7bee529e..33b490b5a636 100644 --- a/benchmarks/src/bin/parquet_filter_pushdown.rs +++ b/benchmarks/src/bin/parquet_filter_pushdown.rs @@ -20,7 +20,7 @@ use datafusion::common::Result; use datafusion::logical_expr::{lit, or, Expr}; use datafusion::optimizer::utils::disjunction; use datafusion::physical_plan::collect; -use datafusion::prelude::{col, SessionConfig, SessionContext}; +use datafusion::prelude::{col, SessionContext}; use parquet::file::properties::WriterProperties; use parquet_test_utils::{ParquetScanOptions, TestParquetFile}; use std::path::PathBuf; @@ -69,9 +69,6 @@ async fn main() -> Result<()> { let opt: Opt = Opt::from_args(); println!("Running benchmarks with the following options: {:?}", opt); - let config = SessionConfig::new().with_target_partitions(opt.partitions); - let mut ctx = SessionContext::with_config(config); - let path = opt.path.join("logs.parquet"); let mut props_builder = WriterProperties::builder(); @@ -88,17 +85,12 @@ async fn main() -> Result<()> { let test_file = gen_data(path, opt.scale_factor, props_builder.build())?; - run_benchmarks(&mut ctx, &test_file, opt.iterations, opt.debug).await?; + run_benchmarks(opt, &test_file).await?; Ok(()) } -async fn run_benchmarks( - ctx: &mut SessionContext, - test_file: &TestParquetFile, - iterations: usize, - debug: bool, -) -> Result<()> { +async fn run_benchmarks(opt: Opt, test_file: &TestParquetFile) -> Result<()> { let scan_options_matrix = vec![ ParquetScanOptions { pushdown_filters: false, @@ -148,11 +140,14 @@ async fn run_benchmarks( println!("Executing with filter '{}'", filter_expr); for scan_options in &scan_options_matrix { println!("Using scan options {:?}", scan_options); - for i in 0..iterations { + for i in 0..opt.iterations { let start = Instant::now(); + + let config = scan_options.config().with_target_partitions(opt.partitions); + let ctx = SessionContext::with_config(config); + let rows = - exec_scan(ctx, test_file, filter_expr.clone(), *scan_options, debug) - .await?; + exec_scan(&ctx, test_file, filter_expr.clone(), opt.debug).await?; println!( "Iteration {} returned {} rows in {} ms", i, @@ -170,10 +165,9 @@ async fn exec_scan( ctx: &SessionContext, test_file: &TestParquetFile, filter: Expr, - scan_options: ParquetScanOptions, debug: bool, ) -> Result { - let exec = test_file.create_scan(filter, scan_options).await?; + let exec = test_file.create_scan(filter).await?; let task_ctx = ctx.task_ctx(); let result = collect(exec, task_ctx).await?; diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index e88e403f62ee..8a78c357b951 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -395,8 +395,7 @@ async fn get_table( } "parquet" => { let path = format!("{}/{}", path, table); - let format = ParquetFormat::new(state.config_options()) - .with_enable_pruning(Some(true)); + let format = ParquetFormat::default().with_enable_pruning(Some(true)); (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION) } diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs index 66dcd4583ed2..26507661cb9e 100644 --- a/datafusion-examples/examples/flight_server.rs +++ b/datafusion-examples/examples/flight_server.rs @@ -15,14 +15,13 @@ // specific language governing permissions and limitations // under the License. 
-use std::pin::Pin; use std::sync::Arc; use arrow_flight::SchemaAsIpc; use datafusion::arrow::error::ArrowError; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::{ListingOptions, ListingTableUrl}; -use futures::Stream; +use futures::stream::BoxStream; use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; @@ -39,27 +38,13 @@ pub struct FlightServiceImpl {} #[tonic::async_trait] impl FlightService for FlightServiceImpl { - type HandshakeStream = Pin< - Box> + Send + Sync + 'static>, - >; - type ListFlightsStream = - Pin> + Send + Sync + 'static>>; - type DoGetStream = - Pin> + Send + Sync + 'static>>; - type DoPutStream = - Pin> + Send + Sync + 'static>>; - type DoActionStream = Pin< - Box< - dyn Stream> - + Send - + Sync - + 'static, - >, - >; - type ListActionsStream = - Pin> + Send + Sync + 'static>>; - type DoExchangeStream = - Pin> + Send + Sync + 'static>>; + type HandshakeStream = BoxStream<'static, Result>; + type ListFlightsStream = BoxStream<'static, Result>; + type DoGetStream = BoxStream<'static, Result>; + type DoPutStream = BoxStream<'static, Result>; + type DoActionStream = BoxStream<'static, Result>; + type ListActionsStream = BoxStream<'static, Result>; + type DoExchangeStream = BoxStream<'static, Result>; async fn get_schema( &self, @@ -67,9 +52,7 @@ impl FlightService for FlightServiceImpl { ) -> Result, Status> { let request = request.into_inner(); - let config = SessionConfig::new(); - let listing_options = - ListingOptions::new(Arc::new(ParquetFormat::new(config.config_options()))); + let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default())); let table_path = ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?; @@ -79,10 +62,10 @@ impl FlightService for FlightServiceImpl { .await .unwrap(); - let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default(); + let options = arrow::ipc::writer::IpcWriteOptions::default(); let schema_result = SchemaAsIpc::new(&schema, &options) .try_into() - .map_err(|e: ArrowError| tonic::Status::internal(e.to_string()))?; + .map_err(|e: ArrowError| Status::internal(e.to_string()))?; Ok(Response::new(schema_result)) } diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs b/datafusion-examples/examples/parquet_sql_multiple_files.rs index 7e2038c8a3e6..241449dc47b2 100644 --- a/datafusion-examples/examples/parquet_sql_multiple_files.rs +++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs @@ -32,8 +32,7 @@ async fn main() -> Result<()> { let testdata = datafusion::test_util::parquet_test_data(); // Configure listing options - let file_format = - ParquetFormat::new(ctx.config_options()).with_enable_pruning(Some(true)); + let file_format = ParquetFormat::default().with_enable_pruning(Some(true)); let listing_options = ListingOptions::new(Arc::new(file_format)) .with_file_extension(FileType::PARQUET.get_ext()); diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs index f7ef6b93dea9..b14cc2f8f37d 100644 --- a/datafusion/core/src/catalog/information_schema.rs +++ b/datafusion/core/src/catalog/information_schema.rs @@ -29,13 +29,13 @@ use arrow::{ datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::RecordBatch, }; -use parking_lot::RwLock; use datafusion_common::Result; use crate::config::ConfigOptions; use crate::datasource::streaming::{PartitionStream, StreamingTable}; use crate::datasource::TableProvider; +use 
crate::execution::context::TaskContext; use crate::logical_expr::TableType; use crate::physical_plan::stream::RecordBatchStreamAdapter; use crate::physical_plan::SendableRecordBatchStream; @@ -55,7 +55,6 @@ const DF_SETTINGS: &str = "df_settings"; /// schema that can introspect on tables in the catalog_list pub(crate) struct CatalogWithInformationSchema { catalog_list: Weak, - config_options: Weak>, /// wrapped provider inner: Arc, } @@ -63,12 +62,10 @@ pub(crate) struct CatalogWithInformationSchema { impl CatalogWithInformationSchema { pub(crate) fn new( catalog_list: Weak, - config_options: Weak>, inner: Arc, ) -> Self { Self { catalog_list, - config_options, inner, } } @@ -89,15 +86,10 @@ impl CatalogProvider for CatalogWithInformationSchema { fn schema(&self, name: &str) -> Option> { if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) { - Weak::upgrade(&self.catalog_list).and_then(|catalog_list| { - Weak::upgrade(&self.config_options).map(|config_options| { - Arc::new(InformationSchemaProvider { - config: InformationSchemaConfig { - catalog_list, - config_options, - }, - }) as Arc - }) + Weak::upgrade(&self.catalog_list).map(|catalog_list| { + Arc::new(InformationSchemaProvider { + config: InformationSchemaConfig { catalog_list }, + }) as Arc }) } else { self.inner.schema(name) @@ -127,7 +119,6 @@ struct InformationSchemaProvider { #[derive(Clone)] struct InformationSchemaConfig { catalog_list: Arc, - config_options: Arc>, } impl InformationSchemaConfig { @@ -220,8 +211,12 @@ impl InformationSchemaConfig { } /// Construct the `information_schema.df_settings` virtual table - fn make_df_settings(&self, builder: &mut InformationSchemaDfSettingsBuilder) { - for (name, setting) in self.config_options.read().options() { + fn make_df_settings( + &self, + config_options: &ConfigOptions, + builder: &mut InformationSchemaDfSettingsBuilder, + ) { + for (name, setting) in config_options.options() { builder.add_setting(name, setting.to_string()); } } @@ -298,7 +293,7 @@ impl PartitionStream for InformationSchemaTables { &self.schema } - fn execute(&self) -> SendableRecordBatchStream { + fn execute(&self, _ctx: Arc) -> SendableRecordBatchStream { let mut builder = self.builder(); let config = self.config.clone(); Box::pin(RecordBatchStreamAdapter::new( @@ -389,7 +384,7 @@ impl PartitionStream for InformationSchemaViews { &self.schema } - fn execute(&self) -> SendableRecordBatchStream { + fn execute(&self, _ctx: Arc) -> SendableRecordBatchStream { let mut builder = self.builder(); let config = self.config.clone(); Box::pin(RecordBatchStreamAdapter::new( @@ -503,7 +498,7 @@ impl PartitionStream for InformationSchemaColumns { &self.schema } - fn execute(&self) -> SendableRecordBatchStream { + fn execute(&self, _ctx: Arc) -> SendableRecordBatchStream { let mut builder = self.builder(); let config = self.config.clone(); Box::pin(RecordBatchStreamAdapter::new( @@ -690,15 +685,18 @@ impl PartitionStream for InformationSchemaDfSettings { &self.schema } - fn execute(&self) -> SendableRecordBatchStream { - let mut builder = self.builder(); + fn execute(&self, ctx: Arc) -> SendableRecordBatchStream { let config = self.config.clone(); + let mut builder = self.builder(); Box::pin(RecordBatchStreamAdapter::new( self.schema.clone(), // TODO: Stream this futures::stream::once(async move { // create a mem table with the names of tables - config.make_df_settings(&mut builder); + config.make_df_settings( + ctx.session_config().config_options(), + &mut builder, + ); Ok(builder.finish()) }), )) diff --git 
a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs index 1c98c83ca7ba..4dabb53d2425 100644 --- a/datafusion/core/src/config.rs +++ b/datafusion/core/src/config.rs @@ -21,11 +21,9 @@ use arrow::datatypes::DataType; use datafusion_common::ScalarValue; use itertools::Itertools; use log::warn; -use parking_lot::RwLock; use std::collections::{BTreeMap, HashMap}; use std::env; use std::fmt::{Debug, Formatter}; -use std::sync::Arc; /*-************************************ * Catalog related @@ -484,11 +482,6 @@ impl ConfigOptions { Self { options } } - /// Create a new [`ConfigOptions`] wrapped in an RwLock and Arc - pub fn into_shareable(self) -> Arc> { - Arc::new(RwLock::new(self)) - } - /// Create new ConfigOptions struct, taking values from /// environment variables where possible. /// diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index ee9795a77563..fe67fc0372fc 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -128,7 +128,6 @@ pub(crate) mod test_util { projection, limit, table_partition_cols: vec![], - config_options: state.config_options(), output_ordering: None, }, &[], diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 15a90ec41d00..9f238a43cab8 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -28,7 +28,6 @@ use datafusion_common::DataFusionError; use datafusion_optimizer::utils::conjunction; use hashbrown::HashMap; use object_store::{ObjectMeta, ObjectStore}; -use parking_lot::RwLock; use parquet::arrow::parquet_to_arrow_schema; use parquet::file::footer::{decode_footer, decode_metadata}; use parquet::file::metadata::ParquetMetaData; @@ -56,25 +55,26 @@ use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics}; pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet"; /// The Apache Parquet `FileFormat` implementation -#[derive(Debug)] +/// +/// Note it is recommended these are instead configured on the [`ConfigOptions`] +/// associated with the [`SessionState`] instead of overridden on a format-basis +/// +/// TODO: Deprecate and remove overrides +/// +#[derive(Debug, Default)] pub struct ParquetFormat { - // Session level configuration - config_options: Arc>, - // local overides + /// Override the global setting for enable_pruning enable_pruning: Option, + /// Override the global setting for metadata_size_hint metadata_size_hint: Option, + /// Override the global setting for skip_metadata skip_metadata: Option, } impl ParquetFormat { - /// construct a new Format with the specified `ConfigOptions` - pub fn new(config_options: Arc>) -> Self { - Self { - config_options, - enable_pruning: None, - metadata_size_hint: None, - skip_metadata: None, - } + /// construct a new Format with no local overrides + pub fn new() -> Self { + Self::default() } /// Activate statistics based row group level pruning @@ -85,13 +85,9 @@ impl ParquetFormat { } /// Return true if pruning is enabled - pub fn enable_pruning(&self) -> bool { + pub fn enable_pruning(&self, config_options: &ConfigOptions) -> bool { self.enable_pruning - .or_else(|| { - self.config_options - .read() - .get_bool(OPT_PARQUET_ENABLE_PRUNING) - }) + .or_else(|| config_options.get_bool(OPT_PARQUET_ENABLE_PRUNING)) .unwrap_or(true) } @@ -107,12 +103,9 @@ impl ParquetFormat { } /// Return the metadata size hint if set - pub fn 
metadata_size_hint(&self) -> Option { - self.metadata_size_hint.or_else(|| { - self.config_options - .read() - .get_usize(OPT_PARQUET_METADATA_SIZE_HINT) - }) + pub fn metadata_size_hint(&self, config_options: &ConfigOptions) -> Option { + self.metadata_size_hint + .or_else(|| config_options.get_usize(OPT_PARQUET_METADATA_SIZE_HINT)) } /// Tell the parquet reader to skip any metadata that may be in @@ -127,13 +120,9 @@ impl ParquetFormat { /// returns true if schema metadata will be cleared prior to /// schema merging. - pub fn skip_metadata(&self) -> bool { + pub fn skip_metadata(&self, config_options: &ConfigOptions) -> bool { self.skip_metadata - .or_else(|| { - self.config_options - .read() - .get_bool(OPT_PARQUET_SKIP_METADATA) - }) + .or_else(|| config_options.get_bool(OPT_PARQUET_SKIP_METADATA)) .unwrap_or(true) } } @@ -163,7 +152,7 @@ impl FileFormat for ParquetFormat { async fn infer_schema( &self, - _state: &SessionState, + state: &SessionState, store: &Arc, objects: &[ObjectMeta], ) -> Result { @@ -174,7 +163,7 @@ impl FileFormat for ParquetFormat { schemas.push(schema) } - let schema = if self.skip_metadata() { + let schema = if self.skip_metadata(state.config_options()) { Schema::try_merge(clear_metadata(schemas)) } else { Schema::try_merge(schemas) @@ -202,14 +191,14 @@ impl FileFormat for ParquetFormat { async fn create_physical_plan( &self, - _state: &SessionState, + state: &SessionState, conf: FileScanConfig, filters: &[Expr], ) -> Result> { // If enable pruning then combine the filters to build the predicate. // If disable pruning then set the predicate to None, thus readers // will not prune data based on the statistics. - let predicate = if self.enable_pruning() { + let predicate = if self.enable_pruning(state.config_options()) { conjunction(filters.to_vec()) } else { None @@ -218,7 +207,7 @@ impl FileFormat for ParquetFormat { Ok(Arc::new(ParquetExec::new( conf, predicate, - self.metadata_size_hint(), + self.metadata_size_hint(state.config_options()), ))) } } @@ -655,7 +644,7 @@ mod tests { let session = SessionContext::new(); let ctx = session.state(); - let format = ParquetFormat::new(ctx.config_options()); + let format = ParquetFormat::default(); let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap(); let stats = @@ -804,8 +793,7 @@ mod tests { let session = SessionContext::new(); let ctx = session.state(); - let format = - ParquetFormat::new(ctx.config_options()).with_metadata_size_hint(Some(9)); + let format = ParquetFormat::default().with_metadata_size_hint(Some(9)); let schema = format .infer_schema(&ctx, &store.upcast(), &meta) .await @@ -835,8 +823,7 @@ mod tests { // ensure the requests were coalesced into a single request assert_eq!(store.request_count(), 1); - let format = ParquetFormat::new(ctx.config_options()) - .with_metadata_size_hint(Some(size_hint)); + let format = ParquetFormat::default().with_metadata_size_hint(Some(size_hint)); let schema = format .infer_schema(&ctx, &store.upcast(), &meta) .await @@ -1272,7 +1259,7 @@ mod tests { limit: Option, ) -> Result> { let testdata = crate::test_util::parquet_test_data(); - let format = ParquetFormat::new(state.config_options()); + let format = ParquetFormat::default(); scan_format(state, &format, &testdata, file_name, projection, limit).await } } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index af3f9a8b1190..26ee98ea4ae8 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ 
b/datafusion/core/src/datasource/listing/table.rs @@ -29,9 +29,7 @@ use datafusion_physical_expr::PhysicalSortExpr; use futures::{future, stream, StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::ObjectMeta; -use parking_lot::RwLock; -use crate::config::ConfigOptions; use crate::datasource::file_format::file_type::{FileCompressionType, FileType}; use crate::datasource::{ file_format::{ @@ -110,10 +108,7 @@ impl ListingTableConfig { } } - fn infer_format( - config_options: Arc>, - path: &str, - ) -> Result<(Arc, String)> { + fn infer_format(path: &str) -> Result<(Arc, String)> { let err_msg = format!("Unable to infer file type from path: {}", path); let mut exts = path.rsplit('.'); @@ -142,7 +137,7 @@ impl ListingTableConfig { FileType::JSON => Arc::new( JsonFormat::default().with_file_compression_type(file_compression_type), ), - FileType::PARQUET => Arc::new(ParquetFormat::new(config_options)), + FileType::PARQUET => Arc::new(ParquetFormat::default()), }; Ok((file_format, ext)) @@ -163,10 +158,8 @@ impl ListingTableConfig { .await .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??; - let (format, file_extension) = ListingTableConfig::infer_format( - state.config_options(), - file.location.as_ref(), - )?; + let (format, file_extension) = + ListingTableConfig::infer_format(file.location.as_ref())?; let listing_options = ListingOptions::new(format) .with_file_extension(file_extension) @@ -263,9 +256,8 @@ impl ListingOptions { /// # use datafusion::prelude::SessionContext; /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; /// - /// let ctx = SessionContext::new(); /// let listing_options = ListingOptions::new(Arc::new( - /// ParquetFormat::new(ctx.config_options()) + /// ParquetFormat::default() /// )) /// .with_file_extension(".parquet"); /// @@ -281,12 +273,11 @@ impl ListingOptions { /// ``` /// # use std::sync::Arc; /// # use arrow::datatypes::DataType; - /// # use datafusion::prelude::{col, SessionContext}; + /// # use datafusion::prelude::col; /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; /// - /// let ctx = SessionContext::new(); /// let listing_options = ListingOptions::new(Arc::new( - /// ParquetFormat::new(ctx.config_options()) + /// ParquetFormat::default() /// )) /// .with_table_partition_cols(vec![("col_a".to_string(), DataType::Utf8), /// ("col_b".to_string(), DataType::Utf8)]); @@ -306,12 +297,10 @@ impl ListingOptions { /// /// ``` /// # use std::sync::Arc; - /// # use datafusion::prelude::SessionContext; /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; /// - /// let ctx = SessionContext::new(); /// let listing_options = ListingOptions::new(Arc::new( - /// ParquetFormat::new(ctx.config_options()) + /// ParquetFormat::default() /// )) /// .with_collect_stat(true); /// @@ -326,12 +315,10 @@ impl ListingOptions { /// /// ``` /// # use std::sync::Arc; - /// # use datafusion::prelude::SessionContext; /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; /// - /// let ctx = SessionContext::new(); /// let listing_options = ListingOptions::new(Arc::new( - /// ParquetFormat::new(ctx.config_options()) + /// ParquetFormat::default() /// )) /// .with_target_partitions(8); /// @@ -346,7 +333,7 @@ impl ListingOptions { /// /// ``` /// # use std::sync::Arc; - /// # use datafusion::prelude::{col, SessionContext}; + /// # use datafusion::prelude::col; /// # 
use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; /// /// // Tell datafusion that the files are sorted by column "a" @@ -354,9 +341,8 @@ impl ListingOptions { /// col("a").sort(true, true) /// ]); /// - /// let ctx = SessionContext::new(); /// let listing_options = ListingOptions::new(Arc::new( - /// ParquetFormat::new(ctx.config_options()) + /// ParquetFormat::default() /// )) /// .with_file_sort_order(file_sort_order.clone()); /// @@ -592,7 +578,6 @@ impl TableProvider for ListingTable { limit, output_ordering: self.try_create_output_ordering()?, table_partition_cols, - config_options: state.config_options(), }, filters, ) @@ -736,7 +721,7 @@ mod tests { let ctx = SessionContext::new(); let state = ctx.state(); - let opt = ListingOptions::new(Arc::new(ParquetFormat::new(ctx.config_options()))); + let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); let schema = opt.infer_schema(&state, &table_path).await?; let config = ListingTableConfig::new(table_path) .with_listing_options(opt) @@ -759,7 +744,7 @@ mod tests { let ctx = SessionContext::new(); let state = ctx.state(); - let opt = ListingOptions::new(Arc::new(ParquetFormat::new(ctx.config_options()))) + let opt = ListingOptions::new(Arc::new(ParquetFormat::default())) .with_collect_stat(false); let schema = opt.infer_schema(&state, &table_path).await?; let config = ListingTableConfig::new(table_path) @@ -782,8 +767,7 @@ mod tests { let ctx = SessionContext::new(); let state = ctx.state(); - let options = - ListingOptions::new(Arc::new(ParquetFormat::new(ctx.config_options()))); + let options = ListingOptions::new(Arc::new(ParquetFormat::default())); let schema = options.infer_schema(&state, &table_path).await.unwrap(); use physical_plan::expressions::col as physical_col; diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index b22bc0596922..b48a1058aeb6 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -74,7 +74,7 @@ impl TableProviderFactory for ListingTableFactory { .with_delimiter(cmd.delimiter as u8) .with_file_compression_type(file_compression_type), ), - FileType::PARQUET => Arc::new(ParquetFormat::new(state.config_options())), + FileType::PARQUET => Arc::new(ParquetFormat::default()), FileType::AVRO => Arc::new(AvroFormat::default()), FileType::JSON => Arc::new( JsonFormat::default().with_file_compression_type(file_compression_type), diff --git a/datafusion/core/src/datasource/streaming.rs b/datafusion/core/src/datasource/streaming.rs index 30847346d0a3..b3dc60c8c8b6 100644 --- a/datafusion/core/src/datasource/streaming.rs +++ b/datafusion/core/src/datasource/streaming.rs @@ -27,7 +27,7 @@ use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{Expr, TableType}; use crate::datasource::TableProvider; -use crate::execution::context::SessionState; +use crate::execution::context::{SessionState, TaskContext}; use crate::physical_plan::streaming::StreamingTableExec; use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; @@ -37,7 +37,7 @@ pub trait PartitionStream: Send + Sync { fn schema(&self) -> &SchemaRef; /// Returns a stream yielding this partitions values - fn execute(&self) -> SendableRecordBatchStream; + fn execute(&self, ctx: Arc) -> SendableRecordBatchStream; } /// A [`TableProvider`] that streams a set of [`PartitionStream`] diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs index 0edb47f1a18c..1f61039cd9ea 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -235,11 +235,6 @@ impl SessionContext { self.state.read().runtime_env.clone() } - /// Return a handle to the shared configuration options - pub fn config_options(&self) -> Arc> { - self.state.read().config_options() - } - /// Return the session_id of this Session pub fn session_id(&self) -> String { self.session_id.clone() @@ -379,16 +374,15 @@ impl SessionContext { LogicalPlan::SetVariable(SetVariable { variable, value, .. }) => { - let state = self.state.write(); - let config_options = &state.config.config_options; - - let old_value = - config_options.read().get(&variable).ok_or_else(|| { - DataFusionError::Execution(format!( - "Can not SET variable: Unknown Variable {}", - variable - )) - })?; + let mut state = self.state.write(); + let config_options = &mut state.config.config_options; + + let old_value = config_options.get(&variable).ok_or_else(|| { + DataFusionError::Execution(format!( + "Can not SET variable: Unknown Variable {}", + variable + )) + })?; match old_value { ScalarValue::Boolean(_) => { @@ -398,7 +392,7 @@ impl SessionContext { value, )) })?; - config_options.write().set_bool(&variable, new_value); + config_options.set_bool(&variable, new_value); } ScalarValue::UInt64(_) => { @@ -408,7 +402,7 @@ impl SessionContext { value, )) })?; - config_options.write().set_u64(&variable, new_value); + config_options.set_u64(&variable, new_value); } ScalarValue::Utf8(_) => { @@ -418,7 +412,7 @@ impl SessionContext { value, )) })?; - config_options.write().set_string(&variable, new_value); + config_options.set_string(&variable, new_value); } _ => { @@ -883,7 +877,6 @@ impl SessionContext { let catalog = if information_schema { Arc::new(CatalogWithInformationSchema::new( Arc::downgrade(&state.catalog_list), - Arc::downgrade(&state.config.config_options), catalog, )) } else { @@ -1173,7 +1166,7 @@ pub struct SessionConfig { /// due to `resolve_table_ref` which passes back references) default_schema: String, /// Configuration options - pub config_options: Arc>, + config_options: ConfigOptions, /// Opaque extensions. extensions: AnyMap, } @@ -1183,7 +1176,7 @@ impl Default for SessionConfig { Self { default_catalog: DEFAULT_CATALOG.to_owned(), default_schema: DEFAULT_SCHEMA.to_owned(), - config_options: Arc::new(RwLock::new(ConfigOptions::new())), + config_options: ConfigOptions::new(), // Assume no extensions by default. extensions: HashMap::with_capacity_and_hasher( 0, @@ -1202,14 +1195,14 @@ impl SessionConfig { /// Create an execution config with config options read from the environment pub fn from_env() -> Self { Self { - config_options: ConfigOptions::from_env().into_shareable(), + config_options: ConfigOptions::from_env(), ..Default::default() } } /// Set a configuration option - pub fn set(self, key: &str, value: ScalarValue) -> Self { - self.config_options.write().set(key, value); + pub fn set(mut self, key: &str, value: ScalarValue) -> Self { + self.config_options.set(key, value); self } @@ -1251,7 +1244,6 @@ impl SessionConfig { /// get target_partitions pub fn target_partitions(&self) -> usize { self.config_options - .read() .get_usize(OPT_TARGET_PARTITIONS) .expect("target partitions must be set") } @@ -1259,7 +1251,6 @@ impl SessionConfig { /// Is the information schema enabled? 
pub fn information_schema(&self) -> bool { self.config_options - .read() .get_bool(OPT_INFORMATION_SCHEMA) .unwrap_or_default() } @@ -1267,7 +1258,6 @@ impl SessionConfig { /// Should the context create the default catalog and schema? pub fn create_default_catalog_and_schema(&self) -> bool { self.config_options - .read() .get_bool(OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA) .unwrap_or_default() } @@ -1275,7 +1265,6 @@ impl SessionConfig { /// Are joins repartitioned during execution? pub fn repartition_joins(&self) -> bool { self.config_options - .read() .get_bool(OPT_REPARTITION_JOINS) .unwrap_or_default() } @@ -1283,7 +1272,6 @@ impl SessionConfig { /// Are aggregates repartitioned during execution? pub fn repartition_aggregations(&self) -> bool { self.config_options - .read() .get_bool(OPT_REPARTITION_AGGREGATIONS) .unwrap_or_default() } @@ -1291,7 +1279,6 @@ impl SessionConfig { /// Are window functions repartitioned during execution? pub fn repartition_window_functions(&self) -> bool { self.config_options - .read() .get_bool(OPT_REPARTITION_WINDOWS) .unwrap_or_default() } @@ -1299,7 +1286,6 @@ impl SessionConfig { /// Are statistics collected during execution? pub fn collect_statistics(&self) -> bool { self.config_options - .read() .get_bool(OPT_COLLECT_STATISTICS) .unwrap_or_default() } @@ -1316,49 +1302,42 @@ impl SessionConfig { } /// Controls whether the default catalog and schema will be automatically created - pub fn with_create_default_catalog_and_schema(self, create: bool) -> Self { + pub fn with_create_default_catalog_and_schema(mut self, create: bool) -> Self { self.config_options - .write() .set_bool(OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA, create); self } /// Enables or disables the inclusion of `information_schema` virtual tables - pub fn with_information_schema(self, enabled: bool) -> Self { + pub fn with_information_schema(mut self, enabled: bool) -> Self { self.config_options - .write() .set_bool(OPT_INFORMATION_SCHEMA, enabled); self } /// Enables or disables the use of repartitioning for joins to improve parallelism - pub fn with_repartition_joins(self, enabled: bool) -> Self { - self.config_options - .write() - .set_bool(OPT_REPARTITION_JOINS, enabled); + pub fn with_repartition_joins(mut self, enabled: bool) -> Self { + self.config_options.set_bool(OPT_REPARTITION_JOINS, enabled); self } /// Enables or disables the use of repartitioning for aggregations to improve parallelism - pub fn with_repartition_aggregations(self, enabled: bool) -> Self { + pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self { self.config_options - .write() .set_bool(OPT_REPARTITION_AGGREGATIONS, enabled); self } /// Enables or disables the use of repartitioning for window functions to improve parallelism - pub fn with_repartition_windows(self, enabled: bool) -> Self { + pub fn with_repartition_windows(mut self, enabled: bool) -> Self { self.config_options - .write() .set_bool(OPT_REPARTITION_WINDOWS, enabled); self } /// Enables or disables the use of pruning predicate for parquet readers to skip row groups - pub fn with_parquet_pruning(self, enabled: bool) -> Self { + pub fn with_parquet_pruning(mut self, enabled: bool) -> Self { self.config_options - .write() .set_bool(OPT_PARQUET_ENABLE_PRUNING, enabled); self } @@ -1366,15 +1345,13 @@ impl SessionConfig { /// Returns true if pruning predicate should be used to skip parquet row groups pub fn parquet_pruning(&self) -> bool { self.config_options - .read() .get_bool(OPT_PARQUET_ENABLE_PRUNING) .unwrap_or(false) } /// Enables or 
disables the collection of statistics after listing files - pub fn with_collect_statistics(self, enabled: bool) -> Self { + pub fn with_collect_statistics(mut self, enabled: bool) -> Self { self.config_options - .write() .set_bool(OPT_COLLECT_STATISTICS, enabled); self } @@ -1382,7 +1359,6 @@ impl SessionConfig { /// Get the currently configured batch size pub fn batch_size(&self) -> usize { self.config_options - .read() .get_u64(OPT_BATCH_SIZE) .unwrap_or_default() .try_into() @@ -1399,7 +1375,7 @@ impl SessionConfig { pub fn to_props(&self) -> HashMap { let mut map = HashMap::new(); // copy configs from config_options - for (k, v) in self.config_options.read().options() { + for (k, v) in self.config_options.options() { map.insert(k.to_string(), format!("{}", v)); } map.insert( @@ -1430,11 +1406,18 @@ impl SessionConfig { map } - /// Return a handle to the shared configuration options. + /// Return a handle to the configuration options. + /// + /// [`config_options`]: SessionContext::config_option + pub fn config_options(&self) -> &ConfigOptions { + &self.config_options + } + + /// Return a mutable handle to the configuration options. /// /// [`config_options`]: SessionContext::config_option - pub fn config_options(&self) -> Arc> { - self.config_options.clone() + pub fn config_options_mut(&mut self) -> &mut ConfigOptions { + &mut self.config_options } /// Add extensions. @@ -1567,7 +1550,6 @@ impl SessionState { { Arc::new(CatalogWithInformationSchema::new( Arc::downgrade(&catalog_list), - Arc::downgrade(&config.config_options), Arc::new(default_catalog), )) } else { @@ -1584,14 +1566,12 @@ impl SessionState { physical_optimizers.push(Arc::new(BasicEnforcement::new())); if config .config_options - .read() .get_bool(OPT_COALESCE_BATCHES) .unwrap_or_default() { physical_optimizers.push(Arc::new(CoalesceBatches::new( config .config_options - .read() .get_u64(OPT_COALESCE_TARGET_BATCH_SIZE) .unwrap_or_default() .try_into() @@ -1622,11 +1602,8 @@ impl SessionState { runtime: &Arc, default_catalog: &MemoryCatalogProvider, ) { - let url = config - .config_options - .read() - .get("datafusion.catalog.location"); - let format = config.config_options.read().get("datafusion.catalog.type"); + let url = config.config_options.get("datafusion.catalog.location"); + let format = config.config_options.get("datafusion.catalog.type"); let (url, format) = match (url, format) { (Some(url), Some(format)) => (url, format), _ => return, @@ -1637,10 +1614,7 @@ impl SessionState { let url = url.to_string(); let format = format.to_string(); - let has_header = config - .config_options - .read() - .get("datafusion.catalog.has_header"); + let has_header = config.config_options.get("datafusion.catalog.has_header"); let has_header: bool = has_header .map(|x| FromStr::from_str(&x.to_string()).unwrap_or_default()) .unwrap_or_default(); @@ -1755,7 +1729,7 @@ impl SessionState { pub fn optimize(&self, plan: &LogicalPlan) -> Result { // TODO: Implement OptimizerContext directly on DataFrame (#4631) (#4626) let config = { - let config_options = self.config.config_options.read(); + let config_options = self.config_options(); OptimizerContext::new() .with_skip_failing_rules( config_options @@ -1814,7 +1788,7 @@ impl SessionState { } /// return the configuration options - pub fn config_options(&self) -> Arc> { + pub fn config_options(&self) -> &ConfigOptions { self.config.config_options() } @@ -1867,7 +1841,7 @@ impl ContextProvider for SessionState { } fn get_config_option(&self, variable: &str) -> Option { - 
self.config.config_options.read().get(variable) + self.config_options().get(variable) } } diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs index 21a82227cef0..229eaa313501 100644 --- a/datafusion/core/src/execution/options.rs +++ b/datafusion/core/src/execution/options.rs @@ -217,7 +217,7 @@ impl<'a> ParquetReadOptions<'a> { /// Helper to convert these user facing options to `ListingTable` options pub fn to_listing_options(&self, config: &SessionConfig) -> ListingOptions { - let file_format = ParquetFormat::new(config.config_options()) + let file_format = ParquetFormat::new() .with_enable_pruning(self.parquet_pruning) .with_skip_metadata(self.skip_metadata); diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs index 3da9d24773a7..06832ac2498a 100644 --- a/datafusion/core/src/physical_optimizer/enforcement.rs +++ b/datafusion/core/src/physical_optimizer/enforcement.rs @@ -76,7 +76,6 @@ impl PhysicalOptimizerRule for BasicEnforcement { let target_partitions = config.target_partitions(); let top_down_join_key_reordering = config .config_options() - .read() .get_bool(OPT_TOP_DOWN_JOIN_KEY_REORDERING) .unwrap_or_default(); let new_plan = if top_down_join_key_reordering { @@ -1071,7 +1070,6 @@ mod tests { use std::ops::Deref; use super::*; - use crate::config::ConfigOptions; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; use crate::physical_plan::aggregates::{ @@ -1111,7 +1109,6 @@ mod tests { projection: None, limit: None, table_partition_cols: vec![], - config_options: ConfigOptions::new().into_shareable(), output_ordering, }, None, diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 428cd8de3591..63e7937fe531 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -215,7 +215,6 @@ impl PhysicalOptimizerRule for JoinSelection { ) -> Result> { let collect_left_threshold: usize = session_config .config_options() - .read() .get_u64(OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD) .unwrap_or_default() .try_into() diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 42c5e0c3f743..7bdff91ecd7b 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -244,7 +244,6 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use super::*; - use crate::config::ConfigOptions; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; use crate::physical_plan::aggregates::{ @@ -274,7 +273,6 @@ mod tests { projection: None, limit: None, table_partition_cols: vec![], - config_options: ConfigOptions::new().into_shareable(), output_ordering: None, }, None, diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index 932723132c1c..ab522dc94a53 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -201,7 +201,6 @@ mod private { #[cfg(test)] #[cfg(feature = "avro")] mod tests { - use crate::config::ConfigOptions; use crate::datasource::file_format::{avro::AvroFormat, FileFormat}; use crate::datasource::listing::PartitionedFile; use 
crate::datasource::object_store::ObjectStoreUrl; @@ -259,7 +258,6 @@ mod tests { projection: Some(vec![0, 1, 2]), limit: None, table_partition_cols: vec![], - config_options: state.config_options(), output_ordering: None, }); assert_eq!(avro_exec.output_partitioning().partition_count(), 1); @@ -331,7 +329,6 @@ mod tests { projection, limit: None, table_partition_cols: vec![], - config_options: state.config_options(), output_ordering: None, }); assert_eq!(avro_exec.output_partitioning().partition_count(), 1); @@ -406,7 +403,6 @@ mod tests { "date".to_owned(), partition_type_wrap(DataType::Utf8), )], - config_options: ConfigOptions::new().into_shareable(), output_ordering: None, }); assert_eq!(avro_exec.output_partitioning().partition_count(), 1); diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs index 91877a1d0390..0481ca64da10 100644 --- a/datafusion/core/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs @@ -309,7 +309,6 @@ mod tests { use futures::StreamExt; use super::*; - use crate::config::ConfigOptions; use crate::datasource::object_store::ObjectStoreUrl; use crate::physical_plan::metrics::ExecutionPlanMetricsSet; use crate::prelude::SessionContext; @@ -351,7 +350,6 @@ mod tests { projection: None, limit, table_partition_cols: vec![], - config_options: ConfigOptions::new().into_shareable(), output_ordering: None, }; diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index 882d244e6595..a4be015c51a0 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -249,7 +249,6 @@ mod tests { use object_store::local::LocalFileSystem; use crate::assert_batches_eq; - use crate::config::ConfigOptions; use crate::datasource::file_format::file_type::FileType; use crate::datasource::file_format::{json::JsonFormat, FileFormat}; use crate::datasource::listing::PartitionedFile; @@ -382,7 +381,6 @@ mod tests { projection: None, limit: Some(3), table_partition_cols: vec![], - config_options: ConfigOptions::new().into_shareable(), output_ordering: None, }, file_compression_type.to_owned(), @@ -457,7 +455,6 @@ mod tests { projection: None, limit: Some(3), table_partition_cols: vec![], - config_options: ConfigOptions::new().into_shareable(), output_ordering: None, }, file_compression_type.to_owned(), @@ -502,7 +499,6 @@ mod tests { projection: Some(vec![0, 2]), limit: None, table_partition_cols: vec![], - config_options: ConfigOptions::new().into_shareable(), output_ordering: None, }, file_compression_type.to_owned(), diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index 7eb9730c92ac..5e78d5da8bf3 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -42,10 +42,11 @@ use datafusion_physical_expr::PhysicalSortExpr; pub use file_stream::{FileOpenFuture, FileOpener, FileStream}; pub(crate) use json::plan_to_json; pub use json::NdJsonExec; -use parking_lot::RwLock; -use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl}; -use crate::{config::ConfigOptions, datasource::listing::FileRange}; +use crate::datasource::{ + listing::{FileRange, PartitionedFile}, + object_store::ObjectStoreUrl, +}; use crate::{ error::{DataFusionError, Result}, 
scalar::ScalarValue, @@ -102,8 +103,6 @@ pub struct FileScanConfig { pub table_partition_cols: Vec<(String, DataType)>, /// The order in which the data is sorted, if known. pub output_ordering: Option>, - /// Configuration options passed to the physical plans - pub config_options: Arc>, } impl FileScanConfig { @@ -808,7 +807,6 @@ mod tests { projection, statistics, table_partition_cols, - config_options: ConfigOptions::new().into_shareable(), output_ordering: None, } } diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index e23452c3603a..fcd4e6a87d2b 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -25,9 +25,9 @@ use std::fs; use std::ops::Range; use std::sync::Arc; -use crate::config::OPT_PARQUET_ENABLE_PAGE_INDEX; use crate::config::OPT_PARQUET_PUSHDOWN_FILTERS; use crate::config::OPT_PARQUET_REORDER_FILTERS; +use crate::config::{ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX}; use crate::datasource::file_format::parquet::fetch_parquet_metadata; use crate::physical_plan::file_format::file_stream::{ FileOpenFuture, FileOpener, FileStream, @@ -191,14 +191,9 @@ impl ParquetExec { } /// Return the value described in [`Self::with_pushdown_filters`] - pub fn pushdown_filters(&self) -> bool { + fn pushdown_filters(&self, config_options: &ConfigOptions) -> bool { self.pushdown_filters - .or_else(|| { - self.base_config - .config_options - .read() - .get_bool(OPT_PARQUET_PUSHDOWN_FILTERS) - }) + .or_else(|| config_options.get_bool(OPT_PARQUET_PUSHDOWN_FILTERS)) // default to false .unwrap_or_default() } @@ -213,14 +208,9 @@ impl ParquetExec { } /// Return the value described in [`Self::with_reorder_filters`] - pub fn reorder_filters(&self) -> bool { + fn reorder_filters(&self, config_options: &ConfigOptions) -> bool { self.reorder_filters - .or_else(|| { - self.base_config - .config_options - .read() - .get_bool(OPT_PARQUET_REORDER_FILTERS) - }) + .or_else(|| config_options.get_bool(OPT_PARQUET_REORDER_FILTERS)) // default to false .unwrap_or_default() } @@ -235,14 +225,9 @@ impl ParquetExec { } /// Return the value described in [`Self::with_enable_page_index`] - pub fn enable_page_index(&self) -> bool { + fn enable_page_index(&self, config_options: &ConfigOptions) -> bool { self.enable_page_index - .or_else(|| { - self.base_config - .config_options - .read() - .get_bool(OPT_PARQUET_ENABLE_PAGE_INDEX) - }) + .or_else(|| config_options.get_bool(OPT_PARQUET_ENABLE_PAGE_INDEX)) // default to false .unwrap_or_default() } @@ -302,6 +287,8 @@ impl ExecutionPlan for ParquetExec { }) })?; + let config_options = ctx.session_config().config_options(); + let opener = ParquetOpener { partition_index, projection: Arc::from(projection), @@ -312,9 +299,9 @@ impl ExecutionPlan for ParquetExec { metadata_size_hint: self.metadata_size_hint, metrics: self.metrics.clone(), parquet_file_reader_factory, - pushdown_filters: self.pushdown_filters(), - reorder_filters: self.reorder_filters(), - enable_page_index: self.enable_page_index(), + pushdown_filters: self.pushdown_filters(config_options), + reorder_filters: self.reorder_filters(config_options), + enable_page_index: self.enable_page_index(config_options), }; let stream = FileStream::new( @@ -726,7 +713,6 @@ mod tests { // See also `parquet_exec` integration test use super::*; - use crate::config::ConfigOptions; use crate::datasource::file_format::parquet::test_util::store_parquet; use 
crate::datasource::file_format::test_util::scan_format; use crate::datasource::listing::{FileRange, PartitionedFile}; @@ -820,7 +806,6 @@ mod tests { projection, limit: None, table_partition_cols: vec![], - config_options: ConfigOptions::new().into_shareable(), output_ordering: None, }, predicate, @@ -1297,10 +1282,9 @@ mod tests { let session_ctx = SessionContext::new(); let state = session_ctx.state(); let task_ctx = state.task_ctx(); - let format = ParquetFormat::new(state.config_options()); let parquet_exec = scan_format( &state, - &format, + &ParquetFormat::default(), &testdata, filename, Some(vec![0, 1, 2]), @@ -1345,9 +1329,9 @@ mod tests { } async fn assert_parquet_read( + state: &SessionState, file_groups: Vec>, expected_row_num: Option, - task_ctx: Arc, file_schema: SchemaRef, ) -> Result<()> { let parquet_exec = ParquetExec::new( @@ -1359,14 +1343,13 @@ mod tests { projection: None, limit: None, table_partition_cols: vec![], - config_options: ConfigOptions::new().into_shareable(), output_ordering: None, }, None, None, ); assert_eq!(parquet_exec.output_partitioning().partition_count(), 1); - let results = parquet_exec.execute(0, task_ctx)?.next().await; + let results = parquet_exec.execute(0, state.task_ctx())?.next().await; if let Some(expected_row_num) = expected_row_num { let batch = results.unwrap()?; @@ -1387,7 +1370,7 @@ mod tests { let meta = local_unpartitioned_file(filename); let store = Arc::new(LocalFileSystem::new()) as _; - let file_schema = ParquetFormat::new(state.config_options()) + let file_schema = ParquetFormat::default() .infer_schema(&state, &store, &[meta.clone()]) .await?; @@ -1398,16 +1381,9 @@ mod tests { file_range(&meta, 5, i64::MAX), ]]; - assert_parquet_read(group_empty, None, state.task_ctx(), file_schema.clone()) - .await?; - assert_parquet_read( - group_contain, - Some(8), - state.task_ctx(), - file_schema.clone(), - ) - .await?; - assert_parquet_read(group_all, Some(8), state.task_ctx(), file_schema).await?; + assert_parquet_read(&state, group_empty, None, file_schema.clone()).await?; + assert_parquet_read(&state, group_contain, Some(8), file_schema.clone()).await?; + assert_parquet_read(&state, group_all, Some(8), file_schema).await?; Ok(()) } @@ -1426,7 +1402,7 @@ mod tests { let meta = local_unpartitioned_file(filename); - let schema = ParquetFormat::new(session_ctx.config_options()) + let schema = ParquetFormat::default() .infer_schema(&state, &store, &[meta.clone()]) .await .unwrap(); @@ -1456,7 +1432,6 @@ mod tests { ("month".to_owned(), partition_type_wrap(DataType::Utf8)), ("day".to_owned(), partition_type_wrap(DataType::Utf8)), ], - config_options: ConfigOptions::new().into_shareable(), output_ordering: None, }, None, @@ -1491,7 +1466,7 @@ mod tests { #[tokio::test] async fn parquet_exec_with_error() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let state = session_ctx.state(); let location = Path::from_filesystem_path(".") .unwrap() .child("invalid.parquet"); @@ -1516,14 +1491,13 @@ mod tests { projection: None, limit: None, table_partition_cols: vec![], - config_options: ConfigOptions::new().into_shareable(), output_ordering: None, }, None, None, ); - let mut results = parquet_exec.execute(0, task_ctx)?; + let mut results = parquet_exec.execute(0, state.task_ctx())?; let batch = results.next().await.unwrap(); // invalid file should produce an error to that effect assert_contains!(batch.unwrap_err().to_string(), "invalid.parquet not found"); diff --git 
a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 84c75f30bc50..7e84b0d8042f 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -996,7 +996,6 @@ impl DefaultPhysicalPlanner { }; let prefer_hash_join = session_state.config.config_options() - .read() .get_bool(OPT_PREFER_HASH_JOIN) .unwrap_or_default(); if join_on.is_empty() { @@ -1718,8 +1717,7 @@ impl DefaultPhysicalPlanner { if !session_state .config - .config_options - .read() + .config_options() .get_bool(OPT_EXPLAIN_PHYSICAL_PLAN_ONLY) .unwrap_or_default() { @@ -1730,8 +1728,7 @@ impl DefaultPhysicalPlanner { if !session_state .config - .config_options - .read() + .config_options() .get_bool(OPT_EXPLAIN_LOGICAL_PLAN_ONLY) .unwrap_or_default() { diff --git a/datafusion/core/src/physical_plan/streaming.rs b/datafusion/core/src/physical_plan/streaming.rs index a6ab51bb18ad..456c60e8a17a 100644 --- a/datafusion/core/src/physical_plan/streaming.rs +++ b/datafusion/core/src/physical_plan/streaming.rs @@ -106,9 +106,9 @@ impl ExecutionPlan for StreamingTableExec { fn execute( &self, partition: usize, - _context: Arc, + ctx: Arc, ) -> Result { - let stream = self.partitions[partition].execute(); + let stream = self.partitions[partition].execute(ctx); Ok(match self.projection.clone() { Some(projection) => Box::pin(RecordBatchStreamAdapter::new( self.projected_schema.clone(), diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 837610deb08f..efa3fece15c7 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -18,7 +18,6 @@ //! Common unit test utility methods use crate::arrow::array::UInt32Array; -use crate::config::ConfigOptions; use crate::datasource::file_format::file_type::{FileCompressionType, FileType}; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; @@ -182,7 +181,6 @@ pub fn partitioned_csv_config( projection: None, limit: None, table_partition_cols: vec![], - config_options: ConfigOptions::new().into_shareable(), output_ordering: None, }) } diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index 8123badb1dc4..d3cf06c3465b 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -20,7 +20,6 @@ use arrow::datatypes::{Field, Schema}; use arrow::record_batch::RecordBatch; use bytes::Bytes; use datafusion::assert_batches_sorted_eq; -use datafusion::config::ConfigOptions; use datafusion::datasource::file_format::parquet::fetch_parquet_metadata; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; @@ -83,7 +82,6 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() { projection: None, limit: None, table_partition_cols: vec![], - config_options: ConfigOptions::new().into_shareable(), output_ordering: None, }, None, @@ -94,7 +92,6 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() { ))); let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap(); diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index fc74d7ded7e7..ac3744278c14 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -495,10 +495,10 @@ 
impl<'a> TestCase<'a> { ) -> RecordBatch { println!(" scan options: {scan_options:?}"); println!(" reading with filter {:?}", filter); - let ctx = SessionContext::new(); + let ctx = SessionContext::with_config(scan_options.config()); let exec = self .test_parquet_file - .create_scan(filter.clone(), scan_options) + .create_scan(filter.clone()) .await .unwrap(); let result = collect(exec.clone(), ctx.task_ctx()).await.unwrap(); diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index ddac46c21cd8..58f22716bbf1 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -132,13 +132,16 @@ impl ContextWithParquet { Self::with_config(scenario, unit, SessionConfig::new()).await } - async fn with_config(scenario: Scenario, unit: Unit, config: SessionConfig) -> Self { + async fn with_config( + scenario: Scenario, + unit: Unit, + mut config: SessionConfig, + ) -> Self { let file = match unit { Unit::RowGroup => make_test_file_rg(scenario).await, Unit::Page => { config - .config_options - .write() + .config_options_mut() .set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, true); make_test_file_page(scenario).await } diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index b316302a2f7d..e62124a333fc 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -46,7 +46,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec { size: metadata.len() as usize, }; - let schema = ParquetFormat::new(state.config_options()) + let schema = ParquetFormat::default() .infer_schema(state, &store, &[meta.clone()]) .await .unwrap(); @@ -68,7 +68,6 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec { projection: None, limit: None, table_partition_cols: vec![], - config_options: state.config_options(), output_ordering: None, }, Some(filter), diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index aeba5ee3c0dd..f04ed032b12c 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -557,7 +557,7 @@ async fn register_partitioned_alltypes_parquet( MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths), ); - let options = ListingOptions::new(Arc::new(ParquetFormat::new(ctx.config_options()))) + let options = ListingOptions::new(Arc::new(ParquetFormat::default())) .with_table_partition_cols( partition_cols .iter() diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs index 4f5d772dcb5b..1f071febe791 100644 --- a/datafusion/core/tests/row.rs +++ b/datafusion/core/tests/row.rs @@ -86,7 +86,7 @@ async fn get_exec( let path = Path::from_filesystem_path(filename).unwrap(); - let format = ParquetFormat::new(state.config_options()); + let format = ParquetFormat::default(); let object_store = Arc::new(LocalFileSystem::new()) as Arc; let object_store_url = ObjectStoreUrl::local_filesystem(); @@ -113,7 +113,6 @@ async fn get_exec( limit, table_partition_cols: vec![], output_ordering: None, - config_options: state.config_options(), }, &[], ) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 38fa4b78f15e..7eee86465dfe 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -90,7 +90,8 @@ message CsvFormat { } message ParquetFormat { - bool enable_pruning = 1; + // Used to be bool enable_pruning = 
1; + reserved 1; } message AvroFormat {} @@ -186,7 +187,7 @@ message PrepareNode { string name = 1; repeated ArrowType data_types = 2; LogicalPlanNode input = 3; - } +} message CreateCatalogSchemaNode { string schema_name = 1; @@ -445,79 +446,79 @@ message InListNode { } enum ScalarFunction { - Abs=0; - Acos=1; - Asin=2; - Atan=3; - Ascii=4; - Ceil=5; - Cos=6; - Digest=7; - Exp=8; - Floor=9; - Ln=10; - Log=11; - Log10=12; - Log2=13; - Round=14; - Signum=15; - Sin=16; - Sqrt=17; - Tan=18; - Trunc=19; - Array=20; - RegexpMatch=21; - BitLength=22; - Btrim=23; - CharacterLength=24; - Chr=25; - Concat=26; - ConcatWithSeparator=27; - DatePart=28; - DateTrunc=29; - InitCap=30; - Left=31; - Lpad=32; - Lower=33; - Ltrim=34; - MD5=35; - NullIf=36; - OctetLength=37; - Random=38; - RegexpReplace=39; - Repeat=40; - Replace=41; - Reverse=42; - Right=43; - Rpad=44; - Rtrim=45; - SHA224=46; - SHA256=47; - SHA384=48; - SHA512=49; - SplitPart=50; - StartsWith=51; - Strpos=52; - Substr=53; - ToHex=54; - ToTimestamp=55; - ToTimestampMillis=56; - ToTimestampMicros=57; - ToTimestampSeconds=58; - Now=59; - Translate=60; - Trim=61; - Upper=62; - Coalesce=63; - Power=64; - StructFun=65; - FromUnixtime=66; - Atan2=67; - DateBin=68; - ArrowTypeof=69; - CurrentDate=70; - CurrentTime=71; - Uuid=72; + Abs = 0; + Acos = 1; + Asin = 2; + Atan = 3; + Ascii = 4; + Ceil = 5; + Cos = 6; + Digest = 7; + Exp = 8; + Floor = 9; + Ln = 10; + Log = 11; + Log10 = 12; + Log2 = 13; + Round = 14; + Signum = 15; + Sin = 16; + Sqrt = 17; + Tan = 18; + Trunc = 19; + Array = 20; + RegexpMatch = 21; + BitLength = 22; + Btrim = 23; + CharacterLength = 24; + Chr = 25; + Concat = 26; + ConcatWithSeparator = 27; + DatePart = 28; + DateTrunc = 29; + InitCap = 30; + Left = 31; + Lpad = 32; + Lower = 33; + Ltrim = 34; + MD5 = 35; + NullIf = 36; + OctetLength = 37; + Random = 38; + RegexpReplace = 39; + Repeat = 40; + Replace = 41; + Reverse = 42; + Right = 43; + Rpad = 44; + Rtrim = 45; + SHA224 = 46; + SHA256 = 47; + SHA384 = 48; + SHA512 = 49; + SplitPart = 50; + StartsWith = 51; + Strpos = 52; + Substr = 53; + ToHex = 54; + ToTimestamp = 55; + ToTimestampMillis = 56; + ToTimestampMicros = 57; + ToTimestampSeconds = 58; + Now = 59; + Translate = 60; + Trim = 61; + Upper = 62; + Coalesce = 63; + Power = 64; + StructFun = 65; + FromUnixtime = 66; + Atan2 = 67; + DateBin = 68; + ArrowTypeof = 69; + CurrentDate = 70; + CurrentTime = 71; + Uuid = 72; } message ScalarFunctionNode { @@ -533,18 +534,18 @@ enum AggregateFunction { COUNT = 4; APPROX_DISTINCT = 5; ARRAY_AGG = 6; - VARIANCE=7; - VARIANCE_POP=8; - COVARIANCE=9; - COVARIANCE_POP=10; - STDDEV=11; - STDDEV_POP=12; - CORRELATION=13; + VARIANCE = 7; + VARIANCE_POP = 8; + COVARIANCE = 9; + COVARIANCE_POP = 10; + STDDEV = 11; + STDDEV_POP = 12; + CORRELATION = 13; APPROX_PERCENTILE_CONT = 14; - APPROX_MEDIAN=15; + APPROX_MEDIAN = 15; APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16; GROUPING = 17; - MEDIAN=18; + MEDIAN = 18; } message AggregateExprNode { @@ -692,30 +693,30 @@ message Field { } message FixedSizeBinary{ - int32 length = 1; + int32 length = 1; } message Timestamp{ - TimeUnit time_unit = 1; - string timezone = 2; + TimeUnit time_unit = 1; + string timezone = 2; } enum DateUnit{ - Day = 0; - DateMillisecond = 1; + Day = 0; + DateMillisecond = 1; } enum TimeUnit{ - Second = 0; - Millisecond = 1; - Microsecond = 2; - Nanosecond = 3; + Second = 0; + Millisecond = 1; + Microsecond = 2; + Nanosecond = 3; } enum IntervalUnit{ - YearMonth = 0; - DayTime = 1; - MonthDayNano = 2; + YearMonth = 0; + 
DayTime = 1; + MonthDayNano = 2; } message Decimal{ @@ -725,21 +726,21 @@ message Decimal{ } message List{ - Field field_type = 1; + Field field_type = 1; } message FixedSizeList{ - Field field_type = 1; - int32 list_size = 2; + Field field_type = 1; + int32 list_size = 2; } message Dictionary{ - ArrowType key = 1; - ArrowType value = 2; + ArrowType key = 1; + ArrowType value = 2; } message Struct{ - repeated Field sub_field_types = 1; + repeated Field sub_field_types = 1; } enum UnionMode{ @@ -748,17 +749,17 @@ enum UnionMode{ } message Union{ - repeated Field union_types = 1; - UnionMode union_mode = 2; - repeated int32 type_ids = 3; + repeated Field union_types = 1; + UnionMode union_mode = 2; + repeated int32 type_ids = 3; } message ScalarListValue{ - // encode null explicitly to distinguish a list with a null value - // from a list with no values) - bool is_null = 3; - Field field = 1; - repeated ScalarValue values = 2; + // encode null explicitly to distinguish a list with a null value + // from a list with no values) + bool is_null = 3; + Field field = 1; + repeated ScalarValue values = 2; } message ScalarTime32Value { @@ -805,8 +806,8 @@ message StructValue { } message ScalarFixedSizeBinary{ - bytes values = 1; - int32 length = 2; + bytes values = 1; + int32 length = 2; } message ScalarValue{ @@ -814,42 +815,42 @@ message ScalarValue{ reserved 19; oneof value { - // was PrimitiveScalarType null_value = 19; - // Null value of any type - ArrowType null_value = 33; - - bool bool_value = 1; - string utf8_value = 2; - string large_utf8_value = 3; - int32 int8_value = 4; - int32 int16_value = 5; - int32 int32_value = 6; - int64 int64_value = 7; - uint32 uint8_value = 8; - uint32 uint16_value = 9; - uint32 uint32_value = 10; - uint64 uint64_value = 11; - float float32_value = 12; - double float64_value = 13; - // Literal Date32 value always has a unit of day - int32 date_32_value = 14; - ScalarTime32Value time32_value = 15; - ScalarListValue list_value = 17; - //WAS: ScalarType null_list_value = 18; - - Decimal128 decimal128_value = 20; - int64 date_64_value = 21; - int32 interval_yearmonth_value = 24; - int64 interval_daytime_value = 25; - ScalarTimestampValue timestamp_value = 26; - ScalarDictionaryValue dictionary_value = 27; - bytes binary_value = 28; - bytes large_binary_value = 29; - ScalarTime64Value time64_value = 30; - IntervalMonthDayNanoValue interval_month_day_nano = 31; - StructValue struct_value = 32; - ScalarFixedSizeBinary fixed_size_binary_value = 34; - } + // was PrimitiveScalarType null_value = 19; + // Null value of any type + ArrowType null_value = 33; + + bool bool_value = 1; + string utf8_value = 2; + string large_utf8_value = 3; + int32 int8_value = 4; + int32 int16_value = 5; + int32 int32_value = 6; + int64 int64_value = 7; + uint32 uint8_value = 8; + uint32 uint16_value = 9; + uint32 uint32_value = 10; + uint64 uint64_value = 11; + float float32_value = 12; + double float64_value = 13; + // Literal Date32 value always has a unit of day + int32 date_32_value = 14; + ScalarTime32Value time32_value = 15; + ScalarListValue list_value = 17; + //WAS: ScalarType null_list_value = 18; + + Decimal128 decimal128_value = 20; + int64 date_64_value = 21; + int32 interval_yearmonth_value = 24; + int64 interval_daytime_value = 25; + ScalarTimestampValue timestamp_value = 26; + ScalarDictionaryValue dictionary_value = 27; + bytes binary_value = 28; + bytes large_binary_value = 29; + ScalarTime64Value time64_value = 30; + IntervalMonthDayNanoValue interval_month_day_nano = 31; + 
StructValue struct_value = 32; + ScalarFixedSizeBinary fixed_size_binary_value = 34; + } } message Decimal128{ @@ -860,40 +861,40 @@ message Decimal128{ // Serialized data type message ArrowType{ - oneof arrow_type_enum { - EmptyMessage NONE = 1; // arrow::Type::NA - EmptyMessage BOOL = 2; // arrow::Type::BOOL - EmptyMessage UINT8 = 3; // arrow::Type::UINT8 - EmptyMessage INT8 = 4; // arrow::Type::INT8 - EmptyMessage UINT16 =5; // represents arrow::Type fields in src/arrow/type.h - EmptyMessage INT16 = 6; - EmptyMessage UINT32 =7; - EmptyMessage INT32 = 8; - EmptyMessage UINT64 =9; - EmptyMessage INT64 =10 ; - EmptyMessage FLOAT16 =11 ; - EmptyMessage FLOAT32 =12 ; - EmptyMessage FLOAT64 =13 ; - EmptyMessage UTF8 =14 ; - EmptyMessage LARGE_UTF8 = 32; - EmptyMessage BINARY =15 ; - int32 FIXED_SIZE_BINARY =16 ; - EmptyMessage LARGE_BINARY = 31; - EmptyMessage DATE32 =17 ; - EmptyMessage DATE64 =18 ; - TimeUnit DURATION = 19; - Timestamp TIMESTAMP =20 ; - TimeUnit TIME32 =21 ; - TimeUnit TIME64 =22 ; - IntervalUnit INTERVAL =23 ; - Decimal DECIMAL =24 ; - List LIST =25; - List LARGE_LIST = 26; - FixedSizeList FIXED_SIZE_LIST = 27; - Struct STRUCT =28; - Union UNION =29; - Dictionary DICTIONARY =30; - } + oneof arrow_type_enum { + EmptyMessage NONE = 1; // arrow::Type::NA + EmptyMessage BOOL = 2; // arrow::Type::BOOL + EmptyMessage UINT8 = 3; // arrow::Type::UINT8 + EmptyMessage INT8 = 4; // arrow::Type::INT8 + EmptyMessage UINT16 = 5; // represents arrow::Type fields in src/arrow/type.h + EmptyMessage INT16 = 6; + EmptyMessage UINT32 = 7; + EmptyMessage INT32 = 8; + EmptyMessage UINT64 = 9; + EmptyMessage INT64 = 10 ; + EmptyMessage FLOAT16 = 11 ; + EmptyMessage FLOAT32 = 12 ; + EmptyMessage FLOAT64 = 13 ; + EmptyMessage UTF8 = 14 ; + EmptyMessage LARGE_UTF8 = 32; + EmptyMessage BINARY = 15 ; + int32 FIXED_SIZE_BINARY = 16 ; + EmptyMessage LARGE_BINARY = 31; + EmptyMessage DATE32 = 17 ; + EmptyMessage DATE64 = 18 ; + TimeUnit DURATION = 19; + Timestamp TIMESTAMP = 20 ; + TimeUnit TIME32 = 21 ; + TimeUnit TIME64 = 22 ; + IntervalUnit INTERVAL = 23 ; + Decimal DECIMAL = 24 ; + List LIST = 25; + List LARGE_LIST = 26; + FixedSizeList FIXED_SIZE_LIST = 27; + Struct STRUCT = 28; + Union UNION = 29; + Dictionary DICTIONARY = 30; + } } //Useful for representing an empty enum variant in rust @@ -1132,6 +1133,9 @@ message ScanLimit { } message FileScanExecConf { + // Was repeated ConfigOption options = 10; + reserved 10; + repeated FileGroup file_groups = 1; Schema schema = 2; repeated uint32 projection = 4; @@ -1140,12 +1144,6 @@ message FileScanExecConf { repeated string table_partition_cols = 7; string object_store_url = 8; repeated PhysicalSortExprNode output_ordering = 9; - repeated ConfigOption options = 10; -} - -message ConfigOption { - string key = 1; - ScalarValue value = 2; } message ParquetScanExecNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 0c9d4795e6d6..4913761e1c26 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -3057,114 +3057,6 @@ impl<'de> serde::Deserialize<'de> for ColumnStats { deserializer.deserialize_struct("datafusion.ColumnStats", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for ConfigOption { - #[allow(deprecated)] - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - use serde::ser::SerializeStruct; - let mut len = 0; - if !self.key.is_empty() { - len += 1; - } - if self.value.is_some() { - len += 1; - } - 
let mut struct_ser = serializer.serialize_struct("datafusion.ConfigOption", len)?; - if !self.key.is_empty() { - struct_ser.serialize_field("key", &self.key)?; - } - if let Some(v) = self.value.as_ref() { - struct_ser.serialize_field("value", v)?; - } - struct_ser.end() - } -} -impl<'de> serde::Deserialize<'de> for ConfigOption { - #[allow(deprecated)] - fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - const FIELDS: &[&str] = &[ - "key", - "value", - ]; - - #[allow(clippy::enum_variant_names)] - enum GeneratedField { - Key, - Value, - } - impl<'de> serde::Deserialize<'de> for GeneratedField { - fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - struct GeneratedVisitor; - - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = GeneratedField; - - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(formatter, "expected one of: {:?}", &FIELDS) - } - - #[allow(unused_variables)] - fn visit_str(self, value: &str) -> std::result::Result - where - E: serde::de::Error, - { - match value { - "key" => Ok(GeneratedField::Key), - "value" => Ok(GeneratedField::Value), - _ => Err(serde::de::Error::unknown_field(value, FIELDS)), - } - } - } - deserializer.deserialize_identifier(GeneratedVisitor) - } - } - struct GeneratedVisitor; - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = ConfigOption; - - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct datafusion.ConfigOption") - } - - fn visit_map(self, mut map: V) -> std::result::Result - where - V: serde::de::MapAccess<'de>, - { - let mut key__ = None; - let mut value__ = None; - while let Some(k) = map.next_key()? 
{ - match k { - GeneratedField::Key => { - if key__.is_some() { - return Err(serde::de::Error::duplicate_field("key")); - } - key__ = Some(map.next_value()?); - } - GeneratedField::Value => { - if value__.is_some() { - return Err(serde::de::Error::duplicate_field("value")); - } - value__ = map.next_value()?; - } - } - } - Ok(ConfigOption { - key: key__.unwrap_or_default(), - value: value__, - }) - } - } - deserializer.deserialize_struct("datafusion.ConfigOption", FIELDS, GeneratedVisitor) - } -} impl serde::Serialize for CreateCatalogNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -6153,9 +6045,6 @@ impl serde::Serialize for FileScanExecConf { if !self.output_ordering.is_empty() { len += 1; } - if !self.options.is_empty() { - len += 1; - } let mut struct_ser = serializer.serialize_struct("datafusion.FileScanExecConf", len)?; if !self.file_groups.is_empty() { struct_ser.serialize_field("fileGroups", &self.file_groups)?; @@ -6181,9 +6070,6 @@ impl serde::Serialize for FileScanExecConf { if !self.output_ordering.is_empty() { struct_ser.serialize_field("outputOrdering", &self.output_ordering)?; } - if !self.options.is_empty() { - struct_ser.serialize_field("options", &self.options)?; - } struct_ser.end() } } @@ -6206,7 +6092,6 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "objectStoreUrl", "output_ordering", "outputOrdering", - "options", ]; #[allow(clippy::enum_variant_names)] @@ -6219,7 +6104,6 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { TablePartitionCols, ObjectStoreUrl, OutputOrdering, - Options, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -6249,7 +6133,6 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "tablePartitionCols" | "table_partition_cols" => Ok(GeneratedField::TablePartitionCols), "objectStoreUrl" | "object_store_url" => Ok(GeneratedField::ObjectStoreUrl), "outputOrdering" | "output_ordering" => Ok(GeneratedField::OutputOrdering), - "options" => Ok(GeneratedField::Options), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -6277,7 +6160,6 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { let mut table_partition_cols__ = None; let mut object_store_url__ = None; let mut output_ordering__ = None; - let mut options__ = None; while let Some(k) = map.next_key()? 
{ match k { GeneratedField::FileGroups => { @@ -6331,12 +6213,6 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { } output_ordering__ = Some(map.next_value()?); } - GeneratedField::Options => { - if options__.is_some() { - return Err(serde::de::Error::duplicate_field("options")); - } - options__ = Some(map.next_value()?); - } } } Ok(FileScanExecConf { @@ -6348,7 +6224,6 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { table_partition_cols: table_partition_cols__.unwrap_or_default(), object_store_url: object_store_url__.unwrap_or_default(), output_ordering: output_ordering__.unwrap_or_default(), - options: options__.unwrap_or_default(), }) } } @@ -11759,14 +11634,8 @@ impl serde::Serialize for ParquetFormat { S: serde::Serializer, { use serde::ser::SerializeStruct; - let mut len = 0; - if self.enable_pruning { - len += 1; - } - let mut struct_ser = serializer.serialize_struct("datafusion.ParquetFormat", len)?; - if self.enable_pruning { - struct_ser.serialize_field("enablePruning", &self.enable_pruning)?; - } + let len = 0; + let struct_ser = serializer.serialize_struct("datafusion.ParquetFormat", len)?; struct_ser.end() } } @@ -11777,13 +11646,10 @@ impl<'de> serde::Deserialize<'de> for ParquetFormat { D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ - "enable_pruning", - "enablePruning", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { - EnablePruning, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -11804,10 +11670,7 @@ impl<'de> serde::Deserialize<'de> for ParquetFormat { where E: serde::de::Error, { - match value { - "enablePruning" | "enable_pruning" => Ok(GeneratedField::EnablePruning), - _ => Err(serde::de::Error::unknown_field(value, FIELDS)), - } + Err(serde::de::Error::unknown_field(value, FIELDS)) } } deserializer.deserialize_identifier(GeneratedVisitor) @@ -11825,19 +11688,10 @@ impl<'de> serde::Deserialize<'de> for ParquetFormat { where V: serde::de::MapAccess<'de>, { - let mut enable_pruning__ = None; - while let Some(k) = map.next_key()? 
{
-                    match k {
-                        GeneratedField::EnablePruning => {
-                            if enable_pruning__.is_some() {
-                                return Err(serde::de::Error::duplicate_field("enablePruning"));
-                            }
-                            enable_pruning__ = Some(map.next_value()?);
-                        }
-                    }
+                while map.next_key::<GeneratedField>()?.is_some() {
+                    let _ = map.next_value::<serde::de::IgnoredAny>()?;
                 }
                 Ok(ParquetFormat {
-                    enable_pruning: enable_pruning__.unwrap_or_default(),
                 })
             }
         }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index afd3b2d02f56..a6778755987c 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -113,10 +113,7 @@ pub struct CsvFormat {
     pub delimiter: ::prost::alloc::string::String,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct ParquetFormat {
-    #[prost(bool, tag = "1")]
-    pub enable_pruning: bool,
-}
+pub struct ParquetFormat {}
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct AvroFormat {}
 #[derive(Clone, PartialEq, ::prost::Message)]
@@ -1506,15 +1503,6 @@ pub struct FileScanExecConf {
     pub object_store_url: ::prost::alloc::string::String,
     #[prost(message, repeated, tag = "9")]
     pub output_ordering: ::prost::alloc::vec::Vec<PhysicalSortExprNode>,
-    #[prost(message, repeated, tag = "10")]
-    pub options: ::prost::alloc::vec::Vec<ConfigOption>,
-}
-#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct ConfigOption {
-    #[prost(string, tag = "1")]
-    pub key: ::prost::alloc::string::String,
-    #[prost(message, optional, tag = "2")]
-    pub value: ::core::option::Option<ScalarValue>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ParquetScanExecNode {
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 10b4d0944f47..77b476356c20 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -353,12 +353,9 @@ impl AsLogicalPlan for LogicalPlanNode {
                             self
                         ))
                     })? {
-                    &FileFormatType::Parquet(protobuf::ParquetFormat {
-                        enable_pruning,
-                    }) => Arc::new(
-                        ParquetFormat::new(ctx.config_options())
-                            .with_enable_pruning(Some(enable_pruning)),
-                    ),
+                    &FileFormatType::Parquet(protobuf::ParquetFormat {}) => {
+                        Arc::new(ParquetFormat::default())
+                    }
                     FileFormatType::Csv(protobuf::CsvFormat {
                         has_header,
                         delimiter,
@@ -820,12 +817,8 @@ impl AsLogicalPlan for LogicalPlanNode {
             if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
                 let any = listing_table.options().format.as_any();
-                let file_format_type = if let Some(parquet) =
-                    any.downcast_ref::<ParquetFormat>()
-                {
-                    FileFormatType::Parquet(protobuf::ParquetFormat {
-                        enable_pruning: parquet.enable_pruning(),
-                    })
+                let file_format_type = if any.is::<ParquetFormat>() {
+                    FileFormatType::Parquet(protobuf::ParquetFormat {})
                 } else if let Some(csv) = any.downcast_ref::<CsvFormat>() {
                     FileFormatType::Csv(protobuf::CsvFormat {
                         delimiter: byte_to_string(csv.delimiter())?,
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 56f94feb1abd..311ed46ffc16 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -22,7 +22,6 @@ use arrow::datatypes::DataType;
 use chrono::TimeZone;
 use chrono::Utc;
 use datafusion::arrow::datatypes::Schema;
-use datafusion::config::ConfigOptions;
 use datafusion::datasource::listing::{FileRange, PartitionedFile};
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::execution::context::ExecutionProps;
@@ -52,7 +51,6 @@ use crate::logical_plan;
 use crate::protobuf::physical_expr_node::ExprType;
 use datafusion::physical_plan::joins::utils::JoinSide;
 use datafusion::physical_plan::sorts::sort::SortOptions;
-use parking_lot::RwLock;
 
 impl From<&protobuf::PhysicalColumn> for Column {
     fn from(c: &protobuf::PhysicalColumn) -> Column {
@@ -370,19 +368,6 @@ pub fn parse_protobuf_file_scan_config(
         Some(output_ordering)
     };
 
-    let mut config_options = ConfigOptions::new();
-    for option in proto.options.iter() {
-        config_options.set(
-            &option.key,
-            option
-                .value
-                .as_ref()
-                .map(|value| value.try_into())
-                .transpose()?
-                .unwrap(),
-        );
-    }
-
     Ok(FileScanConfig {
         object_store_url,
         file_schema: schema,
@@ -392,7 +377,6 @@ pub fn parse_protobuf_file_scan_config(
         limit: proto.limit.as_ref().map(|sl| sl.limit as usize),
         table_partition_cols,
         output_ordering,
-        config_options: Arc::new(RwLock::new(config_options)),
     })
 }
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index a5ea75282777..c122b5f43486 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -1213,7 +1213,6 @@ mod roundtrip_tests {
     use crate::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec};
     use datafusion::arrow::array::ArrayRef;
     use datafusion::arrow::datatypes::IntervalUnit;
-    use datafusion::config::ConfigOptions;
     use datafusion::datasource::object_store::ObjectStoreUrl;
     use datafusion::execution::context::ExecutionProps;
     use datafusion::logical_expr::create_udf;
@@ -1247,7 +1246,6 @@ mod roundtrip_tests {
         scalar::ScalarValue,
     };
     use datafusion_common::Result;
-    use parking_lot::RwLock;
 
     fn roundtrip_test(exec_plan: Arc<dyn ExecutionPlan>) -> Result<()> {
         let ctx = SessionContext::new();
@@ -1459,7 +1457,6 @@ mod roundtrip_tests {
     #[test]
     fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
         let scan_config = FileScanConfig {
-            config_options: Arc::new(RwLock::new(ConfigOptions::new())), // TODO add serde
             object_store_url: ObjectStoreUrl::local_filesystem(),
             file_schema: Arc::new(Schema::new(vec![Field::new(
                 "col",
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 943f4f70da6c..928c6098ea9a 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -40,7 +40,7 @@ use datafusion::physical_plan::expressions::{Avg, BinaryExpr, Column, Max, Min,
 use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};
 
 use crate::protobuf;
-use crate::protobuf::{ConfigOption, PhysicalSortExprNode};
+use crate::protobuf::PhysicalSortExprNode;
 use datafusion::logical_expr::BuiltinScalarFunction;
 use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
 use datafusion::physical_expr::ScalarFunctionExpr;
@@ -445,19 +445,6 @@ impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf {
         } else {
             vec![]
         };
-        let options = {
-            let config_options = conf.config_options.read().options().clone();
-            config_options
-                .into_iter()
-                .map(|(key, value)| {
-                    let value = (&value).try_into()?;
-                    Ok(ConfigOption {
-                        key,
-                        value: Some(value),
-                    })
-                })
-                .collect::<Result<Vec<_>, DataFusionError>>()?
-        };
 
         Ok(protobuf::FileScanExecConf {
             file_groups,
@@ -478,7 +465,6 @@ impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf {
                 .collect::<Vec<_>>(),
             object_store_url: conf.object_store_url.to_string(),
             output_ordering,
-            options,
         })
     }
 }
diff --git a/parquet-test-utils/src/lib.rs b/parquet-test-utils/src/lib.rs
index f1d06c46a7da..5e8f15c0a4a8 100644
--- a/parquet-test-utils/src/lib.rs
+++ b/parquet-test-utils/src/lib.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;
 use datafusion::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
 use datafusion::common::ToDFSchema;
 use datafusion::config::{
-    ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
+    OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
     OPT_PARQUET_REORDER_FILTERS,
 };
 use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
@@ -37,7 +37,7 @@ use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::Expr;
+use datafusion::prelude::{Expr, SessionConfig};
 use object_store::path::Path;
 use object_store::ObjectMeta;
 use parquet::arrow::ArrowWriter;
@@ -58,6 +58,16 @@ pub struct ParquetScanOptions {
     pub enable_page_index: bool,
 }
 
+impl ParquetScanOptions {
+    /// Returns a [`SessionConfig`] with the given options
+    pub fn config(&self) -> SessionConfig {
+        SessionConfig::new()
+            .set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, self.pushdown_filters)
+            .set_bool(OPT_PARQUET_REORDER_FILTERS, self.reorder_filters)
+            .set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, self.enable_page_index)
+    }
+}
+
 impl TestParquetFile {
     /// Creates a new parquet file at the specified location with the
     /// given properties
@@ -119,22 +129,7 @@ impl TestParquetFile {
     /// (FilterExec)
     /// (ParquetExec)
     /// ```
-    pub async fn create_scan(
-        &self,
-        filter: Expr,
-        scan_options: ParquetScanOptions,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let ParquetScanOptions {
-            pushdown_filters,
-            reorder_filters,
-            enable_page_index,
-        } = scan_options;
-
-        let mut config_options = ConfigOptions::new();
-        config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
-        config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
-        config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
-
+    pub async fn create_scan(&self, filter: Expr) -> Result<Arc<dyn ExecutionPlan>> {
        let scan_config = FileScanConfig {
            object_store_url: self.object_store_url.clone(),
            file_schema: self.schema.clone(),
@@ -148,7 +143,6 @@ impl TestParquetFile {
            projection: None,
            limit: None,
            table_partition_cols: vec![],
-           config_options: config_options.into_shareable(),
            output_ordering: None,
        };
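Taken together, these hunks stop threading `ConfigOptions` through `ParquetFormat` and `FileScanConfig`; the Parquet scan settings now ride on the `SessionConfig` the caller builds, here via the new `ParquetScanOptions::config()` helper. As a rough usage sketch (not part of the patch, and assuming the `parquet_test_utils` crate shown above plus an already-written `TestParquetFile`), the pieces fit together like this:

```rust
use datafusion::common::Result;
use datafusion::physical_plan::collect;
use datafusion::prelude::{Expr, SessionContext};
use parquet_test_utils::{ParquetScanOptions, TestParquetFile};

// Hypothetical helper: run one filtered scan with a given set of scan options.
async fn scan_with_options(
    test_file: &TestParquetFile,
    filter: Expr,
    scan_options: ParquetScanOptions,
) -> Result<usize> {
    // The options travel via the SessionConfig instead of a ConfigOptions
    // handle stored on FileScanConfig.
    let ctx = SessionContext::with_config(scan_options.config());

    // create_scan no longer takes ParquetScanOptions as an argument.
    let exec = test_file.create_scan(filter).await?;

    // Execute the plan against the context that carries the options.
    let batches = collect(exec, ctx.task_ctx()).await?;
    Ok(batches.iter().map(|b| b.num_rows()).sum())
}
```

This mirrors the updated `TestCase` earlier in the patch, which likewise builds its `SessionContext` from `scan_options.config()` before calling `create_scan`.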