Skip to content

Commit

Permalink
[fix] [bk] Correctct the bookie info after ZK client is reconnected (#…
Browse files Browse the repository at this point in the history
…21035)

Motivation: After [PIP-118: reconnect broker when ZooKeeper session expires](#13341), the Broker will not shut down after losing the connection of the local metadata store in the default configuration. However, before the ZK client is reconnected, the events of BK online and offline are lost, resulting in incorrect BK info in the memory. You can reproduce the issue by the test `BkEnsemblesChaosTest. testBookieInfoIsCorrectEvenIfLostNotificationDueToZKClientReconnect`(90% probability of reproduce of the issue, run it again if the issue does not occur)

Modifications: Refresh BK info in memory after the ZK client is reconnected.
(cherry picked from commit db20035)
  • Loading branch information
poorbarcode committed Aug 25, 2023
1 parent bc1019f commit 5f99925
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Producer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class BkEnsemblesChaosTest extends CanReconnectZKClientPulsarServiceBaseTest {

@Override
@BeforeClass(alwaysRun = true, timeOut = 300000)
public void setup() throws Exception {
super.setup();
}

@Override
@AfterClass(alwaysRun = true, timeOut = 300000)
public void cleanup() throws Exception {
super.cleanup();
}

@Test
public void testBookieInfoIsCorrectEvenIfLostNotificationDueToZKClientReconnect() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
final byte[] msgValue = "test".getBytes();
admin.topics().createNonPartitionedTopic(topicName);
// Ensure broker works.
Producer<byte[]> producer1 = client.newProducer().topic(topicName).create();
producer1.send(msgValue);
producer1.close();
admin.topics().unload(topicName);

// Restart some bookies, which triggers the ZK node of Bookie deleted and created.
// And make the local metadata store reconnect to lose some notification of the ZK node change.
for (int i = 0; i < numberOfBookies - 1; i++){
bkEnsemble.stopBK(i);
}
makeLocalMetadataStoreKeepReconnect();
for (int i = 0; i < numberOfBookies - 1; i++){
bkEnsemble.startBK(i);
}
// Sleep 100ms to lose the notifications of ZK node create.
Thread.sleep(100);
stopLocalMetadataStoreAlwaysReconnect();

// Ensure broker still works.
admin.topics().unload(topicName);
Producer<byte[]> producer2 = client.newProducer().topic(topicName).create();
producer2.send(msgValue);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import java.net.URL;
import java.nio.channels.SelectionKey;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.apache.zookeeper.ClientCnxn;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.reflect.WhiteboxImpl;

@Slf4j
public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetrySupport {

protected final String defaultTenant = "public";
protected final String defaultNamespace = defaultTenant + "/default";
protected int numberOfBookies = 3;
protected final String clusterName = "r1";
protected URL url;
protected URL urlTls;
protected ServiceConfiguration config = new ServiceConfiguration();
protected ZookeeperServerTest brokerConfigZk;
protected LocalBookkeeperEnsemble bkEnsemble;
protected PulsarService pulsar;
protected BrokerService broker;
protected PulsarAdmin admin;
protected PulsarClient client;
protected ZooKeeper localZkOfBroker;
protected Object localMetaDataStoreClientCnx;
protected final AtomicBoolean LocalMetadataStoreInReconnectFinishSignal = new AtomicBoolean();
protected void startZKAndBK() throws Exception {
// Start ZK.
brokerConfigZk = new ZookeeperServerTest(0);
brokerConfigZk.start();

// Start BK.
bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, 0, () -> 0);
bkEnsemble.start();
}

protected void startBrokers() throws Exception {
// Start brokers.
setConfigDefaults(config, clusterName, bkEnsemble, brokerConfigZk);
pulsar = new PulsarService(config);
pulsar.start();
broker = pulsar.getBrokerService();
ZKMetadataStore zkMetadataStore = (ZKMetadataStore) pulsar.getLocalMetadataStore();
localZkOfBroker = zkMetadataStore.getZkClient();
ClientCnxn cnxn = WhiteboxImpl.getInternalState(localZkOfBroker, "cnxn");
Object sendThread = WhiteboxImpl.getInternalState(cnxn, "sendThread");
localMetaDataStoreClientCnx = WhiteboxImpl.getInternalState(sendThread, "clientCnxnSocket");

url = new URL(pulsar.getWebServiceAddress());
urlTls = new URL(pulsar.getWebServiceAddressTls());
admin = PulsarAdmin.builder().serviceHttpUrl(url.toString()).build();
client = PulsarClient.builder().serviceUrl(url.toString()).build();
}

protected void makeLocalMetadataStoreKeepReconnect() throws Exception {
if (!LocalMetadataStoreInReconnectFinishSignal.compareAndSet(false, true)) {
throw new RuntimeException("Local metadata store is already keeping reconnect");
}
if (localMetaDataStoreClientCnx.getClass().getSimpleName().equals("ClientCnxnSocketNIO")) {
makeLocalMetadataStoreKeepReconnectNIO();
} else {
// ClientCnxnSocketNetty.
makeLocalMetadataStoreKeepReconnectNetty();
}
}

protected void makeLocalMetadataStoreKeepReconnectNIO() {
new Thread(() -> {
while (LocalMetadataStoreInReconnectFinishSignal.get()) {
try {
SelectionKey sockKey = WhiteboxImpl.getInternalState(localMetaDataStoreClientCnx, "sockKey");
if (sockKey != null) {
sockKey.channel().close();
}
// Prevents high cpu usage.
Thread.sleep(5);
} catch (Exception e) {
log.error("Try close the ZK connection of local metadata store failed: {}", e.toString());
}
}
}).start();
}

protected void makeLocalMetadataStoreKeepReconnectNetty() {
new Thread(() -> {
while (LocalMetadataStoreInReconnectFinishSignal.get()) {
try {
Channel channel = WhiteboxImpl.getInternalState(localMetaDataStoreClientCnx, "channel");
if (channel != null) {
channel.close();
}
// Prevents high cpu usage.
Thread.sleep(5);
} catch (Exception e) {
log.error("Try close the ZK connection of local metadata store failed: {}", e.toString());
}
}
}).start();
}

protected void stopLocalMetadataStoreAlwaysReconnect() {
LocalMetadataStoreInReconnectFinishSignal.set(false);
}

protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
admin.clusters().createCluster(clusterName, ClusterData.builder()
.serviceUrl(url.toString())
.serviceUrlTls(urlTls.toString())
.brokerServiceUrl(pulsar.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar.getBrokerServiceUrlTls())
.brokerClientTlsEnabled(false)
.build());

admin.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(),
Sets.newHashSet(clusterName)));

