Skip to content

Commit

Permalink
[fix] [broker] Make the new exclusive consumer instead the inactive o…
Browse files Browse the repository at this point in the history
…ne faster (#21183)

### Motivation

There is an issue similar to the #21155 fixed one.

The client assumed the connection was inactive, but the Broker assumed the connection was fine. The Client tried to  use a new connection to reconnect an exclusive consumer, then got an error `Exclusive consumer is already connected`

### Modifications

- Check the connection of the old consumer is available when the new one tries to subscribe
  • Loading branch information
poorbarcode authored Oct 30, 2023
1 parent 113d9f6 commit 29db8f8
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,22 @@ public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
}

if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
return FutureUtil.failedFuture(new ConsumerBusyException("Exclusive consumer is already connected"));
Consumer actConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (actConsumer != null) {
return actConsumer.cnx().checkConnectionLiveness().thenCompose(actConsumerStillAlive -> {
if (actConsumerStillAlive == null || actConsumerStillAlive) {
return FutureUtil.failedFuture(new ConsumerBusyException("Exclusive consumer is already"
+ " connected"));
} else {
return addConsumer(consumer);
}
});
} else {
// It should never happen.

return FutureUtil.failedFuture(new ConsumerBusyException("Active consumer is in a strange state."
+ " Active consumer is null, but there are " + consumers.size() + " registered."));
}
}

