Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] check if the owner is active when returning ownership (ExtensibleLoadManager) #57

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,11 @@ public void setLoadReportForceUpdateFlag() {
/**
 * Intentionally does nothing.
 *
 * <p>Per the existing note in this method, the load data reporter writes the load report
 * automatically, so there is nothing to do here. The method returns normally instead of
 * throwing {@link UnsupportedOperationException}, so the behavior matches the documented no-op.
 *
 * @throws Exception never thrown by this implementation; declared only to satisfy the overridden signature
 */
@Override
public void writeLoadReportOnZookeeper() throws Exception {
    // No-op, this operation is not useful, the load data reporter will automatically write.
}

/**
 * Intentionally does nothing.
 *
 * <p>Per the existing note in this method, the load data reporter writes automatically, so
 * there is nothing to do here. The method returns normally instead of throwing
 * {@link UnsupportedOperationException}, so the behavior matches the documented no-op.
 *
 * @throws Exception never thrown by this implementation; declared only to satisfy the overridden signature
 */
@Override
public void writeResourceQuotasToZooKeeper() throws Exception {
    // No-op, this operation is not useful, the load data reporter will automatically write.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,30 @@ public boolean isOwner(String serviceUnit) {
return isOwner(serviceUnit, lookupServiceAddress);
}

/**
 * Resolves the owner of a service unit, accepting the given candidate only if the broker
 * registry still returns lookup data for it.
 *
 * <p>If {@code owner} is present and {@code brokerRegistry.lookupAsync} yields a non-empty
 * result, the returned future completes with {@code owner}. Otherwise (no candidate, or the
 * registry lookup came back empty) the request is handed to {@code deferGetOwnerRequest} and
 * the future completes with whatever that deferred request produces, mapped to an
 * {@link Optional} (a {@code null} result becomes {@link Optional#empty()}).
 *
 * <p>Any failure is logged and counted against the failure counter for {@code state}; the
 * failure itself is still propagated to the caller through the returned future.
 *
 * @param serviceUnit the service unit whose owner is being looked up
 * @param state       the service unit state observed by the caller; used for logging and counters
 * @param owner       the candidate owner broker, or empty when there is no candidate yet
 * @return a future of the active owner broker, or of an empty optional if none was resolved
 */
private CompletableFuture<Optional<String>> getActiveOwnerAsync(
        String serviceUnit,
        ServiceUnitState state,
        Optional<String> owner) {
    final CompletableFuture<Optional<String>> activeOwner;
    if (owner.isPresent()) {
        // Keep the candidate only when the registry still knows about that broker.
        activeOwner = brokerRegistry.lookupAsync(owner.get())
                .thenApply(lookupData -> lookupData.isPresent() ? owner : Optional.<String>empty());
    } else {
        activeOwner = CompletableFuture.completedFuture(Optional.empty());
    }

    return activeOwner
            .thenCompose(confirmed -> {
                if (confirmed.isPresent()) {
                    return activeOwner;
                }
                // No active owner right now: park the request until the ownership state settles.
                CompletableFuture<Optional<String>> deferred =
                        deferGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable);
                return deferred;
            })
            .whenComplete((ignored, error) -> {
                if (error == null) {
                    return;
                }
                log.error("Failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
                        serviceUnit, state, owner, error);
                ownerLookUpCounters.get(state).getFailure().incrementAndGet();
            });
}

