Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] fix the error that elected leader thinks it's a follower. #77

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,11 @@ private synchronized CompletableFuture<LeaderElectionState> handleExistingLeader
// If the value is the same as our proposed value, it means this instance was the leader at some
// point before. The existing value can either be for this same session or for a previous one.
if (res.getStat().isCreatedBySelf()) {
log.info("Keeping the existing value {} for {} as it's from the same session stat={}", existingValue,
path, res.getStat());
// The value is still valid because it was created in the same session
changeState(LeaderElectionState.Leading);
return CompletableFuture.completedFuture(LeaderElectionState.Leading);
} else {
log.info("Conditionally deleting existing equals value {} for {} because it's not created in the "
+ "current session. stat={}", existingValue, path, res.getStat());
Expand Down Expand Up @@ -271,7 +274,13 @@ public synchronized CompletableFuture<Void> asyncClose() {
return CompletableFuture.completedFuture(null);
}

return store.delete(path, version);
return store.delete(path, version)
.thenAccept(__ -> {
synchronized (LeaderElectionImpl.this) {
leaderElectionState = LeaderElectionState.NoLeader;
}
}
);
}

@Override
Expand All @@ -292,8 +301,8 @@ public Optional<T> getLeaderValueIfPresent() {
private void handleSessionNotification(SessionEvent event) {
// Ensure we're only processing one session event at a time.
sequencer.sequential(() -> FutureUtil.composeAsync(() -> {
if (event == SessionEvent.SessionReestablished) {
log.info("Revalidating leadership for {}", path);
if (event == SessionEvent.Reconnected || event == SessionEvent.SessionReestablished) {
log.info("Revalidating leadership for {}, event:{}", path, event);
return elect().thenAccept(leaderState -> {
log.info("Resynced leadership for {} - State: {}", path, leaderState);
}).exceptionally(ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@

leaderElection.close();

assertEquals(leaderElection.getState(), LeaderElectionState.NoLeader);

assertEquals(cache.get("/my/leader-election").join(), Optional.empty());
}

Expand Down Expand Up @@ -209,7 +211,7 @@
LeaderElectionState les = le.elect("test-2").join();
assertEquals(les, LeaderElectionState.Leading);
assertEquals(le.getLeaderValue().join(), Optional.of("test-2"));
assertEqualsAndRetry(() -> le.getLeaderValueIfPresent(), Optional.of("test-2"), Optional.empty());

Check failure on line 214 in pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java

View workflow job for this annotation

GitHub Actions / CI - Unit - Pulsar Metadata

LeaderElectionTest.revalidateLeaderWithinSameSession

expected [Optional[test-2]] but found [Optional.empty]
}

@Test(dataProvider = "impl")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.coordination.LeaderElection;
Expand Down Expand Up @@ -180,4 +181,58 @@ public void testReacquireLeadershipAfterSessionLost() throws Exception {
.untilAsserted(()-> assertEquals(le1.getState(),LeaderElectionState.Leading));
assertTrue(store.get(path).join().isPresent());
}


@Test
public void testElectAfterReconnected() throws Exception {
// --- init
@Cleanup
MetadataStoreExtended store = MetadataStoreExtended.create(zks.getConnectionString(),
MetadataStoreConfig.builder()
.sessionTimeoutMillis(2_000)
.build());


BlockingQueue<SessionEvent> sessionEvents = new LinkedBlockingQueue<>();
store.registerSessionListener(sessionEvents::add);
BlockingQueue<LeaderElectionState> leaderElectionEvents = new LinkedBlockingQueue<>();
String path = newKey();

@Cleanup
CoordinationService coordinationService = new CoordinationServiceImpl(store);
@Cleanup
LeaderElection<String> le1 = coordinationService.getLeaderElection(String.class, path,
leaderElectionEvents::add);

// --- test manual elect
String proposed = "value-1";
le1.elect(proposed).join();
assertEquals(le1.getState(), LeaderElectionState.Leading);
LeaderElectionState les = leaderElectionEvents.poll(5, TimeUnit.SECONDS);
assertEquals(les, LeaderElectionState.Leading);


// simulate no leader state
FieldUtils.writeDeclaredField(le1, "leaderElectionState", LeaderElectionState.NoLeader, true);

// reconnect
zks.stop();

SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS);
assertEquals(e, SessionEvent.ConnectionLost);

zks.start();


// --- test le1 can be leader
e = sessionEvents.poll(10, TimeUnit.SECONDS);
assertEquals(e, SessionEvent.Reconnected);
Awaitility.await().atMost(Duration.ofSeconds(15))
.untilAsserted(()-> {
assertEquals(le1.getState(),LeaderElectionState.Leading);
}); // reacquire leadership


assertTrue(store.get(path).join().isPresent());
}
}
Loading