From c899f63bfd9a2d1012688dd8234f3399f28c5b14 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 22 Feb 2019 11:35:08 +0100 Subject: [PATCH] ReadOnlyEngine should update translog recovery state information (#39238) (#39254) * ReadOnlyEngine should update translog recovery state information (#39238) `ReadOnlyEngine` never recovers operations from translog and never updates translog information in the index shard's recovery state, even though the recovery state goes through the `TRANSLOG` stage during the recovery. It means that recovery information for frozen shards indicates an unkown number of recovered translog ops in the Recovery APIs (translog_ops: `-1` and translog_ops_percent: `-1.0%`) and this is confusing. This commit changes the `recoverFromTranslog()` method in `ReadOnlyEngine` so that it always recover from an empty translog snapshot, allowing the recovery state translog information to be correctly updated. Related to #33888 --- .../index/engine/ReadOnlyEngine.java | 42 +++++++++++++------ .../index/engine/ReadOnlyEngineTests.java | 32 ++++++++++++++ .../index/engine/FrozenIndexTests.java | 37 ++++++++++++++++ 3 files changed, 98 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index c464a34e78b01..230b550539ee5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -34,6 +34,7 @@ import org.elasticsearch.Version; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; +import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.seqno.SeqNoStats; @@ -287,18 +288,7 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS @Override public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException { - return new Translog.Snapshot() { - @Override - public void close() { } - @Override - public int totalOperations() { - return 0; - } - @Override - public Translog.Operation next() { - return null; - } - }; + return newEmptySnapshot(); } @Override @@ -429,7 +419,15 @@ public int fillSeqNoGaps(long primaryTerm) { } @Override - public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) { + public Engine recoverFromTranslog(final TranslogRecoveryRunner translogRecoveryRunner, final long recoverUpToSeqNo) { + try (ReleasableLock lock = readLock.acquire()) { + ensureOpen(); + try (Translog.Snapshot snapshot = newEmptySnapshot()) { + translogRecoveryRunner.run(this, snapshot); + } catch (final Exception e) { + throw new EngineException(shardId, "failed to recover from empty translog snapshot", e); + } + } return this; } @@ -468,4 +466,22 @@ protected void processReaders(IndexReader reader, IndexReader previousReader) { public boolean refreshNeeded() { return false; } + + private Translog.Snapshot newEmptySnapshot() { + return new Translog.Snapshot() { + @Override + public void close() { + } + + @Override + public int totalOperations() { + return 0; + } + + @Override + public Translog.Operation next() { + return null; + } + }; + } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index 1f28888af13e5..5c7f3f537055b 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -210,4 +210,36 @@ public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException { } } } + + public void testRecoverFromTranslogAppliesNoOperations() throws IOException { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + int numDocs = scaledRandomIntBetween(10, 1000); + try (InternalEngine engine = createEngine(config)) { + for (int i = 0; i < numDocs; i++) { + if (rarely()) { + continue; // gap in sequence number + } + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, VersionType.EXTERNAL, + Engine.Operation.Origin.REPLICA, System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0)); + if (rarely()) { + engine.flush(); + } + globalCheckpoint.set(i); + } + engine.syncTranslog(); + engine.flushAndClose(); + } + try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) { + final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings()); + readOnlyEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); + readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong()); + + assertThat(translogHandler.appliedOperations(), equalTo(0L)); + } + } + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java index 094c79efb50a9..c71cf35960d67 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java @@ -9,11 +9,13 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -26,6 +28,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchService; @@ -48,8 +51,10 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; public class FrozenIndexTests extends ESSingleNodeTestCase { @@ -370,4 +375,36 @@ public void testFreezeEmptyIndexWithTranslogOps() throws Exception { assertAcked(new XPackClient(client()).freeze(new TransportFreezeIndexAction.FreezeRequest(indexName))); assertIndexFrozen(indexName); } + + public void testRecoveryState() throws ExecutionException, InterruptedException { + final String indexName = "index_recovery_state"; + createIndex(indexName, Settings.builder() + .put("index.number_of_replicas", 0) + .build()); + + final long nbDocs = randomIntBetween(0, 50); + for (long i = 0; i < nbDocs; i++) { + final IndexResponse indexResponse = client().prepareIndex(indexName, "_doc", Long.toString(i)).setSource("field", i).get(); + assertThat(indexResponse.status(), is(RestStatus.CREATED)); + } + + assertAcked(new XPackClient(client()).freeze(new TransportFreezeIndexAction.FreezeRequest(indexName))); + assertIndexFrozen(indexName); + + final IndexMetaData indexMetaData = client().admin().cluster().prepareState().get().getState().metaData().index(indexName); + final IndexService indexService = getInstanceFromNode(IndicesService.class).indexService(indexMetaData.getIndex()); + for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) { + final IndexShard indexShard = indexService.getShardOrNull(i); + assertThat("Shard [" + i + "] is missing for index " + indexMetaData.getIndex(), indexShard, notNullValue()); + final RecoveryState recoveryState = indexShard.recoveryState(); + assertThat(recoveryState.getRecoverySource(), is(RecoverySource.ExistingStoreRecoverySource.INSTANCE)); + assertThat(recoveryState.getStage(), is(RecoveryState.Stage.DONE)); + assertThat(recoveryState.getTargetNode(), notNullValue()); + assertThat(recoveryState.getIndex().totalFileCount(), greaterThan(0)); + assertThat(recoveryState.getIndex().reusedFileCount(), greaterThan(0)); + assertThat(recoveryState.getTranslog().recoveredOperations(), equalTo(0)); + assertThat(recoveryState.getTranslog().totalOperations(), equalTo(0)); + assertThat(recoveryState.getTranslog().recoveredPercent(), equalTo(100.0f)); + } + } }