Skip to content

Commit

Permalink
Fail unsupportedBatchVersion-subscriber if topic has served batch-mes…
Browse files Browse the repository at this point in the history
…sage
  • Loading branch information
rdhabalia committed Feb 24, 2017
1 parent d987874 commit 96ecc9a
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ public SubscriptionInvalidCursorPosition(String msg) {
super(msg);
}
}

public static class UnsupportedVersionException extends BrokerServiceException {
public UnsupportedVersionException(String msg) {
super(msg);
}
}

public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
if (t instanceof ServerMetadataException) {
Expand All @@ -109,6 +115,8 @@ public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
return PulsarApi.ServerError.PersistenceError;
} else if (t instanceof ConsumerBusyException) {
return PulsarApi.ServerError.ConsumerBusy;
} else if (t instanceof UnsupportedVersionException) {
return PulsarApi.ServerError.UnsupportedVersionError;
} else if (t instanceof ServiceUnitNotReadyException || t instanceof TopicFencedException
|| t instanceof SubscriptionFencedException) {
return PulsarApi.ServerError.ServiceNotReady;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,12 @@ public Pair<ChannelPromise, Integer> sendMessages(final List<Entry> entries) {
} catch (PulsarServerException pe) {
log.warn("[{}] [{}] consumer doesn't support batch-message {}", subscription, consumerId,
cnx.getRemoteEndpointProtocolVersion());

subscription.markTopicWithBatchMessagePublished();
sentMessages.setRight(0);
// remove consumer from subscription and cnx: it will update dispatcher's availablePermits and resends
// pendingAck-messages of this consumer to other consumer
try {
close();
} catch (BrokerServiceException e) {
log.warn("Consumer {} was already closed: {}", this, e.getMessage(), e);
}
// disconnect consumer: it will update dispatcher's availablePermits and resend pendingAck-messages of this
// consumer to other consumer
disconnect();
return sentMessages;
}

Expand Down Expand Up @@ -232,6 +230,7 @@ int updatePermitsAndPendingAcks(final List<Entry> entries) throws PulsarServerEx
int permitsToReduce = 0;
Iterator<Entry> iter = entries.iterator();
boolean unsupportedVersion = false;
boolean clientSupportBatchMessages = cnx.isBatchMessageCompatibleVersion();
while (iter.hasNext()) {
Entry entry = iter.next();
ByteBuf metadataAndPayload = entry.getDataBuffer();
Expand All @@ -249,7 +248,7 @@ int updatePermitsAndPendingAcks(final List<Entry> entries) throws PulsarServerEx
pendingAcks.put(pos, batchSize);
}
// check if consumer supports batch message
if (batchSize > 1 && cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v4.getNumber()) {
if (batchSize > 1 && !clientSupportBatchMessages) {
unsupportedVersion = true;
}
permitsToReduce += batchSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.BacklogQuota;
Expand Down Expand Up @@ -768,4 +769,8 @@ public String getRole() {
boolean hasConsumer(long consumerId) {
return consumers.containsKey(consumerId);
}

public boolean isBatchMessageCompatibleVersion() {
return remoteEndpointProtocolVersion >= ProtocolVersion.v4.getNumber();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,6 @@ public interface Subscription {
void redeliverUnacknowledgedMessages(Consumer consumer);

void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions);

void markTopicWithBatchMessagePublished();
}
Original file line number Diff line number Diff line change
Expand Up @@ -592,5 +592,10 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List
dispatcher.redeliverUnacknowledgedMessages(consumer, positions);
}

@Override
public void markTopicWithBatchMessagePublished() {
topic.markBatchMessagePublished();
}

private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import com.yahoo.pulsar.broker.service.BrokerServiceException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.NamingException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.PersistenceException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
Expand Down Expand Up @@ -128,6 +129,10 @@ public class PersistentTopic implements Topic, AddEntryCallback {

// Timestamp of when this topic was last seen active
private volatile long lastActive;

// Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which
// doesn't support batch-message
private volatile boolean hasBatchMessagePublished = false;

private static final FastThreadLocal<TopicStats> threadLocalTopicStats = new FastThreadLocal<TopicStats>() {
@Override
Expand Down Expand Up @@ -303,6 +308,14 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri

final CompletableFuture<Consumer> future = new CompletableFuture<>();

if(hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) {
if(log.isDebugEnabled()) {
log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName);
}
future.completeExceptionally(new UnsupportedVersionException("Consumer doesn't support batch-message"));
return future;
}

if (subscriptionName.startsWith(replicatorPrefix)) {
log.warn("[{}] Failed to create subscription for {}", topic, subscriptionName);
future.completeExceptionally(new NamingException("Subscription with reserved subscription name attempted"));
Expand Down Expand Up @@ -1206,5 +1219,9 @@ public CompletableFuture<Void> clearBacklog(String cursorName) {
return FutureUtil.failedFuture(new BrokerServiceException("Cursor not found"));
}

public void markBatchMessagePublished() {
this.hasBatchMessagePublished = true;
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.broker.service.utils.ClientChannelHelper;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.PulsarHandler;
import com.yahoo.pulsar.common.api.Commands.ChecksumType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.AuthMethod;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
Expand Down Expand Up @@ -1064,6 +1065,38 @@ public void testSubscribeCommand() throws Exception {

channel.finish();
}

@Test(timeOut = 30000)
public void testUnsupportedBatchMsgSubscribeCommand() throws Exception {
final String failSubName = "failSub";

resetChannel();
setChannelConnected();
setConnectionVersion(ProtocolVersion.v3.getNumber());
doReturn(false).when(brokerService).isAuthenticationEnabled();
doReturn(false).when(brokerService).isAuthorizationEnabled();
// test SUBSCRIBE on topic and cursor creation success
ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, "test" /* consumer name */);
channel.writeInbound(clientCommand);
assertTrue(getResponse() instanceof CommandSuccess);

PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName);
topicRef.markBatchMessagePublished();

// test SUBSCRIBE on topic and cursor creation success
clientCommand = Commands.newSubscribe(successTopicName, failSubName, 2, 2, SubType.Exclusive,
"test" /* consumer name */);
channel.writeInbound(clientCommand);
Object response = getResponse();
assertTrue(response instanceof CommandError);
assertTrue(((CommandError) response).getError().equals(ServerError.UnsupportedVersionError));

// Server will not close the connection
assertTrue(channel.isOpen());

channel.finish();
}

@Test(timeOut = 30000)
public void testSubscribeCommandWithAuthorizationPositive() throws Exception {
Expand Down Expand Up @@ -1163,6 +1196,13 @@ private void setChannelConnected() throws Exception {
channelState.set(serverCnx, State.Connected);
}

private void setConnectionVersion(int version) throws Exception {
PulsarHandler cnx = (PulsarHandler) serverCnx;
Field versionField = PulsarHandler.class.getDeclaredField("remoteEndpointProtocolVersion");
versionField.setAccessible(true);
versionField.set(cnx, version);
}

private Object getResponse() throws Exception {
// Wait at most for 10s to get a response
final long sleepTimeMs = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws
consumer1.acknowledge(msg);
}

// Also set clientCnx of the consumer to null so, it avoid reconnection so, other consumer can consume for
// verification
consumer1.setClientCnx(null);
// (2) send batch-message which should not be able to consume: as broker will disconnect the consumer
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pulsar-common/src/main/proto/PulsarApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ enum ServerError {
ProducerBlockedQuotaExceededError = 7; // Unable to create producer because backlog quota exceeded
ProducerBlockedQuotaExceededException = 8; // Exception while creating producer because quota exceeded
ChecksumError = 9; // Error while verifying message checksum
UnsupportedVersionError = 10; // Error when an older client/version doesn't support a required feature
}

enum AuthMethod {
Expand Down

0 comments on commit 96ecc9a

Please sign in to comment.