[FEAT] Streaming CSV Reads (#2565)
Enables streaming CSV reads for the new executor.

---------

Co-authored-by: Colin Ho <colinho@Colins-MacBook-Pro.local>
Co-authored-by: Colin Ho <colinho@Colins-MBP.localdomain>
3 people authored Jul 29, 2024
1 parent c24635e commit 4c3d1b5
Showing 12 changed files with 476 additions and 44 deletions.
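
Before the per-file diffs, here is a minimal sketch of how the new `stream_csv` entry point might be consumed. The URI, option values, and driver function are hypothetical; only the `stream_csv` signature comes from the diff below, and a tokio runtime plus an already-constructed `IOClient` are assumed.

```rust
use std::sync::Arc;

use common_error::DaftResult;
use futures::StreamExt;

// Hypothetical driver; `io_client` is assumed to be built elsewhere.
async fn consume_csv_stream(io_client: Arc<daft_io::IOClient>) -> DaftResult<()> {
    let stream = daft_csv::stream_csv(
        "s3://bucket/data.csv", // hypothetical URI
        None,                   // CsvConvertOptions: defaults
        None,                   // CsvParseOptions: defaults
        None,                   // CsvReadOptions: defaults
        io_client,
        None, // no IO stats collection
        None, // default max_chunks_in_flight (2x available cores)
    )
    .await?;

    // Pin the stream so `StreamExt::next` can be used on it.
    let mut stream = Box::pin(stream);
    // Each item is a DaftResult<Vec<Table>>: a ready chunk of parsed tables.
    while let Some(tables) = stream.next().await {
        for table in tables? {
            println!("streamed {} rows", table.len());
        }
    }
    Ok(())
}
```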
3 changes: 3 additions & 0 deletions Cargo.lock

Generated file; diff not rendered by default.

2 changes: 1 addition & 1 deletion src/daft-csv/src/lib.rs
@@ -16,7 +16,7 @@ pub use metadata::read_csv_schema_bulk;
pub use options::{char_to_byte, CsvConvertOptions, CsvParseOptions, CsvReadOptions};
#[cfg(feature = "python")]
use pyo3::prelude::*;
pub use read::{read_csv, read_csv_bulk};
pub use read::{read_csv, read_csv_bulk, stream_csv};

#[derive(Debug, Snafu)]
pub enum Error {
121 changes: 121 additions & 0 deletions src/daft-csv/src/read.rs
@@ -135,6 +135,28 @@ pub fn read_csv_bulk(
tables.into_iter().collect::<DaftResult<Vec<_>>>()
}

#[allow(clippy::too_many_arguments)]
pub async fn stream_csv(
uri: &str,
convert_options: Option<CsvConvertOptions>,
parse_options: Option<CsvParseOptions>,
read_options: Option<CsvReadOptions>,
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
max_chunks_in_flight: Option<usize>,
) -> DaftResult<impl Stream<Item = DaftResult<Vec<Table>>> + Send> {
stream_csv_single(
uri,
convert_options,
parse_options,
read_options,
io_client,
io_stats,
max_chunks_in_flight,
)
.await
}

// Parallel version of table concat
// get rid of this once Table APIs are parallel
fn tables_concat(mut tables: Vec<Table>) -> DaftResult<Table> {
@@ -301,6 +323,105 @@ async fn read_csv_single_into_table(
}
}

async fn stream_csv_single(
uri: &str,
convert_options: Option<CsvConvertOptions>,
parse_options: Option<CsvParseOptions>,
read_options: Option<CsvReadOptions>,
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
max_chunks_in_flight: Option<usize>,
) -> DaftResult<impl Stream<Item = DaftResult<Vec<Table>>> + Send> {
let predicate = convert_options
.as_ref()
.and_then(|opts| opts.predicate.clone());

let limit = convert_options.as_ref().and_then(|opts| opts.limit);

let include_columns = convert_options
.as_ref()
.and_then(|opts| opts.include_columns.clone());

let convert_options_with_predicate_columns = match (convert_options, &predicate) {
(None, _) => None,
(co, None) => co,
(Some(mut co), Some(predicate)) => {
if let Some(ref mut include_columns) = co.include_columns {
let required_columns_for_predicate = get_required_columns(predicate);
for rc in required_columns_for_predicate {
if include_columns.iter().all(|c| c.as_str() != rc.as_str()) {
include_columns.push(rc)
}
}
}
// if we have a limit and a predicate, remove limit for stream
co.limit = None;
Some(co)
}
};

let (chunk_stream, _fields) = read_csv_single_into_stream(
uri,
convert_options_with_predicate_columns.unwrap_or_default(),
parse_options.unwrap_or_default(),
read_options,
io_client,
io_stats,
)
.await?;
// Default max chunks in flight is set to 2x the number of cores, which should ensure pipelining of reading chunks
// with the parsing of chunks on the rayon threadpool.
let max_chunks_in_flight = max_chunks_in_flight.unwrap_or_else(|| {
std::thread::available_parallelism()
.unwrap_or(NonZeroUsize::new(2).unwrap())
.checked_mul(2.try_into().unwrap())
.unwrap()
.into()
});
// Collect all chunks in chunk x column form.
let tables = chunk_stream
// Limit the number of chunks we have in flight at any given time.
.try_buffered(max_chunks_in_flight);

let filtered_tables = tables.map_ok(move |table| {
if let Some(predicate) = &predicate {
let filtered = table?.filter(&[predicate.clone()])?;
if let Some(include_columns) = &include_columns {
filtered.get_columns(include_columns.as_slice())
} else {
Ok(filtered)
}
} else {
table
}
});

let mut remaining_rows = limit.map(|limit| limit as i64);
let tables = filtered_tables
.try_take_while(move |result| {
match (result, remaining_rows) {
// Limit has been met, early-terminate.
(_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)),
// Limit has not yet been met, update remaining limit slack and continue.
(Ok(table), Some(rows_left)) => {
remaining_rows = Some(rows_left - table.len() as i64);
futures::future::ready(Ok(true))
}
// (1) No limit, never early-terminate.
// (2) Encountered error, propagate error to try_collect to allow it to short-circuit.
(_, None) | (Err(_), _) => futures::future::ready(Ok(true)),
}
})
.map(|r| match r {
Ok(table) => table,
Err(e) => Err(e.into()),
})
// Chunk the tables into chunks of size max_chunks_in_flight.
.try_ready_chunks(max_chunks_in_flight)
.map_err(|e| DaftError::ComputeError(e.to_string()));
Ok(tables)
}

async fn read_csv_single_into_stream(
uri: &str,
convert_options: CsvConvertOptions,
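One detail worth calling out from `stream_csv_single`: the row limit is enforced with `try_take_while`, which stops pulling chunks once the remaining-row counter drops to zero while still letting errors flow through. A self-contained sketch of that pattern, with batch lengths standing in for tables (assumes `futures` and `tokio`; all names here are illustrative):

```rust
use std::convert::Infallible;

use futures::{stream, TryStreamExt};

#[tokio::main]
async fn main() -> Result<(), Infallible> {
    // Stand-in "tables": each batch just reports how many rows it holds.
    let batches = stream::iter(vec![Ok::<usize, Infallible>(400), Ok(400), Ok(400), Ok(400)]);

    let limit = 1000usize;
    let mut remaining_rows = Some(limit as i64);

    let taken: Vec<usize> = batches
        .try_take_while(move |batch_len| {
            let keep = match remaining_rows {
                // Limit already met: stop the stream.
                Some(rows_left) if rows_left <= 0 => false,
                // Limit not yet met: update the remaining slack and continue.
                Some(rows_left) => {
                    remaining_rows = Some(rows_left - *batch_len as i64);
                    true
                }
                // No limit: never early-terminate.
                None => true,
            };
            futures::future::ready(Ok::<_, Infallible>(keep))
        })
        .try_collect()
        .await?;

    // 400 + 400 + 400 >= 1000, so the stream ends before yielding the fourth batch.
    assert_eq!(taken, vec![400, 400, 400]);
    Ok(())
}
```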
22 changes: 22 additions & 0 deletions src/daft-io/src/lib.rs
@@ -442,6 +442,28 @@ pub fn get_io_client(multi_thread: bool, config: Arc<IOConfig>) -> DaftResult<Ar
}
}

pub async fn get_io_client_async(
multi_thread: bool,
config: Arc<IOConfig>,
) -> DaftResult<Arc<IOClient>> {
let read_handle = CLIENT_CACHE.read().await;
let key = (multi_thread, config.clone());
if let Some(client) = read_handle.get(&key) {
Ok(client.clone())
} else {
drop(read_handle);

let mut w_handle = CLIENT_CACHE.write().await;
if let Some(client) = w_handle.get(&key) {
Ok(client.clone())
} else {
let client = Arc::new(IOClient::new(config.clone())?);
w_handle.insert(key, client.clone());
Ok(client)
}
}
}

pub fn get_runtime(multi_thread: bool) -> DaftResult<Arc<tokio::runtime::Runtime>> {
match multi_thread {
false => {
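`get_io_client_async` is the async double-checked variant of the existing cache lookup: take the read lock for the common hit path, and only on a miss drop it, take the write lock, and re-check before inserting. A standalone sketch of that pattern over tokio's `RwLock` (the String-keyed cache and `make` closure are purely illustrative):

```rust
use std::{collections::HashMap, sync::Arc};

use tokio::sync::RwLock;

// Illustrative stand-in for CLIENT_CACHE: any clonable value keyed by name.
async fn get_or_insert(
    cache: &RwLock<HashMap<String, Arc<usize>>>,
    key: &str,
    make: impl FnOnce() -> usize,
) -> Arc<usize> {
    // Fast path: shared read lock, no exclusive locking on cache hits.
    {
        let read_handle = cache.read().await;
        if let Some(value) = read_handle.get(key) {
            return value.clone();
        }
    } // read guard dropped before taking the write lock

    // Slow path: exclusive write lock; re-check in case another task
    // inserted the value while we were waiting for the lock.
    let mut write_handle = cache.write().await;
    if let Some(value) = write_handle.get(key) {
        return value.clone();
    }
    let value = Arc::new(make());
    write_handle.insert(key.to_string(), value.clone());
    value
}
```

Re-checking under the write lock is what keeps two tasks that both missed on the read lock from constructing duplicate clients.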
1 change: 1 addition & 0 deletions src/daft-local-execution/Cargo.toml
@@ -1,4 +1,5 @@
[dependencies]
async-stream = {workspace = true}
common-error = {path = "../common/error", default-features = false}
common-tracing = {path = "../common/tracing", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
14 changes: 3 additions & 11 deletions src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
@@ -1,4 +1,4 @@
use std::{env, sync::Arc};
use std::sync::Arc;

use common_error::DaftResult;
use daft_micropartition::MicroPartition;
@@ -9,7 +9,7 @@ use crate::{
create_channel, create_single_channel, spawn_compute_task, MultiReceiver, MultiSender,
SingleReceiver, SingleSender,
},
NUM_CPUS,
DEFAULT_MORSEL_SIZE, NUM_CPUS,
};

pub trait IntermediateOperator: dyn_clone::DynClone + Send + Sync {
@@ -19,14 +19,6 @@ pub trait IntermediateOperator: dyn_clone::DynClone + Send + Sync {

dyn_clone::clone_trait_object!(IntermediateOperator);

/// The number of rows that will trigger an intermediate operator to output its data.
fn get_output_threshold() -> usize {
env::var("OUTPUT_THRESHOLD")
.unwrap_or_else(|_| "1000".to_string())
.parse()
.expect("OUTPUT_THRESHOLD must be a number")
}

/// State of an operator task, used to buffer data and output it when a threshold is reached.
pub struct OperatorTaskState {
pub buffer: Vec<Arc<MicroPartition>>,
@@ -39,7 +31,7 @@ impl OperatorTaskState {
Self {
buffer: vec![],
curr_len: 0,
threshold: get_output_threshold(),
threshold: DEFAULT_MORSEL_SIZE,
}
}

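`OperatorTaskState` buffers incoming micropartitions and emits them once at least `DEFAULT_MORSEL_SIZE` rows have accumulated, replacing the old `OUTPUT_THRESHOLD` environment variable. A minimal, runnable sketch of that buffering idea, with plain vectors standing in for `Arc<MicroPartition>` (all names here are illustrative):

```rust
/// Accumulate batches until at least `threshold` rows are buffered, then emit
/// them as one concatenated morsel.
struct MorselBuffer {
    buffer: Vec<Vec<i64>>,
    curr_len: usize,
    threshold: usize,
}

impl MorselBuffer {
    fn new(threshold: usize) -> Self {
        Self { buffer: vec![], curr_len: 0, threshold }
    }

    /// Push a batch; return a concatenated morsel once the threshold is hit.
    fn push(&mut self, batch: Vec<i64>) -> Option<Vec<i64>> {
        self.curr_len += batch.len();
        self.buffer.push(batch);
        if self.curr_len >= self.threshold {
            Some(self.flush())
        } else {
            None
        }
    }

    /// Drain whatever is buffered (used when the input is exhausted).
    fn flush(&mut self) -> Vec<i64> {
        self.curr_len = 0;
        self.buffer.drain(..).flatten().collect()
    }
}

fn main() {
    let mut state = MorselBuffer::new(1000); // DEFAULT_MORSEL_SIZE in the diff
    assert!(state.push((0..400).collect()).is_none());
    assert!(state.push((0..400).collect()).is_none());
    // The third batch crosses the 1000-row threshold, so a morsel is emitted.
    let morsel = state.push((0..400).collect()).expect("threshold reached");
    assert_eq!(morsel.len(), 1200);
}
```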
2 changes: 2 additions & 0 deletions src/daft-local-execution/src/lib.rs
@@ -14,6 +14,8 @@ lazy_static! {
pub static ref NUM_CPUS: usize = std::thread::available_parallelism().unwrap().get();
}

const DEFAULT_MORSEL_SIZE: usize = 1000;

#[cfg(feature = "python")]
use pyo3::prelude::*;

40 changes: 32 additions & 8 deletions src/daft-local-execution/src/pipeline.rs
@@ -51,7 +51,7 @@ pub enum PipelineNode {
}

impl PipelineNode {
pub fn start(&self, sender: MultiSender) {
pub fn start(self, sender: MultiSender) {
match self {
PipelineNode::Source { source } => {
run_source(source.clone(), sender);
@@ -64,15 +64,15 @@ impl PipelineNode {
child.start(sender);
}
PipelineNode::SingleInputSink { sink, child } => {
let sender = run_single_input_sink(sink.clone(), sender);
let sender = run_single_input_sink(sink, sender);
child.start(sender);
}
PipelineNode::DoubleInputSink {
sink,
left_child,
right_child,
} => {
let (left_sender, right_sender) = run_double_input_sink(sink.clone(), sender);
let (left_sender, right_sender) = run_double_input_sink(sink, sender);
left_child.start(left_sender);
right_child.start(right_sender);
}
@@ -184,19 +184,43 @@ pub fn physical_plan_to_pipeline(
input,
aggregations,
group_by,
schema,
..
}) => {
let agg_sink = AggregateSink::new(
aggregations
.iter()
let (first_stage_aggs, second_stage_aggs, final_exprs) =
populate_aggregation_stages(aggregations, schema, group_by);
let first_stage_agg_op = AggregateOperator::new(
first_stage_aggs
.values()
.cloned()
.map(|e| Arc::new(Expr::Agg(e.clone())))
.collect(),
group_by.clone(),
);
let second_stage_agg_sink = AggregateSink::new(
second_stage_aggs
.values()
.cloned()
.map(|e| Arc::new(Expr::Agg(e.clone())))
.collect(),
group_by.clone(),
);
let final_stage_project = ProjectOperator::new(final_exprs);

let child_node = physical_plan_to_pipeline(input, psets);
PipelineNode::SingleInputSink {
sink: Box::new(agg_sink),
let intermediate_agg_op_node = PipelineNode::IntermediateOp {
intermediate_op: Box::new(first_stage_agg_op),
child: Box::new(child_node),
};

let sink_node = PipelineNode::SingleInputSink {
sink: Box::new(second_stage_agg_sink),
child: Box::new(intermediate_agg_op_node),
};

PipelineNode::IntermediateOp {
intermediate_op: Box::new(final_stage_project),
child: Box::new(sink_node),
}
}
LocalPhysicalPlan::Sort(Sort {
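The aggregation is now planned as three pipeline stages: a per-morsel partial aggregation (`AggregateOperator` run as an intermediate op), a global merge of the partials (`AggregateSink`), and a final projection over `final_exprs` (`ProjectOperator`). A toy sketch of the same split, computing a mean from per-morsel (sum, count) partials (plain functions here, not the Daft operators):

```rust
// First stage: a partial aggregate computed per morsel.
fn first_stage(morsel: &[f64]) -> (f64, usize) {
    (morsel.iter().sum(), morsel.len())
}

// Second stage: merge all partials into a single global state.
fn second_stage(partials: impl IntoIterator<Item = (f64, usize)>) -> (f64, usize) {
    partials
        .into_iter()
        .fold((0.0, 0), |(s, c), (ps, pc)| (s + ps, c + pc))
}

// Final stage: project the merged state into the result expression.
fn final_project((sum, count): (f64, usize)) -> f64 {
    sum / count as f64
}

fn main() {
    let morsels: Vec<Vec<f64>> = vec![vec![1.0, 2.0], vec![3.0], vec![4.0, 5.0, 6.0]];
    let partials = morsels.iter().map(|m| first_stage(m));
    let merged = second_stage(partials);
    assert_eq!(final_project(merged), 3.5);
}
```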
4 changes: 1 addition & 3 deletions src/daft-local-execution/src/sinks/sink.rs
@@ -14,14 +14,12 @@ pub enum SinkResultType {
Finished,
}

pub trait SingleInputSink: Send + Sync + dyn_clone::DynClone {
pub trait SingleInputSink: Send + Sync {
fn sink(&mut self, input: &Arc<MicroPartition>) -> DaftResult<SinkResultType>;
fn in_order(&self) -> bool;
fn finalize(&mut self) -> DaftResult<Vec<Arc<MicroPartition>>>;
}

dyn_clone::clone_trait_object!(SingleInputSink);

pub struct SingleInputSinkActor {
sink: Box<dyn SingleInputSink>,
receiver: MultiReceiver,
(Remaining file diffs not loaded.)
