From 492ffc0556c6a47fc392ac9af146702080609cf0 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Tue, 28 Jun 2022 18:28:07 +0530 Subject: [PATCH] Review comments Signed-off-by: Bukhtawar Khan --- .../index/engine/InternalEngine.java | 70 +++++----- .../index/engine/NRTReplicationEngine.java | 16 +-- .../translog/InternalTranslogManager.java | 7 +- .../index/translog/NoOpTranslogManager.java | 2 +- .../index/translog/TranslogManager.java | 3 +- .../index/engine/InternalEngineTests.java | 127 ++++++++++-------- .../engine/NRTReplicationEngineTests.java | 16 ++- .../index/engine/NoOpEngineTests.java | 3 +- .../InternalTranslogManagerTests.java | 28 ++-- .../index/engine/EngineTestCase.java | 9 +- 10 files changed, 153 insertions(+), 128 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index cf0fd7f21d961..b4d1cea6eb136 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -301,7 +301,7 @@ public void onTragicFailure(AlreadyClosedException ex) { logger, translogDeletionPolicy, softDeletesPolicy, - translogManager.getTranslog(false)::getLastSyncedGlobalCheckpoint + translogManager.getTranslog()::getLastSyncedGlobalCheckpoint ); this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); writer = createWriter(); @@ -335,7 +335,7 @@ public void onTragicFailure(AlreadyClosedException ex) { this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint()); this.internalReaderManager.addListener(lastRefreshedCheckpointListener); maxSeqNoOfUpdatesOrDeletes = new AtomicLong( - SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translogManager.getTranslog(false).getMaxSeqNo()) + SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translogManager.getTranslog().getMaxSeqNo()) ); if (localCheckpointTracker.getPersistedCheckpoint() < localCheckpointTracker.getMaxSeqNo()) { try (Searcher searcher = acquireSearcher("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL)) { @@ -355,7 +355,7 @@ public void onTragicFailure(AlreadyClosedException ex) { if (success == false) { Translog translog = null; if (translogManagerRef != null) { - translog = translogManagerRef.getTranslog(false); + translog = translogManagerRef.getTranslog(); } IOUtils.closeWhileHandlingException(writer, translog, internalReaderManager, externalReaderManager, scheduler); if (isClosed.get() == false) { @@ -390,7 +390,7 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException { lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)) + 1; } return new SoftDeletesPolicy( - translogManager.getTranslog(false)::getLastSyncedGlobalCheckpoint, + translogManager.getTranslog()::getLastSyncedGlobalCheckpoint, lastMinRetainedSeqNo, engineConfig.getIndexSettings().getSoftDeleteRetentionOperations(), engineConfig.retentionLeasesSupplier() @@ -560,7 +560,7 @@ private void revisitIndexDeletionPolicyOnTranslogSynced() { if (combinedDeletionPolicy.hasUnreferencedCommits()) { indexWriter.deleteUnusedFiles(); } - translogManager.getTranslog(false).trimUnreferencedReaders(); + translogManager.getTranslog().trimUnreferencedReaders(); } catch (IOException ex) { throw new TranslogException(shardId, "Failed to execute index deletion policy on translog synced", ex); } @@ -652,7 +652,7 @@ public GetResult get(Get get, BiFunction // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0 if (versionValue.getLocation() != null) { try { - Translog.Operation operation = translogManager.getTranslog(false).readOperation(versionValue.getLocation()); + Translog.Operation operation = translogManager.getTranslog().readOperation(versionValue.getLocation()); if (operation != null) { // in the case of a already pruned translog generation we might get null here - yet very unlikely final Translog.Index index = (Translog.Index) operation; @@ -957,7 +957,7 @@ public IndexResult index(Index index) throws IOException { if (index.origin().isFromTranslog() == false) { final Translog.Location location; if (indexResult.getResultType() == Result.Type.SUCCESS) { - location = translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); + location = translogManager.getTranslog().add(new Translog.Index(index, indexResult)); } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no final NoOp noOp = new NoOp( @@ -1397,7 +1397,7 @@ public DeleteResult delete(Delete delete) throws IOException { } } if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) { - final Translog.Location location = translogManager.getTranslog(false).add(new Translog.Delete(delete, deleteResult)); + final Translog.Location location = translogManager.getTranslog().add(new Translog.Delete(delete, deleteResult)); deleteResult.setTranslogLocation(location); } localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo()); @@ -1724,7 +1724,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { } noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo()); if (noOp.origin().isFromTranslog() == false && noOpResult.getResultType() == Result.Type.SUCCESS) { - final Translog.Location location = translogManager.getTranslog(false) + final Translog.Location location = translogManager.getTranslog() .add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); noOpResult.setTranslogLocation(location); } @@ -1826,10 +1826,10 @@ public boolean shouldPeriodicallyFlush() { final long localCheckpointOfLastCommit = Long.parseLong( lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY) ); - final long translogGenerationOfLastCommit = translogManager.getTranslog(false) + final long translogGenerationOfLastCommit = translogManager.getTranslog() .getMinGenerationForSeqNo(localCheckpointOfLastCommit + 1).translogFileGeneration; final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); - if (translogManager.getTranslog(false).sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) { + if (translogManager.getTranslog().sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) { return false; } /* @@ -1847,7 +1847,7 @@ public boolean shouldPeriodicallyFlush() { * * This method is to maintain translog only, thus IndexWriter#hasUncommittedChanges condition is not considered. */ - final long translogGenerationOfNewCommit = translogManager.getTranslog(false) + final long translogGenerationOfNewCommit = translogManager.getTranslog() .getMinGenerationForSeqNo(localCheckpointTracker.getProcessedCheckpoint() + 1).translogFileGeneration; return translogGenerationOfLastCommit < translogGenerationOfNewCommit || localCheckpointTracker.getProcessedCheckpoint() == localCheckpointTracker.getMaxSeqNo(); @@ -1889,9 +1889,9 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { )) { translogManager.ensureCanFlush(); try { - translogManager.getTranslog(false).rollGeneration(); + translogManager.getTranslog().rollGeneration(); logger.trace("starting commit for flush; commitTranslog=true"); - commitIndexWriter(indexWriter, translogManager.getTranslog(false)); + commitIndexWriter(indexWriter, translogManager.getTranslog()); logger.trace("finished commit for flush"); // a temporary debugging to investigate test failure - issue#32827. Remove when the issue is resolved @@ -1904,7 +1904,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { // we need to refresh in order to clear older version values refresh("version_table_flush", SearcherScope.INTERNAL, true); - translogManager.getTranslog(false).trimUnreferencedReaders(); + translogManager.getTranslog().trimUnreferencedReaders(); } catch (AlreadyClosedException e) { failOnTragicEvent(e); throw e; @@ -2121,17 +2121,16 @@ private boolean failOnTragicEvent(AlreadyClosedException ex) { } failEngine("already closed by tragic event on the index writer", tragicException); engineFailed = true; - } else if (translogManager.getTranslog(false).isOpen() == false - && translogManager.getTranslog(false).getTragicException() != null) { - failEngine("already closed by tragic event on the translog", translogManager.getTranslog(false).getTragicException()); - engineFailed = true; - } else if (failedEngine.get() == null && isClosed.get() == false) { // we are closed but the engine is not failed yet? - // this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by - // a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error - throw new AssertionError("Unexpected AlreadyClosedException", ex); - } else { - engineFailed = false; - } + } else if (translogManager.getTranslog().isOpen() == false && translogManager.getTranslog().getTragicException() != null) { + failEngine("already closed by tragic event on the translog", translogManager.getTranslog().getTragicException()); + engineFailed = true; + } else if (failedEngine.get() == null && isClosed.get() == false) { // we are closed but the engine is not failed yet? + // this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by + // a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error + throw new AssertionError("Unexpected AlreadyClosedException", ex); + } else { + engineFailed = false; + } return engineFailed; } @@ -2148,13 +2147,12 @@ protected boolean maybeFailEngine(String source, Exception e) { return failOnTragicEvent((AlreadyClosedException) e); } else if (e != null && ((indexWriter.isOpen() == false && indexWriter.getTragicException() == e) - || (translogManager.getTranslog(false).isOpen() == false - && translogManager.getTranslog(false).getTragicException() == e))) { - // this spot on - we are handling the tragic event exception here so we have to fail the engine - // right away - failEngine(source, e); - return true; - } + || (translogManager.getTranslog().isOpen() == false && translogManager.getTranslog().getTragicException() == e))) { + // this spot on - we are handling the tragic event exception here so we have to fail the engine + // right away + failEngine(source, e); + return true; + } return false; } @@ -2251,7 +2249,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { logger.warn("Failed to close ReaderManager", e); } try { - IOUtils.close(translogManager.getTranslog(false)); + IOUtils.close(translogManager.getTranslog()); } catch (Exception e) { logger.warn("Failed to close translog", e); } @@ -2586,7 +2584,7 @@ public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue tran // the setting will be re-interpreted if it's set to true updateAutoIdTimestamp(Long.MAX_VALUE, true); } - final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getTranslog(false).getDeletionPolicy(); + final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getTranslog().getDeletionPolicy(); translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis()); translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes()); softDeletesPolicy.setRetentionOperations(softDeletesRetentionOps); @@ -2602,7 +2600,7 @@ LocalCheckpointTracker getLocalCheckpointTracker() { @Override public long getLastSyncedGlobalCheckpoint() { - return translogManager.getTranslog(false).getLastSyncedGlobalCheckpoint(); + return translogManager.getTranslog().getLastSyncedGlobalCheckpoint(); } @Override diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 26129488d488d..b16e550baeacb 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -92,7 +92,7 @@ public void onFailure(String reason, Exception ex) { @Override public void onAfterTranslogSync() { try { - translogManager.getTranslog(false).trimUnreferencedReaders(); + translogManager.getTranslog().trimUnreferencedReaders(); } catch (IOException ex) { throw new TranslogException(shardId, "failed to trim unreferenced translog readers", ex); } @@ -104,7 +104,7 @@ public void onAfterTranslogSync() { } catch (IOException e) { Translog translog = null; if (translogManagerRef != null) { - translog = translogManagerRef.getTranslog(false); + translog = translogManagerRef.getTranslog(); } IOUtils.closeWhileHandlingException(store::decRef, readerManager, translog); throw new EngineCreationFailureException(shardId, "failed to create engine", e); @@ -158,7 +158,7 @@ public boolean isThrottled() { public IndexResult index(Index index) throws IOException { ensureOpen(); IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false); - final Translog.Location location = translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); + final Translog.Location location = translogManager.getTranslog().add(new Translog.Index(index, indexResult)); indexResult.setTranslogLocation(location); indexResult.setTook(System.nanoTime() - index.startTime()); indexResult.freeze(); @@ -170,7 +170,7 @@ public IndexResult index(Index index) throws IOException { public DeleteResult delete(Delete delete) throws IOException { ensureOpen(); DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true); - final Translog.Location location = translogManager.getTranslog(false).add(new Translog.Delete(delete, deleteResult)); + final Translog.Location location = translogManager.getTranslog().add(new Translog.Delete(delete, deleteResult)); deleteResult.setTranslogLocation(location); deleteResult.setTook(System.nanoTime() - delete.startTime()); deleteResult.freeze(); @@ -182,7 +182,7 @@ public DeleteResult delete(Delete delete) throws IOException { public NoOpResult noOp(NoOp noOp) throws IOException { ensureOpen(); NoOpResult noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo()); - final Translog.Location location = translogManager.getTranslog(false) + final Translog.Location location = translogManager.getTranslog() .add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); noOpResult.setTranslogLocation(location); noOpResult.setTook(System.nanoTime() - noOp.startTime()); @@ -249,7 +249,7 @@ public SeqNoStats getSeqNoStats(long globalCheckpoint) { @Override public long getLastSyncedGlobalCheckpoint() { - return translogManager.getTranslog(false).getLastSyncedGlobalCheckpoint(); + return translogManager.getTranslog().getLastSyncedGlobalCheckpoint(); } @Override @@ -317,7 +317,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself"; try { - IOUtils.close(readerManager, translogManager().getTranslog(false), store::decRef); + IOUtils.close(readerManager, translogManager().getTranslog(), store::decRef); } catch (Exception e) { logger.warn("failed to close engine", e); } finally { @@ -354,7 +354,7 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) { @Override public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { - final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getTranslog(false).getDeletionPolicy(); + final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getTranslog().getDeletionPolicy(); translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis()); translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes()); } diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 22f72cc3d9acd..3f4acf6ab5f11 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -309,11 +309,14 @@ private Translog openTranslog( /** * Returns the the translog instance - * @param ensureOpen check if the engine is open * @return the {@link Translog} instance */ @Override - public Translog getTranslog(boolean ensureOpen) { + public Translog getTranslog() { + return translog; + } + + private Translog getTranslog(boolean ensureOpen) { if (ensureOpen) { this.engineLifeCycleAware.ensureOpen(); } diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index 07cae808ce071..88e6ce97b2784 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -93,7 +93,7 @@ public boolean shouldRollTranslogGeneration() { public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException {} @Override - public Translog getTranslog(boolean ensureOpen) { + public Translog getTranslog() { return null; } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index 988a88c5d2ae5..dc2c2e20015b0 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -96,10 +96,9 @@ public interface TranslogManager { /** * Returns the instance of the translog with a precondition - * @param ensureOpen check if the engine is open * @return the translog instance */ - Translog getTranslog(boolean ensureOpen); + Translog getTranslog(); /** * Checks if the translog has a pending recovery diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index 52b095cc9e059..3d40273328c6f 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -1263,21 +1263,25 @@ public void testCommitAdvancesMinTranslogForRecovery() throws IOException { } engine.flush(); - assertThat(engine.translogManager().getTranslog(true).currentFileGeneration(), equalTo(3L)); - assertThat(engine.translogManager().getTranslog(true).getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); + engine.ensureOpen(); + assertThat(engine.translogManager().getTranslog().currentFileGeneration(), equalTo(3L)); + assertThat(engine.translogManager().getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); engine.flush(); - assertThat(engine.translogManager().getTranslog(true).currentFileGeneration(), equalTo(3L)); - assertThat(engine.translogManager().getTranslog(true).getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); + engine.ensureOpen(); + assertThat(engine.translogManager().getTranslog().currentFileGeneration(), equalTo(3L)); + assertThat(engine.translogManager().getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); engine.flush(true, true); - assertThat(engine.translogManager().getTranslog(true).currentFileGeneration(), equalTo(3L)); - assertThat(engine.translogManager().getTranslog(true).getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); + engine.ensureOpen(); + assertThat(engine.translogManager().getTranslog().currentFileGeneration(), equalTo(3L)); + assertThat(engine.translogManager().getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); engine.flush(true, true); - assertThat(engine.translogManager().getTranslog(true).currentFileGeneration(), equalTo(3L)); - assertThat(engine.translogManager().getTranslog(true).getMinFileGeneration(), equalTo(3L)); + engine.ensureOpen(); + assertThat(engine.translogManager().getTranslog().currentFileGeneration(), equalTo(3L)); + assertThat(engine.translogManager().getTranslog().getMinFileGeneration(), equalTo(3L)); } public void testSyncTranslogConcurrently() throws Exception { @@ -2781,9 +2785,11 @@ public void testSeqNoAndCheckpoints() throws IOException, InterruptedException { Long.parseLong(initialEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint) ); - initialEngine.translogManager().getTranslog(true).sync(); // to guarantee the global checkpoint is written to the translog - // checkpoint - assertThat(initialEngine.translogManager().getTranslog(true).getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); + initialEngine.ensureOpen(); + initialEngine.translogManager().getTranslog().sync(); // to guarantee the global checkpoint is written to the translog + // checkpoint + initialEngine.ensureOpen(); + assertThat(initialEngine.translogManager().getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); assertThat(Long.parseLong(initialEngine.commitStats().getUserData().get(SequenceNumbers.MAX_SEQ_NO)), equalTo(maxSeqNo)); } finally { @@ -2800,7 +2806,8 @@ public void testSeqNoAndCheckpoints() throws IOException, InterruptedException { Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(primarySeqNo) ); - assertThat(recoveringEngine.translogManager().getTranslog(true).getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); + recoveringEngine.ensureOpen(); + assertThat(recoveringEngine.translogManager().getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); assertThat( Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.MAX_SEQ_NO)), // after recovering from translog, all docs have been flushed to Lucene segments, so here we will assert @@ -3213,7 +3220,8 @@ public void testCurrentTranslogUUIIDIsCommitted() throws IOException { .recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE) ); Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals(engine.translogManager().getTranslog(true).getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.ensureOpen(); + assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); } } // open and recover tlog @@ -3222,17 +3230,13 @@ public void testCurrentTranslogUUIIDIsCommitted() throws IOException { try (InternalEngine engine = new InternalEngine(config)) { expectThrows(IllegalStateException.class, engine.translogManager()::ensureCanFlush); Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals( - engine.translogManager().getTranslog(true).getTranslogUUID(), - userData.get(Translog.TRANSLOG_UUID_KEY) - ); + engine.ensureOpen(); + assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); TranslogHandler translogHandler = createTranslogHandler(config.getIndexSettings(), engine); engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals( - engine.translogManager().getTranslog(true).getTranslogUUID(), - userData.get(Translog.TRANSLOG_UUID_KEY) - ); + engine.ensureOpen(); + assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); } } } @@ -3247,10 +3251,12 @@ public void testCurrentTranslogUUIIDIsCommitted() throws IOException { store.associateIndexWithNewTranslog(translogUUID); try (InternalEngine engine = new InternalEngine(config)) { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals(engine.translogManager().getTranslog(true).getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.ensureOpen(); + assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); - assertEquals(2, engine.translogManager().getTranslog(true).currentFileGeneration()); - assertEquals(0L, engine.translogManager().getTranslog(true).stats().getUncommittedOperations()); + engine.ensureOpen(); + assertEquals(2, engine.translogManager().getTranslog().currentFileGeneration()); + assertEquals(0L, engine.translogManager().getTranslog().stats().getUncommittedOperations()); } } @@ -3259,16 +3265,12 @@ public void testCurrentTranslogUUIIDIsCommitted() throws IOException { for (int i = 0; i < 2; i++) { try (InternalEngine engine = new InternalEngine(config)) { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals( - engine.translogManager().getTranslog(true).getTranslogUUID(), - userData.get(Translog.TRANSLOG_UUID_KEY) - ); + engine.ensureOpen(); + assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals( - engine.translogManager().getTranslog(true).getTranslogUUID(), - userData.get(Translog.TRANSLOG_UUID_KEY) - ); + engine.ensureOpen(); + assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); } } } @@ -3409,8 +3411,9 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog) throws I final long localCheckpoint = Long.parseLong( engine.getLastCommittedSegmentInfos().userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY) ); + engine.ensureOpen(); final long committedGen = engine.translogManager() - .getTranslog(true) + .getTranslog() .getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; for (int gen = 1; gen < committedGen; gen++) { final Path genFile = translogPath.resolve(Translog.getFilename(gen)); @@ -3596,7 +3599,8 @@ public void testRecoverFromForeignTranslog() throws IOException { assertThat(index.getVersion(), equalTo(1L)); } assertVisibleCount(engine, numDocs); - Translog.TranslogGeneration generation = engine.translogManager().getTranslog(true).getGeneration(); + engine.ensureOpen(); + Translog.TranslogGeneration generation = engine.translogManager().getTranslog().getGeneration(); engine.close(); final Path badTranslogLog = createTempDir(); @@ -3692,7 +3696,8 @@ public CustomTranslogDeletionPolicy(IndexSettings indexSettings, Supplier { long localCheckpoint = Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); return translog.totalOperationsByMinGen(translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration); }; - final long extraTranslogSizeInNewEngine = engine.translogManager().getTranslog(true).stats().getUncommittedSizeInBytes() + final long extraTranslogSizeInNewEngine = engine.translogManager().getTranslog().stats().getUncommittedSizeInBytes() - Translog.DEFAULT_HEADER_SIZE_IN_BYTES; int numDocs = between(10, 100); for (int id = 0; id < numDocs; id++) { @@ -5959,10 +5970,11 @@ public void testShouldPeriodicallyFlush() throws Exception { engine.index(indexForDoc(doc)); } assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false)); + engine.ensureOpen(); long flushThreshold = RandomNumbers.randomLongBetween( random(), 120, - engine.translogManager().getTranslog(true).stats().getUncommittedSizeInBytes() - extraTranslogSizeInNewEngine + engine.translogManager().getTranslog().stats().getUncommittedSizeInBytes() - extraTranslogSizeInNewEngine ); final IndexSettings indexSettings = engine.config().getIndexSettings(); final IndexMetadata indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata()) @@ -5978,7 +5990,8 @@ public void testShouldPeriodicallyFlush() throws Exception { indexSettings.getTranslogRetentionSize(), indexSettings.getSoftDeleteRetentionOperations() ); - assertThat(engine.translogManager().getTranslog(true).stats().getUncommittedOperations(), equalTo(numDocs)); + engine.ensureOpen(); + assertThat(engine.translogManager().getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(); assertThat(uncommittedTranslogOperationsSinceLastCommit.getAsInt(), equalTo(0)); @@ -6034,11 +6047,13 @@ public void testShouldPeriodicallyFlushAfterMerge() throws Exception { indexSettings.getTranslogRetentionSize(), indexSettings.getSoftDeleteRetentionOperations() ); - assertThat(engine.translogManager().getTranslog(true).stats().getUncommittedOperations(), equalTo(1)); + engine.ensureOpen(); + assertThat(engine.translogManager().getTranslog().stats().getUncommittedOperations(), equalTo(1)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); doc = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc)); - assertThat(engine.translogManager().getTranslog(true).stats().getUncommittedOperations(), equalTo(2)); + engine.ensureOpen(); + assertThat(engine.translogManager().getTranslog().stats().getUncommittedOperations(), equalTo(2)); engine.refresh("test"); engine.forceMerge(false, 1, false, false, false, UUIDs.randomBase64UUID()); assertBusy(() -> { @@ -6073,7 +6088,8 @@ public void testStressShouldPeriodicallyFlush() throws Exception { final long seqno = randomLongBetween(Math.max(0, localCheckPoint), localCheckPoint + 5); final ParsedDocument doc = testParsedDocument(Long.toString(seqno), null, testDocumentWithTextField(), SOURCE, null); engine.index(replicaIndexForDoc(doc, 1L, seqno, false)); - if (rarely() && engine.translogManager().getTranslog(true).shouldRollGeneration()) { + engine.ensureOpen(); + if (rarely() && engine.translogManager().getTranslog().shouldRollGeneration()) { engine.translogManager().rollTranslogGeneration(); } if (rarely() || engine.shouldPeriodicallyFlush()) { @@ -6296,7 +6312,8 @@ public void testTrimUnsafeCommits() throws Exception { } globalCheckpoint.set(randomInt(maxSeqNo)); engine.translogManager().syncTranslog(); - minTranslogGen = engine.translogManager().getTranslog(true).getMinFileGeneration(); + engine.ensureOpen(); + minTranslogGen = engine.translogManager().getTranslog().getMinFileGeneration(); } store.trimUnsafeCommits(config.getTranslogConfig().getTranslogPath()); @@ -6828,7 +6845,8 @@ public void testStoreHonorsLuceneVersion() throws IOException { public void testMaxSeqNoInCommitUserData() throws Exception { AtomicBoolean running = new AtomicBoolean(true); Thread rollTranslog = new Thread(() -> { - while (running.get() && engine.translogManager().getTranslog(true).currentFileGeneration() < 500) { + engine.ensureOpen(); + while (running.get() && engine.translogManager().getTranslog().currentFileGeneration() < 500) { engine.translogManager().rollTranslogGeneration(); // make adding operations to translog slower } }); @@ -6995,10 +7013,11 @@ public void testRecoverFromLocalTranslog() throws Exception { engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); assertThat(getDocIds(engine, randomBoolean()), equalTo(docs)); if (engine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo() == globalCheckpoint.get()) { + engine.ensureOpen(); assertThat( "engine should trim all unreferenced translog after recovery", - engine.translogManager().getTranslog(true).getMinFileGeneration(), - equalTo(engine.translogManager().getTranslog(true).currentFileGeneration()) + engine.translogManager().getTranslog().getMinFileGeneration(), + equalTo(engine.translogManager().getTranslog().currentFileGeneration()) ); } } diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 5cff37744e5d7..0a590cc0e286f 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -91,7 +91,7 @@ public void testEngineWritesOpsToTranslog() throws Exception { engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); assertEquals(getDocIds(engine, true), docs); } - assertEngineCleanedUp(nrtEngine, nrtEngine.translogManager().getTranslog(true)); + assertEngineCleanedUp(nrtEngine, nrtEngine.translogManager().getTranslog()); } } @@ -136,7 +136,8 @@ public void testUpdateSegments() throws Exception { Set seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet()); - try (Translog.Snapshot snapshot = nrtEngine.translogManager().getTranslog(true).newSnapshot()) { + nrtEngine.ensureOpen(); + try (Translog.Snapshot snapshot = nrtEngine.translogManager().getTranslog().newSnapshot()) { assertThat(snapshot.totalOperations(), equalTo(operations.size())); assertThat( TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()), @@ -149,11 +150,11 @@ public void testUpdateSegments() throws Exception { assertMatchingSegmentsAndCheckpoints(nrtEngine, primaryInfos); assertEquals( - nrtEngine.translogManager().getTranslog(true).getGeneration().translogFileGeneration, - engine.translogManager().getTranslog(true).getGeneration().translogFileGeneration + nrtEngine.translogManager().getTranslog().getGeneration().translogFileGeneration, + engine.translogManager().getTranslog().getGeneration().translogFileGeneration ); - try (Translog.Snapshot snapshot = nrtEngine.translogManager().getTranslog(true).newSnapshot()) { + try (Translog.Snapshot snapshot = nrtEngine.translogManager().getTranslog().newSnapshot()) { assertThat(snapshot.totalOperations(), equalTo(operations.size())); assertThat( TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()), @@ -167,7 +168,7 @@ public void testUpdateSegments() throws Exception { expectedDocCount = test.count(Queries.newMatchAllQuery()); assertSearcherHits(nrtEngine, expectedDocCount); } - assertEngineCleanedUp(nrtEngine, nrtEngine.translogManager().getTranslog(true)); + assertEngineCleanedUp(nrtEngine, nrtEngine.translogManager().getTranslog()); } } @@ -186,7 +187,8 @@ public void testTrimTranslogOps() throws Exception { ); applyOperations(nrtEngine, operations); Set seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet()); - try (Translog.Snapshot snapshot = nrtEngine.translogManager().getTranslog(true).newSnapshot()) { + nrtEngine.ensureOpen(); + try (Translog.Snapshot snapshot = nrtEngine.translogManager().getTranslog().newSnapshot()) { assertThat(snapshot.totalOperations(), equalTo(operations.size())); assertThat( TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()), diff --git a/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java index 9f0f376ab3b0f..db3e705c4d765 100644 --- a/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java @@ -221,7 +221,8 @@ public void testTrimUnreferencedTranslogFiles() throws Exception { } } // prevent translog from trimming so we can test trimUnreferencedFiles in NoOpEngine. - final Translog.Snapshot snapshot = engine.translogManager().getTranslog(true).newSnapshot(); + engine.ensureOpen(); + final Translog.Snapshot snapshot = engine.translogManager().getTranslog().newSnapshot(); engine.flush(true, true); engine.close(); diff --git a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java index 4db792b4a3fc2..9cc58e250d74e 100644 --- a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java @@ -56,7 +56,7 @@ public void testRecoveryFromTranslog() throws IOException { Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), i, true); tracker.markSeqNoAsProcessed(i); - translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); + translogManager.getTranslog().add(new Translog.Index(index, indexResult)); translogManager.rollTranslogGeneration(); } long maxSeqNo = tracker.getMaxSeqNo(); @@ -64,7 +64,7 @@ public void testRecoveryFromTranslog() throws IOException { assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().estimatedNumberOfOperations()); translogManager.syncTranslog(); - translogManager.getTranslog(false).close(); + translogManager.getTranslog().close(); translogManager = new InternalTranslogManager( new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), primaryTerm, @@ -103,7 +103,7 @@ public void onBeginTranslogRecovery() { assertTrue(onTranslogRecoveryInvoked.get()); } finally { - translogManager.getTranslog(false).close(); + translogManager.getTranslog().close(); } } @@ -131,7 +131,7 @@ public void testTranslogRollsGeneration() throws IOException { Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), i, true); tracker.markSeqNoAsProcessed(i); - translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); + translogManager.getTranslog().add(new Translog.Index(index, indexResult)); translogManager.rollTranslogGeneration(); } long maxSeqNo = tracker.getMaxSeqNo(); @@ -139,7 +139,7 @@ public void testTranslogRollsGeneration() throws IOException { assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().estimatedNumberOfOperations()); translogManager.syncTranslog(); - translogManager.getTranslog(false).close(); + translogManager.getTranslog().close(); translogManager = new InternalTranslogManager( new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), primaryTerm, @@ -164,7 +164,7 @@ public void testTranslogRollsGeneration() throws IOException { assertEquals(maxSeqNo + 1, opsRecovered.get()); assertEquals(maxSeqNo + 1, opsRecoveredFromTranslog); } finally { - translogManager.getTranslog(false).close(); + translogManager.getTranslog().close(); } } @@ -192,7 +192,7 @@ public void testTrimOperationsFromTranslog() throws IOException { Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), i, true); tracker.markSeqNoAsProcessed(i); - translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); + translogManager.getTranslog().add(new Translog.Index(index, indexResult)); } long maxSeqNo = tracker.getMaxSeqNo(); assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().getUncommittedOperations()); @@ -202,7 +202,7 @@ public void testTrimOperationsFromTranslog() throws IOException { translogManager.rollTranslogGeneration(); translogManager.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in translog - translogManager.getTranslog(false).close(); + translogManager.getTranslog().close(); translogManager = new InternalTranslogManager( new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), primaryTerm, @@ -227,7 +227,7 @@ public void testTrimOperationsFromTranslog() throws IOException { assertEquals(0, opsRecovered.get()); assertEquals(0, opsRecoveredFromTranslog); } finally { - translogManager.getTranslog(false).close(); + translogManager.getTranslog().close(); } } @@ -253,7 +253,7 @@ public void testTranslogSync() throws IOException { @Override public void onAfterTranslogSync() { try { - translogManagerAtomicReference.get().getTranslog(false).trimUnreferencedReaders(); + translogManagerAtomicReference.get().getTranslog().trimUnreferencedReaders(); syncListenerInvoked.set(true); } catch (IOException ex) { fail("Failed due to " + ex); @@ -265,15 +265,15 @@ public void onAfterTranslogSync() { translogManagerAtomicReference.set(translogManager); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), 1, false); - translogManager.getTranslog(false).add(new Translog.Index(index, indexResult)); + translogManager.getTranslog().add(new Translog.Index(index, indexResult)); translogManager.syncTranslog(); - assertThat(translogManager.getTranslog(true).currentFileGeneration(), equalTo(2L)); - assertThat(translogManager.getTranslog(true).getMinFileGeneration(), equalTo(2L)); + assertThat(translogManager.getTranslog().currentFileGeneration(), equalTo(2L)); + assertThat(translogManager.getTranslog().getMinFileGeneration(), equalTo(2L)); assertTrue(syncListenerInvoked.get()); } finally { - translogManager.getTranslog(false).close(); + translogManager.getTranslog().close(); } } } diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index 4a8be3e2f8198..aa0689b58baf9 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -328,10 +328,12 @@ public void tearDown() throws Exception { super.tearDown(); try { if (engine != null && engine.isClosed.get() == false) { - assertEngineCleanedUp(engine, engine.translogManager().getTranslog(true)); + engine.ensureOpen(); + assertEngineCleanedUp(engine, engine.translogManager().getTranslog()); } if (replicaEngine != null && replicaEngine.isClosed.get() == false) { - assertEngineCleanedUp(replicaEngine, replicaEngine.translogManager().getTranslog(true)); + replicaEngine.ensureOpen(); + assertEngineCleanedUp(replicaEngine, replicaEngine.translogManager().getTranslog()); } } finally { IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool)); @@ -1480,7 +1482,8 @@ public static MapperService createMapperService() throws IOException { public static Translog getTranslog(Engine engine) { assert engine instanceof InternalEngine : "only InternalEngines have translogs, got: " + engine.getClass(); InternalEngine internalEngine = (InternalEngine) engine; - return internalEngine.translogManager().getTranslog(true); + internalEngine.ensureOpen(); + return internalEngine.translogManager().getTranslog(); } /**