
[Bug] Brokers spuriously error in response to the PRODUCER command for newly added partitions to a topic #23451

Open
dwang-qm opened this issue Oct 14, 2024 · 9 comments
Labels
type/bug: The PR fixed a bug or issue reported a bug

Comments

@dwang-qm commented Oct 14, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

Broker: Pulsar 3.1.2
Client: Pulsar v3.4.2

Using ZooKeeper as the metadata store.

Minimal reproduce step

  1. Connect to a partitioned topic with the C++ client
  2. Update the topic to add a new partition.
  3. The C++ client will attempt to create producers for the new partitions. These will often fail.

What did you expect to see?

Producers successfully created.

What did you see instead?

In the broker logs, the "Illegal topic partition name" error message.

Anything else?

I believe the issue is that when the broker handles a PRODUCER command, it calls ServerCnx::handleProducer, which calls BrokerService::getOrCreateTopic, which calls BrokerService::getTopic, which calls BrokerService::fetchPartitionedTopicMetadataAsync(TopicName topicName), which in turn calls BrokerService::fetchPartitionedTopicMetadataAsync(TopicName topicName, boolean refreshCacheAndGet) with refreshCacheAndGet set to false. This means that NamespaceResources::getPartitionedTopicMetadataAsync is always called with refresh set to false, so getAsync is called on NamespaceResources rather than refreshAndGetAsync, and the sync call on ZooKeeper is never made before performing the read.
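To illustrate the difference between those two read paths, here is a minimal, hypothetical sketch; MetadataReader and PartitionedMetadataReads are stand-ins I made up for this description, not the actual Pulsar classes:

import java.util.concurrent.CompletableFuture;

// A cached read may serve a stale value; sync() asks the metadata store
// (ZooKeeper) to catch up with the leader before the next read.
interface MetadataReader<T> {
    CompletableFuture<T> getAsync(String path);
    CompletableFuture<Void> sync(String path);
}

class PartitionedMetadataReads<T> {
    private final MetadataReader<T> reader;

    PartitionedMetadataReads(MetadataReader<T> reader) {
        this.reader = reader;
    }

    // What the PRODUCER path effectively does today: a plain cached read.
    CompletableFuture<T> cachedRead(String path) {
        return reader.getAsync(path);
    }

    // The "refresh and get" variant: sync first, then read.
    CompletableFuture<T> refreshAndGet(String path) {
        return reader.sync(path).thenCompose(ignored -> reader.getAsync(path));
    }
}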

According to the ZooKeeper Programmer's Guide (https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html), "ZooKeeper does not guarantee that at every instance in time, two different clients will have identical views of ZooKeeper data. Due to factors like network delays, one client may perform an update before another client gets notified of the change. Consider the scenario of two clients, A and B. If client A sets the value of a znode /a from 0 to 1, then tells client B to read /a, client B may read the old value of 0, depending on which server it is connected to. If it is important that Client A and Client B read the same value, Client B should call the sync() method from the ZooKeeper API method before it performs its read."

This seems to indicate that without doing the sync, the broker could get an out-of-date picture of the number of partitions the topic has, resulting in spurious errors. I believe that reading the partition metadata when handling the PRODUCER command should use authoritative reads (calling sync before performing the ZooKeeper reads).
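For reference, the sync-before-read pattern with the plain ZooKeeper client API looks roughly like this; the znode path argument and error handling are illustrative only:

import java.util.concurrent.CompletableFuture;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

class SyncThenRead {
    // Ask the connected server to catch up with the leader, then read the znode,
    // so the read observes writes that went through other servers.
    static CompletableFuture<byte[]> readLatest(ZooKeeper zk, String path) {
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        zk.sync(path, (rc, syncedPath, ctx) -> {
            if (rc != KeeperException.Code.OK.intValue()) {
                result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                return;
            }
            zk.getData(path, false, (rc2, readPath, ctx2, data, stat) -> {
                if (rc2 != KeeperException.Code.OK.intValue()) {
                    result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc2)));
                } else {
                    result.complete(data);
                }
            }, null);
        }, null);
        return result;
    }
}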

This bug filed against pulsar-cpp-client may be related: apache/pulsar-client-cpp#319. The pulsar-cpp-client developers seem to have mitigated it by adding retries, but the broker-side failure should still not happen.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@dwang-qm added the type/bug label on Oct 14, 2024
@lhotari (Member) commented Oct 14, 2024

Broker: Pulsar 3.1.2

@dwang-qm Pulsar 3.1.x is not supported. Please re-test with a supported version; that is currently Pulsar broker 3.0.7 or 3.3.2. Thanks!

@dwang-qm (Author)

That's the version in our production environment. It would take a significant investment of hours to set up an environment to reproduce this in, since this appears to be a race condition. My observations on the cause of the bug are valid for current master, since I reviewed the Pulsar source code. Could you at least tell me whether my reasoning is sound (under "Anything else?") before I invest a large amount of time reproducing it? Thanks!

@lhotari (Member) commented Oct 14, 2024

That's the version in our production environment. It would take a significant investment of hours to set up an environment to reproduce this in, since this appears to be a race condition. My observations on the cause of the bug are valid for current master, since I reviewed the Pulsar source code. Could you at least tell me whether my reasoning is sound (under "Anything else?") before I invest a large amount of time reproducing it? Thanks!

Thanks, @dwang-qm. That's understandable. In any case, thanks for reporting the issue, even if it's for a version that the OSS project doesn't support.

In the OSS project, we ask reporters to at least test the most recent released versions to see whether the problem is already fixed; in many cases it is. Isolating the issue will be a useful contribution to the project, and once an issue is isolated, the fix is usually easy to implement.

@lhotari (Member) commented Oct 14, 2024

I believe the issue is that when the broker handles a PRODUCER command, it calls ServerCnx::handleProducer, which calls BrokerService::getOrCreateTopic, which calls BrokerService::getTopic, which calls BrokerService::fetchPartitionedTopicMetadataAsync(TopicName topicName), which in turn calls BrokerService::fetchPartitionedTopicMetadataAsync(TopicName topicName, boolean refreshCacheAndGet) with refreshCacheAndGet set to false. This means that NamespaceResources::getPartitionedTopicMetadataAsync is always called with refresh set to false, so getAsync is called on NamespaceResources rather than refreshAndGetAsync, and the sync call on ZooKeeper is never made before performing the read.

Good observations @dwang-qm!
There have been multiple issues in the past with topic metadata handling due to the undefined consistency model.
Two years ago I wrote a blog post about this; the "Metadata consistency issues from user’s point of view" section describes the state of the problems at that time.
For topic creation, deletion, and partition modification, it may still be the case that operations aren't strongly consistent, as you have observed. The current workarounds rely on retries.
Some improvements have been made over time for individual use cases, such as the ZK sync support introduced in #18518.
I guess solving the metadata inconsistencies should start with defining the consistency model. It's currently eventually consistent in many cases, which surprises many users since it isn't well defined.
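To make the retry workaround concrete, here's a rough sketch with the Java client; the attempt count and backoff values are arbitrary examples, not a recommendation:

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

class ProducerWithRetry {
    // Retry producer creation with a small exponential backoff, since the broker
    // may still be serving a stale view of the partitioned topic metadata.
    static Producer<byte[]> create(PulsarClient client, String topic) throws Exception {
        PulsarClientException last = null;
        long backoffMs = 100;
        for (int attempt = 0; attempt < 5; attempt++) {
            try {
                return client.newProducer().topic(topic).create();
            } catch (PulsarClientException e) {
                last = e;
                Thread.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, 2_000);
            }
        }
        throw last;
    }
}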

@dwang-qm (Author)

Thanks for the response! When Pulsar 4.0 is released, we will upgrade to it and see if we can still observe the issue. In the meantime, we're going to update the client, since this looks similar to the client issue I referenced. I did see many retries in the logs, albeit against the same broker and within a short period of time, and they all failed. Unfortunately, the application code is not in control of any retries needed to cope with "eventual consistency," since the client creates the new producer automatically after it periodically polls and detects that new partitions have been added.
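For context, the producer setup looks roughly like this (shown with the Java client for consistency with the other snippets in this thread; the topic name is made up):

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

class AutoPartitionUpdateExample {
    static Producer<byte[]> createProducer(PulsarClient client) throws Exception {
        // With partition auto-update enabled, the client periodically re-reads the
        // partitioned topic metadata and creates producers for any new partitions
        // on its own, so the application code never sees (or controls) those retries.
        return client.newProducer()
                .topic("persistent://public/default/xxxx")
                .autoUpdatePartitions(true)
                .create();
    }
}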

If you don't mind answering: the reason the client was talking to that broker to create the producer in the first place is that it got the broker (in my case pulsar-broker-35) from a LOOKUP it did earlier. That's confusing, because it means that broker thinks it owns the topic (in my case named xxxx-partition-1), but wouldn't the broker have had to actively claim the topic somehow, in which case it should have had the correct metadata showing there was now more than one partition? xxxx was originally a partitioned topic with only one partition (so only xxxx-partition-0 was a valid topic name), but after the topic metadata update, the C++ client looked up the new partition and was told to connect to xxxx-partition-1 on pulsar-broker-35. Yet pulsar-broker-35 thinks there's only one partition in the topic. How could it, when it seems to have claimed xxxx-partition-1?

@lhotari (Member) commented Oct 15, 2024

The reason the client was talking to that broker to create the producer in the first place is that it got the broker (in my case pulsar-broker-35) from a LOOKUP it did earlier. That's confusing, because it means that broker thinks it owns the topic (in my case named xxxx-partition-1), but wouldn't the broker have had to actively claim the topic somehow, in which case it should have had the correct metadata showing there was now more than one partition? xxxx was originally a partitioned topic with only one partition (so only xxxx-partition-0 was a valid topic name), but after the topic metadata update, the C++ client looked up the new partition and was told to connect to xxxx-partition-1 on pulsar-broker-35. Yet pulsar-broker-35 thinks there's only one partition in the topic. How could it, when it seems to have claimed xxxx-partition-1?

The default lookup logic is in the NamespaceService.findBrokerServiceUrl and searchForCandidateBroker methods:

private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
        NamespaceBundle bundle, LookupOptions options) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("findBrokerServiceUrl: {} - options: {}", bundle, options);
    }
    Map<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> targetMap;
    if (options.isAuthoritative()) {
        targetMap = findingBundlesAuthoritative;
    } else {
        targetMap = findingBundlesNotAuthoritative;
    }
    return targetMap.computeIfAbsent(bundle, (k) -> {
        CompletableFuture<Optional<LookupResult>> future = new CompletableFuture<>();
        // First check if we or someone else already owns the bundle
        ownershipCache.getOwnerAsync(bundle).thenAccept(nsData -> {
            if (nsData.isEmpty()) {
                // No one owns this bundle
                if (options.isReadOnly()) {
                    // Do not attempt to acquire ownership
                    future.complete(Optional.empty());
                } else {
                    // Now, no one owns the namespace yet. Hence, we will try to dynamically assign it
                    pulsar.getExecutor().execute(() -> searchForCandidateBroker(bundle, future, options));
                }
            } else if (nsData.get().isDisabled()) {
                future.completeExceptionally(
                        new IllegalStateException(String.format("Namespace bundle %s is being unloaded", bundle)));
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Namespace bundle {} already owned by {} ", bundle, nsData);
                }
                // find the target
                if (options.hasAdvertisedListenerName()) {
                    AdvertisedListener listener =
                            nsData.get().getAdvertisedListeners().get(options.getAdvertisedListenerName());
                    if (listener == null) {
                        future.completeExceptionally(
                                new PulsarServerException("the broker do not have "
                                        + options.getAdvertisedListenerName() + " listener"));
                    } else {
                        URI url = listener.getBrokerServiceUrl();
                        URI urlTls = listener.getBrokerServiceUrlTls();
                        future.complete(Optional.of(new LookupResult(nsData.get(),
                                url == null ? null : url.toString(),
                                urlTls == null ? null : urlTls.toString())));
                    }
                } else {
                    future.complete(Optional.of(new LookupResult(nsData.get())));
                }
            }
        }).exceptionally(exception -> {
            LOG.warn("Failed to check owner for bundle {}: {}", bundle, exception.getMessage(), exception);
            future.completeExceptionally(exception);
            return null;
        });
        future.whenComplete((r, t) -> pulsar.getExecutor().execute(
                () -> targetMap.remove(bundle)
        ));
        return future;
    });
}

private void searchForCandidateBroker(NamespaceBundle bundle,
                                      CompletableFuture<Optional<LookupResult>> lookupFuture,
                                      LookupOptions options) {
    String candidateBroker;
    LeaderElectionService les = pulsar.getLeaderElectionService();
    if (les == null) {
        LOG.warn("The leader election has not yet been completed! NamespaceBundle[{}]", bundle);
        lookupFuture.completeExceptionally(
                new IllegalStateException("The leader election has not yet been completed!"));
        return;
    }
    boolean authoritativeRedirect = les.isLeader();
    try {
        // check if this is Heartbeat or SLAMonitor namespace
        candidateBroker = getHeartbeatOrSLAMonitorBrokerId(bundle, cb ->
                CompletableFuture.completedFuture(isBrokerActive(cb)))
                .get(config.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
        if (candidateBroker == null) {
            Optional<LeaderBroker> currentLeader = pulsar.getLeaderElectionService().getCurrentLeader();
            if (options.isAuthoritative()) {
                // leader broker already assigned the current broker as owner
                candidateBroker = pulsar.getBrokerId();
            } else {
                LoadManager loadManager = this.loadManager.get();
                boolean makeLoadManagerDecisionOnThisBroker = !loadManager.isCentralized() || les.isLeader();
                if (!makeLoadManagerDecisionOnThisBroker) {
                    // If leader is not active, fallback to pick the least loaded from current broker loadmanager
                    boolean leaderBrokerActive = currentLeader.isPresent()
                            && isBrokerActive(currentLeader.get().getBrokerId());
                    if (!leaderBrokerActive) {
                        makeLoadManagerDecisionOnThisBroker = true;
                        if (currentLeader.isEmpty()) {
                            LOG.warn(
                                    "The information about the current leader broker wasn't available. "
                                            + "Handling load manager decisions in a decentralized way. "
                                            + "NamespaceBundle[{}]",
                                    bundle);
                        } else {
                            LOG.warn(
                                    "The current leader broker {} isn't active. "
                                            + "Handling load manager decisions in a decentralized way. "
                                            + "NamespaceBundle[{}]",
                                    currentLeader.get(), bundle);
                        }
                    }
                }
                if (makeLoadManagerDecisionOnThisBroker) {
                    Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
                    if (availableBroker.isEmpty()) {
                        LOG.warn("Load manager didn't return any available broker. "
                                        + "Returning empty result to lookup. NamespaceBundle[{}]",
                                bundle);
                        lookupFuture.complete(Optional.empty());
                        return;
                    }
                    candidateBroker = availableBroker.get();
                    authoritativeRedirect = true;
                } else {
                    // forward to leader broker to make assignment
                    candidateBroker = currentLeader.get().getBrokerId();
                }
            }
        }
    } catch (Exception e) {
        LOG.warn("Error when searching for candidate broker to acquire {}: {}", bundle, e.getMessage(), e);
        lookupFuture.completeExceptionally(e);
        return;
    }
    try {
        Objects.requireNonNull(candidateBroker);
        if (candidateBroker.equals(pulsar.getBrokerId())) {
            // Load manager decided that the local broker should try to become the owner
            ownershipCache.tryAcquiringOwnership(bundle).thenAccept(ownerInfo -> {
                if (ownerInfo.isDisabled()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Namespace bundle {} is currently being unloaded", bundle);
                    }
                    lookupFuture.completeExceptionally(new IllegalStateException(
                            String.format("Namespace bundle %s is currently being unloaded", bundle)));
                } else {
                    // Found owner for the namespace bundle
                    if (options.isLoadTopicsInBundle()) {
                        // Schedule the task to preload topics
                        pulsar.loadNamespaceTopics(bundle);
                    }
                    // find the target
                    if (options.hasAdvertisedListenerName()) {
                        AdvertisedListener listener =
                                ownerInfo.getAdvertisedListeners().get(options.getAdvertisedListenerName());
                        if (listener == null) {
                            lookupFuture.completeExceptionally(
                                    new PulsarServerException("the broker do not have "
                                            + options.getAdvertisedListenerName() + " listener"));
                        } else {
                            URI url = listener.getBrokerServiceUrl();
                            URI urlTls = listener.getBrokerServiceUrlTls();
                            lookupFuture.complete(Optional.of(
                                    new LookupResult(ownerInfo,
                                            url == null ? null : url.toString(),
                                            urlTls == null ? null : urlTls.toString())));
                        }
                    } else {
                        lookupFuture.complete(Optional.of(new LookupResult(ownerInfo)));
                    }
                }
            }).exceptionally(exception -> {
                LOG.warn("Failed to acquire ownership for namespace bundle {}: {}", bundle, exception);
                lookupFuture.completeExceptionally(new PulsarServerException(
                        "Failed to acquire ownership for namespace bundle " + bundle, exception));
                return null;
            });
        } else {
            // Load managed decider some other broker should try to acquire ownership
            if (LOG.isDebugEnabled()) {
                LOG.debug("Redirecting to broker {} to acquire ownership of bundle {}", candidateBroker, bundle);
            }
            // Now setting the redirect url
            createLookupResult(candidateBroker, authoritativeRedirect, options.getAdvertisedListenerName())
                    .thenAccept(lookupResult -> lookupFuture.complete(Optional.of(lookupResult)))
                    .exceptionally(ex -> {
                        lookupFuture.completeExceptionally(ex);
                        return null;
                    });
        }
    } catch (Exception e) {
        LOG.warn("Error in trying to acquire namespace bundle ownership for {}: {}", bundle, e.getMessage(), e);
        lookupFuture.completeExceptionally(e);
    }
}

When the client makes a lookup request (CommandLookupTopic in the protocol) with the authoritative flag set, the target broker will attempt to claim ownership of the topic without doing further lookups. That's why the solution tolerates eventual consistency.
Each lookup can get redirected multiple times until it hits the target broker. The target broker also processes the lookup request, and when it's authoritative, it attempts to claim ownership.
The partitioned topic metadata is a completely separate concern in Pulsar. The client needs that information for subscribing to multi-partitioned topics. The broker itself doesn't need that information to serve topics, since an individual partition of a multi-partitioned topic behaves completely independently. I hope this explanation makes sense.
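To illustrate the client side of that redirect handling, here's a purely conceptual sketch; the types and method names below are made up for this explanation and are not the actual Pulsar client internals:

record LookupAnswer(boolean redirect, String brokerUrl, boolean authoritative) { }

interface LookupClient {
    LookupAnswer lookup(String brokerUrl, String topic, boolean authoritative);
}

class LookupLoop {
    // Follow redirects until a broker answers with a final (non-redirect) result.
    // An authoritative redirect tells the next broker to claim ownership itself.
    static String resolveOwner(LookupClient client, String seedBroker, String topic) {
        String target = seedBroker;
        boolean authoritative = false;
        for (int hop = 0; hop < 16; hop++) {
            LookupAnswer answer = client.lookup(target, topic, authoritative);
            if (!answer.redirect()) {
                return answer.brokerUrl();
            }
            target = answer.brokerUrl();
            authoritative = answer.authoritative();
        }
        throw new IllegalStateException("too many lookup redirects for " + topic);
    }
}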

@dwang-qm (Author) commented Oct 16, 2024

Thank you for the explanation and source code snippets! After reviewing them, I think I understand.

Suppose there's a multi-partition topic called xxxx, originally with only one partition (so only xxxx-partition-0 exists). When an admin adds another partition, the topic metadata gets updated. The Pulsar client rechecks the topic metadata on an interval, and when it discovers the new partition, it tries to create a producer to connect to it (xxxx-partition-1). It first needs to perform a lookup of the broker service URL, so it contacts a broker it is already connected to, say pulsar-broker-0, which eventually calls findBrokerServiceUrl. searchForCandidateBroker finds that pulsar-broker-35 is actually best suited. That's not pulsar-broker-0, so it returns a redirect response to the client. The Pulsar client then contacts pulsar-broker-35 and repeats the lookup. pulsar-broker-35 eventually calls findBrokerServiceUrl, finds itself to be the best candidate via searchForCandidateBroker, and takes ownership of the topic. All good so far.

Then the client sends the PRODUCER command to pulsar-broker-35, which does have ownership of the topic and will verify that near the top of BrokerService::loadOrCreatePersistentTopic. However, before it gets there, it performs that fetchPartitionedTopicMetadataAsync call and checks whether topicName.getPartitionIndex() < metadata.partitions. As you say, these are separate systems, and pulsar-broker-35 may in fact have acquired ownership of the topic without having an updated view of the partitioned topic metadata from ZooKeeper. Also as you say, findBrokerServiceUrl isn't even aware that xxxx-partition-1 belongs to a partitioned topic and never had to check the partitioned topic metadata of xxxx in order to acquire ownership of xxxx-partition-1.
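To make that concrete, here's a simplified stand-in for the check I mean (not the exact broker code); the error text is just modeled on the message I saw in the logs:

class PartitionIndexCheck {
    // If the broker's cached metadata still says one partition, index 1 fails here
    // even though the partition was legitimately just added.
    static void validate(String topic, int partitionIndex, int knownPartitions) {
        if (partitionIndex >= knownPartitions) {
            throw new IllegalArgumentException(
                    "Illegal topic partition name " + topic
                            + " (broker only knows about " + knownPartitions + " partitions)");
        }
    }
}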

Do you think this could happen? I think just changing

return fetchPartitionedTopicMetadataAsync(topicNameEntity)

to

return fetchPartitionedTopicMetadataAsync(topicNameEntity, true)

would solve the issue. Do you agree?

The issue with just retrying is that I have observed retries against the same broker (necessarily, since it owns the topic) over the course of seconds, and they still failed because the broker remained unsynced with ZooKeeper. It does eventually succeed, but seconds of delay cause an extreme build-up in our ingestion pipeline and things start failing.

Thank you again for your continued attention and responses!

@lhotari (Member) commented Oct 16, 2024

to return fetchPartitionedTopicMetadataAsync(topicNameEntity, true) would solve the issue. Do you agree?

@dwang-qm exactly. Well done!
The main concern with this change is the chance of performance regressions; that's why it would need more thought. One possibility would be a policy-level setting that could be configured at the broker, namespace, or topic level. That would avoid performance regressions, since users facing these issues or having such use cases could switch to using sync where the use case requires it.
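Something along these lines, as a purely illustrative sketch; the flag name and the surrounding types are made up here, not actual Pulsar configuration or classes:

import java.util.concurrent.CompletableFuture;

class PartitionedTopicMetadata {
    final int partitions;
    PartitionedTopicMetadata(int partitions) { this.partitions = partitions; }
}

interface PartitionedTopicMetadataStore {
    CompletableFuture<PartitionedTopicMetadata> getAsync(String topic);            // cached read
    CompletableFuture<PartitionedTopicMetadata> refreshAndGetAsync(String topic);  // sync + read
}

class ProducerMetadataFetcher {
    private final PartitionedTopicMetadataStore store;
    private final boolean refreshMetadataOnProduce; // hypothetical broker/namespace/topic-level setting

    ProducerMetadataFetcher(PartitionedTopicMetadataStore store, boolean refreshMetadataOnProduce) {
        this.store = store;
        this.refreshMetadataOnProduce = refreshMetadataOnProduce;
    }

    CompletableFuture<PartitionedTopicMetadata> fetchForProducer(String topic) {
        // Only pay the extra metadata store round trip when the operator opted in,
        // so the default PRODUCER path stays as cheap as it is today.
        return refreshMetadataOnProduce ? store.refreshAndGetAsync(topic) : store.getAsync(topic);
    }
}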

@lhotari (Member) commented Oct 17, 2024

Minimal reproduce step

  1. Connect to a partitioned topic with the C++ client
  2. Update the topic to add a new partition.
  3. The C++ client will attempt to create producers for the new partitions. These will often fail.

What did you expect to see?

Producers successfully created.

What did you see instead?

In the broker logs, the "Illegal topic partition name" error message.

@dwang-qm one more clarification: is the main negative impact the broker logs? Does the creation of the producer succeed from the client's perspective?
