feat: add kafka backfill frontend (#15602)

xxchan authored Apr 2, 2024
1 parent c2b0f58 commit 2afbb6f
Showing 56 changed files with 1,275 additions and 308 deletions.
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
@@ -39,3 +39,6 @@ d70dba827c303373f3220c9733f7c7443e5c2d37

# chore: cargo +nightly fmt (#13162) (format let-chains)
c583e2c6c054764249acf484438c7bf7197765f4

# chore: replace all ProstXxx with PbXxx (#8621)
6fd8821f2e053957b183d648bea9c95b6703941f
4 changes: 2 additions & 2 deletions Makefile.toml
@@ -86,9 +86,9 @@ if ${is_not_ci}
no_rust_log = not ${rust_log}
if ${no_rust_log}
set_env RUST_LOG "pgwire_query_log=info"
set_env RUST_LOG "pgwire_query_log=info,hyper::client::connect::http=info"
else
set_env RUST_LOG "pgwire_query_log=info,${rust_log}"
set_env RUST_LOG "pgwire_query_log=info,hyper::client::connect::http=info,${rust_log}"
end
end
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
@@ -35,6 +35,7 @@ user rw_batch_enable_lookup_join
user rw_batch_enable_sort_agg
user rw_enable_join_ordering
user rw_enable_share_plan
user rw_enable_shared_source
user rw_enable_two_phase_agg
user rw_force_split_distinct_agg
user rw_force_two_phase_agg
7 changes: 7 additions & 0 deletions e2e_test/source/README.md
@@ -0,0 +1,7 @@
Tests in this directory need some prior setup.

See also `ci/scripts/e2e-source-test.sh` and `scripts/source`.

## Kafka

`scripts/source/test_data` contains the data. The filename convention is `<topic_name>.<n_partitions>`.
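For illustration only (not part of this commit): a minimal Rust sketch of how a test harness could interpret that naming convention. The example filename is derived from the `kafka_4_partition_topic` topic used in the e2e test added below and should be treated as hypothetical.

```rust
/// Hypothetical helper: split a test-data filename that follows the
/// `<topic_name>.<n_partitions>` convention into its two parts.
fn parse_test_data_filename(file_name: &str) -> Option<(&str, u32)> {
    // Split at the last dot so topic names containing dots still work.
    let (topic, partitions) = file_name.rsplit_once('.')?;
    Some((topic, partitions.parse().ok()?))
}

#[test]
fn parses_convention() {
    assert_eq!(
        parse_test_data_filename("kafka_4_partition_topic.4"),
        Some(("kafka_4_partition_topic", 4)),
    );
}
```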
58 changes: 58 additions & 0 deletions e2e_test/source/basic/kafka_shared_source.slt
@@ -0,0 +1,58 @@
control substitution on

statement ok
SET rw_enable_shared_source TO true;

statement ok
create source s0 (v1 int, v2 varchar) with (
connector = 'kafka',
topic = 'kafka_4_partition_topic',
properties.bootstrap.server = '${KAFKA_BOOTSTRAP_SERVER:message_queue:29092}',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

statement ok
create materialized view mv_1 as select * from s0;

statement ok
SET rw_enable_shared_source TO false;

statement ok
create materialized view mv_2 as select * from s0;

statement ok
flush;

# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 1s

query IT rowsort
select v1, v2 from s0;
----
1 1
2 22
3 333
4 4444

query IT rowsort
select v1, v2 from mv_1;
----
1 1
2 22
3 333
4 4444

query IT rowsort
select v1, v2 from mv_2;
----
1 1
2 22
3 333
4 4444

# TODO: add more data to the topic and re-check the data. Currently there's no good test infra to do this...
# To test the correctness of source backfill, we might need to keep producing data during an interval, to let it go
# through the backfill stage to the forward stage.

statement ok
drop source s0 cascade;
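The TODO in the test above calls for producing data continuously, so that a materialized view is created mid-stream and has to pass through the backfill stage before switching to forwarding upstream data. Below is a sketch of what such a helper could look like, assuming the `rdkafka`, `tokio`, and `anyhow` crates; it is not part of this commit, and the payload simply mirrors the `(v1 int, v2 varchar)` schema of the test source.

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};

/// Keep producing rows for a while so a backfill test can create an MV
/// while new data is still arriving.
async fn produce_during_backfill(broker: &str, topic: &str) -> anyhow::Result<()> {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", broker)
        .create()?;
    for i in 0..1000u32 {
        let key = i.to_string();
        // JSON payload matching `create source s0 (v1 int, v2 varchar)`.
        let payload = format!(r#"{{"v1": {i}, "v2": "{i}"}}"#);
        producer
            .send(
                FutureRecord::to(topic).key(&key).payload(&payload),
                Duration::from_secs(5),
            )
            .await
            .map_err(|(err, _msg)| err)?;
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    Ok(())
}
```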
17 changes: 15 additions & 2 deletions proto/catalog.proto
@@ -62,9 +62,22 @@ message StreamSourceInfo {
SchemaRegistryNameStrategy name_strategy = 10;
optional string key_message_name = 11;
plan_common.ExternalTableDesc external_table = 12;
// Whether the stream source is a cdc source streaming job.
// We need this field to differentiate the cdc source job until we fully implement risingwavelabs/rfcs#72.
// **This field should now be called `is_shared`.** Not renamed for backwards compatibility.
//
// Whether the stream source is a shared source (it has a streaming job).
// This is related to [RFC: Reusable Source Executor](https://github.com/risingwavelabs/rfcs/pull/72).
//
// Currently, the following sources can be shared:
//
// - Direct CDC sources (mysql & postgresql)
// - MQ sources (Kafka)
bool cdc_source_job = 13;
// Only used when `cdc_source_job` is `true`.
// If `false`, `requires_singleton` will be set in the stream plan.
//
// - Direct CDC sources: `false`
// - MQ sources (Kafka): `true`
bool is_distributed = 15;
// Options specified by user in the FORMAT ENCODE clause.
map<string, string> format_encode_options = 14;
}
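To make the relationship between the two flags above concrete, here is an illustrative reading in Rust (a sketch, not code from this commit), using the prost-generated `risingwave_pb::catalog::StreamSourceInfo`:

```rust
use risingwave_pb::catalog::StreamSourceInfo;

/// Sketch only: `cdc_source_job` effectively means "is shared", and
/// `is_distributed` is consulted only for shared sources. A shared CDC source
/// is not distributed, so its source job must run as a singleton; a shared
/// Kafka source is distributed and needs no singleton requirement.
fn requires_singleton(info: &StreamSourceInfo) -> bool {
    info.cdc_source_job && !info.is_distributed
}
```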
2 changes: 1 addition & 1 deletion proto/ddl_service.proto
@@ -161,7 +161,7 @@ message DropViewResponse {
// - GENERAL: Table streaming jobs w/ or w/o a connector
// - SHARED_CDC_SOURCE: The table streaming job is created based on a shared CDC source job (risingwavelabs/rfcs#73).
//
// And one may add other types to support Table jobs that based on other backfill-able sources (risingwavelabs/rfcs#72).
// And one may add other types to support Table jobs that are based on other shared sources (risingwavelabs/rfcs#72).
//
// Currently, its usages include:
// - When creating the streaming actor graph, different table jobs may need different treatment.
5 changes: 3 additions & 2 deletions proto/stream_plan.proto
@@ -206,7 +206,7 @@ message StreamFsFetchNode {
}

message SourceBackfillNode {
uint32 source_id = 1;
uint32 upstream_source_id = 1;
optional uint32 row_id_index = 2;
repeated plan_common.ColumnCatalog columns = 3;
catalog.StreamSourceInfo info = 4;
@@ -876,13 +876,14 @@ enum FragmentTypeFlag {
FRAGMENT_TYPE_FLAG_MVIEW = 2;
FRAGMENT_TYPE_FLAG_SINK = 4;
FRAGMENT_TYPE_FLAG_NOW = 8; // TODO: Remove this and insert a `BarrierRecv` instead.
// Include StreamScan and StreamCdcScan
FRAGMENT_TYPE_FLAG_STREAM_SCAN = 16;
FRAGMENT_TYPE_FLAG_BARRIER_RECV = 32;
FRAGMENT_TYPE_FLAG_VALUES = 64;
FRAGMENT_TYPE_FLAG_DML = 128;
FRAGMENT_TYPE_FLAG_CDC_FILTER = 256;
FRAGMENT_TYPE_FLAG_SUBSCRIPTION = 512;
FRAGMENT_TYPE_FLAG_SOURCE_BACKFILL = 1024;
FRAGMENT_TYPE_FLAG_SOURCE_SCAN = 1024;
}

// The streaming context associated with a stream plan
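The `FragmentTypeFlag` values above are powers of two, so a fragment carries a bitmask of them, and renaming `SOURCE_BACKFILL` to `SOURCE_SCAN` keeps the same bit (1024). A minimal sketch of such a bit test (an assumption about how the mask is used, not code from this commit):

```rust
use risingwave_pb::stream_plan::FragmentTypeFlag;

/// Sketch only: a fragment's type mask is an OR of `FragmentTypeFlag` values,
/// so membership is a plain bit test.
fn is_source_scan(fragment_type_mask: u32) -> bool {
    fragment_type_mask & FragmentTypeFlag::SourceScan as u32 != 0
}
```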
5 changes: 5 additions & 0 deletions scripts/source/README.md
@@ -0,0 +1,5 @@
This folder contains scripts to prepare data for testing sources.

## Kafka

`scripts/source/test_data` contains the data. The filename convention is `<topic_name>.<n_partitions>`.
3 changes: 2 additions & 1 deletion src/common/metrics/src/monitor/connection.rs
@@ -34,6 +34,7 @@ use prometheus::{
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, IntCounter,
IntCounterVec, IntGauge, IntGaugeVec, Registry,
};
use thiserror_ext::AsReport;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tonic::transport::{Channel, Endpoint};
use tracing::{debug, info, warn};
@@ -548,7 +549,7 @@ impl<L> tonic::transport::server::Router<L> {
config.tcp_nodelay,
config.keepalive_duration,
)
.unwrap();
.unwrap_or_else(|err| panic!("failed to connect to {listen_addr}: {}", err.as_report()));
let incoming = MonitoredConnection::new(
incoming,
MonitorNewConnectionImpl {
2 changes: 2 additions & 0 deletions src/common/src/lib.rs
@@ -43,6 +43,8 @@
#![feature(negative_impls)]
#![feature(bound_map)]
#![feature(array_methods)]
#![feature(register_tool)]
#![register_tool(rw)]

#[cfg_attr(not(test), allow(unused_extern_crates))]
extern crate self as risingwave_common;
7 changes: 7 additions & 0 deletions src/common/src/session_config/mod.rs
@@ -243,6 +243,13 @@ pub struct ConfigMap {
#[parameter(default = false)]
background_ddl: bool,

/// Enable shared source. Currently only supported for Kafka.
///
/// When enabled, `CREATE SOURCE` will create a source streaming job, and materialized views created from the source
/// will forward data from that shared streaming job, and also backfill prior data from the external source.
#[parameter(default = false)]
rw_enable_shared_source: bool,

/// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time.
#[parameter(default = SERVER_ENCODING)]
server_encoding: String,
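A hedged sketch of how frontend code might consult the new `rw_enable_shared_source` variable. It assumes the `#[parameter]` macro generates an `rw_enable_shared_source()` getter on `ConfigMap`, mirroring the other parameters; this is not code from the commit:

```rust
use risingwave_common::session_config::ConfigMap;

/// Sketch only: shared source is opt-in via the session variable and is
/// currently limited to Kafka, so a planner would gate on both conditions.
fn plan_as_shared_source(config: &ConfigMap, connector_name: &str) -> bool {
    config.rw_enable_shared_source() && connector_name.eq_ignore_ascii_case("kafka")
}
```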
3 changes: 2 additions & 1 deletion src/common/src/system_param/mod.rs
@@ -317,7 +317,8 @@ macro_rules! impl_set_system_param {
$(
key_of!($field) => {
let v = if let Some(v) = value {
v.as_ref().parse().map_err(|_| format!("cannot parse parameter value"))?
#[allow(rw::format_error)]
v.as_ref().parse().map_err(|e| format!("cannot parse parameter value: {e}"))?
} else {
$default.ok_or_else(|| format!("{} does not have a default value", key))?
};
3 changes: 3 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
@@ -187,6 +187,9 @@ pub fn visit_stream_node_tables_inner<F>(
always!(source.state_table, "FsFetch");
}
}
NodeBody::SourceBackfill(node) => {
always!(node.state_table, "SourceBackfill")
}

// Sink
NodeBody::Sink(node) => {
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
@@ -25,6 +25,7 @@ arrow-array = { workspace = true }
arrow-row = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
assert_matches = "1"
async-nats = "0.34"
async-trait = "0.1"
auto_enums = { version = "0.8", features = ["futures03"] }
78 changes: 78 additions & 0 deletions src/connector/src/parser/additional_columns.rs
@@ -185,6 +185,84 @@ pub fn build_additional_column_catalog(
Ok(catalog)
}

/// Utility function for adding the partition/file and offset columns to `columns`, if they are not already specified by the user.
///
/// ## Returns
/// - `columns_exist`: whether the 1. `partition`/`file` and 2. `offset` columns are already included in `columns`.
/// - `additional_columns`: the `ColumnCatalog`s for the `partition`/`file` and `offset` columns.
pub fn add_partition_offset_cols(
columns: &[ColumnCatalog],
connector_name: &str,
) -> ([bool; 2], [ColumnCatalog; 2]) {
let mut columns_exist = [false; 2];
let mut last_column_id = columns
.iter()
.map(|c| c.column_desc.column_id)
.max()
.unwrap_or(ColumnId::placeholder());

let additional_columns: Vec<_> = {
let compat_col_types = COMPATIBLE_ADDITIONAL_COLUMNS
.get(connector_name)
.unwrap_or(&COMMON_COMPATIBLE_ADDITIONAL_COLUMNS);
["partition", "file", "offset"]
.iter()
.filter_map(|col_type| {
last_column_id = last_column_id.next();
if compat_col_types.contains(col_type) {
Some(
build_additional_column_catalog(
last_column_id,
connector_name,
col_type,
None,
None,
None,
false,
)
.unwrap(),
)
} else {
None
}
})
.collect()
};
assert_eq!(additional_columns.len(), 2);
use risingwave_pb::plan_common::additional_column::ColumnType;
assert_matches::assert_matches!(
additional_columns[0].column_desc.additional_column,
AdditionalColumn {
column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
}
);
assert_matches::assert_matches!(
additional_columns[1].column_desc.additional_column,
AdditionalColumn {
column_type: Some(ColumnType::Offset(_)),
}
);

// Check if partition/file/offset columns are included explicitly.
for col in columns {
match col.column_desc.additional_column {
AdditionalColumn {
column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
} => {
columns_exist[0] = true;
}
AdditionalColumn {
column_type: Some(ColumnType::Offset(_)),
} => {
columns_exist[1] = true;
}
_ => (),
}
}

(columns_exist, additional_columns.try_into().unwrap())
}

fn build_header_catalog(
column_id: ColumnId,
col_name: &str,
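A hypothetical caller of `add_partition_offset_cols` (not verbatim from this commit), showing how the two returned arrays pair up. It assumes the function is in scope and that missing metadata columns are simply appended; the real frontend may additionally mark them as hidden.

```rust
use risingwave_common::catalog::ColumnCatalog;

/// Sketch only: append the partition/file and offset columns unless the user
/// already declared them (e.g. via an `INCLUDE` clause).
fn fill_partition_offset_cols(
    mut columns: Vec<ColumnCatalog>,
    connector_name: &str,
) -> Vec<ColumnCatalog> {
    let (columns_exist, additional_columns) =
        add_partition_offset_cols(&columns, connector_name);
    for (exists, col) in columns_exist.into_iter().zip(additional_columns) {
        if !exists {
            columns.push(col);
        }
    }
    columns
}
```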
4 changes: 4 additions & 0 deletions src/connector/src/source/base.rs
@@ -72,8 +72,10 @@ pub trait SourceProperties: TryFromHashmap + Clone + WithOptions {
type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

/// Load additional info from `PbSource`. Currently only used by CDC.
fn init_from_pb_source(&mut self, _source: &PbSource) {}

/// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}
}

@@ -366,10 +368,12 @@ impl ConnectorProperties {
matches!(self, ConnectorProperties::Kinesis(_))
}

/// Load additional info from `PbSource`. Currently only used by CDC.
pub fn init_from_pb_source(&mut self, source: &PbSource) {
dispatch_source_prop!(self, prop, prop.init_from_pb_source(source))
}

/// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
dispatch_source_prop!(self, prop, prop.init_from_pb_cdc_table_desc(cdc_table_desc))
}
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/mod.rs
@@ -148,7 +148,7 @@
};
self.table_schema = table_schema;
if let Some(info) = source.info.as_ref() {
self.is_cdc_source_job = info.cdc_source_job;
self.is_cdc_source_job = info.is_shared();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/kafka/private_link.rs
@@ -70,9 +70,9 @@ impl BrokerAddrRewriter {
role: PrivateLinkContextRole,
broker_rewrite_map: Option<HashMap<String, String>>,
) -> ConnectorResult<Self> {
tracing::info!("[{}] rewrite map {:?}", role, broker_rewrite_map);
let rewrite_map: ConnectorResult<BTreeMap<BrokerAddr, BrokerAddr>> = broker_rewrite_map
.map_or(Ok(BTreeMap::new()), |addr_map| {
tracing::info!("[{}] rewrite map {:?}", role, addr_map);
addr_map
.into_iter()
.map(|(old_addr, new_addr)| {
Expand Down