Don't share ConfigOptions (apache#3886)
tustvold committed Dec 22, 2022
1 parent c9d6118 commit 3327d11
Showing 34 changed files with 332 additions and 655 deletions.
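In outline: `ParquetFormat` and the information schema previously captured a shared `Arc<RwLock<ConfigOptions>>`; after this commit they hold only local overrides and borrow the session's `ConfigOptions` when a value is actually needed. A minimal sketch of the new call pattern, assuming the `SessionContext`/`SessionState` wiring seen in the tests changed below (illustrative, not part of the commit):

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::prelude::SessionContext;

fn main() {
    let ctx = SessionContext::new();
    let state = ctx.state();

    // No ConfigOptions handle is passed at construction time any more;
    // overrides are purely local to this format instance.
    let format = ParquetFormat::default().with_enable_pruning(Some(true));

    // Effective values are resolved against the session's options on demand.
    assert!(format.enable_pruning(state.config_options()));
}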
3 changes: 1 addition & 2 deletions benchmarks/src/bin/tpch.rs
@@ -395,8 +395,7 @@ async fn get_table(
         }
         "parquet" => {
             let path = format!("{}/{}", path, table);
-            let format = ParquetFormat::new(state.config_options())
-                .with_enable_pruning(Some(true));
+            let format = ParquetFormat::default().with_enable_pruning(Some(true));

             (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
         }
3 changes: 1 addition & 2 deletions datafusion-examples/examples/parquet_sql_multiple_files.rs
@@ -32,8 +32,7 @@ async fn main() -> Result<()> {
     let testdata = datafusion::test_util::parquet_test_data();

     // Configure listing options
-    let file_format =
-        ParquetFormat::new(ctx.config_options()).with_enable_pruning(Some(true));
+    let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
     let listing_options = ListingOptions::new(Arc::new(file_format))
         .with_file_extension(FileType::PARQUET.get_ext());
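The recommended alternative to the removed per-format constructor is to set the option once on the session's config. A sketch, assuming the `SessionConfig::set_bool` builder and the `datafusion.execution.parquet.enable_pruning` key (the key backing `OPT_PARQUET_ENABLE_PRUNING` used below; both are assumptions from the surrounding codebase, not part of this diff):

use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Configure pruning session-wide instead of per ParquetFormat instance.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.enable_pruning", true);
    let ctx = SessionContext::with_config(config);

    // Every ParquetFormat::default() used with this session now picks the
    // value up through state.config_options() rather than a shared lock.
    let _state = ctx.state();
}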
42 changes: 20 additions & 22 deletions datafusion/core/src/catalog/information_schema.rs
@@ -29,13 +29,13 @@ use arrow::{
     datatypes::{DataType, Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
-use parking_lot::RwLock;

 use datafusion_common::Result;

 use crate::config::ConfigOptions;
 use crate::datasource::streaming::{PartitionStream, StreamingTable};
 use crate::datasource::TableProvider;
+use crate::execution::context::TaskContext;
 use crate::logical_expr::TableType;
 use crate::physical_plan::stream::RecordBatchStreamAdapter;
 use crate::physical_plan::SendableRecordBatchStream;
@@ -55,20 +55,17 @@ const DF_SETTINGS: &str = "df_settings";
 /// schema that can introspect on tables in the catalog_list
 pub(crate) struct CatalogWithInformationSchema {
     catalog_list: Weak<dyn CatalogList>,
-    config_options: Weak<RwLock<ConfigOptions>>,
     /// wrapped provider
     inner: Arc<dyn CatalogProvider>,
 }

 impl CatalogWithInformationSchema {
     pub(crate) fn new(
         catalog_list: Weak<dyn CatalogList>,
-        config_options: Weak<RwLock<ConfigOptions>>,
         inner: Arc<dyn CatalogProvider>,
     ) -> Self {
         Self {
             catalog_list,
-            config_options,
             inner,
         }
     }
@@ -89,15 +86,10 @@ impl CatalogProvider for CatalogWithInformationSchema {

     fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
         if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) {
-            Weak::upgrade(&self.catalog_list).and_then(|catalog_list| {
-                Weak::upgrade(&self.config_options).map(|config_options| {
-                    Arc::new(InformationSchemaProvider {
-                        config: InformationSchemaConfig {
-                            catalog_list,
-                            config_options,
-                        },
-                    }) as Arc<dyn SchemaProvider>
-                })
+            Weak::upgrade(&self.catalog_list).map(|catalog_list| {
+                Arc::new(InformationSchemaProvider {
+                    config: InformationSchemaConfig { catalog_list },
+                }) as Arc<dyn SchemaProvider>
             })
         } else {
             self.inner.schema(name)
@@ -127,7 +119,6 @@ struct InformationSchemaProvider {
 #[derive(Clone)]
 struct InformationSchemaConfig {
     catalog_list: Arc<dyn CatalogList>,
-    config_options: Arc<RwLock<ConfigOptions>>,
 }

 impl InformationSchemaConfig {
@@ -220,8 +211,12 @@ impl InformationSchemaConfig {
     }

     /// Construct the `information_schema.df_settings` virtual table
-    fn make_df_settings(&self, builder: &mut InformationSchemaDfSettingsBuilder) {
-        for (name, setting) in self.config_options.read().options() {
+    fn make_df_settings(
+        &self,
+        config_options: &ConfigOptions,
+        builder: &mut InformationSchemaDfSettingsBuilder,
+    ) {
+        for (name, setting) in config_options.options() {
             builder.add_setting(name, setting.to_string());
         }
     }
@@ -298,7 +293,7 @@ impl PartitionStream for InformationSchemaTables {
         &self.schema
     }

-    fn execute(&self) -> SendableRecordBatchStream {
+    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
         let mut builder = self.builder();
         let config = self.config.clone();
         Box::pin(RecordBatchStreamAdapter::new(
@@ -389,7 +384,7 @@ impl PartitionStream for InformationSchemaViews {
         &self.schema
     }

-    fn execute(&self) -> SendableRecordBatchStream {
+    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
         let mut builder = self.builder();
         let config = self.config.clone();
         Box::pin(RecordBatchStreamAdapter::new(
@@ -503,7 +498,7 @@ impl PartitionStream for InformationSchemaColumns {
         &self.schema
     }

-    fn execute(&self) -> SendableRecordBatchStream {
+    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
         let mut builder = self.builder();
         let config = self.config.clone();
         Box::pin(RecordBatchStreamAdapter::new(
@@ -690,15 +685,18 @@ impl PartitionStream for InformationSchemaDfSettings {
         &self.schema
     }

-    fn execute(&self) -> SendableRecordBatchStream {
-        let mut builder = self.builder();
+    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
         let config = self.config.clone();
+        let mut builder = self.builder();
         Box::pin(RecordBatchStreamAdapter::new(
             self.schema.clone(),
             // TODO: Stream this
             futures::stream::once(async move {
                 // create a mem table with the config settings
-                config.make_df_settings(&mut builder);
+                config.make_df_settings(
+                    ctx.session_config().config_options(),
+                    &mut builder,
+                );
                 Ok(builder.finish())
             }),
         ))
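For downstream implementors of `PartitionStream`, the new `execute` signature means session state arrives through the `TaskContext` instead of being captured at construction. A minimal sketch using the paths imported in this file; the `SettingNames` type and its single-column schema are hypothetical:

use std::sync::Arc;

use arrow::array::StringArray;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::streaming::PartitionStream;
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;

/// Hypothetical stream yielding one row per config setting name,
/// mirroring the InformationSchemaDfSettings pattern above.
struct SettingNames {
    schema: SchemaRef,
}

impl SettingNames {
    fn new() -> Self {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "name",
            DataType::Utf8,
            false,
        )]));
        Self { schema }
    }
}

impl PartitionStream for SettingNames {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    // The TaskContext is now the only route to the session's ConfigOptions;
    // no Weak<RwLock<ConfigOptions>> is held by the stream itself.
    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let schema = self.schema.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            self.schema.clone(),
            futures::stream::once(async move {
                // Iterate the settings exactly as make_df_settings does above.
                let mut names = Vec::new();
                for (name, _setting) in ctx.session_config().config_options().options() {
                    names.push(name.to_string());
                }
                let array = StringArray::from(names);
                RecordBatch::try_new(schema, vec![Arc::new(array)])
                    .map_err(DataFusionError::from)
            }),
        ))
    }
}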
7 changes: 0 additions & 7 deletions datafusion/core/src/config.rs
@@ -21,11 +21,9 @@ use arrow::datatypes::DataType;
 use datafusion_common::ScalarValue;
 use itertools::Itertools;
 use log::warn;
-use parking_lot::RwLock;
 use std::collections::{BTreeMap, HashMap};
 use std::env;
 use std::fmt::{Debug, Formatter};
-use std::sync::Arc;

 /*-************************************
  * Catalog related
@@ -484,11 +482,6 @@ impl ConfigOptions {
         Self { options }
     }

-    /// Create a new [`ConfigOptions`] wrapped in an RwLock and Arc
-    pub fn into_shareable(self) -> Arc<RwLock<Self>> {
-        Arc::new(RwLock::new(self))
-    }
-
     /// Create new ConfigOptions struct, taking values from
     /// environment variables where possible.
     ///
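With `into_shareable` gone, `ConfigOptions` is handled as a plain value: owned where mutation is needed, borrowed as `&ConfigOptions` everywhere else. A small sketch of the getter/setter surface this diff relies on (`set_bool` is an assumption from the same file; only `options()`, `get_bool`, and `get_usize` appear in the hunks above):

use datafusion::config::ConfigOptions;

fn main() {
    // Plain value semantics: no Arc, no RwLock, no lock ordering to reason about.
    let mut options = ConfigOptions::new();
    options.set_bool("datafusion.execution.parquet.enable_pruning", false);

    // Readers borrow immutably, exactly as the new &ConfigOptions parameters do.
    assert_eq!(
        options.get_bool("datafusion.execution.parquet.enable_pruning"),
        Some(false)
    );
}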
1 change: 0 additions & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
@@ -128,7 +128,6 @@ pub(crate) mod test_util {
             projection,
             limit,
             table_partition_cols: vec![],
-            config_options: state.config_options(),
             output_ordering: None,
         },
         &[],
70 changes: 28 additions & 42 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -28,7 +28,6 @@ use datafusion_common::DataFusionError;
 use datafusion_optimizer::utils::conjunction;
 use hashbrown::HashMap;
 use object_store::{ObjectMeta, ObjectStore};
-use parking_lot::RwLock;
 use parquet::arrow::parquet_to_arrow_schema;
 use parquet::file::footer::{decode_footer, decode_metadata};
 use parquet::file::metadata::ParquetMetaData;
@@ -56,25 +55,25 @@ use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics};
 pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";

 /// The Apache Parquet `FileFormat` implementation
-#[derive(Debug)]
+///
+/// Note it is recommended these settings be configured on the [`ConfigOptions`]
+/// associated with the [`SessionState`] rather than overridden on a per-format basis
+///
+/// TODO: Deprecate and remove overrides (#4349)
+#[derive(Debug, Default)]
 pub struct ParquetFormat {
-    // Session level configuration
-    config_options: Arc<RwLock<ConfigOptions>>,
-    // local overrides
+    /// Override the global setting for enable_pruning
     enable_pruning: Option<bool>,
+    /// Override the global setting for metadata_size_hint
     metadata_size_hint: Option<usize>,
+    /// Override the global setting for skip_metadata
     skip_metadata: Option<bool>,
 }

 impl ParquetFormat {
-    /// construct a new Format with the specified `ConfigOptions`
-    pub fn new(config_options: Arc<RwLock<ConfigOptions>>) -> Self {
-        Self {
-            config_options,
-            enable_pruning: None,
-            metadata_size_hint: None,
-            skip_metadata: None,
-        }
+    /// Construct a new Format with no local overrides
+    pub fn new() -> Self {
+        Self::default()
     }

     /// Activate statistics based row group level pruning
@@ -85,13 +84,9 @@ impl ParquetFormat {
     }

     /// Return true if pruning is enabled
-    pub fn enable_pruning(&self) -> bool {
+    pub fn enable_pruning(&self, config_options: &ConfigOptions) -> bool {
         self.enable_pruning
-            .or_else(|| {
-                self.config_options
-                    .read()
-                    .get_bool(OPT_PARQUET_ENABLE_PRUNING)
-            })
+            .or_else(|| config_options.get_bool(OPT_PARQUET_ENABLE_PRUNING))
             .unwrap_or(true)
     }

@@ -107,12 +102,9 @@ impl ParquetFormat {
     }

     /// Return the metadata size hint if set
-    pub fn metadata_size_hint(&self) -> Option<usize> {
-        self.metadata_size_hint.or_else(|| {
-            self.config_options
-                .read()
-                .get_usize(OPT_PARQUET_METADATA_SIZE_HINT)
-        })
+    pub fn metadata_size_hint(&self, config_options: &ConfigOptions) -> Option<usize> {
+        self.metadata_size_hint
+            .or_else(|| config_options.get_usize(OPT_PARQUET_METADATA_SIZE_HINT))
     }

     /// Tell the parquet reader to skip any metadata that may be in
@@ -127,13 +119,9 @@ impl ParquetFormat {

     /// returns true if schema metadata will be cleared prior to
     /// schema merging.
-    pub fn skip_metadata(&self) -> bool {
+    pub fn skip_metadata(&self, config_options: &ConfigOptions) -> bool {
         self.skip_metadata
-            .or_else(|| {
-                self.config_options
-                    .read()
-                    .get_bool(OPT_PARQUET_SKIP_METADATA)
-            })
+            .or_else(|| config_options.get_bool(OPT_PARQUET_SKIP_METADATA))
             .unwrap_or(true)
     }
 }
@@ -163,7 +151,7 @@ impl FileFormat for ParquetFormat {

     async fn infer_schema(
         &self,
-        _state: &SessionState,
+        state: &SessionState,
         store: &Arc<dyn ObjectStore>,
         objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
Expand All @@ -174,7 +162,7 @@ impl FileFormat for ParquetFormat {
schemas.push(schema)
}

let schema = if self.skip_metadata() {
let schema = if self.skip_metadata(state.config_options()) {
Schema::try_merge(clear_metadata(schemas))
} else {
Schema::try_merge(schemas)
@@ -202,14 +190,14 @@ impl FileFormat for ParquetFormat {

     async fn create_physical_plan(
         &self,
-        _state: &SessionState,
+        state: &SessionState,
         conf: FileScanConfig,
         filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // If enable pruning then combine the filters to build the predicate.
         // If disable pruning then set the predicate to None, thus readers
         // will not prune data based on the statistics.
-        let predicate = if self.enable_pruning() {
+        let predicate = if self.enable_pruning(state.config_options()) {
             conjunction(filters.to_vec())
         } else {
             None
@@ -218,7 +206,7 @@ impl FileFormat for ParquetFormat {
         Ok(Arc::new(ParquetExec::new(
             conf,
             predicate,
-            self.metadata_size_hint(),
+            self.metadata_size_hint(state.config_options()),
         )))
     }
 }
@@ -655,7 +643,7 @@ mod tests {

         let session = SessionContext::new();
         let ctx = session.state();
-        let format = ParquetFormat::new(ctx.config_options());
+        let format = ParquetFormat::default();
         let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap();

         let stats =
@@ -804,8 +792,7 @@ mod tests {

         let session = SessionContext::new();
         let ctx = session.state();
-        let format =
-            ParquetFormat::new(ctx.config_options()).with_metadata_size_hint(Some(9));
+        let format = ParquetFormat::default().with_metadata_size_hint(Some(9));
         let schema = format
             .infer_schema(&ctx, &store.upcast(), &meta)
             .await
@@ -835,8 +822,7 @@ mod tests {
         // ensure the requests were coalesced into a single request
         assert_eq!(store.request_count(), 1);

-        let format = ParquetFormat::new(ctx.config_options())
-            .with_metadata_size_hint(Some(size_hint));
+        let format = ParquetFormat::default().with_metadata_size_hint(Some(size_hint));
         let schema = format
             .infer_schema(&ctx, &store.upcast(), &meta)
             .await
@@ -1272,7 +1258,7 @@ mod tests {
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let testdata = crate::test_util::parquet_test_data();
-        let format = ParquetFormat::new(state.config_options());
+        let format = ParquetFormat::default();
         scan_format(state, &format, &testdata, file_name, projection, limit).await
     }
 }
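Taken together, override resolution in the new getters is: the local `Option` first, then the session's `ConfigOptions`, then the hard-coded fallback from the `unwrap_or(true)` calls above. A sketch of that behavior, assuming a freshly constructed `ConfigOptions` carries the documented defaults:

use datafusion::config::ConfigOptions;
use datafusion::datasource::file_format::parquet::ParquetFormat;

fn main() {
    let config = ConfigOptions::new();

    // No local override: fall through to the session value / built-in default.
    let format = ParquetFormat::default();
    assert!(format.enable_pruning(&config));
    assert!(format.skip_metadata(&config));

    // A local override always wins over the session's ConfigOptions.
    let format = ParquetFormat::default().with_enable_pruning(Some(false));
    assert!(!format.enable_pruning(&config));
}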