Skip to content

Commit

Permalink
[fix][broker] Return getOwnerAsync without waiting on source broker u…
Browse files Browse the repository at this point in the history
…pon Assigning and Releasing and handle role change during role init (#22112)
  • Loading branch information
heesung-sn authored Feb 28, 2024
1 parent 91de98a commit 56eb758
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -826,8 +826,13 @@ synchronized void playLeader() {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Leader);
int retry = 0;
boolean becameFollower = false;
while (!Thread.currentThread().isInterrupted()) {
try {
if (!serviceUnitStateChannel.isChannelOwner()) {
becameFollower = true;
break;
}
initWaiter.await();
// Confirm the system topics have been created or create them if they do not exist.
// If the leader has changed, the new leader need to reset
Expand All @@ -851,6 +856,13 @@ synchronized void playLeader() {
}
}
}

if (becameFollower) {
log.warn("The broker:{} became follower while initializing leader role.", pulsar.getBrokerId());
playFollower();
return;
}

role = Leader;
log.info("This broker:{} plays the leader now.", pulsar.getBrokerId());

Expand All @@ -864,8 +876,13 @@ synchronized void playFollower() {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Follower);
int retry = 0;
boolean becameLeader = false;
while (!Thread.currentThread().isInterrupted()) {
try {
if (serviceUnitStateChannel.isChannelOwner()) {
becameLeader = true;
break;
}
initWaiter.await();
unloadScheduler.close();
serviceUnitStateChannel.cancelOwnershipMonitor();
Expand All @@ -885,6 +902,13 @@ synchronized void playFollower() {
}
}
}

if (becameLeader) {
log.warn("This broker:{} became leader while initializing follower role.", pulsar.getBrokerId());
playLeader();
return;
}

role = Follower;
log.info("This broker:{} plays a follower now.", pulsar.getBrokerId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,16 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.sourceBroker()));
}
case Assigning, Releasing -> {
return getActiveOwnerAsync(serviceUnit, state, Optional.empty());
if (isTargetBroker(data.dstBroker())) {
return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.dstBroker()));
}
// If this broker is not the dst broker, return the dst broker as the owner(or empty).
// Clients need to connect(redirect) to the dst broker anyway
// and wait for the dst broker to receive `Owned`.
// This is also required to return getOwnerAsync on the src broker immediately during unloading.
// Otherwise, topic creation(getOwnerAsync) could block unloading bundles,
// if the topic creation(getOwnerAsync) happens during unloading on the src broker.
return CompletableFuture.completedFuture(Optional.ofNullable(data.dstBroker()));
}
case Init, Free -> {
return CompletableFuture.completedFuture(Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions;

import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Releasing;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelTest.overrideTableView;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.MsgRate;
Expand Down Expand Up @@ -88,6 +90,7 @@
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
Expand All @@ -106,6 +109,7 @@
import org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -167,8 +171,6 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
private LookupService lookupService;

private static void initConfig(ServiceConfiguration conf){
conf.setLoadBalancerInFlightServiceUnitStateWaitingTimeInMillis(5 * 1000);
conf.setLoadBalancerServiceUnitStateMonitorIntervalInSeconds(1);
conf.setForceDeleteNamespaceAllowed(true);
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
conf.setAllowAutoTopicCreation(true);
Expand Down Expand Up @@ -1148,7 +1150,6 @@ public void testRoleChange() throws Exception {
reset();
return null;
}).when(topBundlesLoadDataStorePrimarySpy).closeTableView();
FieldUtils.writeDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStorePrimarySpy, true);

var topBundlesLoadDataStoreSecondary = (LoadDataStore<TopBundlesLoadData>)
FieldUtils.readDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", true);
Expand All @@ -1171,36 +1172,65 @@ public void testRoleChange() throws Exception {
reset();
return null;
}).when(topBundlesLoadDataStoreSecondarySpy).closeTableView();
FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStoreSecondarySpy, true);

