Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-16257: ZkStateReader changes to avoid race condition between collectionWatches and watchedCollectionStates #909

Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
ac85088
ZkStateReader changes to avoid race condition for collectionWatches a…
patsonluk Jun 17, 2022
98813b2
Fixed missing javadoc values
patsonluk Jun 17, 2022
a14a53a
/gradlew :solr:solrj:spotlessApply
patsonluk Jun 17, 2022
548417c
Addressed :solr:solrj:validateLogCalls violations
patsonluk Jun 17, 2022
2189b3a
/gradlew :solr:solrj:spotlessApply
patsonluk Jun 17, 2022
aa026d6
Fixed DocCollectionWatches to keep a set of active collections to mai…
patsonluk Jun 17, 2022
07611a1
Code cleanup
patsonluk Jun 17, 2022
4a6d6a2
forciblyRefreshAllClusterStateSlow should use key set in collectionWa…
patsonluk Jun 17, 2022
7a8f960
Fixed DocCollectionWatches to keep a set of active collections to mai…
patsonluk Jun 17, 2022
eaa0211
/gradlew :solr:solrj:spotlessApply
patsonluk Jun 17, 2022
027436b
Added unit test cases
patsonluk Jun 18, 2022
d94a38b
./gradlew :solr:solrj:spotlessApply
patsonluk Jun 18, 2022
8f87fd8
Fixed check error
patsonluk Jun 18, 2022
174b5c4
Fixed gradle check
patsonluk Jun 18, 2022
8c32e86
Code cleanup
patsonluk Jun 18, 2022
1bfedb7
./gradlew :solr:solrj:spotlessApply
patsonluk Jun 18, 2022
44243f6
Reproduce race condition more consistently in ZkStateReaderTest
patsonluk Jun 21, 2022
22aa937
./gradlew :solr:solrj:spotlessApply
patsonluk Jun 21, 2022
c74c86b
Code cleanup
patsonluk Jun 21, 2022
1861dba
Code cleanup
patsonluk Jun 21, 2022
479227d
Fixed unit test cases
patsonluk Jun 21, 2022
d7b85e4
Do not use * for imports
patsonluk Jun 21, 2022
fbc91ff
./gradlew :solr:solrj:spotlessApply
patsonluk Jun 22, 2022
d8ceb10
Update solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader…
patsonluk Jun 30, 2022
33c99f4
Code cleanup/renaming after code review
patsonluk Jun 30, 2022
cc314e6
Code changes after code review
patsonluk Jul 1, 2022
a293166
./gradlew :solr:core:spotlessApply
patsonluk Jul 1, 2022
658d5b7
Refactoring after review. Creating a new branch as this seems to fail…
patsonluk Jul 6, 2022
07730f5
Refactoring after review. Creating a new branch as this seems to fail…
patsonluk Jul 6, 2022
90a3498
Refactoring after review. Creating a new branch as this seems to fail…
patsonluk Jul 6, 2022
3749dbc
Fixed incorrect static field
patsonluk Jul 7, 2022
c0577d6
Fixed unit test cases inti
patsonluk Jul 7, 2022
be48a38
Change after code review
patsonluk Jul 7, 2022
ac0c077
./gradlew :solr:core:spotlessApply and updated comment
patsonluk Jul 7, 2022
3edf561
./gradlew :solr:solrj:spotlessApply
patsonluk Jul 7, 2022
24a05df
Changes after code review
patsonluk Jul 7, 2022
c39e2a9
Use delay injection in race condition unit test case instead of runni…
patsonluk Jul 14, 2022
0feb4e6
Further improved the unit test case
patsonluk Jul 14, 2022
45181fb
Update solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader…
patsonluk Jul 14, 2022
c0dfcf8
Added comment for concurrent access concerns
patsonluk Jul 15, 2022
14afab4
Changes after code review
patsonluk Jul 15, 2022
7bad924
Added extra unit test case on zk version handling.
patsonluk Jul 15, 2022
2ab50d4
./gradlew :solr:core:spotlessApply
patsonluk Jul 15, 2022
d3e6688
More precise check condition
patsonluk Jul 15, 2022
e300041
Update CHANGES.txt
patsonluk Jul 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,32 @@
*/
package org.apache.solr.cloud.overseer;

