diff --git a/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java b/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java index e4509005..731f2784 100644 --- a/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java +++ b/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java @@ -10,21 +10,16 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import jakarta.enterprise.context.Dependent; import jakarta.inject.Named; -import org.eclipse.microprofile.config.Config; -import org.eclipse.microprofile.config.ConfigProvider; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.ConnectionFactoryConfigurator; import com.rabbitmq.stream.Address; import com.rabbitmq.stream.ByteCapacity; import com.rabbitmq.stream.Environment; @@ -53,10 +48,51 @@ public class RabbitMqStreamNativeChangeConsumer extends BaseChangeConsumer imple private static final String PROP_PREFIX = "debezium.sink.rabbitmqstream."; - private static final String PROP_STREAM = PROP_PREFIX + "stream"; - private static final String PROP_CONNECTION_PREFIX = PROP_PREFIX + "connection."; + @Deprecated + @ConfigProperty(name = PROP_PREFIX + "connection.host") + String legacyHost; - @ConfigProperty(name = PROP_STREAM) + @Deprecated + @ConfigProperty(name = PROP_PREFIX + "connection.port") + int legacyPort; + + @ConfigProperty(name = PROP_PREFIX + "host", defaultValue = "localhost") + String host; + + @ConfigProperty(name = PROP_PREFIX + "port", defaultValue = "5552") + int port; + + @ConfigProperty(name = PROP_PREFIX + "username", defaultValue = "guest") + String username; + + @ConfigProperty(name = PROP_PREFIX + "password", defaultValue = "guest") + String password; + + @ConfigProperty(name = PROP_PREFIX + "virtualHost", defaultValue = "/") + String virtualHost; + + @ConfigProperty(name = PROP_PREFIX + "rpcTimeout", defaultValue = "10") + int rpcTimeout; + + @ConfigProperty(name = PROP_PREFIX + "maxProducersByConnection", defaultValue = "256") + int maxProducersByConnection; + + @ConfigProperty(name = PROP_PREFIX + "maxTrackingConsumersByConnection", defaultValue = "50") + int maxTrackingConsumersByConnection; + + @ConfigProperty(name = PROP_PREFIX + "maxConsumersByConnection", defaultValue = "256") + int maxConsumersByConnection; + + @ConfigProperty(name = PROP_PREFIX + "requestedHeartbeat", defaultValue = "60") + int requestedHeartbeat; + + @ConfigProperty(name = PROP_PREFIX + "requestedMaxFrameSize", defaultValue = "0") + int requestedMaxFrameSize; + + @ConfigProperty(name = PROP_PREFIX + "id", defaultValue = "rabbitmq-stream") + String id; + + @ConfigProperty(name = PROP_PREFIX + "stream") Optional stream; @ConfigProperty(name = PROP_PREFIX + "stream.maxAge") @@ -68,8 +104,26 @@ public class RabbitMqStreamNativeChangeConsumer extends BaseChangeConsumer imple @ConfigProperty(name = PROP_PREFIX + "stream.maxSegmentSize") Optional streamMaxSegmentSize; - @ConfigProperty(name = PROP_PREFIX + "ackTimeout", defaultValue = "30000") - int ackTimeout; + @ConfigProperty(name = PROP_PREFIX + "producer.name") + Optional producerName; + + @ConfigProperty(name = PROP_PREFIX + "producer.batchSize", defaultValue = "100") + int producerBatchSize; + + @ConfigProperty(name = PROP_PREFIX + "producer.subEntrySize", defaultValue = "1") + int producerSubEntrySize; + + @ConfigProperty(name = PROP_PREFIX + "producer.maxUnconfirmedMessages", defaultValue = "10000") + int producerMaxUnconfirmedMessages; + + @ConfigProperty(name = PROP_PREFIX + "producer.batchPublishingDelay", defaultValue = "100") + int producerBatchPublishingDelay; + + @ConfigProperty(name = PROP_PREFIX + "producer.confirmTimeout", defaultValue = "30") + int producerConfirmTimeout; + + @ConfigProperty(name = PROP_PREFIX + "producer.enqueueTimeout", defaultValue = "10") + int producerEnqueueTimeout; @ConfigProperty(name = PROP_PREFIX + "null.value", defaultValue = "default") String nullValue; @@ -91,26 +145,31 @@ private void createStream(Environment env, String name) { @PostConstruct void connect() { + if(legacyHost != null || legacyPort > 0) { + LOGGER.warn("The parameters connection.host and connection.port are deprecated, please use rabbitmqstream.host and rabbitmqstream.port moving forward."); + } - final Config config = ConfigProvider.getConfig(); - ConnectionFactory factory = new ConnectionFactory(); - - Map configProperties = getConfigSubset(config, PROP_CONNECTION_PREFIX).entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, - entry -> (entry.getValue() == null) ? null : entry.getValue().toString())); + String connectionHost = legacyHost != null ? legacyHost : host; + int connectionPort = legacyPort > 0 ? legacyPort : port; - ConnectionFactoryConfigurator.load(factory, configProperties, ""); - LOGGER.info("Using connection to {}:{}", factory.getHost(), factory.getPort()); + LOGGER.info("Using connection to {}:{}", connectionHost, connectionPort); try { - Address entryPoint = new Address(factory.getHost(), factory.getPort()); + Address entryPoint = new Address(connectionHost, connectionPort); environment = Environment.builder() .host(entryPoint.host()) .port(entryPoint.port()) .addressResolver(address -> entryPoint) - .username(factory.getUsername()) - .password(factory.getPassword()) - .virtualHost(factory.getVirtualHost()) + .username(username) + .password(password) + .virtualHost(virtualHost) + .requestedMaxFrameSize(requestedMaxFrameSize) + .requestedHeartbeat(Duration.ofSeconds(requestedHeartbeat)) + .rpcTimeout(Duration.ofSeconds(rpcTimeout)) + .maxProducersByConnection(maxProducersByConnection) + .maxTrackingConsumersByConnection(maxTrackingConsumersByConnection) + .maxConsumersByConnection(maxConsumersByConnection) + .id(id) .build(); } catch (StreamException | IllegalArgumentException e) { @@ -154,7 +213,13 @@ public void handleBatch(List> records, RecordCommitt } producer = environment.producerBuilder() - .confirmTimeout(Duration.ofSeconds(ackTimeout)) + .confirmTimeout(Duration.ofSeconds(producerConfirmTimeout)) + .enqueueTimeout(Duration.ofSeconds(producerEnqueueTimeout)) + .batchPublishingDelay(Duration.ofMillis(producerBatchPublishingDelay)) + .maxUnconfirmedMessages(producerMaxUnconfirmedMessages) + .subEntrySize(producerSubEntrySize) + .batchSize(producerBatchSize) + .name(producerName.orElse(null)) .stream(topic) .build(); diff --git a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamTestResourceLifecycleManager.java b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamTestResourceLifecycleManager.java index 08981c94..f0635778 100644 --- a/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamTestResourceLifecycleManager.java +++ b/debezium-server-rabbitmq/src/test/java/io/debezium/server/rabbitmq/RabbitMqStreamTestResourceLifecycleManager.java @@ -42,8 +42,8 @@ public Map start() { throw new RuntimeException(e); } Map params = new ConcurrentHashMap<>(); - params.put("debezium.sink.rabbitmqstream.connection.host", container.getHost()); - params.put("debezium.sink.rabbitmqstream.connection.port", String.valueOf(getPort())); + params.put("debezium.sink.rabbitmqstream.host", container.getHost()); + params.put("debezium.sink.rabbitmqstream.port", String.valueOf(getPort())); return params; }