Skip to content

Commit

Permalink
refactor(kafka-sink): change sequential await to group await when com…
Browse files Browse the repository at this point in the history
…mitting (#12013)

Signed-off-by: Runji Wang <wangrunji0408@163.com>
Co-authored-by: Runji Wang <wangrunji0408@163.com>
Co-authored-by: William Wen <william123.wen@gmail.com>
  • Loading branch information
3 people authored and Li0k committed Sep 15, 2023
1 parent f957f90 commit 2bbe5bf
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 66 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ aws-types = "0.55"
etcd-client = { package = "madsim-etcd-client", version = "0.3" }
futures-async-stream = "0.2"
hytra = "0.1"
rdkafka = { package = "madsim-rdkafka", git = "https://github.com/madsim-rs/madsim.git", rev = "bb8f063", features = [
rdkafka = { package = "madsim-rdkafka", git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1e3", features = [
"cmake-build",
] }
hashbrown = { version = "0.14.0", features = [
Expand Down
183 changes: 121 additions & 62 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::anyhow;
use futures::future::try_join_all;
use futures::{Future, FutureExt};
use futures_async_stream::for_await;
use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::message::ToBytes;
Expand Down Expand Up @@ -327,6 +329,11 @@ enum KafkaSinkState {
Running(u64),
}

/// The delivery buffer queue size
/// When the `DeliveryFuture` the current `future_delivery_buffer`
/// is buffering is greater than this size, then enforcing commit once
const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 65536;

pub struct KafkaSinkWriter {
pub config: KafkaConfig,
pub inner: FutureProducer<PrivateLinkProducerContext>,
Expand All @@ -335,6 +342,7 @@ pub struct KafkaSinkWriter {
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
future_delivery_buffer: VecDeque<DeliveryFuture>,
db_name: String,
sink_from_name: String,
}
Expand Down Expand Up @@ -382,104 +390,147 @@ impl KafkaSinkWriter {
schema,
pk_indices,
is_append_only,
future_delivery_buffer: VecDeque::new(),
db_name,
sink_from_name,
})
}

/// The wrapper function for the actual `FutureProducer::send_result`
/// Just for better error handling purpose
#[expect(clippy::unused_async)]
async fn send_result_inner<'a, K, P>(
&'a self,
record: FutureRecord<'a, K, P>,
) -> core::result::Result<DeliveryFuture, (KafkaError, FutureRecord<'a, K, P>)>
where
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
{
self.inner.send_result(record)
}

/// The actual `send_result` function, will be called when the `KafkaSinkWriter` needs to sink
/// messages
async fn send_result<'a, K, P>(&'a self, mut record: FutureRecord<'a, K, P>) -> KafkaResult<()>
async fn send_result<'a, K, P>(
&'a mut self,
mut record: FutureRecord<'a, K, P>,
) -> KafkaResult<()>
where
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
{
// The error to be returned
let mut err = KafkaError::Canceled;
let mut success_flag = false;

let mut ret = Ok(());

for _ in 0..self.config.max_retry_num {
match self.send_result_inner(record).await {
Ok(delivery_future) => match delivery_future.await {
Ok(delivery_future_result) => match delivery_future_result {
// Successfully sent the record
// Will return the partition and offset of the message (i32, i64)
Ok(_) => return Ok(()),
// If the message failed to be delivered. (i.e., flush)
// The error & the copy of the original message will be returned
// i.e., (KafkaError, OwnedMessage)
// We will just stop the loop, and return the error
// The sink executor will back to the latest checkpoint
Err((k_err, _msg)) => {
err = k_err;
break;
}
},
// Nothing to do here, since the err has already been set to
// KafkaError::Canceled. This represents the producer is dropped
// before the delivery status is received
Err(_) => break,
},
match self.inner.send_result(record) {
Ok(delivery_future) => {
// First check if the current length is
// greater than the preset limit
while self.future_delivery_buffer.len() >= KAFKA_WRITER_MAX_QUEUE_SIZE {
Self::map_future_result(
self.future_delivery_buffer
.pop_front()
.expect("Expect the future not to be None")
.await,
)?;
}

self.future_delivery_buffer.push_back(delivery_future);
success_flag = true;
break;
}
// The enqueue buffer is full, `send_result` will immediately return
// We can retry for another round after sleeping for sometime
Err((e, rec)) => {
err = e;
record = rec;
match err {
match e {
KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
tokio::time::sleep(self.config.retry_interval).await;
continue;
}
_ => break,
_ => return Err(e),
}
}
}
}

Err(err)
if !success_flag {
// In this case, after trying `max_retry_num`
// The enqueue buffer is still full
ret = Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull));
}

ret
}

async fn write_json_objects(
&self,
&mut self,
event_key_object: Option<Value>,
event_object: Option<Value>,
) -> Result<()> {
let topic = self.config.common.topic.clone();
// here we assume the key part always exists and value part is optional.
// if value is None, we will skip the payload part.
let key_str = event_key_object.unwrap().to_string();
let mut record = FutureRecord::<[u8], [u8]>::to(self.config.common.topic.as_str())
.key(key_str.as_bytes());
let mut record = FutureRecord::<[u8], [u8]>::to(topic.as_str()).key(key_str.as_bytes());
let payload;
if let Some(value) = event_object {
payload = value.to_string();
record = record.payload(payload.as_bytes());
}
// Send the data but not wait it to finish sinking
// Will join all `DeliveryFuture` during commit
self.send_result(record).await?;
Ok(())
}

async fn debezium_update(&self, chunk: StreamChunk, ts_ms: u64) -> Result<()> {
fn map_future_result(
delivery_future_result: <DeliveryFuture as Future>::Output,
) -> KafkaResult<()> {
match delivery_future_result {
// Successfully sent the record
// Will return the partition and offset of the message (i32, i64)
// Note that `Vec<()>` won't cause memory allocation
Ok(Ok(_)) => Ok(()),
// If the message failed to be delivered. (i.e., flush)
// The error & the copy of the original message will be returned
// i.e., (KafkaError, OwnedMessage)
// We will just stop the loop, and return the error
// The sink executor will back to the latest checkpoint
Ok(Err((k_err, _msg))) => Err(k_err),
// This represents the producer is dropped
// before the delivery status is received
// Return `KafkaError::Canceled`
Err(_) => Err(KafkaError::Canceled),
}
}

async fn commit_inner(&mut self) -> Result<()> {
let _v = try_join_all(
self.future_delivery_buffer
.drain(..)
.map(|delivery_future| {
delivery_future.map(|delivery_future_result| {
Self::map_future_result(delivery_future_result).map_err(SinkError::Kafka)
})
}),
)
.await?;

// Sanity check
debug_assert!(
self.future_delivery_buffer.is_empty(),
"The buffer after `commit_inner` must be empty"
);

Ok(())
}

async fn debezium_update(&mut self, chunk: StreamChunk, ts_ms: u64) -> Result<()> {
// TODO: Remove the clones here, only to satisfy borrow checker at present
let schema = self.schema.clone();
let pk_indices = self.pk_indices.clone();
let db_name = self.db_name.clone();
let sink_from_name = self.sink_from_name.clone();

// Initialize the dbz_stream
let dbz_stream = gen_debezium_message_stream(
&self.schema,
&self.pk_indices,
&schema,
&pk_indices,
chunk,
ts_ms,
DebeziumAdapterOpts::default(),
&self.db_name,
&self.sink_from_name,
&db_name,
&sink_from_name,
);

#[for_await]
Expand All @@ -491,13 +542,14 @@ impl KafkaSinkWriter {
Ok(())
}

async fn upsert(&self, chunk: StreamChunk) -> Result<()> {
let upsert_stream = gen_upsert_message_stream(
&self.schema,
&self.pk_indices,
chunk,
UpsertAdapterOpts::default(),
);
async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
// TODO: Remove the clones here, only to satisfy borrow checker at present
let schema = self.schema.clone();
let pk_indices = self.pk_indices.clone();

// Initialize the upsert_stream
let upsert_stream =
gen_upsert_message_stream(&schema, &pk_indices, chunk, UpsertAdapterOpts::default());

#[for_await]
for msg in upsert_stream {
Expand All @@ -508,10 +560,15 @@ impl KafkaSinkWriter {
Ok(())
}

async fn append_only(&self, chunk: StreamChunk) -> Result<()> {
async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
// TODO: Remove the clones here, only to satisfy borrow checker at present
let schema = self.schema.clone();
let pk_indices = self.pk_indices.clone();

// Initialize the append_only_stream
let append_only_stream = gen_append_only_message_stream(
&self.schema,
&self.pk_indices,
&schema,
&pk_indices,
chunk,
AppendOnlyAdapterOpts::default(),
);
Expand Down Expand Up @@ -561,6 +618,8 @@ impl SinkWriterV1 for KafkaSinkWriter {
}

async fn commit(&mut self) -> Result<()> {
// Group delivery (await the `FutureRecord`) here
self.commit_inner().await?;
Ok(())
}

Expand Down Expand Up @@ -701,7 +760,7 @@ mod test {
}

/// Note: Please enable the kafka by running `./risedev configure` before commenting #[ignore]
/// to run the test
/// to run the test, also remember to modify `risedev.yml`
#[ignore]
#[tokio::test]
async fn test_kafka_producer() -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions src/workspace-hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ lexical-write-integer = { version = "0.8", default-features = false, features =
libc = { version = "0.2", features = ["extra_traits"] }
lock_api = { version = "0.4", features = ["arc_lock"] }
log = { version = "0.4", default-features = false, features = ["std"] }
madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "bb8f063", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] }
madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1e3", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] }
madsim-tokio = { version = "0.2", default-features = false, features = ["fs", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "sync", "time", "tracing"] }
mio = { version = "0.8", features = ["net", "os-ext"] }
multimap = { version = "0.8" }
Expand Down Expand Up @@ -159,7 +159,7 @@ lexical-write-integer = { version = "0.8", default-features = false, features =
libc = { version = "0.2", features = ["extra_traits"] }
lock_api = { version = "0.4", features = ["arc_lock"] }
log = { version = "0.4", default-features = false, features = ["std"] }
madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "bb8f063", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] }
madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1e3", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] }
madsim-tokio = { version = "0.2", default-features = false, features = ["fs", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "sync", "time", "tracing"] }
mio = { version = "0.8", features = ["net", "os-ext"] }
multimap = { version = "0.8" }
Expand Down

0 comments on commit 2bbe5bf

Please sign in to comment.