diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 31e37d0f176d77..07188639a45375 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -703,15 +703,14 @@ public CompletableFuture> addProducer(Producer producer, log.warn("[{}] Attempting to add producer to a terminated topic", topic); throw new TopicTerminatedException("Topic was already terminated"); } - internalAddProducer(producer); - - USAGE_COUNT_UPDATER.incrementAndGet(this); - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(), - USAGE_COUNT_UPDATER.get(this)); - } - - return CompletableFuture.completedFuture(producerEpoch); + return internalAddProducer(producer).thenApply(ignore -> { + USAGE_COUNT_UPDATER.incrementAndGet(this); + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(), + USAGE_COUNT_UPDATER.get(this)); + } + return producerEpoch; + }); } catch (BrokerServiceException e) { return FutureUtil.failedFuture(e); } finally { @@ -957,15 +956,17 @@ protected void checkTopicFenced() throws BrokerServiceException { } } - protected void internalAddProducer(Producer producer) throws BrokerServiceException { + protected CompletableFuture internalAddProducer(Producer producer) { if (isProducersExceeded(producer)) { log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic); - throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"); + return CompletableFuture.failedFuture( + new BrokerServiceException.ProducerBusyException("Topic reached max producers limit")); } if (isSameAddressProducersExceeded(producer)) { log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic); - throw new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit"); + return CompletableFuture.failedFuture( + new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit")); } if (log.isDebugEnabled()) { @@ -974,31 +975,40 @@ protected void internalAddProducer(Producer producer) throws BrokerServiceExcept Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer); if (existProducer != null) { - tryOverwriteOldProducer(existProducer, producer); + return tryOverwriteOldProducer(existProducer, producer); } else if (!producer.isRemote()) { USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this); } + return CompletableFuture.completedFuture(null); } - private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) - throws BrokerServiceException { + private CompletableFuture tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) { if (newProducer.isSuccessorTo(oldProducer)) { oldProducer.close(false); if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) { // Met concurrent update, throw exception here so that client can try reconnect later. - throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName() - + "' replace concurrency error"); + return CompletableFuture.failedFuture(new BrokerServiceException.NamingException("Producer with name '" + + newProducer.getProducerName() + "' replace concurrency error")); } else { handleProducerRemoved(oldProducer); + return CompletableFuture.completedFuture(null); } } else { // If a producer with the same name tries to use a new connection, async check the old connection is // available. The producers related the connection that not available are automatically cleaned up. if (!Objects.equals(oldProducer.getCnx(), newProducer.getCnx())) { - oldProducer.getCnx().checkConnectionLiveness(); + return oldProducer.getCnx().checkConnectionLiveness().thenCompose(previousIsActive -> { + if (previousIsActive) { + return CompletableFuture.failedFuture(new BrokerServiceException.NamingException( + "Producer with name '" + newProducer.getProducerName() + + "' is already connected to topic")); + } else { + return internalAddProducer(newProducer); + } + }); } - throw new BrokerServiceException.NamingException( - "Producer with name '" + newProducer.getProducerName() + "' is already connected to topic"); + return CompletableFuture.failedFuture(new BrokerServiceException.NamingException( + "Producer with name '" + newProducer.getProducerName() + "' is already connected to topic")); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 2ea5e28880bf84..303b10b5212f7c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -45,6 +45,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; +import io.netty.channel.DefaultChannelId; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.vertx.core.impl.ConcurrentHashSet; @@ -63,6 +64,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import java.util.function.Supplier; @@ -119,6 +121,7 @@ import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse; import org.apache.pulsar.common.api.proto.CommandPing; +import org.apache.pulsar.common.api.proto.CommandPong; import org.apache.pulsar.common.api.proto.CommandProducerSuccess; import org.apache.pulsar.common.api.proto.CommandSendError; import org.apache.pulsar.common.api.proto.CommandSendReceipt; @@ -938,7 +941,7 @@ public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy() throws E } @Test - public void testHandleProducerAfterClientChannelInactive() throws Exception { + public void testHandleProducerIfPreviousOneIsActive() throws Exception { final String tName = successTopicName; final long producerId = 1; final MutableInt requestId = new MutableInt(1); @@ -958,28 +961,96 @@ public void testHandleProducerAfterClientChannelInactive() throws Exception { assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); - // Verify the second producer using a new connection will override the producer who using a stopped channel. - channelsStoppedAnswerHealthCheck.add(channel); + // Verify the second producer will be reject due to the previous one still is active. + // Every second try once, total 10 times, all requests should fail. ClientChannel channel2 = new ClientChannel(); + AtomicBoolean channel1MonitorStopped1 = startChannelMonitorToHandleUserTask(channel); + AtomicBoolean channel1MonitorStopped2 = startChannelMonitorToHandleUserTask(channel2.channel); setChannelConnected(channel2.serverCnx); - Awaitility.await().untilAsserted(() -> { + + for (int i = 0; i < 10; i++) { ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(), pName, false, metadata, null, epoch.incrementAndGet(), false, ProducerAccessMode.Shared, Optional.empty(), false); channel2.channel.writeInbound(cmdProducer2); - assertTrue(getResponse(channel2.channel, channel2.clientChannelHelper) instanceof CommandProducerSuccess); + triggerReceivePingOnce(channel, clientChannelHelper); + Object response2 = getResponse(channel2.channel, channel2.clientChannelHelper); + assertTrue(response2 instanceof CommandError); assertEquals(topicRef.getProducers().size(), 1); - }); + assertTrue(channel.isActive()); + Thread.sleep(1000); + } + channel1MonitorStopped1.set(true); + channel1MonitorStopped2.set(true); // cleanup. channel.finish(); channel2.close(); } + @Test + public void testHandleProducerAfterClientChannelInactive() throws Exception { + final String tName = successTopicName; + final long producerId = 1; + final MutableInt requestId = new MutableInt(1); + final MutableInt epoch = new MutableInt(1); + final Map metadata = Collections.emptyMap(); + final String pName = "p1"; + resetChannel(); + setChannelConnected(); + + // The producer register using the first connection. + ByteBuf cmdProducer1 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(), + pName, false, metadata, null, epoch.incrementAndGet(), false, + ProducerAccessMode.Shared, Optional.empty(), false); + channel.writeInbound(cmdProducer1); + assertTrue(getResponse() instanceof CommandProducerSuccess); + PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(tName).get(); + assertNotNull(topicRef); + assertEquals(topicRef.getProducers().size(), 1); + + // Verify the second producer using a new connection will override the producer who using a stopped channel. + channelsStoppedAnswerHealthCheck.add(channel); + ClientChannel channel2 = new ClientChannel(); + AtomicBoolean channel1MonitorStopped1 = startChannelMonitorToHandleUserTask(channel); + AtomicBoolean channel1MonitorStopped2 = startChannelMonitorToHandleUserTask(channel2.channel); + setChannelConnected(channel2.serverCnx); + + ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(), + pName, false, metadata, null, epoch.incrementAndGet(), false, + ProducerAccessMode.Shared, Optional.empty(), false); + channel2.channel.writeInbound(cmdProducer2); + triggerReceivePingOnce(channel, clientChannelHelper); + Object response2 = getResponse(channel2.channel, channel2.clientChannelHelper); + assertTrue(response2 instanceof CommandProducerSuccess); + assertEquals(topicRef.getProducers().size(), 1); + channel1MonitorStopped1.set(true); + channel1MonitorStopped2.set(true); + + // cleanup. + channel.finish(); + channel2.close(); + } + + private AtomicBoolean startChannelMonitorToHandleUserTask(EmbeddedChannel channel) { + AtomicBoolean channel1MonitorStopped = new AtomicBoolean(false); + Thread channel1Monitor = new Thread(() -> { + while (!channel1MonitorStopped.get()) { + channel.runPendingTasks(); + try { + Thread.sleep(100); + } catch (InterruptedException e) {} + } + }); + channel1Monitor.start(); + return channel1MonitorStopped; + } + private class ClientChannel implements Closeable { private ClientChannelHelper clientChannelHelper = new ClientChannelHelper(); private ServerCnx serverCnx = new ServerCnx(pulsar); - private EmbeddedChannel channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder( + private EmbeddedChannel channel = new EmbeddedChannel(DefaultChannelId.newInstance(), + new LengthFieldBasedFrameDecoder( 5 * 1024 * 1024, 0, 4, @@ -2626,10 +2697,20 @@ private void setConnectionVersion(int version) throws Exception { } protected Object getResponse() throws Exception { - return getResponse(channel, clientChannelHelper); + return getResponse(channel, clientChannelHelper, false); + } + + protected void triggerReceivePingOnce(EmbeddedChannel channel, ClientChannelHelper clientChannelHelper) + throws Exception { + getResponse(channel, clientChannelHelper, true); } protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper clientChannelHelper) throws Exception { + return getResponse(channel, clientChannelHelper, false); + } + + protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper clientChannelHelper, + boolean expectedPing) throws Exception { // Wait at most for 10s to get a response final long sleepTimeMs = 10; final long iterations = TimeUnit.SECONDS.toMillis(10) / sleepTimeMs; @@ -2637,18 +2718,25 @@ protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper client if (!channel.outboundMessages().isEmpty()) { Object outObject = channel.outboundMessages().remove(); Object cmd = clientChannelHelper.getCommand(outObject); + if (cmd instanceof CommandPong) { + log.info("Received Pong"); + continue; + } if (cmd instanceof CommandPing) { + log.info("Received Ping"); if (channelsStoppedAnswerHealthCheck.contains(channel)) { + if (expectedPing) { + return cmd; + } else { + continue; + } + } + channel.writeInbound(Commands.newPong()); + if (expectedPing) { + return cmd; + } else { continue; } - channel.writeAndFlush(Commands.newPong()).addListener(future -> { - if (!future.isSuccess()) { - log.warn("[{}] Forcing connection to close since cannot send a pong message.", - channel, future.cause()); - channel.close(); - } - }); - continue; } return cmd; } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java index bf0dd3aa9c1c5d..e4595c14562f4c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java @@ -27,6 +27,8 @@ import org.apache.pulsar.common.api.proto.CommandEndTxnResponse; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse; import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse; +import org.apache.pulsar.common.api.proto.CommandPing; +import org.apache.pulsar.common.api.proto.CommandPong; import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess; import org.apache.pulsar.common.protocol.PulsarDecoder; import org.apache.pulsar.common.api.proto.CommandAck; @@ -207,6 +209,16 @@ protected void handleEndTxnOnSubscriptionResponse( CommandEndTxnOnSubscriptionResponse commandEndTxnOnSubscriptionResponse) { queue.offer(new CommandEndTxnOnSubscriptionResponse().copyFrom(commandEndTxnOnSubscriptionResponse)); } + + @Override + protected void handlePing(CommandPing ping) { + queue.offer(new CommandPing().copyFrom(ping)); + } + + @Override + protected void handlePong(CommandPong pong) { + queue.offer(new CommandPong().copyFrom(pong)); + } }; }