Don't share ConfigOptions (#3886) #4712

Merged: 6 commits, Dec 23, 2022
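The heart of the change: ConfigOptions are no longer shared behind an Arc<RwLock<...>>, but owned as plain values by the session state. The most visible API consequence, as a before/after sketch assembled from the diffs below (`state` stands for whatever previously exposed the shared options):

    use datafusion::datasource::file_format::parquet::ParquetFormat;

    // Before this PR: formats captured a handle to shared, lockable options.
    //     let format = ParquetFormat::new(state.config_options())
    //         .with_enable_pruning(Some(true));

    // After: formats stand alone; options travel with the session/task state.
    fn build_format() -> ParquetFormat {
        ParquetFormat::default().with_enable_pruning(Some(true))
    }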
benchmarks/src/bin/parquet_filter_pushdown.rs (26 changes: 10 additions & 16 deletions)
@@ -20,7 +20,7 @@ use datafusion::common::Result;
 use datafusion::logical_expr::{lit, or, Expr};
 use datafusion::optimizer::utils::disjunction;
 use datafusion::physical_plan::collect;
-use datafusion::prelude::{col, SessionConfig, SessionContext};
+use datafusion::prelude::{col, SessionContext};
 use parquet::file::properties::WriterProperties;
 use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
 use std::path::PathBuf;
@@ -69,9 +69,6 @@ async fn main() -> Result<()> {
     let opt: Opt = Opt::from_args();
     println!("Running benchmarks with the following options: {:?}", opt);

-    let config = SessionConfig::new().with_target_partitions(opt.partitions);
-    let mut ctx = SessionContext::with_config(config);
-
     let path = opt.path.join("logs.parquet");

     let mut props_builder = WriterProperties::builder();
@@ -88,17 +85,12 @@ async fn main() -> Result<()> {

     let test_file = gen_data(path, opt.scale_factor, props_builder.build())?;

-    run_benchmarks(&mut ctx, &test_file, opt.iterations, opt.debug).await?;
+    run_benchmarks(opt, &test_file).await?;

     Ok(())
 }

-async fn run_benchmarks(
-    ctx: &mut SessionContext,
-    test_file: &TestParquetFile,
-    iterations: usize,
-    debug: bool,
-) -> Result<()> {
+async fn run_benchmarks(opt: Opt, test_file: &TestParquetFile) -> Result<()> {
     let scan_options_matrix = vec![
         ParquetScanOptions {
             pushdown_filters: false,
@@ -148,11 +140,14 @@ async fn run_benchmarks(
     println!("Executing with filter '{}'", filter_expr);
     for scan_options in &scan_options_matrix {
         println!("Using scan options {:?}", scan_options);
-        for i in 0..iterations {
+        for i in 0..opt.iterations {
             let start = Instant::now();

+            let config = scan_options.config().with_target_partitions(opt.partitions);
+            let ctx = SessionContext::with_config(config);
+
             let rows =
-                exec_scan(ctx, test_file, filter_expr.clone(), *scan_options, debug)
-                    .await?;
+                exec_scan(&ctx, test_file, filter_expr.clone(), opt.debug).await?;
             println!(
                 "Iteration {} returned {} rows in {} ms",
                 i,
@@ -170,10 +165,9 @@ async fn exec_scan(
     ctx: &SessionContext,
     test_file: &TestParquetFile,
     filter: Expr,
-    scan_options: ParquetScanOptions,
-    debug: bool,
 ) -> Result<usize> {
-    let exec = test_file.create_scan(filter, scan_options).await?;
+    let exec = test_file.create_scan(filter).await?;

     let task_ctx = ctx.task_ctx();
     let result = collect(exec, task_ctx).await?;
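With the shared context gone, each benchmark iteration now builds its own SessionContext from the scan options under test. A minimal sketch of the pattern (only calls that appear in the diff above; the helper itself is hypothetical):

    use datafusion::prelude::{SessionConfig, SessionContext};

    // One isolated context per run: options cannot leak between iterations
    // because nothing is shared across them.
    fn context_for_run(base: SessionConfig, target_partitions: usize) -> SessionContext {
        SessionContext::with_config(base.with_target_partitions(target_partitions))
    }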
benchmarks/src/bin/tpch.rs (3 changes: 1 addition & 2 deletions)
@@ -395,8 +395,7 @@ async fn get_table(
         }
         "parquet" => {
             let path = format!("{}/{}", path, table);
-            let format = ParquetFormat::new(state.config_options())
-                .with_enable_pruning(Some(true));
+            let format = ParquetFormat::default().with_enable_pruning(Some(true));

             (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
         }
datafusion-examples/examples/flight_server.rs (39 changes: 11 additions & 28 deletions)
@@ -15,14 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.

-use std::pin::Pin;
 use std::sync::Arc;

 use arrow_flight::SchemaAsIpc;
 use datafusion::arrow::error::ArrowError;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
-use futures::Stream;
+use futures::stream::BoxStream;
 use tonic::transport::Server;
 use tonic::{Request, Response, Status, Streaming};
@@ -39,37 +38,21 @@ pub struct FlightServiceImpl {}

 #[tonic::async_trait]
 impl FlightService for FlightServiceImpl {
-    type HandshakeStream = Pin<
-        Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send + Sync + 'static>,
-    >;
-    type ListFlightsStream =
-        Pin<Box<dyn Stream<Item = Result<FlightInfo, Status>> + Send + Sync + 'static>>;
-    type DoGetStream =
-        Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + Sync + 'static>>;
-    type DoPutStream =
-        Pin<Box<dyn Stream<Item = Result<PutResult, Status>> + Send + Sync + 'static>>;
-    type DoActionStream = Pin<
-        Box<
-            dyn Stream<Item = Result<arrow_flight::Result, Status>>
-                + Send
-                + Sync
-                + 'static,
-        >,
-    >;
-    type ListActionsStream =
-        Pin<Box<dyn Stream<Item = Result<ActionType, Status>> + Send + Sync + 'static>>;
-    type DoExchangeStream =
-        Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + Sync + 'static>>;
+    type HandshakeStream = BoxStream<'static, Result<HandshakeResponse, Status>>;
[Contributor comment on the line above: "I forgot this was here -- I have to give this example some love after my work to make arrow-flight easier to use."]
+    type ListFlightsStream = BoxStream<'static, Result<FlightInfo, Status>>;
+    type DoGetStream = BoxStream<'static, Result<FlightData, Status>>;
+    type DoPutStream = BoxStream<'static, Result<PutResult, Status>>;
+    type DoActionStream = BoxStream<'static, Result<arrow_flight::Result, Status>>;
+    type ListActionsStream = BoxStream<'static, Result<ActionType, Status>>;
+    type DoExchangeStream = BoxStream<'static, Result<FlightData, Status>>;

     async fn get_schema(
         &self,
         request: Request<FlightDescriptor>,
     ) -> Result<Response<SchemaResult>, Status> {
         let request = request.into_inner();

-        let config = SessionConfig::new();
-        let listing_options =
-            ListingOptions::new(Arc::new(ParquetFormat::new(config.config_options())));
+        let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
         let table_path =
             ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?;

@@ -79,10 +62,10 @@ impl FlightService for FlightServiceImpl {
             .await
             .unwrap();

-        let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
+        let options = arrow::ipc::writer::IpcWriteOptions::default();
         let schema_result = SchemaAsIpc::new(&schema, &options)
             .try_into()
-            .map_err(|e: ArrowError| tonic::Status::internal(e.to_string()))?;
+            .map_err(|e: ArrowError| Status::internal(e.to_string()))?;

         Ok(Response::new(schema_result))
     }
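Why the one-liners are equivalent: in the futures crate, BoxStream<'a, T> is an alias for Pin<Box<dyn Stream<Item = T> + Send + 'a>>, i.e. the type spelled out by hand in the removed lines minus the Sync bound. A small sketch, assuming futures 0.3 and that tonic only requires Send here:

    use futures::stream::{self, BoxStream, StreamExt};

    // StreamExt::boxed() pins and boxes any Send stream into a BoxStream.
    fn demo() -> BoxStream<'static, Result<u64, String>> {
        stream::iter(vec![Ok(1), Ok(2), Ok(3)]).boxed()
    }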
datafusion-examples/examples/parquet_sql_multiple_files.rs (3 changes: 1 addition & 2 deletions)
@@ -32,8 +32,7 @@ async fn main() -> Result<()> {
     let testdata = datafusion::test_util::parquet_test_data();

     // Configure listing options
-    let file_format =
-        ParquetFormat::new(ctx.config_options()).with_enable_pruning(Some(true));
+    let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
     let listing_options = ListingOptions::new(Arc::new(file_format))
         .with_file_extension(FileType::PARQUET.get_ext());
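Putting the two example files together: a listing table's format is now configured without any ConfigOptions handle at all. A sketch using only calls shown in these diffs:

    use std::sync::Arc;
    use datafusion::datasource::file_format::parquet::ParquetFormat;
    use datafusion::datasource::listing::ListingOptions;

    // Build listing options from a standalone format; no shared options needed.
    fn build_listing_options() -> ListingOptions {
        let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
        ListingOptions::new(Arc::new(file_format))
    }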
datafusion/core/src/catalog/information_schema.rs (42 changes: 20 additions & 22 deletions)
@@ -29,13 +29,13 @@ use arrow::{
     datatypes::{DataType, Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
-use parking_lot::RwLock;

 use datafusion_common::Result;

 use crate::config::ConfigOptions;
 use crate::datasource::streaming::{PartitionStream, StreamingTable};
 use crate::datasource::TableProvider;
+use crate::execution::context::TaskContext;
 use crate::logical_expr::TableType;
 use crate::physical_plan::stream::RecordBatchStreamAdapter;
 use crate::physical_plan::SendableRecordBatchStream;
@@ -55,20 +55,17 @@ const DF_SETTINGS: &str = "df_settings";
 /// schema that can introspect on tables in the catalog_list
 pub(crate) struct CatalogWithInformationSchema {
     catalog_list: Weak<dyn CatalogList>,
-    config_options: Weak<RwLock<ConfigOptions>>,
     /// wrapped provider
     inner: Arc<dyn CatalogProvider>,
 }

 impl CatalogWithInformationSchema {
     pub(crate) fn new(
         catalog_list: Weak<dyn CatalogList>,
-        config_options: Weak<RwLock<ConfigOptions>>,
         inner: Arc<dyn CatalogProvider>,
     ) -> Self {
         Self {
             catalog_list,
-            config_options,
             inner,
         }
     }
@@ -89,15 +86,10 @@ impl CatalogProvider for CatalogWithInformationSchema {

     fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
         if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) {
-            Weak::upgrade(&self.catalog_list).and_then(|catalog_list| {
-                Weak::upgrade(&self.config_options).map(|config_options| {
-                    Arc::new(InformationSchemaProvider {
-                        config: InformationSchemaConfig {
-                            catalog_list,
-                            config_options,
-                        },
-                    }) as Arc<dyn SchemaProvider>
-                })
+            Weak::upgrade(&self.catalog_list).map(|catalog_list| {
+                Arc::new(InformationSchemaProvider {
+                    config: InformationSchemaConfig { catalog_list },
+                }) as Arc<dyn SchemaProvider>
             })
         } else {
             self.inner.schema(name)
@@ -127,7 +119,6 @@ struct InformationSchemaProvider {
 #[derive(Clone)]
 struct InformationSchemaConfig {
     catalog_list: Arc<dyn CatalogList>,
-    config_options: Arc<RwLock<ConfigOptions>>,
 }

 impl InformationSchemaConfig {
@@ -220,8 +211,12 @@ impl InformationSchemaConfig {
     }

     /// Construct the `information_schema.df_settings` virtual table
-    fn make_df_settings(&self, builder: &mut InformationSchemaDfSettingsBuilder) {
-        for (name, setting) in self.config_options.read().options() {
+    fn make_df_settings(
+        &self,
+        config_options: &ConfigOptions,
+        builder: &mut InformationSchemaDfSettingsBuilder,
+    ) {
+        for (name, setting) in config_options.options() {
             builder.add_setting(name, setting.to_string());
         }
     }
@@ -298,7 +293,7 @@ impl PartitionStream for InformationSchemaTables {
         &self.schema
     }

-    fn execute(&self) -> SendableRecordBatchStream {
+    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
         let mut builder = self.builder();
         let config = self.config.clone();
         Box::pin(RecordBatchStreamAdapter::new(
@@ -389,7 +384,7 @@ impl PartitionStream for InformationSchemaViews {
         &self.schema
     }

-    fn execute(&self) -> SendableRecordBatchStream {
+    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
         let mut builder = self.builder();
         let config = self.config.clone();
         Box::pin(RecordBatchStreamAdapter::new(
@@ -503,7 +498,7 @@ impl PartitionStream for InformationSchemaColumns {
         &self.schema
     }

-    fn execute(&self) -> SendableRecordBatchStream {
+    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
         let mut builder = self.builder();
         let config = self.config.clone();
         Box::pin(RecordBatchStreamAdapter::new(
@@ -690,15 +685,18 @@ impl PartitionStream for InformationSchemaDfSettings {
         &self.schema
     }

-    fn execute(&self) -> SendableRecordBatchStream {
-        let mut builder = self.builder();
+    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
         let config = self.config.clone();
+        let mut builder = self.builder();
         Box::pin(RecordBatchStreamAdapter::new(
             self.schema.clone(),
             // TODO: Stream this
             futures::stream::once(async move {
                 // create a mem table with the names of tables
-                config.make_df_settings(&mut builder);
+                config.make_df_settings(
+                    ctx.session_config().config_options(),
+                    &mut builder,
+                );
                 Ok(builder.finish())
             }),
         ))
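The thread running through this file: PartitionStream::execute now receives the per-query TaskContext, so ConfigOptions are read where they are used instead of being captured behind a lock at construction time. A hedged sketch of an implementor under the new contract (trait and adapter paths as imported above; the table itself is made up, and its schema is assumed to be a single Utf8 "name" column):

    use std::sync::Arc;

    use arrow::array::StringArray;
    use arrow::datatypes::SchemaRef;
    use arrow::record_batch::RecordBatch;
    use datafusion::datasource::streaming::PartitionStream;
    use datafusion::execution::context::TaskContext;
    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
    use datafusion::physical_plan::SendableRecordBatchStream;

    struct SettingNames {
        schema: SchemaRef,
    }

    impl PartitionStream for SettingNames {
        fn schema(&self) -> &SchemaRef {
            &self.schema
        }

        fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
            let schema = self.schema.clone();
            // Read one consistent snapshot of the options for this query;
            // nothing lives behind a shared Arc<RwLock<...>> any more.
            let mut names = Vec::new();
            for (name, _setting) in ctx.session_config().config_options().options() {
                names.push(name.to_string());
            }
            Box::pin(RecordBatchStreamAdapter::new(
                schema.clone(),
                futures::stream::once(async move {
                    RecordBatch::try_new(
                        schema,
                        vec![Arc::new(StringArray::from_iter_values(names))],
                    )
                    .map_err(Into::into)
                }),
            ))
        }
    }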
datafusion/core/src/config.rs (7 changes: 0 additions & 7 deletions)
@@ -21,11 +21,9 @@ use arrow::datatypes::DataType;
 use datafusion_common::ScalarValue;
 use itertools::Itertools;
 use log::warn;
-use parking_lot::RwLock;
 use std::collections::{BTreeMap, HashMap};
 use std::env;
 use std::fmt::{Debug, Formatter};
-use std::sync::Arc;

 /*-************************************
  * Catalog related
@@ -484,11 +482,6 @@ impl ConfigOptions {
         Self { options }
     }

-    /// Create a new [`ConfigOptions`] wrapped in an RwLock and Arc
-    pub fn into_shareable(self) -> Arc<RwLock<Self>> {
-        Arc::new(RwLock::new(self))
-    }
-
     /// Create new ConfigOptions struct, taking values from
     /// environment variables where possible.
     ///
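With into_shareable removed, ConfigOptions behaves like any other value: clones are independent snapshots, so there is nothing to lock. A hedged sketch of the resulting semantics (assuming Clone plus the set_bool/get accessors of the config API of this era):

    use datafusion::config::ConfigOptions;

    fn main() {
        let mut options = ConfigOptions::new();
        options.set_bool("datafusion.execution.coalesce_batches", false);

        // A clone is an independent snapshot, not a shared handle.
        let snapshot = options.clone();
        options.set_bool("datafusion.execution.coalesce_batches", true);

        // The snapshot keeps the value it was cloned with.
        assert_ne!(
            snapshot.get("datafusion.execution.coalesce_batches"),
            options.get("datafusion.execution.coalesce_batches")
        );
    }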
datafusion/core/src/datasource/file_format/mod.rs (1 change: 0 additions & 1 deletion)
@@ -128,7 +128,6 @@ pub(crate) mod test_util {
                 projection,
                 limit,
                 table_partition_cols: vec![],
-                config_options: state.config_options(),
                 output_ordering: None,
             },
             &[],