Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-17578: Remove ZkController internal core supplier. #2891

Merged
merged 3 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ Improvements

Optimizations
---------------------
(No changes)
* SOLR-17578: Remove ZkController internal core supplier, for slightly faster reconnection after Zookeeper session loss. (Pierre Salagnac)

Bug Fixes
---------------------
Expand Down
198 changes: 88 additions & 110 deletions solr/core/src/java/org/apache/solr/cloud/ZkController.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.solr.client.solrj.SolrClient;
Expand Down Expand Up @@ -298,16 +297,12 @@ public Object call() throws Exception {
* @param zkClientConnectTimeout timeout in ms
* @param cloudConfig configuration for this controller. TODO: possibly redundant with
* CoreContainer
* @param descriptorsSupplier a supplier of the current core descriptors. used to know which cores
* to re-register on reconnect
*/
@SuppressWarnings({"unchecked"})
public ZkController(
final CoreContainer cc,
String zkServerAddress,
int zkClientConnectTimeout,
CloudConfig cloudConfig,
final Supplier<List<CoreDescriptor>> descriptorsSupplier)
CloudConfig cloudConfig)
throws InterruptedException, TimeoutException, IOException {

if (cc == null) throw new IllegalArgumentException("CoreContainer cannot be null.");
Expand Down Expand Up @@ -366,9 +361,8 @@ public ZkController(
.withUrl(zkServerAddress)
.withTimeout(clientTimeout, TimeUnit.MILLISECONDS)
.withConnTimeOut(zkClientConnectTimeout, TimeUnit.MILLISECONDS)
.withReconnectListener(() -> onReconnect(descriptorsSupplier))
.withDisconnectListener(
(sessionExpired) -> onDisconnect(descriptorsSupplier, sessionExpired))
.withReconnectListener(this::onReconnect)
.withDisconnectListener((sessionExpired) -> onDisconnect(sessionExpired))
.withAclProvider(zkACLProvider)
.withClosedCheck(cc::isShutDown)
.withCompressor(compressor)
Expand Down Expand Up @@ -404,18 +398,27 @@ public ZkController(
assert ObjectReleaseTracker.track(this);
}

private void onDisconnect(
Supplier<List<CoreDescriptor>> descriptorsSupplier, boolean sessionExpired) {
private void onDisconnect(boolean sessionExpired) {
try {
overseer.close();
} catch (Exception e) {
log.error("Error trying to stop any Overseer threads", e);
}
closeOutstandingElections(descriptorsSupplier, sessionExpired);
markAllAsNotLeader(descriptorsSupplier);

// Close outstanding leader elections
List<CoreDescriptor> descriptors = cc.getCoreDescriptors();
for (CoreDescriptor descriptor : descriptors) {
closeExistingElectionContext(descriptor, sessionExpired);
}

// Mark all cores as not leader
for (CoreDescriptor descriptor : descriptors) {
descriptor.getCloudDescriptor().setLeader(false);
descriptor.getCloudDescriptor().setHasRegistered(false);
}
}

private void onReconnect(Supplier<List<CoreDescriptor>> descriptorsSupplier) {
private void onReconnect() {
// on reconnect, reload cloud info
log.info("ZooKeeper session re-connected ... refreshing core states after session expiration.");
clearZkCollectionTerms();
Expand Down Expand Up @@ -456,7 +459,7 @@ private void onReconnect(Supplier<List<CoreDescriptor>> descriptorsSupplier) {
cc.cancelCoreRecoveries();

try {
registerAllCoresAsDown(descriptorsSupplier, false);
registerAllCoresAsDown(false);
} catch (SessionExpiredException e) {
// zk has to reconnect and this will all be tried again
throw e;
Expand All @@ -469,26 +472,24 @@ private void onReconnect(Supplier<List<CoreDescriptor>> descriptorsSupplier) {
// we have to register as live first to pick up docs in the buffer
createEphemeralLiveNode();

List<CoreDescriptor> descriptors = descriptorsSupplier.get();
List<CoreDescriptor> descriptors = cc.getCoreDescriptors();
// re register all descriptors
ExecutorService executorService = (cc != null) ? cc.getCoreZkRegisterExecutorService() : null;
if (descriptors != null) {
for (CoreDescriptor descriptor : descriptors) {
// TODO: we need to think carefully about what happens when it was a leader
// that was expired - as well as what to do about leaders/overseers with
// connection loss
try {
// unload solr cores that have been 'failed over'
throwErrorIfReplicaReplaced(descriptor);
for (CoreDescriptor descriptor : descriptors) {
// TODO: we need to think carefully about what happens when it was a leader
// that was expired - as well as what to do about leaders/overseers with
// connection loss
try {
// unload solr cores that have been 'failed over'
throwErrorIfReplicaReplaced(descriptor);

if (executorService != null) {
executorService.submit(new RegisterCoreAsync(descriptor, true, true));
} else {
register(descriptor.getName(), descriptor, true, true, false);
}
} catch (Exception e) {
log.error("Error registering SolrCore", e);
if (executorService != null) {
executorService.submit(new RegisterCoreAsync(descriptor, true, true));
} else {
register(descriptor.getName(), descriptor, true, true, false);
}
} catch (Exception e) {
log.error("Error registering SolrCore", e);
}
}

Expand Down Expand Up @@ -588,75 +589,72 @@ public int getLeaderConflictResolveWait() {
return leaderConflictResolveWait;
}

private void registerAllCoresAsDown(
final Supplier<List<CoreDescriptor>> registerOnReconnect, boolean updateLastPublished)
throws SessionExpiredException {
List<CoreDescriptor> descriptors = registerOnReconnect.get();
private void registerAllCoresAsDown(boolean updateLastPublished) throws SessionExpiredException {
List<CoreDescriptor> descriptors = cc.getCoreDescriptors();
if (isClosed) return;
if (descriptors != null) {
// before registering as live, make sure everyone is in a
// down state
publishNodeAsDown(getNodeName());
for (CoreDescriptor descriptor : descriptors) {
// if it looks like we are going to be the leader, we don't
// want to wait for the following stuff
CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
String collection = cloudDesc.getCollectionName();
String slice = cloudDesc.getShardId();
try {

int children =
zkStateReader
.getZkClient()
.getChildren(
ZkStateReader.COLLECTIONS_ZKNODE
+ "/"
+ collection
+ "/leader_elect/"
+ slice
+ "/election",
null,
true)
.size();
if (children == 0) {
log.debug(
"looks like we are going to be the leader for collection {} shard {}",
collection,
slice);
continue;
}
// before registering as live, make sure everyone is in a
// down state
publishNodeAsDown(getNodeName());
for (CoreDescriptor descriptor : descriptors) {
// if it looks like we are going to be the leader, we don't
// want to wait for the following stuff
CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
String collection = cloudDesc.getCollectionName();
String slice = cloudDesc.getShardId();
try {

} catch (NoNodeException e) {
int children =
zkStateReader
.getZkClient()
.getChildren(
ZkStateReader.COLLECTIONS_ZKNODE
+ "/"
+ collection
+ "/leader_elect/"
+ slice
+ "/election",
null,
true)
.size();
if (children == 0) {
log.debug(
"looks like we are going to be the leader for collection {} shard {}",
collection,
slice);
continue;
} catch (InterruptedException e2) {
Thread.currentThread().interrupt();
} catch (SessionExpiredException e) {
// zk has to reconnect
throw e;
} catch (KeeperException e) {
log.warn("", e);
Thread.currentThread().interrupt();
}

final String coreZkNodeName = descriptor.getCloudDescriptor().getCoreNodeName();
try {
log.debug(
"calling waitForLeaderToSeeDownState for coreZkNodeName={} collection={} shard={}",
coreZkNodeName,
collection,
slice);
waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
} catch (Exception e) {
log.warn(
"There was a problem while making a best effort to ensure the leader has seen us as down, this is not unexpected as Zookeeper has just reconnected after a session expiration",
e);
if (isClosed) {
return;
}
} catch (NoNodeException e) {
log.debug(
"looks like we are going to be the leader for collection {} shard {}",
collection,
slice);
continue;
} catch (InterruptedException e2) {
Thread.currentThread().interrupt();
} catch (SessionExpiredException e) {
// zk has to reconnect
throw e;
} catch (KeeperException e) {
log.warn("", e);
Thread.currentThread().interrupt();
}

final String coreZkNodeName = descriptor.getCloudDescriptor().getCoreNodeName();
try {
log.debug(
"calling waitForLeaderToSeeDownState for coreZkNodeName={} collection={} shard={}",
coreZkNodeName,
collection,
slice);
waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
} catch (Exception e) {
log.warn(
"There was a problem while making a best effort to ensure the leader has seen us as down, this is not unexpected as Zookeeper has just reconnected after a session expiration",
e);
if (isClosed) {
return;
}
}
}
Expand All @@ -666,16 +664,6 @@ public NodesSysPropsCacher getSysPropsCacher() {
return sysPropsCacher;
}

private void closeOutstandingElections(
final Supplier<List<CoreDescriptor>> registerOnReconnect, boolean sessionExpired) {
List<CoreDescriptor> descriptors = registerOnReconnect.get();
if (descriptors != null) {
for (CoreDescriptor descriptor : descriptors) {
closeExistingElectionContext(descriptor, sessionExpired);
}
}
}

private ContextKey closeExistingElectionContext(CoreDescriptor cd, boolean sessionExpired) {
// look for old context - if we find it, cancel it
String collection = cd.getCloudDescriptor().getCollectionName();
Expand All @@ -696,16 +684,6 @@ private ContextKey closeExistingElectionContext(CoreDescriptor cd, boolean sessi
return contextKey;
}

private void markAllAsNotLeader(final Supplier<List<CoreDescriptor>> registerOnReconnect) {
List<CoreDescriptor> descriptors = registerOnReconnect.get();
if (descriptors != null) {
for (CoreDescriptor descriptor : descriptors) {
descriptor.getCloudDescriptor().setLeader(false);
descriptor.getCloudDescriptor().setHasRegistered(false);
}
}
}

public void preClose() {
this.isClosed = true;

Expand Down
12 changes: 1 addition & 11 deletions solr/core/src/java/org/apache/solr/core/ZkContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.impl.SolrZkClientTimeout;
import org.apache.solr.cloud.SolrZkServer;
import org.apache.solr.cloud.ZkController;
Expand Down Expand Up @@ -132,15 +129,8 @@ public void initZooKeeper(final CoreContainer cc, CloudConfig config) {
"A chroot was specified in ZkHost but the znode doesn't exist. " + zookeeperHost);
}

Supplier<List<CoreDescriptor>> descriptorsSupplier =
() ->
cc.getCores().stream()
.map(SolrCore::getCoreDescriptor)
.collect(Collectors.toList());

ZkController zkController =
new ZkController(
cc, zookeeperHost, zkClientConnectTimeout, config, descriptorsSupplier);
new ZkController(cc, zookeeperHost, zkClientConnectTimeout, config);

if (zkRun != null) {
if (StrUtils.isNotNullOrEmpty(System.getProperty(HTTPS_PORT_PROP))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,9 @@ public void testSimpleSliceLeaderElection() throws Exception {
"shard1",
jetty
.getCoreContainer()
.getCores()
.getCoreDescriptors()
.iterator()
.next()
.getCoreDescriptor()
.getCloudDescriptor()
.getShardId());
String jettyNodeName = jetty.getNodeName(); // must get before shutdown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,16 @@
package org.apache.solr.cloud;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;

public class MockSimpleZkController extends ZkController {

public MockSimpleZkController(
CoreContainer cc,
String zkServerAddress,
int zkClientConnectTimeout,
CloudConfig cloudConfig,
Supplier<List<CoreDescriptor>> descriptorsSupplier)
CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig)
throws InterruptedException, TimeoutException, IOException {
super(cc, zkServerAddress, zkClientConnectTimeout, cloudConfig, descriptorsSupplier);
super(cc, zkServerAddress, zkClientConnectTimeout, cloudConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.tests.util.LuceneTestCase.BadApple;
Expand Down Expand Up @@ -63,8 +62,7 @@ public void testLeaderElectionWithZkExpiry() throws Exception {
ExecutorUtil.newMDCAwareSingleThreadExecutor(
new SolrNamedThreadFactory(this.getTestName()));
try (ZkController zkController =
new ZkController(
cc, server.getZkAddress(), 15000, cloudConfig, Collections::emptyList)) {
new ZkController(cc, server.getZkAddress(), 15000, cloudConfig)) {
threadExecutor.execute(
() -> {
TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
Expand Down
Loading
Loading