if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
primaryLoadManager.playFollower(); // close 3 times
primaryLoadManager.playFollower(); // close 1 time
secondaryLoadManager.playLeader();
secondaryLoadManager.playLeader();
primaryLoadManager.playLeader(); // close 3 times and open 3 times
primaryLoadManager.playLeader(); // close 1 time and open 1 time,
secondaryLoadManager.playFollower();
secondaryLoadManager.playFollower();
} else {
primaryLoadManager.playLeader();
primaryLoadManager.playLeader();
secondaryLoadManager.playFollower();
secondaryLoadManager.playFollower();
primaryLoadManager.playFollower();
try {
FieldUtils.writeDeclaredField(primaryLoadManager, "topBundlesLoadDataStore",
topBundlesLoadDataStorePrimarySpy, true);
FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore",
topBundlesLoadDataStoreSecondarySpy, true);


if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
primaryLoadManager.playLeader();
secondaryLoadManager.playFollower();
verify(topBundlesLoadDataStorePrimarySpy, times(3)).startTableView();
verify(topBundlesLoadDataStorePrimarySpy, times(5)).closeTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(0)).startTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(3)).closeTableView();
} else {
primaryLoadManager.playFollower();
secondaryLoadManager.playLeader();
verify(topBundlesLoadDataStoreSecondarySpy, times(3)).startTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(5)).closeTableView();
verify(topBundlesLoadDataStorePrimarySpy, times(0)).startTableView();
verify(topBundlesLoadDataStorePrimarySpy, times(3)).closeTableView();
}

primaryLoadManager.playFollower();
secondaryLoadManager.playLeader();
secondaryLoadManager.playLeader();
}
secondaryLoadManager.playFollower();

if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
FieldUtils.readDeclaredField(primaryLoadManager, "role", true));
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
FieldUtils.readDeclaredField(secondaryLoadManager, "role", true));
} else {
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
FieldUtils.readDeclaredField(primaryLoadManager, "role", true));
assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
FieldUtils.readDeclaredField(secondaryLoadManager, "role", true));
}

verify(topBundlesLoadDataStorePrimarySpy, times(4)).startTableView();
verify(topBundlesLoadDataStorePrimarySpy, times(8)).closeTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(4)).startTableView();
verify(topBundlesLoadDataStoreSecondarySpy, times(8)).closeTableView();
primaryLoadManager.playLeader();
secondaryLoadManager.playLeader();

FieldUtils.writeDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStorePrimary, true);
FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStoreSecondary, true);
if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
FieldUtils.readDeclaredField(primaryLoadManager, "role", true));
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
FieldUtils.readDeclaredField(secondaryLoadManager, "role", true));
} else {
assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
FieldUtils.readDeclaredField(primaryLoadManager, "role", true));
assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
FieldUtils.readDeclaredField(secondaryLoadManager, "role", true));
}
} finally {
FieldUtils.writeDeclaredField(primaryLoadManager, "topBundlesLoadDataStore",
topBundlesLoadDataStorePrimary, true);
FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore",
topBundlesLoadDataStoreSecondary, true);
}
}

@Test
Expand Down Expand Up @@ -1613,6 +1643,32 @@ public void compactionScheduleTest() {
});
}

@Test(timeOut = 10 * 1000)
public void unloadTimeoutCheckTest()
throws Exception {
Pair<TopicName, NamespaceBundle> topicAndBundle = getBundleIsNotOwnByChangeEventTopic("unload-timeout");
String topic = topicAndBundle.getLeft().toString();
var bundle = topicAndBundle.getRight().toString();
var releasing = new ServiceUnitStateData(Releasing, pulsar2.getBrokerId(), pulsar1.getBrokerId(), 1);
overrideTableView(channel1, bundle, releasing);
var topicFuture = pulsar1.getBrokerService().getOrCreateTopic(topic);


try {
topicFuture.get(1, TimeUnit.SECONDS);
} catch (Exception e) {
log.info("getOrCreateTopic failed", e);
if (!(e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException && e.getMessage()
.contains("Please redo the lookup"))) {
fail();
}
}

pulsar1.getBrokerService()
.unloadServiceUnit(topicAndBundle.getRight(), true, true, 5,
TimeUnit.SECONDS).get(2, TimeUnit.SECONDS);
}

private static abstract class MockBrokerFilter implements BrokerFilter {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,19 +487,17 @@ public void transferTestWhenDestBrokerFails()
var owner1 = channel1.getOwnerAsync(bundle);
var owner2 = channel2.getOwnerAsync(bundle);

assertFalse(owner1.isDone());
assertTrue(owner1.isDone());
assertEquals(brokerId2, owner1.get().get());
assertFalse(owner2.isDone());

assertEquals(1, getOwnerRequests1.size());
assertEquals(0, getOwnerRequests1.size());
assertEquals(1, getOwnerRequests2.size());

// In 10 secs, the getOwnerAsync requests(lookup requests) should time out.
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertTrue(owner1.isCompletedExceptionally()));
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertTrue(owner2.isCompletedExceptionally()));