import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
HoustonPutman marked this conversation as resolved.
Show resolved Hide resolved
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.OverseerTest;
import org.apache.solr.cloud.Stats;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkTestServer;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.*;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.handler.admin.ConfigSetsHandler;
import org.apache.solr.util.TimeOut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZkStateReaderTest extends SolrTestCaseJ4 {

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final long TIMEOUT = 30;

public void testExternalCollectionWatchedNotWatched() throws Exception {
Expand Down Expand Up @@ -196,4 +200,251 @@ public void testWatchedCollectionCreation() throws Exception {
server.shutdown();
}
}

public void testForciblyRefreshAllClusterState() throws Exception {
Path zkDir = createTempDir("testForciblyRefreshAllClusterState");

ZkTestServer server = new ZkTestServer(zkDir);

SolrZkClient zkClient = null;
ZkStateReader reader = null;

try {
server.run();

zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
ZkController.createClusterZkNodes(zkClient);

reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
reader.registerCore("c1"); // watching c1, so it should get non lazy reference
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);

reader.forciblyRefreshAllClusterStateSlow();
// Initially there should be no c1 collection.
assertNull(reader.getClusterState().getCollectionRef("c1"));

ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
madrob marked this conversation as resolved.
Show resolved Hide resolved

// create new collection
DocCollection state =
new DocCollection(
"c1",
new HashMap<>(),
Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
DocRouter.DEFAULT,
0);
ZkWriteCommand wc = new ZkWriteCommand("c1", state);
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
writer.writePendingUpdates();

assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));

reader.forciblyRefreshAllClusterStateSlow();
ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
assertNotNull(ref);
assertFalse(ref.isLazilyLoaded());
assertEquals(0, ref.get().getZNodeVersion());

// update the collection
state =
new DocCollection(
"c1",
new HashMap<>(),
Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
DocRouter.DEFAULT,
ref.get().getZNodeVersion());
wc = new ZkWriteCommand("c1", state);
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
writer.writePendingUpdates();

reader.forciblyRefreshAllClusterStateSlow();
ref = reader.getClusterState().getCollectionRef("c1");
assertNotNull(ref);
assertFalse(ref.isLazilyLoaded());
assertEquals(1, ref.get().getZNodeVersion());

// delete the collection c1, add a collection c2 that is NOT watched
ZkWriteCommand wc1 = new ZkWriteCommand("c1", null);

zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
state =
new DocCollection(
"c2",
new HashMap<>(),
Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
DocRouter.DEFAULT,
0);
ZkWriteCommand wc2 = new ZkWriteCommand("c2", state);

writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null);
writer.writePendingUpdates();

reader.forciblyRefreshAllClusterStateSlow();
ref = reader.getClusterState().getCollectionRef("c1");
assertNull(ref);

ref = reader.getClusterState().getCollectionRef("c2");
assertNotNull(ref);
assert (ref.isLazilyLoaded()); // c2 should be lazily loaded as it's not watched
patsonluk marked this conversation as resolved.
Show resolved Hide resolved
assertEquals(0, ref.get().getZNodeVersion());
} finally {
IOUtils.close(reader, zkClient);
server.shutdown();
}
}

/**
 * Checks the size of {@code ZkStateReader.getCurrentCollections()} before any collection state has
 * been written (expected 0) and again after state has been written for c1, which is registered via
 * {@code registerCore}, and for c2, which is not (expected 2).
 */
