Merge remote-tracking branch 'upstream/master' into make-dataframe-consuming
tustvold committed Dec 14, 2022
2 parents eed0dd7 + da0de9d commit 23f5e40
Showing 61 changed files with 751 additions and 453 deletions.
7 changes: 3 additions & 4 deletions datafusion-examples/examples/simple_udaf.rs
@@ -21,7 +21,6 @@ use datafusion::arrow::{
array::ArrayRef, array::Float32Array, datatypes::DataType, record_batch::RecordBatch,
};
use datafusion::from_slice::FromSlice;
use datafusion::logical_expr::AggregateState;
use datafusion::{error::Result, physical_plan::Accumulator};
use datafusion::{logical_expr::Volatility, prelude::*, scalar::ScalarValue};
use datafusion_common::cast::as_float64_array;
@@ -108,10 +107,10 @@ impl Accumulator for GeometricMean {
// This function serializes our state to `ScalarValue`, which DataFusion uses
// to pass this state between execution stages.
// Note that this can be arbitrary data.
fn state(&self) -> Result<Vec<AggregateState>> {
fn state(&self) -> Result<Vec<ScalarValue>> {
Ok(vec![
AggregateState::Scalar(ScalarValue::from(self.prod)),
AggregateState::Scalar(ScalarValue::from(self.n)),
ScalarValue::from(self.prod),
ScalarValue::from(self.n),
])
}

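The `state()` change above drops the `AggregateState` wrapper, so user-defined accumulators return `Vec<ScalarValue>` directly. A hedged sketch of a custom accumulator written against the revised trait follows; `CountAcc` is hypothetical, and the required method set (state, update_batch, merge_batch, evaluate, size) is assumed from this DataFusion branch:

```rust
use datafusion::arrow::array::{Array, ArrayRef, UInt64Array};
use datafusion::error::Result;
use datafusion::physical_plan::Accumulator;
use datafusion::scalar::ScalarValue;

/// Hypothetical accumulator that counts the rows it has seen.
#[derive(Debug)]
struct CountAcc {
    count: u64,
}

impl Accumulator for CountAcc {
    // Intermediate state shipped between execution stages; plain ScalarValues
    // under the new signature, no AggregateState wrapper.
    fn state(&self) -> Result<Vec<ScalarValue>> {
        Ok(vec![ScalarValue::from(self.count)])
    }

    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        // Count every row in the incoming batch (nulls included).
        self.count += values[0].len() as u64;
        Ok(())
    }

    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
        // Each partial state is the single UInt64 column produced by `state()`.
        let counts = states[0]
            .as_any()
            .downcast_ref::<UInt64Array>()
            .expect("count state should be UInt64");
        self.count += counts.iter().flatten().sum::<u64>();
        Ok(())
    }

    fn evaluate(&self) -> Result<ScalarValue> {
        Ok(ScalarValue::from(self.count))
    }

    fn size(&self) -> usize {
        std::mem::size_of_val(self)
    }
}
```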
2 changes: 1 addition & 1 deletion datafusion-examples/examples/simple_udf.rs
@@ -59,7 +59,7 @@ fn create_context() -> Result<SessionContext> {
/// In this example we will declare a single-type, single return type UDF that exponentiates f64, a^b
#[tokio::main]
async fn main() -> Result<()> {
let mut ctx = create_context()?;
let ctx = create_context()?;

// First, declare the actual implementation of the calculation
let pow = |args: &[ArrayRef]| {
4 changes: 2 additions & 2 deletions datafusion/core/src/dataframe.rs
@@ -457,7 +457,7 @@ impl DataFrame {
pub async fn execute_stream(self) -> Result<SendableRecordBatchStream> {
let task_ctx = Arc::new(self.task_ctx());
let plan = self.create_physical_plan().await?;
execute_stream(plan, task_ctx).await
execute_stream(plan, task_ctx)
}

/// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
@@ -498,7 +498,7 @@
) -> Result<Vec<SendableRecordBatchStream>> {
let task_ctx = Arc::new(self.task_ctx());
let plan = self.create_physical_plan().await?;
execute_stream_partitioned(plan, task_ctx).await
execute_stream_partitioned(plan, task_ctx)
}

/// Returns the schema describing the output of this DataFrame in terms of columns returned,
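With the physical-plan `execute_stream` now synchronous, the DataFrame method just builds the plan and returns the stream. A hedged usage sketch of consuming a DataFrame as a stream of record batches; the file name and filter are hypothetical, and the consuming (`self`-taking) DataFrame API of this branch is assumed:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical input file and predicate.
    let df = ctx
        .read_csv("example.csv", CsvReadOptions::new())
        .await?
        .filter(col("a").gt(lit(10)))?;

    // The DataFrame method is still async even though the inner
    // physical-plan `execute_stream` call no longer needs `.await`.
    let mut stream = df.execute_stream().await?;
    while let Some(batch) = stream.next().await {
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}
```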
14 changes: 7 additions & 7 deletions datafusion/core/src/execution/context.rs
@@ -566,7 +566,7 @@ impl SessionContext {

/// Registers a variable provider within this context.
pub fn register_variable(
&mut self,
&self,
variable_type: VarType,
provider: Arc<dyn VarProvider + Send + Sync>,
) {
@@ -583,7 +583,7 @@
///
/// `SELECT MY_FUNC(x)...` will look for a function named `"my_func"`
/// `SELECT "my_FUNC"(x)` will look for a function named `"my_FUNC"`
pub fn register_udf(&mut self, f: ScalarUDF) {
pub fn register_udf(&self, f: ScalarUDF) {
self.state
.write()
.scalar_functions
@@ -597,7 +597,7 @@
///
/// `SELECT MY_UDAF(x)...` will look for an aggregate named `"my_udaf"`
/// `SELECT "my_UDAF"(x)` will look for an aggregate named `"my_UDAF"`
pub fn register_udaf(&mut self, f: AggregateUDF) {
pub fn register_udaf(&self, f: AggregateUDF) {
self.state
.write()
.aggregate_functions
@@ -633,7 +633,7 @@ impl SessionContext {

/// Creates a [`DataFrame`] for reading an Json data source.
pub async fn read_json(
&mut self,
&self,
table_path: impl AsRef<str>,
options: NdJsonReadOptions<'_>,
) -> Result<DataFrame> {
@@ -2080,7 +2080,7 @@ mod tests {
async fn create_variable_expr() -> Result<()> {
let tmp_dir = TempDir::new()?;
let partition_count = 4;
let mut ctx = create_ctx(&tmp_dir, partition_count).await?;
let ctx = create_ctx(&tmp_dir, partition_count).await?;

let variable_provider = test::variable::SystemVar::new();
ctx.register_variable(VarType::System, Arc::new(variable_provider));
@@ -2138,7 +2138,7 @@

#[tokio::test]
async fn case_sensitive_identifiers_user_defined_functions() -> Result<()> {
let mut ctx = SessionContext::new();
let ctx = SessionContext::new();
ctx.register_table("t", test::table_with_sequence(1, 1).unwrap())
.unwrap();

@@ -2179,7 +2179,7 @@

#[tokio::test]
async fn case_sensitive_identifiers_user_defined_aggregates() -> Result<()> {
let mut ctx = SessionContext::new();
let ctx = SessionContext::new();
ctx.register_table("t", test::table_with_sequence(1, 1).unwrap())
.unwrap();

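With registration taking `&self`, a context no longer needs a `mut` binding to gain UDFs, UDAFs, or variable providers. A hedged sketch of registering a scalar UDF through a shared reference; the `pow2` function is hypothetical and the `make_scalar_function` import path is assumed for this DataFusion version:

```rust
use std::sync::Arc;

use datafusion::arrow::array::{Array, ArrayRef, Float64Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::Volatility;
// Import path assumed for this DataFusion branch.
use datafusion::physical_plan::functions::make_scalar_function;
use datafusion::prelude::*;

fn register_pow2(ctx: &SessionContext) {
    // Square each value of a Float64 column, preserving nulls.
    let pow2 = make_scalar_function(|args: &[ArrayRef]| {
        let x = args[0]
            .as_any()
            .downcast_ref::<Float64Array>()
            .expect("pow2 expects a Float64 argument");
        Ok(Arc::new(
            x.iter().map(|v| v.map(|v| v * v)).collect::<Float64Array>(),
        ) as ArrayRef)
    });

    // No `&mut self` required any more; the registry is guarded internally.
    ctx.register_udf(create_udf(
        "pow2",
        vec![DataType::Float64],
        Arc::new(DataType::Float64),
        Volatility::Immutable,
        pow2,
    ));
}
```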
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/aggregates/hash.rs
@@ -519,7 +519,7 @@ fn create_batch_from_map(
accumulators.group_states.iter().map(|group_state| {
group_state.accumulator_set[x]
.state()
.and_then(|x| x[y].as_scalar().map(|v| v.clone()))
.map(|x| x[y].clone())
.expect("unexpected accumulator state in hash aggregate")
}),
)?;
23 changes: 10 additions & 13 deletions datafusion/core/src/physical_plan/file_format/avro.rs
@@ -105,20 +105,20 @@ impl ExecutionPlan for AvroExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
use super::file_stream::FileStream;
let object_store = context
.runtime_env()
.object_store(&self.base_config.object_store_url)?;

let config = Arc::new(private::AvroConfig {
schema: Arc::clone(&self.base_config.file_schema),
batch_size: context.session_config().batch_size(),
projection: self.base_config.projected_file_column_names(),
object_store,
});
let opener = private::AvroOpener { config };

let stream = FileStream::new(
&self.base_config,
partition,
context,
opener,
self.metrics.clone(),
)?;
let stream =
FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
Ok(Box::pin(stream))
}

@@ -157,6 +157,7 @@ mod private {
pub schema: SchemaRef,
pub batch_size: usize,
pub projection: Option<Vec<String>>,
pub object_store: Arc<dyn ObjectStore>,
}

impl AvroConfig {
@@ -178,14 +178,10 @@
}

impl FileOpener for AvroOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file_meta: FileMeta,
) -> Result<FileOpenFuture> {
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let config = self.config.clone();
Ok(Box::pin(async move {
match store.get(file_meta.location()).await? {
match config.object_store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
let reader = config.open(file)?;
Ok(futures::stream::iter(reader).boxed())
23 changes: 10 additions & 13 deletions datafusion/core/src/physical_plan/file_format/csv.rs
@@ -131,25 +131,25 @@ impl ExecutionPlan for CsvExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let object_store = context
.runtime_env()
.object_store(&self.base_config.object_store_url)?;

let config = Arc::new(CsvConfig {
batch_size: context.session_config().batch_size(),
file_schema: Arc::clone(&self.base_config.file_schema),
file_projection: self.base_config.file_column_projection_indices(),
has_header: self.has_header,
delimiter: self.delimiter,
object_store,
});

let opener = CsvOpener {
config,
file_compression_type: self.file_compression_type.to_owned(),
};
let stream = FileStream::new(
&self.base_config,
partition,
context,
opener,
self.metrics.clone(),
)?;
let stream =
FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
Ok(Box::pin(stream) as SendableRecordBatchStream)
}

@@ -184,6 +184,7 @@ struct CsvConfig {
file_projection: Option<Vec<usize>>,
has_header: bool,
delimiter: u8,
object_store: Arc<dyn ObjectStore>,
}

impl CsvConfig {
@@ -208,15 +209,11 @@ struct CsvOpener {
}

impl FileOpener for CsvOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file_meta: FileMeta,
) -> Result<FileOpenFuture> {
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let config = self.config.clone();
let file_compression_type = self.file_compression_type.to_owned();
Ok(Box::pin(async move {
match store.get(file_meta.location()).await? {
match config.object_store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
let decoder = file_compression_type.convert_read(file)?;
Ok(futures::stream::iter(config.open(decoder, true)).boxed())
38 changes: 6 additions & 32 deletions datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -23,22 +23,18 @@

use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;

use arrow::datatypes::SchemaRef;
use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
use datafusion_common::ScalarValue;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{ready, FutureExt, Stream, StreamExt};
use object_store::ObjectStore;

use datafusion_common::ScalarValue;

use crate::datasource::listing::PartitionedFile;
use crate::error::Result;
use crate::execution::context::TaskContext;
use crate::physical_plan::file_format::{
FileMeta, FileScanConfig, PartitionColumnProjector,
};
@@ -56,11 +52,7 @@ pub type FileOpenFuture =
pub trait FileOpener: Unpin {
/// Asynchronously open the specified file and return a stream
/// of [`RecordBatch`]
fn open(
&self,
store: Arc<dyn ObjectStore>,
file_meta: FileMeta,
) -> Result<FileOpenFuture>;
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>;
}

/// A stream that iterates record batch by record batch, file over file.
Expand All @@ -79,8 +71,6 @@ pub struct FileStream<F: FileOpener> {
file_reader: F,
/// The partition column projector
pc_projector: PartitionColumnProjector,
/// the store from which to source the files.
object_store: Arc<dyn ObjectStore>,
/// The stream state
state: FileStreamState,
/// File stream specific metrics
@@ -175,7 +165,6 @@ impl<F: FileOpener> FileStream<F> {
pub fn new(
config: &FileScanConfig,
partition: usize,
context: Arc<TaskContext>,
file_reader: F,
metrics: ExecutionPlanMetricsSet,
) -> Result<Self> {
@@ -191,17 +180,12 @@

let files = config.file_groups[partition].clone();

let object_store = context
.runtime_env()
.object_store(&config.object_store_url)?;

Ok(Self {
file_iter: files.into(),
projected_schema,
remain: config.limit,
file_reader,
pc_projector,
object_store,
state: FileStreamState::Idle,
file_stream_metrics: FileStreamMetrics::new(&metrics, partition),
baseline_metrics: BaselineMetrics::new(&metrics, partition),
Expand All @@ -228,7 +212,7 @@ impl<F: FileOpener> FileStream<F> {

self.file_stream_metrics.time_opening.start();

match self.file_reader.open(self.object_store.clone(), file_meta) {
match self.file_reader.open(file_meta) {
Ok(future) => {
self.state = FileStreamState::Open {
future,
@@ -339,11 +323,7 @@ mod tests {
}

impl FileOpener for TestOpener {
fn open(
&self,
_store: Arc<dyn ObjectStore>,
_file_meta: FileMeta,
) -> Result<FileOpenFuture> {
fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
let iterator = self.records.clone().into_iter().map(Ok);
let stream = futures::stream::iter(iterator).boxed();
Ok(futures::future::ready(Ok(stream)).boxed())
@@ -375,14 +355,8 @@
output_ordering: None,
};

let file_stream = FileStream::new(
&config,
0,
ctx.task_ctx(),
reader,
ExecutionPlanMetricsSet::new(),
)
.unwrap();
let file_stream =
FileStream::new(&config, 0, reader, ExecutionPlanMetricsSet::new()).unwrap();

file_stream
.map(|b| b.expect("No error expected in stream"))
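With this trait change, `open` no longer receives an `ObjectStore`; openers that need one hold it themselves, as the Avro/CSV/JSON configs now do. A minimal sketch of a custom opener modeled on `TestOpener`, replaying pre-built batches; `MyOpener` is hypothetical and the public import paths are assumed for this branch:

```rust
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
// Re-export locations assumed for this DataFusion branch.
use datafusion::physical_plan::file_format::{FileMeta, FileOpenFuture, FileOpener};
use futures::{FutureExt, StreamExt};

struct MyOpener {
    /// Batches to replay. A real opener would instead hold an
    /// `Arc<dyn ObjectStore>` and decode the fetched bytes lazily,
    /// as the Avro/CSV/JSON openers do.
    batches: Vec<RecordBatch>,
}

impl FileOpener for MyOpener {
    fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
        // Wrap the batches in an already-resolved future yielding a stream.
        let iterator = self.batches.clone().into_iter().map(Ok);
        let stream = futures::stream::iter(iterator).boxed();
        Ok(futures::future::ready(Ok(stream)).boxed())
    }
}
```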
23 changes: 10 additions & 13 deletions datafusion/core/src/physical_plan/file_format/json.rs
@@ -119,19 +119,18 @@ impl ExecutionPlan for NdJsonExec {
options
};

let object_store = context
.runtime_env()
.object_store(&self.base_config.object_store_url)?;
let opener = JsonOpener {
file_schema,
options,
file_compression_type: self.file_compression_type.to_owned(),
object_store,
};

let stream = FileStream::new(
&self.base_config,
partition,
context,
opener,
self.metrics.clone(),
)?;
let stream =
FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;

Ok(Box::pin(stream) as SendableRecordBatchStream)
}
@@ -162,16 +161,14 @@ struct JsonOpener {
options: DecoderOptions,
file_schema: SchemaRef,
file_compression_type: FileCompressionType,
object_store: Arc<dyn ObjectStore>,
}

impl FileOpener for JsonOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file_meta: FileMeta,
) -> Result<FileOpenFuture> {
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let options = self.options.clone();
let schema = self.file_schema.clone();
let store = self.object_store.clone();
let file_compression_type = self.file_compression_type.to_owned();
Ok(Box::pin(async move {
match store.get(file_meta.location()).await? {
@@ -306,7 +303,7 @@ mod tests {
file_compression_type: FileCompressionType,
store: Arc<dyn ObjectStore>,
) {
let mut ctx = SessionContext::new();
let ctx = SessionContext::new();
ctx.runtime_env()
.register_object_store("file", "", store.clone());
let filename = "1.json";
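The JSON test above now builds an immutable context and registers its store through the runtime env. A hedged sketch of the same registration with an in-memory store from the `object_store` crate; the scheme and host strings mirror the test, and `ctx_with_memory_store` is hypothetical:

```rust
use std::sync::Arc;

use datafusion::prelude::*;
use object_store::memory::InMemory;

fn ctx_with_memory_store() -> SessionContext {
    let ctx = SessionContext::new();
    let store = Arc::new(InMemory::new());
    // Same registration call the JSON test uses; no `mut` binding needed.
    ctx.runtime_env().register_object_store("file", "", store);
    ctx
}
```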