Skip to content

Commit

Permalink
[improve] [broker] Let the producer request success at the first time…
Browse files Browse the repository at this point in the history
… if the previous is inactive
  • Loading branch information
poorbarcode committed Sep 21, 2023
1 parent 66271e3 commit 3709b39
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -703,15 +703,14 @@ public CompletableFuture<Optional<Long>> addProducer(Producer producer,
log.warn("[{}] Attempting to add producer to a terminated topic", topic);
throw new TopicTerminatedException("Topic was already terminated");
}
internalAddProducer(producer);

USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}

return CompletableFuture.completedFuture(producerEpoch);
return internalAddProducer(producer).thenApply(ignore -> {
USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}
return producerEpoch;
});
} catch (BrokerServiceException e) {
return FutureUtil.failedFuture(e);
} finally {
Expand Down Expand Up @@ -957,15 +956,17 @@ protected void checkTopicFenced() throws BrokerServiceException {
}
}

protected void internalAddProducer(Producer producer) throws BrokerServiceException {
protected CompletableFuture<Void> internalAddProducer(Producer producer) {
if (isProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit");
return CompletableFuture.failedFuture(
new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"));
}

if (isSameAddressProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic);
throw new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit");
return CompletableFuture.failedFuture(
new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit"));
}

if (log.isDebugEnabled()) {
Expand All @@ -974,31 +975,40 @@ protected void internalAddProducer(Producer producer) throws BrokerServiceExcept

Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
if (existProducer != null) {
tryOverwriteOldProducer(existProducer, producer);
return tryOverwriteOldProducer(existProducer, producer);
} else if (!producer.isRemote()) {
USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this);
}
return CompletableFuture.completedFuture(null);
}

private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
throws BrokerServiceException {
private CompletableFuture<Void> tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) {
if (newProducer.isSuccessorTo(oldProducer)) {
oldProducer.close(false);
if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
// Met concurrent update, throw exception here so that client can try reconnect later.
throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
+ "' replace concurrency error");
return CompletableFuture.failedFuture(new BrokerServiceException.NamingException("Producer with name '"
+ newProducer.getProducerName() + "' replace concurrency error"));
} else {
handleProducerRemoved(oldProducer);
return CompletableFuture.completedFuture(null);
}
} else {
// If a producer with the same name tries to use a new connection, async check the old connection is
// available. The producers related the connection that not available are automatically cleaned up.
if (!Objects.equals(oldProducer.getCnx(), newProducer.getCnx())) {
oldProducer.getCnx().checkConnectionLiveness();
return oldProducer.getCnx().checkConnectionLiveness().thenCompose(previousIsActive -> {
if (previousIsActive) {
return CompletableFuture.failedFuture(new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName()
+ "' is already connected to topic"));
} else {
return internalAddProducer(newProducer);
}
});
}
throw new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName() + "' is already connected to topic");
return CompletableFuture.failedFuture(new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName() + "' is already connected to topic"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.DefaultChannelId;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.vertx.core.impl.ConcurrentHashSet;
Expand All @@ -63,6 +64,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -119,6 +121,7 @@
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.api.proto.CommandPing;
import org.apache.pulsar.common.api.proto.CommandPong;
import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
import org.apache.pulsar.common.api.proto.CommandSendError;
import org.apache.pulsar.common.api.proto.CommandSendReceipt;
Expand Down Expand Up @@ -938,7 +941,7 @@ public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy() throws E
}

@Test
public void testHandleProducerAfterClientChannelInactive() throws Exception {
public void testHandleProducerIfPreviousOneIsActive() throws Exception {
final String tName = successTopicName;
final long producerId = 1;
final MutableInt requestId = new MutableInt(1);
Expand All @@ -958,28 +961,96 @@ public void testHandleProducerAfterClientChannelInactive() throws Exception {
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);

// Verify the second producer using a new connection will override the producer who using a stopped channel.
channelsStoppedAnswerHealthCheck.add(channel);
// Verify the second producer will be reject due to the previous one still is active.
// Every second try once, total 10 times, all requests should fail.
ClientChannel channel2 = new ClientChannel();
AtomicBoolean channel1MonitorStopped1 = startChannelMonitorToHandleUserTask(channel);
AtomicBoolean channel1MonitorStopped2 = startChannelMonitorToHandleUserTask(channel2.channel);
setChannelConnected(channel2.serverCnx);
Awaitility.await().untilAsserted(() -> {

for (int i = 0; i < 10; i++) {
ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
pName, false, metadata, null, epoch.incrementAndGet(), false,
ProducerAccessMode.Shared, Optional.empty(), false);
channel2.channel.writeInbound(cmdProducer2);
assertTrue(getResponse(channel2.channel, channel2.clientChannelHelper) instanceof CommandProducerSuccess);
triggerReceivePingOnce(channel, clientChannelHelper);
Object response2 = getResponse(channel2.channel, channel2.clientChannelHelper);
assertTrue(response2 instanceof CommandError);
assertEquals(topicRef.getProducers().size(), 1);
});
assertTrue(channel.isActive());
Thread.sleep(1000);
}
channel1MonitorStopped1.set(true);
channel1MonitorStopped2.set(true);

// cleanup.
channel.finish();
channel2.close();
}

