Skip to content

Commit

Permalink
fix: handle kafka sink message timeout error (#12350)
Browse files Browse the repository at this point in the history
  • Loading branch information
tabVersion authored Sep 17, 2023
1 parent 8ef74ad commit a975d93
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,14 @@ impl KafkaSinkWriter {
Err((e, rec)) => {
record = rec;
match e {
KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
err @ KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull)
| err @ KafkaError::MessageProduction(RDKafkaErrorCode::MessageTimedOut) => {
tracing::warn!(
"producing message (key {:?}) to topic {} failed, err {:?}, retrying",
record.key.map(|k| k.to_bytes()),
record.topic,
err
);
tokio::time::sleep(self.config.retry_interval).await;
continue;
}
Expand Down

0 comments on commit a975d93

Please sign in to comment.