admin.namespaces().createNamespace(defaultNamespace, Sets.newHashSet(clusterName));
}

@Override
protected void setup() throws Exception {
incrementSetupNumber();

log.info("--- Starting OneWayReplicatorTestBase::setup ---");

startZKAndBK();

startBrokers();

createDefaultTenantsAndClustersAndNamespace();

Thread.sleep(100);
log.info("--- OneWayReplicatorTestBase::setup completed ---");
}

private void setConfigDefaults(ServiceConfiguration config, String clusterName,
LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
config.setClusterName(clusterName);
config.setAdvertisedAddress("localhost");
config.setWebServicePort(Optional.of(0));
config.setWebServicePortTls(Optional.of(0));
config.setMetadataStoreUrl("zk:127.0.0.1:" + bookkeeperEnsemble.getZookeeperPort());
config.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + brokerConfigZk.getZookeeperPort() + "/foo");
config.setBrokerDeleteInactiveTopicsEnabled(false);
config.setBrokerDeleteInactiveTopicsFrequencySeconds(60);
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
config.setBacklogQuotaCheckIntervalInSeconds(5);
config.setDefaultNumberOfNamespaceBundles(1);
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
config.setEnableReplicatedSubscriptions(true);
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
}

@Override
protected void cleanup() throws Exception {
markCurrentSetupNumberCleaned();
log.info("--- Shutting down ---");

stopLocalMetadataStoreAlwaysReconnect();

// Stop brokers.
client.close();
admin.close();
if (pulsar != null) {
pulsar.close();
}

// Stop ZK and BK.
bkEnsemble.stop();
brokerConfigZk.stop();

// Reset configs.
config = new ServiceConfiguration();
setConfigDefaults(config, clusterName, bkEnsemble, brokerConfigZk);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ public interface MetadataCache<T> {
*/
void invalidate(String path);

/**
* Force the invalidation of all object in the metadata cache.
*/
void invalidateAll();

/**
* Invalidate and reload an object in the metadata cache.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;

@Slf4j
public class PulsarRegistrationClient implements RegistrationClient {

private final MetadataStore store;
private final AbstractMetadataStore store;
private final String ledgersRootPath;
// registration paths
private final String bookieRegistrationPath;
Expand All @@ -68,10 +70,11 @@ public class PulsarRegistrationClient implements RegistrationClient {
private final Map<BookieId, Versioned<BookieServiceInfo>> writableBookieInfo;
private final Map<BookieId, Versioned<BookieServiceInfo>> readOnlyBookieInfo;
private final FutureUtil.Sequencer<Void> sequencer;
private SessionEvent lastMetadataSessionEvent;

public PulsarRegistrationClient(MetadataStore store,
String ledgersRootPath) {
this.store = store;
this.store = (AbstractMetadataStore) store;
this.ledgersRootPath = ledgersRootPath;
this.bookieServiceInfoMetadataCache = store.getMetadataCache(BookieServiceInfoSerde.INSTANCE);
this.sequencer = Sequencer.create();
Expand All @@ -88,13 +91,29 @@ public PulsarRegistrationClient(MetadataStore store,
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-registration-client"));

store.registerListener(this::updatedBookies);
this.store.registerSessionListener(this::refreshBookies);
}

@Override
public void close() {
executor.shutdownNow();
}

private void refreshBookies(SessionEvent sessionEvent) {
lastMetadataSessionEvent = sessionEvent;
if (!SessionEvent.Reconnected.equals(sessionEvent) && !SessionEvent.SessionReestablished.equals(sessionEvent)){
return;
}
// Clean caches.
store.invalidateCaches(bookieRegistrationPath, bookieAllRegistrationPath, bookieReadonlyRegistrationPath);
bookieServiceInfoMetadataCache.invalidateAll();
// Refresh caches of the listeners.
getReadOnlyBookies().thenAccept(bookies ->
readOnlyBookiesWatchers.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
getWritableBookies().thenAccept(bookies ->
writableBookiesWatchers.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
}

@Override
public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
return getBookiesThenFreshCache(bookieRegistrationPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Instant;
Expand Down Expand Up @@ -523,6 +524,13 @@ public void invalidateAll() {
existsCache.synchronous().invalidateAll();
}

public void invalidateCaches(String...paths) {
LoadingCache<String, List<String>> loadingCache = childrenCache.synchronous();
for (String path : paths) {
loadingCache.invalidate(path);
}
}

/**
* Run the task in the executor thread and fail the future if the executor is shutting down.
*/
Expand Down

0 comments on commit 5f99925

Please sign in to comment.