[fix][broker] Fix lookup heartbeat and sla namespace bundle when using extensible load manager #21213
this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper));

createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC);
@heesung-sn Somehow, the topic creation might time out, which makes the broker shut down. Because the brokerRegistry is not closed, the zk path still exists, so the next broker start fails again.
BTW, we might need to add a retry for system topic creation, and ignore the topic already exists exception.
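A minimal sketch of the retry idea suggested above, not the broker's actual code. `SystemTopicRetrySketch`, `createWithRetry`, and `TopicAlreadyExistsException` are hypothetical names introduced only for illustration; the real creation call and exception type in the broker may differ.

```java
import java.util.concurrent.Callable;

final class SystemTopicRetrySketch {

    /** Hypothetical stand-in for a "topic already exists" error; not a Pulsar class. */
    static final class TopicAlreadyExistsException extends Exception {
        TopicAlreadyExistsException(String message) {
            super(message);
        }
    }

    /**
     * Runs createTopic up to maxAttempts times with a linearly growing pause.
     * Returns true if this call created the topic, false if it already existed.
     * Rethrows the last failure once all attempts are used up.
     */
    static boolean createWithRetry(Callable<Void> createTopic, int maxAttempts, long backoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                createTopic.call();
                return true;
            } catch (TopicAlreadyExistsException e) {
                // Another broker (or an earlier attempt) created it: treat as success.
                return false;
            } catch (InterruptedException e) {
                // Do not retry on interruption; restore the flag and propagate.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis * attempt);
                }
            }
        }
        throw last;
    }
}
```

With a wrapper like this, a transient timeout would be retried before the broker gives up, while an "already exists" result is not treated as a failure.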
We do skip if it already exists. If the system topic creation fails here, k8s should restart the broker.
Do you know why the topic creation times out? I have seen it several times, but after the broker restarts, it starts successfully.
I am not sure. Do we have any logs or stack traces?
@@ -207,6 +210,36 @@ public Set<NamespaceBundle> getOwnedServiceUnits() {
var bundle = entry.getKey();
return getNamespaceBundle(pulsar, bundle);
}).collect(Collectors.toSet());
// Add heartbeat and SLA monitor namespace bundle.
NamespaceName heartbeatNamespace = NamespaceService.getHeartbeatNamespace(brokerId, pulsar.getConfiguration());
Why do we need to add these static bundles (bundles that are not stored in BSC) for getOwnedServiceUnits?
Yes, getOwnedServiceUnits needs to know the ownership of the heartbeat and SLA monitor bundles. This just keeps the behavior the same as before.
orphanServiceUnits.put(serviceUnit, stateData);
}
} else if (now - stateData.timestamp() > semiTerminalStateWaitingTimeInMillis) {
if (isActiveState(state) && StringUtils.isNotBlank(srcBroker) && !activeBrokers.contains(srcBroker)) {
nit: the isActiveState(state) check is repeated in the cases below.
I prefer to return early so the logic is easier to understand. Of course, I can revert this change since it is not relevant to this PR. What do you think?
I'm fine with this change.
Codecov Report
@@ Coverage Diff @@
## master #21213 +/- ##
=============================================
+ Coverage 36.96% 73.23% +36.27%
- Complexity 12294 32505 +20211
=============================================
Files 1698 1887 +189
Lines 130510 140223 +9713
Branches 14260 15435 +1175
=============================================
+ Hits 48240 102694 +54454
+ Misses 75982 29435 -46547
- Partials 6288 8094 +1806
…g extensible load manager (apache#21213) (cherry picked from commit f85e0dc)
…g extensible load manager (apache#21213)
Cherry-pick by #21314
continue;
}

if (now - stateData.timestamp() > semiTerminalStateWaitingTimeInMillis) {
Sorry, I missed this.
This should be if (!isActiveState(state) && now - stateData.timestamp() > semiTerminalStateWaitingTimeInMillis). Otherwise, we will clean up active states, including Owned.
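To make the point of this comment concrete, here is a small self-contained sketch of the guarded condition. The `State` enum and the `ACTIVE` set are simplified assumptions for illustration (the real state enum has more values), and `shouldCleanUp` is a hypothetical helper, not a method from the PR.

```java
import java.util.EnumSet;
import java.util.Set;

final class TombstoneCleanupSketch {

    /** Simplified, hypothetical state set; the real enum has more values. */
    enum State { OWNED, ASSIGNING, FREE, DELETED }

    // Assumption for this sketch: these are the states treated as active.
    private static final Set<State> ACTIVE = EnumSet.of(State.OWNED, State.ASSIGNING);

    static boolean isActiveState(State state) {
        return ACTIVE.contains(state);
    }

    /** True only for inactive states that have been idle for longer than waitMillis. */
    static boolean shouldCleanUp(State state, long stateTimestamp, long now, long waitMillis) {
        return !isActiveState(state) && now - stateTimestamp > waitMillis;
    }
}
```

Without the `!isActiveState(state)` guard, an Owned entry older than the waiting time would satisfy the time check alone and be cleaned up as well.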
…le when using extensible load manager (apache#21213) (apache#21314)
…le when using extensible load manager (apache#21213) (apache#21314) (cherry picked from commit 0454410)
Motivation
Currently, if the cluster has multiple brokers and is doing a rolling restart, the lookup result for the heartbeat namespace topic might be wrong, because ExtensibleLoadManagerImpl does not restrict the lookup candidates for the heartbeat and SLA namespace bundles so that each bundle is owned by its designated broker. This ownership selection is wrong:
After the ownership assignment, broker-0 will fail to start.
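The intended behavior described above can be sketched as follows. This is an illustration under stated assumptions, not the code from this PR: the `pulsar/<host>:<port>` naming scheme, the class `HeartbeatLookupSketch`, and both method names are hypothetical, and the real namespace format is defined in NamespaceService and may differ.

```java
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class HeartbeatLookupSketch {

    // Assumed naming scheme for illustration only: "pulsar/<host>:<port>".
    private static final Pattern HEARTBEAT_NS = Pattern.compile("^pulsar/([^/:]+:\\d+)$");

    /** Returns the broker embedded in a heartbeat namespace name, if the name matches. */
    static Optional<String> heartbeatBroker(String namespace) {
        Matcher m = HEARTBEAT_NS.matcher(namespace);
        return m.matches() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /**
     * Heartbeat namespaces resolve only to their own broker, or to nothing while
     * that broker is unavailable. Every other namespace goes through the regular
     * selection, stubbed here as "first broker in sorted order".
     */
    static Optional<String> selectOwner(String namespace, Set<String> availableBrokers) {
        Optional<String> pinned = heartbeatBroker(namespace);
        if (pinned.isPresent()) {
            return pinned.filter(availableBrokers::contains);
        }
        return availableBrokers.stream().sorted().findFirst();
    }
}
```

The design point is that a per-broker bundle never enters the general candidate selection, so during a rolling restart it cannot be handed to a different broker.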
Modifications
Documentation
doc
doc-required
doc-not-needed
doc-complete