Skip to content

Commit

Permalink
[improve] [broker] Let the producer request success at the first time…
Browse files Browse the repository at this point in the history
… if the previous one is inactive (apache#21220)

If a producer reconnects over a new connection while its previous connection has become inactive, the first request made on the new connection fails.  That failure triggers the topic to clean up the inactive producers.  As a result, only the producer's second request is able to register successfully and proceed with the operation.

Make the initial request on the new connection succeed.

(cherry picked from commit 711b621)
  • Loading branch information
poorbarcode authored and srinath-ctds committed Dec 20, 2023
1 parent 15af3d0 commit 4b0713d
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -703,15 +703,14 @@ public CompletableFuture<Optional<Long>> addProducer(Producer producer,
log.warn("[{}] Attempting to add producer to a terminated topic", topic);
throw new TopicTerminatedException("Topic was already terminated");
}
internalAddProducer(producer);

USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}

return CompletableFuture.completedFuture(producerEpoch);
return internalAddProducer(producer).thenApply(ignore -> {
USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}
return producerEpoch;
});
} catch (BrokerServiceException e) {
return FutureUtil.failedFuture(e);
} finally {
Expand Down Expand Up @@ -957,15 +956,17 @@ protected void checkTopicFenced() throws BrokerServiceException {
}
}

protected void internalAddProducer(Producer producer) throws BrokerServiceException {
protected CompletableFuture<Void> internalAddProducer(Producer producer) {
if (isProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit");
return CompletableFuture.failedFuture(
new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"));
}

if (isSameAddressProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic);
throw new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit");
return CompletableFuture.failedFuture(
new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit"));
}

if (log.isDebugEnabled()) {
Expand All @@ -974,31 +975,46 @@ protected void internalAddProducer(Producer producer) throws BrokerServiceExcept

Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
if (existProducer != null) {
tryOverwriteOldProducer(existProducer, producer);
return tryOverwriteOldProducer(existProducer, producer);
} else if (!producer.isRemote()) {
USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this);
}
return CompletableFuture.completedFuture(null);
}

/**
 * Resolves a name clash between an already registered producer and a newly arriving producer.
 *
 * NOTE(review): this listing is a rendered diff without +/- markers, so the replaced statements (the
 * {@code void} signature and the {@code throw} lines) appear directly before their replacements (the
 * {@code CompletableFuture<Void>} signature and the {@code return CompletableFuture.failedFuture(...)}
 * lines).
 *
 * Outcome of the future-returning variant:
 * - {@code newProducer.isSuccessorTo(oldProducer)}: the old producer is closed and the map entry is replaced.
 *   If the replace fails, the future fails with a NamingException; otherwise
 *   {@code handleProducerRemoved(oldProducer)} is called and the future completes normally.
 * - Otherwise, when the two producers use different connections, the result depends on
 *   {@code checkConnectionLiveness()} of the old connection: if it reports the connection as active the
 *   future fails with a NamingException, else {@code internalAddProducer(newProducer)} is attempted again.
 * - Otherwise (same connection) the future fails with a NamingException.
 */
private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
throws BrokerServiceException {
private CompletableFuture<Void> tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) {
if (newProducer.isSuccessorTo(oldProducer)) {
oldProducer.close(false);
if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
// Met concurrent update, throw exception here so that client can try reconnect later.
throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
+ "' replace concurrency error");
return CompletableFuture.failedFuture(new BrokerServiceException.NamingException("Producer with name '"
+ newProducer.getProducerName() + "' replace concurrency error"));
} else {
handleProducerRemoved(oldProducer);
return CompletableFuture.completedFuture(null);
}
} else {
// If a producer with the same name tries to use a new connection, async check the old connection is
// available. The producers related the connection that not available are automatically cleaned up.
if (!Objects.equals(oldProducer.getCnx(), newProducer.getCnx())) {
oldProducer.getCnx().checkConnectionLiveness();
return oldProducer.getCnx().checkConnectionLiveness().thenCompose(previousIsActive -> {
if (previousIsActive) {
return CompletableFuture.failedFuture(new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName()
+ "' is already connected to topic"));
} else {
// If the connection of the previous producer is not active, the method
// "cnx().checkConnectionLiveness()" will trigger the close for it and kick off the previous
// producer. So try to add current producer again.
// The recursive call will be stopped by these two case(This prevents infinite call):
// 1. add current producer success.
// 2. once another same name producer registered.
return internalAddProducer(newProducer);
}
});
}
throw new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName() + "' is already connected to topic");
return CompletableFuture.failedFuture(new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName() + "' is already connected to topic"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.DefaultChannelId;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.vertx.core.impl.ConcurrentHashSet;
Expand All @@ -62,10 +63,14 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
Expand Down Expand Up @@ -973,7 +978,7 @@ public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy() throws E
}

