Skip to content

Commit

Permalink
[improve] [broker] Let the producer request success at the first time…
Browse files Browse the repository at this point in the history
… if the previous one is inactive (#21220)

### Motivation

If a producer establishes a new connection when it is reconnecting, while the previous connection is now inactive, the initial request made on the new connection will fail.  This failure will trigger the topic of cleaning up the inactive producers.  However, upon making a second request, the producer will be able to successfully establish a connection and proceed with the operation.


### Modifications

Make the initial request made on the new connection success.
  • Loading branch information
poorbarcode authored Nov 2, 2023
1 parent c3f954e commit 40f94d5
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -703,15 +703,14 @@ public CompletableFuture<Optional<Long>> addProducer(Producer producer,
log.warn("[{}] Attempting to add producer to a terminated topic", topic);
throw new TopicTerminatedException("Topic was already terminated");
}
internalAddProducer(producer);

USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}

return CompletableFuture.completedFuture(producerEpoch);
return internalAddProducer(producer).thenApply(ignore -> {
USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}
return producerEpoch;
});
} catch (BrokerServiceException e) {
return FutureUtil.failedFuture(e);
} finally {
Expand Down Expand Up @@ -957,15 +956,17 @@ protected void checkTopicFenced() throws BrokerServiceException {
}
}

protected void internalAddProducer(Producer producer) throws BrokerServiceException {
protected CompletableFuture<Void> internalAddProducer(Producer producer) {
if (isProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit");
return CompletableFuture.failedFuture(
new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"));
}

if (isSameAddressProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic);
throw new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit");
return CompletableFuture.failedFuture(
new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit"));
}

if (log.isDebugEnabled()) {
Expand All @@ -974,31 +975,46 @@ protected void internalAddProducer(Producer producer) throws BrokerServiceExcept

Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
if (existProducer != null) {
tryOverwriteOldProducer(existProducer, producer);
return tryOverwriteOldProducer(existProducer, producer);
} else if (!producer.isRemote()) {
USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this);
}
return CompletableFuture.completedFuture(null);
}

private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
throws BrokerServiceException {
private CompletableFuture<Void> tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) {
if (newProducer.isSuccessorTo(oldProducer)) {
oldProducer.close(false);
if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
// Met concurrent update, throw exception here so that client can try reconnect later.
throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
+ "' replace concurrency error");
return CompletableFuture.failedFuture(new BrokerServiceException.NamingException("Producer with name '"
+ newProducer.getProducerName() + "' replace concurrency error"));
} else {
handleProducerRemoved(oldProducer);
return CompletableFuture.completedFuture(null);
}
} else {
// If a producer with the same name tries to use a new connection, async check the old connection is
// available. The producers related the connection that not available are automatically cleaned up.
if (!Objects.equals(oldProducer.getCnx(), newProducer.getCnx())) {
oldProducer.getCnx().checkConnectionLiveness();
return oldProducer.getCnx().checkConnectionLiveness().thenCompose(previousIsActive -> {
if (previousIsActive) {
return CompletableFuture.failedFuture(new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName()
+ "' is already connected to topic"));
} else {
// If the connection of the previous producer is not active, the method
// "cnx().checkConnectionLiveness()" will trigger the close for it and kick off the previous
// producer. So try to add current producer again.
// The recursive call will be stopped by these two case(This prevents infinite call):
// 1. add current producer success.
// 2. once another same name producer registered.
return internalAddProducer(newProducer);
}
});
}
throw new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName() + "' is already connected to topic");
return CompletableFuture.failedFuture(new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName() + "' is already connected to topic"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -981,7 +980,7 @@ public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy() throws E
}

@Test
public void testHandleProducerAfterClientChannelInactive() throws Exception {
public void testDuplicateProducer() throws Exception {
final String tName = successTopicName;
final long producerId = 1;
final MutableInt requestId = new MutableInt(1);
Expand All @@ -1001,21 +1000,74 @@ public void testHandleProducerAfterClientChannelInactive() throws Exception {
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);

// Verify the second producer using a new connection will override the producer who using a stopped channel.
channelsStoppedAnswerHealthCheck.add(channel);
// Verify the second producer will be reject due to the previous one still is active.
// Every second try once, total 10 times, all requests should fail.
ClientChannel channel2 = new ClientChannel();
BackGroundExecutor backGroundExecutor1 = startBackgroundExecutorForEmbeddedChannel(channel);
BackGroundExecutor autoResponseForHeartBeat = autoResponseForHeartBeat(channel, clientChannelHelper);
BackGroundExecutor backGroundExecutor2 = startBackgroundExecutorForEmbeddedChannel(channel2.channel);
setChannelConnected(channel2.serverCnx);
Awaitility.await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> {
channel.runPendingTasks();

for (int i = 0; i < 10; i++) {
ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
pName, false, metadata, null, epoch.incrementAndGet(), false,
ProducerAccessMode.Shared, Optional.empty(), false);
channel2.channel.writeInbound(cmdProducer2);
assertTrue(getResponse(channel2.channel, channel2.clientChannelHelper) instanceof CommandProducerSuccess);
Object response2 = getResponse(channel2.channel, channel2.clientChannelHelper);
assertTrue(response2 instanceof CommandError);
assertEquals(topicRef.getProducers().size(), 1);
});
assertTrue(channel.isActive());
Thread.sleep(500);
}

