Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [broker] Let the producer request success at the first time if the previous one is inactive #21220

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -703,15 +703,14 @@ public CompletableFuture<Optional<Long>> addProducer(Producer producer,
log.warn("[{}] Attempting to add producer to a terminated topic", topic);
throw new TopicTerminatedException("Topic was already terminated");
}
internalAddProducer(producer);

USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}

return CompletableFuture.completedFuture(producerEpoch);
return internalAddProducer(producer).thenApply(ignore -> {
USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}
return producerEpoch;
});
} catch (BrokerServiceException e) {
return FutureUtil.failedFuture(e);
} finally {
Expand Down Expand Up @@ -957,15 +956,17 @@ protected void checkTopicFenced() throws BrokerServiceException {
}
}

protected void internalAddProducer(Producer producer) throws BrokerServiceException {
protected CompletableFuture<Void> internalAddProducer(Producer producer) {
if (isProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit");
return CompletableFuture.failedFuture(
new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"));
}

if (isSameAddressProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic);
throw new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit");
return CompletableFuture.failedFuture(
new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit"));
}

if (log.isDebugEnabled()) {
Expand All @@ -974,31 +975,46 @@ protected void internalAddProducer(Producer producer) throws BrokerServiceExcept

Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
if (existProducer != null) {
tryOverwriteOldProducer(existProducer, producer);
return tryOverwriteOldProducer(existProducer, producer);
} else if (!producer.isRemote()) {
USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this);
}
return CompletableFuture.completedFuture(null);
}

private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
throws BrokerServiceException {
private CompletableFuture<Void> tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) {
if (newProducer.isSuccessorTo(oldProducer)) {
oldProducer.close(false);
if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
// Met concurrent update, throw exception here so that client can try reconnect later.
throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
+ "' replace concurrency error");
return CompletableFuture.failedFuture(new BrokerServiceException.NamingException("Producer with name '"
+ newProducer.getProducerName() + "' replace concurrency error"));
} else {
handleProducerRemoved(oldProducer);
return CompletableFuture.completedFuture(null);
}
} else {
// If a producer with the same name tries to use a new connection, async check the old connection is
// available. The producers related the connection that not available are automatically cleaned up.
if (!Objects.equals(oldProducer.getCnx(), newProducer.getCnx())) {
oldProducer.getCnx().checkConnectionLiveness();
return oldProducer.getCnx().checkConnectionLiveness().thenCompose(previousIsActive -> {
if (previousIsActive) {
return CompletableFuture.failedFuture(new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName()
+ "' is already connected to topic"));
} else {
// If the connection of the previous producer is not active, the method
// "cnx().checkConnectionLiveness()" will trigger the close for it and kick off the previous
// producer. So try to add current producer again.
// The recursive call will be stopped by these two case(This prevents infinite call):
// 1. add current producer success.
// 2. once another same name producer registered.
return internalAddProducer(newProducer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to add a comment here to explain why we should take a recursive call.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And it also should be BUG before, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to add a comment here to explain why we should take a recursive call.

Done

And it also should be BUG before, right?

Yes, the issue is that the producer creation will fail if a same-name producer has already closed on the client side.

}
});
}
throw new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName() + "' is already connected to topic");
return CompletableFuture.failedFuture(new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName() + "' is already connected to topic"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -981,7 +980,7 @@ public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy() throws E
}

@Test
public void testHandleProducerAfterClientChannelInactive() throws Exception {
public void testDuplicateProducer() throws Exception {
final String tName = successTopicName;
final long producerId = 1;
final MutableInt requestId = new MutableInt(1);
Expand All @@ -1001,21 +1000,74 @@ public void testHandleProducerAfterClientChannelInactive() throws Exception {
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);

// Verify the second producer using a new connection will override the producer who using a stopped channel.
channelsStoppedAnswerHealthCheck.add(channel);
// Verify the second producer will be reject due to the previous one still is active.
// Every second try once, total 10 times, all requests should fail.
ClientChannel channel2 = new ClientChannel();
BackGroundExecutor backGroundExecutor1 = startBackgroundExecutorForEmbeddedChannel(channel);
BackGroundExecutor autoResponseForHeartBeat = autoResponseForHeartBeat(channel, clientChannelHelper);
BackGroundExecutor backGroundExecutor2 = startBackgroundExecutorForEmbeddedChannel(channel2.channel);
setChannelConnected(channel2.serverCnx);
Awaitility.await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> {
channel.runPendingTasks();

for (int i = 0; i < 10; i++) {
ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
pName, false, metadata, null, epoch.incrementAndGet(), false,
ProducerAccessMode.Shared, Optional.empty(), false);
channel2.channel.writeInbound(cmdProducer2);
assertTrue(getResponse(channel2.channel, channel2.clientChannelHelper) instanceof CommandProducerSuccess);
Object response2 = getResponse(channel2.channel, channel2.clientChannelHelper);
assertTrue(response2 instanceof CommandError);
assertEquals(topicRef.getProducers().size(), 1);
});
assertTrue(channel.isActive());
Thread.sleep(500);
}

// cleanup.
autoResponseForHeartBeat.close();
backGroundExecutor1.close();
backGroundExecutor2.close();
channel.finish();
channel2.close();
}

