diff --git a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java new file mode 100644 index 0000000000000..8e857d2606d9b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java @@ -0,0 +1,149 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.store.Directory; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.index.translog.TranslogCorruptedException; +import org.elasticsearch.index.translog.TranslogDeletionPolicy; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.LongSupplier; +import java.util.stream.Stream; + +/** + * NoOpEngine is an engine implementation that does nothing but the bare minimum + * required in order to have an engine. All attempts to do something (search, + * index, get), throw {@link UnsupportedOperationException}. This does maintain + * a translog with a deletion policy so that when flushing, no translog is + * retained on disk (setting a retention size and age of 0). + * + * It's also important to notice that this does list the commits of the Store's + * Directory so that the last commit's user data can be read for the historyUUID + * and last committed segment info. + */ +public final class NoOpEngine extends ReadOnlyEngine { + + public NoOpEngine(EngineConfig engineConfig) { + super(engineConfig, null, null, true, directoryReader -> directoryReader); + boolean success = false; + try { + // The deletion policy for the translog should not keep any translogs around, so the min age/size is set to -1 + final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1); + + // The translog is opened and closed to validate that the translog UUID from lucene is the same as the one in the translog + try (Translog translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier())) { + final int nbOperations = translog.totalOperations(); + if (nbOperations != 0) { + throw new IllegalArgumentException("Expected 0 translog operations but there were " + nbOperations); + } + } + success = true; + } catch (IOException | TranslogCorruptedException e) { + throw new EngineCreationFailureException(shardId, "failed to create engine", e); + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(this); + } + } + } + + @Override + protected DirectoryReader open(final Directory directory) throws IOException { + final List indexCommits = DirectoryReader.listCommits(directory); + assert indexCommits.size() == 1 : "expected only one commit point"; + IndexCommit indexCommit = indexCommits.get(indexCommits.size() - 1); + return new DirectoryReader(directory, new LeafReader[0]) { + @Override + protected DirectoryReader doOpenIfChanged() throws IOException { + return null; + } + + @Override + protected DirectoryReader doOpenIfChanged(IndexCommit commit) throws IOException { + return null; + } + + @Override + protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) throws IOException { + return null; + } + + @Override + public long getVersion() { + return 0; + } + + @Override + public boolean isCurrent() throws IOException { + return true; + } + + @Override + public IndexCommit getIndexCommit() throws IOException { + return indexCommit; + } + + @Override + protected void doClose() throws IOException { + } + + @Override + public CacheHelper getReaderCacheHelper() { + return null; + } + }; + } + + private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy, + LongSupplier globalCheckpointSupplier) throws IOException { + final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); + final String translogUUID = loadTranslogUUIDFromLastCommit(); + // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong! + return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier, + engineConfig.getPrimaryTermSupplier()); + } + + /** + * Reads the current stored translog ID from the last commit data. + */ + @Nullable + private String loadTranslogUUIDFromLastCommit() { + final Map commitUserData = getLastCommittedSegmentInfos().getUserData(); + if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) { + throw new IllegalStateException("Commit doesn't contain translog generation id"); + } + return commitUserData.get(Translog.TRANSLOG_UUID_KEY); + } + + @Override + public boolean ensureTranslogSynced(Stream locations) { + throw new UnsupportedOperationException("Translog synchronization should never be needed"); + } +} diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index e3c1a77b84300..27bcdaa0c1ec1 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -84,6 +84,7 @@ import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.engine.NoOpEngine; import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.get.GetStats; @@ -518,6 +519,12 @@ private synchronized IndexService createIndexService(final String reason, } private EngineFactory getEngineFactory(final IndexSettings idxSettings) { + final IndexMetaData indexMetaData = idxSettings.getIndexMetaData(); + if (indexMetaData != null && indexMetaData.getState() == IndexMetaData.State.CLOSE) { + // NoOpEngine takes precedence as long as the index is closed + return NoOpEngine::new; + } + final List> engineFactories = engineFactoryProviders .stream() diff --git a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java new file mode 100644 index 0000000000000..ee76e44e97593 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java @@ -0,0 +1,221 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.store.LockObtainFailedException; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.DocsStats; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogCorruptedException; +import org.elasticsearch.index.translog.TranslogDeletionPolicy; +import org.elasticsearch.test.IndexSettingsModule; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicLong; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; + +public class NoOpEngineTests extends EngineTestCase { + private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); + + public void testNoopEngine() throws IOException { + engine.close(); + final NoOpEngine engine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir)); + expectThrows(UnsupportedOperationException.class, () -> engine.syncFlush(null, null)); + expectThrows(UnsupportedOperationException.class, () -> engine.ensureTranslogSynced(null)); + assertThat(engine.refreshNeeded(), equalTo(false)); + assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); + engine.close(); + } + + public void testTwoNoopEngines() throws IOException { + engine.close(); + // Ensure that we can't open two noop engines for the same store + final EngineConfig engineConfig = noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir); + try (NoOpEngine ignored = new NoOpEngine(engineConfig)) { + UncheckedIOException e = expectThrows(UncheckedIOException.class, () -> new NoOpEngine(engineConfig)); + assertThat(e.getCause(), instanceOf(LockObtainFailedException.class)); + } + } + + public void testNoopAfterRegularEngine() throws IOException { + int docs = randomIntBetween(1, 10); + ReplicationTracker tracker = (ReplicationTracker) engine.config().getGlobalCheckpointSupplier(); + ShardRouting routing = TestShardRouting.newShardRouting("test", shardId.id(), "node", + null, true, ShardRoutingState.STARTED, allocationId); + IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(shardId).addShard(routing).build(); + tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table, Collections.emptySet()); + tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + for (int i = 0; i < docs; i++) { + ParsedDocument doc = testParsedDocument("" + i, null, testDocumentWithTextField(), B_1, null); + engine.index(indexForDoc(doc)); + tracker.updateLocalCheckpoint(allocationId.getId(), i); + } + + flushAndTrimTranslog(engine); + + long localCheckpoint = engine.getLocalCheckpoint(); + long maxSeqNo = engine.getSeqNoStats(100L).getMaxSeqNo(); + engine.close(); + + final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker)); + assertThat(noOpEngine.getLocalCheckpoint(), equalTo(localCheckpoint)); + assertThat(noOpEngine.getSeqNoStats(100L).getMaxSeqNo(), equalTo(maxSeqNo)); + try (Engine.IndexCommitRef ref = noOpEngine.acquireLastIndexCommit(false)) { + try (IndexReader reader = DirectoryReader.open(ref.getIndexCommit())) { + assertThat(reader.numDocs(), equalTo(docs)); + } + } + noOpEngine.close(); + } + + public void testNoopEngineWithInvalidTranslogUUID() throws IOException { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + int numDocs = scaledRandomIntBetween(10, 100); + try (InternalEngine engine = createEngine(config)) { + for (int i = 0; i < numDocs; i++) { + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA, + System.nanoTime(), -1, false)); + if (rarely()) { + engine.flush(); + } + globalCheckpoint.set(engine.getLocalCheckpoint()); + } + flushAndTrimTranslog(engine); + } + + final Path newTranslogDir = createTempDir(); + // A new translog will have a different UUID than the existing store/noOp engine does + Translog newTranslog = createTranslog(newTranslogDir, () -> 1L); + newTranslog.close(); + + EngineCreationFailureException e = expectThrows(EngineCreationFailureException.class, + () -> new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, newTranslogDir))); + assertThat(e.getCause(), instanceOf(TranslogCorruptedException.class)); + } + } + + public void testNoopEngineWithNonZeroTranslogOperations() throws IOException { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + final MergePolicy mergePolicy = NoMergePolicy.INSTANCE; + EngineConfig config = config(defaultSettings, store, createTempDir(), mergePolicy, null, null, globalCheckpoint::get); + int numDocs = scaledRandomIntBetween(10, 100); + try (InternalEngine engine = createEngine(config)) { + for (int i = 0; i < numDocs; i++) { + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA, + System.nanoTime(), -1, false)); + if (rarely()) { + engine.flush(); + } + globalCheckpoint.set(engine.getLocalCheckpoint()); + } + engine.syncTranslog(); + engine.flushAndClose(); + engine.close(); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new NoOpEngine(engine.engineConfig)); + assertThat(e.getMessage(), is("Expected 0 translog operations but there were " + numDocs)); + } + } + } + + public void testNoOpEngineDocStats() throws Exception { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + final int numDocs = scaledRandomIntBetween(10, 3000); + int deletions = 0; + try (InternalEngine engine = createEngine(config)) { + for (int i = 0; i < numDocs; i++) { + engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null))); + if (rarely()) { + engine.flush(); + } + globalCheckpoint.set(engine.getLocalCheckpoint()); + } + + for (int i = 0; i < numDocs; i++) { + if (randomBoolean()) { + String delId = Integer.toString(i); + Engine.DeleteResult result = engine.delete(new Engine.Delete("test", delId, newUid(delId), primaryTerm.get())); + assertTrue(result.isFound()); + globalCheckpoint.set(engine.getLocalCheckpoint()); + deletions += 1; + } + } + engine.waitForOpsToComplete(numDocs + deletions - 1); + flushAndTrimTranslog(engine); + engine.close(); + } + + final DocsStats expectedDocStats; + try (InternalEngine engine = createEngine(config)) { + expectedDocStats = engine.docStats(); + } + + try (NoOpEngine noOpEngine = new NoOpEngine(config)) { + assertEquals(expectedDocStats.getCount(), noOpEngine.docStats().getCount()); + assertEquals(expectedDocStats.getDeleted(), noOpEngine.docStats().getDeleted()); + assertEquals(expectedDocStats.getTotalSizeInBytes(), noOpEngine.docStats().getTotalSizeInBytes()); + assertEquals(expectedDocStats.getAverageSizeInBytes(), noOpEngine.docStats().getAverageSizeInBytes()); + } catch (AssertionError e) { + logger.error(config.getMergePolicy()); + throw e; + } + } + } + + private void flushAndTrimTranslog(final InternalEngine engine) { + engine.flush(true, true); + final TranslogDeletionPolicy deletionPolicy = engine.getTranslog().getDeletionPolicy(); + deletionPolicy.setRetentionSizeInBytes(-1); + deletionPolicy.setRetentionAgeInMillis(-1); + deletionPolicy.setMinTranslogGenerationForRecovery(engine.getTranslog().getGeneration().translogFileGeneration); + engine.flush(true, true); + } +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 1bcb8cd29104f..5d96738e2b0b6 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -60,6 +60,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.NoOpEngine; import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.mapper.SourceToParse; @@ -106,6 +107,7 @@ import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.NONE; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -666,7 +668,7 @@ public static final IndexShard newIndexShard( } private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) { - ShardRouting shardRouting = TestShardRouting.newShardRouting(existingShardRouting.shardId(), + ShardRouting shardRouting = newShardRouting(existingShardRouting.shardId(), existingShardRouting.currentNodeId(), null, existingShardRouting.primary(), ShardRoutingState.INITIALIZING, existingShardRouting.allocationId()); shardRouting = shardRouting.updateUnassigned(new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, "fake recovery"), @@ -903,4 +905,38 @@ public void testShardChangesWithDefaultDocType() throws Exception { assertThat(opsFromLucene, equalTo(opsFromTranslog)); } } + + /** + * Test that the {@link org.elasticsearch.index.engine.NoOpEngine} takes precedence over other + * engine factories if the index is closed. + */ + public void testNoOpEngineFactoryTakesPrecedence() throws IOException { + final String indexName = "closed-index"; + createIndex(indexName, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build()); + ensureGreen(); + + client().admin().indices().prepareClose(indexName).get(); + + final ClusterService clusterService = getInstanceFromNode(ClusterService.class); + final ClusterState clusterState = clusterService.state(); + + IndexMetaData indexMetaData = clusterState.metaData().index(indexName); + final IndicesService indicesService = getInstanceFromNode(IndicesService.class); + + final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0); + final DiscoveryNode node = clusterService.localNode(); + final ShardRouting routing = + newShardRouting(shardId, node.getId(), true, ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE); + + final IndexService indexService = indicesService.createIndex(indexMetaData, Collections.emptyList()); + try { + final IndexShard indexShard = indexService.createShard(routing, id -> { + }); + indexShard.markAsRecovering("store", new RecoveryState(indexShard.routingEntry(), node, null)); + indexShard.recoverFromStore(); + assertThat(indexShard.getEngine(), instanceOf(NoOpEngine.class)); + } finally { + indexService.close("test terminated", true); + } + } } diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java index b68ec6d0598e5..cb432f2d47531 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java @@ -549,7 +549,7 @@ public void testGetEngineFactory() throws IOException { } } - public void testConflictingEngineFactories() throws IOException { + public void testConflictingEngineFactories() { final String indexName = "foobar"; final Index index = new Index(indexName, UUIDs.randomBase64UUID()); final Settings settings = Settings.builder() diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 8b463f33b9081..1a53850ac0357 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -686,6 +686,14 @@ public EngineConfig config( tombstoneDocSupplier()); } + protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath) { + return noOpConfig(indexSettings, store, translogPath, null); + } + + protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath, LongSupplier globalCheckpointSupplier) { + return config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier); + } + protected static final BytesReference B_1 = new BytesArray(new byte[]{1}); protected static final BytesReference B_2 = new BytesArray(new byte[]{2}); protected static final BytesReference B_3 = new BytesArray(new byte[]{3});