Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix connector name missing when using dead-letter-queue with concurrency config #2541

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
Expand Down Expand Up @@ -100,8 +101,8 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,

String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG);
ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
config.getChannel(), "dead-letter-queue", Map.of(
KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
KafkaConnector.CONNECTOR_NAME, config.getChannel(), "dead-letter-queue",
Map.of(KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer),
CLIENT_ID_CONFIG, c -> config.getDeadLetterQueueProducerClientId()
.orElse("kafka-dead-letter-topic-producer-" + consumerClientId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
Expand Down Expand Up @@ -127,8 +128,8 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,

String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG);
ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
config.getChannel(), "dead-letter-queue", Map.of(
KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
KafkaConnector.CONNECTOR_NAME, config.getChannel(), "dead-letter-queue",
Map.of(KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer),
CLIENT_ID_CONFIG, c -> config.getDeadLetterQueueProducerClientId()
.orElse("kafka-delayed-retry-topic-producer-" + consumerClientId),
Expand All @@ -145,9 +146,9 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
ReactiveKafkaProducer<Object, Object> producer = new ReactiveKafkaProducer<>(producerConfig,
serializationFailureHandlers, producerInterceptors, null, (p, c) -> kafkaCDIEvents.producer().fire(p));

ConnectorConfig retryConsumerConfig = new OverrideConnectorConfig(INCOMING_PREFIX,
rootConfig.get(), config.getChannel(), "delayed-retry-topic.consumer", Map.of(
"lazy-client", c -> true,
ConnectorConfig retryConsumerConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
KafkaConnector.CONNECTOR_NAME, config.getChannel(), "delayed-retry-topic.consumer",
Map.of("lazy-client", c -> true,
CLIENT_ID_CONFIG, c -> "kafka-delayed-retry-topic-" + consumerClientId,
GROUP_ID_CONFIG, c -> "kafka-delayed-retry-topic-" + consumerClientId));
Config retryKafkaConfig = ConfigHelper.retrieveChannelConfiguration(configurations, retryConsumerConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
Expand Down Expand Up @@ -88,9 +89,9 @@ public KafkaRequestReplyImpl(EmitterConfiguration config,
Instance<KafkaConsumerRebalanceListener> rebalanceListeners) {
super(config, defaultBufferSize);
this.channel = config.name();
ConnectorConfig connectorConfig = new OverrideConnectorConfig(OUTGOING_PREFIX, channelConfiguration, channel,
"reply", Map.of(
"topic", c -> c.getOriginalValue("topic", String.class).orElse(channel) + DEFAULT_REPLIES_TOPIC_SUFFIX,
ConnectorConfig connectorConfig = new OverrideConnectorConfig(OUTGOING_PREFIX, channelConfiguration,
KafkaConnector.CONNECTOR_NAME, channel, "reply",
Map.of("topic", c -> c.getOriginalValue("topic", String.class).orElse(channel) + DEFAULT_REPLIES_TOPIC_SUFFIX,
"assign-seek",
c -> c.getOriginalValue(REPLY_PARTITION_KEY, Integer.class).map(String::valueOf).orElse(null)));
Config replyKafkaConfig = ConfigHelper.retrieveChannelConfiguration(configurations, connectorConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ private MapBasedConfig dataconfig() {
.with("group.id", groupId)
.with("topic", topic)
.with("concurrency", 3)
.with("failure-strategy", "dead-letter-queue")
.with("auto.offset.reset", "earliest")
.with("value.deserializer", IntegerDeserializer.class.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public static boolean isConcurrencyChannelName(String name) {
private final String indexedChannelPrefix;
private final String indexedChannel;

public ConcurrencyConnectorConfig(String prefix, Config overall, String channel, int index) {
super(prefix, overall, channel);
public ConcurrencyConnectorConfig(String prefix, Config overall, String connector, String channel, int index) {
super(prefix, overall, connector, channel);
this.indexedChannel = channel + CONCURRENCY_CONFIG_SEPARATOR + index;
this.indexedChannelPrefix = channelPrefix(prefix, indexedChannel);
}
Expand All @@ -59,7 +59,7 @@ public String getIndexedChannel() {
}

public ConcurrencyConnectorConfig(ConnectorConfig config, int index) {
this(config.prefix, config.overall, config.name, index);
this(config.prefix, config.overall, config.connector, config.name, index);
}

public String indexedChannelKey(String propertyName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,12 @@ public class ConnectorConfig implements Config {
protected final String connector;
protected final String channelPrefix;

protected ConnectorConfig(String prefix, Config overall, String channel) {
protected ConnectorConfig(String prefix, Config overall, String connector, String channel) {
this.prefix = Objects.requireNonNull(prefix, msg.prefixMustNotBeSet());
this.overall = Objects.requireNonNull(overall, msg.configMustNotBeSet());
this.name = Objects.requireNonNull(channel, msg.channelMustNotBeSet());
this.channelPrefix = channelPrefix(prefix, name);

Optional<String> value = overall.getOptionalValue(channelKey(CONNECTOR_ATTRIBUTE), String.class);
this.connector = value
.orElseGet(() -> overall.getOptionalValue(channelKey("type"), String.class) // Legacy
.orElseThrow(() -> ex.illegalArgumentChannelConnectorConfiguration(name)));
this.connector = connector;

// Detect invalid channel-name attribute
for (String key : overall.getPropertyNames()) {
Expand All @@ -59,6 +55,16 @@ protected ConnectorConfig(String prefix, Config overall, String channel) {
}
}

protected ConnectorConfig(String prefix, Config overall, String channel) {
this(prefix, overall, getConnectorAttribute(prefix, overall, channel), channel);
}

public static String getConnectorAttribute(String prefix, Config overall, String channel) {
return overall.getOptionalValue(channelPrefix(prefix, channel) + CONNECTOR_ATTRIBUTE, String.class)
.orElseGet(() -> overall.getOptionalValue(channelPrefix(prefix, channel) + "type", String.class) // Legacy
.orElseThrow(() -> ex.illegalArgumentChannelConnectorConfiguration(channel)));
}

public static String channelPrefix(String prefix, String name) {
return name.contains(".") ? prefix + "\"" + name + "\"." : prefix + name + ".";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ public class OverrideConnectorConfig extends ConnectorConfig {
private final String nestedChannel;
private final Map<String, Function<OverrideConnectorConfig, Object>> overrides;

public OverrideConnectorConfig(String prefix, Config overall, String channel,
public OverrideConnectorConfig(String prefix, Config overall, String connector, String channel,
Map<String, Function<OverrideConnectorConfig, Object>> overrides) {
this(prefix, overall, channel, null, overrides);
this(prefix, overall, connector, channel, null, overrides);
}

public OverrideConnectorConfig(String prefix, Config overall, String channel, String nestedChannel) {
this(prefix, overall, channel, nestedChannel, new HashMap<>());
public OverrideConnectorConfig(String prefix, Config overall, String connector, String channel, String nestedChannel) {
this(prefix, overall, connector, channel, nestedChannel, new HashMap<>());
}

public OverrideConnectorConfig(String prefix, Config overall, String channel, String nestedChannel,
public OverrideConnectorConfig(String prefix, Config overall, String connector, String channel, String nestedChannel,
Map<String, Function<OverrideConnectorConfig, Object>> overrides) {
super(prefix, overall, channel);
super(prefix, overall, connector, channel);
this.nestedChannel = nestedChannel;
this.overrides = overrides;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ public String getName() {
}
});
overallConfig = builder.build();
config = new ConcurrencyConnectorConfig("mp.messaging.incoming.", overallConfig, "foo", 1);
config2 = new ConcurrencyConnectorConfig("mp.messaging.incoming.", overallConfig, "foo", 2);
config = new ConcurrencyConnectorConfig("mp.messaging.incoming.", overallConfig, "some-connector", "foo", 1);
config2 = new ConcurrencyConnectorConfig("mp.messaging.incoming.", overallConfig, "some-connector", "foo", 2);
}

@SetEnvironmentVariable(key = "MP_MESSAGING_INCOMING_FOO_ATTR", value = "new-value")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public String getName() {
}
});
overallConfig = builder.build();
config = new OverrideConnectorConfig("mp.messaging.incoming.", overallConfig, "foo", "bar");
config2 = new OverrideConnectorConfig("mp.messaging.incoming.", overallConfig, "foo", "bar",
config = new OverrideConnectorConfig("mp.messaging.incoming.", overallConfig, "some-connector", "foo", "bar");
config2 = new OverrideConnectorConfig("mp.messaging.incoming.", overallConfig, "some-connector", "foo", "bar",
Map.of("attr1", c -> "some-other-value",
"attr2", c -> c.getOriginalValue("attr2", Integer.class).map(i -> i + 10)
.orElse(10)));
Expand Down
Loading