Skip to content

Commit

Permalink
[INLONG-11068][Sort] Optimize kafka producer parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Sep 9, 2024
1 parent 32f6336 commit cae8260
Showing 1 changed file with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,20 @@
import org.slf4j.Logger;

import java.io.IOException;
import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/** wrapper of kafka producer */
public class KafkaProducerCluster implements LifecycleAware {

public static final Logger LOG = InlongLoggerFactory.getLogger(KafkaProducerCluster.class);

private static final String KEY_DEFAULT_SELECTOR = "sink.kafka.selector.default";
private static final String KEY_PRODUCER_CLOSE_TIMEOUT = "sink.kafka.producer.close.timeout";

private final String workerName;
protected final KafkaNodeConfig nodeConfig;
protected final CacheClusterConfig cacheClusterConfig;
Expand Down Expand Up @@ -86,7 +92,6 @@ private void startByCacheCluster() {
try {
Properties props = defaultKafkaProperties();
props.putAll(cacheClusterConfig.getParams());
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionerSelector.class.getName());
props.put(ProducerConfig.ACKS_CONFIG,
cacheClusterConfig.getParams().getOrDefault(ProducerConfig.ACKS_CONFIG, "all"));

Expand All @@ -112,7 +117,6 @@ private void startByNodeConfig() {
try {
Properties props = defaultKafkaProperties();
props.putAll(nodeConfig.getProperties() == null ? new HashMap<>() : nodeConfig.getProperties());
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionerSelector.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, nodeConfig.getAcks());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, nodeConfig.getBootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG, nodeConfig.getClientId() + "-" + workerName);
Expand All @@ -126,11 +130,15 @@ private void startByNodeConfig() {

public Properties defaultKafkaProperties() {
Properties props = new Properties();

if (!CommonPropertiesHolder.getBoolean(KEY_DEFAULT_SELECTOR, false)) {
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionerSelector.class.getName());
}
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "122880");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "44740000");
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "86400000");
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "300000");
props.put(ProducerConfig.LINGER_MS_CONFIG, "500");
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "8388608");
Expand Down Expand Up @@ -158,8 +166,9 @@ public Properties defaultKafkaProperties() {
public void stop() {
this.state = LifecycleState.STOP;
try {
LOG.info("stop kafka producer");
producer.close();
long timeout = CommonPropertiesHolder.getLong(KEY_PRODUCER_CLOSE_TIMEOUT, 60L);
LOG.info("stop kafka producer, timeout={}", timeout);
producer.close(Duration.ofSeconds(timeout));
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
Expand Down

0 comments on commit cae8260

Please sign in to comment.