Skip to content

Commit

Permalink
[improve] [broker] Not close the socket if lookup failed caused by bu…
Browse files Browse the repository at this point in the history
…ndle unloading or metadata ex (apache#21211)

### Motivation

**Background**: The Pulsar client will close the socket if it receives a ServiceNotReady error when doing a lookup.
Closing the socket causes the other consumer or producer to reconnect and does not make the lookup more efficient.

There are two cases that should be improved:
- If the broker gets a metadata read/write error, the broker responds with a `ServiceNotReady` error, but it should respond with a `MetadataError`
- If the topic is unloading, the broker responds with a `ServiceNotReady` error.

### Modifications
- Respond to the client with a `MetadataError` if the broker gets a metadata read/write error.
- Respond to the client with a `MetadataError` if the topic is unloading

(cherry picked from commit 16349e6)
  • Loading branch information
poorbarcode authored and mukesh-ctds committed Mar 1, 2024
1 parent 8d33e37 commit bf25adf
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import javax.ws.rs.Encoded;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
Expand All @@ -48,6 +47,7 @@
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -319,35 +319,37 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
requestId, shouldRedirectThroughServiceUrl(conf, lookupData)));
}
}).exceptionally(ex -> {
if (ex instanceof CompletionException && ex.getCause() instanceof IllegalStateException) {
log.info("Failed to lookup {} for topic {} with error {}", clientAppId,
topicName.toString(), ex.getCause().getMessage());
} else {
log.warn("Failed to lookup {} for topic {} with error {}", clientAppId,
topicName.toString(), ex.getMessage(), ex);
}
lookupfuture.complete(
newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
return null;
});
handleLookupError(lookupfuture, topicName.toString(), clientAppId, requestId, ex);
return null;
});
}

}).exceptionally(ex -> {
if (ex instanceof CompletionException && ex.getCause() instanceof IllegalStateException) {
log.info("Failed to lookup {} for topic {} with error {}", clientAppId, topicName.toString(),
ex.getCause().getMessage());
} else {
log.warn("Failed to lookup {} for topic {} with error {}", clientAppId, topicName.toString(),
ex.getMessage(), ex);
}

lookupfuture.complete(newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
handleLookupError(lookupfuture, topicName.toString(), clientAppId, requestId, ex);
return null;
});

return lookupfuture;
}

private static void handleLookupError(CompletableFuture<ByteBuf> lookupFuture, String topicName, String clientAppId,
long requestId, Throwable ex){
final Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
final String errorMsg = unwrapEx.getMessage();
if (unwrapEx instanceof IllegalStateException) {
// Current broker still hold the bundle's lock, but the bundle is being unloading.
log.info("Failed to lookup {} for topic {} with error {}", clientAppId, topicName, errorMsg);
lookupFuture.complete(newLookupErrorResponse(ServerError.MetadataError, errorMsg, requestId));
} else if (unwrapEx instanceof MetadataStoreException){
// Load bundle ownership or acquire lock failed.
// Differ with "IllegalStateException", print warning log.
log.warn("Failed to lookup {} for topic {} with error {}", clientAppId, topicName, errorMsg);
lookupFuture.complete(newLookupErrorResponse(ServerError.MetadataError, errorMsg, requestId));
} else {
log.warn("Failed to lookup {} for topic {} with error {}", clientAppId, topicName, errorMsg);
lookupFuture.complete(newLookupErrorResponse(ServerError.ServiceNotReady, errorMsg, requestId));
}
}

