Skip to content

Commit

Permalink
simonw feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Apr 26, 2019
1 parent b9f5f02 commit 8ba5ca5
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,13 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
searcherManager = new SearcherManager(reader, searcherFactory);
if (seqNoStats == null) {
seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos);
ensureNoUncommittedOperation(seqNoStats, lastCommittedSegmentInfos);
}
this.seqNoStats = seqNoStats;
this.docsStats = docsStats(lastCommittedSegmentInfos);
this.indexWriterLock = indexWriterLock;
if (obtainLock) {
ensureNoUncommittedOperation(this.seqNoStats, lastCommittedSegmentInfos);
}
success = true;
} finally {
if (success == false) {
Expand All @@ -129,8 +131,13 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
* so peer recovery of closed indices can skip phase 2 (i.e., not replaying translog operations) without losing data.
*/
private void ensureNoUncommittedOperation(SeqNoStats seqNoStats, SegmentInfos segmentInfos) throws IOException {
// we can't enforce this check on an old index - should we prevent this engine as a recovery source?
assert indexWriterLock != null;
if (config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_7_0)) {
// we can't enforce this check on an old index
return;
}
if (seqNoStats.getGlobalCheckpoint() == seqNoStats.getMaxSeqNo()) {
// we are good - no need to open translog
return;
}
final String translogUUID = segmentInfos.userData.get(Translog.TRANSLOG_UUID_KEY);
Expand All @@ -139,13 +146,13 @@ private void ensureNoUncommittedOperation(SeqNoStats seqNoStats, SegmentInfos se
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(Long.MAX_VALUE, Long.MAX_VALUE);
translogDeletionPolicy.setTranslogGenerationOfLastCommit(recoverTranslogGeneration.translogFileGeneration);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(recoverTranslogGeneration.translogFileGeneration);
final LocalCheckpointTracker localCheckpointTracker;
try (DirectoryReader reader = DirectoryReader.open(indexCommit)) {
localCheckpointTracker = createLocalCheckpointTracker(engineConfig, segmentInfos, logger,
() -> new Searcher("build_checkpoint_tracker", new IndexSearcher(reader), () -> {}), LocalCheckpointTracker::new);
}
try (Translog translog = new Translog(engineConfig.getTranslogConfig(), translogUUID, translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier())) {
final LocalCheckpointTracker localCheckpointTracker;
try (DirectoryReader reader = DirectoryReader.open(indexCommit)) {
localCheckpointTracker = createLocalCheckpointTracker(engineConfig, segmentInfos, logger,
() -> new Searcher("build_checkpoint_tracker", new IndexSearcher(reader), () -> {}), LocalCheckpointTracker::new);
}
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(recoverTranslogGeneration, Long.MAX_VALUE)) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.Store;

import java.io.IOException;
Expand Down Expand Up @@ -140,29 +139,26 @@ public void testEnsureNoUncommittedOperations() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
boolean hasPendingOps = false;
List<Engine.Operation> ops = generateHistoryOnReplica(between(10, 500), randomBoolean(), false, randomBoolean());
Randomness.shuffle(ops);
final DocsStats docStats;
List<Engine.Operation> uncommittedOps = randomSubsetOf(ops);
ops.removeAll(uncommittedOps);
try (InternalEngine engine = createEngine(config)) {
for (Engine.Operation op : ops) {
hasPendingOps = true;
applyOperation(engine, op);
if (rarely()) {
engine.flush(true, true);
hasPendingOps = false;
}
}
engine.refresh("test");
docStats = engine.docStats();
engine.flush(true, true);
globalCheckpoint.set(engine.getLocalCheckpoint());
for (Engine.Operation op : uncommittedOps) {
applyOperation(engine, op);
}
}
if (hasPendingOps) {
if (uncommittedOps.isEmpty() == false) {
AssertionError error = expectThrows(AssertionError.class,
() -> new ReadOnlyEngine(config, null, null, randomBoolean(), Function.identity()));
() -> new ReadOnlyEngine(config, null, null, true, Function.identity()));
assertThat(error.getMessage(), containsString("does not contain operation"));
} else {
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, randomBoolean(), Function.identity())) {
assertThat(readOnlyEngine.docStats(), equalTo(docStats));
try (ReadOnlyEngine ignored = new ReadOnlyEngine(config, null, null, randomBoolean(), Function.identity())) {
}
}
}
Expand Down

0 comments on commit 8ba5ca5

Please sign in to comment.