From 7864b838bbbf0f778e1d3956d9dead9789aea0c7 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 21 Dec 2022 17:44:24 +0000 Subject: [PATCH] Push SessionState into FileFormat (#4349) --- .../core/src/datasource/file_format/avro.rs | 57 ++++++---- .../core/src/datasource/file_format/csv.rs | 25 +++-- .../core/src/datasource/file_format/json.rs | 32 ++++-- .../core/src/datasource/file_format/mod.rs | 13 ++- .../src/datasource/file_format/parquet.rs | 102 ++++++++++-------- .../core/src/datasource/listing/table.rs | 6 +- datafusion/core/src/execution/context.rs | 5 + .../src/physical_plan/file_format/avro.rs | 26 +++-- .../src/physical_plan/file_format/json.rs | 15 +-- .../src/physical_plan/file_format/parquet.rs | 52 +++++---- datafusion/core/tests/parquet/page_pruning.rs | 37 ++++--- datafusion/core/tests/row.rs | 39 +++---- 12 files changed, 234 insertions(+), 175 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs index ac1ea1ba8626..65301a62145b 100644 --- a/datafusion/core/src/datasource/file_format/avro.rs +++ b/datafusion/core/src/datasource/file_format/avro.rs @@ -28,6 +28,7 @@ use object_store::{GetResult, ObjectMeta, ObjectStore}; use super::FileFormat; use crate::avro_to_arrow::read_avro_schema_from_reader; use crate::error::Result; +use crate::execution::context::SessionState; use crate::logical_expr::Expr; use crate::physical_plan::file_format::{AvroExec, FileScanConfig}; use crate::physical_plan::ExecutionPlan; @@ -47,6 +48,7 @@ impl FileFormat for AvroFormat { async fn infer_schema( &self, + _ctx: &SessionState, store: &Arc<dyn ObjectStore>, objects: &[ObjectMeta], ) -> Result<SchemaRef> { @@ -68,6 +70,7 @@ impl FileFormat for AvroFormat { async fn infer_stats( &self, + _ctx: &SessionState, _store: &Arc<dyn ObjectStore>, _table_schema: SchemaRef, _object: &ObjectMeta, @@ -77,6 +80,7 @@ impl FileFormat for AvroFormat { async fn create_physical_plan( &self, + _ctx: &SessionState, conf: FileScanConfig, _filters: &[Expr], ) -> Result<Arc<dyn ExecutionPlan>> { @@ -101,10 +105,11 @@ mod tests { #[tokio::test] async fn read_small_batches() -> Result<()> { let config = SessionConfig::new().with_batch_size(2); - let ctx = SessionContext::with_config(config); + let session_ctx = SessionContext::with_config(config); + let ctx = session_ctx.state(); let task_ctx = ctx.task_ctx(); let projection = None; - let exec = get_exec("alltypes_plain.avro", projection, None).await?; + let exec = get_exec(&ctx, "alltypes_plain.avro", projection, None).await?; let stream = exec.execute(0, task_ctx)?; let tt_batches = stream @@ -124,9 +129,10 @@ mod tests { #[tokio::test] async fn read_limit() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = None; - let exec = get_exec("alltypes_plain.avro", projection, Some(1)).await?; + let exec = get_exec(&ctx, "alltypes_plain.avro", projection, Some(1)).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); assert_eq!(11, batches[0].num_columns()); @@ -138,9 +144,10 @@ mod tests { #[tokio::test] async fn read_alltypes_plain_avro() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = None; - let exec = get_exec("alltypes_plain.avro", projection, None).await?; + let exec = get_exec(&ctx, "alltypes_plain.avro", projection, None).await?; let x: Vec<String> = exec
.schema() @@ -190,9 +197,10 @@ mod tests { #[tokio::test] async fn read_bool_alltypes_plain_avro() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = Some(vec![1]); - let exec = get_exec("alltypes_plain.avro", projection, None).await?; + let exec = get_exec(&ctx, "alltypes_plain.avro", projection, None).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(batches.len(), 1); @@ -216,9 +224,10 @@ mod tests { #[tokio::test] async fn read_i32_alltypes_plain_avro() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = Some(vec![0]); - let exec = get_exec("alltypes_plain.avro", projection, None).await?; + let exec = get_exec(&ctx, "alltypes_plain.avro", projection, None).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(batches.len(), 1); @@ -239,9 +248,10 @@ mod tests { #[tokio::test] async fn read_i96_alltypes_plain_avro() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = Some(vec![10]); - let exec = get_exec("alltypes_plain.avro", projection, None).await?; + let exec = get_exec(&ctx, "alltypes_plain.avro", projection, None).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(batches.len(), 1); @@ -262,9 +272,10 @@ mod tests { #[tokio::test] async fn read_f32_alltypes_plain_avro() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = Some(vec![6]); - let exec = get_exec("alltypes_plain.avro", projection, None).await?; + let exec = get_exec(&ctx, "alltypes_plain.avro", projection, None).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(batches.len(), 1); @@ -288,9 +299,10 @@ mod tests { #[tokio::test] async fn read_f64_alltypes_plain_avro() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = Some(vec![7]); - let exec = get_exec("alltypes_plain.avro", projection, None).await?; + let exec = get_exec(&ctx, "alltypes_plain.avro", projection, None).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(batches.len(), 1); @@ -314,9 +326,10 @@ mod tests { #[tokio::test] async fn read_binary_alltypes_plain_avro() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = Some(vec![9]); - let exec = get_exec("alltypes_plain.avro", projection, None).await?; + let exec = get_exec(&ctx, "alltypes_plain.avro", projection, None).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(batches.len(), 1); @@ -338,6 +351,7 @@ mod tests { } async fn get_exec( + ctx: &SessionState, file_name: &str, projection: Option<Vec<usize>>, limit: Option<usize>, ) -> Result<Arc<dyn ExecutionPlan>> { let testdata = crate::test_util::arrow_test_data(); let store_root = format!("{}/avro", testdata); let format = AvroFormat {}; - scan_format(&format, &store_root, file_name, projection, limit).await + scan_format(ctx, &format, &store_root, file_name, projection, limit).await } } @@ -356,13 +370,16 @@ mod tests { use
super::super::test_util::scan_format; use crate::error::DataFusionError; + use crate::prelude::SessionContext; #[tokio::test] async fn test() -> Result<()> { + let session_ctx = SessionContext::new(); + let ctx = session_ctx.state(); let format = AvroFormat {}; let testdata = crate::test_util::arrow_test_data(); let filename = "avro/alltypes_plain.avro"; - let result = scan_format(&format, &testdata, filename, None, None).await; + let result = scan_format(&ctx, &format, &testdata, filename, None, None).await; assert!(matches!( result, Err(DataFusionError::NotImplemented(msg)) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index d44f6ca51d65..4dc7e81ba49e 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -35,6 +35,7 @@ use super::FileFormat; use crate::datasource::file_format::file_type::FileCompressionType; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; use crate::error::Result; +use crate::execution::context::SessionState; use crate::logical_expr::Expr; use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; use crate::physical_plan::ExecutionPlan; @@ -113,6 +114,7 @@ impl FileFormat for CsvFormat { async fn infer_schema( &self, + _ctx: &SessionState, store: &Arc<dyn ObjectStore>, objects: &[ObjectMeta], ) -> Result<SchemaRef> { @@ -150,6 +152,7 @@ impl FileFormat for CsvFormat { async fn infer_stats( &self, + _ctx: &SessionState, _store: &Arc<dyn ObjectStore>, _table_schema: SchemaRef, _object: &ObjectMeta, ) -> Result<Statistics> { @@ -159,6 +162,7 @@ impl FileFormat for CsvFormat { async fn create_physical_plan( &self, + _ctx: &SessionState, conf: FileScanConfig, _filters: &[Expr], ) -> Result<Arc<dyn ExecutionPlan>> { @@ -184,11 +188,12 @@ mod tests { #[tokio::test] async fn read_small_batches() -> Result<()> { let config = SessionConfig::new().with_batch_size(2); - let ctx = SessionContext::with_config(config); + let session_ctx = SessionContext::with_config(config); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); // skip column 9 that overflows the automatically discovered column type of i64 (u64 would work) let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]); - let exec = get_exec("aggregate_test_100.csv", projection, None).await?; - let task_ctx = ctx.task_ctx(); + let exec = get_exec(&ctx, "aggregate_test_100.csv", projection, None).await?; let stream = exec.execute(0, task_ctx)?; let tt_batches: i32 = stream @@ -212,9 +217,10 @@ mod tests { #[tokio::test] async fn read_limit() -> Result<()> { let session_ctx = SessionContext::new(); + let ctx = session_ctx.state(); let task_ctx = session_ctx.task_ctx(); let projection = Some(vec![0, 1, 2, 3]); - let exec = get_exec("aggregate_test_100.csv", projection, Some(1)).await?; + let exec = get_exec(&ctx, "aggregate_test_100.csv", projection, Some(1)).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); assert_eq!(4, batches[0].num_columns()); @@ -225,8 +231,11 @@ mod tests { #[tokio::test] async fn infer_schema() -> Result<()> { + let session_ctx = SessionContext::new(); + let ctx = session_ctx.state(); + let projection = None; - let exec = get_exec("aggregate_test_100.csv", projection, None).await?; + let exec = get_exec(&ctx, "aggregate_test_100.csv", projection, None).await?; let x: Vec<String> = exec .schema() @@ -259,9 +268,10 @@ mod tests { #[tokio::test] async fn read_char_column() -> Result<()> { let session_ctx = SessionContext::new(); + let ctx = session_ctx.state(); let task_ctx =
session_ctx.task_ctx(); let projection = Some(vec![0]); - let exec = get_exec("aggregate_test_100.csv", projection, None).await?; + let exec = get_exec(&ctx, "aggregate_test_100.csv", projection, None).await?; let batches = collect(exec, task_ctx).await.expect("Collect batches"); @@ -281,12 +291,13 @@ mod tests { } async fn get_exec( + ctx: &SessionState, file_name: &str, projection: Option<Vec<usize>>, limit: Option<usize>, ) -> Result<Arc<dyn ExecutionPlan>> { let root = format!("{}/csv", crate::test_util::arrow_test_data()); let format = CsvFormat::default(); - scan_format(&format, &root, file_name, projection, limit).await + scan_format(ctx, &format, &root, file_name, projection, limit).await } } diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 08bb2adece85..7443a810400f 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -36,6 +36,7 @@ use super::FileScanConfig; use crate::datasource::file_format::file_type::FileCompressionType; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; use crate::error::Result; +use crate::execution::context::SessionState; use crate::logical_expr::Expr; use crate::physical_plan::file_format::NdJsonExec; use crate::physical_plan::ExecutionPlan; @@ -86,6 +87,7 @@ impl FileFormat for JsonFormat { async fn infer_schema( &self, + _ctx: &SessionState, store: &Arc<dyn ObjectStore>, objects: &[ObjectMeta], ) -> Result<SchemaRef> { @@ -129,6 +131,7 @@ impl FileFormat for JsonFormat { async fn infer_stats( &self, + _ctx: &SessionState, _store: &Arc<dyn ObjectStore>, _table_schema: SchemaRef, _object: &ObjectMeta, ) -> Result<Statistics> { @@ -138,6 +141,7 @@ impl FileFormat for JsonFormat { async fn create_physical_plan( &self, + _ctx: &SessionState, conf: FileScanConfig, _filters: &[Expr], ) -> Result<Arc<dyn ExecutionPlan>> { @@ -161,10 +165,11 @@ mod tests { #[tokio::test] async fn read_small_batches() -> Result<()> { let config = SessionConfig::new().with_batch_size(2); - let ctx = SessionContext::with_config(config); - let projection = None; - let exec = get_exec(projection, None).await?; + let session_ctx = SessionContext::with_config(config); + let ctx = session_ctx.state(); let task_ctx = ctx.task_ctx(); + let projection = None; + let exec = get_exec(&ctx, projection, None).await?; let stream = exec.execute(0, task_ctx)?; let tt_batches: i32 = stream @@ -188,9 +193,10 @@ mod tests { #[tokio::test] async fn read_limit() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = None; - let exec = get_exec(projection, Some(1)).await?; + let exec = get_exec(&ctx, projection, Some(1)).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); assert_eq!(4, batches[0].num_columns()); @@ -202,7 +208,9 @@ mod tests { #[tokio::test] async fn infer_schema() -> Result<()> { let projection = None; - let exec = get_exec(projection, None).await?; + let session_ctx = SessionContext::new(); + let ctx = session_ctx.state(); + let exec = get_exec(&ctx, projection, None).await?; let x: Vec<String> = exec .schema() @@ -218,9 +226,10 @@ mod tests { #[tokio::test] async fn read_int_column() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = Some(vec![0]); - let exec = get_exec(projection, None).await?; + let exec = get_exec(&ctx, projection, None).await?; let batches = collect(exec,
task_ctx).await.expect("Collect batches"); @@ -243,22 +252,25 @@ mod tests { } async fn get_exec( + ctx: &SessionState, projection: Option<Vec<usize>>, limit: Option<usize>, ) -> Result<Arc<dyn ExecutionPlan>> { let filename = "tests/jsons/2.json"; let format = JsonFormat::default(); - scan_format(&format, ".", filename, projection, limit).await + scan_format(ctx, &format, ".", filename, projection, limit).await } #[tokio::test] async fn infer_schema_with_limit() { + let session = SessionContext::new(); + let ctx = session.state(); let store = Arc::new(LocalFileSystem::new()) as _; let filename = "tests/jsons/schema_infer_limit.json"; let format = JsonFormat::default().with_schema_infer_max_rec(Some(3)); let file_schema = format - .infer_schema(&store, &[local_unpartitioned_file(filename)]) + .infer_schema(&ctx, &store, &[local_unpartitioned_file(filename)]) .await .expect("Schema inference"); diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 507e437a4b8f..77c141de0172 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -36,6 +36,7 @@ use crate::logical_expr::Expr; use crate::physical_plan::file_format::FileScanConfig; use crate::physical_plan::{ExecutionPlan, Statistics}; +use crate::execution::context::SessionState; use async_trait::async_trait; use object_store::{ObjectMeta, ObjectStore}; @@ -54,6 +55,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug { /// the files have schemas that cannot be merged. async fn infer_schema( &self, + ctx: &SessionState, store: &Arc<dyn ObjectStore>, objects: &[ObjectMeta], ) -> Result<SchemaRef>; @@ -67,6 +69,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug { /// TODO: should the file source return statistics for only columns referred to in the table schema? async fn infer_stats( &self, + ctx: &SessionState, store: &Arc<dyn ObjectStore>, table_schema: SchemaRef, object: &ObjectMeta, ) -> Result<Statistics>; @@ -76,6 +79,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug { /// according to this file format.
async fn create_physical_plan( &self, + ctx: &SessionState, conf: FileScanConfig, filters: &[Expr], ) -> Result<Arc<dyn ExecutionPlan>>; } @@ -84,13 +88,13 @@ #[cfg(test)] pub(crate) mod test_util { use super::*; - use crate::config::ConfigOptions; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; use crate::test::object_store::local_unpartitioned_file; use object_store::local::LocalFileSystem; pub async fn scan_format( + ctx: &SessionState, format: &dyn FileFormat, store_root: &str, file_name: &str, @@ -100,10 +104,10 @@ pub(crate) mod test_util { let store = Arc::new(LocalFileSystem::new()) as _; let meta = local_unpartitioned_file(format!("{}/{}", store_root, file_name)); - let file_schema = format.infer_schema(&store, &[meta.clone()]).await?; + let file_schema = format.infer_schema(ctx, &store, &[meta.clone()]).await?; let statistics = format - .infer_stats(&store, file_schema.clone(), &meta) + .infer_stats(ctx, &store, file_schema.clone(), &meta) .await?; let file_groups = vec![vec![PartitionedFile { @@ -115,6 +119,7 @@ pub(crate) mod test_util { let exec = format .create_physical_plan( + ctx, FileScanConfig { object_store_url: ObjectStoreUrl::local_filesystem(), file_schema, @@ -123,7 +128,7 @@ pub(crate) mod test_util { projection, limit, table_partition_cols: vec![], - config_options: ConfigOptions::new().into_shareable(), + config_options: ctx.config_options(), output_ordering: None, }, &[], diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 6e1fa17824fc..4b395b60570e 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -46,6 +46,7 @@ use crate::config::OPT_PARQUET_METADATA_SIZE_HINT; use crate::config::OPT_PARQUET_SKIP_METADATA; use crate::datasource::{create_max_min_accs, get_col_stats}; use crate::error::Result; +use crate::execution::context::SessionState; use crate::logical_expr::Expr; use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter}; @@ -162,6 +163,7 @@ impl FileFormat for ParquetFormat { async fn infer_schema( &self, + _ctx: &SessionState, store: &Arc<dyn ObjectStore>, objects: &[ObjectMeta], ) -> Result<SchemaRef> { @@ -183,6 +185,7 @@ impl FileFormat for ParquetFormat { async fn infer_stats( &self, + _ctx: &SessionState, store: &Arc<dyn ObjectStore>, table_schema: SchemaRef, object: &ObjectMeta, @@ -199,6 +202,7 @@ impl FileFormat for ParquetFormat { async fn create_physical_plan( &self, + _ctx: &SessionState, conf: FileScanConfig, filters: &[Expr], ) -> Result<Arc<dyn ExecutionPlan>> { @@ -649,9 +653,10 @@ mod tests { let store = Arc::new(LocalFileSystem::new()) as _; let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?; - let ctx = SessionContext::new(); + let session = SessionContext::new(); + let ctx = session.state(); let format = ParquetFormat::new(ctx.config_options()); - let schema = format.infer_schema(&store, &meta).await.unwrap(); + let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap(); let stats = fetch_statistics(store.as_ref(), schema.clone(), &meta[0], None).await?; @@ -797,10 +802,14 @@ mod tests { assert_eq!(store.request_count(), 2); - let ctx = SessionContext::new(); + let session = SessionContext::new(); + let ctx = session.state(); let format = ParquetFormat::new(ctx.config_options()).with_metadata_size_hint(Some(9)); - let schema = format.infer_schema(&store.upcast(),
&meta).await.unwrap(); + let schema = format + .infer_schema(&ctx, &store.upcast(), &meta) + .await + .unwrap(); let stats = fetch_statistics(store.upcast().as_ref(), schema.clone(), &meta[0], Some(9)) @@ -826,10 +835,12 @@ mod tests { // ensure the requests were coalesced into a single request assert_eq!(store.request_count(), 1); - let ctx = SessionContext::new(); let format = ParquetFormat::new(ctx.config_options()) .with_metadata_size_hint(Some(size_hint)); - let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap(); + let schema = format + .infer_schema(&ctx, &store.upcast(), &meta) + .await + .unwrap(); let stats = fetch_statistics( store.upcast().as_ref(), schema.clone(), @@ -863,10 +874,11 @@ mod tests { #[tokio::test] async fn read_small_batches() -> Result<()> { let config = SessionConfig::new().with_batch_size(2); - let ctx = SessionContext::with_config(config); + let session_ctx = SessionContext::with_config(config); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = None; let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?; - let task_ctx = ctx.task_ctx(); let stream = exec.execute(0, task_ctx)?; let tt_batches = stream @@ -890,7 +902,8 @@ mod tests { #[tokio::test] async fn capture_bytes_scanned_metric() -> Result<()> { let config = SessionConfig::new().with_batch_size(2); - let ctx = SessionContext::with_config(config); + let session = SessionContext::with_config(config); + let ctx = session.state(); // Read the full file let projection = None; @@ -915,10 +928,10 @@ mod tests { #[tokio::test] async fn read_limit() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = None; - let exec = - get_exec(&session_ctx, "alltypes_plain.parquet", projection, Some(1)).await?; + let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, Some(1)).await?; // note: even if the limit is set, the executor rounds up to the batch size assert_eq!(exec.statistics().num_rows, Some(8)); @@ -935,10 +948,10 @@ mod tests { #[tokio::test] async fn read_alltypes_plain_parquet() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = None; - let exec = - get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?; + let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?; let x: Vec = exec .schema() @@ -974,10 +987,10 @@ mod tests { #[tokio::test] async fn read_bool_alltypes_plain_parquet() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = Some(vec![1]); - let exec = - get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?; + let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); @@ -1001,10 +1014,10 @@ mod tests { #[tokio::test] async fn read_i32_alltypes_plain_parquet() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = Some(vec![0]); - let exec = - get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?; + let exec = get_exec(&ctx, 
"alltypes_plain.parquet", projection, None).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); @@ -1025,10 +1038,10 @@ mod tests { #[tokio::test] async fn read_i96_alltypes_plain_parquet() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = Some(vec![10]); - let exec = - get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?; + let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); @@ -1049,10 +1062,10 @@ mod tests { #[tokio::test] async fn read_f32_alltypes_plain_parquet() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = Some(vec![6]); - let exec = - get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?; + let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); @@ -1076,10 +1089,10 @@ mod tests { #[tokio::test] async fn read_f64_alltypes_plain_parquet() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = Some(vec![7]); - let exec = - get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?; + let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); @@ -1103,10 +1116,10 @@ mod tests { #[tokio::test] async fn read_binary_alltypes_plain_parquet() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let projection = Some(vec![9]); - let exec = - get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?; + let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?; let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); @@ -1130,10 +1143,11 @@ mod tests { #[tokio::test] async fn read_decimal_parquet() -> Result<()> { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); // parquet use the int32 as the physical type to store decimal - let exec = get_exec(&session_ctx, "int32_decimal.parquet", None, None).await?; + let exec = get_exec(&ctx, "int32_decimal.parquet", None, None).await?; let batches = collect(exec, task_ctx.clone()).await?; assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); @@ -1141,7 +1155,7 @@ mod tests { assert_eq!(&DataType::Decimal128(4, 2), column.data_type()); // parquet use the int64 as the physical type to store decimal - let exec = get_exec(&session_ctx, "int64_decimal.parquet", None, None).await?; + let exec = get_exec(&ctx, "int64_decimal.parquet", None, None).await?; let batches = collect(exec, task_ctx.clone()).await?; assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); @@ -1149,21 +1163,15 @@ mod tests { assert_eq!(&DataType::Decimal128(10, 2), column.data_type()); // parquet use the fixed length binary as the physical type to store decimal - let exec = - get_exec(&session_ctx, "fixed_length_decimal.parquet", None, 
None).await?; + let exec = get_exec(&ctx, "fixed_length_decimal.parquet", None, None).await?; let batches = collect(exec, task_ctx.clone()).await?; assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); let column = batches[0].column(0); assert_eq!(&DataType::Decimal128(25, 2), column.data_type()); - let exec = get_exec( - &session_ctx, - "fixed_length_decimal_legacy.parquet", - None, - None, - ) - .await?; + let exec = + get_exec(&ctx, "fixed_length_decimal_legacy.parquet", None, None).await?; let batches = collect(exec, task_ctx.clone()).await?; assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); @@ -1257,13 +1265,13 @@ mod tests { } async fn get_exec( - ctx: &SessionContext, + ctx: &SessionState, file_name: &str, projection: Option<Vec<usize>>, limit: Option<usize>, ) -> Result<Arc<dyn ExecutionPlan>> { let testdata = crate::test_util::parquet_test_data(); let format = ParquetFormat::new(ctx.config_options()); - scan_format(&format, &testdata, file_name, projection, limit).await + scan_format(ctx, &format, &testdata, file_name, projection, limit).await } } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 724671ffbcbf..7a06b4ca1c76 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -385,7 +385,7 @@ impl ListingOptions { .try_collect() .await?; - self.format.infer_schema(&store, &files).await + self.format.infer_schema(ctx, &store, &files).await } } @@ -582,6 +582,7 @@ impl TableProvider for ListingTable { self.options .format .create_physical_plan( + ctx, FileScanConfig { object_store_url: self.table_paths.get(0).unwrap().object_store(), file_schema: Arc::clone(&self.file_schema), @@ -591,7 +592,7 @@ impl TableProvider for ListingTable { limit, output_ordering: self.try_create_output_ordering()?, table_partition_cols, - config_options: ctx.config.config_options(), + config_options: ctx.config_options(), }, filters, ) @@ -663,6 +664,7 @@ impl ListingTable { .options .format .infer_stats( + ctx, &store, self.file_schema.clone(), &part_file.object_meta, diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index d6eb5240ae2e..b052d86bed2f 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1800,6 +1800,11 @@ impl SessionState { pub fn config_options(&self) -> Arc<RwLock<ConfigOptions>> { self.config.config_options() } + + /// Get a new TaskContext to run in this session + pub fn task_ctx(&self) -> Arc<TaskContext> { + Arc::new(TaskContext::from(self)) + } } impl ContextProvider for SessionState { diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index ec5b27ca789a..ab4b4561c0c1 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -236,15 +236,19 @@ mod tests { } async fn test_with_stores(store: Arc<dyn ObjectStore>) -> Result<()> { - let ctx = SessionContext::new(); - ctx.runtime_env() + let session_ctx = SessionContext::new(); + let ctx = session_ctx.state(); + + ctx.runtime_env .register_object_store("file", "", store.clone()); let testdata = crate::test_util::arrow_test_data(); let filename = format!("{}/avro/alltypes_plain.avro", testdata); let meta = local_unpartitioned_file(filename); - let file_schema = AvroFormat {}.infer_schema(&store, &[meta.clone()]).await?; + let file_schema = AvroFormat {} + .infer_schema(&ctx, &store, &[meta.clone()]) + .await?; let avro_exec
= AvroExec::new(FileScanConfig { object_store_url: ObjectStoreUrl::local_filesystem(), @@ -254,7 +258,7 @@ mod tests { projection: Some(vec![0, 1, 2]), limit: None, table_partition_cols: vec![], - config_options: ConfigOptions::new().into_shareable(), + config_options: ctx.config_options(), output_ordering: None, }); assert_eq!(avro_exec.output_partitioning().partition_count(), 1); @@ -299,13 +303,16 @@ mod tests { #[tokio::test] async fn avro_exec_missing_column() -> Result<()> { + let session_ctx = SessionContext::new(); + let ctx = session_ctx.state(); + let testdata = crate::test_util::arrow_test_data(); let filename = format!("{}/avro/alltypes_plain.avro", testdata); let object_store = Arc::new(LocalFileSystem::new()) as _; let object_store_url = ObjectStoreUrl::local_filesystem(); let meta = local_unpartitioned_file(filename); let actual_schema = AvroFormat {} - .infer_schema(&object_store, &[meta.clone()]) + .infer_schema(&ctx, &object_store, &[meta.clone()]) .await?; let mut fields = actual_schema.fields().clone(); @@ -323,12 +330,11 @@ mod tests { projection, limit: None, table_partition_cols: vec![], - config_options: ConfigOptions::new().into_shareable(), + config_options: ctx.config_options(), output_ordering: None, }); assert_eq!(avro_exec.output_partitioning().partition_count(), 1); - let ctx = SessionContext::new(); let mut results = avro_exec .execute(0, ctx.task_ctx()) .expect("plan execution failed"); @@ -370,13 +376,16 @@ mod tests { #[tokio::test] async fn avro_exec_with_partition() -> Result<()> { + let session_ctx = SessionContext::new(); + let ctx = session_ctx.state(); + let testdata = crate::test_util::arrow_test_data(); let filename = format!("{}/avro/alltypes_plain.avro", testdata); let object_store = Arc::new(LocalFileSystem::new()) as _; let object_store_url = ObjectStoreUrl::local_filesystem(); let meta = local_unpartitioned_file(filename); let file_schema = AvroFormat {} - .infer_schema(&object_store, &[meta.clone()]) + .infer_schema(&ctx, &object_store, &[meta.clone()]) .await?; let mut partitioned_file = PartitionedFile::from(meta); @@ -401,7 +410,6 @@ mod tests { }); assert_eq!(avro_exec.output_partitioning().partition_count(), 1); - let ctx = SessionContext::new(); let mut results = avro_exec .execute(0, ctx.task_ctx()) .expect("plan execution failed"); diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index fac394ee379c..cefbf8a9a55d 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -268,11 +268,11 @@ mod tests { const TEST_DATA_BASE: &str = "tests/jsons"; async fn prepare_store( - ctx: &SessionContext, + ctx: &SessionState, file_compression_type: FileCompressionType, ) -> (ObjectStoreUrl, Vec>, SchemaRef) { let store_url = ObjectStoreUrl::local_filesystem(); - let store = ctx.runtime_env().object_store(&store_url).unwrap(); + let store = ctx.runtime_env.object_store(&store_url).unwrap(); let filename = "1.json"; let file_groups = partitioned_file_groups( @@ -292,7 +292,7 @@ mod tests { .object_meta; let schema = JsonFormat::default() .with_file_compression_type(file_compression_type.to_owned()) - .infer_schema(&store, &[meta.clone()]) + .infer_schema(ctx, &store, &[meta.clone()]) .await .unwrap(); @@ -366,11 +366,12 @@ mod tests { file_compression_type: FileCompressionType, ) -> Result<()> { let session_ctx = SessionContext::new(); + let ctx = session_ctx.state(); let task_ctx = session_ctx.task_ctx(); 
use arrow::datatypes::DataType; let (object_store_url, file_groups, file_schema) = - prepare_store(&session_ctx, file_compression_type.to_owned()).await; + prepare_store(&ctx, file_compression_type.to_owned()).await; let exec = NdJsonExec::new( FileScanConfig { @@ -435,10 +436,11 @@ mod tests { file_compression_type: FileCompressionType, ) -> Result<()> { let session_ctx = SessionContext::new(); + let ctx = session_ctx.state(); let task_ctx = session_ctx.task_ctx(); use arrow::datatypes::DataType; let (object_store_url, file_groups, actual_schema) = - prepare_store(&session_ctx, file_compression_type.to_owned()).await; + prepare_store(&ctx, file_compression_type.to_owned()).await; let mut fields = actual_schema.fields().clone(); fields.push(Field::new("missing_col", DataType::Int32, true)); @@ -486,9 +488,10 @@ mod tests { file_compression_type: FileCompressionType, ) -> Result<()> { let session_ctx = SessionContext::new(); + let ctx = session_ctx.state(); let task_ctx = session_ctx.task_ctx(); let (object_store_url, file_groups, file_schema) = - prepare_store(&session_ctx, file_compression_type.to_owned()).await; + prepare_store(&ctx, file_compression_type.to_owned()).await; let exec = NdJsonExec::new( FileScanConfig { diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index a9815680b150..7545f6221696 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -1294,15 +1294,22 @@ mod tests { async fn parquet_exec_with_projection() -> Result<()> { let testdata = crate::test_util::parquet_test_data(); let filename = "alltypes_plain.parquet"; - let ctx = SessionContext::new(); + let session_ctx = SessionContext::new(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); let format = ParquetFormat::new(ctx.config_options()); - let parquet_exec = - scan_format(&format, &testdata, filename, Some(vec![0, 1, 2]), None) - .await - .unwrap(); + let parquet_exec = scan_format( + &ctx, + &format, + &testdata, + filename, + Some(vec![0, 1, 2]), + None, + ) + .await + .unwrap(); assert_eq!(parquet_exec.output_partitioning().partition_count(), 1); - let task_ctx = SessionContext::new().task_ctx(); let mut results = parquet_exec.execute(0, task_ctx)?; let batch = results.next().await.unwrap()?; @@ -1372,14 +1379,16 @@ mod tests { } let session_ctx = SessionContext::new(); + let ctx = session_ctx.state(); + let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/alltypes_plain.parquet", testdata); let meta = local_unpartitioned_file(filename); let store = Arc::new(LocalFileSystem::new()) as _; - let file_schema = ParquetFormat::new(session_ctx.config_options()) - .infer_schema(&store, &[meta.clone()]) + let file_schema = ParquetFormat::new(ctx.config_options()) + .infer_schema(&ctx, &store, &[meta.clone()]) .await?; let group_empty = vec![vec![file_range(&meta, 0, 5)]]; @@ -1389,22 +1398,11 @@ mod tests { file_range(&meta, 5, i64::MAX), ]]; - assert_parquet_read( - group_empty, - None, - session_ctx.task_ctx(), - file_schema.clone(), - ) - .await?; - assert_parquet_read( - group_contain, - Some(8), - session_ctx.task_ctx(), - file_schema.clone(), - ) - .await?; - assert_parquet_read(group_all, Some(8), session_ctx.task_ctx(), file_schema) + assert_parquet_read(group_empty, None, ctx.task_ctx(), file_schema.clone()) .await?; + assert_parquet_read(group_contain, Some(8), ctx.task_ctx(), file_schema.clone()) + 
.await?; + assert_parquet_read(group_all, Some(8), ctx.task_ctx(), file_schema).await?; Ok(()) } @@ -1412,13 +1410,11 @@ mod tests { #[tokio::test] async fn parquet_exec_with_partition() -> Result<()> { let session_ctx = SessionContext::new(); + let ctx = session_ctx.state(); let task_ctx = session_ctx.task_ctx(); let object_store_url = ObjectStoreUrl::local_filesystem(); - let store = session_ctx - .runtime_env() - .object_store(&object_store_url) - .unwrap(); + let store = ctx.runtime_env.object_store(&object_store_url).unwrap(); let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/alltypes_plain.parquet", testdata); @@ -1426,7 +1422,7 @@ mod tests { let meta = local_unpartitioned_file(filename); let schema = ParquetFormat::new(session_ctx.config_options()) - .infer_schema(&store, &[meta.clone()]) + .infer_schema(&ctx, &store, &[meta.clone()]) .await .unwrap(); diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 51c55912d668..55c4f9470afb 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -17,11 +17,11 @@ use crate::parquet::Unit::Page; use crate::parquet::{ContextWithParquet, Scenario}; -use datafusion::config::ConfigOptions; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; +use datafusion::execution::context::SessionState; use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec}; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; @@ -31,12 +31,9 @@ use object_store::path::Path; use object_store::ObjectMeta; use tokio_stream::StreamExt; -async fn get_parquet_exec(filter: Expr, session_ctx: SessionContext) -> ParquetExec { +async fn get_parquet_exec(ctx: &SessionState, filter: Expr) -> ParquetExec { let object_store_url = ObjectStoreUrl::local_filesystem(); - let store = session_ctx - .runtime_env() - .object_store(&object_store_url) - .unwrap(); + let store = ctx.runtime_env.object_store(&object_store_url).unwrap(); let testdata = datafusion::test_util::parquet_test_data(); let filename = format!("{}/alltypes_tiny_pages.parquet", testdata); @@ -49,8 +46,8 @@ async fn get_parquet_exec(filter: Expr, session_ctx: SessionContext) -> ParquetE size: metadata.len() as usize, }; - let schema = ParquetFormat::new(session_ctx.config_options()) - .infer_schema(&store, &[meta.clone()]) + let schema = ParquetFormat::new(ctx.config_options()) + .infer_schema(ctx, &store, &[meta.clone()]) .await .unwrap(); @@ -71,7 +68,7 @@ async fn get_parquet_exec(filter: Expr, session_ctx: SessionContext) -> ParquetE projection: None, limit: None, table_partition_cols: vec![], - config_options: ConfigOptions::new().into_shareable(), + config_options: ctx.config_options(), output_ordering: None, }, Some(filter), @@ -83,12 +80,13 @@ async fn get_parquet_exec(filter: Expr, session_ctx: SessionContext) -> ParquetE #[tokio::test] async fn page_index_filter_one_col() { let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = session_ctx.state(); + let task_ctx = ctx.task_ctx(); // 1.create filter month == 1; let filter = col("month").eq(lit(1_i32)); - let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await; + let parquet_exec = get_parquet_exec(&ctx, filter).await; let mut results = 
parquet_exec.execute(0, task_ctx.clone()).unwrap(); @@ -105,7 +103,7 @@ async fn page_index_filter_one_col() { // 2. create filter month == 1 or month == 2; let filter = col("month").eq(lit(1_i32)).or(col("month").eq(lit(2_i32))); - let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await; + let parquet_exec = get_parquet_exec(&ctx, filter).await; let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap(); @@ -125,7 +123,7 @@ async fn page_index_filter_one_col() { .eq(lit(1_i32)) .and(col("month").eq(lit(12_i32))); - let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await; + let parquet_exec = get_parquet_exec(&ctx, filter).await; let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap(); @@ -136,7 +134,7 @@ async fn page_index_filter_one_col() { // 4.create filter 0 < month < 2 ; let filter = col("month").gt(lit(0_i32)).and(col("month").lt(lit(2_i32))); - let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await; + let parquet_exec = get_parquet_exec(&ctx, filter).await; let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap(); @@ -150,7 +148,7 @@ async fn page_index_filter_one_col() { // 5.create filter date_string_col == 1; let filter = col("date_string_col").eq(lit("01/01/09")); - let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await; + let parquet_exec = get_parquet_exec(&ctx, filter).await; let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap(); let batch = results.next().await.unwrap().unwrap(); @@ -165,12 +163,13 @@ async fn page_index_filter_one_col() { #[tokio::test] async fn page_index_filter_multi_col() { let session_ctx = SessionContext::new(); + let ctx = session_ctx.state(); let task_ctx = session_ctx.task_ctx(); // create filter month == 1 and year = 2009; let filter = col("month").eq(lit(1_i32)).and(col("year").eq(lit(2009))); - let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await; + let parquet_exec = get_parquet_exec(&ctx, filter).await; let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap(); @@ -188,7 +187,7 @@ async fn page_index_filter_multi_col() { .eq(lit(1_i32)) .and(col("year").eq(lit(2009)).or(col("id").eq(lit(1)))); - let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await; + let parquet_exec = get_parquet_exec(&ctx, filter).await; let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap(); @@ -199,7 +198,7 @@ async fn page_index_filter_multi_col() { // this filter use two columns will not push down let filter = col("year").eq(lit(2009)).or(col("id").eq(lit(1))); - let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await; + let parquet_exec = get_parquet_exec(&ctx, filter).await; let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap(); @@ -215,7 +214,7 @@ async fn page_index_filter_multi_col() { .and(col("id").eq(lit(1))) .or(col("year").eq(lit(2010))); - let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await; + let parquet_exec = get_parquet_exec(&ctx, filter).await; let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap(); diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs index 92c47989e877..cdc5f1218bd9 100644 --- a/datafusion/core/tests/row.rs +++ b/datafusion/core/tests/row.rs @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. 
-use datafusion::config::ConfigOptions; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::error::Result; +use datafusion::execution::context::SessionState; use datafusion::physical_plan::file_format::FileScanConfig; use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::prelude::SessionContext; @@ -31,16 +31,12 @@ use std::sync::Arc; #[tokio::test] async fn test_with_parquet() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = SessionContext::new(); + let state = ctx.state(); + let task_ctx = state.task_ctx(); let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); - let exec = get_exec( - &session_ctx, - "alltypes_plain.parquet", - projection.as_ref(), - None, - ) - .await?; + let exec = + get_exec(&state, "alltypes_plain.parquet", projection.as_ref(), None).await?; let schema = exec.schema().clone(); let batches = collect(exec, task_ctx).await?; @@ -58,16 +54,12 @@ async fn test_with_parquet() -> Result<()> { #[tokio::test] async fn test_with_parquet_word_aligned() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let ctx = SessionContext::new(); + let state = ctx.state(); + let task_ctx = state.task_ctx(); let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7]); - let exec = get_exec( - &session_ctx, - "alltypes_plain.parquet", - projection.as_ref(), - None, - ) - .await?; + let exec = + get_exec(&state, "alltypes_plain.parquet", projection.as_ref(), None).await?; let schema = exec.schema().clone(); let batches = collect(exec, task_ctx).await?; @@ -84,7 +76,7 @@ async fn test_with_parquet_word_aligned() -> Result<()> { } async fn get_exec( - ctx: &SessionContext, + ctx: &SessionState, file_name: &str, projection: Option<&Vec<usize>>, limit: Option<usize>, ) -> Result<Arc<dyn ExecutionPlan>> { @@ -101,16 +93,17 @@ async fn get_exec( let meta = object_store.head(&path).await.unwrap(); let file_schema = format - .infer_schema(&object_store, &[meta.clone()]) + .infer_schema(ctx, &object_store, &[meta.clone()]) .await .expect("Schema inference"); let statistics = format - .infer_stats(&object_store, file_schema.clone(), &meta) + .infer_stats(ctx, &object_store, file_schema.clone(), &meta) .await .expect("Stats inference"); let file_groups = vec![vec![meta.into()]]; let exec = format .create_physical_plan( + ctx, FileScanConfig { object_store_url, file_schema, @@ -119,8 +112,8 @@ async fn get_exec( projection: projection.cloned(), limit, table_partition_cols: vec![], - config_options: ConfigOptions::new().into_shareable(), output_ordering: None, + config_options: ctx.config_options(), }, &[], )
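
Note for downstream implementors: the net effect of the hunks above is that all three `FileFormat` methods (`infer_schema`, `infer_stats`, `create_physical_plan`) now take a `&SessionState` as their first argument, threaded in from the listing table and, in tests, obtained via `SessionContext::state()`; the new `SessionState::task_ctx()` lets tests execute plans without holding a full `SessionContext`. A rough sketch of what an out-of-tree `FileFormat` implementation looks like after this change follows. It is illustrative only: `MyFormat`, its hard-coded one-column schema, and the use of `EmptyExec` as a stand-in plan are assumptions, not part of this patch.

use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::datasource::file_format::FileFormat;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::{ExecutionPlan, Statistics};
use object_store::{ObjectMeta, ObjectStore};

/// Hypothetical file format, used only to illustrate the new trait shape.
#[derive(Debug)]
struct MyFormat;

#[async_trait]
impl FileFormat for MyFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    // `&SessionState` is the new first parameter on all three methods; a
    // format that has no use for session configuration simply ignores it,
    // as AvroFormat, CsvFormat and JsonFormat do in the hunks above.
    async fn infer_schema(
        &self,
        _ctx: &SessionState,
        _store: &Arc<dyn ObjectStore>,
        _objects: &[ObjectMeta],
    ) -> Result<SchemaRef> {
        // A real format would read file metadata from the object store;
        // this sketch claims a fixed single-column schema.
        Ok(Arc::new(Schema::new(vec![Field::new(
            "value",
            DataType::Int64,
            true,
        )])))
    }

    async fn infer_stats(
        &self,
        _ctx: &SessionState,
        _store: &Arc<dyn ObjectStore>,
        _table_schema: SchemaRef,
        _object: &ObjectMeta,
    ) -> Result<Statistics> {
        // Unknown statistics, mirroring the Avro implementation above.
        Ok(Statistics::default())
    }

    async fn create_physical_plan(
        &self,
        _ctx: &SessionState,
        conf: FileScanConfig,
        _filters: &[Expr],
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // A real format would build its scan operator from `conf` (and can
        // now also consult the session state); EmptyExec keeps the sketch
        // compilable and runnable.
        Ok(Arc::new(EmptyExec::new(false, conf.file_schema)))
    }
}

Call sites thread the state through explicitly, as the updated tests do: `let session_ctx = SessionContext::new(); let ctx = session_ctx.state();`, then `MyFormat.infer_schema(&ctx, &store, &[meta]).await?`. Passing the `SessionState` snapshot rather than the whole `SessionContext` gives formats read-only access to session configuration without a reference back to the context itself.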