@Test
public void testHandleProducerAfterClientChannelInactive() throws Exception {
public void testDuplicateProducer() throws Exception {
final String tName = successTopicName;
final long producerId = 1;
final MutableInt requestId = new MutableInt(1);
Expand All @@ -993,33 +998,131 @@ public void testHandleProducerAfterClientChannelInactive() throws Exception {
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);

// Verify the second producer using a new connection will override the producer who using a stopped channel.
channelsStoppedAnswerHealthCheck.add(channel);
// Verify the second producer will be reject due to the previous one still is active.
// Every second try once, total 10 times, all requests should fail.
ClientChannel channel2 = new ClientChannel();
BackGroundExecutor backGroundExecutor1 = startBackgroundExecutorForEmbeddedChannel(channel);
BackGroundExecutor autoResponseForHeartBeat = autoResponseForHeartBeat(channel, clientChannelHelper);
BackGroundExecutor backGroundExecutor2 = startBackgroundExecutorForEmbeddedChannel(channel2.channel);
setChannelConnected(channel2.serverCnx);
Awaitility.await().untilAsserted(() -> {

for (int i = 0; i < 10; i++) {
ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
pName, false, metadata, null, epoch.incrementAndGet(), false,
ProducerAccessMode.Shared, Optional.empty(), false);
channel2.channel.writeInbound(cmdProducer2);
assertTrue(getResponse(channel2.channel, channel2.clientChannelHelper) instanceof CommandProducerSuccess);
Object response2 = getResponse(channel2.channel, channel2.clientChannelHelper);
assertTrue(response2 instanceof CommandError);
assertEquals(topicRef.getProducers().size(), 1);
});
assertTrue(channel.isActive());
Thread.sleep(500);
}

// cleanup.
autoResponseForHeartBeat.close();
backGroundExecutor1.close();
backGroundExecutor2.close();
channel.finish();
channel2.close();
}

/**
 * Verifies that a producer which re-registers over a new connection is accepted on its first request
 * when the channel holding the previous registration has been added to
 * {@code channelsStoppedAnswerHealthCheck}.
 *
 * <p>All cleanup runs in a {@code finally} block so that a failing assertion does not leave the two
 * background schedulers running or the channel registered in {@code channelsStoppedAnswerHealthCheck}.
 */
@Test
public void testProducerChangeSocket() throws Exception {
    final String tName = successTopicName;
    final long producerId = 1;
    final MutableInt requestId = new MutableInt(1);
    final MutableInt epoch = new MutableInt(1);
    final Map<String, String> metadata = Collections.emptyMap();
    final String pName = "p1";
    resetChannel();
    setChannelConnected();

    // The producer registers using the first connection.
    ByteBuf cmdProducer1 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
            pName, false, metadata, null, epoch.incrementAndGet(), false,
            ProducerAccessMode.Shared, Optional.empty(), false);
    channel.writeInbound(cmdProducer1);
    assertTrue(getResponse() instanceof CommandProducerSuccess);
    PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(tName).get();
    assertNotNull(topicRef);
    assertEquals(topicRef.getProducers().size(), 1);

    // Verify the second producer using a new connection will override the producer that is using a
    // stopped channel.
    channelsStoppedAnswerHealthCheck.add(channel);
    ClientChannel channel2 = new ClientChannel();
    BackGroundExecutor backGroundExecutor1 = startBackgroundExecutorForEmbeddedChannel(channel);
    BackGroundExecutor backGroundExecutor2 = startBackgroundExecutorForEmbeddedChannel(channel2.channel);
    try {
        setChannelConnected(channel2.serverCnx);

        ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
                pName, false, metadata, null, epoch.incrementAndGet(), false,
                ProducerAccessMode.Shared, Optional.empty(), false);
        channel2.channel.writeInbound(cmdProducer2);
        Object response2 = getResponse(channel2.channel, channel2.clientChannelHelper);
        assertTrue(response2 instanceof CommandProducerSuccess);
        assertEquals(topicRef.getProducers().size(), 1);
    } finally {
        // cleanup: same steps and order as before, but now also executed when an assertion fails.
        channelsStoppedAnswerHealthCheck.clear();
        backGroundExecutor1.close();
        backGroundExecutor2.close();
        channel.finish();
        channel2.close();
    }
}

/**
 * Starts a single-threaded scheduler that keeps running the pending tasks of an embedded channel.
 *
 * <p>For a channel of type "EmbeddedChannel" there is no background thread that runs what was handed to
 * {@code channel.execute(runnable)}, so the queued tasks are triggered from here instead: after an initial
 * delay of 100 ms, {@code runPendingTasks()} is invoked with a fixed delay of 100 ms between runs.
 *
 * @param channel the embedded channel whose pending tasks should be executed
 * @return a handle bundling the scheduler and the periodic task, to be closed by the caller
 */
private BackGroundExecutor startBackgroundExecutorForEmbeddedChannel(final EmbeddedChannel channel) {
    final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    final Runnable drainPendingTasks = () -> channel.runPendingTasks();
    return new BackGroundExecutor(scheduler,
            scheduler.scheduleWithFixedDelay(drainPendingTasks, 100, 100, TimeUnit.MILLISECONDS));
}

