Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] check if the owner is active when returning ownership (ExtensibleLoadManager) #58

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,11 @@ public void setLoadReportForceUpdateFlag() {
@Override
public void writeLoadReportOnZookeeper() throws Exception {
// No-op, this operation is not useful, the load data reporter will automatically write.
throw new UnsupportedOperationException();
}

@Override
public void writeResourceQuotasToZooKeeper() throws Exception {
// No-op, this operation is not useful, the load data reporter will automatically write.
throw new UnsupportedOperationException();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,27 @@ public boolean isOwner(String serviceUnit) {
return isOwner(serviceUnit, lookupServiceAddress);
}

private CompletableFuture<Optional<String>> getActiveOwnerAsync(
String serviceUnit,
ServiceUnitState state,
Optional<String> owner) {
CompletableFuture<Optional<String>> activeOwner = owner.isPresent()
? brokerRegistry.lookupAsync(owner.get()).thenApply(lookupData -> lookupData.flatMap(__ -> owner))
: CompletableFuture.completedFuture(Optional.empty());

return activeOwner
.thenCompose(broker -> broker
.map(__ -> activeOwner)
.orElseGet(() -> deferGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable)))
.whenComplete((__, e) -> {
if (e != null) {
log.error("Failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
serviceUnit, state, owner, e);
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
}
});
}

public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
if (!validateChannelState(Started, true)) {
return CompletableFuture.failedFuture(
Expand All @@ -498,18 +519,13 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
ownerLookUpCounters.get(state).getTotal().incrementAndGet();
switch (state) {
case Owned -> {
return CompletableFuture.completedFuture(Optional.of(data.dstBroker()));
return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.dstBroker()));
}
case Splitting -> {
return CompletableFuture.completedFuture(Optional.of(data.sourceBroker()));
return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.sourceBroker()));
}
case Assigning, Releasing -> {
return deferGetOwnerRequest(serviceUnit).whenComplete((__, e) -> {
if (e != null) {
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
}
}).thenApply(
broker -> broker == null ? Optional.empty() : Optional.of(broker));
return getActiveOwnerAsync(serviceUnit, state, Optional.empty());
}
case Init, Free -> {
return CompletableFuture.completedFuture(Optional.empty());
Expand Down Expand Up @@ -812,9 +828,14 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) {
if (getOwnerRequest != null) {
getOwnerRequest.complete(null);
}
stateChangeListeners.notify(serviceUnit, data, null);

if (isTargetBroker(data.sourceBroker())) {
log(null, serviceUnit, data, null);
stateChangeListeners.notifyOnCompletion(
data.force() ? closeServiceUnit(serviceUnit, true)
: CompletableFuture.completedFuture(0), serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
} else {
stateChangeListeners.notify(serviceUnit, data, null);
}
}

Expand Down Expand Up @@ -1202,38 +1223,43 @@ private void scheduleCleanup(String broker, long delayInSecs) {


private ServiceUnitStateData getOverrideInactiveBrokerStateData(ServiceUnitStateData orphanData,
String selectedBroker,
Optional<String> selectedBroker,
String inactiveBroker) {


if (selectedBroker.isEmpty()) {
return new ServiceUnitStateData(Free, null, inactiveBroker,
true, getNextVersionId(orphanData));
}

if (orphanData.state() == Splitting) {
return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker,
return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker.get(),
Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
true, getNextVersionId(orphanData));
} else {
return new ServiceUnitStateData(Owned, selectedBroker, inactiveBroker,
return new ServiceUnitStateData(Owned, selectedBroker.get(), inactiveBroker,
true, getNextVersionId(orphanData));
}
}

private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, String inactiveBroker) {
Optional<String> selectedBroker = selectBroker(serviceUnit, inactiveBroker);
if (selectedBroker.isPresent()) {
var override = getOverrideInactiveBrokerStateData(
orphanData, selectedBroker.get(), inactiveBroker);
log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}",
serviceUnit, orphanData, override);
publishOverrideEventAsync(serviceUnit, orphanData, override)
.exceptionally(e -> {
log.error(
"Failed to override the ownership serviceUnit:{} orphanData:{}. "
+ "Failed to publish override event. totalCleanupErrorCnt:{}",
serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet());
return null;
});
} else {
log.error("Failed to override the ownership serviceUnit:{} orphanData:{}. Empty selected broker. "
if (selectedBroker.isEmpty()) {
log.warn("Empty selected broker for ownership serviceUnit:{} orphanData:{}."
+ "totalCleanupErrorCnt:{}",
serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet());
}
var override = getOverrideInactiveBrokerStateData(orphanData, selectedBroker, inactiveBroker);
log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}",
serviceUnit, orphanData, override);
publishOverrideEventAsync(serviceUnit, orphanData, override)
.exceptionally(e -> {
log.error(
"Failed to override the ownership serviceUnit:{} orphanData:{}. "
+ "Failed to publish override event. totalCleanupErrorCnt:{}",
serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet());
return null;
});
}

private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) {
Expand Down Expand Up @@ -1335,7 +1361,7 @@ private synchronized void doCleanup(String broker) {
broker,
cleanupTime,
orphanServiceUnitCleanupCnt,
totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
totalCleanupErrorCnt.get() - totalCleanupErrorCntStart,
printCleanupMetrics());

}
Expand Down Expand Up @@ -1524,7 +1550,7 @@ protected void monitorOwnerships(List<String> brokers) {
inactiveBrokers, inactiveBrokers.size(),
orphanServiceUnitCleanupCnt,
serviceUnitTombstoneCleanupCnt,
totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
totalCleanupErrorCnt.get() - totalCleanupErrorCntStart,
printCleanupMetrics());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.expectThrows;
import static org.testng.AssertJUnit.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
Expand Down Expand Up @@ -1560,6 +1561,80 @@ public void testOverrideOrphanStateData()
cleanTableViews();
}