// cleanup.
autoResponseForHeartBeat.close();
backGroundExecutor1.close();
backGroundExecutor2.close();
channel.finish();
channel2.close();
}

@Test
public void testProducerChangeSocket() throws Exception {
final String tName = successTopicName;
final long producerId = 1;
final MutableInt requestId = new MutableInt(1);
final MutableInt epoch = new MutableInt(1);
final Map<String, String> metadata = Collections.emptyMap();
final String pName = "p1";
resetChannel();
setChannelConnected();

// The producer register using the first connection.
ByteBuf cmdProducer1 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
pName, false, metadata, null, epoch.incrementAndGet(), false,
ProducerAccessMode.Shared, Optional.empty(), false);
channel.writeInbound(cmdProducer1);
assertTrue(getResponse() instanceof CommandProducerSuccess);
PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(tName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);

// Verify the second producer using a new connection will override the producer who using a stopped channel.
channelsStoppedAnswerHealthCheck.add(channel);
ClientChannel channel2 = new ClientChannel();
BackGroundExecutor backGroundExecutor1 = startBackgroundExecutorForEmbeddedChannel(channel);
BackGroundExecutor backGroundExecutor2 = startBackgroundExecutorForEmbeddedChannel(channel2.channel);
setChannelConnected(channel2.serverCnx);

ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
pName, false, metadata, null, epoch.incrementAndGet(), false,
ProducerAccessMode.Shared, Optional.empty(), false);
channel2.channel.writeInbound(cmdProducer2);
Object response2 = getResponse(channel2.channel, channel2.clientChannelHelper);
assertTrue(response2 instanceof CommandProducerSuccess);
assertEquals(topicRef.getProducers().size(), 1);

// cleanup.
channelsStoppedAnswerHealthCheck.clear();
backGroundExecutor1.close();
backGroundExecutor2.close();
channel.finish();
channel2.close();
}
Expand Down Expand Up @@ -1125,6 +1177,20 @@ private BackGroundExecutor startBackgroundExecutorForEmbeddedChannel(final Embed
return new BackGroundExecutor(executor, scheduledFuture);
}

/**
* Auto answer `Pong` for the `Cmd-Ping`.
* Node: This will result in additional threads pop Command from the Command queue, so do not call this
* method if the channel needs to accept other Command.
*/
private BackGroundExecutor autoResponseForHeartBeat(EmbeddedChannel channel,
ClientChannelHelper clientChannelHelper) {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(() -> {
tryPeekResponse(channel, clientChannelHelper);
}, 100, 100, TimeUnit.MILLISECONDS);
return new BackGroundExecutor(executor, scheduledFuture);
}

@AllArgsConstructor
private static class BackGroundExecutor implements Closeable {

Expand Down Expand Up @@ -2824,6 +2890,26 @@ protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper client
throw new IOException("Failed to get response from socket within 10s");
}

protected Object tryPeekResponse(EmbeddedChannel channel, ClientChannelHelper clientChannelHelper) {
while (true) {
if (channel.outboundMessages().isEmpty()) {
return null;
} else {
Object outObject = channel.outboundMessages().peek();
Object cmd = clientChannelHelper.getCommand(outObject);
if (cmd instanceof CommandPing) {
if (channelsStoppedAnswerHealthCheck.contains(channel)) {
continue;
}
channel.writeInbound(Commands.newPong());
channel.outboundMessages().remove();
continue;
}
return cmd;
}
}
}

private void setupMLAsyncCallbackMocks() {
ledgerMock = mock(ManagedLedger.class);
cursorMock = mock(ManagedCursor.class);
Expand Down

0 comments on commit 40f94d5

Please sign in to comment.