Skip to content

Commit

Permalink
[improve] [broker] Let the producer request success at the first time…
Browse files Browse the repository at this point in the history
… if the previous is inactive
  • Loading branch information
poorbarcode committed Oct 31, 2023
1 parent bc84721 commit 8710eb5
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -703,15 +703,14 @@ public CompletableFuture<Optional<Long>> addProducer(Producer producer,
log.warn("[{}] Attempting to add producer to a terminated topic", topic);
throw new TopicTerminatedException("Topic was already terminated");
}
internalAddProducer(producer);

USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}

return CompletableFuture.completedFuture(producerEpoch);
return internalAddProducer(producer).thenApply(ignore -> {
USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}
return producerEpoch;
});
} catch (BrokerServiceException e) {
return FutureUtil.failedFuture(e);
} finally {
Expand Down Expand Up @@ -957,15 +956,17 @@ protected void checkTopicFenced() throws BrokerServiceException {
}
}

protected void internalAddProducer(Producer producer) throws BrokerServiceException {
protected CompletableFuture<Void> internalAddProducer(Producer producer) {
if (isProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit");
return CompletableFuture.failedFuture(
new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"));
}

if (isSameAddressProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic);
throw new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit");
return CompletableFuture.failedFuture(
new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit"));
}

if (log.isDebugEnabled()) {
Expand All @@ -974,31 +975,40 @@ protected void internalAddProducer(Producer producer) throws BrokerServiceExcept

Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
if (existProducer != null) {
tryOverwriteOldProducer(existProducer, producer);
return tryOverwriteOldProducer(existProducer, producer);
} else if (!producer.isRemote()) {
USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this);
}
return CompletableFuture.completedFuture(null);
}

private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
throws BrokerServiceException {
private CompletableFuture<Void> tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) {
if (newProducer.isSuccessorTo(oldProducer)) {
oldProducer.close(false);
if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
// Met concurrent update, throw exception here so that client can try reconnect later.
throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
+ "' replace concurrency error");
return CompletableFuture.failedFuture(new BrokerServiceException.NamingException("Producer with name '"
+ newProducer.getProducerName() + "' replace concurrency error"));
} else {
handleProducerRemoved(oldProducer);
return CompletableFuture.completedFuture(null);
}
} else {
// If a producer with the same name tries to use a new connection, async check the old connection is
// available. The producers related the connection that not available are automatically cleaned up.
if (!Objects.equals(oldProducer.getCnx(), newProducer.getCnx())) {
oldProducer.getCnx().checkConnectionLiveness();
return oldProducer.getCnx().checkConnectionLiveness().thenCompose(previousIsActive -> {
if (previousIsActive) {
return CompletableFuture.failedFuture(new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName()
+ "' is already connected to topic"));
} else {
return internalAddProducer(newProducer);
}
});
}
throw new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName() + "' is already connected to topic");
return CompletableFuture.failedFuture(new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName() + "' is already connected to topic"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -981,7 +980,7 @@ public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy() throws E
}

@Test
public void testHandleProducerAfterClientChannelInactive() throws Exception {
public void testDuplicateProducer() throws Exception {
final String tName = successTopicName;
final long producerId = 1;
final MutableInt requestId = new MutableInt(1);
Expand All @@ -1001,21 +1000,74 @@ public void testHandleProducerAfterClientChannelInactive() throws Exception {
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);

// Verify the second producer using a new connection will override the producer who using a stopped channel.
channelsStoppedAnswerHealthCheck.add(channel);
// Verify the second producer will be reject due to the previous one still is active.
// Every second try once, total 10 times, all requests should fail.
ClientChannel channel2 = new ClientChannel();
BackGroundExecutor backGroundExecutor1 = startBackgroundExecutorForEmbeddedChannel(channel);
BackGroundExecutor autoResponseForHeartBeat = autoResponseForHeartBeat(channel, clientChannelHelper);
BackGroundExecutor backGroundExecutor2 = startBackgroundExecutorForEmbeddedChannel(channel2.channel);
setChannelConnected(channel2.serverCnx);
Awaitility.await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> {
channel.runPendingTasks();

for (int i = 0; i < 10; i++) {
ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
pName, false, metadata, null, epoch.incrementAndGet(), false,
ProducerAccessMode.Shared, Optional.empty(), false);
channel2.channel.writeInbound(cmdProducer2);
assertTrue(getResponse(channel2.channel, channel2.clientChannelHelper) instanceof CommandProducerSuccess);
Object response2 = getResponse(channel2.channel, channel2.clientChannelHelper);
assertTrue(response2 instanceof CommandError);
assertEquals(topicRef.getProducers().size(), 1);
});
assertTrue(channel.isActive());
Thread.sleep(500);
}

