Skip to content

Commit

Permalink
Fix connector name missing when using dead-letter-queue with concurre…
Browse files Browse the repository at this point in the history
…ncy config
  • Loading branch information
ozangunalp committed Mar 15, 2024
1 parent 589d508 commit eae2ca2
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
Expand Down Expand Up @@ -100,8 +101,8 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,

String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG);
ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
config.getChannel(), "dead-letter-queue", Map.of(
KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
KafkaConnector.CONNECTOR_NAME, config.getChannel(), "dead-letter-queue",
Map.of(KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer),
CLIENT_ID_CONFIG, c -> config.getDeadLetterQueueProducerClientId()
.orElse("kafka-dead-letter-topic-producer-" + consumerClientId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
Expand Down Expand Up @@ -127,8 +128,8 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,

String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG);
ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
config.getChannel(), "dead-letter-queue", Map.of(
KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
KafkaConnector.CONNECTOR_NAME, config.getChannel(), "dead-letter-queue",
Map.of(KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer),
CLIENT_ID_CONFIG, c -> config.getDeadLetterQueueProducerClientId()
.orElse("kafka-delayed-retry-topic-producer-" + consumerClientId),
Expand All @@ -145,9 +146,9 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
ReactiveKafkaProducer<Object, Object> producer = new ReactiveKafkaProducer<>(producerConfig,
serializationFailureHandlers, producerInterceptors, null, (p, c) -> kafkaCDIEvents.producer().fire(p));

ConnectorConfig retryConsumerConfig = new OverrideConnectorConfig(INCOMING_PREFIX,
rootConfig.get(), config.getChannel(), "delayed-retry-topic.consumer", Map.of(
"lazy-client", c -> true,
ConnectorConfig retryConsumerConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
KafkaConnector.CONNECTOR_NAME, config.getChannel(), "delayed-retry-topic.consumer",
Map.of("lazy-client", c -> true,
CLIENT_ID_CONFIG, c -> "kafka-delayed-retry-topic-" + consumerClientId,
GROUP_ID_CONFIG, c -> "kafka-delayed-retry-topic-" + consumerClientId));
Config retryKafkaConfig = ConfigHelper.retrieveChannelConfiguration(configurations, retryConsumerConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
Expand Down Expand Up @@ -88,9 +89,9 @@ public KafkaRequestReplyImpl(EmitterConfiguration config,
Instance<KafkaConsumerRebalanceListener> rebalanceListeners) {
super(config, defaultBufferSize);
this.channel = config.name();
ConnectorConfig connectorConfig = new OverrideConnectorConfig(OUTGOING_PREFIX, channelConfiguration, channel,
"reply", Map.of(
"topic", c -> c.getOriginalValue("topic", String.class).orElse(channel) + DEFAULT_REPLIES_TOPIC_SUFFIX,
ConnectorConfig connectorConfig = new OverrideConnectorConfig(OUTGOING_PREFIX, channelConfiguration,
KafkaConnector.CONNECTOR_NAME, channel, "reply",
Map.of("topic", c -> c.getOriginalValue("topic", String.class).orElse(channel) + DEFAULT_REPLIES_TOPIC_SUFFIX,
"assign-seek",
c -> c.getOriginalValue(REPLY_PARTITION_KEY, Integer.class).map(String::valueOf).orElse(null)));
Config replyKafkaConfig = ConfigHelper.retrieveChannelConfiguration(configurations, connectorConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ private MapBasedConfig dataconfig() {
.with("group.id", groupId)
.with("topic", topic)
.with("concurrency", 3)
.with("failure-strategy", "dead-letter-queue")
.with("auto.offset.reset", "earliest")
.with("value.deserializer", IntegerDeserializer.class.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public static boolean isConcurrencyChannelName(String name) {
private final String indexedChannelPrefix;
private final String indexedChannel;

public ConcurrencyConnectorConfig(String prefix, Config overall, String channel, int index) {
super(prefix, overall, channel);
public ConcurrencyConnectorConfig(String prefix, Config overall, String connector, String channel, int index) {
super(prefix, overall, connector, channel);
this.indexedChannel = channel + CONCURRENCY_CONFIG_SEPARATOR + index;
this.indexedChannelPrefix = channelPrefix(prefix, indexedChannel);
}
Expand All @@ -59,7 +59,7 @@ public String getIndexedChannel() {
}

public ConcurrencyConnectorConfig(ConnectorConfig config, int index) {
this(config.prefix, config.overall, config.name, index);
this(config.prefix, config.overall, config.connector, config.name, index);
}

public String indexedChannelKey(String propertyName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,12 @@ public class ConnectorConfig implements Config {
protected final String connector;
protected final String channelPrefix;

protected ConnectorConfig(String prefix, Config overall, String channel) {
protected ConnectorConfig(String prefix, Config overall, String connector, String channel) {
this.prefix = Objects.requireNonNull(prefix, msg.prefixMustNotBeSet());
this.overall = Objects.requireNonNull(overall, msg.configMustNotBeSet());
this.name = Objects.requireNonNull(channel, msg.channelMustNotBeSet());
this.channelPrefix = channelPrefix(prefix, name);

Optional<String> value = overall.getOptionalValue(channelKey(CONNECTOR_ATTRIBUTE), String.class);
this.connector = value
.orElseGet(() -> overall.getOptionalValue(channelKey("type"), String.class) // Legacy
.orElseThrow(() -> ex.illegalArgumentChannelConnectorConfiguration(name)));
this.connector = connector;

// Detect invalid channel-name attribute
for (String key : overall.getPropertyNames()) {
Expand All @@ -59,6 +55,16 @@ protected ConnectorConfig(String prefix, Config overall, String channel) {
}
}

protected ConnectorConfig(String prefix, Config overall, String channel) {
this(prefix, overall, getConnectorAttribute(prefix, overall, channel), channel);
}

public static String getConnectorAttribute(String prefix, Config overall, String channel) {
return overall.getOptionalValue(channelPrefix(prefix, channel) + CONNECTOR_ATTRIBUTE, String.class)
.orElseGet(() -> overall.getOptionalValue(channelPrefix(prefix, channel) + "type", String.class) // Legacy
.orElseThrow(() -> ex.illegalArgumentChannelConnectorConfiguration(channel)));
}

public static String channelPrefix(String prefix, String name) {
return name.contains(".") ? prefix + "\"" + name + "\"." : prefix + name + ".";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ public class OverrideConnectorConfig extends ConnectorConfig {
private final String nestedChannel;
private final Map<String, Function<OverrideConnectorConfig, Object>> overrides;

public OverrideConnectorConfig(String prefix, Config overall, String channel,
public OverrideConnectorConfig(String prefix, Config overall, String connector, String channel,
Map<String, Function<OverrideConnectorConfig, Object>> overrides) {
this(prefix, overall, channel, null, overrides);
this(prefix, overall, connector, channel, null, overrides);
}

public OverrideConnectorConfig(String prefix, Config overall, String channel, String nestedChannel) {
this(prefix, overall, channel, nestedChannel, new HashMap<>());
public OverrideConnectorConfig(String prefix, Config overall, String connector, String channel, String nestedChannel) {
this(prefix, overall, connector, channel, nestedChannel, new HashMap<>());
}

public OverrideConnectorConfig(String prefix, Config overall, String channel, String nestedChannel,
public OverrideConnectorConfig(String prefix, Config overall, String connector, String channel, String nestedChannel,
Map<String, Function<OverrideConnectorConfig, Object>> overrides) {
super(prefix, overall, channel);
super(prefix, overall, connector, channel);
this.nestedChannel = nestedChannel;
this.overrides = overrides;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ public String getName() {
}
});
overallConfig = builder.build();
config = new ConcurrencyConnectorConfig("mp.messaging.incoming.", overallConfig, "foo", 1);
config2 = new ConcurrencyConnectorConfig("mp.messaging.incoming.", overallConfig, "foo", 2);
config = new ConcurrencyConnectorConfig("mp.messaging.incoming.", overallConfig, "some-connector", "foo", 1);
config2 = new ConcurrencyConnectorConfig("mp.messaging.incoming.", overallConfig, "some-connector", "foo", 2);
}

@SetEnvironmentVariable(key = "MP_MESSAGING_INCOMING_FOO_ATTR", value = "new-value")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public String getName() {
}
});
overallConfig = builder.build();
config = new OverrideConnectorConfig("mp.messaging.incoming.", overallConfig, "foo", "bar");
config2 = new OverrideConnectorConfig("mp.messaging.incoming.", overallConfig, "foo", "bar",
config = new OverrideConnectorConfig("mp.messaging.incoming.", overallConfig, "some-connector", "foo", "bar");
config2 = new OverrideConnectorConfig("mp.messaging.incoming.", overallConfig, "some-connector", "foo", "bar",
Map.of("attr1", c -> "some-other-value",
"attr2", c -> c.getOriginalValue("attr2", Integer.class).map(i -> i + 10)
.orElse(10)));
Expand Down

0 comments on commit eae2ca2

Please sign in to comment.