diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java index aa606084173e5..ab35eb7040c10 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java @@ -134,8 +134,11 @@ private synchronized CompletableFuture handleExistingLeader // If the value is the same as our proposed value, it means this instance was the leader at some // point before. The existing value can either be for this same session or for a previous one. if (res.getStat().isCreatedBySelf()) { + log.info("Keeping the existing value {} for {} as it's from the same session stat={}", existingValue, + path, res.getStat()); // The value is still valid because it was created in the same session changeState(LeaderElectionState.Leading); + return CompletableFuture.completedFuture(LeaderElectionState.Leading); } else { log.info("Conditionally deleting existing equals value {} for {} because it's not created in the " + "current session. stat={}", existingValue, path, res.getStat()); @@ -271,7 +274,13 @@ public synchronized CompletableFuture asyncClose() { return CompletableFuture.completedFuture(null); } - return store.delete(path, version); + return store.delete(path, version) + .thenAccept(__ -> { + synchronized (LeaderElectionImpl.this) { + leaderElectionState = LeaderElectionState.NoLeader; + } + } + ); } @Override @@ -292,8 +301,8 @@ public Optional getLeaderValueIfPresent() { private void handleSessionNotification(SessionEvent event) { // Ensure we're only processing one session event at a time. sequencer.sequential(() -> FutureUtil.composeAsync(() -> { - if (event == SessionEvent.SessionReestablished) { - log.info("Revalidating leadership for {}", path); + if (event == SessionEvent.Reconnected || event == SessionEvent.SessionReestablished) { + log.info("Revalidating leadership for {}, event:{}", path, event); return elect().thenAccept(leaderState -> { log.info("Resynced leadership for {} - State: {}", path, leaderState); }).exceptionally(ex -> { diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java index 6b4f74a30b563..4b48f3c20b02b 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java @@ -69,6 +69,8 @@ public void basicTest(String provider, Supplier urlSupplier) throws Exce leaderElection.close(); + assertEquals(leaderElection.getState(), LeaderElectionState.NoLeader); + assertEquals(cache.get("/my/leader-election").join(), Optional.empty()); } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java index 36cb0f132ba58..02d65fd21ed5c 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.coordination.CoordinationService; import org.apache.pulsar.metadata.api.coordination.LeaderElection; @@ -180,4 +181,58 @@ public void testReacquireLeadershipAfterSessionLost() throws Exception { .untilAsserted(()-> assertEquals(le1.getState(),LeaderElectionState.Leading)); assertTrue(store.get(path).join().isPresent()); } + + + @Test + public void testElectAfterReconnected() throws Exception { + // --- init + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create(zks.getConnectionString(), + MetadataStoreConfig.builder() + .sessionTimeoutMillis(2_000) + .build()); + + + BlockingQueue sessionEvents = new LinkedBlockingQueue<>(); + store.registerSessionListener(sessionEvents::add); + BlockingQueue leaderElectionEvents = new LinkedBlockingQueue<>(); + String path = newKey(); + + @Cleanup + CoordinationService coordinationService = new CoordinationServiceImpl(store); + @Cleanup + LeaderElection le1 = coordinationService.getLeaderElection(String.class, path, + leaderElectionEvents::add); + + // --- test manual elect + String proposed = "value-1"; + le1.elect(proposed).join(); + assertEquals(le1.getState(), LeaderElectionState.Leading); + LeaderElectionState les = leaderElectionEvents.poll(5, TimeUnit.SECONDS); + assertEquals(les, LeaderElectionState.Leading); + + + // simulate no leader state + FieldUtils.writeDeclaredField(le1, "leaderElectionState", LeaderElectionState.NoLeader, true); + + // reconnect + zks.stop(); + + SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS); + assertEquals(e, SessionEvent.ConnectionLost); + + zks.start(); + + + // --- test le1 can be leader + e = sessionEvents.poll(10, TimeUnit.SECONDS); + assertEquals(e, SessionEvent.Reconnected); + Awaitility.await().atMost(Duration.ofSeconds(15)) + .untilAsserted(()-> { + assertEquals(le1.getState(),LeaderElectionState.Leading); + }); // reacquire leadership + + + assertTrue(store.get(path).join().isPresent()); + } }