diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java index dfff87935400..95f6ade16b03 100644 --- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java @@ -16,12 +16,20 @@ */ package org.apache.solr.cloud.overseer; +import java.io.Closeable; +import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.nio.file.Path; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.lucene.util.IOUtils; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.cloud.OverseerTest; @@ -30,17 +38,86 @@ import org.apache.solr.cloud.ZkTestServer; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.DocCollectionWatcher; import org.apache.solr.common.cloud.DocRouter; +import org.apache.solr.common.cloud.PerReplicaStates; +import org.apache.solr.common.cloud.PerReplicaStatesOps; +import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.util.CommonTestInjection; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.SolrNamedThreadFactory; import org.apache.solr.common.util.TimeSource; import org.apache.solr.common.util.Utils; +import org.apache.solr.handler.admin.ConfigSetsHandler; import org.apache.solr.util.TimeOut; +import org.junit.After; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ZkStateReaderTest extends SolrTestCaseJ4 { - + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final long TIMEOUT = 30; + private static class TestFixture implements Closeable { + private final ZkTestServer server; + private final SolrZkClient zkClient; + private final ZkStateReader reader; + private final ZkStateWriter writer; + + private TestFixture( + ZkTestServer server, SolrZkClient zkClient, ZkStateReader reader, ZkStateWriter writer) { + this.server = server; + this.zkClient = zkClient; + this.reader = reader; + this.writer = writer; + } + + @Override + public void close() throws IOException { + IOUtils.close(reader, zkClient); + try { + server.shutdown(); + } catch (InterruptedException e) { + // ok. 
Shutting down anyway + } + } + } + + private TestFixture fixture = null; + + @Before + public void setUp() throws Exception { + super.setUp(); + fixture = setupTestFixture(getTestName()); + } + + @After + public void tearDown() throws Exception { + if (fixture != null) { + fixture.close(); + } + super.tearDown(); + } + + private static TestFixture setupTestFixture(String testPrefix) throws Exception { + Path zkDir = createTempDir(testPrefix); + ZkTestServer server = new ZkTestServer(zkDir); + server.run(); + SolrZkClient zkClient = + new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); + ZkController.createClusterZkNodes(zkClient); + + ZkStateReader reader = new ZkStateReader(zkClient); + reader.createClusterStateWatchersAndUpdate(); + + ZkStateWriter writer = new ZkStateWriter(reader, new Stats(), -1); + + return new TestFixture(server, zkClient, reader, writer); + } + /** Uses explicit refresh to ensure latest changes are visible. */ public void testStateFormatUpdateWithExplicitRefresh() throws Exception { testStateFormatUpdate(true, true, false); @@ -61,221 +138,475 @@ public void testStateFormatUpdateWithTimeDelayLazy() throws Exception { testStateFormatUpdate(false, false, false); } - /** Uses explicit refresh to ensure latest changes are visible. */ public void testStateFormatUpdateWithExplicitRefreshCompressed() throws Exception { testStateFormatUpdate(true, true, true); } public void testStateFormatUpdate(boolean explicitRefresh, boolean isInteresting, boolean compressedState) throws Exception { - Path zkDir = createTempDir("testStateFormatUpdate"); - - ZkTestServer server = new ZkTestServer(zkDir); - - SolrZkClient zkClient = null; - ZkStateReader reader = null; - - try { - server.run(); - - zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); - ZkController.createClusterZkNodes(zkClient); - - reader = new ZkStateReader(zkClient); - reader.createClusterStateWatchersAndUpdate(); - if (isInteresting) { - reader.registerCore("c1"); - } - - ZkStateWriter writer = new ZkStateWriter(reader, new Stats(), compressedState ? 0 : -1); + ZkStateReader reader = fixture.reader; + ZkStateWriter writer = new ZkStateWriter(reader, new Stats(), compressedState ? 
0 : -1); - zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); + if (isInteresting) { + reader.registerCore("c1"); + } - { - // create new collection with stateFormat = 1 - DocCollection stateV1 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE); - ZkWriteCommand c1 = new ZkWriteCommand("c1", stateV1); - writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null); - writer.writePendingUpdates(); + SolrZkClient zkClient = fixture.zkClient; + zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); - Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true)); - assertNotNull(map.get("c1")); - boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true); - assertFalse(exists); + { + // create new collection with stateFormat = 1 + DocCollection stateV1 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE); + ZkWriteCommand c1 = new ZkWriteCommand("c1", stateV1); + writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null); + writer.writePendingUpdates(); - if (explicitRefresh) { - reader.forceUpdateCollection("c1"); - } else { - reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null); - } + Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true)); + assertNotNull(map.get("c1")); + boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true); + assertFalse(exists); - DocCollection collection = reader.getClusterState().getCollection("c1"); - assertEquals(1, collection.getStateFormat()); + if (explicitRefresh) { + reader.forceUpdateCollection("c1"); + } else { + reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null); } + DocCollection collection = reader.getClusterState().getCollection("c1"); + assertEquals(1, collection.getStateFormat()); + } - { - // Now update the collection to stateFormat = 2 - DocCollection stateV2 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"); - ZkWriteCommand c2 = new ZkWriteCommand("c1", stateV2); - writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c2), null); - writer.writePendingUpdates(); - Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true)); - assertNull(map.get("c1")); - boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true); - assertTrue(exists); + { + // Now update the collection to stateFormat = 2 + DocCollection stateV2 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"); + ZkWriteCommand c2 = new ZkWriteCommand("c1", stateV2); + writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c2), null); + writer.writePendingUpdates(); - if (explicitRefresh) { - reader.forceUpdateCollection("c1"); - } else { - reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, - (n, c) -> c != null && c.getStateFormat() == 2); - } + Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true)); + assertNull(map.get("c1")); + boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true); + assertTrue(exists); - DocCollection collection = reader.getClusterState().getCollection("c1"); - assertEquals(2, 
collection.getStateFormat()); + if (explicitRefresh) { + reader.forceUpdateCollection("c1"); + } else { + reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, + (n, c) -> c != null && c.getStateFormat() == 2); } - } finally { - IOUtils.close(reader, zkClient); - server.shutdown(); + DocCollection collection = reader.getClusterState().getCollection("c1"); + assertEquals(2, collection.getStateFormat()); } } - public void testExternalCollectionWatchedNotWatched() throws Exception{ - Path zkDir = createTempDir("testExternalCollectionWatchedNotWatched"); - ZkTestServer server = new ZkTestServer(zkDir); - SolrZkClient zkClient = null; - ZkStateReader reader = null; + public void testExternalCollectionWatchedNotWatched() throws Exception { + ZkStateWriter writer = fixture.writer; + ZkStateReader reader = fixture.reader; + fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); - try { - server.run(); + // create new collection with stateFormat = 2 + ZkWriteCommand c1 = new ZkWriteCommand("c1", + new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json")); - zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); - ZkController.createClusterZkNodes(zkClient); + writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null); + writer.writePendingUpdates(); + reader.forceUpdateCollection("c1"); - reader = new ZkStateReader(zkClient); - reader.createClusterStateWatchersAndUpdate(); - - ZkStateWriter writer = new ZkStateWriter(reader, new Stats(), -1); - - zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); - - // create new collection with stateFormat = 2 - ZkWriteCommand c1 = new ZkWriteCommand("c1", - new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json")); - writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null); - writer.writePendingUpdates(); - reader.forceUpdateCollection("c1"); - - assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()); - reader.registerCore("c1"); - assertFalse(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()); - reader.unregisterCore("c1"); - assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()); - - } finally { - IOUtils.close(reader, zkClient); - server.shutdown(); - } + assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()); + reader.registerCore("c1"); + assertFalse(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()); + reader.unregisterCore("c1"); + assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()); } - public void testCollectionStateWatcherCaching() throws Exception { - Path zkDir = createTempDir("testCollectionStateWatcherCaching"); - - ZkTestServer server = new ZkTestServer(zkDir); - - SolrZkClient zkClient = null; - ZkStateReader reader = null; - - try { - server.run(); - - zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); - ZkController.createClusterZkNodes(zkClient); - - reader = new ZkStateReader(zkClient); - reader.createClusterStateWatchersAndUpdate(); - - zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); - - ZkStateWriter writer = new ZkStateWriter(reader, new Stats(), -1); - DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + 
"/c1/state.json"); - ZkWriteCommand wc = new ZkWriteCommand("c1", state); - writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); - writer.writePendingUpdates(); - assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true)); - reader.waitForState("c1", 1, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState != null); - - state = new DocCollection("c1", new HashMap<>(), Collections.singletonMap("x", "y"), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json"); - wc = new ZkWriteCommand("c1", state); - writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); - writer.writePendingUpdates(); - - boolean found = false; - TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME); - while (!timeOut.hasTimedOut()) { - DocCollection c1 = reader.getClusterState().getCollection("c1"); - if ("y".equals(c1.getStr("x"))) { - found = true; - break; - } + public void testCollectionStateWatcherCaching() throws Exception { + ZkStateWriter writer = fixture.writer; + ZkStateReader reader = fixture.reader; + + fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); + + DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json"); + ZkWriteCommand wc = new ZkWriteCommand("c1", state); + writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); + writer.writePendingUpdates(); + assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true)); + reader.waitForState("c1", 1, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState != null); + + state = new DocCollection("c1", new HashMap<>(), Collections.singletonMap("x", "y"), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json"); + wc = new ZkWriteCommand("c1", state); + writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); + writer.writePendingUpdates(); + + boolean found = false; + TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME); + while (!timeOut.hasTimedOut()) { + DocCollection c1 = reader.getClusterState().getCollection("c1"); + if ("y".equals(c1.getStr("x"))) { + found = true; + break; } - assertTrue("Could not find updated property in collection c1 even after 5 seconds", found); - } finally { - IOUtils.close(reader, zkClient); - server.shutdown(); } + assertTrue("Could not find updated property in collection c1 even after 5 seconds", found); } public void testWatchedCollectionCreation() throws Exception { - Path zkDir = createTempDir("testWatchedCollectionCreation"); - - ZkTestServer server = new ZkTestServer(zkDir); + ZkStateWriter writer = fixture.writer; + ZkStateReader reader = fixture.reader; - SolrZkClient zkClient = null; - ZkStateReader reader = null; - - try { - server.run(); + reader.registerCore("c1"); - zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); - ZkController.createClusterZkNodes(zkClient); + // Initially there should be no c1 collection. + assertNull(reader.getClusterState().getCollectionRef("c1")); - reader = new ZkStateReader(zkClient); - reader.createClusterStateWatchersAndUpdate(); - reader.registerCore("c1"); + fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); + reader.forceUpdateCollection("c1"); - // Initially there should be no c1 collection. 
- assertNull(reader.getClusterState().getCollectionRef("c1")); + // Still no c1 collection, despite a collection path. + assertNull(reader.getClusterState().getCollectionRef("c1")); - zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); - reader.forceUpdateCollection("c1"); + // create new collection with stateFormat = 2 + DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json"); + ZkWriteCommand wc = new ZkWriteCommand("c1", state); + writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); + writer.writePendingUpdates(); - // Still no c1 collection, despite a collection path. - assertNull(reader.getClusterState().getCollectionRef("c1")); + assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true)); - ZkStateWriter writer = new ZkStateWriter(reader, new Stats(), -1); + // reader.forceUpdateCollection("c1"); + reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null); + ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1"); + assertNotNull(ref); + assertFalse(ref.isLazilyLoaded()); + assertEquals(2, ref.get().getStateFormat()); + } + /** + * Verifies that znode and child versions are correct and version changes trigger cluster state + * updates + */ + public void testNodeVersion() throws Exception { + ZkStateWriter writer = fixture.writer; + ZkStateReader reader = fixture.reader; + + fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); + + ClusterState clusterState = reader.getClusterState(); + // create new collection + DocCollection state = + new DocCollection( + "c1", + new HashMap<>(), + Collections.singletonMap(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), + DocRouter.DEFAULT, + 0, + ZkStateReader.CLUSTER_STATE + "/c1/state.json"); + ZkWriteCommand wc = new ZkWriteCommand("c1", state); + writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null); + clusterState = writer.writePendingUpdates(); + + // have to register it here after the updates, otherwise the child node watch will not be + // inserted + reader.registerCore("c1"); + + TimeOut timeOut = new TimeOut(5000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME); + timeOut.waitFor( + "Timeout on waiting for c1 to show up in cluster state", + () -> reader.getClusterState().getCollectionOrNull("c1") != null); + + ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1"); + assertFalse(ref.isLazilyLoaded()); + assertEquals(0, ref.get().getZNodeVersion()); + assertEquals(0, ref.get().getChildNodesVersion()); //default is 0 in solr 8 (current version), but in Solr 9+ it's -1 + + DocCollection collection = ref.get(); + PerReplicaStates prs = + PerReplicaStates.fetch( + collection.getZNode(), fixture.zkClient, collection.getPerReplicaStates()); + PerReplicaStatesOps.addReplica("r1", Replica.State.DOWN, false, prs) + .persist(collection.getZNode(), fixture.zkClient); + timeOut.waitFor( + "Timeout on waiting for c1 updated to have PRS state r1", + () -> { + DocCollection c = reader.getCollection("c1"); + return c.getPerReplicaStates() != null + && c.getPerReplicaStates().get("r1") != null + && c.getPerReplicaStates().get("r1").state == Replica.State.DOWN; + }); + + ref = reader.getClusterState().getCollectionRef("c1"); + assertEquals(0, ref.get().getZNodeVersion()); // no change in Znode version + assertEquals(1, ref.get().getChildNodesVersion()); // 
but child version should be 1 now + + prs = ref.get().getPerReplicaStates(); + PerReplicaStatesOps.flipState("r1", Replica.State.ACTIVE, prs) + .persist(collection.getZNode(), fixture.zkClient); + timeOut.waitFor( + "Timeout on waiting for c1 updated to have PRS state r1 marked as ACTIVE", + () -> + reader.getCollection("c1").getPerReplicaStates().get("r1").state + == Replica.State.ACTIVE); + + ref = reader.getClusterState().getCollectionRef("c1"); + assertEquals(0, ref.get().getZNodeVersion()); // no change in Znode version + // but child version should be 3 now (1 del + 1 add) + assertEquals(3, ref.get().getChildNodesVersion()); + + // now delete the collection + wc = new ZkWriteCommand("c1", null); + writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null); + clusterState = writer.writePendingUpdates(); + timeOut.waitFor( + "Timeout on waiting for c1 to be removed from cluster state", + () -> reader.getClusterState().getCollectionOrNull("c1") == null); + + reader.unregisterCore("c1"); + // re-add the same collection + wc = new ZkWriteCommand("c1", state); + writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null); + clusterState = writer.writePendingUpdates(); + // re-register, otherwise the child watch would be missing after the collection deletion + reader.registerCore("c1"); + + // reader.forceUpdateCollection("c1"); + timeOut.waitFor( + "Timeout on waiting for c1 to show up in cluster state again", + () -> reader.getClusterState().getCollectionOrNull("c1") != null); + ref = reader.getClusterState().getCollectionRef("c1"); + assertFalse(ref.isLazilyLoaded()); + assertEquals(0, ref.get().getZNodeVersion()); + assertEquals(0, ref.get().getChildNodesVersion()); // child node version is reset + + // re-add PRS + collection = ref.get(); + prs = + PerReplicaStates.fetch( + collection.getZNode(), fixture.zkClient, collection.getPerReplicaStates()); + PerReplicaStatesOps.addReplica("r1", Replica.State.DOWN, false, prs) + .persist(collection.getZNode(), fixture.zkClient); + timeOut.waitFor( + "Timeout on waiting for c1 updated to have PRS state r1", + () -> { + DocCollection c = reader.getCollection("c1"); + return c.getPerReplicaStates() != null + && c.getPerReplicaStates().get("r1") != null + && c.getPerReplicaStates().get("r1").state == Replica.State.DOWN; + }); + + ref = reader.getClusterState().getCollectionRef("c1"); + + // child version should be reset since the state.json node was deleted and re-created + assertEquals(1, ref.get().getChildNodesVersion()); + } - // create new collection with stateFormat = 2 - DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json"); - ZkWriteCommand wc = new ZkWriteCommand("c1", state); - writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); - writer.writePendingUpdates(); + public void testForciblyRefreshAllClusterState() throws Exception { + ZkStateWriter writer = fixture.writer; + ZkStateReader reader = fixture.reader; + + reader.registerCore("c1"); // watching c1, so it should get a non-lazy reference + fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); + + reader.forciblyRefreshAllClusterStateSlow(); + // Initially there should be no c1 collection.
+ assertNull(reader.getClusterState().getCollectionRef("c1")); + + // create new collection + DocCollection state = + new DocCollection( + "c1", + new HashMap<>(), + Collections.singletonMap(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), + DocRouter.DEFAULT, + 0, + ZkStateReader.CLUSTER_STATE + "/c1/state.json"); + ZkWriteCommand wc = new ZkWriteCommand("c1", state); + writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); + writer.writePendingUpdates(); + + assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true)); + + reader.forciblyRefreshAllClusterStateSlow(); + ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1"); + assertNotNull(ref); + assertFalse(ref.isLazilyLoaded()); + assertEquals(0, ref.get().getZNodeVersion()); + + // update the collection + state = + new DocCollection( + "c1", + new HashMap<>(), + Collections.singletonMap(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), + DocRouter.DEFAULT, + ref.get().getZNodeVersion(), + ZkStateReader.CLUSTER_STATE + "/c1/state.json"); + wc = new ZkWriteCommand("c1", state); + writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); + writer.writePendingUpdates(); + + reader.forciblyRefreshAllClusterStateSlow(); + ref = reader.getClusterState().getCollectionRef("c1"); + assertNotNull(ref); + assertFalse(ref.isLazilyLoaded()); + assertEquals(1, ref.get().getZNodeVersion()); + + // delete the collection c1, add a collection c2 that is NOT watched + ZkWriteCommand wc1 = new ZkWriteCommand("c1", null); + + fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true); + state = + new DocCollection( + "c2", + new HashMap<>(), + Collections.singletonMap(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), + DocRouter.DEFAULT, + 0, + ZkStateReader.CLUSTER_STATE + "/c2/state.json"); + ZkWriteCommand wc2 = new ZkWriteCommand("c2", state); + + writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null); + writer.writePendingUpdates(); + + reader.forciblyRefreshAllClusterStateSlow(); + ref = reader.getClusterState().getCollectionRef("c1"); + assertNull(ref); + + ref = reader.getClusterState().getCollectionRef("c2"); + assertNotNull(ref); + assertTrue( + "c2 should have been lazily loaded but is not!", + ref.isLazilyLoaded()); // c2 should be lazily loaded as it's not watched + assertEquals(0, ref.get().getZNodeVersion()); + } - assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true)); + public void testGetCurrentCollections() throws Exception { + ZkStateWriter writer = fixture.writer; + ZkStateReader reader = fixture.reader; + + reader.registerCore("c1"); // listen to c1. 
It does not exist yet. + fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); + reader.forceUpdateCollection("c1"); + Set<String> currentCollections = reader.getCurrentCollections(); + assertEquals(0, currentCollections.size()); // no active collections yet + + // now create both c1 (watched) and c2 (not watched) + DocCollection state1 = + new DocCollection( + "c1", + new HashMap<>(), + Collections.singletonMap(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), + DocRouter.DEFAULT, + 0, + ZkStateReader.CLUSTER_STATE + "/c1/state.json"); + ZkWriteCommand wc1 = new ZkWriteCommand("c1", state1); + DocCollection state2 = + new DocCollection( + "c2", + new HashMap<>(), + Collections.singletonMap(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), + DocRouter.DEFAULT, + 0, + ZkStateReader.CLUSTER_STATE + "/c2/state.json"); + + // do not listen to c2 + fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true); + ZkWriteCommand wc2 = new ZkWriteCommand("c2", state2); + + writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null); + writer.writePendingUpdates(); + + reader.forceUpdateCollection("c1"); + reader.forceUpdateCollection("c2"); + + // should detect both collections (c1 watched, c2 lazy loaded) + currentCollections = reader.getCurrentCollections(); + assertEquals(2, currentCollections.size()); + } - //reader.forceUpdateCollection("c1"); - reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null); - ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1"); - assertNotNull(ref); - assertFalse(ref.isLazilyLoaded()); - assertEquals(2, ref.get().getStateFormat()); + /** + * Simulates a race condition that might arise when state updates triggered by watch notifications + * contend with removal of collection watches. + * + *

<p>Such a race condition should no longer exist with the new code that uses a single map for both + * "collection watches" and "latest state of watched collection" + */ + public void testWatchRaceCondition() throws Exception { + ExecutorService executorService = + ExecutorUtil.newMDCAwareSingleThreadExecutor( + new SolrNamedThreadFactory("zkStateReaderTest")); + CommonTestInjection.setDelay(1000); + final AtomicBoolean stopMutatingThread = new AtomicBoolean(false); + try { + ZkStateWriter writer = fixture.writer; + final ZkStateReader reader = fixture.reader; + fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); + + // start another thread to constantly update the state + final AtomicReference<Exception> updateException = new AtomicReference<>(); + executorService.submit( + () -> { + try { + ClusterState clusterState = reader.getClusterState(); + while (!stopMutatingThread.get()) { + DocCollection collection = clusterState.getCollectionOrNull("c1"); + int currentVersion = collection != null ? collection.getZNodeVersion() : 0; + // create new collection + DocCollection state = + new DocCollection( + "c1", + new HashMap<>(), + Collections.singletonMap( + ZkStateReader.CONFIGNAME_PROP, + ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), + DocRouter.DEFAULT, + currentVersion, + ZkStateReader.CLUSTER_STATE + "/c1/state.json"); + ZkWriteCommand wc = new ZkWriteCommand("c1", state); + writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null); + clusterState = writer.writePendingUpdates(); + TimeUnit.MILLISECONDS.sleep(100); + } + } catch (Exception e) { + updateException.set(e); + } + return null; + }); + executorService.shutdown(); + + reader.waitForState( + "c1", + 10, + TimeUnit.SECONDS, + slices -> slices != null); // wait for the state to become available + + final CountDownLatch latch = new CountDownLatch(2); + + // removes itself on the 2nd trigger + DocCollectionWatcher dummyWatcher = + collection -> { + latch.countDown(); + return latch.getCount() == 0; + }; + reader.registerDocCollectionWatcher("c1", dummyWatcher); + assertTrue( + "Missing expected collection updates after the wait", latch.await(10, TimeUnit.SECONDS)); + reader.removeDocCollectionWatcher("c1", dummyWatcher); + + // cluster state might not be updated right away after the removeDocCollectionWatcher call + // above, as org.apache.solr.common.cloud.ZkStateReader.Notification might remove the watcher + // as well and might still be in the middle of updating the cluster state.
+ TimeOut timeOut = new TimeOut(2000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME); + timeOut.waitFor( + "The ref is not lazily loaded after waiting", + () -> reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()); + + if (updateException.get() != null) { + throw (updateException.get()); + } } finally { - IOUtils.close(reader, zkClient); - server.shutdown(); - + stopMutatingThread.set(true); + CommonTestInjection.reset(); + ExecutorUtil.awaitTermination(executorService); } } -} +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 8df1879030af..5f10e6ae5403 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -42,6 +42,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; import java.util.function.Predicate; import java.util.function.UnaryOperator; import java.util.stream.Collectors; @@ -180,11 +181,6 @@ public class ZkStateReader implements SolrCloseable { */ private int legacyClusterStateVersion = 0; - /** - * Collections with format2 state.json, "interesting" and actively watched. - */ - private final ConcurrentHashMap watchedCollectionStates = new ConcurrentHashMap<>(); - /** * Collections with format2 state.json, not "interesting" and not actively watched. */ @@ -216,7 +212,11 @@ public class ZkStateReader implements SolrCloseable { private final Runnable securityNodeListener; - private ConcurrentHashMap> collectionWatches = new ConcurrentHashMap<>(); + /** + * Collections with active watches. The {@link StatefulCollectionWatch} inside for each collection + * might also contain the latest DocCollection (state) observed + */ + private DocCollectionWatches collectionWatches = new DocCollectionWatches(); // named this observers so there's less confusion between CollectionPropsWatcher map and the PropsWatcher map. private ConcurrentHashMap> collectionPropsObservers = new ConcurrentHashMap<>(); @@ -283,6 +283,140 @@ public boolean canBeRemoved() { } + /** + * A ConcurrentHashMap of active watcher by collection name + * + *

<p>Each {@link StatefulCollectionWatch} also contains the latest DocCollection (state) observed + */ + private static class DocCollectionWatches { + private final ConcurrentHashMap<String, StatefulCollectionWatch> + statefulWatchesByCollectionName = new ConcurrentHashMap<>(); + + /** + * Gets the DocCollection (state) of the collection that the corresponding watch last observed + * + * @param collection the collection name to get the DocCollection for + * @return the last observed DocCollection (state); if null, there is no such + * collection. + */ + private DocCollection getDocCollection(String collection) { + StatefulCollectionWatch watch = statefulWatchesByCollectionName.get(collection); + return watch != null ? watch.currentState : null; + } + + /** + * Gets the active collections (collections that exist) being watched + * + * @return an immutable set of active collection names + */ + private Set<String> activeCollections() { + return Collections.unmodifiableSet(statefulWatchesByCollectionName.entrySet().stream() + .filter( + (Entry<String, StatefulCollectionWatch> entry) -> + entry.getValue().currentState != null) + .map(Entry::getKey) + .collect(Collectors.toSet())); + } + + /** + * Gets the count of active collections (collections that exist) being watched + * + * @return the count of active collections + */ + private long activeCollectionCount() { + return statefulWatchesByCollectionName.entrySet().stream() + .filter( + (Entry<String, StatefulCollectionWatch> entry) -> + entry.getValue().currentState != null) + .count(); + } + + /** + * Gets a Set of watched collection names. The returned value is thread-safe and unmodifiable. + * + * @return Set of watched collection names + */ + private Set<String> watchedCollections() { + return Collections.unmodifiableSet(statefulWatchesByCollectionName.keySet()); + } + + private Set<Entry<String, StatefulCollectionWatch>> watchedCollectionEntries() { + return Collections.unmodifiableSet(statefulWatchesByCollectionName.entrySet()); + } + + /** + * Updates the latest observed DocCollection (state) of the {@link StatefulCollectionWatch} if + * the collection is being watched + * + * @param collection the collection name + * @param newState the new DocCollection (state) observed + * @return whether the state has changed for the watched collection + */ + private boolean updateDocCollection(String collection, DocCollection newState) { + AtomicBoolean stateHasChanged = new AtomicBoolean(false); + statefulWatchesByCollectionName.computeIfPresent( + collection, + (col, watch) -> { + DocCollection oldState = watch.currentState; + if (oldState == null && newState == null) { + // OK, the collection does not exist in ZK yet or has already been deleted + } else if (oldState == null) { + if (log.isDebugEnabled()) { + log.debug("Add data for [{}] ver [{}]", collection, newState.getZNodeVersion()); + } + watch.currentState = newState; + } else if (newState == null) { + log.debug("Removing cached collection state for [{}]", collection); + watch.currentState = null; + } else { // both new and old states are non-null + int oldCVersion = + oldState.getPerReplicaStates() == null + ? -1 + : oldState.getPerReplicaStates().cversion; + int newCVersion = + newState.getPerReplicaStates() == null + ?
-1 + : newState.getPerReplicaStates().cversion; + if (oldState.getZNodeVersion() < newState.getZNodeVersion() + || oldCVersion < newCVersion) { + watch.currentState = newState; + if (log.isDebugEnabled()) { + log.debug( + "Updating data for [{}] from [{}] to [{}]", + collection, + oldState.getZNodeVersion(), + newState.getZNodeVersion()); + } + } + } + stateHasChanged.set(oldState != watch.currentState); + return watch; + }); + + return stateHasChanged.get(); + } + + /** + * Computes the new StatefulCollectionWatch by the supplied remappingFunction. + * + * @param collectionName collection name + * @param remappingFunction remaps the StatefulCollectionWatch. If this returns null, the + * associated StatefulCollectionWatch will be removed; otherwise, the returned value will be + * associated with the collection + * @return the new StatefulCollectionWatch associated with the collection + * @see ConcurrentHashMap#compute(Object, BiFunction) + */ + private StatefulCollectionWatch compute( + String collectionName, + BiFunction<String, StatefulCollectionWatch, StatefulCollectionWatch> remappingFunction) { + return statefulWatchesByCollectionName.compute(collectionName, remappingFunction); + } + } + + private static class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> { + private DocCollection currentState; + } + public static final Set<String> KNOWN_CLUSTER_PROPS = unmodifiableSet(new HashSet<>(asList( LEGACY_CLOUD, URL_SCHEME, @@ -402,12 +536,16 @@ public void forciblyRefreshAllClusterStateSlow() throws KeeperException, Interru refreshCollectionList(null); refreshLiveNodes(null); refreshLegacyClusterState(null); - // Need a copy so we don't delete from what we're iterating over. - Collection<String> safeCopy = new ArrayList<>(watchedCollectionStates.keySet()); Set<String> updatedCollections = new HashSet<>(); - for (String coll : safeCopy) { + + // Iterate through the actively watched collections. Note that the set of watched + // collections might change during the iteration, but that should not throw an exception + // as the underlying map is thread-safe. + // If the set is modified elsewhere during the iteration, the code logic should still + // handle any missing/extra collections without issues. + for (String coll : collectionWatches.watchedCollections()) { DocCollection newState = fetchCollectionState(coll, null, null); - if (updateWatchedCollection(coll, newState)) { + if (collectionWatches.updateDocCollection(coll, newState)) { updatedCollections.add(coll); } } @@ -451,11 +589,11 @@ public void forceUpdateCollection(String collection) throws KeeperException, Int } // Edge case: if there's no external collection, try refreshing legacy cluster state in case it's there. refreshLegacyClusterState(null); - } else if (watchedCollectionStates.containsKey(collection)) { + } else if (collectionWatches.watchedCollections().contains(collection)) { // Exists as a watched collection, force a refresh.
log.debug("Forcing refresh of watched collection state for {}", collection); DocCollection newState = fetchCollectionState(collection, null, null); - if (updateWatchedCollection(collection, newState)) { + if (collectionWatches.updateDocCollection(collection, newState)) { constructState(Collections.singleton(collection)); } } else { @@ -482,7 +620,7 @@ public Integer compareStateVersions(String coll, int version) { DocCollection nu = getCollectionLive(this, coll); if (nu == null) return -1; if (nu.getZNodeVersion() > collection.getZNodeVersion()) { - if (updateWatchedCollection(coll, nu)) { + if (collectionWatches.updateDocCollection(coll, nu)) { synchronized (getUpdateLock()) { constructState(Collections.singleton(coll)); } @@ -602,8 +740,13 @@ private void constructState(Set changedCollections) { Map result = new LinkedHashMap<>(legacyCollectionStates); // Add state format2 collections, but don't override legacy collection states. - for (Map.Entry entry : watchedCollectionStates.entrySet()) { - result.putIfAbsent(entry.getKey(), new ClusterState.CollectionRef(entry.getValue())); + for (Entry entry : + collectionWatches.watchedCollectionEntries()) { + if (entry.getValue().currentState != null) { + // if the doc is null for the collection watch, then it should not be inserted into the + // state + result.put(entry.getKey(), new ClusterState.CollectionRef(entry.getValue().currentState)); + } } // Finally, add any lazy collections that aren't already accounted for. @@ -616,8 +759,8 @@ private void constructState(Set changedCollections) { if (log.isDebugEnabled()) { log.debug("clusterStateSet: legacy [{}] interesting [{}] watched [{}] lazy [{}] total [{}]", legacyCollectionStates.keySet().size(), - collectionWatches.keySet().size(), - watchedCollectionStates.keySet().size(), + collectionWatches.watchedCollections().size(), + collectionWatches.activeCollectionCount(), lazyCollectionStates.keySet().size(), clusterState.getCollectionStates().size()); } @@ -625,8 +768,8 @@ private void constructState(Set changedCollections) { if (log.isTraceEnabled()) { log.trace("clusterStateSet: legacy [{}] interesting [{}] watched [{}] lazy [{}] total [{}]", legacyCollectionStates.keySet(), - collectionWatches.keySet(), - watchedCollectionStates.keySet(), + collectionWatches.watchedCollections(), + collectionWatches.activeCollections(), lazyCollectionStates.keySet(), clusterState.getCollectionStates()); } @@ -653,7 +796,7 @@ private void refreshLegacyClusterState(Watcher watcher) throws KeeperException, return; } Set updatedCollections = new HashSet<>(); - for (String coll : this.collectionWatches.keySet()) { + for (String coll : this.collectionWatches.watchedCollections()) { ClusterState.CollectionRef ref = this.legacyCollectionStates.get(coll); // legacy collections are always in-memory DocCollection oldState = ref == null ? null : ref.get(); @@ -661,7 +804,7 @@ private void refreshLegacyClusterState(Watcher watcher) throws KeeperException, DocCollection newState = newRef == null ? null : newRef.get(); if (newState == null) { // check that we haven't just migrated - newState = watchedCollectionStates.get(coll); + newState = collectionWatches.getDocCollection(coll); } if (!Objects.equals(oldState, newState)) { updatedCollections.add(coll); @@ -685,7 +828,7 @@ private void refreshLegacyClusterState(Watcher watcher) throws KeeperException, * Refresh state format2 collections. 
*/ private void refreshStateFormat2Collections() { - for (String coll : collectionWatches.keySet()) { + for (String coll : collectionWatches.watchedCollections()) { new StateWatcher(coll).refreshAndWatch(); } } @@ -727,7 +870,7 @@ private void refreshCollectionList(Watcher watcher) throws KeeperException, Inte this.lazyCollectionStates.keySet().retainAll(children); for (String coll : children) { // We will create an eager collection for any interesting collections, so don't add to lazy. - if (!collectionWatches.containsKey(coll)) { + if (!collectionWatches.watchedCollections().contains(coll)) { // Double check contains just to avoid allocating an object. LazyCollectionRef existing = lazyCollectionStates.get(coll); if (existing == null) { @@ -776,10 +919,10 @@ private void notifyCloudCollectionsListeners(boolean notifyIfSame) { } } - private Set getCurrentCollections() { + public Set getCurrentCollections() { Set collections = new HashSet<>(); collections.addAll(legacyCollectionStates.keySet()); - collections.addAll(watchedCollectionStates.keySet()); + collections.addAll(collectionWatches.activeCollections()); collections.addAll(lazyCollectionStates.keySet()); return collections; } @@ -1375,7 +1518,7 @@ public void process(WatchedEvent event) { return; } - if (!collectionWatches.containsKey(coll)) { + if (!collectionWatches.watchedCollections().contains(coll)) { // This collection is no longer interesting, stop watching. log.debug("Uninteresting collection {}", coll); return; @@ -1395,9 +1538,9 @@ public void refreshAndWatch() { } /** - * Refresh collection state from ZK and leave a watch for future changes. - * As a side effect, updates {@link #clusterState} and {@link #watchedCollectionStates} - * with the results of the refresh. + * Refresh collection state from ZK and leave a watch for future changes. As a side effect, + * updates {@link #clusterState} and collection ref within {@link #collectionWatches} with the + * results of the refresh. */ public void refreshAndWatch(EventType eventType) { try { @@ -1410,7 +1553,7 @@ public void refreshAndWatch(EventType eventType) { } DocCollection newState = fetchCollectionState(coll, this, collectionPath); - updateWatchedCollection(coll, newState); + collectionWatches.updateDocCollection(coll, newState); synchronized (getUpdateLock()) { constructState(Collections.singleton(coll)); } @@ -1432,11 +1575,11 @@ private void refreshAndWatchChildren() throws KeeperException, InterruptedExcept try { replicaStates = zkClient.getChildren(collectionPath, this, stat, true); PerReplicaStates newStates = new PerReplicaStates(collectionPath, stat.getCversion(), replicaStates); - DocCollection oldState = watchedCollectionStates.get(coll); + DocCollection oldState = collectionWatches.getDocCollection(coll); final DocCollection newState = oldState != null ? 
oldState.copyWith(newStates) : fetchCollectionState(coll, null, collectionPath); - updateWatchedCollection(coll, newState); + collectionWatches.updateDocCollection(coll, newState); synchronized (getUpdateLock()) { constructState(Collections.singleton(coll)); } @@ -1751,7 +1894,7 @@ public void registerCore(String collection) { collectionWatches.compute(collection, (k, v) -> { if (v == null) { reconstructState.set(true); - v = new CollectionWatch<>(); + v = new StatefulCollectionWatch(); } v.coreRefCount++; return v; @@ -1779,7 +1922,6 @@ public void unregisterCore(String collection) { if (v.coreRefCount > 0) v.coreRefCount--; if (v.canBeRemoved()) { - watchedCollectionStates.remove(collection); lazyCollectionStates.put(collection, new LazyCollectionRef(collection)); reconstructState.set(true); return null; @@ -1837,7 +1979,7 @@ public void registerDocCollectionWatcher(String collection, DocCollectionWatcher AtomicBoolean watchSet = new AtomicBoolean(false); collectionWatches.compute(collection, (k, v) -> { if (v == null) { - v = new CollectionWatch<>(); + v = new StatefulCollectionWatch(); watchSet.set(true); } log.debug("already watching: {}, added to stateWatchers", !watchSet.get()); @@ -2045,7 +2187,6 @@ public void removeDocCollectionWatcher(String collection, DocCollectionWatcher w return null; v.stateWatchers.remove(watcher); if (v.canBeRemoved()) { - watchedCollectionStates.remove(collection); lazyCollectionStates.put(collection, new LazyCollectionRef(collection)); reconstructState.set(true); return null; @@ -2071,58 +2212,6 @@ public Set<DocCollectionWatcher> getStateWatchers(String collection) { return watchers; } - // returns true if the state has changed - private boolean updateWatchedCollection(String coll, DocCollection newState) { - - if (newState == null) { - log.debug("Removing cached collection state for [{}]", coll); - watchedCollectionStates.remove(coll); - return true; - } - - boolean updated = false; - // CAS update loop - while (true) { - if (!collectionWatches.containsKey(coll)) { - break; - } - DocCollection oldState = watchedCollectionStates.get(coll); - if (oldState == null) { - if (watchedCollectionStates.putIfAbsent(coll, newState) == null) { - if (log.isDebugEnabled()) { - log.debug("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion()); - } - updated = true; - break; - } - } else { - int oldCVersion = oldState.getPerReplicaStates() == null ? -1 : oldState.getPerReplicaStates().cversion; - int newCVersion = newState.getPerReplicaStates() == null ? -1 : newState.getPerReplicaStates().cversion; - if (oldState.getZNodeVersion() >= newState.getZNodeVersion() && oldCVersion >= newCVersion) { - // no change to state, but we might have been triggered by the addition of a - // state watcher, so run notifications - updated = true; - break; - } - if (watchedCollectionStates.replace(coll, oldState, newState)) { - if (log.isDebugEnabled()) { - log.debug("Updating data for [{}] from [{}] to [{}]", coll, oldState.getZNodeVersion(), newState.getZNodeVersion()); - } - updated = true; - break; - } - } - } - - // Resolve race with unregisterCore.
- if (!collectionWatches.containsKey(coll)) { - watchedCollectionStates.remove(coll); - log.debug("Removing uninteresting collection [{}]", coll); - } - - return updated; - } - public void registerCollectionPropsWatcher(final String collection, CollectionPropsWatcher propsWatcher) { AtomicBoolean watchSet = new AtomicBoolean(false); collectionPropsObservers.compute(collection, (k, v) -> { diff --git a/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java b/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java index eed71759278a..dc388bdf44ff 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java @@ -17,21 +17,29 @@ package org.apache.solr.common.util; +import java.lang.invoke.MethodHandles; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Allows random faults to be injected in running code during test runs across all solr packages. * * @lucene.internal */ public class CommonTestInjection { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private volatile static Map<String, String> additionalSystemProps = null; public static void reset() { additionalSystemProps = null; + delay = null; } + private static volatile Integer delay = null; + public static void setAdditionalProps(Map<String, String> additionalSystemProps) { CommonTestInjection.additionalSystemProps = additionalSystemProps; } @@ -39,4 +47,32 @@ public static void setAdditionalProps(Map<String, String> additionalSystemProps) public static Map<String, String> injectAdditionalProps() { return additionalSystemProps; } + + /** + * Sets the test delay (sleep) in milliseconds + * + * @param delay delay in milliseconds; null to remove the delay + */ + public static void setDelay(Integer delay) { + CommonTestInjection.delay = delay; + } + + /** + * Injects an artificial delay (sleep) into the code + * + * @return always true + */ + public static boolean injectDelay() { + if (delay != null) { + try { + log.info("Start: artificial delay for {}ms", delay); + Thread.sleep(delay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + log.info("Finish: artificial delay for {}ms", delay); + } + } + return true; + } }
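
The DocCollectionWatches class introduced in this patch merges what used to be two separate maps (collectionWatches and watchedCollectionStates) into a single ConcurrentHashMap, so watch registration and the cached collection state are always mutated atomically inside the map's compute functions. A minimal sketch of that single-map idea, under illustrative names (WatchRegistry and Entry are not Solr types):

    import java.util.concurrent.ConcurrentHashMap;

    // Sketch: watcher bookkeeping and the cached state live under one key, so an
    // unregister can never race with a state update held in a separate map.
    class WatchRegistry<S> {
      private static class Entry<S> {
        int refCount;   // how many cores/watchers are interested
        S currentState; // last observed state; null if none observed yet
      }

      private final ConcurrentHashMap<String, Entry<S>> entries = new ConcurrentHashMap<>();

      void register(String key) {
        entries.compute(key, (k, e) -> {
          if (e == null) e = new Entry<>();
          e.refCount++;
          return e;
        });
      }

      void unregister(String key) {
        // Returning null removes the mapping; the cached state goes with it atomically.
        entries.computeIfPresent(key, (k, e) -> (--e.refCount <= 0) ? null : e);
      }

      boolean updateState(String key, S newState) {
        boolean[] changed = {false};
        entries.computeIfPresent(key, (k, e) -> {
          changed[0] = (e.currentState != newState);
          e.currentState = newState;
          return e;
        });
        return changed[0]; // stays false when nobody watches the key
      }
    }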
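The version comparison inside DocCollectionWatches.updateDocCollection reduces to one freshness rule: accept the new state only if the state.json znode version advanced or the per-replica-states cversion advanced. Condensed into a standalone predicate (isNewer is an illustrative name, not Solr API):

    import org.apache.solr.common.cloud.DocCollection;

    final class StateFreshness {
      static boolean isNewer(DocCollection oldState, DocCollection newState) {
        int oldC = oldState.getPerReplicaStates() == null ? -1 : oldState.getPerReplicaStates().cversion;
        int newC = newState.getPerReplicaStates() == null ? -1 : newState.getPerReplicaStates().cversion;
        // Either the znode data version or the child (PRS) version must advance.
        return oldState.getZNodeVersion() < newState.getZNodeVersion() || oldC < newC;
      }
    }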
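testNodeVersion relies on a ZooKeeper detail: a znode's child version (cversion) increments once per child create and once per child delete, independently of the znode's own data version. That is why the expected child version goes 0 -> 1 after addReplica and 1 -> 3 after flipState (one delete plus one add). A toy model of the counting; the child-name format below only loosely mimics the name:version:state encoding of per-replica state entries and is purely illustrative:

    import java.util.HashSet;
    import java.util.Set;

    class ZnodeChildrenModel {
      private final Set<String> children = new HashSet<>();
      private int cversion = 0; // bumped on every child create/delete

      void create(String child) { children.add(child); cversion++; }
      void delete(String child) { children.remove(child); cversion++; }

      public static void main(String[] args) {
        ZnodeChildrenModel stateJson = new ZnodeChildrenModel();
        stateJson.create("r1:0:D");  // addReplica            -> cversion 1
        stateJson.delete("r1:0:D");  // flipState: remove old -> cversion 2
        stateJson.create("r1:1:A");  // flipState: add new    -> cversion 3
        System.out.println(stateJson.cversion); // prints 3
      }
    }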
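CommonTestInjection.injectDelay() deliberately returns true so a call site can hide it behind an assert statement: with assertions disabled (the production default) the JVM skips the call entirely. A hedged sketch of a call site; StateNotifier and notifyWatchers are invented names, since the patch does not show where injectDelay() is actually wired in:

    import org.apache.solr.common.util.CommonTestInjection;

    class StateNotifier {
      void notifyWatchers() {
        // Test-only artificial delay; a no-op unless a test called setDelay()
        // and the JVM runs with assertions enabled (-ea).
        assert CommonTestInjection.injectDelay();
        // ... deliver the updated DocCollection to registered watchers ...
      }
    }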
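Watch lifetime is reference counted: a StatefulCollectionWatch stays in the map while any core or DocCollectionWatcher references it, and once canBeRemoved() turns true the collection drops back to a LazyCollectionRef. A usage sketch of that lifecycle, assuming a live ZkStateReader and some watcher w:

    import org.apache.solr.common.cloud.DocCollectionWatcher;
    import org.apache.solr.common.cloud.ZkStateReader;

    final class WatchLifecycleExample {
      static void demo(ZkStateReader reader, DocCollectionWatcher w) {
        reader.registerCore("c1");                    // coreRefCount 0 -> 1; watch entry created
        reader.registerDocCollectionWatcher("c1", w); // stateWatchers now also holds w
        reader.unregisterCore("c1");                  // coreRefCount back to 0; entry kept, w still registered
        reader.removeDocCollectionWatcher("c1", w);   // last reference gone -> back to a LazyCollectionRef
      }
    }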