public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
if (!validateChannelState(Started, true)) {
return CompletableFuture.failedFuture(
Expand All @@ -498,18 +522,13 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
ownerLookUpCounters.get(state).getTotal().incrementAndGet();
switch (state) {
case Owned -> {
return CompletableFuture.completedFuture(Optional.of(data.dstBroker()));
return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.dstBroker()));
}
case Splitting -> {
return CompletableFuture.completedFuture(Optional.of(data.sourceBroker()));
return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.sourceBroker()));
}
case Assigning, Releasing -> {
return deferGetOwnerRequest(serviceUnit).whenComplete((__, e) -> {
if (e != null) {
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
}
}).thenApply(
broker -> broker == null ? Optional.empty() : Optional.of(broker));
return getActiveOwnerAsync(serviceUnit, state, Optional.empty());
}
case Init, Free -> {
return CompletableFuture.completedFuture(Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
Expand Down Expand Up @@ -1560,6 +1561,73 @@ public void testOverrideOrphanStateData()
cleanTableViews();
}

/**
 * Verifies that {@code getOwnerAsync} only reports an owner that the broker registry still
 * knows about.
 *
 * <p>Steps:
 * <ol>
 *   <li>With the bundle in {@code Owned} state, the lookup returns the owning broker.</li>
 *   <li>Once the registry lookup for that broker is stubbed to return empty, the lookup is
 *       deferred and fails with a {@link TimeoutException} after the configured in-flight
 *       waiting time.</li>
 *   <li>After the leader channel handles the broker's deletion, the lookup resolves to the
 *       newly selected broker without waiting for the timeout.</li>
 * </ol>
 *
 * <p>The fields overridden on {@code channel1} are restored in a {@code finally} block so a
 * failed assertion does not leak the spy registry or the shortened timeout into later tests.
 */
@Test(priority = 19)
public void testActiveGetOwner()
        throws IllegalAccessException, ExecutionException, InterruptedException, TimeoutException {

    String broker = lookupServiceAddress2;
    String bundle = "public/owned/0xfffffff0_0xffffffff";

    try {
        // make the broker the owner of the bundle
        overrideTableViews(bundle,
                new ServiceUnitStateData(Owned, broker, null, 1));
        var owner = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS).get();
        // AssertJUnit.assertEquals takes (expected, actual)
        assertEquals(broker, owner);

        // simulate the owner being inactive
        var spyRegistry = spy(new BrokerRegistryImpl(pulsar));
        doReturn(CompletableFuture.completedFuture(Optional.empty()))
                .when(spyRegistry).lookupAsync(eq(broker));
        FieldUtils.writeDeclaredField(channel1,
                "brokerRegistry", spyRegistry, true);
        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 1000, true);

        // verify getOwnerAsync times out because the owner is inactive now.
        long start = System.currentTimeMillis();
        try {
            // Bounded wait: if the deferred request never completes, the test fails with a
            // TimeoutException from get() instead of hanging the whole suite.
            channel1.getOwnerAsync(bundle).get(10, TimeUnit.SECONDS);
            fail("getOwnerAsync should have failed while the owner is inactive");
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof TimeoutException)) {
                fail("Expected a TimeoutException cause but got: " + e.getCause());
            }
        }
        assertTrue(System.currentTimeMillis() - start >= 1000);

        // simulate ownership cleanup by the leader channel
        doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1)))
                .when(loadManager).selectAsync(any(), any());
        var leaderChannel = channel1;
        String leader = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
        String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
        assertEquals(leader, leader2);
        if (leader.equals(lookupServiceAddress2)) {
            leaderChannel = channel2;
        }
        leaderChannel.handleMetadataSessionEvent(SessionReestablished);
        // Backdate the last session event so the clean-up delay is already considered elapsed.
        FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp",
                System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
        leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);

        // verify the ownership cleanup, and that getOwnerAsync returns without timing out
        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 10 * 1000, true);
        start = System.currentTimeMillis();
        // The bound is larger than the in-flight waiting time so the elapsed-time assertion
        // below remains the effective check.
        assertEquals(lookupServiceAddress1,
                channel1.getOwnerAsync(bundle).get(15, TimeUnit.SECONDS).get());
        assertTrue(System.currentTimeMillis() - start < 10_000);
    } finally {
        // test clean-up: always restore the overridden channel state
        FieldUtils.writeDeclaredField(channel1,
                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
        FieldUtils.writeDeclaredField(channel1,
                "brokerRegistry", registry, true);
        cleanTableViews();
    }
}

private static ConcurrentHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(
ServiceUnitStateChannel channel) throws IllegalAccessException {
Expand Down
Loading