Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
  • Loading branch information
Bukhtawar committed Jun 28, 2022
1 parent dc1d0c7 commit 492ffc0
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ public void onTragicFailure(AlreadyClosedException ex) {
logger,
translogDeletionPolicy,
softDeletesPolicy,
translogManager.getTranslog(false)::getLastSyncedGlobalCheckpoint
translogManager.getTranslog()::getLastSyncedGlobalCheckpoint
);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
writer = createWriter();
Expand Down Expand Up @@ -335,7 +335,7 @@ public void onTragicFailure(AlreadyClosedException ex) {
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint());
this.internalReaderManager.addListener(lastRefreshedCheckpointListener);
maxSeqNoOfUpdatesOrDeletes = new AtomicLong(
SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translogManager.getTranslog(false).getMaxSeqNo())
SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translogManager.getTranslog().getMaxSeqNo())
);
if (localCheckpointTracker.getPersistedCheckpoint() < localCheckpointTracker.getMaxSeqNo()) {
try (Searcher searcher = acquireSearcher("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL)) {
Expand All @@ -355,7 +355,7 @@ public void onTragicFailure(AlreadyClosedException ex) {
if (success == false) {
Translog translog = null;
if (translogManagerRef != null) {
translog = translogManagerRef.getTranslog(false);
translog = translogManagerRef.getTranslog();
}
IOUtils.closeWhileHandlingException(writer, translog, internalReaderManager, externalReaderManager, scheduler);
if (isClosed.get() == false) {
Expand Down Expand Up @@ -390,7 +390,7 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)) + 1;
}
return new SoftDeletesPolicy(
translogManager.getTranslog(false)::getLastSyncedGlobalCheckpoint,
translogManager.getTranslog()::getLastSyncedGlobalCheckpoint,
lastMinRetainedSeqNo,
engineConfig.getIndexSettings().getSoftDeleteRetentionOperations(),
engineConfig.retentionLeasesSupplier()
Expand Down Expand Up @@ -560,7 +560,7 @@ private void revisitIndexDeletionPolicyOnTranslogSynced() {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
indexWriter.deleteUnusedFiles();
}
translogManager.getTranslog(false).trimUnreferencedReaders();
translogManager.getTranslog().trimUnreferencedReaders();
} catch (IOException ex) {
throw new TranslogException(shardId, "Failed to execute index deletion policy on translog synced", ex);
}
Expand Down Expand Up @@ -652,7 +652,7 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Engine.Searcher>
// the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
if (versionValue.getLocation() != null) {
try {
Translog.Operation operation = translogManager.getTranslog(false).readOperation(versionValue.getLocation());
Translog.Operation operation = translogManager.getTranslog().readOperation(versionValue.getLocation());
if (operation != null) {
// in the case of a already pruned translog generation we might get null here - yet very unlikely
final Translog.Index index = (Translog.Index) operation;
Expand Down Expand Up @@ -957,7 +957,7 @@ public IndexResult index(Index index) throws IOException {
if (index.origin().isFromTranslog() == false) {
final Translog.Location location;
if (indexResult.getResultType() == Result.Type.SUCCESS) {
location = translogManager.getTranslog(false).add(new Translog.Index(index, indexResult));
location = translogManager.getTranslog().add(new Translog.Index(index, indexResult));
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
// if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
final NoOp noOp = new NoOp(
Expand Down Expand Up @@ -1397,7 +1397,7 @@ public DeleteResult delete(Delete delete) throws IOException {
}
}
if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location location = translogManager.getTranslog(false).add(new Translog.Delete(delete, deleteResult));
final Translog.Location location = translogManager.getTranslog().add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
}
localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo());
Expand Down Expand Up @@ -1724,7 +1724,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
}
noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo());
if (noOp.origin().isFromTranslog() == false && noOpResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location location = translogManager.getTranslog(false)
final Translog.Location location = translogManager.getTranslog()
.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
}
Expand Down Expand Up @@ -1826,10 +1826,10 @@ public boolean shouldPeriodicallyFlush() {
final long localCheckpointOfLastCommit = Long.parseLong(
lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
);
final long translogGenerationOfLastCommit = translogManager.getTranslog(false)
final long translogGenerationOfLastCommit = translogManager.getTranslog()
.getMinGenerationForSeqNo(localCheckpointOfLastCommit + 1).translogFileGeneration;
final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes();
if (translogManager.getTranslog(false).sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
if (translogManager.getTranslog().sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
return false;
}
/*
Expand All @@ -1847,7 +1847,7 @@ public boolean shouldPeriodicallyFlush() {
*
* This method is to maintain translog only, thus IndexWriter#hasUncommittedChanges condition is not considered.
*/
final long translogGenerationOfNewCommit = translogManager.getTranslog(false)
final long translogGenerationOfNewCommit = translogManager.getTranslog()
.getMinGenerationForSeqNo(localCheckpointTracker.getProcessedCheckpoint() + 1).translogFileGeneration;
return translogGenerationOfLastCommit < translogGenerationOfNewCommit
|| localCheckpointTracker.getProcessedCheckpoint() == localCheckpointTracker.getMaxSeqNo();
Expand Down Expand Up @@ -1889,9 +1889,9 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
)) {
translogManager.ensureCanFlush();
try {
translogManager.getTranslog(false).rollGeneration();
translogManager.getTranslog().rollGeneration();
logger.trace("starting commit for flush; commitTranslog=true");
commitIndexWriter(indexWriter, translogManager.getTranslog(false));
commitIndexWriter(indexWriter, translogManager.getTranslog());
logger.trace("finished commit for flush");

// a temporary debugging to investigate test failure - issue#32827. Remove when the issue is resolved
Expand All @@ -1904,7 +1904,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {

// we need to refresh in order to clear older version values
refresh("version_table_flush", SearcherScope.INTERNAL, true);
translogManager.getTranslog(false).trimUnreferencedReaders();
translogManager.getTranslog().trimUnreferencedReaders();
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
Expand Down Expand Up @@ -2121,17 +2121,16 @@ private boolean failOnTragicEvent(AlreadyClosedException ex) {
}
failEngine("already closed by tragic event on the index writer", tragicException);
engineFailed = true;
} else if (translogManager.getTranslog(false).isOpen() == false
&& translogManager.getTranslog(false).getTragicException() != null) {
failEngine("already closed by tragic event on the translog", translogManager.getTranslog(false).getTragicException());
engineFailed = true;
} else if (failedEngine.get() == null && isClosed.get() == false) { // we are closed but the engine is not failed yet?
// this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by
// a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error
throw new AssertionError("Unexpected AlreadyClosedException", ex);
} else {
engineFailed = false;
}
} else if (translogManager.getTranslog().isOpen() == false && translogManager.getTranslog().getTragicException() != null) {
failEngine("already closed by tragic event on the translog", translogManager.getTranslog().getTragicException());
engineFailed = true;
} else if (failedEngine.get() == null && isClosed.get() == false) { // we are closed but the engine is not failed yet?
// this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by
// a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error
throw new AssertionError("Unexpected AlreadyClosedException", ex);
} else {
engineFailed = false;
}
return engineFailed;
}

Expand All @@ -2148,13 +2147,12 @@ protected boolean maybeFailEngine(String source, Exception e) {
return failOnTragicEvent((AlreadyClosedException) e);
} else if (e != null
&& ((indexWriter.isOpen() == false && indexWriter.getTragicException() == e)
|| (translogManager.getTranslog(false).isOpen() == false
&& translogManager.getTranslog(false).getTragicException() == e))) {
// this spot on - we are handling the tragic event exception here so we have to fail the engine
// right away
failEngine(source, e);
return true;
}
|| (translogManager.getTranslog().isOpen() == false && translogManager.getTranslog().getTragicException() == e))) {
// this spot on - we are handling the tragic event exception here so we have to fail the engine
// right away
failEngine(source, e);
return true;
}
return false;
}

Expand Down Expand Up @@ -2251,7 +2249,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
logger.warn("Failed to close ReaderManager", e);
}
try {
IOUtils.close(translogManager.getTranslog(false));
IOUtils.close(translogManager.getTranslog());
} catch (Exception e) {
logger.warn("Failed to close translog", e);
}
Expand Down Expand Up @@ -2586,7 +2584,7 @@ public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue tran
// the setting will be re-interpreted if it's set to true
updateAutoIdTimestamp(Long.MAX_VALUE, true);
}
final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getTranslog(false).getDeletionPolicy();
final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getTranslog().getDeletionPolicy();
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());
softDeletesPolicy.setRetentionOperations(softDeletesRetentionOps);
Expand All @@ -2602,7 +2600,7 @@ LocalCheckpointTracker getLocalCheckpointTracker() {

@Override
public long getLastSyncedGlobalCheckpoint() {
return translogManager.getTranslog(false).getLastSyncedGlobalCheckpoint();
return translogManager.getTranslog().getLastSyncedGlobalCheckpoint();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void onFailure(String reason, Exception ex) {
@Override
public void onAfterTranslogSync() {
try {
translogManager.getTranslog(false).trimUnreferencedReaders();
translogManager.getTranslog().trimUnreferencedReaders();
} catch (IOException ex) {
throw new TranslogException(shardId, "failed to trim unreferenced translog readers", ex);
}
Expand All @@ -104,7 +104,7 @@ public void onAfterTranslogSync() {
} catch (IOException e) {
Translog translog = null;
if (translogManagerRef != null) {
translog = translogManagerRef.getTranslog(false);
translog = translogManagerRef.getTranslog();
}
IOUtils.closeWhileHandlingException(store::decRef, readerManager, translog);
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
Expand Down Expand Up @@ -158,7 +158,7 @@ public boolean isThrottled() {
public IndexResult index(Index index) throws IOException {
ensureOpen();
IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false);
final Translog.Location location = translogManager.getTranslog(false).add(new Translog.Index(index, indexResult));
final Translog.Location location = translogManager.getTranslog().add(new Translog.Index(index, indexResult));
indexResult.setTranslogLocation(location);
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
Expand All @@ -170,7 +170,7 @@ public IndexResult index(Index index) throws IOException {
public DeleteResult delete(Delete delete) throws IOException {
ensureOpen();
DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true);
final Translog.Location location = translogManager.getTranslog(false).add(new Translog.Delete(delete, deleteResult));
final Translog.Location location = translogManager.getTranslog().add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
Expand All @@ -182,7 +182,7 @@ public DeleteResult delete(Delete delete) throws IOException {
public NoOpResult noOp(NoOp noOp) throws IOException {
ensureOpen();
NoOpResult noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo());
final Translog.Location location = translogManager.getTranslog(false)
final Translog.Location location = translogManager.getTranslog()
.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
noOpResult.setTook(System.nanoTime() - noOp.startTime());
Expand Down Expand Up @@ -249,7 +249,7 @@ public SeqNoStats getSeqNoStats(long globalCheckpoint) {

@Override
public long getLastSyncedGlobalCheckpoint() {
return translogManager.getTranslog(false).getLastSyncedGlobalCheckpoint();
return translogManager.getTranslog().getLastSyncedGlobalCheckpoint();
}

@Override
Expand Down Expand Up @@ -317,7 +317,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
: "Either the write lock must be held or the engine must be currently be failing itself";
try {
IOUtils.close(readerManager, translogManager().getTranslog(false), store::decRef);
IOUtils.close(readerManager, translogManager().getTranslog(), store::decRef);
} catch (Exception e) {
logger.warn("failed to close engine", e);
} finally {
Expand Down Expand Up @@ -354,7 +354,7 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {

@Override
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getTranslog(false).getDeletionPolicy();
final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getTranslog().getDeletionPolicy();
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,14 @@ private Translog openTranslog(

/**
* Returns the the translog instance
* @param ensureOpen check if the engine is open
* @return the {@link Translog} instance
*/
@Override
public Translog getTranslog(boolean ensureOpen) {
public Translog getTranslog() {
return translog;
}

private Translog getTranslog(boolean ensureOpen) {
if (ensureOpen) {
this.engineLifeCycleAware.ensureOpen();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public boolean shouldRollTranslogGeneration() {
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException {}

@Override
public Translog getTranslog(boolean ensureOpen) {
public Translog getTranslog() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,9 @@ public interface TranslogManager {

/**
* Returns the instance of the translog with a precondition
* @param ensureOpen check if the engine is open
* @return the translog instance
*/
Translog getTranslog(boolean ensureOpen);
Translog getTranslog();

/**
* Checks if the translog has a pending recovery
Expand Down
Loading

0 comments on commit 492ffc0

Please sign in to comment.