Skip to content

Commit

Permalink
[INLONG-11076][Sort] Discard unretryable exception when send kafka fa…
Browse files Browse the repository at this point in the history
…iled
  • Loading branch information
vernedeng committed Sep 10, 2024
1 parent e182772 commit 7a3c339
Showing 1 changed file with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.datanucleus.util.StringUtils;
Expand All @@ -42,7 +42,9 @@
import java.util.HashMap;
import java.util.Properties;

/** wrapper of kafka producer */
/**
* wrapper of kafka producer
*/
public class KafkaProducerCluster implements LifecycleAware {

public static final Logger LOG = InlongLoggerFactory.getLogger(KafkaProducerCluster.class);
Expand Down Expand Up @@ -73,7 +75,9 @@ public KafkaProducerCluster(
this.handler = sinkContext.createEventHandler();
}

/** start and init kafka producer */
/**
* start and init kafka producer
*/
@Override
public void start() {
if (CommonPropertiesHolder.useUnifiedConfiguration()) {
Expand Down Expand Up @@ -161,7 +165,9 @@ public Properties defaultKafkaProperties() {
return props;
}

/** stop and close kafka producer */
/**
* stop and close kafka producer
*/
@Override
public void stop() {
this.state = LifecycleState.STOP;
Expand All @@ -187,8 +193,8 @@ public LifecycleState getLifecycleState() {
/**
* Send data
*
* @param profileEvent data to send
* @return boolean
* @param profileEvent data to send
* @return boolean
* @throws IOException
*/
public boolean send(ProfileEvent profileEvent, Transaction tx) throws IOException {
Expand Down

0 comments on commit 7a3c339

Please sign in to comment.