@Test(priority = 19)
public void testActiveGetOwner() throws Exception {


// set the bundle owner is the broker
String broker = lookupServiceAddress2;
String bundle = "public/owned/0xfffffff0_0xffffffff";
overrideTableViews(bundle,
new ServiceUnitStateData(Owned, broker, null, 1));
var owner = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS).get();
assertEquals(owner, broker);

// simulate the owner is inactive
var spyRegistry = spy(new BrokerRegistryImpl(pulsar));
doReturn(CompletableFuture.completedFuture(Optional.empty()))
.when(spyRegistry).lookupAsync(eq(broker));
FieldUtils.writeDeclaredField(channel1,
"brokerRegistry", spyRegistry , true);
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 1000, true);


// verify getOwnerAsync times out because the owner is inactive now.
long start = System.currentTimeMillis();
var ex = expectThrows(ExecutionException.class, () -> channel1.getOwnerAsync(bundle).get());
assertTrue(ex.getCause() instanceof TimeoutException);
assertTrue(System.currentTimeMillis() - start >= 1000);

// simulate ownership cleanup(no selected owner) by the leader channel
doReturn(CompletableFuture.completedFuture(Optional.empty()))
.when(loadManager).selectAsync(any(), any());
var leaderChannel = channel1;
String leader1 = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
assertEquals(leader1, leader2);
if (leader1.equals(lookupServiceAddress2)) {
leaderChannel = channel2;
}
leaderChannel.handleMetadataSessionEvent(SessionReestablished);
FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp",
System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);

// verify the ownership cleanup, and channel's getOwnerAsync returns empty result without timeout
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 20 * 1000, true);
start = System.currentTimeMillis();
assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty());
assertTrue(System.currentTimeMillis() - start < 20_000);

// simulate ownership cleanup(lookupServiceAddress1 selected owner) by the leader channel
overrideTableViews(bundle,
new ServiceUnitStateData(Owned, broker, null, 1));
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1)))
.when(loadManager).selectAsync(any(), any());
leaderChannel.handleMetadataSessionEvent(SessionReestablished);
FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp",
System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
getCleanupJobs(leaderChannel).clear();
leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);

// verify the ownership cleanup, and channel's getOwnerAsync returns lookupServiceAddress1 without timeout
start = System.currentTimeMillis();
assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get().get());
assertTrue(System.currentTimeMillis() - start < 20_000);

// test clean-up
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 30 * 1000, true);
FieldUtils.writeDeclaredField(channel1,
"brokerRegistry", registry , true);
cleanTableViews();

}

private static ConcurrentHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(
ServiceUnitStateChannel channel) throws IllegalAccessException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,26 @@ public void startBroker() {
brokerContainer.start();
}
});
String topicName = "persistent://" + DEFAULT_NAMESPACE + "/startBrokerCheck";
Awaitility.await().atMost(120, TimeUnit.SECONDS).ignoreExceptions().until(
() -> {
for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) {
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(
brokerContainer.getHttpServiceUrl()).build()) {
if (admin.brokers().getActiveBrokers(clusterName).size() != NUM_BROKERS) {
return false;
}
try {
admin.topics().createPartitionedTopic(topicName, 10);
} catch (PulsarAdminException.ConflictException e) {
// expected
}
admin.lookups().lookupPartitionedTopic(topicName);
}
}
return true;
}
);
}
}

Expand Down Expand Up @@ -243,7 +263,7 @@ public void testDeleteNamespace() throws Exception {
assertFalse(admin.namespaces().getNamespaces(DEFAULT_TENANT).contains(namespace));
}

@Test(timeOut = 40 * 1000)
@Test(timeOut = 120 * 1000)
public void testStopBroker() throws Exception {
String topicName = "persistent://" + DEFAULT_NAMESPACE + "/test-stop-broker-topic";

Expand All @@ -259,9 +279,11 @@ public void testStopBroker() throws Exception {
}
}

String broker1 = admin.lookups().lookupTopic(topicName);
Awaitility.waitAtMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
String broker1 = admin.lookups().lookupTopic(topicName);
assertNotEquals(broker1, broker);
});

assertNotEquals(broker1, broker);
}

@Test(timeOut = 80 * 1000)
Expand Down Expand Up @@ -309,7 +331,7 @@ public void testIsolationPolicy() throws Exception {
parameters1.put("min_limit", "1");
parameters1.put("usage_threshold", "100");

Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
Awaitility.await().atMost(10, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
List<String> activeBrokers = admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), NUM_BROKERS);
Expand Down Expand Up @@ -350,14 +372,14 @@ public void testIsolationPolicy() throws Exception {
}
}

Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(
Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
List<String> activeBrokers = admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), 2);
}
);

Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
Awaitility.await().atMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
String ownerBroker = admin.lookups().lookupTopic(topic);
assertEquals(extractBrokerIndex(ownerBroker), 1);
});
Expand All @@ -369,7 +391,7 @@ public void testIsolationPolicy() throws Exception {
}
}

Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(
Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
List<String> activeBrokers = admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), 1);
Expand All @@ -380,8 +402,7 @@ public void testIsolationPolicy() throws Exception {
fail();
} catch (Exception ex) {
log.error("Failed to lookup topic: ", ex);
assertThat(ex.getMessage()).containsAnyOf("Failed to look up a broker",
"Failed to select the new owner broker for bundle");
assertThat(ex.getMessage()).contains("Failed to select the new owner broker for bundle");
}
}

Expand Down
Loading