Skip to content

Commit

Permalink
Pulsar config refactoring
Browse files Browse the repository at this point in the history
Fixes #2631
  • Loading branch information
ozangunalp committed Jul 25, 2024
1 parent f6609ee commit 8a7ce52
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 94 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
package io.smallrye.reactive.messaging.pulsar;

import static io.smallrye.reactive.messaging.providers.helpers.CDIUtils.getInstanceById;
import static io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging.log;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
Expand All @@ -30,6 +40,7 @@

import io.smallrye.reactive.messaging.ClientCustomizer;
import io.smallrye.reactive.messaging.providers.helpers.ConfigUtils;
import io.smallrye.reactive.messaging.providers.helpers.Validation;
import io.vertx.core.json.JsonObject;

/**
Expand Down Expand Up @@ -110,6 +121,33 @@ public ClientBuilderImpl customize(ClientBuilderImpl builder, PulsarConnectorCom
return (ClientBuilderImpl) ConfigUtils.customize(cc.config(), clientConfigCustomizers, builder);
}

public ClientBuilderImpl configure(PulsarConnectorCommonConfiguration cc, ClientConfigurationData conf)
throws PulsarClientException {
setAuth(conf);
return customize(new ClientBuilderImpl(conf), cc);
}

/**
* Sets the authentication object in the given configuration object using
* `authPluginClassName` and `authParams`/`authParamMap` attributes
* This use to be done by the PulsarClientImpl
*
* @param conf client configuration
* @throws PulsarClientException
*/
private void setAuth(ClientConfigurationData conf) throws PulsarClientException {
if (Validation.isBlank(conf.getAuthPluginClassName())
|| (Validation.isBlank(conf.getAuthParams()) && conf.getAuthParamMap() == null)) {
return;
}

if (!Validation.isBlank(conf.getAuthParams())) {
conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParams()));
} else if (conf.getAuthParamMap() != null) {
conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParamMap()));
}
}

