Skip to content

Commit

Permalink
make arc drop in the single thread finally.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Aug 6, 2024
1 parent 2b56774 commit 3b393d3
Showing 1 changed file with 30 additions and 8 deletions.
38 changes: 30 additions & 8 deletions datafusion/core/src/datasource/physical_plan/file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
//! Note: Most traits here need to be marked `Sync + Send` to be
//! compliant with the `SendableRecordBatchStream` trait.

use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand All @@ -38,12 +37,15 @@ use crate::physical_plan::RecordBatchStream;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use chrono::TimeZone;
use datafusion_common::instant::Instant;
use datafusion_common::ScalarValue;

use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{ready, FutureExt, Stream, StreamExt};
use object_store::path::Path;
use object_store::ObjectMeta;

/// A fallible future that resolves to a stream of [`RecordBatch`]
pub type FileOpenFuture =
Expand Down Expand Up @@ -76,7 +78,8 @@ pub trait FileOpener: Unpin {
/// A stream that iterates record batch by record batch, file over file.
pub struct FileStream<F: FileOpener> {
/// An iterator over input files.
file_iter: VecDeque<PartitionedFile>,
files: Vec<PartitionedFile>,
cur_file_idx: usize,
/// The stream schema (file schema including partition columns and after
/// projection).
projected_schema: SchemaRef,
Expand Down Expand Up @@ -263,7 +266,8 @@ impl<F: FileOpener> FileStream<F> {
let files = config.file_groups[partition].clone();

Ok(Self {
file_iter: files.into(),
files,
cur_file_idx: 0,
projected_schema,
remain: config.limit,
file_opener,
Expand All @@ -289,18 +293,36 @@ impl<F: FileOpener> FileStream<F> {
/// Since file opening is mostly IO (and may involve a
/// bunch of sequential IO), it can be parallelized with decoding.
fn start_next_file(&mut self) -> Option<Result<(FileOpenFuture, Vec<ScalarValue>)>> {
let part_file = self.file_iter.pop_front()?;
if self.cur_file_idx == self.files.len() {
return None;
}

let part_file = &mut self.files[self.cur_file_idx];
self.cur_file_idx += 1;

let object_meta = mem::replace(
&mut part_file.object_meta,
ObjectMeta {
location: Path::default(),
last_modified: chrono::Utc.timestamp_nanos(0),
size: 0,
e_tag: None,
version: None,
},
);

let partition_values = mem::take(&mut part_file.partition_values);

let file_meta = FileMeta {
object_meta: part_file.object_meta,
range: part_file.range,
extensions: part_file.extensions,
object_meta,
range: part_file.range.clone(),
extensions: part_file.extensions.clone(),
};

Some(
self.file_opener
.open(file_meta)
.map(|future| (future, part_file.partition_values)),
.map(|future| (future, partition_values)),
)
}

Expand Down

0 comments on commit 3b393d3

Please sign in to comment.