diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelN.java b/src/main/java/com/rabbitmq/client/impl/ChannelN.java index b922b53da0..57a15d62fe 100644 --- a/src/main/java/com/rabbitmq/client/impl/ChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/ChannelN.java @@ -82,6 +82,9 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel private final SortedSet unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet()); + /** Whether the confirm select method has been successfully activated */ + private boolean confirmSelectActivated = false; + /** Whether any nacks have been received since the last waitForConfirms(). */ private volatile boolean onlyAcksReceived = true; @@ -1553,10 +1556,16 @@ public Tx.RollbackOk txRollback() public Confirm.SelectOk confirmSelect() throws IOException { + if (confirmSelectActivated) { + return new Confirm.SelectOk(); + } + if (nextPublishSeqNo == 0) nextPublishSeqNo = 1; - return (Confirm.SelectOk) + Confirm.SelectOk result = (Confirm.SelectOk) exnWrappingRpc(new Confirm.Select(false)).getMethod(); + confirmSelectActivated = true; + return result; } /** Public API - {@inheritDoc} */ diff --git a/src/test/java/com/rabbitmq/client/test/ChannelNTest.java b/src/test/java/com/rabbitmq/client/test/ChannelNTest.java index 76d44c816e..24b6d1e941 100644 --- a/src/test/java/com/rabbitmq/client/test/ChannelNTest.java +++ b/src/test/java/com/rabbitmq/client/test/ChannelNTest.java @@ -15,7 +15,9 @@ package com.rabbitmq.client.test; +import com.rabbitmq.client.Command; import com.rabbitmq.client.Method; +import com.rabbitmq.client.TrafficListener; import com.rabbitmq.client.impl.*; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -26,6 +28,7 @@ import java.util.concurrent.Executors; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertNotNull; public class ChannelNTest { @@ -81,6 +84,29 @@ public TestConfig(int value, Consumer call) { .forEach(config -> assertThatThrownBy(() -> config.call.apply(config.value)).isInstanceOf(IllegalArgumentException.class)); } + @Test + public void confirmSelectOnlySendsRPCCallOnce() throws Exception { + AMQConnection connection = Mockito.mock(AMQConnection.class); + TrafficListener trafficListener = Mockito.mock(TrafficListener.class); + + Mockito.when(connection.getTrafficListener()).thenReturn(trafficListener); + + ChannelN channel = new ChannelN(connection, 1, consumerWorkService); + + new Thread(() -> { + try { + Thread.sleep(15); + channel.handleCompleteInboundCommand(new AMQCommand(new AMQImpl.Confirm.SelectOk())); + } catch (Exception e) { + throw new RuntimeException(e); + } + }).start(); + + assertNotNull(channel.confirmSelect()); + assertNotNull(channel.confirmSelect()); + Mockito.verify(trafficListener, Mockito.times(1)).write(Mockito.any(Command.class)); + } + interface Consumer { void apply(int value) throws Exception;