@Test
public void testProducerChangeSocket() throws Exception {
final String tName = successTopicName;
final long producerId = 1;
final MutableInt requestId = new MutableInt(1);
final MutableInt epoch = new MutableInt(1);
final Map<String, String> metadata = Collections.emptyMap();
final String pName = "p1";
resetChannel();
setChannelConnected();

// The producer register using the first connection.
ByteBuf cmdProducer1 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
pName, false, metadata, null, epoch.incrementAndGet(), false,
ProducerAccessMode.Shared, Optional.empty(), false);
channel.writeInbound(cmdProducer1);
assertTrue(getResponse() instanceof CommandProducerSuccess);
PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(tName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);

// Verify the second producer using a new connection will override the producer who using a stopped channel.
channelsStoppedAnswerHealthCheck.add(channel);
ClientChannel channel2 = new ClientChannel();
BackGroundExecutor backGroundExecutor1 = startBackgroundExecutorForEmbeddedChannel(channel);
BackGroundExecutor backGroundExecutor2 = startBackgroundExecutorForEmbeddedChannel(channel2.channel);
setChannelConnected(channel2.serverCnx);

ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
pName, false, metadata, null, epoch.incrementAndGet(), false,
ProducerAccessMode.Shared, Optional.empty(), false);
channel2.channel.writeInbound(cmdProducer2);
Object response2 = getResponse(channel2.channel, channel2.clientChannelHelper);
assertTrue(response2 instanceof CommandProducerSuccess);
assertEquals(topicRef.getProducers().size(), 1);

// cleanup.
channelsStoppedAnswerHealthCheck.clear();
backGroundExecutor1.close();
backGroundExecutor2.close();
channel.finish();
channel2.close();
}
Expand Down Expand Up @@ -1125,6 +1177,20 @@ private BackGroundExecutor startBackgroundExecutorForEmbeddedChannel(final Embed
return new BackGroundExecutor(executor, scheduledFuture);
}

/**
* Auto answer `Pong` for the `Cmd-Ping`.
* Node: This will result in additional threads pop Command from the Command queue, so do not call this
* method if the channel needs to accept other Command.
*/
private BackGroundExecutor autoResponseForHeartBeat(EmbeddedChannel channel,
ClientChannelHelper clientChannelHelper) {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(() -> {
tryPeekResponse(channel, clientChannelHelper);
}, 100, 100, TimeUnit.MILLISECONDS);
return new BackGroundExecutor(executor, scheduledFuture);
}

@AllArgsConstructor
private static class BackGroundExecutor implements Closeable {

Expand Down Expand Up @@ -2824,6 +2890,26 @@ protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper client
throw new IOException("Failed to get response from socket within 10s");
}

protected Object tryPeekResponse(EmbeddedChannel channel, ClientChannelHelper clientChannelHelper) {
while (true) {
if (channel.outboundMessages().isEmpty()) {
return null;
} else {
Object outObject = channel.outboundMessages().peek();
Object cmd = clientChannelHelper.getCommand(outObject);
if (cmd instanceof CommandPing) {
if (channelsStoppedAnswerHealthCheck.contains(channel)) {
continue;
}
channel.writeInbound(Commands.newPong());
channel.outboundMessages().remove();
continue;
}
return cmd;
}
}
}

private void setupMLAsyncCallbackMocks() {
ledgerMock = mock(ManagedLedger.class);
cursorMock = mock(ManagedCursor.class);
Expand Down
Loading