From eae2ca2b8dc22c12557ffc24a3daf23a1d9329b1 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Fri, 15 Mar 2024 18:14:51 +0100 Subject: [PATCH] Fix connector name missing when using dead-letter-queue with concurrency config --- .../kafka/fault/KafkaDeadLetterQueue.java | 5 +++-- .../kafka/fault/KafkaDelayedRetryTopic.java | 11 ++++++----- .../kafka/reply/KafkaRequestReplyImpl.java | 7 ++++--- .../kafka/ConcurrentProcessorTest.java | 1 + .../impl/ConcurrencyConnectorConfig.java | 6 +++--- .../providers/impl/ConnectorConfig.java | 18 ++++++++++++------ .../impl/OverrideConnectorConfig.java | 12 ++++++------ .../impl/ConcurrencyConnectorConfigTest.java | 4 ++-- .../impl/OverrideConnectorConfigTest.java | 4 ++-- 9 files changed, 39 insertions(+), 29 deletions(-) diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java index 1aed8cb2c4..5800d13991 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java @@ -30,6 +30,7 @@ import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord; import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents; +import io.smallrye.reactive.messaging.kafka.KafkaConnector; import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration; import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration; import io.smallrye.reactive.messaging.kafka.KafkaConsumer; @@ -100,8 +101,8 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config, String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG); ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(), - config.getChannel(), "dead-letter-queue", Map.of( - KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer), + KafkaConnector.CONNECTOR_NAME, config.getChannel(), "dead-letter-queue", + Map.of(KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer), VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer), CLIENT_ID_CONFIG, c -> config.getDeadLetterQueueProducerClientId() .orElse("kafka-dead-letter-topic-producer-" + consumerClientId), diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.java index 0dcf1f1e8b..1d482150ba 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.java @@ -46,6 +46,7 @@ import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler; import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord; import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents; +import io.smallrye.reactive.messaging.kafka.KafkaConnector; import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration; import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration; import io.smallrye.reactive.messaging.kafka.KafkaConsumer; @@ -127,8 +128,8 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config, String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG); ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(), - config.getChannel(), "dead-letter-queue", Map.of( - KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer), + KafkaConnector.CONNECTOR_NAME, config.getChannel(), "dead-letter-queue", + Map.of(KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer), VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer), CLIENT_ID_CONFIG, c -> config.getDeadLetterQueueProducerClientId() .orElse("kafka-delayed-retry-topic-producer-" + consumerClientId), @@ -145,9 +146,9 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config, ReactiveKafkaProducer producer = new ReactiveKafkaProducer<>(producerConfig, serializationFailureHandlers, producerInterceptors, null, (p, c) -> kafkaCDIEvents.producer().fire(p)); - ConnectorConfig retryConsumerConfig = new OverrideConnectorConfig(INCOMING_PREFIX, - rootConfig.get(), config.getChannel(), "delayed-retry-topic.consumer", Map.of( - "lazy-client", c -> true, + ConnectorConfig retryConsumerConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(), + KafkaConnector.CONNECTOR_NAME, config.getChannel(), "delayed-retry-topic.consumer", + Map.of("lazy-client", c -> true, CLIENT_ID_CONFIG, c -> "kafka-delayed-retry-topic-" + consumerClientId, GROUP_ID_CONFIG, c -> "kafka-delayed-retry-topic-" + consumerClientId)); Config retryKafkaConfig = ConfigHelper.retrieveChannelConfiguration(configurations, retryConsumerConfig); diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java index 42866b9049..ee1d2c00eb 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java @@ -36,6 +36,7 @@ import io.smallrye.reactive.messaging.OutgoingMessageMetadata; import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler; import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents; +import io.smallrye.reactive.messaging.kafka.KafkaConnector; import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration; import io.smallrye.reactive.messaging.kafka.KafkaConsumer; import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener; @@ -88,9 +89,9 @@ public KafkaRequestReplyImpl(EmitterConfiguration config, Instance rebalanceListeners) { super(config, defaultBufferSize); this.channel = config.name(); - ConnectorConfig connectorConfig = new OverrideConnectorConfig(OUTGOING_PREFIX, channelConfiguration, channel, - "reply", Map.of( - "topic", c -> c.getOriginalValue("topic", String.class).orElse(channel) + DEFAULT_REPLIES_TOPIC_SUFFIX, + ConnectorConfig connectorConfig = new OverrideConnectorConfig(OUTGOING_PREFIX, channelConfiguration, + KafkaConnector.CONNECTOR_NAME, channel, "reply", + Map.of("topic", c -> c.getOriginalValue("topic", String.class).orElse(channel) + DEFAULT_REPLIES_TOPIC_SUFFIX, "assign-seek", c -> c.getOriginalValue(REPLY_PARTITION_KEY, Integer.class).map(String::valueOf).orElse(null))); Config replyKafkaConfig = ConfigHelper.retrieveChannelConfiguration(configurations, connectorConfig); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ConcurrentProcessorTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ConcurrentProcessorTest.java index 57f901ff93..9e049aa8d8 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ConcurrentProcessorTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ConcurrentProcessorTest.java @@ -38,6 +38,7 @@ private MapBasedConfig dataconfig() { .with("group.id", groupId) .with("topic", topic) .with("concurrency", 3) + .with("failure-strategy", "dead-letter-queue") .with("auto.offset.reset", "earliest") .with("value.deserializer", IntegerDeserializer.class.getName()); } diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/ConcurrencyConnectorConfig.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/ConcurrencyConnectorConfig.java index 27206c5301..a44149d911 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/ConcurrencyConnectorConfig.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/ConcurrencyConnectorConfig.java @@ -48,8 +48,8 @@ public static boolean isConcurrencyChannelName(String name) { private final String indexedChannelPrefix; private final String indexedChannel; - public ConcurrencyConnectorConfig(String prefix, Config overall, String channel, int index) { - super(prefix, overall, channel); + public ConcurrencyConnectorConfig(String prefix, Config overall, String connector, String channel, int index) { + super(prefix, overall, connector, channel); this.indexedChannel = channel + CONCURRENCY_CONFIG_SEPARATOR + index; this.indexedChannelPrefix = channelPrefix(prefix, indexedChannel); } @@ -59,7 +59,7 @@ public String getIndexedChannel() { } public ConcurrencyConnectorConfig(ConnectorConfig config, int index) { - this(config.prefix, config.overall, config.name, index); + this(config.prefix, config.overall, config.connector, config.name, index); } public String indexedChannelKey(String propertyName) { diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/ConnectorConfig.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/ConnectorConfig.java index 55424252ba..ab6bd38a48 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/ConnectorConfig.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/ConnectorConfig.java @@ -40,16 +40,12 @@ public class ConnectorConfig implements Config { protected final String connector; protected final String channelPrefix; - protected ConnectorConfig(String prefix, Config overall, String channel) { + protected ConnectorConfig(String prefix, Config overall, String connector, String channel) { this.prefix = Objects.requireNonNull(prefix, msg.prefixMustNotBeSet()); this.overall = Objects.requireNonNull(overall, msg.configMustNotBeSet()); this.name = Objects.requireNonNull(channel, msg.channelMustNotBeSet()); this.channelPrefix = channelPrefix(prefix, name); - - Optional value = overall.getOptionalValue(channelKey(CONNECTOR_ATTRIBUTE), String.class); - this.connector = value - .orElseGet(() -> overall.getOptionalValue(channelKey("type"), String.class) // Legacy - .orElseThrow(() -> ex.illegalArgumentChannelConnectorConfiguration(name))); + this.connector = connector; // Detect invalid channel-name attribute for (String key : overall.getPropertyNames()) { @@ -59,6 +55,16 @@ protected ConnectorConfig(String prefix, Config overall, String channel) { } } + protected ConnectorConfig(String prefix, Config overall, String channel) { + this(prefix, overall, getConnectorAttribute(prefix, overall, channel), channel); + } + + public static String getConnectorAttribute(String prefix, Config overall, String channel) { + return overall.getOptionalValue(channelPrefix(prefix, channel) + CONNECTOR_ATTRIBUTE, String.class) + .orElseGet(() -> overall.getOptionalValue(channelPrefix(prefix, channel) + "type", String.class) // Legacy + .orElseThrow(() -> ex.illegalArgumentChannelConnectorConfiguration(channel))); + } + public static String channelPrefix(String prefix, String name) { return name.contains(".") ? prefix + "\"" + name + "\"." : prefix + name + "."; } diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/OverrideConnectorConfig.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/OverrideConnectorConfig.java index d8f507197f..b8223e9e2f 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/OverrideConnectorConfig.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/OverrideConnectorConfig.java @@ -29,18 +29,18 @@ public class OverrideConnectorConfig extends ConnectorConfig { private final String nestedChannel; private final Map> overrides; - public OverrideConnectorConfig(String prefix, Config overall, String channel, + public OverrideConnectorConfig(String prefix, Config overall, String connector, String channel, Map> overrides) { - this(prefix, overall, channel, null, overrides); + this(prefix, overall, connector, channel, null, overrides); } - public OverrideConnectorConfig(String prefix, Config overall, String channel, String nestedChannel) { - this(prefix, overall, channel, nestedChannel, new HashMap<>()); + public OverrideConnectorConfig(String prefix, Config overall, String connector, String channel, String nestedChannel) { + this(prefix, overall, connector, channel, nestedChannel, new HashMap<>()); } - public OverrideConnectorConfig(String prefix, Config overall, String channel, String nestedChannel, + public OverrideConnectorConfig(String prefix, Config overall, String connector, String channel, String nestedChannel, Map> overrides) { - super(prefix, overall, channel); + super(prefix, overall, connector, channel); this.nestedChannel = nestedChannel; this.overrides = overrides; } diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/impl/ConcurrencyConnectorConfigTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/impl/ConcurrencyConnectorConfigTest.java index 5038059da4..c6e517a448 100644 --- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/impl/ConcurrencyConnectorConfigTest.java +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/impl/ConcurrencyConnectorConfigTest.java @@ -75,8 +75,8 @@ public String getName() { } }); overallConfig = builder.build(); - config = new ConcurrencyConnectorConfig("mp.messaging.incoming.", overallConfig, "foo", 1); - config2 = new ConcurrencyConnectorConfig("mp.messaging.incoming.", overallConfig, "foo", 2); + config = new ConcurrencyConnectorConfig("mp.messaging.incoming.", overallConfig, "some-connector", "foo", 1); + config2 = new ConcurrencyConnectorConfig("mp.messaging.incoming.", overallConfig, "some-connector", "foo", 2); } @SetEnvironmentVariable(key = "MP_MESSAGING_INCOMING_FOO_ATTR", value = "new-value") diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/impl/OverrideConnectorConfigTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/impl/OverrideConnectorConfigTest.java index 51503549ca..fe368ecd46 100644 --- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/impl/OverrideConnectorConfigTest.java +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/impl/OverrideConnectorConfigTest.java @@ -74,8 +74,8 @@ public String getName() { } }); overallConfig = builder.build(); - config = new OverrideConnectorConfig("mp.messaging.incoming.", overallConfig, "foo", "bar"); - config2 = new OverrideConnectorConfig("mp.messaging.incoming.", overallConfig, "foo", "bar", + config = new OverrideConnectorConfig("mp.messaging.incoming.", overallConfig, "some-connector", "foo", "bar"); + config2 = new OverrideConnectorConfig("mp.messaging.incoming.", overallConfig, "some-connector", "foo", "bar", Map.of("attr1", c -> "some-other-value", "attr2", c -> c.getOriginalValue("attr2", Integer.class).map(i -> i + 10) .orElse(10)));