From 96ecc9ad51874c597f457cb1e83b6211efbfd4e2 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Thu, 23 Feb 2017 15:00:16 -0800 Subject: [PATCH] Fail unsupportedBatchVersion-subscriber if topic has served batch-message --- .../service/BrokerServiceException.java | 8 ++++ .../yahoo/pulsar/broker/service/Consumer.java | 15 ++++--- .../pulsar/broker/service/ServerCnx.java | 5 +++ .../pulsar/broker/service/Subscription.java | 2 + .../persistent/PersistentSubscription.java | 5 +++ .../service/persistent/PersistentTopic.java | 17 ++++++++ .../pulsar/broker/service/ServerCnxTest.java | 40 +++++++++++++++++++ .../impl/BrokerClientIntegrationTest.java | 3 ++ .../pulsar/common/api/proto/PulsarApi.java | 3 ++ pulsar-common/src/main/proto/PulsarApi.proto | 1 + 10 files changed, 91 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerServiceException.java index d33091a63fd2d..4e4653a2934ae 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerServiceException.java @@ -101,6 +101,12 @@ public SubscriptionInvalidCursorPosition(String msg) { super(msg); } } + + public static class UnsupportedVersionException extends BrokerServiceException { + public UnsupportedVersionException(String msg) { + super(msg); + } + } public static PulsarApi.ServerError getClientErrorCode(Throwable t) { if (t instanceof ServerMetadataException) { @@ -109,6 +115,8 @@ public static PulsarApi.ServerError getClientErrorCode(Throwable t) { return PulsarApi.ServerError.PersistenceError; } else if (t instanceof ConsumerBusyException) { return PulsarApi.ServerError.ConsumerBusy; + } else if (t instanceof UnsupportedVersionException) { + return PulsarApi.ServerError.UnsupportedVersionError; } else if (t instanceof ServiceUnitNotReadyException || t instanceof TopicFencedException || t instanceof SubscriptionFencedException) { return PulsarApi.ServerError.ServiceNotReady; diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java index 317d73c85d8c5..86dba5f303257 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java @@ -154,14 +154,12 @@ public Pair sendMessages(final List entries) { } catch (PulsarServerException pe) { log.warn("[{}] [{}] consumer doesn't support batch-message {}", subscription, consumerId, cnx.getRemoteEndpointProtocolVersion()); + + subscription.markTopicWithBatchMessagePublished(); sentMessages.setRight(0); - // remove consumer from subscription and cnx: it will update dispatcher's availablePermits and resends - // pendingAck-messages of this consumer to other consumer - try { - close(); - } catch (BrokerServiceException e) { - log.warn("Consumer {} was already closed: {}", this, e.getMessage(), e); - } + // disconnect consumer: it will update dispatcher's availablePermits and resend pendingAck-messages of this + // consumer to other consumer + disconnect(); return sentMessages; } @@ -232,6 +230,7 @@ int updatePermitsAndPendingAcks(final List entries) throws PulsarServerEx int permitsToReduce = 0; Iterator iter = entries.iterator(); boolean unsupportedVersion = false; + boolean clientSupportBatchMessages = cnx.isBatchMessageCompatibleVersion(); while (iter.hasNext()) { Entry entry = iter.next(); ByteBuf metadataAndPayload = entry.getDataBuffer(); @@ -249,7 +248,7 @@ int updatePermitsAndPendingAcks(final List entries) throws PulsarServerEx pendingAcks.put(pos, batchSize); } // check if consumer supports batch message - if (batchSize > 1 && cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v4.getNumber()) { + if (batchSize > 1 && !clientSupportBatchMessages) { unsupportedVersion = true; } permitsToReduce += batchSize; diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java index 1a398e8718dc2..bde51e3cf5328 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java @@ -50,6 +50,7 @@ import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe; import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata; +import com.yahoo.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError; import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.policies.data.BacklogQuota; @@ -768,4 +769,8 @@ public String getRole() { boolean hasConsumer(long consumerId) { return consumers.containsKey(consumerId); } + + public boolean isBatchMessageCompatibleVersion() { + return remoteEndpointProtocolVersion >= ProtocolVersion.v4.getNumber(); + } } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Subscription.java index e1c588d8dc8f3..a44945e5a9e0a 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Subscription.java @@ -62,4 +62,6 @@ public interface Subscription { void redeliverUnacknowledgedMessages(Consumer consumer); void redeliverUnacknowledgedMessages(Consumer consumer, List positions); + + void markTopicWithBatchMessagePublished(); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java index 49b21a99f9a6c..37a16ed6b640d 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java @@ -592,5 +592,10 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List dispatcher.redeliverUnacknowledgedMessages(consumer, positions); } + @Override + public void markTopicWithBatchMessagePublished() { + topic.markBatchMessagePublished(); + } + private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java index 6fc13a9b3aa88..0c24007ed4c03 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java @@ -58,6 +58,7 @@ import com.yahoo.pulsar.broker.service.BrokerServiceException; import com.yahoo.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import com.yahoo.pulsar.broker.service.BrokerServiceException.NamingException; +import com.yahoo.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException; import com.yahoo.pulsar.broker.service.BrokerServiceException.PersistenceException; import com.yahoo.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import com.yahoo.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; @@ -128,6 +129,10 @@ public class PersistentTopic implements Topic, AddEntryCallback { // Timestamp of when this topic was last seen active private volatile long lastActive; + + // Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which + // doesn't support batch-message + private volatile boolean hasBatchMessagePublished = false; private static final FastThreadLocal threadLocalTopicStats = new FastThreadLocal() { @Override @@ -303,6 +308,14 @@ public CompletableFuture subscribe(final ServerCnx cnx, String subscri final CompletableFuture future = new CompletableFuture<>(); + if(hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) { + if(log.isDebugEnabled()) { + log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName); + } + future.completeExceptionally(new UnsupportedVersionException("Consumer doesn't support batch-message")); + return future; + } + if (subscriptionName.startsWith(replicatorPrefix)) { log.warn("[{}] Failed to create subscription for {}", topic, subscriptionName); future.completeExceptionally(new NamingException("Subscription with reserved subscription name attempted")); @@ -1206,5 +1219,9 @@ public CompletableFuture clearBacklog(String cursorName) { return FutureUtil.failedFuture(new BrokerServiceException("Cursor not found")); } + public void markBatchMessagePublished() { + this.hasBatchMessagePublished = true; + } + private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class); } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java index 345b3ca9c4bd8..9787be4f03aa6 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java @@ -72,6 +72,7 @@ import com.yahoo.pulsar.broker.service.persistent.PersistentTopic; import com.yahoo.pulsar.broker.service.utils.ClientChannelHelper; import com.yahoo.pulsar.common.api.Commands; +import com.yahoo.pulsar.common.api.PulsarHandler; import com.yahoo.pulsar.common.api.Commands.ChecksumType; import com.yahoo.pulsar.common.api.proto.PulsarApi.AuthMethod; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; @@ -1064,6 +1065,38 @@ public void testSubscribeCommand() throws Exception { channel.finish(); } + + @Test(timeOut = 30000) + public void testUnsupportedBatchMsgSubscribeCommand() throws Exception { + final String failSubName = "failSub"; + + resetChannel(); + setChannelConnected(); + setConnectionVersion(ProtocolVersion.v3.getNumber()); + doReturn(false).when(brokerService).isAuthenticationEnabled(); + doReturn(false).when(brokerService).isAuthorizationEnabled(); + // test SUBSCRIBE on topic and cursor creation success + ByteBuf clientCommand = Commands.newSubscribe(successTopicName, // + successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, "test" /* consumer name */); + channel.writeInbound(clientCommand); + assertTrue(getResponse() instanceof CommandSuccess); + + PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName); + topicRef.markBatchMessagePublished(); + + // test SUBSCRIBE on topic and cursor creation success + clientCommand = Commands.newSubscribe(successTopicName, failSubName, 2, 2, SubType.Exclusive, + "test" /* consumer name */); + channel.writeInbound(clientCommand); + Object response = getResponse(); + assertTrue(response instanceof CommandError); + assertTrue(((CommandError) response).getError().equals(ServerError.UnsupportedVersionError)); + + // Server will not close the connection + assertTrue(channel.isOpen()); + + channel.finish(); + } @Test(timeOut = 30000) public void testSubscribeCommandWithAuthorizationPositive() throws Exception { @@ -1163,6 +1196,13 @@ private void setChannelConnected() throws Exception { channelState.set(serverCnx, State.Connected); } + private void setConnectionVersion(int version) throws Exception { + PulsarHandler cnx = (PulsarHandler) serverCnx; + Field versionField = PulsarHandler.class.getDeclaredField("remoteEndpointProtocolVersion"); + versionField.setAccessible(true); + versionField.set(cnx, version); + } + private Object getResponse() throws Exception { // Wait at most for 10s to get a response final long sleepTimeMs = 10; diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java index f5833a49196b3..aedc085358382 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -303,6 +303,9 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws consumer1.acknowledge(msg); } + // Also set clientCnx of the consumer to null so, it avoid reconnection so, other consumer can consume for + // verification + consumer1.setClientCnx(null); // (2) send batch-message which should not be able to consume: as broker will disconnect the consumer for (int i = 0; i < 10; i++) { String message = "my-message-" + i; diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java index 0e6147836df08..0a2e47e57aea7 100644 --- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java @@ -64,6 +64,7 @@ public enum ServerError ProducerBlockedQuotaExceededError(7, 7), ProducerBlockedQuotaExceededException(8, 8), ChecksumError(9, 9), + UnsupportedVersionError(10, 10), ; public static final int UnknownError_VALUE = 0; @@ -76,6 +77,7 @@ public enum ServerError public static final int ProducerBlockedQuotaExceededError_VALUE = 7; public static final int ProducerBlockedQuotaExceededException_VALUE = 8; public static final int ChecksumError_VALUE = 9; + public static final int UnsupportedVersionError_VALUE = 10; public final int getNumber() { return value; } @@ -92,6 +94,7 @@ public static ServerError valueOf(int value) { case 7: return ProducerBlockedQuotaExceededError; case 8: return ProducerBlockedQuotaExceededException; case 9: return ChecksumError; + case 10: return UnsupportedVersionError; default: return null; } } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 6118b7ae01501..20e0379acef2b 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -77,6 +77,7 @@ enum ServerError { ProducerBlockedQuotaExceededError = 7; // Unable to create producer because backlog quota exceeded ProducerBlockedQuotaExceededException = 8; // Exception while creating producer because quota exceeded ChecksumError = 9; // Error while verifying message checksum + UnsupportedVersionError = 10; // Error when an older client/version doesn't support a required feature } enum AuthMethod {