feat: flow perf&fix df func call #4347

Merged
merged 6 commits into from
Jul 15, 2024
Changes from 3 commits
59 changes: 44 additions & 15 deletions src/flow/src/adapter.rs
@@ -18,7 +18,7 @@

use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::{Instant, SystemTime};
use std::time::{Duration, Instant, SystemTime};

use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use common_config::Configurable;
@@ -51,7 +51,7 @@ use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::error::{ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
use crate::expr::GlobalId;
use crate::repr::{self, DiffRow, Row};
use crate::repr::{self, DiffRow, Row, BATCH_SIZE};
use crate::transform::sql_to_flow_plan;

mod flownode_impl;
@@ -484,21 +484,32 @@ impl FlowWorkerManager {
}
}

async fn get_buf_size(&self) -> usize {
self.node_context.read().await.get_send_buf_size().await
}

/// Trigger dataflow running, and then send writeback request to the source sender
///
/// note that this method doesn't handle input mirror requests, as those should be handled by the grpc server
pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
debug!("Starting to run");
let default_interval = Duration::from_secs(1);
let mut avg_spd = 0; // rows/sec
let mut since_last_run = tokio::time::Instant::now();
loop {
// TODO(discord9): only run when new inputs arrive or scheduled to
if let Err(err) = self.run_available(true).await {
let row_cnt = self.run_available(true).await.unwrap_or_else(|err| {
common_telemetry::error!(err;"Run available errors");
}
0
});

// TODO(discord9): error handling
if let Err(err) = self.send_writeback_requests().await {
common_telemetry::error!(err;"Send writeback request errors");
};
self.log_all_errors().await;

// determine if need to shutdown
match &shutdown.as_mut().map(|s| s.try_recv()) {
Some(Ok(())) => {
info!("Shutdown flow's main loop");
@@ -515,7 +526,25 @@ impl FlowWorkerManager {
}
None => (),
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

// for now we want to batch rows until there are around `BATCH_SIZE` rows in the send buf
// before triggering a run of the flow's worker
// (the `.max(1)` calls below prevent division by zero)
let wait_for = since_last_run.elapsed();

let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
// rapid increase, slow decay
avg_spd = if cur_spd > avg_spd {
cur_spd
} else {
(9 * avg_spd + cur_spd) / 10
};
debug!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); //in ms
let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);
debug!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
since_last_run = tokio::time::Instant::now();
tokio::time::sleep(new_wait).await;
}
// flow is now shut down, drop frontend_invoker early so a ref cycle (in standalone mode) can be prevented:
// FlowWorkerManager.frontend_invoker -> FrontendInvoker.inserter
@@ -528,8 +557,10 @@
///
/// set `blocking` to true to wait until lock is acquired
/// and false to return immediately if lock is not acquired
/// return the number of rows sent to the worker
/// TODO(discord9): add flag for subgraphs that have input since last run
pub async fn run_available(&self, blocking: bool) -> Result<(), Error> {
pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
let mut row_cnt = 0;
loop {
let now = self.tick_manager.tick();
for worker in self.worker_handles.iter() {
@@ -539,35 +570,33 @@
} else if let Ok(worker) = worker.try_lock() {
worker.run_available(now).await?;
} else {
return Ok(());
return Ok(row_cnt);
}
}
// first check how many inputs were sent
// check rows sent and rows remaining in the send buf
let (flush_res, buf_len) = if blocking {
let ctx = self.node_context.read().await;
(ctx.flush_all_sender().await, ctx.get_send_buf_size().await)
} else {
match self.node_context.try_read() {
Ok(ctx) => (ctx.flush_all_sender().await, ctx.get_send_buf_size().await),
Err(_) => return Ok(()),
Err(_) => return Ok(row_cnt),
}
};
match flush_res {
Ok(_) => (),
Ok(r) => row_cnt += r,
Err(err) => {
common_telemetry::error!("Flush send buf errors: {:?}", err);
break;
}
};
// if nothing in send buf then break
if buf_len == 0 {
// if not enough rows, break
if buf_len < BATCH_SIZE {
break;
} else {
debug!("Send buf len = {}", buf_len);
}
}

Ok(())
Ok(row_cnt)
}

/// send write request to related source sender
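
As an aside, here is a minimal, self-contained sketch of the adaptive pacing the new `run` loop above implements (the free functions `update_avg_spd` and `next_wait` are illustrative, not part of this PR): throughput is tracked with rapid increase and slow decay, and the next sleep is sized so that roughly `BATCH_SIZE` rows can accumulate, capped at the one-second default interval.

```rust
use std::time::Duration;

// mirrors BROADCAST_CAP / 2 from src/flow/src/repr.rs
const BATCH_SIZE: usize = 65535 / 2;

/// Update the average speed (rows/sec) with "rapid increase, slow decay".
fn update_avg_spd(avg_spd: usize, cur_spd: usize) -> usize {
    if cur_spd > avg_spd {
        cur_spd
    } else {
        (9 * avg_spd + cur_spd) / 10
    }
}

/// Compute how long to sleep before the next run so that roughly
/// `BATCH_SIZE` rows have accumulated, never longer than `max_wait`.
fn next_wait(row_cnt: usize, elapsed: Duration, avg_spd: &mut usize, max_wait: Duration) -> Duration {
    // current speed in rows/sec; `.max(1)` prevents division by zero
    let cur_spd = row_cnt * 1000 / elapsed.as_millis().max(1) as usize;
    *avg_spd = update_avg_spd(*avg_spd, cur_spd);
    let wait_ms = BATCH_SIZE * 1000 / (*avg_spd).max(1);
    Duration::from_millis(wait_ms as u64).min(max_wait)
}

fn main() {
    let mut avg_spd = 0;
    // e.g. 10_000 rows arrived over 200 ms -> 50_000 rows/sec -> sleep ~655 ms
    let wait = next_wait(10_000, Duration::from_millis(200), &mut avg_spd, Duration::from_secs(1));
    println!("avg_spd={} rows/s, sleep for {} ms", avg_spd, wait.as_millis());
}
```

A fast source therefore shortens the polling interval, while an idle source decays back toward the one-second cap, which is the intent of the `avg_spd` bookkeeping in `run`.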
55 changes: 32 additions & 23 deletions src/flow/src/adapter/node_context.rs
@@ -14,7 +14,7 @@

//! Node context, prone to change with every incoming request

use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;

use common_telemetry::debug;
@@ -65,16 +65,20 @@ pub struct FlownodeContext {
/// backpressure and adjust dataflow running duration to avoid blocking
#[derive(Debug)]
pub struct SourceSender {
// TODO(discord9): make it all Vec<DiffRow>?
sender: broadcast::Sender<DiffRow>,
send_buf: RwLock<VecDeque<DiffRow>>,
send_buf_tx: mpsc::UnboundedSender<Vec<DiffRow>>,
send_buf_rx: RwLock<mpsc::UnboundedReceiver<Vec<DiffRow>>>,
}

impl Default for SourceSender {
fn default() -> Self {
let (send_buf_tx, send_buf_rx) = mpsc::unbounded_channel();
Self {
// TODO(discord9): find a better way than increasing this to prevent lagging and hence missing input data
sender: broadcast::Sender::new(BROADCAST_CAP * 2),
send_buf: Default::default(),
send_buf_tx,
send_buf_rx: RwLock::new(send_buf_rx),
}
}
}
@@ -86,33 +90,35 @@ impl SourceSender {

/// send as many rows as possible from the send buf
/// until the send buf is empty or the broadcast channel is full
pub async fn try_send_all(&self) -> Result<usize, Error> {
pub async fn try_flush(&self) -> Result<usize, Error> {
let mut row_cnt = 0;
loop {
let mut send_buf = self.send_buf.write().await;
let mut send_buf = self.send_buf_rx.write().await;
// if the broadcast channel is full or the send buf is empty, there
// is nothing to do for now, just break
if self.sender.len() >= BROADCAST_CAP || send_buf.is_empty() {
break;
}
if let Some(row) = send_buf.pop_front() {
self.sender
.send(row)
.map_err(|err| {
InternalSnafu {
reason: format!("Failed to send row, error = {:?}", err),
}
.build()
})
.with_context(|_| EvalSnafu)?;
row_cnt += 1;
if let Some(rows) = send_buf.recv().await {
for row in rows {
self.sender
.send(row)
.map_err(|err| {
InternalSnafu {
reason: format!("Failed to send row, error = {:?}", err),
}
.build()
})
.with_context(|_| EvalSnafu)?;
row_cnt += 1;
}
}
}
if row_cnt > 0 {
debug!("Send {} rows", row_cnt);
debug!(
"Remaining Send buf.len() = {}",
self.send_buf.read().await.len()
self.send_buf_rx.read().await.len()
);
}

@@ -121,11 +127,14 @@

/// return the number of rows actually sent (including what's in the buffer)
pub async fn send_rows(&self, rows: Vec<DiffRow>) -> Result<usize, Error> {
self.send_buf.write().await.extend(rows);

let row_cnt = self.try_send_all().await?;
self.send_buf_tx.send(rows).map_err(|e| {
crate::error::InternalSnafu {
reason: format!("Failed to send row, error = {:?}", e),
}
.build()
})?;

Ok(row_cnt)
Ok(0)
}
}

@@ -150,7 +159,7 @@ impl FlownodeContext {
pub async fn flush_all_sender(&self) -> Result<usize, Error> {
let mut sum = 0;
for sender in self.source_sender.values() {
sender.try_send_all().await.inspect(|x| sum += x)?;
sender.try_flush().await.inspect(|x| sum += x)?;
}
Ok(sum)
}
@@ -159,7 +168,7 @@
pub async fn get_send_buf_size(&self) -> usize {
let mut sum = 0;
for sender in self.source_sender.values() {
sum += sender.send_buf.read().await.len();
sum += sender.send_buf_rx.read().await.len();
}
sum
}
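
For context, the `SourceSender` rework above swaps the locked `VecDeque` send buffer for an unbounded mpsc channel whose batches are drained into the broadcast channel at flush time. Below is a rough sketch of that pattern with plain tokio types; the name `drain_into_broadcast` is illustrative, the sketch uses non-blocking `try_recv` where the PR awaits `recv`, and the capacity checks and error handling of the real code are omitted.

```rust
use tokio::sync::{broadcast, mpsc};

// stand-in for DiffRow
type Row = Vec<i64>;

/// Drain buffered batches from the mpsc channel into the broadcast channel,
/// returning how many rows were forwarded.
fn drain_into_broadcast(
    rx: &mut mpsc::UnboundedReceiver<Vec<Row>>,
    tx: &broadcast::Sender<Row>,
) -> usize {
    let mut row_cnt = 0;
    // `try_recv` never blocks when the buffer is currently empty
    while let Ok(batch) = rx.try_recv() {
        for row in batch {
            // a broadcast send only fails when there are no receivers; ignored here
            let _ = tx.send(row);
            row_cnt += 1;
        }
    }
    row_cnt
}

#[tokio::main]
async fn main() {
    let (buf_tx, mut buf_rx) = mpsc::unbounded_channel::<Vec<Row>>();
    let (bcast_tx, mut bcast_rx) = broadcast::channel::<Row>(1024);

    // producer side: enqueue a batch and return immediately, like `send_rows`
    buf_tx.send(vec![vec![1], vec![2], vec![3]]).unwrap();

    // flush side: forward buffered batches to all broadcast subscribers
    let sent = drain_into_broadcast(&mut buf_rx, &bcast_tx);
    println!("flushed {sent} rows");
    while let Ok(row) = bcast_rx.try_recv() {
        println!("got {row:?}");
    }
}
```

One consequence visible in the diff itself: `send_rows` now returns `Ok(0)` because rows are only buffered at that point, and the actual count is reported later by `try_flush` through `flush_all_sender`.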
10 changes: 10 additions & 0 deletions src/flow/src/compute/render/map.rs
@@ -146,6 +146,16 @@ fn mfp_subgraph(
// find all updates that need to be sent from the arrangement
let output_kv = arrange.read().get_updates_in_range(range);

err_collector.run(|| {
snafu::ensure!(
mfp_plan.is_temporal() || output_kv.is_empty(),
crate::expr::error::InternalSnafu {
reason: "Output from future should be empty since temporal filter is not applied"
}
);
Ok(())
});

// the output is expected to be key -> empty val
let output = output_kv
.into_iter()
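
The `mfp_subgraph` change above only asserts an invariant. A tiny standalone sketch of the same check (the function name is hypothetical, not this crate's API): when the plan carries no temporal (`now`-based) predicates, reads from the future range of the arrangement must come back empty.

```rust
/// If the plan is not temporal, any update scheduled at a future timestamp
/// indicates a bug upstream.
fn ensure_no_future_output(plan_is_temporal: bool, future_updates: usize) -> Result<(), String> {
    if !plan_is_temporal && future_updates != 0 {
        return Err(format!(
            "output from the future should be empty, but found {future_updates} updates"
        ));
    }
    Ok(())
}

fn main() {
    assert!(ensure_no_future_output(false, 0).is_ok());
    assert!(ensure_no_future_output(true, 3).is_ok());
    assert!(ensure_no_future_output(false, 3).is_err());
}
```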
41 changes: 34 additions & 7 deletions src/flow/src/compute/render/reduce.rs
@@ -187,6 +187,39 @@ fn split_row_to_key_val(
}
}

/// split rows into key and val by evaluating the key and val plans
fn batch_split_rows_to_key_val(
rows: impl IntoIterator<Item = DiffRow>,
key_val_plan: KeyValPlan,
err_collector: ErrCollector,
) -> impl IntoIterator<Item = KeyValDiffRow> {
let mut row_buf = Row::new(vec![]);
rows.into_iter().filter_map(
move |(mut row, sys_time, diff): DiffRow| -> Option<KeyValDiffRow> {
err_collector.run(|| {
let len = row.len();
if let Some(key) = key_val_plan
.key_plan
.evaluate_into(&mut row.inner, &mut row_buf)?
{
// reuse the row as buffer
row.inner.resize(len, Value::Null);
// val_plan is not supported to carry any filter predicate,
let val = key_val_plan
.val_plan
.evaluate_into(&mut row.inner, &mut row_buf)?
.context(InternalSnafu {
reason: "val_plan should not contain any filter predicate",
})?;
Ok(Some(((key, val), sys_time, diff)))
} else {
Ok(None)
}
})?
},
)
}

/// reduce subgraph, reduce the input data into a single row
/// output is concat from key and val
fn reduce_subgraph(
@@ -204,13 +237,7 @@ fn reduce_subgraph(
send,
}: SubgraphArg,
) {
let mut row_buf = Row::empty();
let key_val = data.into_iter().filter_map(|(row, sys_time, diff)| {
// error is collected and then the row is skipped
err_collector
.run(|| split_row_to_key_val(row, sys_time, diff, key_val_plan, &mut row_buf))
.flatten()
});
let key_val = batch_split_rows_to_key_val(data, key_val_plan.clone(), err_collector.clone());
// from here for distinct reduce and accum reduce, things are drastically different
// for distinct reduce the arrange store the output,
// but for accum reduce the arrange store the accum state, and output is
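
To illustrate the shape of `batch_split_rows_to_key_val` above: it builds a lazy iterator that evaluates the key plan and the val plan per row, dropping rows the key plan filters out. A simplified sketch with plain closures standing in for `KeyValPlan` (all names are illustrative, and the scratch-buffer reuse and error collection of the real code are omitted):

```rust
type Row = Vec<i64>;
type DiffRow = (Row, i64, i64); // (row, sys_time, diff)
type KeyValDiffRow = ((Row, Row), i64, i64);

/// Lazily split rows into (key, val) pairs; rows for which the key plan
/// yields no key (i.e. filtered out) are dropped.
fn batch_split<K, V>(
    rows: impl IntoIterator<Item = DiffRow>,
    key_plan: K,
    val_plan: V,
) -> impl Iterator<Item = KeyValDiffRow>
where
    K: Fn(&Row) -> Option<Row>,
    V: Fn(&Row) -> Row,
{
    rows.into_iter().filter_map(move |(row, sys_time, diff)| {
        let key = key_plan(&row)?; // None means the row was filtered out
        let val = val_plan(&row);  // the val plan carries no filter predicate
        Some(((key, val), sys_time, diff))
    })
}

fn main() {
    let rows = vec![(vec![1, 10], 0, 1), (vec![2, 20], 0, 1)];
    // key = first column, kept only when odd; val = second column
    let out: Vec<_> = batch_split(
        rows,
        |r: &Row| (r[0] % 2 == 1).then(|| vec![r[0]]),
        |r: &Row| vec![r[1]],
    )
    .collect();
    assert_eq!(out, vec![((vec![1], vec![10]), 0, 1)]);
    println!("{out:?}");
}
```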
8 changes: 2 additions & 6 deletions src/flow/src/compute/render/src_sink.rs
@@ -96,12 +96,8 @@ impl<'referred, 'df> Context<'referred, 'df> {
}
}
let all = prev_avail.chain(to_send).collect_vec();
if !all.is_empty() || !to_arrange.is_empty() {
debug!(
"Rendered Source All send: {} rows, not yet send: {} rows",
all.len(),
to_arrange.len()
);
if !to_arrange.is_empty() {
debug!("Source Operator buffered {} rows", to_arrange.len());
}
err_collector.run(|| arranged.apply_updates(now, to_arrange));
send.give(all);
4 changes: 4 additions & 0 deletions src/flow/src/expr/linear.rs
@@ -587,6 +587,10 @@ pub struct MfpPlan {
}

impl MfpPlan {
/// Indicates if the `MfpPlan` contains temporal predicates, i.e. it may have outputs that occur in the future.
pub fn is_temporal(&self) -> bool {
!self.lower_bounds.is_empty() || !self.upper_bounds.is_empty()
}
/// find `now` in `predicates` and put them into lower/upper temporal bounds for temporal filter to use
pub fn create_from(mut mfp: MapFilterProject) -> Result<Self, Error> {
let mut lower_bounds = Vec::new();
2 changes: 2 additions & 0 deletions src/flow/src/repr.rs
@@ -56,6 +56,8 @@ pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
/// TODO(discord9): add config for this, so cpu&mem usage can be balanced and configured by this
pub const BROADCAST_CAP: usize = 65535;

pub const BATCH_SIZE: usize = BROADCAST_CAP / 2;

/// Convert a value that is or can be converted to Datetime to internal timestamp
///
/// support types are: `Date`, `DateTime`, `TimeStamp`, `i64`