perf(flow): use batch mode for flow #4599

Merged (29 commits, Sep 11, 2024)

Changes from all commits
f60124f
generic bundle trait
discord9 Aug 20, 2024
44cf333
feat: impl get/let
discord9 Aug 20, 2024
2cd141f
fix: drop batch
discord9 Aug 21, 2024
4d25319
test: tumble batch
discord9 Aug 21, 2024
4a513bd
feat: use batch eval flow
discord9 Aug 21, 2024
c9f3766
fix: div use arrow::div not mul
discord9 Aug 21, 2024
b039830
perf: not append batch
discord9 Aug 21, 2024
509e3bb
perf: use bool mask for reduce
discord9 Aug 21, 2024
2d07427
perf: tiny opt
discord9 Aug 22, 2024
18b3b8a
perf: refactor slow path
discord9 Aug 22, 2024
e31f25e
feat: opt if then
discord9 Aug 23, 2024
0597fd0
fix: WIP
discord9 Aug 23, 2024
0ccdf98
perf: if then
discord9 Aug 23, 2024
dba7d8f
chore: use trace instead
discord9 Aug 23, 2024
9ba9ea5
fix: reduce missing non-first batch
discord9 Aug 23, 2024
8769b83
perf: flow if then using interleave
discord9 Aug 26, 2024
eead292
docs: add TODO
discord9 Aug 26, 2024
61e4ebe
perf: remove unnecessary eq
discord9 Aug 26, 2024
61544c0
chore: remove unused import
discord9 Aug 26, 2024
a358c9c
fix: run_available no longer loop forever
discord9 Aug 26, 2024
d061762
feat: blocking on high input buf
discord9 Aug 27, 2024
c9a75d8
chore: increase threshold
discord9 Aug 27, 2024
c3ce933
chore: after rebase
discord9 Aug 30, 2024
e5530c8
chore: per review
discord9 Sep 2, 2024
9209eac
chore: per review
discord9 Sep 3, 2024
dd5716c
fix: allow empty values in reduce&test
discord9 Sep 3, 2024
1d9a0b8
tests: more flow doc example tests
discord9 Sep 4, 2024
7f1f409
chore: per review
discord9 Sep 9, 2024
b055f10
chore: per review
discord9 Sep 11, 2024
350 changes: 184 additions & 166 deletions src/flow/src/adapter.rs

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/flow/src/adapter/flownode_impl.rs
@@ -23,7 +23,7 @@ use api::v1::region::InsertRequests;
use common_error::ext::BoxedError;
use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
use common_meta::node_manager::Flownode;
use common_telemetry::debug;
use common_telemetry::{debug, trace};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -189,7 +189,7 @@ impl Flownode for FlowWorkerManager {
})
.try_collect()?;
if !fetch_order.iter().enumerate().all(|(i, &v)| i == v) {
debug!("Reordering columns: {:?}", fetch_order)
trace!("Reordering columns: {:?}", fetch_order)
}
fetch_order
};
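The hunk above only downgrades a log from `debug!` to `trace!`, but the surrounding check is a small trick worth noting: column reordering is skipped when `fetch_order` is already the identity permutation. A minimal standalone sketch of that check (hypothetical helper name, not code from this PR):

```rust
// Sketch: detect whether a fetched column order is already the identity
// permutation, in which case no reordering work is needed.
fn needs_reordering(fetch_order: &[usize]) -> bool {
    // A permutation is the identity iff every element equals its index.
    !fetch_order.iter().enumerate().all(|(i, &v)| i == v)
}

fn main() {
    assert!(!needs_reordering(&[0, 1, 2])); // already in order
    assert!(needs_reordering(&[2, 0, 1]));  // columns must be shuffled
}
```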
97 changes: 45 additions & 52 deletions src/flow/src/adapter/node_context.rs
@@ -15,10 +15,10 @@
//! Node context, prone to change with every incoming requests

use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use common_telemetry::debug;
use common_telemetry::trace;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
@@ -27,9 +27,9 @@ use tokio::sync::{broadcast, mpsc, RwLock};
use crate::adapter::{FlowId, TableName, TableSource};
use crate::error::{Error, EvalSnafu, TableNotFoundSnafu};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
use crate::expr::{Batch, GlobalId};
use crate::metrics::METRIC_FLOW_INPUT_BUF_SIZE;
use crate::repr::{DiffRow, RelationDesc, BROADCAST_CAP, SEND_BUF_CAP};
use crate::repr::{DiffRow, RelationDesc, BATCH_SIZE, BROADCAST_CAP, SEND_BUF_CAP};

/// A context that holds the information of the dataflow
#[derive(Default, Debug)]
@@ -47,39 +47,36 @@ pub struct FlownodeContext {
///
/// and send it back to the client, since we are mocking the sink table as a client, we should use table name as the key
/// note that the sink receiver should only have one, and we are using broadcast as mpsc channel here
pub sink_receiver: BTreeMap<
TableName,
(
mpsc::UnboundedSender<DiffRow>,
mpsc::UnboundedReceiver<DiffRow>,
),
>,
pub sink_receiver:
BTreeMap<TableName, (mpsc::UnboundedSender<Batch>, mpsc::UnboundedReceiver<Batch>)>,
/// the schema of the table, query from metasrv or inferred from TypedPlan
pub schema: HashMap<GlobalId, RelationDesc>,
/// All the tables that have been registered in the worker
pub table_repr: IdToNameMap,
pub query_context: Option<Arc<QueryContext>>,
}

/// a simple broadcast sender with backpressure and unbound capacity
/// a simple broadcast sender with backpressure, bounded capacity and blocking on send when send buf is full
/// note that it wouldn't evict old data, so it's possible to block forever if the receiver is slow
///
/// receiver still use tokio broadcast channel, since only sender side need to know
/// backpressure and adjust dataflow running duration to avoid blocking
#[derive(Debug)]
pub struct SourceSender {
// TODO(discord9): make it all Vec<DiffRow>?
sender: broadcast::Sender<DiffRow>,
send_buf_tx: mpsc::Sender<Vec<DiffRow>>,
send_buf_rx: RwLock<mpsc::Receiver<Vec<DiffRow>>>,
sender: broadcast::Sender<Batch>,
send_buf_tx: mpsc::Sender<Batch>,
send_buf_rx: RwLock<mpsc::Receiver<Batch>>,
send_buf_row_cnt: AtomicUsize,
}

impl Default for SourceSender {
fn default() -> Self {
// TODO(discord9): the capacity is arbitrary, we can adjust it later, might also want to limit the max number of rows in send buf
let (send_buf_tx, send_buf_rx) = mpsc::channel(SEND_BUF_CAP);
Self {
// TODO(discord9): found a better way then increase this to prevent lagging and hence missing input data
sender: broadcast::Sender::new(BROADCAST_CAP * 2),
sender: broadcast::Sender::new(SEND_BUF_CAP),
send_buf_tx,
send_buf_rx: RwLock::new(send_buf_rx),
send_buf_row_cnt: AtomicUsize::new(0),
@@ -90,7 +87,7 @@ impl Default for SourceSender {
impl SourceSender {
/// max number of iterations to try flush send buf
const MAX_ITERATIONS: usize = 16;
pub fn get_receiver(&self) -> broadcast::Receiver<DiffRow> {
pub fn get_receiver(&self) -> broadcast::Receiver<Batch> {
self.sender.subscribe()
}

@@ -106,30 +103,27 @@ impl SourceSender {
break;
}
// TODO(discord9): send rows instead so it's just moving a point
if let Some(rows) = send_buf.recv().await {
let len = rows.len();
self.send_buf_row_cnt
.fetch_sub(len, std::sync::atomic::Ordering::SeqCst);
for row in rows {
self.sender
.send(row)
.map_err(|err| {
InternalSnafu {
reason: format!("Failed to send row, error = {:?}", err),
}
.build()
})
.with_context(|_| EvalSnafu)?;
row_cnt += 1;
}
if let Some(batch) = send_buf.recv().await {
let len = batch.row_count();
self.send_buf_row_cnt.fetch_sub(len, Ordering::SeqCst);
row_cnt += len;
self.sender
.send(batch)
.map_err(|err| {
InternalSnafu {
reason: format!("Failed to send row, error = {:?}", err),
}
.build()
})
.with_context(|_| EvalSnafu)?;
}
}
if row_cnt > 0 {
debug!("Send {} rows", row_cnt);
trace!("Source Flushed {} rows", row_cnt);
METRIC_FLOW_INPUT_BUF_SIZE.sub(row_cnt as _);
debug!(
"Remaining Send buf.len() = {}",
self.send_buf_rx.read().await.len()
trace!(
"Remaining Source Send buf.len() = {}",
METRIC_FLOW_INPUT_BUF_SIZE.get()
);
}

@@ -138,12 +132,23 @@

/// return number of rows it actual send(including what's in the buffer)
pub async fn send_rows(&self, rows: Vec<DiffRow>) -> Result<usize, Error> {
self.send_buf_tx.send(rows).await.map_err(|e| {
METRIC_FLOW_INPUT_BUF_SIZE.add(rows.len() as _);
while self.send_buf_row_cnt.load(Ordering::SeqCst) >= BATCH_SIZE * 4 {
tokio::task::yield_now().await;
}
// row count metrics is approx so relaxed order is ok
self.send_buf_row_cnt
.fetch_add(rows.len(), Ordering::SeqCst);
let batch = Batch::try_from_rows(rows.into_iter().map(|(row, _, _)| row).collect())
.context(EvalSnafu)?;
common_telemetry::trace!("Send one batch to worker with {} rows", batch.row_count());
self.send_buf_tx.send(batch).await.map_err(|e| {
crate::error::InternalSnafu {
reason: format!("Failed to send row, error = {:?}", e),
}
.build()
})?;

Ok(0)
}
}
@@ -159,8 +164,6 @@ impl FlownodeContext {
.with_context(|| TableNotFoundSnafu {
name: table_id.to_string(),
})?;

debug!("FlownodeContext::send: trying to send {} rows", rows.len());
sender.send_rows(rows).await
}

@@ -174,16 +177,6 @@
}
Ok(sum)
}

/// Return the sum number of rows in all send buf
/// TODO(discord9): remove this since we can't get correct row cnt anyway
pub async fn get_send_buf_size(&self) -> usize {
let mut sum = 0;
for sender in self.source_sender.values() {
sum += sender.send_buf_rx.read().await.len();
}
sum
}
}

impl FlownodeContext {
@@ -230,7 +223,7 @@ impl FlownodeContext {
pub fn add_sink_receiver(&mut self, table_name: TableName) {
self.sink_receiver
.entry(table_name)
.or_insert_with(mpsc::unbounded_channel::<DiffRow>);
.or_insert_with(mpsc::unbounded_channel);
}

pub fn get_source_by_global_id(&self, id: &GlobalId) -> Result<&SourceSender, Error> {
@@ -254,7 +247,7 @@
pub fn get_sink_by_global_id(
&self,
id: &GlobalId,
) -> Result<mpsc::UnboundedSender<DiffRow>, Error> {
) -> Result<mpsc::UnboundedSender<Batch>, Error> {
let table_name = self
.table_repr
.get_by_global_id(id)
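The node_context.rs changes switch `SourceSender` from per-row `DiffRow` channels to `Batch` channels and add backpressure in `send_rows`: the caller yields to the runtime while the buffered row count is at or above `BATCH_SIZE * 4`, then enqueues one batch. A condensed, hypothetical sketch of that pattern, with simplified stand-in types and constants (not the crate's real `Batch`, constants, or error handling):

```rust
// Sketch: bounded send buffer with row-count backpressure. In the real code a
// separate flush task drains send_buf_rx and decrements send_buf_row_cnt.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;

const BATCH_SIZE: usize = 32 * 1024; // assumed value; the real one lives in repr

#[derive(Debug, Clone)]
struct Batch {
    rows: usize, // stand-in for the real columnar batch
}

struct SourceSenderSketch {
    send_buf_tx: mpsc::Sender<Batch>,
    send_buf_row_cnt: Arc<AtomicUsize>,
}

impl SourceSenderSketch {
    async fn send_batch(&self, batch: Batch) -> Result<(), mpsc::error::SendError<Batch>> {
        // Backpressure: wait until the in-flight row count drops below 4 batches.
        while self.send_buf_row_cnt.load(Ordering::SeqCst) >= BATCH_SIZE * 4 {
            tokio::task::yield_now().await;
        }
        self.send_buf_row_cnt.fetch_add(batch.rows, Ordering::SeqCst);
        self.send_buf_tx.send(batch).await
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(8);
    let sender = SourceSenderSketch {
        send_buf_tx: tx,
        send_buf_row_cnt: Arc::new(AtomicUsize::new(0)),
    };
    sender.send_batch(Batch { rows: 100 }).await.unwrap();
    assert_eq!(rx.recv().await.unwrap().rows, 100);
}
```

Yielding instead of sleeping keeps the sender cooperative with the flush task on the same runtime, which is presumably why the PR blocks this way rather than dropping or evicting old data.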
34 changes: 18 additions & 16 deletions src/flow/src/adapter/worker.rs
@@ -27,7 +27,7 @@ use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
use crate::adapter::FlowId;
use crate::compute::{Context, DataflowState, ErrCollector};
use crate::error::{Error, FlowAlreadyExistSnafu, InternalSnafu, UnexpectedSnafu};
use crate::expr::GlobalId;
use crate::expr::{Batch, GlobalId};
use crate::plan::TypedPlan;
use crate::repr::{self, DiffRow};

@@ -89,6 +89,8 @@ impl<'subgraph> ActiveDataflowState<'subgraph> {
err_collector: self.err_collector.clone(),
input_collection: Default::default(),
local_scope: Default::default(),
input_collection_batch: Default::default(),
local_scope_batch: Default::default(),
}
}

@@ -156,13 +158,13 @@ impl WorkerHandle {
///
/// the returned error is unrecoverable, and the worker should be shutdown/rebooted
pub async fn run_available(&self, now: repr::Timestamp, blocking: bool) -> Result<(), Error> {
common_telemetry::debug!("Running available with blocking={}", blocking);
common_telemetry::trace!("Running available with blocking={}", blocking);
if blocking {
let resp = self
.itc_client
.call_with_resp(Request::RunAvail { now, blocking })
.await?;
common_telemetry::debug!("Running available with response={:?}", resp);
common_telemetry::trace!("Running available with response={:?}", resp);
Ok(())
} else {
self.itc_client
@@ -225,9 +227,9 @@ impl<'s> Worker<'s> {
flow_id: FlowId,
plan: TypedPlan,
sink_id: GlobalId,
sink_sender: mpsc::UnboundedSender<DiffRow>,
sink_sender: mpsc::UnboundedSender<Batch>,
source_ids: &[GlobalId],
src_recvs: Vec<broadcast::Receiver<DiffRow>>,
src_recvs: Vec<broadcast::Receiver<Batch>>,
// TODO(discord9): set expire duration for all arrangement and compare to sys timestamp instead
expire_after: Option<repr::Duration>,
create_if_not_exists: bool,
@@ -249,12 +251,12 @@
{
let mut ctx = cur_task_state.new_ctx(sink_id);
for (source_id, src_recv) in source_ids.iter().zip(src_recvs) {
let bundle = ctx.render_source(src_recv)?;
ctx.insert_global(*source_id, bundle);
let bundle = ctx.render_source_batch(src_recv)?;
ctx.insert_global_batch(*source_id, bundle);
}

let rendered = ctx.render_plan(plan)?;
ctx.render_unbounded_sink(rendered, sink_sender);
let rendered = ctx.render_plan_batch(plan)?;
ctx.render_unbounded_sink_batch(rendered, sink_sender);
}
self.task_states.insert(flow_id, cur_task_state);
Ok(Some(flow_id))
@@ -370,9 +372,9 @@ pub enum Request {
flow_id: FlowId,
plan: TypedPlan,
sink_id: GlobalId,
sink_sender: mpsc::UnboundedSender<DiffRow>,
sink_sender: mpsc::UnboundedSender<Batch>,
source_ids: Vec<GlobalId>,
src_recvs: Vec<broadcast::Receiver<DiffRow>>,
src_recvs: Vec<broadcast::Receiver<Batch>>,
expire_after: Option<repr::Duration>,
create_if_not_exists: bool,
err_collector: ErrCollector,
@@ -472,7 +474,7 @@ mod test {
use super::*;
use crate::expr::Id;
use crate::plan::Plan;
use crate::repr::{RelationType, Row};
use crate::repr::RelationType;

#[test]
fn drop_handle() {
@@ -497,8 +499,8 @@
});
let handle = rx.await.unwrap();
let src_ids = vec![GlobalId::User(1)];
let (tx, rx) = broadcast::channel::<DiffRow>(1024);
let (sink_tx, mut sink_rx) = mpsc::unbounded_channel::<DiffRow>();
let (tx, rx) = broadcast::channel::<Batch>(1024);
let (sink_tx, mut sink_rx) = mpsc::unbounded_channel::<Batch>();
let (flow_id, plan) = (
1,
TypedPlan {
@@ -523,9 +525,9 @@
handle.create_flow(create_reqs).await.unwrap(),
Some(flow_id)
);
tx.send((Row::empty(), 0, 0)).unwrap();
tx.send(Batch::empty()).unwrap();
handle.run_available(0, true).await.unwrap();
assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty());
assert_eq!(sink_rx.recv().await.unwrap(), Batch::empty());
drop(handle);
worker_thread_handle.join().unwrap();
}
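The worker now wires flows through the batch rendering path (`render_source_batch`, `render_plan_batch`, `render_unbounded_sink_batch`), and the updated test sends `Batch::empty()` into a broadcast channel and reads it back from the sink. A toy end-to-end sketch of that channel wiring, using a hypothetical stand-in `Batch` type and no real dataflow in between:

```rust
// Sketch: source side is a tokio broadcast channel of batches (so multiple
// operators can subscribe), sink side is an unbounded mpsc channel, mirroring
// the signatures used by create_flow in this PR.
use tokio::sync::{broadcast, mpsc};

#[derive(Debug, Clone, PartialEq)]
struct Batch(Vec<i64>); // stand-in payload

#[tokio::main]
async fn main() {
    let (src_tx, mut src_rx) = broadcast::channel::<Batch>(1024);
    let (sink_tx, mut sink_rx) = mpsc::unbounded_channel::<Batch>();

    // A trivial "dataflow": forward every available batch from source to sink.
    src_tx.send(Batch(vec![1, 2, 3])).unwrap();
    while let Ok(batch) = src_rx.try_recv() {
        sink_tx.send(batch).unwrap();
    }

    assert_eq!(sink_rx.recv().await.unwrap(), Batch(vec![1, 2, 3]));
}
```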