From 82f9adb4218374590b662a9cc81713122eeda6e3 Mon Sep 17 00:00:00 2001 From: Rajan Date: Fri, 24 Feb 2017 15:57:40 -0800 Subject: [PATCH] On batch-msg dispatching: broker should disconnect consumer which doesn't support batch-message (#215) * On batch-msg dispatching: broker should disconnect consumer which doesn't support batch-message * Close consumer at broker-side without closing connection if consumer doesn't support batch-message * Fail unsupportedBatchVersion-subscriber if topic has served batch-message --- .../service/BrokerServiceException.java | 8 ++ .../yahoo/pulsar/broker/service/Consumer.java | 28 ++++- .../pulsar/broker/service/ServerCnx.java | 5 + .../pulsar/broker/service/Subscription.java | 2 + .../persistent/PersistentSubscription.java | 5 + .../service/persistent/PersistentTopic.java | 17 +++ .../pulsar/broker/service/ServerCnxTest.java | 40 +++++++ .../api/SimpleProducerConsumerTest.java | 2 +- .../impl/BrokerClientIntegrationTest.java | 111 ++++++++++++++++++ .../pulsar/common/api/proto/PulsarApi.java | 3 + pulsar-common/src/main/proto/PulsarApi.proto | 1 + 11 files changed, 218 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerServiceException.java index d33091a63fd2d..4e4653a2934ae 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerServiceException.java @@ -101,6 +101,12 @@ public SubscriptionInvalidCursorPosition(String msg) { super(msg); } } + + public static class UnsupportedVersionException extends BrokerServiceException { + public UnsupportedVersionException(String msg) { + super(msg); + } + } public static PulsarApi.ServerError getClientErrorCode(Throwable t) { if (t instanceof ServerMetadataException) { @@ -109,6 +115,8 @@ public static PulsarApi.ServerError getClientErrorCode(Throwable t) { return PulsarApi.ServerError.PersistenceError; } else if (t instanceof ConsumerBusyException) { return PulsarApi.ServerError.ConsumerBusy; + } else if (t instanceof UnsupportedVersionException) { + return PulsarApi.ServerError.UnsupportedVersionError; } else if (t instanceof ServiceUnitNotReadyException || t instanceof TopicFencedException || t instanceof SubscriptionFencedException) { return PulsarApi.ServerError.ServiceNotReady; diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java index 0d6f74b93d2b0..86dba5f303257 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java @@ -34,6 +34,7 @@ import com.google.common.base.Objects; import com.google.common.collect.Lists; +import com.yahoo.pulsar.broker.PulsarServerException; import com.yahoo.pulsar.common.api.Commands; import com.yahoo.pulsar.common.api.proto.PulsarApi; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck; @@ -148,8 +149,20 @@ public Pair sendMessages(final List entries) { return sentMessages; } - sentMessages.setRight(updatePermitsAndPendingAcks(entries)); - + try { + sentMessages.setRight(updatePermitsAndPendingAcks(entries)); + } catch (PulsarServerException pe) { + log.warn("[{}] [{}] consumer doesn't support batch-message {}", subscription, consumerId, + cnx.getRemoteEndpointProtocolVersion()); + + subscription.markTopicWithBatchMessagePublished(); + sentMessages.setRight(0); + // disconnect consumer: it will update dispatcher's availablePermits and resend pendingAck-messages of this + // consumer to other consumer + disconnect(); + return sentMessages; + } + ctx.channel().eventLoop().execute(() -> { for (int i = 0; i < entries.size(); i++) { Entry entry = entries.get(i); @@ -213,9 +226,11 @@ int getBatchSizeforEntry(ByteBuf metadataAndPayload) { return -1; } - int updatePermitsAndPendingAcks(final List entries) { + int updatePermitsAndPendingAcks(final List entries) throws PulsarServerException { int permitsToReduce = 0; Iterator iter = entries.iterator(); + boolean unsupportedVersion = false; + boolean clientSupportBatchMessages = cnx.isBatchMessageCompatibleVersion(); while (iter.hasNext()) { Entry entry = iter.next(); ByteBuf metadataAndPayload = entry.getDataBuffer(); @@ -232,11 +247,18 @@ int updatePermitsAndPendingAcks(final List entries) { PositionImpl pos = PositionImpl.get((PositionImpl) entry.getPosition()); pendingAcks.put(pos, batchSize); } + // check if consumer supports batch message + if (batchSize > 1 && !clientSupportBatchMessages) { + unsupportedVersion = true; + } permitsToReduce += batchSize; } // reduce permit and increment unackedMsg count with total number of messages in batch-msgs int permits = MESSAGE_PERMITS_UPDATER.addAndGet(this, -permitsToReduce); incrementUnackedMessages(permitsToReduce); + if (unsupportedVersion) { + throw new PulsarServerException("Consumer does not support batch-message"); + } if (permits < 0) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] message permits dropped below 0 - {}", subscription, consumerId, permits); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java index 1a398e8718dc2..bde51e3cf5328 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java @@ -50,6 +50,7 @@ import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe; import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata; +import com.yahoo.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError; import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.policies.data.BacklogQuota; @@ -768,4 +769,8 @@ public String getRole() { boolean hasConsumer(long consumerId) { return consumers.containsKey(consumerId); } + + public boolean isBatchMessageCompatibleVersion() { + return remoteEndpointProtocolVersion >= ProtocolVersion.v4.getNumber(); + } } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Subscription.java index e1c588d8dc8f3..a44945e5a9e0a 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Subscription.java @@ -62,4 +62,6 @@ public interface Subscription { void redeliverUnacknowledgedMessages(Consumer consumer); void redeliverUnacknowledgedMessages(Consumer consumer, List positions); + + void markTopicWithBatchMessagePublished(); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java index 49b21a99f9a6c..37a16ed6b640d 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java @@ -592,5 +592,10 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List dispatcher.redeliverUnacknowledgedMessages(consumer, positions); } + @Override + public void markTopicWithBatchMessagePublished() { + topic.markBatchMessagePublished(); + } + private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java index 140a2756adbdb..a95aeab4714d1 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java @@ -58,6 +58,7 @@ import com.yahoo.pulsar.broker.service.BrokerServiceException; import com.yahoo.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import com.yahoo.pulsar.broker.service.BrokerServiceException.NamingException; +import com.yahoo.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException; import com.yahoo.pulsar.broker.service.BrokerServiceException.PersistenceException; import com.yahoo.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import com.yahoo.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; @@ -128,6 +129,10 @@ public class PersistentTopic implements Topic, AddEntryCallback { // Timestamp of when this topic was last seen active private volatile long lastActive; + + // Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which + // doesn't support batch-message + private volatile boolean hasBatchMessagePublished = false; private static final FastThreadLocal threadLocalTopicStats = new FastThreadLocal() { @Override @@ -321,6 +326,14 @@ public CompletableFuture subscribe(final ServerCnx cnx, String subscri final CompletableFuture future = new CompletableFuture<>(); + if(hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) { + if(log.isDebugEnabled()) { + log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName); + } + future.completeExceptionally(new UnsupportedVersionException("Consumer doesn't support batch-message")); + return future; + } + if (subscriptionName.startsWith(replicatorPrefix)) { log.warn("[{}] Failed to create subscription for {}", topic, subscriptionName); future.completeExceptionally(new NamingException("Subscription with reserved subscription name attempted")); @@ -1224,5 +1237,9 @@ public CompletableFuture clearBacklog(String cursorName) { return FutureUtil.failedFuture(new BrokerServiceException("Cursor not found")); } + public void markBatchMessagePublished() { + this.hasBatchMessagePublished = true; + } + private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class); } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java index 345b3ca9c4bd8..9787be4f03aa6 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java @@ -72,6 +72,7 @@ import com.yahoo.pulsar.broker.service.persistent.PersistentTopic; import com.yahoo.pulsar.broker.service.utils.ClientChannelHelper; import com.yahoo.pulsar.common.api.Commands; +import com.yahoo.pulsar.common.api.PulsarHandler; import com.yahoo.pulsar.common.api.Commands.ChecksumType; import com.yahoo.pulsar.common.api.proto.PulsarApi.AuthMethod; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; @@ -1064,6 +1065,38 @@ public void testSubscribeCommand() throws Exception { channel.finish(); } + + @Test(timeOut = 30000) + public void testUnsupportedBatchMsgSubscribeCommand() throws Exception { + final String failSubName = "failSub"; + + resetChannel(); + setChannelConnected(); + setConnectionVersion(ProtocolVersion.v3.getNumber()); + doReturn(false).when(brokerService).isAuthenticationEnabled(); + doReturn(false).when(brokerService).isAuthorizationEnabled(); + // test SUBSCRIBE on topic and cursor creation success + ByteBuf clientCommand = Commands.newSubscribe(successTopicName, // + successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, "test" /* consumer name */); + channel.writeInbound(clientCommand); + assertTrue(getResponse() instanceof CommandSuccess); + + PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName); + topicRef.markBatchMessagePublished(); + + // test SUBSCRIBE on topic and cursor creation success + clientCommand = Commands.newSubscribe(successTopicName, failSubName, 2, 2, SubType.Exclusive, + "test" /* consumer name */); + channel.writeInbound(clientCommand); + Object response = getResponse(); + assertTrue(response instanceof CommandError); + assertTrue(((CommandError) response).getError().equals(ServerError.UnsupportedVersionError)); + + // Server will not close the connection + assertTrue(channel.isOpen()); + + channel.finish(); + } @Test(timeOut = 30000) public void testSubscribeCommandWithAuthorizationPositive() throws Exception { @@ -1163,6 +1196,13 @@ private void setChannelConnected() throws Exception { channelState.set(serverCnx, State.Connected); } + private void setConnectionVersion(int version) throws Exception { + PulsarHandler cnx = (PulsarHandler) serverCnx; + Field versionField = PulsarHandler.class.getDeclaredField("remoteEndpointProtocolVersion"); + versionField.setAccessible(true); + versionField.set(cnx, version); + } + private Object getResponse() throws Exception { // Wait at most for 10s to get a response final long sleepTimeMs = 10; diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java index c8bcf69e80a7c..b984aa35132a8 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java @@ -1856,4 +1856,4 @@ public void testRedeliveryFailOverConsumer() throws Exception { } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java index 7b323bdc54705..aedc085358382 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -17,21 +17,34 @@ import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.*; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import com.google.common.collect.Sets; import com.yahoo.pulsar.broker.namespace.OwnershipCache; +import com.yahoo.pulsar.broker.service.Topic; +import com.yahoo.pulsar.client.api.Consumer; import com.yahoo.pulsar.client.api.ConsumerConfiguration; +import com.yahoo.pulsar.client.api.Message; +import com.yahoo.pulsar.client.api.Producer; import com.yahoo.pulsar.client.api.ProducerConfiguration; import com.yahoo.pulsar.client.api.ProducerConsumerBase; +import com.yahoo.pulsar.client.api.PulsarClient; +import com.yahoo.pulsar.client.api.SubscriptionType; import com.yahoo.pulsar.client.impl.HandlerBase.State; +import com.yahoo.pulsar.common.api.PulsarHandler; import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.naming.NamespaceBundle; import com.yahoo.pulsar.common.naming.NamespaceBundle; @@ -52,6 +65,11 @@ protected void setup() throws Exception { protected void cleanup() throws Exception { super.internalCleanup(); } + + @DataProvider + public Object[][] subType() { + return new Object[][] {{SubscriptionType.Shared}, {SubscriptionType.Failover}}; + } /** @@ -226,5 +244,98 @@ public void testCloseBrokerService() throws Exception { } + /** + * It verifies that consumer which doesn't support batch-message: + *

+ * 1. broker disconnects that consumer + *

+ * 2. redeliver all those messages to other supported consumer under the same subscription + * + * @param subType + * @throws Exception + */ + @Test(timeOut = 7000, dataProvider = "subType") + public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws Exception { + log.info("-- Starting {} test --", methodName); + + final int batchMessageDelayMs = 1000; + final String topicName = "persistent://my-property/use/my-ns/my-topic1"; + final String subscriptionName = "my-subscriber-name" + subType; + + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setSubscriptionType(subType); + ConsumerImpl consumer1 = (ConsumerImpl) pulsarClient.subscribe(topicName, subscriptionName, conf); + + ProducerConfiguration producerConf = new ProducerConfiguration(); + + if (batchMessageDelayMs != 0) { + producerConf.setBatchingEnabled(true); + producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); + producerConf.setBatchingMaxMessages(20); + } + + Producer producer = pulsarClient.createProducer(topicName, new ProducerConfiguration()); + Producer batchProducer = pulsarClient.createProducer(topicName, producerConf); + + // update consumer's version to incompatible batch-message version = Version.V3 + Topic topic = pulsar.getBrokerService().getTopic(topicName).get(); + com.yahoo.pulsar.broker.service.Consumer brokerConsumer = topic.getSubscriptions().get(subscriptionName) + .getConsumers().get(0); + Field cnxField = com.yahoo.pulsar.broker.service.Consumer.class.getDeclaredField("cnx"); + cnxField.setAccessible(true); + PulsarHandler cnx = (PulsarHandler) cnxField.get(brokerConsumer); + Field versionField = PulsarHandler.class.getDeclaredField("remoteEndpointProtocolVersion"); + versionField.setAccessible(true); + versionField.set(cnx, 3); + + // (1) send non-batch message: consumer should be able to consume + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + Set messageSet = Sets.newHashSet(); + Message msg = null; + for (int i = 0; i < 10; i++) { + msg = consumer1.receive(1, TimeUnit.SECONDS); + String receivedMessage = new String(msg.getData()); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + consumer1.acknowledge(msg); + } + + // Also set clientCnx of the consumer to null so, it avoid reconnection so, other consumer can consume for + // verification + consumer1.setClientCnx(null); + // (2) send batch-message which should not be able to consume: as broker will disconnect the consumer + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + batchProducer.sendAsync(message.getBytes()); + } + + Thread.sleep(batchMessageDelayMs); + + // consumer should have not received any message as it should have been disconnected + msg = consumer1.receive(2, TimeUnit.SECONDS); + assertNull(msg); + + // subscrie consumer2 with supporting batch version + pulsarClient = PulsarClient.create(brokerUrl.toString()); + Consumer consumer2 = pulsarClient.subscribe(topicName, subscriptionName, conf); + + messageSet.clear(); + for (int i = 0; i < 10; i++) { + msg = consumer2.receive(1, TimeUnit.SECONDS); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + consumer2.acknowledge(msg); + } + + consumer2.close(); + producer.close(); + batchProducer.close(); + log.info("-- Exiting {} test --", methodName); + } } diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java index 0e6147836df08..0a2e47e57aea7 100644 --- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java @@ -64,6 +64,7 @@ public enum ServerError ProducerBlockedQuotaExceededError(7, 7), ProducerBlockedQuotaExceededException(8, 8), ChecksumError(9, 9), + UnsupportedVersionError(10, 10), ; public static final int UnknownError_VALUE = 0; @@ -76,6 +77,7 @@ public enum ServerError public static final int ProducerBlockedQuotaExceededError_VALUE = 7; public static final int ProducerBlockedQuotaExceededException_VALUE = 8; public static final int ChecksumError_VALUE = 9; + public static final int UnsupportedVersionError_VALUE = 10; public final int getNumber() { return value; } @@ -92,6 +94,7 @@ public static ServerError valueOf(int value) { case 7: return ProducerBlockedQuotaExceededError; case 8: return ProducerBlockedQuotaExceededException; case 9: return ChecksumError; + case 10: return UnsupportedVersionError; default: return null; } } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 6118b7ae01501..20e0379acef2b 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -77,6 +77,7 @@ enum ServerError { ProducerBlockedQuotaExceededError = 7; // Unable to create producer because backlog quota exceeded ProducerBlockedQuotaExceededException = 8; // Exception while creating producer because quota exceeded ChecksumError = 9; // Error while verifying message checksum + UnsupportedVersionError = 10; // Error when an older client/version doesn't support a required feature } enum AuthMethod {