protected TopicName getTopicName(String topicDomain, String tenant, String cluster, String namespace,
@Encoded String encodedTopic) {
String decodedName = Codec.decode(encodedTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public final class ServiceUnitUtils {
*/
private static final String OWNER_INFO_ROOT = "/namespace";

static String path(NamespaceBundle suname) {
public static String path(NamespaceBundle suname) {
// The ephemeral node path for new namespaces should always have bundle name appended
return OWNER_INFO_ROOT + "/" + suname.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ protected static CompletableFuture<ClusterData> getClusterDataIfDifferentCluster
pulsar.getPulsarResources().getClusterResources().getClusterAsync(cluster)
.whenComplete((clusterDataResult, ex) -> {
if (ex != null) {
log.warn("[{}] Load cluster data failed: requested={}", clientAppId, cluster);
clusterDataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ private void checkLookupException(String tenant, String namespace, PulsarClient
.topic("persistent://" + tenant + "/" + namespace + "/1p")
.create();
} catch (PulsarClientException t) {
Assert.assertTrue(t instanceof PulsarClientException.LookupException);
Assert.assertTrue(t instanceof PulsarClientException.BrokerMetadataException
|| t instanceof PulsarClientException.LookupException);
Assert.assertTrue(
t.getMessage().contains(
"java.lang.IllegalStateException: The leader election has not yet been completed!"));
t.getMessage().contains("The leader election has not yet been completed"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.handler.codec.http.HttpRequest;
Expand Down Expand Up @@ -71,9 +72,13 @@
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnedBundle;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.namespace.ServiceUnitUtils;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand All @@ -87,6 +92,7 @@
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.zookeeper.KeeperException;
import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
Expand Down Expand Up @@ -1105,4 +1111,101 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
return "invalid";
}
}

@Test
public void testLookupConnectionNotCloseIfGetUnloadingExOrMetadataEx() throws Exception {
String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createNonPartitionedTopic(tpName);
PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
Producer<String> producer = pulsarClientImpl.newProducer(Schema.STRING).topic(tpName).create();
Consumer<String> consumer = pulsarClientImpl.newConsumer(Schema.STRING).topic(tpName)
.subscriptionName("s1").isAckReceiptEnabled(true).subscribe();
LookupService lookupService = pulsarClientImpl.getLookup();
assertTrue(lookupService instanceof BinaryProtoLookupService);
ClientCnx lookupConnection = pulsarClientImpl.getCnxPool().getConnection(lookupService.resolveHost()).join();

// Verify the socket will not be closed if the bundle is unloading.
BundleOfTopic bundleOfTopic = new BundleOfTopic(tpName);
bundleOfTopic.setBundleIsUnloading();
try {
lookupService.getBroker(TopicName.get(tpName)).get();
fail("It should failed due to the namespace bundle is unloading.");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("is being unloaded"));
}
// Do unload topic, trigger producer & consumer reconnection.
pulsar.getBrokerService().getTopic(tpName, false).join().get().close(true);
assertTrue(lookupConnection.ctx().channel().isActive());
bundleOfTopic.setBundleIsNotUnloading();
// Assert producer & consumer could reconnect successful.
producer.send("1");
HashSet<String> messagesReceived = new HashSet<>();
while (true) {
Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
if (msg == null) {
break;
}
messagesReceived.add(msg.getValue());
}
assertTrue(messagesReceived.contains("1"));

// Verify the socket will not be closed if get a metadata ex.
bundleOfTopic.releaseBundleLockAndMakeAcquireFail();
try {
lookupService.getBroker(TopicName.get(tpName)).get();
fail("It should failed due to the acquire bundle lock fail.");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("OperationTimeout"));
}
// Do unload topic, trigger producer & consumer reconnection.
pulsar.getBrokerService().getTopic(tpName, false).join().get().close(true);
assertTrue(lookupConnection.ctx().channel().isActive());
bundleOfTopic.makeAcquireBundleLockSuccess();
// Assert producer could reconnect successful.
producer.send("2");
while (true) {
Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
if (msg == null) {
break;
}
messagesReceived.add(msg.getValue());
}
assertTrue(messagesReceived.contains("2"));

// cleanup.
producer.close();
consumer.close();
admin.topics().delete(tpName);
}

private class BundleOfTopic {

private NamespaceBundle namespaceBundle;
private OwnershipCache ownershipCache;
private AsyncLoadingCache<NamespaceBundle, OwnedBundle> ownedBundlesCache;

public BundleOfTopic(String tpName) {
namespaceBundle = pulsar.getNamespaceService().getBundle(TopicName.get(tpName));
ownershipCache = pulsar.getNamespaceService().getOwnershipCache();
ownedBundlesCache = WhiteboxImpl.getInternalState(ownershipCache, "ownedBundlesCache");
}

private void setBundleIsUnloading() {
ownedBundlesCache.get(namespaceBundle).join().setActive(false);
}

private void setBundleIsNotUnloading() {
ownedBundlesCache.get(namespaceBundle).join().setActive(true);
}

private void releaseBundleLockAndMakeAcquireFail() throws Exception {
ownedBundlesCache.synchronous().invalidateAll();
mockZooKeeper.delete(ServiceUnitUtils.path(namespaceBundle), -1);
mockZooKeeper.setAlwaysFail(KeeperException.Code.OPERATIONTIMEOUT);
}

private void makeAcquireBundleLockSuccess() throws Exception {
mockZooKeeper.unsetAlwaysFail();
}
}
}

0 comments on commit bf25adf

Please sign in to comment.