// cleanup.
autoResponseForHeartBeat.close();
backGroundExecutor1.close();
backGroundExecutor2.close();
channel.finish();
channel2.close();
}

@Test
public void testProducerChangeSocket() throws Exception {
final String tName = successTopicName;
final long producerId = 1;
final MutableInt requestId = new MutableInt(1);
final MutableInt epoch = new MutableInt(1);
final Map<String, String> metadata = Collections.emptyMap();
final String pName = "p1";
resetChannel();
setChannelConnected();

// The producer register using the first connection.
ByteBuf cmdProducer1 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
pName, false, metadata, null, epoch.incrementAndGet(), false,
ProducerAccessMode.Shared, Optional.empty(), false);
channel.writeInbound(cmdProducer1);
assertTrue(getResponse() instanceof CommandProducerSuccess);
PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(tName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);

// Verify the second producer using a new connection will override the producer who using a stopped channel.
channelsStoppedAnswerHealthCheck.add(channel);
ClientChannel channel2 = new ClientChannel();
BackGroundExecutor backGroundExecutor1 = startBackgroundExecutorForEmbeddedChannel(channel);
BackGroundExecutor backGroundExecutor2 = startBackgroundExecutorForEmbeddedChannel(channel2.channel);
setChannelConnected(channel2.serverCnx);

ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
pName, false, metadata, null, epoch.incrementAndGet(), false,
ProducerAccessMode.Shared, Optional.empty(), false);
channel2.channel.writeInbound(cmdProducer2);
Object response2 = getResponse(channel2.channel, channel2.clientChannelHelper);
assertTrue(response2 instanceof CommandProducerSuccess);
assertEquals(topicRef.getProducers().size(), 1);

// cleanup.
channelsStoppedAnswerHealthCheck.clear();
backGroundExecutor1.close();
backGroundExecutor2.close();
channel.finish();
channel2.close();
}
Expand Down Expand Up @@ -1125,6 +1177,20 @@ private BackGroundExecutor startBackgroundExecutorForEmbeddedChannel(final Embed
return new BackGroundExecutor(executor, scheduledFuture);
}

/**
* Auto answer `Pong` for the `Cmd-Ping`.
* Node: This will result in additional threads pop Command from the Command queue, so do not call this
* method if the channel needs to accept other Command.
*/
private BackGroundExecutor autoResponseForHeartBeat(EmbeddedChannel channel,
ClientChannelHelper clientChannelHelper) {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(() -> {
tryPeekResponse(channel, clientChannelHelper);
}, 100, 100, TimeUnit.MILLISECONDS);
return new BackGroundExecutor(executor, scheduledFuture);
}

@AllArgsConstructor
private static class BackGroundExecutor implements Closeable {

Expand Down Expand Up @@ -2800,6 +2866,15 @@ protected Object getResponse() throws Exception {
return getResponse(channel, clientChannelHelper);
}

// protected void triggerReceivePingOnce(EmbeddedChannel channel, ClientChannelHelper clientChannelHelper)
// throws Exception {
// getResponse(channel, clientChannelHelper, true);
// }

// protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper clientChannelHelper) throws Exception {
// return getResponse(channel, clientChannelHelper);
// }

protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper clientChannelHelper) throws Exception {
// Wait at most for 10s to get a response
final long sleepTimeMs = 10;
Expand All @@ -2824,6 +2899,26 @@ protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper client
throw new IOException("Failed to get response from socket within 10s");
}

protected Object tryPeekResponse(EmbeddedChannel channel, ClientChannelHelper clientChannelHelper) {
while (true) {
if (channel.outboundMessages().isEmpty()) {
return null;
} else {
Object outObject = channel.outboundMessages().peek();
Object cmd = clientChannelHelper.getCommand(outObject);
if (cmd instanceof CommandPing) {
if (channelsStoppedAnswerHealthCheck.contains(channel)) {
continue;
}
channel.writeInbound(Commands.newPong());
channel.outboundMessages().remove();
continue;
}
return cmd;
}
}
}

private void setupMLAsyncCallbackMocks() {
ledgerMock = mock(ManagedLedger.class);
cursorMock = mock(ManagedCursor.class);
Expand Down

0 comments on commit 8710eb5

Please sign in to comment.