diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 836bd7ad2d467..4e176c4fc0bd9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -703,15 +703,14 @@ public CompletableFuture> addProducer(Producer producer, log.warn("[{}] Attempting to add producer to a terminated topic", topic); throw new TopicTerminatedException("Topic was already terminated"); } - internalAddProducer(producer); - - USAGE_COUNT_UPDATER.incrementAndGet(this); - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(), - USAGE_COUNT_UPDATER.get(this)); - } - - return CompletableFuture.completedFuture(producerEpoch); + return internalAddProducer(producer).thenApply(ignore -> { + USAGE_COUNT_UPDATER.incrementAndGet(this); + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(), + USAGE_COUNT_UPDATER.get(this)); + } + return producerEpoch; + }); } catch (BrokerServiceException e) { return FutureUtil.failedFuture(e); } finally { @@ -957,15 +956,17 @@ protected void checkTopicFenced() throws BrokerServiceException { } } - protected void internalAddProducer(Producer producer) throws BrokerServiceException { + protected CompletableFuture internalAddProducer(Producer producer) { if (isProducersExceeded(producer)) { log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic); - throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"); + return CompletableFuture.failedFuture( + new BrokerServiceException.ProducerBusyException("Topic reached max producers limit")); } if (isSameAddressProducersExceeded(producer)) { log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic); - throw new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit"); + return CompletableFuture.failedFuture( + new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit")); } if (log.isDebugEnabled()) { @@ -974,31 +975,46 @@ protected void internalAddProducer(Producer producer) throws BrokerServiceExcept Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer); if (existProducer != null) { - tryOverwriteOldProducer(existProducer, producer); + return tryOverwriteOldProducer(existProducer, producer); } else if (!producer.isRemote()) { USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this); } + return CompletableFuture.completedFuture(null); } - private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) - throws BrokerServiceException { + private CompletableFuture tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) { if (newProducer.isSuccessorTo(oldProducer)) { oldProducer.close(false); if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) { // Met concurrent update, throw exception here so that client can try reconnect later. - throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName() - + "' replace concurrency error"); + return CompletableFuture.failedFuture(new BrokerServiceException.NamingException("Producer with name '" + + newProducer.getProducerName() + "' replace concurrency error")); } else { handleProducerRemoved(oldProducer); + return CompletableFuture.completedFuture(null); } } else { // If a producer with the same name tries to use a new connection, async check the old connection is // available. The producers related the connection that not available are automatically cleaned up. if (!Objects.equals(oldProducer.getCnx(), newProducer.getCnx())) { - oldProducer.getCnx().checkConnectionLiveness(); + return oldProducer.getCnx().checkConnectionLiveness().thenCompose(previousIsActive -> { + if (previousIsActive) { + return CompletableFuture.failedFuture(new BrokerServiceException.NamingException( + "Producer with name '" + newProducer.getProducerName() + + "' is already connected to topic")); + } else { + // If the connection of the previous producer is not active, the method + // "cnx().checkConnectionLiveness()" will trigger the close for it and kick off the previous + // producer. So try to add current producer again. + // The recursive call will be stopped by these two case(This prevents infinite call): + // 1. add current producer success. + // 2. once another same name producer registered. + return internalAddProducer(newProducer); + } + }); } - throw new BrokerServiceException.NamingException( - "Producer with name '" + newProducer.getProducerName() + "' is already connected to topic"); + return CompletableFuture.failedFuture(new BrokerServiceException.NamingException( + "Producer with name '" + newProducer.getProducerName() + "' is already connected to topic")); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 79178ec491ff1..0f0440d24dde7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -53,7 +53,6 @@ import java.io.IOException; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -981,7 +980,7 @@ public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy() throws E } @Test - public void testHandleProducerAfterClientChannelInactive() throws Exception { + public void testDuplicateProducer() throws Exception { final String tName = successTopicName; final long producerId = 1; final MutableInt requestId = new MutableInt(1); @@ -1001,21 +1000,74 @@ public void testHandleProducerAfterClientChannelInactive() throws Exception { assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); - // Verify the second producer using a new connection will override the producer who using a stopped channel. - channelsStoppedAnswerHealthCheck.add(channel); + // Verify the second producer will be reject due to the previous one still is active. + // Every second try once, total 10 times, all requests should fail. ClientChannel channel2 = new ClientChannel(); + BackGroundExecutor backGroundExecutor1 = startBackgroundExecutorForEmbeddedChannel(channel); + BackGroundExecutor autoResponseForHeartBeat = autoResponseForHeartBeat(channel, clientChannelHelper); + BackGroundExecutor backGroundExecutor2 = startBackgroundExecutorForEmbeddedChannel(channel2.channel); setChannelConnected(channel2.serverCnx); - Awaitility.await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> { - channel.runPendingTasks(); + + for (int i = 0; i < 10; i++) { ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(), pName, false, metadata, null, epoch.incrementAndGet(), false, ProducerAccessMode.Shared, Optional.empty(), false); channel2.channel.writeInbound(cmdProducer2); - assertTrue(getResponse(channel2.channel, channel2.clientChannelHelper) instanceof CommandProducerSuccess); + Object response2 = getResponse(channel2.channel, channel2.clientChannelHelper); + assertTrue(response2 instanceof CommandError); assertEquals(topicRef.getProducers().size(), 1); - }); + assertTrue(channel.isActive()); + Thread.sleep(500); + } + + // cleanup. + autoResponseForHeartBeat.close(); + backGroundExecutor1.close(); + backGroundExecutor2.close(); + channel.finish(); + channel2.close(); + } + + @Test + public void testProducerChangeSocket() throws Exception { + final String tName = successTopicName; + final long producerId = 1; + final MutableInt requestId = new MutableInt(1); + final MutableInt epoch = new MutableInt(1); + final Map metadata = Collections.emptyMap(); + final String pName = "p1"; + resetChannel(); + setChannelConnected(); + + // The producer register using the first connection. + ByteBuf cmdProducer1 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(), + pName, false, metadata, null, epoch.incrementAndGet(), false, + ProducerAccessMode.Shared, Optional.empty(), false); + channel.writeInbound(cmdProducer1); + assertTrue(getResponse() instanceof CommandProducerSuccess); + PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(tName).get(); + assertNotNull(topicRef); + assertEquals(topicRef.getProducers().size(), 1); + + // Verify the second producer using a new connection will override the producer who using a stopped channel. + channelsStoppedAnswerHealthCheck.add(channel); + ClientChannel channel2 = new ClientChannel(); + BackGroundExecutor backGroundExecutor1 = startBackgroundExecutorForEmbeddedChannel(channel); + BackGroundExecutor backGroundExecutor2 = startBackgroundExecutorForEmbeddedChannel(channel2.channel); + setChannelConnected(channel2.serverCnx); + + ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(), + pName, false, metadata, null, epoch.incrementAndGet(), false, + ProducerAccessMode.Shared, Optional.empty(), false); + channel2.channel.writeInbound(cmdProducer2); + Object response2 = getResponse(channel2.channel, channel2.clientChannelHelper); + assertTrue(response2 instanceof CommandProducerSuccess); + assertEquals(topicRef.getProducers().size(), 1); // cleanup. + channelsStoppedAnswerHealthCheck.clear(); + backGroundExecutor1.close(); + backGroundExecutor2.close(); channel.finish(); channel2.close(); } @@ -1125,6 +1177,20 @@ private BackGroundExecutor startBackgroundExecutorForEmbeddedChannel(final Embed return new BackGroundExecutor(executor, scheduledFuture); } + /** + * Auto answer `Pong` for the `Cmd-Ping`. + * Node: This will result in additional threads pop Command from the Command queue, so do not call this + * method if the channel needs to accept other Command. + */ + private BackGroundExecutor autoResponseForHeartBeat(EmbeddedChannel channel, + ClientChannelHelper clientChannelHelper) { + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(() -> { + tryPeekResponse(channel, clientChannelHelper); + }, 100, 100, TimeUnit.MILLISECONDS); + return new BackGroundExecutor(executor, scheduledFuture); + } + @AllArgsConstructor private static class BackGroundExecutor implements Closeable { @@ -2824,6 +2890,26 @@ protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper client throw new IOException("Failed to get response from socket within 10s"); } + protected Object tryPeekResponse(EmbeddedChannel channel, ClientChannelHelper clientChannelHelper) { + while (true) { + if (channel.outboundMessages().isEmpty()) { + return null; + } else { + Object outObject = channel.outboundMessages().peek(); + Object cmd = clientChannelHelper.getCommand(outObject); + if (cmd instanceof CommandPing) { + if (channelsStoppedAnswerHealthCheck.contains(channel)) { + continue; + } + channel.writeInbound(Commands.newPong()); + channel.outboundMessages().remove(); + continue; + } + return cmd; + } + } + } + private void setupMLAsyncCallbackMocks() { ledgerMock = mock(ManagedLedger.class); cursorMock = mock(ManagedCursor.class);