diff --git a/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java b/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java index 483b95d1..4ca7ebba 100644 --- a/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java +++ b/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java @@ -94,14 +94,14 @@ public Thread newThread(Runnable r) { ? Schedulers.newSingle(options.transactionalId()) : options.scheduler() ); - + boolean transactional = this.senderOptions.isTransactional(); this.producerMono = Mono .fromCallable(() -> { Producer producer = producerFactory.createProducer(senderOptions); if (senderOptions.producerListener() != null) { senderOptions.producerListener().producerAdded(producerId, producer); } - if (senderOptions.isTransactional()) { + if (transactional) { log.info("Initializing transactions for producer {}", senderOptions.transactionalId()); producer.initTransactions(); @@ -117,7 +117,10 @@ public Thread newThread(Runnable r) { : flux; }); - this.transactionManager = senderOptions.isTransactional() + if (transactional) { + this.producerMono.subscribe().dispose(); + } + this.transactionManager = transactional ? new DefaultTransactionManager<>(producerMono, senderOptions) : null; } diff --git a/src/test/java/reactor/kafka/sender/internals/MockSenderTest.java b/src/test/java/reactor/kafka/sender/internals/MockSenderTest.java index a60fc476..d4b5a345 100644 --- a/src/test/java/reactor/kafka/sender/internals/MockSenderTest.java +++ b/src/test/java/reactor/kafka/sender/internals/MockSenderTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,6 +61,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -113,6 +114,18 @@ public void producerCreate() { assertEquals(Arrays.asList(producer), producerFactory.producersInUse()); } + /** + * Tests that a transactional Kafka producer is created eagerly. + */ + @Test + public void txProducerCreate() { + Map options = new HashMap<>(); + options.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-"); + sender = new DefaultKafkaSender<>(producerFactory, SenderOptions.create(options)); + await().untilAsserted(() -> assertEquals(1, producerFactory.producersInUse().size())); + assertEquals(Arrays.asList(producer), producerFactory.producersInUse()); + } + /** * Tests that closing KafkaSender closes the underlying producer. */ @@ -843,7 +856,7 @@ public OutgoingRecords append(String topic, int count) { RecordMetadata metadata = null; Exception e = null; if (!fail) - metadata = new RecordMetadata(partition, 0, partitionResponses.size(), 0, (Long) 0L, 0, 0); + metadata = new RecordMetadata(partition, 0, partitionResponses.size(), 0, 0L, 0, 0); else e = new InvalidTopicException("Topic not found: " + topic); partitionResponses.add(new Response<>(metadata, e, correlation));