
refactor(sink): impl SinkFormatter for AppendOnly and Upsert (#12321)
xiangjinwu authored Sep 18, 2023
1 parent f304ed2 commit 1877aed
Showing 8 changed files with 274 additions and 135 deletions.
1 change: 1 addition & 0 deletions src/connector/src/lib.rs
@@ -30,6 +30,7 @@
 #![feature(async_fn_in_trait)]
 #![feature(associated_type_defaults)]
 #![feature(impl_trait_in_assoc_type)]
+#![feature(iter_from_generator)]
 
 use std::time::Duration;
 
55 changes: 55 additions & 0 deletions src/connector/src/sink/formatter/append_only.rs
@@ -0,0 +1,55 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::array::Op;

use super::{Result, SinkFormatter, StreamChunk};
use crate::sink::encoder::RowEncoder;
use crate::tri;

pub struct AppendOnlyFormatter<KE, VE> {
    key_encoder: KE,
    val_encoder: VE,
}

impl<KE, VE> AppendOnlyFormatter<KE, VE> {
    pub fn new(key_encoder: KE, val_encoder: VE) -> Self {
        Self {
            key_encoder,
            val_encoder,
        }
    }
}

impl<KE: RowEncoder, VE: RowEncoder> SinkFormatter for AppendOnlyFormatter<KE, VE> {
    type K = KE::Output;
    type V = VE::Output;

    fn format_chunk(
        &self,
        chunk: &StreamChunk,
    ) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>> {
        std::iter::from_generator(|| {
            for (op, row) in chunk.rows() {
                if op != Op::Insert {
                    continue;
                }
                let event_key_object = Some(tri!(self.key_encoder.encode(row)));
                let event_object = Some(tri!(self.val_encoder.encode(row)));

                yield Ok((event_key_object, event_object))
            }
        })
    }
}
50 changes: 50 additions & 0 deletions src/connector/src/sink/formatter/mod.rs
@@ -0,0 +1,50 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::array::StreamChunk;

use crate::sink::Result;

mod append_only;
mod upsert;

pub use append_only::AppendOnlyFormatter;
pub use upsert::UpsertFormatter;

/// Transforms a `StreamChunk` into a sequence of key-value pairs according to a specific format,
/// for example append-only, upsert or debezium.
pub trait SinkFormatter {
    type K;
    type V;

    fn format_chunk(
        &self,
        chunk: &StreamChunk,
    ) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>>;
}

/// In a generator, `tri!` yields the `Err` and then returns `()`,
/// whereas `?` would make the generator return the `Err` itself,
/// which is not allowed when the generator's return type is `()`.
#[macro_export]
macro_rules! tri {
    ($expr:expr) => {
        match $expr {
            Ok(val) => val,
            Err(err) => {
                yield Err(err);
                return;
            }
        }
    };
}
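
For illustration only, a consumer of `SinkFormatter` might drain `format_chunk` as in the sketch below. The `MemorySink` type and its `deliver` method are hypothetical stand-ins for a real sink writer and are not part of this commit.

use risingwave_common::array::StreamChunk;

use crate::sink::formatter::SinkFormatter;
use crate::sink::Result;

/// Hypothetical sink that collects formatted key-value pairs in memory.
struct MemorySink {
    delivered: Vec<(Option<Vec<u8>>, Option<Vec<u8>>)>,
}

impl MemorySink {
    /// Drain every (key, value) pair the formatter produces for one chunk.
    fn deliver<F>(&mut self, formatter: &F, chunk: &StreamChunk) -> Result<()>
    where
        F: SinkFormatter<K = Vec<u8>, V = Vec<u8>>,
    {
        for kv in formatter.format_chunk(chunk) {
            // Encoding failures were already turned into yielded `Err`s by `tri!`,
            // so a plain `?` is enough on the consumer side.
            let (key, value) = kv?;
            self.delivered.push((key, value));
        }
        Ok(())
    }
}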
61 changes: 61 additions & 0 deletions src/connector/src/sink/formatter/upsert.rs
@@ -0,0 +1,61 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::array::Op;

use super::{Result, SinkFormatter, StreamChunk};
use crate::sink::encoder::RowEncoder;
use crate::tri;

pub struct UpsertFormatter<KE, VE> {
    key_encoder: KE,
    val_encoder: VE,
}

impl<KE, VE> UpsertFormatter<KE, VE> {
    pub fn new(key_encoder: KE, val_encoder: VE) -> Self {
        Self {
            key_encoder,
            val_encoder,
        }
    }
}

impl<KE: RowEncoder, VE: RowEncoder> SinkFormatter for UpsertFormatter<KE, VE> {
    type K = KE::Output;
    type V = VE::Output;

    fn format_chunk(
        &self,
        chunk: &StreamChunk,
    ) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>> {
        std::iter::from_generator(|| {
            for (op, row) in chunk.rows() {
                let event_key_object = Some(tri!(self.key_encoder.encode(row)));

                let event_object = match op {
                    Op::Insert | Op::UpdateInsert => Some(tri!(self.val_encoder.encode(row))),
                    // Empty value with a key
                    Op::Delete => None,
                    Op::UpdateDelete => {
                        // Upsert semantics do not require an update-delete event
                        continue;
                    }
                };

                yield Ok((event_key_object, event_object))
            }
        })
    }
}
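
To summarize the mapping implemented above, each stream op becomes the following key-value pair:

// Op::Insert       -> (Some(key), Some(value))   // upsert the row
// Op::UpdateInsert -> (Some(key), Some(value))   // upsert the new row version
// Op::Delete       -> (Some(key), None)          // key with an empty value
// Op::UpdateDelete -> skipped                    // no event is emitted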
72 changes: 30 additions & 42 deletions src/connector/src/sink/kafka.rs
Expand Up @@ -30,19 +30,16 @@ use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_rpc_client::ConnectorClient;
use serde_derive::{Deserialize, Serialize};
use serde_json::Value;
use serde_with::{serde_as, DisplayFromStr};

use super::encoder::{JsonEncoder, TimestampHandlingMode};
use super::formatter::{AppendOnlyFormatter, UpsertFormatter};
use super::{
Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
SINK_TYPE_UPSERT,
FormattedSink, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM,
SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};
use crate::common::KafkaCommon;
use crate::sink::utils::{
gen_append_only_message_stream, gen_debezium_message_stream, gen_upsert_message_stream,
AppendOnlyAdapterOpts, DebeziumAdapterOpts, UpsertAdapterOpts,
};
use crate::sink::utils::{gen_debezium_message_stream, DebeziumAdapterOpts};
use crate::sink::{
DummySinkCommitCoordinator, Result, SinkWriterParam, SinkWriterV1, SinkWriterV1Adapter,
};
@@ -460,20 +457,20 @@ impl KafkaSinkWriter {
         ret
     }
 
-    async fn write_json_objects(
+    async fn write_inner(
         &mut self,
-        event_key_object: Option<Value>,
-        event_object: Option<Value>,
+        event_key_object: Option<Vec<u8>>,
+        event_object: Option<Vec<u8>>,
     ) -> Result<()> {
         let topic = self.config.common.topic.clone();
         // here we assume the key part always exists and value part is optional.
         // if value is None, we will skip the payload part.
-        let key_str = event_key_object.unwrap().to_string();
-        let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str()).key(key_str.as_bytes());
+        let key_str = event_key_object.unwrap();
+        let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str()).key(&key_str);
         let payload;
         if let Some(value) = event_object {
-            payload = value.to_string();
-            record = record.payload(payload.as_bytes());
+            payload = value;
+            record = record.payload(&payload);
         }
         // Send the data but not wait it to finish sinking
         // Will join all `DeliveryFuture` during commit
@@ -544,8 +541,11 @@ impl KafkaSinkWriter {
         #[for_await]
         for msg in dbz_stream {
             let (event_key_object, event_object) = msg?;
-            self.write_json_objects(event_key_object, event_object)
-                .await?;
+            self.write_inner(
+                event_key_object.map(|j| j.to_string().into_bytes()),
+                event_object.map(|j| j.to_string().into_bytes()),
+            )
+            .await?;
         }
         Ok(())
     }
@@ -559,20 +559,9 @@ impl KafkaSinkWriter {
         let val_encoder = JsonEncoder::new(&schema, None, TimestampHandlingMode::Milli);
 
         // Initialize the upsert_stream
-        let upsert_stream = gen_upsert_message_stream(
-            chunk,
-            UpsertAdapterOpts::default(),
-            key_encoder,
-            val_encoder,
-        );
+        let f = UpsertFormatter::new(key_encoder, val_encoder);
 
-        #[for_await]
-        for msg in upsert_stream {
-            let (event_key_object, event_object) = msg?;
-            self.write_json_objects(event_key_object, event_object)
-                .await?;
-        }
-        Ok(())
+        self.write_chunk(chunk, f).await
     }
 
     async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
@@ -584,20 +573,18 @@ impl KafkaSinkWriter {
         let val_encoder = JsonEncoder::new(&schema, None, TimestampHandlingMode::Milli);
 
         // Initialize the append_only_stream
-        let append_only_stream = gen_append_only_message_stream(
-            chunk,
-            AppendOnlyAdapterOpts::default(),
-            key_encoder,
-            val_encoder,
-        );
+        let f = AppendOnlyFormatter::new(key_encoder, val_encoder);
 
-        #[for_await]
-        for msg in append_only_stream {
-            let (event_key_object, event_object) = msg?;
-            self.write_json_objects(event_key_object, event_object)
-                .await?;
-        }
-        Ok(())
+        self.write_chunk(chunk, f).await
     }
 }
+
+impl FormattedSink for KafkaSinkWriter {
+    type K = Vec<u8>;
+    type V = Vec<u8>;
+
+    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
+        self.write_inner(k, v).await
+    }
+}
 
@@ -652,6 +639,7 @@ mod test {
     use risingwave_common::catalog::Field;
     use risingwave_common::test_prelude::StreamChunkTestExt;
     use risingwave_common::types::DataType;
+    use serde_json::Value;
 
     use super::*;
     use crate::sink::utils::*;
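
The `write_chunk` helper called in `upsert` and `append_only` above is not part of this diff; presumably it is a provided method that pairs a `SinkFormatter` with `FormattedSink::write_one`. A minimal sketch of that glue, assuming nothing beyond the two traits shown in this commit, could look like:

// Sketch only: the real `write_chunk` lives elsewhere in the sink module and may differ.
async fn write_chunk_sketch<S, F>(sink: &mut S, chunk: StreamChunk, formatter: F) -> Result<()>
where
    S: FormattedSink,
    F: SinkFormatter<K = S::K, V = S::V>,
{
    // Format every row of the chunk and hand each (key, value) pair to the sink.
    for item in formatter.format_chunk(&chunk) {
        let (k, v) = item?;
        sink.write_one(k, v).await?;
    }
    Ok(())
}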
