From 6c4629de7bbbd868245b24920cbff1d72b18f416 Mon Sep 17 00:00:00 2001
From: vernedeng
Date: Wed, 10 Jul 2024 13:49:54 +0800
Subject: [PATCH] [INLONG-10954][Sort] Provide default kafka producer configuration

Provide a built-in set of default Kafka producer properties and let the
Kafka sink start either from the unified KafkaNodeConfig or from the
legacy CacheClusterConfig, selected by
CommonPropertiesHolder.useUnifiedConfiguration().

- CacheClusterConfig: replace the hand-written accessors with Lombok
  @Data, which also generates the value-based equals() that
  KafkaProducerFederation relies on for change detection.
- KafkaFederationSinkContext / PulsarFederationSinkContext: build a
  CacheClusterConfig (task name + sink params) on every reload.
- KafkaProducerCluster: seed the producer properties from
  defaultKafkaProperties(), overlay the configured params, then force
  partitioner, acks, bootstrap servers and client id.
- KafkaProducerFederation: split reload() into reloadByNodeConfig() and
  reloadByCacheClusterConfig().

Review fixes folded into this revision:

- reloadByCacheClusterConfig() returned early when the config had
  changed and rebuilt the producer when it had not; the equals() check
  is now the right way round.
- Both sink contexts built the CacheClusterConfig from the previous
  sortTaskConfig (it is only replaced two statements later); they now
  read newSortTaskConfig so the published config is never stale.
- The KafkaProducerCluster constructor dereferenced nodeConfig
  unconditionally, but the legacy reload path passes it as null; the
  cluster name now falls back to the CacheClusterConfig name.
- startByNodeConfig() tolerates a null properties map again, as the
  removed Context-based code did.
---
 .../config/pojo/CacheClusterConfig.java       | 39 +-------
 .../kafka/KafkaFederationSinkContext.java     | 11 +++
 .../sink/kafka/KafkaProducerCluster.java      | 97 +++++++++++++++----
 .../sink/kafka/KafkaProducerFederation.java   | 31 +++++-
 .../pulsar/PulsarFederationSinkContext.java   |  8 ++
 5 files changed, 131 insertions(+), 55 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/CacheClusterConfig.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/CacheClusterConfig.java
index caee067831c..83ae055e0df 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/CacheClusterConfig.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/CacheClusterConfig.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.standalone.config.pojo;
 
+import lombok.Data;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -24,45 +26,10 @@
  *
  * CacheClusterConfig
  */
