feat: add kafka backfill frontend #14465

Closed · wants to merge 21 commits
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
@@ -39,3 +39,6 @@
 d70dba827c303373f3220c9733f7c7443e5c2d37

 # chore: cargo +nightly fmt (#13162) (format let-chains)
 c583e2c6c054764249acf484438c7bf7197765f4
+
+# chore: replace all ProstXxx with PbXxx (#8621)
+6fd8821f2e053957b183d648bea9c95b6703941f
4 changes: 2 additions & 2 deletions Makefile.toml
@@ -86,9 +86,9 @@ if ${is_not_ci}
 no_rust_log = not ${rust_log}

 if ${no_rust_log}
-set_env RUST_LOG "pgwire_query_log=info"
+set_env RUST_LOG "pgwire_query_log=info,hyper::client::connect::http=info"
 else
-set_env RUST_LOG "pgwire_query_log=info,${rust_log}"
+set_env RUST_LOG "pgwire_query_log=info,hyper::client::connect::http=info,${rust_log}"
 end
 end
 ''',
2 changes: 1 addition & 1 deletion ci/Dockerfile
@@ -37,7 +37,7 @@ ENV PATH /root/.cargo/bin/:$PATH
 RUN rustup show
 RUN rustup default `rustup show active-toolchain | awk '{print $1}'`

-RUN curl -sSL "https://github.com/bufbuild/buf/releases/download/v1.4.0/buf-$(uname -s)-$(uname -m).tar.gz" | \
+RUN curl -sSL "https://github.com/bufbuild/buf/releases/download/v1.29.0/buf-$(uname -s)-$(uname -m).tar.gz" | \
 tar -xvzf - -C /usr/local --strip-components 1

 # install python dependencies
2 changes: 1 addition & 1 deletion ci/build-ci-image.sh
@@ -10,7 +10,7 @@ cat ../rust-toolchain
 # shellcheck disable=SC2155

 # REMEMBER TO ALSO UPDATE ci/docker-compose.yml
-export BUILD_ENV_VERSION=v20240223
+export BUILD_ENV_VERSION=v20240229

 export BUILD_TAG="public.ecr.aws/x5u3w5h6/rw-build-env:${BUILD_ENV_VERSION}"
10 changes: 5 additions & 5 deletions ci/docker-compose.yml
@@ -71,7 +71,7 @@ services:
 retries: 5

 source-test-env:
-image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240223
+image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240229
 depends_on:
 - mysql
 - db
@@ -81,7 +81,7 @@
 - ..:/risingwave

 sink-test-env:
-image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240223
+image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240229
 depends_on:
 - mysql
 - db
@@ -98,12 +98,12 @@

 rw-build-env:
-image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240223
+image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240229
 volumes:
 - ..:/risingwave

 ci-flamegraph-env:
-image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240223
+image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240229
 # NOTE(kwannoel): This is used in order to permit
 # syscalls for `nperf` (perf_event_open),
 # so it can do CPU profiling.
@@ -114,7 +114,7 @@
 - ..:/risingwave

 regress-test-env:
-image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240223
+image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240229
 depends_on:
 db:
 condition: service_healthy
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -733,6 +733,16 @@ def section_streaming(outer_panels):
 )
 ],
 ),
+panels.timeseries_rowsps(
+"Source Backfill Throughput(rows/s)",
+"The figure shows the number of rows read by each source per second.",
+[
+panels.target(
+f"sum(rate({metric('stream_source_backfill_rows_counts')}[$__rate_interval])) by (source_id, source_name, fragment_id)",
+"{{source_id}} {{source_name}} (fragment {{fragment_id}})",
+),
+],
+),
 panels.timeseries_count(
 "Source Upstream Status",
 "Monitor each source upstream, 0 means the upstream is not normal, 1 means the source is ready.",
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions grafana/risingwave-user-dashboard.dashboard.py
@@ -685,6 +685,16 @@ def section_streaming(outer_panels):
 )
 ],
 ),
+panels.timeseries_rowsps(
+"Source Backfill Throughput(rows/s)",
+"The figure shows the number of rows read by each source per second.",
+[
+panels.target(
+f"sum(rate({metric('stream_source_backfill_rows_counts')}[$__rate_interval])) by (source_id, source_name, fragment_id)",
+"{{source_id}} {{source_name}} (fragment {{fragment_id}})",
+),
+],
+),
 panels.timeseries_rowsps(
 "Materialized View Throughput(rows/s)",
 "The figure shows the number of rows written into each materialized executor actor per second.",
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

13 changes: 10 additions & 3 deletions proto/catalog.proto
@@ -62,9 +62,16 @@ message StreamSourceInfo {
 SchemaRegistryNameStrategy name_strategy = 10;
 optional string key_message_name = 11;
 plan_common.ExternalTableDesc external_table = 12;
-// Whether the stream source is a cdc source streaming job.
-// We need this field to differentiate the cdc source job until we fully implement risingwavelabs/rfcs#72.
-bool cdc_source_job = 13;
+// Whether the stream source has a streaming job.
+// This is related to [RFC: Reusable Source Executor](https://github.com/risingwavelabs/rfcs/pull/72).
+// Currently, the following sources have streaming jobs:
+// - Direct CDC sources (mysql & postgresql)
+// - MQ sources (Kafka, Pulsar, Kinesis, etc.)
+bool has_streaming_job = 13;
+// Only used when `has_streaming_job` is `true`.
+// If `false`, `requires_singleton` will be set in the stream plan.
+bool is_distributed = 15;
+reserved "cdc_source_job"; // deprecated
 // Options specified by user in the FORMAT ENCODE clause.
 map<string, string> format_encode_options = 14;
 }
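A sketch of how a consumer might act on the two new fields; the helper name is illustrative and not part of this diff:

    fn source_requires_singleton(info: &StreamSourceInfo) -> bool {
        // A source that has a streaming job but is not distributed
        // needs its fragment pinned to a single parallelism.
        info.has_streaming_job && !info.is_distributed
    }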
19 changes: 19 additions & 0 deletions proto/stream_plan.proto
@@ -205,6 +205,23 @@ message StreamFsFetchNode {
 StreamFsFetch node_inner = 1;
 }

+message SourceBackfillNode {
+uint32 source_id = 1;
+optional uint32 row_id_index = 3;
+// XXX: is this all columns or only required columns?
+repeated plan_common.ColumnCatalog columns = 4;
+catalog.StreamSourceInfo info = 7;
+string source_name = 8;
+map<string, string> with_properties = 6;
+// Streaming rate limit
+// optional uint32 rate_limit = 9;
+
+// The fields above are the same as StreamSource.
+
+// `| partition_id | backfill_progress |`
+catalog.Table state_table = 2;
+}

 message SinkDesc {
 reserved 4;
 reserved "columns";
@@ -770,6 +787,7 @@ message StreamNode {
 StreamCdcScanNode stream_cdc_scan = 139;
 CdcFilterNode cdc_filter = 140;
 SubscriptionNode subscription = 141;
+SourceBackfillNode source_backfill = 142;
 }
 // The id for the operator. This is local per mview.
 // TODO: should better be a uint32.
@@ -865,6 +883,7 @@ enum FragmentTypeFlag {
 FRAGMENT_TYPE_FLAG_DML = 128;
 FRAGMENT_TYPE_FLAG_CDC_FILTER = 256;
 FRAGMENT_TYPE_FLAG_SUBSCRIPTION = 512;
+FRAGMENT_TYPE_FLAG_SOURCE_BACKFILL = 1024;
 }
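`FragmentTypeFlag` values are bit flags, so one fragment's mask can carry several of them at once. A minimal sketch of testing the new flag, assuming the prost-generated Rust enum and the `fragment_type_mask` field used elsewhere in the repo:

    use risingwave_pb::stream_plan::FragmentTypeFlag;

    /// True if the fragment contains a SourceBackfill node.
    fn is_source_backfill(fragment_type_mask: u32) -> bool {
        fragment_type_mask & FragmentTypeFlag::SourceBackfill as u32 != 0
    }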
3 changes: 2 additions & 1 deletion src/common/src/monitor/connection.rs
@@ -34,6 +34,7 @@ use prometheus::{
 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, IntCounter,
 IntCounterVec, IntGauge, IntGaugeVec, Registry,
 };
+use thiserror_ext::AsReport;
 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
 use tonic::transport::{Channel, Endpoint};
 use tracing::{debug, info, warn};
@@ -549,7 +550,7 @@ impl<L> tonic::transport::server::Router<L> {
 config.tcp_nodelay,
 config.keepalive_duration,
 )
-.unwrap();
+.unwrap_or_else(|err| panic!("failed to connect to {listen_addr}: {}", err.as_report()));
 let incoming = MonitoredConnection::new(
 incoming,
 MonitorNewConnectionImpl {
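For context, `thiserror_ext::AsReport` renders an error together with its `source()` chain, which is why the new panic message uses `err.as_report()` rather than the bare `Display` output. A minimal sketch, assuming only that the error implements `std::error::Error`:

    use thiserror_ext::AsReport;

    fn log_connect_failure(err: impl std::error::Error) {
        // `{}` on the error prints only the outermost message;
        // `as_report()` also appends the chain of causes.
        tracing::warn!("failed to accept connection: {}", err.as_report());
    }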
4 changes: 4 additions & 0 deletions src/common/src/session_config/mod.rs
@@ -243,6 +243,10 @@ pub struct ConfigMap {
 #[parameter(default = false)]
 background_ddl: bool,

+/// Enable reusable sources, i.e. sources with a shared streaming job that downstream jobs backfill from
+#[parameter(default = true)]
+enable_reusable_source: bool,
+
 /// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time.
 #[parameter(default = SERVER_ENCODING)]
 server_encoding: String,
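Assuming the usual session-variable plumbing generated by `#[parameter]`, the flag can be toggled per session with `SET enable_reusable_source = false;` before running `CREATE SOURCE`, the same way `background_ddl` above is controlled.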
2 changes: 1 addition & 1 deletion src/common/src/system_param/mod.rs
@@ -317,7 +317,7 @@ macro_rules! impl_set_system_param {
 $(
 key_of!($field) => {
 let v = if let Some(v) = value {
-v.as_ref().parse().map_err(|_| format!("cannot parse parameter value"))?
+v.as_ref().parse().map_err(|e| format!("cannot parse parameter value: {e}"))?
 } else {
 $default.ok_or_else(|| format!("{} does not have a default value", key))?
 };
7 changes: 6 additions & 1 deletion src/common/src/types/mod.rs
@@ -724,13 +724,17 @@ macro_rules! impl_convert {

 paste! {
 impl ScalarImpl {
+/// # Panics
+/// If the scalar is not of the expected type.
 pub fn [<as_ $suffix_name>](&self) -> &$scalar {
 match self {
 Self::$variant_name(ref scalar) => scalar,
 other_scalar => panic!("cannot convert ScalarImpl::{} to concrete type {}", other_scalar.get_ident(), stringify!($variant_name))
 }
 }

+/// # Panics
+/// If the scalar is not of the expected type.
 pub fn [<into_ $suffix_name>](self) -> $scalar {
 match self {
 Self::$variant_name(scalar) => scalar,
@@ -740,7 +744,8 @@
 }

 impl <'scalar> ScalarRefImpl<'scalar> {
-// Note that this conversion consume self.
+/// # Panics
+/// If the scalar is not of the expected type.
 pub fn [<into_ $suffix_name>](self) -> $scalar_ref {
 match self {
 Self::$variant_name(inner) => inner,
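The macro expands to one accessor pair per variant, so the new `# Panics` sections document methods like `as_int32`/`into_int32` (concrete names assumed from the macro's `$suffix_name` input):

    use risingwave_common::types::ScalarImpl;

    let s = ScalarImpl::Int32(42);
    assert_eq!(*s.as_int32(), 42); // borrow the inner value
    let v: i32 = s.into_int32();   // consume self, take ownership
    // `ScalarImpl::Int32(42).into_utf8()` would panic:
    // "cannot convert ScalarImpl::Int32 to concrete type Utf8".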
26 changes: 26 additions & 0 deletions src/common/src/util/iter_util.rs
@@ -54,3 +54,29 @@
 {
 a.into_iter().zip_eq_fast(b)
 }
+
+pub trait IntoIteratorExt
+where
+for<'a> &'a Self: IntoIterator,
+{
+/// Shorter version of `self.iter().map(f).collect()`.
+fn map_collect<A, B, F, BCollection>(&self, f: F) -> BCollection
+where
+F: FnMut(&A) -> B,
+for<'a> &'a Self: IntoIterator<Item = &'a A>,
+BCollection: FromIterator<B>,
+{
+self.into_iter().map(f).collect()
+}
+
+/// Shorter version of `self.iter().map(f).collect_vec()`.
+fn map_to_vec<A, B, F>(&self, f: F) -> Vec<B>
+where
+F: FnMut(&A) -> B,
+for<'a> &'a Self: IntoIterator<Item = &'a A>,
+{
+self.map_collect(f)
+}
+}
+
+impl<T> IntoIteratorExt for T where for<'a> &'a Self: IntoIterator {}
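A quick usage sketch of the new extension trait (import path assumed from this module):

    use risingwave_common::util::iter_util::IntoIteratorExt;

    let names = vec!["foo".to_string(), "ba".to_string()];
    // `&Vec<String>: IntoIterator<Item = &String>`, so `A = String` here.
    let lens: Vec<usize> = names.map_to_vec(|s| s.len());
    assert_eq!(lens, vec![3, 2]);

    // `map_collect` can target any `FromIterator` collection:
    let set: std::collections::HashSet<usize> = names.map_collect(|s| s.len());
    assert_eq!(set.len(), 2);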
3 changes: 3 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
@@ -187,6 +187,9 @@ pub fn visit_stream_node_tables_inner<F>(
 always!(source.state_table, "FsFetch");
 }
 }
+NodeBody::SourceBackfill(node) => {
+always!(node.state_table, "SourceBackfill")
+}

 // Sink
 NodeBody::Sink(node) => {
61 changes: 61 additions & 0 deletions src/connector/src/parser/additional_columns.rs
@@ -185,6 +185,67 @@ pub fn build_additional_column_catalog(
 Ok(catalog)
 }

+pub fn add_partition_offset_cols(
+columns: &[ColumnCatalog],
+connector_name: &str,
+) -> ([bool; 2], [ColumnCatalog; 2]) {
+let mut columns_exist = [false; 2];
+let mut last_column_id = columns
+.iter()
+.map(|c| c.column_desc.column_id)
+.max()
+.unwrap_or(ColumnId::placeholder());
+
+let additional_columns: Vec<_> = {
+let compat_col_types = COMPATIBLE_ADDITIONAL_COLUMNS
+.get(&*connector_name)
+.unwrap_or(&COMMON_COMPATIBLE_ADDITIONAL_COLUMNS);
+["partition", "file", "offset"]
+.iter()
+.filter_map(|col_type| {
+last_column_id = last_column_id.next();
+if compat_col_types.contains(col_type) {
+Some(
+build_additional_column_catalog(
+last_column_id,
+&connector_name,
+col_type,
+None,
+None,
+None,
+false,
+)
+.unwrap(),
+)
+} else {
+None
+}
+})
+.collect()
+};
+assert_eq!(additional_columns.len(), 2);
+
+// Check if partition/file/offset columns are included explicitly.
+for col in columns {
+use risingwave_pb::plan_common::additional_column::ColumnType;
+match col.column_desc.additional_column {
+AdditionalColumn {
+column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
+} => {
+columns_exist[0] = true;
+}
+AdditionalColumn {
+column_type: Some(ColumnType::Offset(_)),
+} => {
+columns_exist[1] = true;
+}
+_ => (),
+}
+}
+
+(columns_exist, additional_columns.try_into().unwrap())
+}

 fn build_header_catalog(
 column_id: ColumnId,
 col_name: &str,
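A sketch of how a caller might consume the returned pair; the surrounding function is hypothetical, only `add_partition_offset_cols` comes from this diff:

    use risingwave_common::catalog::ColumnCatalog;

    fn ensure_partition_offset_cols(columns: &mut Vec<ColumnCatalog>, connector_name: &str) {
        let (columns_exist, additional_cols) =
            add_partition_offset_cols(columns, connector_name);
        for (exists, col) in columns_exist.into_iter().zip(additional_cols) {
            // Append the hidden partition/file and offset columns only if the
            // user did not already declare them via INCLUDE.
            if !exists {
                columns.push(col);
            }
        }
    }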
2 changes: 1 addition & 1 deletion src/connector/src/parser/mod.rs
@@ -832,7 +832,7 @@ pub enum ByteStreamSourceParserImpl {
 pub type ParsedStreamImpl = impl ChunkSourceStream + Unpin;

 impl ByteStreamSourceParserImpl {
-/// Converts this parser into a stream of [`StreamChunk`].
+/// Converts this `SourceMessage` stream into a stream of [`StreamChunk`].
 pub fn into_stream(self, msg_stream: BoxSourceStream) -> ParsedStreamImpl {
 #[auto_enum(futures03::Stream)]
 let stream = match self {
5 changes: 5 additions & 0 deletions src/connector/src/source/base.rs
@@ -77,8 +77,10 @@ pub trait SourceProperties: TryFromHashmap + Clone + WithOptions {
 type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
 type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

+/// Load additional info from `PbSource`. Currently only used by CDC.
 fn init_from_pb_source(&mut self, _source: &PbSource) {}

+/// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
 fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}
 }

@@ -443,10 +445,12 @@ impl ConnectorProperties {
 matches!(self, ConnectorProperties::Kinesis(_))
 }

+/// Load additional info from `PbSource`. Currently only used by CDC.
 pub fn init_from_pb_source(&mut self, source: &PbSource) {
 dispatch_source_prop!(self, prop, prop.init_from_pb_source(source))
 }

+/// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
 pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
 dispatch_source_prop!(self, prop, prop.init_from_pb_cdc_table_desc(cdc_table_desc))
 }
@@ -575,6 +579,7 @@ pub type DataType = risingwave_common::types::DataType;
 pub struct Column {
 pub name: String,
 pub data_type: DataType,
+/// This field is only used by datagen.
 pub is_visible: bool,
 }
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/mod.rs
@@ -144,7 +144,7 @@
 };
 self.table_schema = table_schema;
 if let Some(info) = source.info.as_ref() {
-self.is_multi_table_shared = info.cdc_source_job;
+self.is_multi_table_shared = info.has_streaming_job;
 }
 }