if (subscriptionType == SubType.Failover && isConsumersExceededOnSubscription()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public void setup() throws Exception {
doReturn(eventLoopGroup.next()).when(channel).eventLoop();
doReturn(channel).when(ctx).channel();
doReturn(ctx).when(serverCnx).ctx();
doReturn(CompletableFuture.completedFuture(true)).when(serverCnx).checkConnectionLiveness();

NamespaceService nsSvc = mock(NamespaceService.class);
NamespaceBundle bundle = mock(NamespaceBundle.class);
Expand Down Expand Up @@ -693,7 +694,15 @@ public void testSubscribeUnsubscribe() throws Exception {
f1.get();

// 2. duplicate subscribe
Future<Consumer> f2 = topic.subscribe(getSubscriptionOption(cmd));
CommandSubscribe cmd2 = new CommandSubscribe()
.setConsumerId(2)
.setTopic(successTopicName)
.setSubscription(successSubName)
.setConsumerName("consumer-name")
.setReadCompacted(false)
.setRequestId(2)
.setSubType(SubType.Exclusive);
Future<Consumer> f2 = topic.subscribe(getSubscriptionOption(cmd2));
try {
f2.get();
fail("should fail with exception");
Expand Down Expand Up @@ -758,19 +767,11 @@ public void testAddRemoveConsumer() throws Exception {
sub.addConsumer(consumer);
assertTrue(sub.getDispatcher().isConsumerConnected());

// 2. duplicate add consumer
try {
sub.addConsumer(consumer).get();
fail("Should fail with ConsumerBusyException");
} catch (Exception e) {
assertTrue(e.getCause() instanceof BrokerServiceException.ConsumerBusyException);
}

// 3. simple remove consumer
// 2. simple remove consumer
sub.removeConsumer(consumer);
assertFalse(sub.getDispatcher().isConsumerConnected());

// 4. duplicate remove consumer
// 3. duplicate remove consumer
try {
sub.removeConsumer(consumer);
fail("Should fail with ServerMetadataException");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.DefaultChannelId;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.vertx.core.impl.ConcurrentHashSet;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -62,10 +64,15 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
Expand Down Expand Up @@ -99,6 +106,7 @@
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.AuthMethod;
import org.apache.pulsar.common.api.proto.BaseCommand;
Expand Down Expand Up @@ -997,7 +1005,8 @@ public void testHandleProducerAfterClientChannelInactive() throws Exception {
channelsStoppedAnswerHealthCheck.add(channel);
ClientChannel channel2 = new ClientChannel();
setChannelConnected(channel2.serverCnx);
Awaitility.await().untilAsserted(() -> {
Awaitility.await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> {
channel.runPendingTasks();
ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
pName, false, metadata, null, epoch.incrementAndGet(), false,
ProducerAccessMode.Shared, Optional.empty(), false);
Expand All @@ -1011,10 +1020,132 @@ public void testHandleProducerAfterClientChannelInactive() throws Exception {
channel2.close();
}

@Test
public void testHandleConsumerAfterClientChannelInactive() throws Exception {
final String tName = successTopicName;
final long consumerId = 1;
final MutableInt requestId = new MutableInt(1);
final String sName = successSubName;
final String cName1 = ConsumerName.generateRandomName();
final String cName2 = ConsumerName.generateRandomName();
resetChannel();
setChannelConnected();

// The producer register using the first connection.
ByteBuf cmdSubscribe1 = Commands.newSubscribe(tName, sName, consumerId, requestId.incrementAndGet(),
SubType.Exclusive, 0, cName1, 0);
channel.writeInbound(cmdSubscribe1);
assertTrue(getResponse() instanceof CommandSuccess);
PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(tName).get();
assertNotNull(topicRef);
assertNotNull(topicRef.getSubscription(sName).getConsumers());
assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1);
assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(), cName1);

// Verify the second producer using a new connection will override the consumer who using a stopped channel.
channelsStoppedAnswerHealthCheck.add(channel);
ClientChannel channel2 = new ClientChannel();
setChannelConnected(channel2.serverCnx);
ByteBuf cmdSubscribe2 = Commands.newSubscribe(tName, sName, consumerId, requestId.incrementAndGet(),
CommandSubscribe.SubType.Exclusive, 0, cName2, 0);
channel2.channel.writeInbound(cmdSubscribe2);
BackGroundExecutor backGroundExecutor = startBackgroundExecutorForEmbeddedChannel(channel);

assertTrue(getResponse(channel2.channel, channel2.clientChannelHelper) instanceof CommandSuccess);
assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1);
assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(), cName2);
backGroundExecutor.close();

// cleanup.
channel.finish();
channel2.close();
}

@Test
public void test2ndSubFailedIfDisabledConCheck()
throws Exception {
final String tName = successTopicName;
final long consumerId = 1;
final MutableInt requestId = new MutableInt(1);
final String sName = successSubName;
final String cName1 = ConsumerName.generateRandomName();
final String cName2 = ConsumerName.generateRandomName();
// Disabled connection check.
pulsar.getConfig().setConnectionLivenessCheckTimeoutMillis(-1);
resetChannel();
setChannelConnected();

// The consumer register using the first connection.
ByteBuf cmdSubscribe1 = Commands.newSubscribe(tName, sName, consumerId, requestId.incrementAndGet(),
SubType.Exclusive, 0, cName1, 0);
channel.writeInbound(cmdSubscribe1);
assertTrue(getResponse() instanceof CommandSuccess);
PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(tName).orElse(null);
assertNotNull(topicRef);
assertNotNull(topicRef.getSubscription(sName).getConsumers());
assertEquals(topicRef.getSubscription(sName).getConsumers().stream().map(Consumer::consumerName)
.collect(Collectors.toList()), Collections.singletonList(cName1));

// Verify the consumer using a new connection will override the consumer who using a stopped channel.
channelsStoppedAnswerHealthCheck.add(channel);
ClientChannel channel2 = new ClientChannel();
setChannelConnected(channel2.serverCnx);
ByteBuf cmdSubscribe2 = Commands.newSubscribe(tName, sName, consumerId, requestId.incrementAndGet(),
CommandSubscribe.SubType.Exclusive, 0, cName2, 0);
channel2.channel.writeInbound(cmdSubscribe2);
BackGroundExecutor backGroundExecutor = startBackgroundExecutorForEmbeddedChannel(channel);

// Since the feature "ConnectionLiveness" has been disabled, the fix
// by https://github.com/apache/pulsar/pull/21183 will not be affected, so the client will still get an error.
Object responseOfConnection2 = getResponse(channel2.channel, channel2.clientChannelHelper);
assertTrue(responseOfConnection2 instanceof CommandError);
assertTrue(((CommandError) responseOfConnection2).getMessage()
.contains("Exclusive consumer is already connected"));
assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1);
assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(), cName1);
backGroundExecutor.close();

// cleanup.
channel.finish();
channel2.close();
// Reset configuration.
pulsar.getConfig().setConnectionLivenessCheckTimeoutMillis(5000);
}

/**
* When a channel typed "EmbeddedChannel", once we call channel.execute(runnable), there is no background thread
* to run it.
* So starting a background thread to trigger the tasks in the queue.
*/
private BackGroundExecutor startBackgroundExecutorForEmbeddedChannel(final EmbeddedChannel channel) {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(() -> {
channel.runPendingTasks();
}, 100, 100, TimeUnit.MILLISECONDS);
return new BackGroundExecutor(executor, scheduledFuture);
}

@AllArgsConstructor
private static class BackGroundExecutor implements Closeable {

private ScheduledExecutorService executor;

private ScheduledFuture scheduledFuture;

@Override
public void close() throws IOException {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
executor.shutdown();
}
}

private class ClientChannel implements Closeable {
private ClientChannelHelper clientChannelHelper = new ClientChannelHelper();
private ServerCnx serverCnx = new ServerCnx(pulsar);
private EmbeddedChannel channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(
private EmbeddedChannel channel = new EmbeddedChannel(DefaultChannelId.newInstance(),
new LengthFieldBasedFrameDecoder(
5 * 1024 * 1024,
0,
4,
Expand Down Expand Up @@ -1810,9 +1941,11 @@ public void testDuplicateConcurrentSubscribeCommand() throws Exception {
"test" /* consumer name */, 0 /* avoid reseting cursor */);
channel.writeInbound(clientCommand);

BackGroundExecutor backGroundExecutor = startBackgroundExecutorForEmbeddedChannel(channel);

// Create producer second time
clientCommand = Commands.newSubscribe(successTopicName, //
successSubName, 2 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
successSubName, 2 /* consumer id */, 2 /* request id */, SubType.Exclusive, 0,
"test" /* consumer name */, 0 /* avoid reseting cursor */);
channel.writeInbound(clientCommand);

Expand All @@ -1822,6 +1955,9 @@ public void testDuplicateConcurrentSubscribeCommand() throws Exception {
CommandError error = (CommandError) response;
assertEquals(error.getError(), ServerError.ConsumerBusy);
});

// cleanup.
backGroundExecutor.close();
channel.finish();
}

Expand Down Expand Up @@ -2676,13 +2812,7 @@ protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper client
if (channelsStoppedAnswerHealthCheck.contains(channel)) {
continue;
}
channel.writeAndFlush(Commands.newPong()).addListener(future -> {
if (!future.isSuccess()) {
log.warn("[{}] Forcing connection to close since cannot send a pong message.",
channel, future.cause());
channel.close();
}
});
channel.writeInbound(Commands.newPong());
continue;
}
return cmd;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.api.proto.CommandPing;
import org.apache.pulsar.common.api.proto.CommandPong;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.apache.pulsar.common.api.proto.CommandAck;
Expand Down Expand Up @@ -207,6 +209,16 @@ protected void handleEndTxnOnSubscriptionResponse(
CommandEndTxnOnSubscriptionResponse commandEndTxnOnSubscriptionResponse) {
queue.offer(new CommandEndTxnOnSubscriptionResponse().copyFrom(commandEndTxnOnSubscriptionResponse));
}

@Override
protected void handlePing(CommandPing ping) {
queue.offer(new CommandPing().copyFrom(ping));
}

@Override
protected void handlePong(CommandPong pong) {
return;
}
};

}

0 comments on commit 29db8f8

Please sign in to comment.