+@Data
 public class CacheClusterConfig {
 
     private String clusterName;
     private Map<String, String> params = new HashMap<>();
 
-    /**
-     * get clusterName
-     *
-     * @return the clusterName
-     */
-    public String getClusterName() {
-        return clusterName;
-    }
-
-    /**
-     * set clusterName
-     *
-     * @param clusterName the clusterName to set
-     */
-    public void setClusterName(String clusterName) {
-        this.clusterName = clusterName;
-    }
-
-    /**
-     * get params
-     *
-     * @return the params
-     */
-    public Map<String, String> getParams() {
-        return params;
-    }
-
-    /**
-     * set params
-     *
-     * @param params the params to set
-     */
-    public void setParams(Map<String, String> params) {
-        this.params = params;
-    }
-
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
index 739b214b1e9..d93194bc2e2 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
@@ -25,6 +25,7 @@
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
@@ -52,6 +53,7 @@ public class KafkaFederationSinkContext extends SinkContext {
     public static final String KEY_EVENT_HANDLER = "eventHandler";
 
     private KafkaNodeConfig kafkaNodeConfig;
+    private CacheClusterConfig cacheClusterConfig;
     private Map<String, KafkaIdConfig> idConfigMap = new ConcurrentHashMap<>();
 
     public KafkaFederationSinkContext(String sinkName, Context context, Channel channel) {
@@ -79,6 +81,11 @@ public void reload() {
                 this.kafkaNodeConfig = requestNodeConfig;
             }
 
+            CacheClusterConfig clusterConfig = new CacheClusterConfig();
+            clusterConfig.setClusterName(this.taskName);
+            clusterConfig.setParams(newSortTaskConfig.getSinkParams());
+            this.cacheClusterConfig = clusterConfig;
+
             this.taskConfig = newTaskConfig;
             this.sortTaskConfig = newSortTaskConfig;
 
@@ -121,6 +128,10 @@ public KafkaNodeConfig getNodeConfig() {
         return kafkaNodeConfig;
     }
 
+    public CacheClusterConfig getCacheClusterConfig() {
+        return cacheClusterConfig;
+    }
+
     /**
      * get Topic by uid
      *
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index d0fda7c5aa6..430cdbd38c4 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -19,12 +19,12 @@
 
 import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.utils.Constants;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import org.apache.flume.Context;
 import org.apache.flume.Transaction;
 import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleState;
@@ -45,8 +45,8 @@ public class KafkaProducerCluster implements LifecycleAware {
 
     private final String workerName;
     protected final KafkaNodeConfig nodeConfig;
+    protected final CacheClusterConfig cacheClusterConfig;
     private final KafkaFederationSinkContext sinkContext;
-    private final Context context;
 
     private final String cacheClusterName;
     private LifecycleState state;
@@ -56,12 +56,13 @@ public class KafkaProducerCluster implements LifecycleAware {
 
     public KafkaProducerCluster(
             String workerName,
+            CacheClusterConfig cacheClusterConfig,
             KafkaNodeConfig nodeConfig,
             KafkaFederationSinkContext kafkaFederationSinkContext) {
         this.workerName = Preconditions.checkNotNull(workerName);
         this.nodeConfig = nodeConfig;
+        this.cacheClusterConfig = cacheClusterConfig;
         this.sinkContext = Preconditions.checkNotNull(kafkaFederationSinkContext);
-        this.context = new Context(nodeConfig.getProperties() != null ? nodeConfig.getProperties() : Maps.newHashMap());
         this.state = LifecycleState.IDLE;
-        this.cacheClusterName = nodeConfig.getNodeName();
+        this.cacheClusterName = nodeConfig != null ? nodeConfig.getNodeName() : cacheClusterConfig.getClusterName();
         this.handler = sinkContext.createEventHandler();
@@ -70,22 +71,53 @@ public KafkaProducerCluster(
     /** start and init kafka producer */
     @Override
     public void start() {
+        if (CommonPropertiesHolder.useUnifiedConfiguration()) {
+            startByNodeConfig();
+        } else {
+            startByCacheCluster();
+        }
+    }
+
+    private void startByCacheCluster() {
         this.state = LifecycleState.START;
+        if (cacheClusterConfig == null) {
+            LOG.error("start kafka producer cluster failed, cacheClusterConfig config is null");
+            return;
+        }
         try {
-            Properties props = new Properties();
-            props.putAll(context.getParameters());
-            props.put(
-                    ProducerConfig.PARTITIONER_CLASS_CONFIG,
-                    context.getString(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionerSelector.class.getName()));
-            props.put(
-                    ProducerConfig.ACKS_CONFIG,
-                    context.getString(ProducerConfig.ACKS_CONFIG, "all"));
-            props.put(
-                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                    nodeConfig.getBootstrapServers());
+            Properties props = defaultKafkaProperties();
+            props.putAll(cacheClusterConfig.getParams());
+            props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionerSelector.class.getName());
+            props.put(ProducerConfig.ACKS_CONFIG,
+                    cacheClusterConfig.getParams().getOrDefault(ProducerConfig.ACKS_CONFIG, "all"));
+
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                    cacheClusterConfig.getParams().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+
             props.put(ProducerConfig.CLIENT_ID_CONFIG,
-                    nodeConfig.getClientId() + "-" + workerName);
-            LOG.info("init kafka client info: " + props);
+                    cacheClusterConfig.getParams().get(ProducerConfig.CLIENT_ID_CONFIG) + "-" + workerName);
+            LOG.info("init kafka client by cache cluster info: " + props);
+            producer = new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer());
+            Preconditions.checkNotNull(producer);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    private void startByNodeConfig() {
+        this.state = LifecycleState.START;
+        if (nodeConfig == null) {
+            LOG.error("start kafka producer cluster failed, node config is null");
+            return;
+        }
+        try {
+            Properties props = defaultKafkaProperties();
+            props.putAll(nodeConfig.getProperties() != null ? nodeConfig.getProperties() : new Properties());
+            props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionerSelector.class.getName());
+            props.put(ProducerConfig.ACKS_CONFIG, nodeConfig.getAcks());
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, nodeConfig.getBootstrapServers());
+            props.put(ProducerConfig.CLIENT_ID_CONFIG, nodeConfig.getClientId() + "-" + workerName);
+            LOG.info("init kafka client by node config info: " + props);
             producer = new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer());
             Preconditions.checkNotNull(producer);
         } catch (Exception e) {
@@ -93,6 +125,35 @@ public void start() {
         }
     }
 