public void testGetCurrentCollections() throws Exception {
  final Path tempZkDir = createTempDir("testGetCurrentCollections");
  final ZkTestServer zkServer = new ZkTestServer(tempZkDir);

  SolrZkClient client = null;
  ZkStateReader stateReader = null;

  try {
    zkServer.run();

    client = new SolrZkClient(zkServer.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
    ZkController.createClusterZkNodes(client);

    stateReader = new ZkStateReader(client);
    stateReader.createClusterStateWatchersAndUpdate();

    // Listen to c1 even though no state has been written for it yet.
    stateReader.registerCore("c1");
    client.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
    stateReader.forceUpdateCollection("c1");

    // No active collections yet.
    assertEquals(0, stateReader.getCurrentCollections().size());

    // Now create both c1 (watched) and c2 (not watched).
    final ZkWriteCommand createWatched =
        new ZkWriteCommand(
            "c1",
            new DocCollection(
                "c1",
                new HashMap<>(),
                Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
                DocRouter.DEFAULT,
                0));
    final DocCollection unwatchedState =
        new DocCollection(
            "c2",
            new HashMap<>(),
            Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
            DocRouter.DEFAULT,
            0);

    // c2 gets a znode but is deliberately never registered with the reader.
    client.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
    final ZkWriteCommand createUnwatched = new ZkWriteCommand("c2", unwatchedState);

    final ZkStateWriter stateWriter = new ZkStateWriter(stateReader, new Stats());
    stateWriter.enqueueUpdate(
        stateReader.getClusterState(), Arrays.asList(createWatched, createUnwatched), null);
    stateWriter.writePendingUpdates();

    stateReader.forceUpdateCollection("c1");
    stateReader.forceUpdateCollection("c2");

    // Should detect both collections (c1 watched, c2 lazy loaded).
    assertEquals(2, stateReader.getCurrentCollections().size());
  } finally {
    IOUtils.close(stateReader, client);
    zkServer.shutdown();
  }
}

public void testWatchRaceCondition() throws Exception {
final int RUN_COUNT = 10000;
Path zkDir = createTempDir("testWatchRaceCondition");

ZkTestServer server = new ZkTestServer(zkDir);

SolrZkClient zkClient = null;
ZkStateReader reader = null;
ExecutorService executorService =
ExecutorUtil.newMDCAwareSingleThreadExecutor(
new SolrNamedThreadFactory("zkStateReaderTest"));

try {
server.run();

zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
ZkController.createClusterZkNodes(zkClient);

reader = new ZkStateReader(zkClient);
final ZkStateReader readerRef = reader;
reader.createClusterStateWatchersAndUpdate();
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);

// start another thread to constantly updating the state
final AtomicBoolean stopMutatingThread = new AtomicBoolean(false);
final ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
final AtomicInteger updateCounts = new AtomicInteger(0);
HoustonPutman marked this conversation as resolved.
Show resolved Hide resolved
final AtomicReference<Exception> updateException = new AtomicReference<>();
executorService.submit(
() -> {
try {
ClusterState clusterState = readerRef.getClusterState();
while (!stopMutatingThread.get()) {
DocCollection collection = clusterState.getCollectionOrNull("c1");
int currentVersion = collection != null ? collection.getZNodeVersion() : 0;
// create new collection
DocCollection state =
new DocCollection(
"c1",
new HashMap<>(),
Map.of(
ZkStateReader.CONFIGNAME_PROP,
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
DocRouter.DEFAULT,
currentVersion);
ZkWriteCommand wc = new ZkWriteCommand("c1", state);
writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
clusterState = writer.writePendingUpdates();
}
} catch (Exception e) {
updateException.set(e);
}
return null;
});
executorService.shutdown();

for (int i = 0; i < RUN_COUNT; i++) {
final CountDownLatch latch = new CountDownLatch(2);

// remove itself on 2nd trigger
DocCollectionWatcher dummyWatcher =
collection -> {
latch.countDown();
return latch.getCount() == 0;
};
reader.registerDocCollectionWatcher("c1", dummyWatcher);
latch.await(10, TimeUnit.SECONDS);
patsonluk marked this conversation as resolved.
Show resolved Hide resolved
reader.removeDocCollectionWatcher("c1", dummyWatcher);

boolean refLazilyLoaded = false;
for (int j = 0; j < 10; j++) {
if (reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()) {
refLazilyLoaded = true; // it should eventually be lazily loaded
break;
}
int attempt = j + 1;
log.info("ref is not lazily loaded yet. Attempt : {}", attempt);

TimeUnit.MILLISECONDS.sleep(100);
}
madrob marked this conversation as resolved.
Show resolved Hide resolved
assert (refLazilyLoaded);
madrob marked this conversation as resolved.
Show resolved Hide resolved
}

stopMutatingThread.set(true);
if (updateException.get() != null) {
throw (updateException.get());
}

} finally {
IOUtils.close(reader, zkClient);
executorService.awaitTermination(10, TimeUnit.SECONDS);
patsonluk marked this conversation as resolved.
Show resolved Hide resolved
server.shutdown();
}
}
}
Loading