/**
 * Starts a single-threaded scheduler that automatically answers `Pong` for each `Cmd-Ping`.
 *
 * <p>After an initial delay of 100 ms, {@code tryPeekResponse(channel, clientChannelHelper)} is invoked
 * with a fixed delay of 100 ms between runs.
 *
 * <p>Note: this makes an additional thread pop commands from the command queue, so do not call this
 * method if the channel needs to accept other commands.
 *
 * @param channel the embedded channel whose outbound queue is polled
 * @param clientChannelHelper helper passed through to {@code tryPeekResponse}
 * @return a handle bundling the scheduler and the periodic task, to be closed by the caller
 */
private BackGroundExecutor autoResponseForHeartBeat(EmbeddedChannel channel,
                                                    ClientChannelHelper clientChannelHelper) {
    final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    final Runnable answerPendingPings = () -> tryPeekResponse(channel, clientChannelHelper);
    return new BackGroundExecutor(scheduler,
            scheduler.scheduleWithFixedDelay(answerPendingPings, 100, 100, TimeUnit.MILLISECONDS));
}

/**
 * Bundles a scheduler with the periodic task submitted to it, so that both can be stopped through a
 * single {@link #close()} call.
 */
@AllArgsConstructor
private static class BackGroundExecutor implements Closeable {

    /** Scheduler running the periodic task; shut down by {@link #close()}. */
    private ScheduledExecutorService executor;

    /** Handle of the periodic task; when null, {@link #close()} only shuts the scheduler down. */
    private ScheduledFuture<?> scheduledFuture;

    /**
     * Cancels the periodic task (interrupting it if it is currently running) and then shuts down the
     * scheduler.
     */
    @Override
    public void close() throws IOException {
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
        executor.shutdown();
    }
}

private class ClientChannel implements Closeable {
private ClientChannelHelper clientChannelHelper = new ClientChannelHelper();
private ServerCnx serverCnx = new ServerCnx(pulsar);
private EmbeddedChannel channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(
5 * 1024 * 1024,
0,
4,
0,
4),
private EmbeddedChannel channel = new EmbeddedChannel(DefaultChannelId.newInstance(),
new LengthFieldBasedFrameDecoder(
5 * 1024 * 1024,
0,
4,
0,
4),
serverCnx);
public ClientChannel() {
serverCnx.setAuthRole("");
Expand Down Expand Up @@ -2694,6 +2797,26 @@ protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper client
throw new IOException("Failed to get response from socket within 10s");
}

/**
 * Looks at the head of the channel's outbound queue, consuming any Ping commands found there.
 *
 * <p>A {@link CommandPing} at the head of the queue is always removed. It is answered with a Pong unless
 * the channel is contained in {@code channelsStoppedAnswerHealthCheck}, in which case it is dropped
 * without a reply. Previously such a Ping was left at the head of the queue and peeked again
 * immediately, so this method never returned for a channel that stopped answering health checks.
 *
 * @param channel the embedded channel whose outbound queue is inspected
 * @param clientChannelHelper helper used to decode a queued outbound object into a command
 * @return the decoded command at the head of the queue, which is left in the queue, or {@code null}
 *         when the queue is empty (possibly after consuming Pings)
 */
protected Object tryPeekResponse(EmbeddedChannel channel, ClientChannelHelper clientChannelHelper) {
    while (true) {
        // A single peek() replaces isEmpty() + peek(); a null head means the queue is empty.
        Object outObject = channel.outboundMessages().peek();
        if (outObject == null) {
            return null;
        }
        // NOTE(review): a non-ping message is decoded here but stays queued - confirm that decoding the
        // same outbound object again later is safe for ClientChannelHelper#getCommand.
        Object cmd = clientChannelHelper.getCommand(outObject);
        if (!(cmd instanceof CommandPing)) {
            return cmd;
        }
        if (!channelsStoppedAnswerHealthCheck.contains(channel)) {
            channel.writeInbound(Commands.newPong());
        }
        // Always consume the Ping so that the next iteration makes progress.
        channel.outboundMessages().remove();
    }
}

private void setupMLAsyncCallbackMocks() {
ledgerMock = mock(ManagedLedger.class);
cursorMock = mock(ManagedCursor.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.api.proto.CommandPing;
import org.apache.pulsar.common.api.proto.CommandPong;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.apache.pulsar.common.api.proto.CommandAck;
Expand Down Expand Up @@ -207,6 +209,16 @@ protected void handleEndTxnOnSubscriptionResponse(
CommandEndTxnOnSubscriptionResponse commandEndTxnOnSubscriptionResponse) {
queue.offer(new CommandEndTxnOnSubscriptionResponse().copyFrom(commandEndTxnOnSubscriptionResponse));
}

/**
 * Puts a copy of the received Ping into the command queue.
 */
@Override
protected void handlePing(CommandPing ping) {
    // Queue a fresh copy rather than the instance passed in.
    // NOTE(review): presumably the decoder reuses the passed instance - confirm.
    final CommandPing pingCopy = new CommandPing().copyFrom(ping);
    queue.offer(pingCopy);
}

/**
 * Accepts a received Pong without doing anything.
 */
@Override
protected void handlePong(CommandPong pong) {
    // Intentionally empty: a Pong is not added to the command queue.
}
};

}

0 comments on commit 4b0713d

Please sign in to comment.