+    public Properties defaultKafkaProperties() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "122880");
+        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "44740000");
+        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
+        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "86400000");
+        props.put(ProducerConfig.LINGER_MS_CONFIG, "500");
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
+        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "8388608");
+        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "300000");
+        props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, "32768");
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
+        props.put(ProducerConfig.RETRIES_CONFIG, "100000");
+        props.put(ProducerConfig.SEND_BUFFER_CONFIG, "524288");
+        props.put("mute.partition.error.max.times", "20");
+        props.put("mute.partition.max.percentage", "20");
+        props.put("rpc.timeout.ms", "30000");
+        props.put("topic.expiry.ms", "86400000");
+        props.put("unmute.partition.interval.ms", "600000");
+        props.put("metadata.retry.backoff.ms", "500");
+        props.put("metadata.fetch.timeout.ms", "1000");
+        props.put("maxThreads", "2");
+        props.put("enable.replace.partition.for.can.retry", "true");
+        props.put("enable.replace.partition.for.not.leader", "true");
+        props.put("enable.topic.partition.circuit.breaker", "true");
+        return props;
+    }
+
     /** stop and close kafka producer */
     @Override
     public void stop() {
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
index 23e817dd8d7..219519b907e 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
@@ -19,6 +19,8 @@
 
 import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import com.google.common.base.Preconditions;
@@ -45,6 +47,7 @@ public class KafkaProducerFederation implements Runnable {
     private KafkaNodeConfig nodeConfig;
     private KafkaProducerCluster cluster;
     private KafkaProducerCluster deleteCluster;
+    private CacheClusterConfig cacheClusterConfig;
 
     public KafkaProducerFederation(String workerName, KafkaFederationSinkContext context) {
         this.workerName = Preconditions.checkNotNull(workerName);
@@ -86,13 +89,39 @@ private void reload() {
             LOG.error("failed to close delete cluster, ex={}", e.getMessage(), e);
         }
 
+        if (CommonPropertiesHolder.useUnifiedConfiguration()) {
+            reloadByNodeConfig();
+        } else {
+            reloadByCacheClusterConfig();
+        }
+
+    }
+
+    private void reloadByCacheClusterConfig() {
         try {
+            if (cacheClusterConfig != null && cacheClusterConfig.equals(context.getCacheClusterConfig())) {
+                return;
+            }
+            this.cacheClusterConfig = context.getCacheClusterConfig();
+            KafkaProducerCluster updateCluster =
+                    new KafkaProducerCluster(workerName, cacheClusterConfig, nodeConfig, context);
+            updateCluster.start();
+            this.deleteCluster = cluster;
+            this.cluster = updateCluster;
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    private void reloadByNodeConfig() {
+        try {
             if (nodeConfig != null && context.getNodeConfig().getVersion() <= nodeConfig.getVersion()) {
                 return;
             }
 
             this.nodeConfig = context.getNodeConfig();
-            KafkaProducerCluster updateCluster = new KafkaProducerCluster(workerName, nodeConfig, context);
+            KafkaProducerCluster updateCluster =
+                    new KafkaProducerCluster(workerName, cacheClusterConfig, nodeConfig, context);
             updateCluster.start();
             this.deleteCluster = cluster;
             this.cluster = updateCluster;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index f5fe9c5b96f..cce771675f8 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -25,6 +25,7 @@
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
@@ -50,6 +51,7 @@ public class PulsarFederationSinkContext extends SinkContext {
     public static final String KEY_EVENT_HANDLER = "eventHandler";
     private Map<String, PulsarIdConfig> idConfigMap = new ConcurrentHashMap<>();
     private PulsarNodeConfig pulsarNodeConfig;
+    private CacheClusterConfig cacheClusterConfig;
 
     public PulsarFederationSinkContext(String sinkName, Context context, Channel channel) {
         super(sinkName, context, channel);
@@ -73,6 +75,12 @@ public void reload() {
             if (pulsarNodeConfig == null || requestNodeConfig.getVersion() > pulsarNodeConfig.getVersion()) {
                 this.pulsarNodeConfig = requestNodeConfig;
             }
+
+            CacheClusterConfig clusterConfig = new CacheClusterConfig();
+            clusterConfig.setClusterName(this.taskName);
+            clusterConfig.setParams(newSortTaskConfig.getSinkParams());
+            this.cacheClusterConfig = clusterConfig;
+
             this.taskConfig = newTaskConfig;
             this.sortTaskConfig = newSortTaskConfig;
 