/**
* Extract the configuration map for building Pulsar consumer
*
Expand All @@ -129,6 +167,62 @@ public <T> ConsumerBuilder<T> customize(ConsumerBuilder<T> builder, PulsarConnec
return (ConsumerBuilder<T>) ConfigUtils.customize(ic.config(), consumerConfigCustomizers, builder);
}

public <T> ConsumerBuilder<T> configure(ConsumerBuilder<T> builder,
PulsarConnectorIncomingConfiguration ic,
ConsumerConfigurationData<?> conf) {
builder.loadConf(configToMap(conf));
ic.getDeadLetterPolicyMaxRedeliverCount().ifPresent(i -> builder.deadLetterPolicy(getDeadLetterPolicy(ic, i)));
ic.getNegativeAckRedeliveryBackoff()
.ifPresent(s -> builder.negativeAckRedeliveryBackoff(parseBackoff(s, ic.getChannel())));
ic.getAckTimeoutRedeliveryBackoff()
.ifPresent(s -> builder.ackTimeoutRedeliveryBackoff(parseBackoff(s, ic.getChannel())));
if (conf.getConsumerEventListener() != null) {
builder.consumerEventListener(conf.getConsumerEventListener());
}
if (conf.getPayloadProcessor() != null) {
builder.messagePayloadProcessor(conf.getPayloadProcessor());
}
if (conf.getKeySharedPolicy() != null) {
builder.keySharedPolicy(conf.getKeySharedPolicy());
} else if (conf.getSubscriptionType() == SubscriptionType.Key_Shared) {
builder.keySharedPolicy(KeySharedPolicy.autoSplitHashRange());
}
if (conf.getCryptoKeyReader() != null) {
builder.cryptoKeyReader(conf.getCryptoKeyReader());
}
if (conf.getMessageCrypto() != null) {
builder.messageCrypto(conf.getMessageCrypto());
}
if (ic.getBatchReceive()) {
builder.batchReceivePolicy(
Objects.requireNonNullElse(conf.getBatchReceivePolicy(), BatchReceivePolicy.DEFAULT_POLICY));
}
return customize(builder, ic);
}

private static DeadLetterPolicy getDeadLetterPolicy(PulsarConnectorIncomingConfiguration ic, Integer redeliverCount) {
return DeadLetterPolicy.builder()
.maxRedeliverCount(redeliverCount)
.deadLetterTopic(ic.getDeadLetterPolicyDeadLetterTopic().orElse(null))
.retryLetterTopic(ic.getDeadLetterPolicyRetryLetterTopic().orElse(null))
.initialSubscriptionName(ic.getDeadLetterPolicyInitialSubscriptionName().orElse(null))
.build();
}

private RedeliveryBackoff parseBackoff(String backoffString, String channel) {
String[] strings = backoffString.split(",");
try {
return MultiplierRedeliveryBackoff.builder()
.minDelayMs(Long.parseLong(strings[0]))
.maxDelayMs(Long.parseLong(strings[1]))
.multiplier(Double.parseDouble(strings[2]))
.build();
} catch (Exception e) {
log.unableToParseRedeliveryBackoff(backoffString, channel);
return null;
}
}

/**
* Extract the configuration map for building Pulsar producer
*
Expand All @@ -148,6 +242,25 @@ public <T> ProducerBuilder<T> customize(ProducerBuilder<T> builder, PulsarConnec
return (ProducerBuilder<T>) ConfigUtils.customize(oc.config(), producerConfigCustomizers, builder);
}

public <T> ProducerBuilder<T> configure(ProducerBuilder<T> builder,
PulsarConnectorOutgoingConfiguration oc,
ProducerConfigurationData conf) {
builder.loadConf(configToMap(conf));
if (conf.getCustomMessageRouter() != null) {
builder.messageRouter(conf.getCustomMessageRouter());
}
if (conf.getBatcherBuilder() != null) {
builder.batcherBuilder(conf.getBatcherBuilder());
}
if (conf.getCryptoKeyReader() != null) {
builder.cryptoKeyReader(conf.getCryptoKeyReader());
}
for (String encryptionKey : conf.getEncryptionKeys()) {
builder.addEncryptionKey(encryptionKey);
}
return customize(builder, oc);
}

private Map<String, Object> mergeMap(Map<String, Object> defaultConfig, Map<String, Object> channelConfig) {
Map<String, Object> map = new HashMap<>(defaultConfig);
map.putAll(channelConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.eclipse.microprofile.config.Config;
Expand All @@ -41,7 +39,6 @@
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.helpers.Validation;
import io.vertx.mutiny.core.Vertx;

@ApplicationScoped
Expand Down Expand Up @@ -166,36 +163,14 @@ public void terminate(

private PulsarClientImpl createPulsarClient(PulsarConnectorCommonConfiguration cc, ClientConfigurationData configuration) {
try {
setAuth(configuration);
log.createdClientWithConfig(configuration);
ClientBuilderImpl customized = configResolver.customize(new ClientBuilderImpl(configuration), cc);
return new PulsarClientImpl(customized.getClientConfigurationData(), vertx.nettyEventLoopGroup());
ClientConfigurationData data = configResolver.configure(cc, configuration).getClientConfigurationData();
log.createdClientWithConfig(data);
return new PulsarClientImpl(data, vertx.nettyEventLoopGroup());
} catch (PulsarClientException e) {
throw ex.illegalStateUnableToBuildClient(e);
}
}

/**
* Sets the authentication object in the given configuration object using
* `authPluginClassName` and `authParams`/`authParamMap` attributes
* This use to be done by the PulsarClientImpl
*
* @param conf client configuration
* @throws PulsarClientException
*/
private void setAuth(ClientConfigurationData conf) throws PulsarClientException {
if (Validation.isBlank(conf.getAuthPluginClassName())
|| (Validation.isBlank(conf.getAuthParams()) && conf.getAuthParamMap() == null)) {
return;
}

if (!Validation.isBlank(conf.getAuthParams())) {
conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParams()));
} else if (conf.getAuthParamMap() != null) {
conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParamMap()));
}
}

