Skip to content

Commit

Permalink
[fix][broker] fix the error that elected leader thinks it's a follower.
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Aug 7, 2024
1 parent 10f4e02 commit 52fa676
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,11 @@ private synchronized CompletableFuture<LeaderElectionState> handleExistingLeader
// If the value is the same as our proposed value, it means this instance was the leader at some
// point before. The existing value can either be for this same session or for a previous one.
if (res.getStat().isCreatedBySelf()) {
log.info("Keeping the existing value {} for {} as it's fom the same session stat={}", existingValue,
path, res.getStat());
// The value is still valid because it was created in the same session
changeState(LeaderElectionState.Leading);
return CompletableFuture.completedFuture(LeaderElectionState.Leading);
} else {
log.info("Conditionally deleting existing equals value {} for {} because it's not created in the "
+ "current session. stat={}", existingValue, path, res.getStat());
Expand Down Expand Up @@ -271,7 +274,13 @@ public synchronized CompletableFuture<Void> asyncClose() {
return CompletableFuture.completedFuture(null);
}

return store.delete(path, version);
return store.delete(path, version)
.thenAccept(__ -> {
synchronized (LeaderElectionImpl.this) {
leaderElectionState = LeaderElectionState.NoLeader;
}
}
);
}

@Override
Expand All @@ -292,8 +301,8 @@ public Optional<T> getLeaderValueIfPresent() {
private void handleSessionNotification(SessionEvent event) {
// Ensure we're only processing one session event at a time.
sequencer.sequential(() -> FutureUtil.composeAsync(() -> {
if (event == SessionEvent.SessionReestablished) {
log.info("Revalidating leadership for {}", path);
if (event == SessionEvent.Reconnected || event == SessionEvent.SessionReestablished) {
log.info("Revalidating leadership for {}, event:{}", path, event);
return elect().thenAccept(leaderState -> {
log.info("Resynced leadership for {} - State: {}", path, leaderState);
}).exceptionally(ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public void basicTest(String provider, Supplier<String> urlSupplier) throws Exce

leaderElection.close();

assertEquals(leaderElection.getState(), LeaderElectionState.NoLeader);

assertEquals(cache.get("/my/leader-election").join(), Optional.empty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.coordination.LeaderElection;
Expand Down Expand Up @@ -180,4 +181,58 @@ public void testReacquireLeadershipAfterSessionLost() throws Exception {
.untilAsserted(()-> assertEquals(le1.getState(),LeaderElectionState.Leading));
assertTrue(store.get(path).join().isPresent());
}


@Test
public void testElectAfterReconnected() throws Exception {
// --- init
@Cleanup
MetadataStoreExtended store = MetadataStoreExtended.create(zks.getConnectionString(),
MetadataStoreConfig.builder()
.sessionTimeoutMillis(2_000)
.build());


BlockingQueue<SessionEvent> sessionEvents = new LinkedBlockingQueue<>();
store.registerSessionListener(sessionEvents::add);
BlockingQueue<LeaderElectionState> leaderElectionEvents = new LinkedBlockingQueue<>();
String path = newKey();

@Cleanup
CoordinationService coordinationService = new CoordinationServiceImpl(store);
@Cleanup
LeaderElection<String> le1 = coordinationService.getLeaderElection(String.class, path,
leaderElectionEvents::add);

// --- test manual elect
String proposed = "value-1";
le1.elect(proposed).join();
assertEquals(le1.getState(), LeaderElectionState.Leading);
LeaderElectionState les = leaderElectionEvents.poll(5, TimeUnit.SECONDS);
assertEquals(les, LeaderElectionState.Leading);


// simulate no leader state
FieldUtils.writeDeclaredField(le1, "leaderElectionState", LeaderElectionState.NoLeader, true);

// reconnect
zks.stop();

SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS);
assertEquals(e, SessionEvent.ConnectionLost);

zks.start();


// --- test le1 can be leader
e = sessionEvents.poll(10, TimeUnit.SECONDS);
assertEquals(e, SessionEvent.Reconnected);
Awaitility.await().atMost(Duration.ofSeconds(15))
.untilAsserted(()-> {
assertEquals(le1.getState(),LeaderElectionState.Leading);
}); // reacquire leadership


assertTrue(store.get(path).join().isPresent());
}
}

0 comments on commit 52fa676

Please sign in to comment.