Skip to content

Commit

Permalink
Allow searching of snapshot taken while indexing (#55511)
Browse files Browse the repository at this point in the history
Today a read-only engine requires a complete history of operations, in the
sense that its local checkpoint must equal its maximum sequence number. This is
a valid check for read-only engines that were obtained by closing an index
since closing an index waits for all in-flight operations to complete. However
a snapshot may not have this property if it was taken while indexing was
ongoing, but that's ok.

This commit weakens the check for a complete history to exclude the case of a
searchable snapshot.

Relates #50999
  • Loading branch information
DaveCTurner authored Apr 21, 2020
1 parent 2e76898 commit d6fb306
Show file tree
Hide file tree
Showing 16 changed files with 167 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public final class NoOpEngine extends ReadOnlyEngine {
private final DocsStats docsStats;

public NoOpEngine(EngineConfig config) {
super(config, null, null, true, Function.identity());
super(config, null, null, true, Function.identity(), true);
this.segmentsStats = new SegmentsStats();
Directory directory = store.directory();
try (DirectoryReader reader = openDirectory(directory)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
* Note: this engine can be opened side-by-side with a read-write engine but will not reflect any changes made to the read-write
* engine.
*
* @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function)
* @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function, boolean)
*/
public class ReadOnlyEngine extends Engine {

Expand All @@ -78,6 +78,7 @@ public class ReadOnlyEngine extends Engine {
private final RamAccountingRefreshListener refreshListener;
private final SafeCommitInfo safeCommitInfo;
private final CompletionStatsCache completionStatsCache;
private final boolean requireCompleteHistory;

protected volatile TranslogStats translogStats;

Expand All @@ -92,11 +93,13 @@ public class ReadOnlyEngine extends Engine {
* @param obtainLock if <code>true</code> this engine will try to obtain the {@link IndexWriter#WRITE_LOCK_NAME} lock. Otherwise
* the lock won't be obtained
* @param readerWrapperFunction allows to wrap the index-reader for this engine.
* @param requireCompleteHistory indicates whether this engine permits an incomplete history (i.e. LCP &lt; MSN)
*/
public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats translogStats, boolean obtainLock,
Function<DirectoryReader, DirectoryReader> readerWrapperFunction) {
Function<DirectoryReader, DirectoryReader> readerWrapperFunction, boolean requireCompleteHistory) {
super(config);
this.refreshListener = new RamAccountingRefreshListener(engineConfig.getCircuitBreakerService());
this.requireCompleteHistory = requireCompleteHistory;
try {
Store store = config.getStore();
store.incRef();
Expand Down Expand Up @@ -137,6 +140,9 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
}

protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(final SeqNoStats seqNoStats) {
if (requireCompleteHistory == false) {
return;
}
// Before 8.0 the global checkpoint is not known and up to date when the engine is created after
// peer recovery, so we only check the max seq no / global checkpoint coherency when the global
// checkpoint is different from the unassigned sequence number value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3325,7 +3325,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
// we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
// acquireXXXCommit and close works.
final Engine readOnlyEngine =
new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity()) {
new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity(), true) {
@Override
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
synchronized (engineMutex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void testReadOnlyEngine() throws Exception {
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint()));
engine.flush();
readOnlyEngine = new ReadOnlyEngine(engine.engineConfig, engine.getSeqNoStats(globalCheckpoint.get()),
engine.getTranslogStats(), false, Function.identity());
engine.getTranslogStats(), false, Function.identity(), true);
lastSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get());
lastDocIds = getDocIds(engine, true);
assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint()));
Expand Down Expand Up @@ -136,7 +136,7 @@ public void testEnsureMaxSeqNoIsEqualToGlobalCheckpoint() throws IOException {
engine.flushAndClose();

IllegalStateException exception = expectThrows(IllegalStateException.class,
() -> new ReadOnlyEngine(config, null, null, true, Function.identity()) {
() -> new ReadOnlyEngine(config, null, null, true, Function.identity(), true) {
@Override
protected boolean assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) {
// we don't want the assertion to trip in this test
Expand All @@ -155,7 +155,7 @@ public void testReadOnly() throws IOException {
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
store.createEmpty(Version.CURRENT.luceneVersion);
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity())) {
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity(), true)) {
Class<? extends Throwable> expectedException = LuceneTestCase.TEST_ASSERTS_ENABLED ? AssertionError.class :
UnsupportedOperationException.class;
expectThrows(expectedException, () -> readOnlyEngine.index(null));
Expand All @@ -175,7 +175,7 @@ public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException {
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
store.createEmpty(Version.CURRENT.luceneVersion);
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity())) {
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , new TranslogStats(), true, Function.identity(), true)) {
globalCheckpoint.set(randomNonNegativeLong());
try {
readOnlyEngine.verifyEngineBeforeIndexClosing();
Expand Down Expand Up @@ -205,7 +205,7 @@ public void testRecoverFromTranslogAppliesNoOperations() throws IOException {
engine.syncTranslog();
engine.flushAndClose();
}
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity(), true)) {
final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings());
readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong());

Expand Down Expand Up @@ -247,7 +247,7 @@ public void testTranslogStats() throws IOException {
engine.flush(true, true);
}

try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity())) {
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity(), true)) {
assertThat(readOnlyEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(softDeletesEnabled ? 0 : numDocs));
assertThat(readOnlyEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
assertThat(readOnlyEngine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4057,7 +4057,7 @@ public void testDoNotTrimCommitsWhenOpenReadOnlyEngine() throws Exception {
ShardRouting readonlyShardRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true,
ShardRoutingState.INITIALIZING, RecoverySource.ExistingStoreRecoverySource.INSTANCE);
final IndexShard readonlyShard = reinitShard(shard, readonlyShardRouting, shard.indexSettings.getIndexMetadata(),
engineConfig -> new ReadOnlyEngine(engineConfig, null, null, true, Function.identity()) {
engineConfig -> new ReadOnlyEngine(engineConfig, null, null, true, Function.identity(), true) {
@Override
protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) {
// just like a following shard, we need to skip this check for now.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public static EngineFactory getEngineFactory() {
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
}, true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots;

import org.elasticsearch.Build;
import org.elasticsearch.common.settings.Settings;

import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;

public class SearchableSnapshotsConstants {
public static final boolean SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED;

static {
final String property = System.getProperty("es.searchable_snapshots_feature_enabled");
if ("true".equals(property)) {
SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED = true;
} else if ("false".equals(property)) {
SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED = false;
} else if (property == null) {
SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED = Build.CURRENT.isSnapshot();
} else {
throw new IllegalArgumentException(
"expected es.searchable_snapshots_feature_enabled to be unset or [true|false] but was [" + property + "]"
);
}
}

public static final String SNAPSHOT_DIRECTORY_FACTORY_KEY = "snapshot";

public static boolean isSearchableSnapshotStore(Settings indexSettings) {
return SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED
&& SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ public final class FrozenEngine extends ReadOnlyEngine {
private volatile ElasticsearchDirectoryReader lastOpenedReader;
private final ElasticsearchDirectoryReader canMatchReader;

public FrozenEngine(EngineConfig config) {
super(config, null, null, true, Function.identity());
public FrozenEngine(EngineConfig config, boolean requireCompleteHistory) {
super(config, null, null, true, Function.identity(), requireCompleteHistory);

boolean success = false;
Directory directory = store.directory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
import org.elasticsearch.xpack.frozen.rest.action.RestFreezeIndexAction;
import org.elasticsearch.xpack.frozen.action.TransportFreezeIndexAction;
import org.elasticsearch.xpack.frozen.rest.action.RestFreezeIndexAction;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -36,12 +36,15 @@
import java.util.Optional;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.isSearchableSnapshotStore;

public class FrozenIndices extends Plugin implements ActionPlugin, EnginePlugin {

@Override
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
if (indexSettings.getValue(FrozenEngine.INDEX_FROZEN)) {
return Optional.of(FrozenEngine::new);
final boolean requireCompleteHistory = isSearchableSnapshotStore(indexSettings.getSettings()) == false;
return Optional.of(config -> new FrozenEngine(config, requireCompleteHistory));
} else {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void testAcquireReleaseReset() throws IOException {
int numDocs = Math.min(10, addDocuments(globalCheckpoint, engine));
engine.flushAndClose();
listener.reset();
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) {
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) {
assertFalse(frozenEngine.isReaderOpen());
Engine.Searcher searcher = frozenEngine.acquireSearcher("test");
assertEquals(config.getShardId(), ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher
Expand Down Expand Up @@ -82,7 +82,7 @@ public void testAcquireReleaseResetTwoSearchers() throws IOException {
int numDocs = Math.min(10, addDocuments(globalCheckpoint, engine));
engine.flushAndClose();
listener.reset();
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) {
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) {
assertFalse(frozenEngine.isReaderOpen());
Engine.Searcher searcher1 = frozenEngine.acquireSearcher("test");
assertTrue(frozenEngine.isReaderOpen());
Expand Down Expand Up @@ -118,7 +118,7 @@ public void testSegmentStats() throws IOException {
addDocuments(globalCheckpoint, engine);
engine.flushAndClose();
listener.reset();
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) {
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) {
Engine.Searcher searcher = frozenEngine.acquireSearcher("test");
SegmentsStats segmentsStats = frozenEngine.segmentsStats(randomBoolean(), false);
assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount());
Expand Down Expand Up @@ -156,15 +156,15 @@ public void testCircuitBreakerAccounting() throws IOException {
engine.refresh("test"); // pull the reader to account for RAM in the breaker.
}
final long expectedUse;
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, i -> i)) {
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, i -> i, true)) {
expectedUse = breaker.getUsed();
DocsStats docsStats = readOnlyEngine.docStats();
assertEquals(docs, docsStats.getCount());
}
assertTrue(expectedUse > 0);
assertEquals(0, breaker.getUsed());
listener.reset();
try (FrozenEngine frozenEngine = new FrozenEngine(config)) {
try (FrozenEngine frozenEngine = new FrozenEngine(config, true)) {
Engine.Searcher searcher = frozenEngine.acquireSearcher("test");
assertEquals(expectedUse, breaker.getUsed());
FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release();
Expand Down Expand Up @@ -209,7 +209,7 @@ public void testSearchConcurrently() throws IOException, InterruptedException {
int numDocsAdded = addDocuments(globalCheckpoint, engine);
engine.flushAndClose();
int numIters = randomIntBetween(100, 1000);
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) {
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) {
int numThreads = randomIntBetween(2, 4);
Thread[] threads = new Thread[numThreads];
CyclicBarrier barrier = new CyclicBarrier(numThreads);
Expand Down Expand Up @@ -305,7 +305,7 @@ public void testCanMatch() throws IOException {
addDocuments(globalCheckpoint, engine);
engine.flushAndClose();
listener.reset();
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) {
try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) {
DirectoryReader reader;
try (Engine.Searcher searcher = frozenEngine.acquireSearcher("can_match")) {
assertNotNull(ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher.getDirectoryReader()));
Expand Down Expand Up @@ -350,7 +350,7 @@ public void testSearchers() throws Exception {
totalDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE).scoreDocs.length;
}
}
try (FrozenEngine frozenEngine = new FrozenEngine(config)) {
try (FrozenEngine frozenEngine = new FrozenEngine(config, true)) {
try (Engine.Searcher searcher = frozenEngine.acquireSearcher("test")) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
assertThat(topDocs.scoreDocs.length, equalTo(totalDocs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ public void testRecoverFromFrozenPrimary() throws IOException {
final ShardRouting shardRouting = indexShard.routingEntry();
IndexShard frozenShard = reinitShard(indexShard, ShardRoutingHelper.initWithSameId(shardRouting,
shardRouting.primary() ? RecoverySource.ExistingStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE
), indexShard.indexSettings().getIndexMetadata(), FrozenEngine::new);
), indexShard.indexSettings().getIndexMetadata(), config -> new FrozenEngine(config, true));
recoverShardFromStore(frozenShard);
assertThat(frozenShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(frozenShard.seqNoStats().getMaxSeqNo()));
assertDocCount(frozenShard, 3);

IndexShard replica = newShard(false, Settings.EMPTY, FrozenEngine::new);
IndexShard replica = newShard(false, Settings.EMPTY, config -> new FrozenEngine(config, true));
recoverReplica(replica, frozenShard, true);
assertDocCount(replica, 3);
closeShards(frozenShard, replica);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import java.nio.file.Path;

import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.isSearchableSnapshotStore;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.isSearchableSnapshotStore;

public class SearchableSnapshotIndexEventListener implements IndexEventListener {

Expand Down
Loading

0 comments on commit d6fb306

Please sign in to comment.