Skip to content

Commit

Permalink
feat(stream): support source throttling (#12295)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Sep 14, 2023
1 parent 5ffd58d commit 4525e67
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 142 deletions.
2 changes: 1 addition & 1 deletion e2e_test/streaming/rate_limit.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ statement ok
CREATE TABLE t1(v1 int, v2 int);

statement ok
SET RW_STREAMING_RATE_LIMIT TO 10000;
SET STREAMING_RATE_LIMIT TO 10000;

statement ok
CREATE MATERIALIZED VIEW m AS SELECT * FROM t1;
Expand Down
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ message StreamSource {
map<string, string> properties = 6;
catalog.StreamSourceInfo info = 7;
string source_name = 8;
// Streaming rate limit
optional uint32 rate_limit = 9;
}

// The executor only for receiving barrier from the meta service. It always resides in the leaves
Expand Down
6 changes: 3 additions & 3 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ const CONFIG_KEYS: [&str; 37] = [
"LOCK_TIMEOUT",
"ROW_SECURITY",
"STANDARD_CONFORMING_STRINGS",
"RW_STREAMING_RATE_LIMIT",
"STREAMING_RATE_LIMIT",
"CDC_BACKFILL",
"RW_STREAMING_OVER_WINDOW_CACHE_POLICY",
];
Expand Down Expand Up @@ -112,7 +112,7 @@ const STATEMENT_TIMEOUT: usize = 30;
const LOCK_TIMEOUT: usize = 31;
const ROW_SECURITY: usize = 32;
const STANDARD_CONFORMING_STRINGS: usize = 33;
const RW_STREAMING_RATE_LIMIT: usize = 34;
const STREAMING_RATE_LIMIT: usize = 34;
const CDC_BACKFILL: usize = 35;
const STREAMING_OVER_WINDOW_CACHE_POLICY: usize = 36;

Expand Down Expand Up @@ -337,7 +337,7 @@ type StatementTimeout = ConfigI32<STATEMENT_TIMEOUT, 0>;
type LockTimeout = ConfigI32<LOCK_TIMEOUT, 0>;
type RowSecurity = ConfigBool<ROW_SECURITY, true>;
type StandardConformingStrings = ConfigString<STANDARD_CONFORMING_STRINGS>;
type StreamingRateLimit = ConfigU64<RW_STREAMING_RATE_LIMIT, 0>;
type StreamingRateLimit = ConfigU64<STREAMING_RATE_LIMIT, 0>;
type CdcBackfill = ConfigBool<CDC_BACKFILL, false>;

/// Report status or notice to caller.
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,8 @@ pub fn to_stream_prost_body(
log_store_type: SinkLogStoreType::InMemoryLogStore as i32,
}),
Node::Source(me) => {
// TODO(kwannoel): Is branch used, seems to be a duplicate of stream_source?
let rate_limit = me.ctx().session_ctx().config().get_streaming_rate_limit();
let me = &me.core.catalog;
let source_inner = me.as_ref().map(|me| StreamSource {
source_id: me.id,
Expand All @@ -748,6 +750,7 @@ pub fn to_stream_prost_body(
row_id_index: me.row_id_index.map(|index| index as _),
columns: me.columns.iter().map(|c| c.to_protobuf()).collect(),
properties: me.properties.clone().into_iter().collect(),
rate_limit,
});
PbNodeBody::Source(SourceNode { source_inner })
}
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_pb::stream_plan::{PbStreamSource, SourceNode};
use super::utils::{childless_record, Distill};
use super::{generic, ExprRewritable, PlanBase, StreamNode};
use crate::catalog::source_catalog::SourceCatalog;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::utils::column_names_pretty;
use crate::optimizer::property::Distribution;
use crate::stream_fragmenter::BuildFragmentGraphState;
Expand Down Expand Up @@ -86,6 +87,12 @@ impl StreamNode for StreamSource {
.map(|c| c.to_protobuf())
.collect_vec(),
properties: source_catalog.properties.clone().into_iter().collect(),
rate_limit: self
.base
.ctx()
.session_ctx()
.config()
.get_streaming_rate_limit(),
});
PbNodeBody::Source(SourceNode { source_inner })
}
Expand Down
12 changes: 7 additions & 5 deletions src/stream/src/executor/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::fmt::{Debug, Formatter};
use std::num::NonZeroU32;

use governor::clock::MonotonicClock;
use governor::{Quota, RateLimiter};
use governor::{InsufficientCapacity, Quota, RateLimiter};
use risingwave_common::catalog::Schema;

use super::*;
Expand Down Expand Up @@ -58,10 +58,12 @@ impl FlowControlExecutor {
let result = rate_limiter
.until_n_ready(NonZeroU32::new(chunk.cardinality() as u32).unwrap())
.await;
assert!(
result.is_ok(),
"the capacity of rate_limiter must be larger than the cardinality of chunk"
);
if let Err(InsufficientCapacity(n)) = result {
tracing::error!(
"Rate Limit {} smaller than chunk cardinality {n}",
self.rate_limit,
);
}
}
yield Message::Chunk(chunk);
}
Expand Down
Loading

0 comments on commit 4525e67

Please sign in to comment.