@Test
public void testHandleProducerAfterClientChannelInactive() throws Exception {
final String tName = successTopicName;
final long producerId = 1;
final MutableInt requestId = new MutableInt(1);
final MutableInt epoch = new MutableInt(1);
final Map<String, String> metadata = Collections.emptyMap();
final String pName = "p1";
resetChannel();
setChannelConnected();

// The producer register using the first connection.
ByteBuf cmdProducer1 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
pName, false, metadata, null, epoch.incrementAndGet(), false,
ProducerAccessMode.Shared, Optional.empty(), false);
channel.writeInbound(cmdProducer1);
assertTrue(getResponse() instanceof CommandProducerSuccess);
PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(tName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);

// Verify the second producer using a new connection will override the producer who using a stopped channel.
channelsStoppedAnswerHealthCheck.add(channel);
ClientChannel channel2 = new ClientChannel();
AtomicBoolean channel1MonitorStopped1 = startChannelMonitorToHandleUserTask(channel);
AtomicBoolean channel1MonitorStopped2 = startChannelMonitorToHandleUserTask(channel2.channel);
setChannelConnected(channel2.serverCnx);

ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(),
pName, false, metadata, null, epoch.incrementAndGet(), false,
ProducerAccessMode.Shared, Optional.empty(), false);
channel2.channel.writeInbound(cmdProducer2);
triggerReceivePingOnce(channel, clientChannelHelper);
Object response2 = getResponse(channel2.channel, channel2.clientChannelHelper);
assertTrue(response2 instanceof CommandProducerSuccess);
assertEquals(topicRef.getProducers().size(), 1);
channel1MonitorStopped1.set(true);
channel1MonitorStopped2.set(true);

// cleanup.
channel.finish();
channel2.close();
}

private AtomicBoolean startChannelMonitorToHandleUserTask(EmbeddedChannel channel) {
AtomicBoolean channel1MonitorStopped = new AtomicBoolean(false);
Thread channel1Monitor = new Thread(() -> {
while (!channel1MonitorStopped.get()) {
channel.runPendingTasks();
try {
Thread.sleep(100);
} catch (InterruptedException e) {}
}
});
channel1Monitor.start();
return channel1MonitorStopped;
}

private class ClientChannel implements Closeable {
private ClientChannelHelper clientChannelHelper = new ClientChannelHelper();
private ServerCnx serverCnx = new ServerCnx(pulsar);
private EmbeddedChannel channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(
private EmbeddedChannel channel = new EmbeddedChannel(DefaultChannelId.newInstance(),
new LengthFieldBasedFrameDecoder(
5 * 1024 * 1024,
0,
4,
Expand Down Expand Up @@ -2626,29 +2697,46 @@ private void setConnectionVersion(int version) throws Exception {
}

protected Object getResponse() throws Exception {
return getResponse(channel, clientChannelHelper);
return getResponse(channel, clientChannelHelper, false);
}

protected void triggerReceivePingOnce(EmbeddedChannel channel, ClientChannelHelper clientChannelHelper)
throws Exception {
getResponse(channel, clientChannelHelper, true);
}

protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper clientChannelHelper) throws Exception {
return getResponse(channel, clientChannelHelper, false);
}

protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper clientChannelHelper,
boolean expectedPing) throws Exception {
// Wait at most for 10s to get a response
final long sleepTimeMs = 10;
final long iterations = TimeUnit.SECONDS.toMillis(10) / sleepTimeMs;
for (int i = 0; i < iterations; i++) {
if (!channel.outboundMessages().isEmpty()) {
Object outObject = channel.outboundMessages().remove();
Object cmd = clientChannelHelper.getCommand(outObject);
if (cmd instanceof CommandPong) {
log.info("Received Pong");
continue;
}
if (cmd instanceof CommandPing) {
log.info("Received Ping");
if (channelsStoppedAnswerHealthCheck.contains(channel)) {
if (expectedPing) {
return cmd;
} else {
continue;
}
}
channel.writeInbound(Commands.newPong());
if (expectedPing) {
return cmd;
} else {
continue;
}
channel.writeAndFlush(Commands.newPong()).addListener(future -> {
if (!future.isSuccess()) {
log.warn("[{}] Forcing connection to close since cannot send a pong message.",
channel, future.cause());
channel.close();
}
});
continue;
}
return cmd;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.api.proto.CommandPing;
import org.apache.pulsar.common.api.proto.CommandPong;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.apache.pulsar.common.api.proto.CommandAck;
Expand Down Expand Up @@ -207,6 +209,16 @@ protected void handleEndTxnOnSubscriptionResponse(
CommandEndTxnOnSubscriptionResponse commandEndTxnOnSubscriptionResponse) {
queue.offer(new CommandEndTxnOnSubscriptionResponse().copyFrom(commandEndTxnOnSubscriptionResponse));
}

@Override
protected void handlePing(CommandPing ping) {
queue.offer(new CommandPing().copyFrom(ping));
}

@Override
protected void handlePong(CommandPong pong) {
queue.offer(new CommandPong().copyFrom(pong));
}
};

}

0 comments on commit 3709b39

Please sign in to comment.