assertEquals(0, getOwnerRequests1.size());
assertEquals(0, getOwnerRequests2.size());

// recovered, check the monitor update state : Assigned -> Owned
Expand Down Expand Up @@ -1136,12 +1134,10 @@ public void assignTestWhenDestBrokerProducerFails()
var owner1 = channel1.getOwnerAsync(bundle);
var owner2 = channel2.getOwnerAsync(bundle);

assertFalse(owner1.isDone());
assertTrue(owner1.isDone());
assertFalse(owner2.isDone());

// In 10 secs, the getOwnerAsync requests(lookup requests) should time out.
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertTrue(owner1.isCompletedExceptionally()));
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertTrue(owner2.isCompletedExceptionally()));

Expand Down Expand Up @@ -1320,6 +1316,68 @@ public void testIsOwner() throws IllegalAccessException {
assertFalse(channel1.isOwner(bundle));
}

@Test(priority = 15)
public void testGetOwnerAsync() throws Exception {

overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, brokerId1, 1));
var owner = channel1.getOwnerAsync(bundle);
assertTrue(owner.isDone());
assertEquals(brokerId1, owner.get().get());

overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, brokerId2, 1));
owner = channel1.getOwnerAsync(bundle);
assertTrue(owner.isDone());
assertEquals(brokerId2, owner.get().get());

overrideTableView(channel1, bundle, new ServiceUnitStateData(Assigning, brokerId1, 1));
owner = channel1.getOwnerAsync(bundle);
assertTrue(!owner.isDone());

overrideTableView(channel1, bundle, new ServiceUnitStateData(Assigning, brokerId2, 1));
owner = channel1.getOwnerAsync(bundle);
assertTrue(owner.isDone());
assertEquals(brokerId2, owner.get().get());

overrideTableView(channel1, bundle, new ServiceUnitStateData(Releasing, brokerId1, 1));
owner = channel1.getOwnerAsync(bundle);
assertTrue(!owner.isDone());

overrideTableView(channel1, bundle, new ServiceUnitStateData(Releasing, brokerId2, 1));
owner = channel1.getOwnerAsync(bundle);
assertTrue(owner.isDone());
assertEquals(brokerId2, owner.get().get());

overrideTableView(channel1, bundle, new ServiceUnitStateData(Releasing, null, brokerId1, 1));
owner = channel1.getOwnerAsync(bundle);
assertTrue(owner.isDone());
assertEquals(Optional.empty(), owner.get());

overrideTableView(channel1, bundle, new ServiceUnitStateData(Splitting, null, brokerId1, 1));
owner = channel1.getOwnerAsync(bundle);
assertTrue(owner.isDone());
assertEquals(brokerId1, owner.get().get());

overrideTableView(channel1, bundle, new ServiceUnitStateData(Splitting, null, brokerId2, 1));
owner = channel1.getOwnerAsync(bundle);
assertTrue(owner.isDone());
assertEquals(brokerId2, owner.get().get());

overrideTableView(channel1, bundle, new ServiceUnitStateData(Free, null, brokerId1, 1));
owner = channel1.getOwnerAsync(bundle);
assertTrue(owner.isDone());
assertEquals(Optional.empty(), owner.get());

overrideTableView(channel1, bundle, new ServiceUnitStateData(Deleted, null, brokerId1, 1));
owner = channel1.getOwnerAsync(bundle);
assertTrue(owner.isDone());
assertTrue(owner.isCompletedExceptionally());

overrideTableView(channel1, bundle, null);
owner = channel1.getOwnerAsync(bundle);
assertTrue(owner.isDone());
assertEquals(Optional.empty(), owner.get());
}

@Test(priority = 16)
public void splitAndRetryFailureTest() throws Exception {
channel1.publishAssignEventAsync(bundle3, brokerId1);
Expand Down Expand Up @@ -1778,7 +1836,8 @@ private void overrideTableViews(String serviceUnit, ServiceUnitStateData val) th
overrideTableView(channel2, serviceUnit, val);
}

private static void overrideTableView(ServiceUnitStateChannel channel, String serviceUnit, ServiceUnitStateData val)
@Test(enabled = false)
public static void overrideTableView(ServiceUnitStateChannel channel, String serviceUnit, ServiceUnitStateData val)
throws IllegalAccessException {
var tv = (TableViewImpl<ServiceUnitStateData>)
FieldUtils.readField(channel, "tableview", true);
Expand Down
Loading

0 comments on commit 56eb758

Please sign in to comment.