public PulsarClient getClient(String channel) {
return clientsByChannel.get(channel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import jakarta.enterprise.inject.Instance;

import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
Expand Down Expand Up @@ -62,7 +61,6 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema<T> schema,
this.channel = ic.getChannel();
this.healthEnabled = ic.getHealthEnabled();
this.tracingEnabled = ic.getTracingEnabled();
ConsumerBuilder<T> builder = client.newConsumer(schema);
ConsumerConfigurationData<?> conf = configResolver.getConsumerConf(ic);
if (conf.getSubscriptionName() == null) {
String s = UUID.randomUUID().toString();
Expand All @@ -75,32 +73,8 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema<T> schema,
if (conf.getConsumerName() == null) {
conf.setConsumerName(channel);
}
builder.loadConf(configResolver.configToMap(conf));
ic.getDeadLetterPolicyMaxRedeliverCount().ifPresent(i -> builder.deadLetterPolicy(getDeadLetterPolicy(ic, i)));
ic.getNegativeAckRedeliveryBackoff().ifPresent(s -> builder.negativeAckRedeliveryBackoff(parseBackoff(s)));
ic.getAckTimeoutRedeliveryBackoff().ifPresent(s -> builder.ackTimeoutRedeliveryBackoff(parseBackoff(s)));
if (conf.getConsumerEventListener() != null) {
builder.consumerEventListener(conf.getConsumerEventListener());
}
if (conf.getPayloadProcessor() != null) {
builder.messagePayloadProcessor(conf.getPayloadProcessor());
}
if (conf.getKeySharedPolicy() != null) {
builder.keySharedPolicy(conf.getKeySharedPolicy());
} else if (conf.getSubscriptionType() == SubscriptionType.Key_Shared) {
builder.keySharedPolicy(KeySharedPolicy.autoSplitHashRange());
}
if (conf.getCryptoKeyReader() != null) {
builder.cryptoKeyReader(conf.getCryptoKeyReader());
}
if (conf.getMessageCrypto() != null) {
builder.messageCrypto(conf.getMessageCrypto());
}
if (ic.getBatchReceive() && conf.getBatchReceivePolicy() == null) {
builder.batchReceivePolicy(BatchReceivePolicy.DEFAULT_POLICY);
}

this.consumer = configResolver.customize(builder, ic).subscribe();
ConsumerBuilder<T> builder = configResolver.configure(client.newConsumer(schema), ic, conf);
this.consumer = builder.subscribe();
log.createdConsumerWithConfig(channel, SchemaResolver.getSchemaName(schema), conf);
this.ackHandler = ackHandlerFactory.create(consumer, ic);
this.failureHandler = failureHandlerFactory.create(consumer, ic, this::reportFailure);
Expand Down Expand Up @@ -196,29 +170,6 @@ private boolean isEndOfStream(PulsarClient client, Throwable throwable) {
return false;
}

private static DeadLetterPolicy getDeadLetterPolicy(PulsarConnectorIncomingConfiguration ic, Integer redeliverCount) {
return DeadLetterPolicy.builder()
.maxRedeliverCount(redeliverCount)
.deadLetterTopic(ic.getDeadLetterPolicyDeadLetterTopic().orElse(null))
.retryLetterTopic(ic.getDeadLetterPolicyRetryLetterTopic().orElse(null))
.initialSubscriptionName(ic.getDeadLetterPolicyInitialSubscriptionName().orElse(null))
.build();
}

private RedeliveryBackoff parseBackoff(String backoffString) {
String[] strings = backoffString.split(",");
try {
return MultiplierRedeliveryBackoff.builder()
.minDelayMs(Long.parseLong(strings[0]))
.maxDelayMs(Long.parseLong(strings[1]))
.multiplier(Double.parseDouble(strings[2]))
.build();
} catch (Exception e) {
log.unableToParseRedeliveryBackoff(backoffString, this.channel);
return null;
}
}

static boolean hasTopicConfig(ConsumerConfigurationData<?> conf) {
return conf.getTopicsPattern() != null
|| (conf.getTopicNames() != null && !conf.getTopicNames().isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,8 @@ public PulsarOutgoingChannel(PulsarClient client, Schema<T> schema, PulsarConnec
if (conf.getMaxPendingMessages() > 0 && conf.getMaxPendingMessagesAcrossPartitions() == 0) {
conf.setMaxPendingMessagesAcrossPartitions(conf.getMaxPendingMessages());
}
Map<String, Object> producerConf = configResolver.configToMap(conf);
ProducerBuilder<T> builder = client.newProducer(schema)
.loadConf(producerConf);
if (conf.getBatcherBuilder() != null) {
builder.batcherBuilder(conf.getBatcherBuilder());
}
if (conf.getCryptoKeyReader() != null) {
builder.cryptoKeyReader(conf.getCryptoKeyReader());
}
for (String encryptionKey : conf.getEncryptionKeys()) {
builder.addEncryptionKey(encryptionKey);
}
this.producer = configResolver.customize(builder, oc).create();
ProducerBuilder<T> builder = configResolver.configure(client.newProducer(schema), oc, conf);
this.producer = builder.create();
log.createdProducerWithConfig(channel, SchemaResolver.getSchemaName(schema), conf);
long requests = getRequests(oc, conf);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@
import java.util.concurrent.CopyOnWriteArrayList;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.pulsar.PulsarConnector;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage;
import io.smallrye.reactive.messaging.pulsar.base.WeldTestBase;
Expand All @@ -45,7 +49,7 @@ static void insertMessages() throws PulsarClientException {
}

@Test
void testBatchRecieveAppUsingPulsarConnector() {
void testBatchReceiveAppUsingPulsarConnector() {
// Run app
ConsumingApp app = runApplication(config(), ConsumingApp.class);
long start = System.currentTimeMillis();
Expand All @@ -58,6 +62,35 @@ void testBatchRecieveAppUsingPulsarConnector() {
assertThat(app.getResults()).containsExactlyElementsOf(expected);
}

@Test
void testBatchReceiveAppWithCustomConfig() {
addBeans(BatchConfig.class);
// Run app
ConsumingApp app = runApplication(config()
.with("mp.messaging.incoming.data.consumer-configuration", "batch-config"), ConsumingApp.class);
long start = System.currentTimeMillis();

// Check for consumed messages in app
await().atMost(Duration.ofSeconds(30)).until(() -> app.getResults().size() == NUMBER_OF_MESSAGES);
long end = System.currentTimeMillis();

System.out.println("Ack - Estimate: " + (end - start) + " ms");
assertThat(app.getResults()).containsExactlyElementsOf(expected);
}

@ApplicationScoped
public static class BatchConfig {
@Produces
@Identifier("batch-config")
public ConsumerConfigurationData<Object> configureBatchConsumer() {
var data = new ConsumerConfigurationData<>();
data.setBatchReceivePolicy(BatchReceivePolicy.builder()
.maxNumMessages(10)
.build());
return data;
}
}

MapBasedConfig config() {
return new MapBasedConfig()
.with("mp.messaging.incoming.data.connector", PulsarConnector.CONNECTOR_NAME)
Expand All @@ -77,7 +110,6 @@ public static class ConsumingApp {

@Incoming("data")
public CompletionStage<Void> consume(PulsarIncomingBatchMessage<Integer> message) {
// System.out.println(message.getIncomingMessages().size());
results.addAll(message.getPayload());
return message.ack();
}
Expand Down

0 comments on commit 8a7ce52

Please sign in to comment.