diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java index 10e6b79609721..ae108646532dc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java @@ -19,8 +19,12 @@ package org.apache.pulsar.broker.service; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.LongAdder; import java.util.function.ToLongFunction; +import org.apache.pulsar.common.api.proto.CommandSubscribe; +import org.apache.pulsar.common.api.proto.KeySharedMeta; +import org.apache.pulsar.common.util.FutureUtil; public abstract class AbstractSubscription implements Subscription { protected final LongAdder bytesOutFromRemovedConsumers = new LongAdder(); @@ -39,4 +43,46 @@ private long sumConsumers(ToLongFunction toCounter) { .map(dispatcher -> dispatcher.getConsumers().stream().mapToLong(toCounter).sum()) .orElse(0L); } + + /** + * Checks if the given consumer is compatible with the given dispatcher. They are incompatible if + * + * @param dispatcher The dispatcher of the subscription + * @param consumer New consumer to be added to the subscription + * @return Optional containing failed future with {@link BrokerServiceException.SubscriptionBusyException} if + * consumer and dispatcher are incompatible or empty optional otherwise. + */ + protected Optional> checkForConsumerCompatibilityErrorWithDispatcher(Dispatcher dispatcher, + Consumer consumer) { + if (consumer.subType() != dispatcher.getType()) { + return Optional.of(FutureUtil.failedFuture(new BrokerServiceException.SubscriptionBusyException( + String.format("Subscription is of different type. Active subscription type of '%s' is different " + + "than the connecting consumer's type '%s'.", + dispatcher.getType(), consumer.subType())))); + } else if (dispatcher.getType() == CommandSubscribe.SubType.Key_Shared) { + KeySharedMeta dispatcherKsm = dispatcher.getConsumers().get(0).getKeySharedMeta(); + KeySharedMeta consumerKsm = consumer.getKeySharedMeta(); + if (dispatcherKsm.getKeySharedMode() != consumerKsm.getKeySharedMode()) { + return Optional.of(FutureUtil.failedFuture(new BrokerServiceException.SubscriptionBusyException( + String.format("Subscription is of different type. Active subscription key_shared mode of '%s' " + + "is different than the connecting consumer's key_shared mode '%s'.", + dispatcherKsm.getKeySharedMode(), consumerKsm.getKeySharedMode())))); + } + if (dispatcherKsm.isAllowOutOfOrderDelivery() != consumerKsm.isAllowOutOfOrderDelivery()) { + return Optional.of(FutureUtil.failedFuture(new BrokerServiceException.SubscriptionBusyException( + String.format("Subscription is of different type. %s", + dispatcherKsm.isAllowOutOfOrderDelivery() + ? "Active subscription allows out of order delivery while the connecting " + + "consumer does not allow it." : + "Active subscription does not allow out of order delivery while the connecting " + + "consumer allows it.")))); + } + } + return Optional.empty(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index e92eef5cb7bff..55639868e0ac1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -34,7 +34,6 @@ import org.apache.pulsar.broker.service.AnalyzeBacklogResult; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; -import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; @@ -159,8 +158,10 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { }); } } else { - if (consumer.subType() != dispatcher.getType()) { - return FutureUtil.failedFuture(new SubscriptionBusyException("Subscription is of different type")); + Optional> compatibilityError = + checkForConsumerCompatibilityErrorWithDispatcher(dispatcher, consumer); + if (compatibilityError.isPresent()) { + return compatibilityError.get(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index df1c23cbbcb30..0096f398ada91 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -307,9 +307,10 @@ private CompletableFuture addConsumerInternal(Consumer consumer) { }); } } else { - if (consumer.subType() != dispatcher.getType()) { - return FutureUtil.failedFuture( - new SubscriptionBusyException("Subscription is of different type")); + Optional> compatibilityError = + checkForConsumerCompatibilityErrorWithDispatcher(dispatcher, consumer); + if (compatibilityError.isPresent()) { + return compatibilityError.get(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionConsumerCompatibilityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionConsumerCompatibilityTest.java new file mode 100644 index 0000000000000..dc1d0170f9d72 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionConsumerCompatibilityTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.common.api.proto.CommandSubscribe; +import org.apache.pulsar.common.api.proto.KeySharedMeta; +import org.apache.pulsar.common.api.proto.KeySharedMode; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class SubscriptionConsumerCompatibilityTest { + + private PulsarTestContext pulsarTestContext; + private ManagedLedger ledgerMock; + private ManagedCursorImpl cursorMock; + private final String successTopicName = "persistent://prop/use/ns-abc/successTopic"; + private final String subName = "subscriptionName"; + + @BeforeMethod + public void setup() throws Exception { + pulsarTestContext = PulsarTestContext.builderForNonStartableContext().build(); + + ledgerMock = mock(ManagedLedgerImpl.class); + ManagedLedgerConfig managedLedgerConfigMock = mock(ManagedLedgerConfig.class); + doReturn(managedLedgerConfigMock).when(ledgerMock).getConfig(); + + cursorMock = mock(ManagedCursorImpl.class); + doReturn("mockCursor").when(cursorMock).getName(); + } + + @AfterMethod(alwaysRun = true) + public void teardown() throws Exception { + if (pulsarTestContext != null) { + pulsarTestContext.close(); + pulsarTestContext = null; + } + } + + @DataProvider(name = "incompatibleKeySharedPolicies") + public Object[][] incompatibleKeySharedPolicies() { + KeySharedMeta ksmSticky = new KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY); + ksmSticky.addHashRange().setStart(0).setEnd(2); + + KeySharedMeta ksmStickyAllowOutOfOrder = new KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY) + .setAllowOutOfOrderDelivery(true); + ksmStickyAllowOutOfOrder.addHashRange().setStart(3).setEnd(5); + + KeySharedMeta ksmAutoSplit = new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT); + KeySharedMeta ksmAutoSplitAllowOutOfOrder = new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT) + .setAllowOutOfOrderDelivery(true); + + String errorMessagePrefix = "Subscription is of different type. "; + String errorMessageSubscriptionModeSticky = errorMessagePrefix + "Active subscription key_shared mode of " + + "'STICKY' is different than the connecting consumer's key_shared mode 'AUTO_SPLIT'."; + String errorMessageSubscriptionModeAutoSplit = errorMessagePrefix + "Active subscription key_shared mode of " + + "'AUTO_SPLIT' is different than the connecting consumer's key_shared mode 'STICKY'."; + String errorMessageOutOfOrderNotAllowed = errorMessagePrefix + "Active subscription does not allow out of " + + "order delivery while the connecting consumer allows it."; + String errorMessageOutOfOrderAllowed = errorMessagePrefix + "Active subscription allows out of order delivery " + + "while the connecting consumer does not allow it."; + + return new Object[][] { + { ksmAutoSplit, ksmSticky, errorMessageSubscriptionModeAutoSplit }, + { ksmAutoSplit, ksmStickyAllowOutOfOrder, errorMessageSubscriptionModeAutoSplit }, + { ksmAutoSplit, ksmAutoSplitAllowOutOfOrder, errorMessageOutOfOrderNotAllowed }, + + { ksmAutoSplitAllowOutOfOrder, ksmSticky, errorMessageSubscriptionModeAutoSplit }, + { ksmAutoSplitAllowOutOfOrder, ksmStickyAllowOutOfOrder, errorMessageSubscriptionModeAutoSplit }, + { ksmAutoSplitAllowOutOfOrder, ksmAutoSplit, errorMessageOutOfOrderAllowed }, + + { ksmSticky, ksmStickyAllowOutOfOrder, errorMessageOutOfOrderNotAllowed }, + { ksmSticky, ksmAutoSplit, errorMessageSubscriptionModeSticky }, + { ksmSticky, ksmAutoSplitAllowOutOfOrder, errorMessageSubscriptionModeSticky }, + + { ksmStickyAllowOutOfOrder, ksmSticky, errorMessageOutOfOrderAllowed }, + { ksmStickyAllowOutOfOrder, ksmAutoSplit, errorMessageSubscriptionModeSticky }, + { ksmStickyAllowOutOfOrder, ksmAutoSplitAllowOutOfOrder, errorMessageSubscriptionModeSticky } + }; + } + + @Test(dataProvider = "incompatibleKeySharedPolicies") + public void testIncompatibleKeySharedPoliciesNotAllowedInNonPersistentSub(KeySharedMeta consumer1Ksm, KeySharedMeta consumer2Ksm, + String expectedErrorMessage) throws Exception { + NonPersistentTopic topic = new NonPersistentTopic(successTopicName, pulsarTestContext.getBrokerService()); + NonPersistentSubscription sub = new NonPersistentSubscription(topic, subName, Map.of()); + + // two consumers with incompatible key_shared policies + Consumer keySharedConsumerMock1 = createKeySharedMockConsumer("consumer-1", consumer1Ksm); + Consumer keySharedConsumerMock2 = createKeySharedMockConsumer("consumer-2", consumer2Ksm); + + // first consumer defines key_shared mode of subscription and whether out of order delivery is allowed + sub.addConsumer(keySharedConsumerMock1).get(5, TimeUnit.SECONDS); + + try { + // add second consumer with incompatible key_shared policy + sub.addConsumer(keySharedConsumerMock2).get(5, TimeUnit.SECONDS); + fail(BrokerServiceException.SubscriptionBusyException.class.getSimpleName() + " not thrown"); + } catch (Exception e) { + // subscription throws exception when consumer with incompatible key_shared policy is added + Throwable cause = e.getCause(); + assertTrue(cause instanceof BrokerServiceException.SubscriptionBusyException); + assertEquals(cause.getMessage(), expectedErrorMessage); + } + } + + @Test(dataProvider = "incompatibleKeySharedPolicies") + public void testIncompatibleKeySharedPoliciesNotAllowedInPersistentSub(KeySharedMeta consumer1Ksm, KeySharedMeta consumer2Ksm, + String expectedErrorMessage) throws Exception { + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, pulsarTestContext.getBrokerService()); + PersistentSubscription sub = new PersistentSubscription(topic, subName, cursorMock, false); + + // two consumers with incompatible key_shared policies + Consumer keySharedConsumerMock1 = createKeySharedMockConsumer("consumer-1", consumer1Ksm); + Consumer keySharedConsumerMock2 = createKeySharedMockConsumer("consumer-2", consumer2Ksm); + + // first consumer defines key_shared mode of subscription and whether out of order delivery is allowed + sub.addConsumer(keySharedConsumerMock1).get(5, TimeUnit.SECONDS); + + try { + // add second consumer with incompatible key_shared policy + sub.addConsumer(keySharedConsumerMock2).get(5, TimeUnit.SECONDS); + fail(BrokerServiceException.SubscriptionBusyException.class.getSimpleName() + " not thrown"); + } catch (Exception e) { + // subscription throws exception when consumer with incompatible key_shared policy is added + Throwable cause = e.getCause(); + assertTrue(cause instanceof BrokerServiceException.SubscriptionBusyException); + assertEquals(cause.getMessage(), expectedErrorMessage); + } + } + + protected Consumer createKeySharedMockConsumer(String name, KeySharedMeta ksm) { + Consumer consumer = BrokerTestUtil.createMockConsumer(name); + doReturn(CommandSubscribe.SubType.Key_Shared).when(consumer).subType(); + doReturn(ksm).when(consumer).getKeySharedMeta(); + doReturn(mock(PendingAcksMap.class)).when(consumer).getPendingAcks(); + return consumer; + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java index 309cd7b55ac0c..9a46a2919d12a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java @@ -26,7 +26,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; -import io.netty.channel.EventLoopGroup; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collections; @@ -35,7 +34,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -60,8 +58,6 @@ import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.transaction.common.exception.TransactionConflictException; import org.awaitility.Awaitility; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -83,11 +79,6 @@ public class PersistentSubscriptionTest { final TxnID txnID1 = new TxnID(1,1); final TxnID txnID2 = new TxnID(1,2); - private static final Logger log = LoggerFactory.getLogger(PersistentTopicTest.class); - - private OrderedExecutor executor; - private EventLoopGroup eventLoopGroup; - @BeforeMethod public void setup() throws Exception { pulsarTestContext = PulsarTestContext.builderForNonStartableContext()