Skip to content

Commit

Permalink
Use Lucene history in primary-replica resync (#33178)
Browse files Browse the repository at this point in the history
This commit makes primary-replica resyncer use Lucene as the source of
history operation instead of translog if soft-deletes is enabled. With
this change, we no longer expose translog snapshot directly in IndexShard.

Relates #29530
  • Loading branch information
dnhatn committed Aug 28, 2018
1 parent 3e879b4 commit 6fb5436
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -592,12 +592,6 @@ public enum SearcherScope {
*/
public abstract Closeable acquireRetentionLockForPeerRecovery();

/**
* Creates a new translog snapshot from this engine for reading translog operations whose seq# in the provided range.
* The caller has to close the returned snapshot after finishing the reading.
*/
public abstract Translog.Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException;

public abstract TranslogStats getTranslogStats();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,11 +493,6 @@ public void syncTranslog() throws IOException {
revisitIndexDeletionPolicyOnTranslogSynced();
}

@Override
public Translog.Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
return getTranslog().newSnapshotFromMinSeqNo(minSeqNo);
}

/**
* Creates a new history snapshot for reading operations since the provided seqno.
* The returned snapshot can be retrieved from either Lucene index or translog files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1668,15 +1668,6 @@ public Closeable acquireRetentionLockForPeerRecovery() {
return getEngine().acquireRetentionLockForPeerRecovery();
}

/**
* Creates a new translog snapshot for reading translog operations whose seq# at least the provided seq#.
* The caller has to close the returned snapshot after finishing the reading.
*/
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
// TODO: Remove this method after primary-replica resync use soft-deletes
return getEngine().newSnapshotFromMinSeqNo(minSeqNo);
}

/**
* Returns the estimated number of history operations whose seq# at least the provided seq# in this shard.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ public void resync(final IndexShard indexShard, final ActionListener<ResyncTask>
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
// Also fail the resync early if the shard is shutting down
// TODO: A follow-up to make resync using soft-deletes
snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo);
snapshot = indexShard.getHistoryOperations("resync", startingSeqNo);
final Translog.Snapshot originalSnapshot = snapshot;
final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,22 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
.isPresent(),
is(false));
}

assertEquals(globalCheckPoint == numDocs - 1 ? 0 : numDocs, resyncTask.getTotalOperations());
if (syncNeeded && globalCheckPoint < numDocs - 1) {
long skippedOps = globalCheckPoint + 1; // everything up to global checkpoint included
assertEquals(skippedOps, resyncTask.getSkippedOperations());
assertEquals(numDocs - skippedOps, resyncTask.getResyncedOperations());
if (shard.indexSettings.isSoftDeleteEnabled()) {
assertThat(resyncTask.getSkippedOperations(), equalTo(0));
assertThat(resyncTask.getResyncedOperations(), equalTo(resyncTask.getTotalOperations()));
assertThat(resyncTask.getTotalOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint)));
} else {
int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included
assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps));
assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps));
assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs));
}
} else {
assertEquals(0, resyncTask.getSkippedOperations());
assertEquals(0, resyncTask.getResyncedOperations());
assertThat(resyncTask.getSkippedOperations(), equalTo(0));
assertThat(resyncTask.getResyncedOperations(), equalTo(0));
assertThat(resyncTask.getTotalOperations(), equalTo(0));
}

closeShards(shard);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testPrimaryTermFromFollower() throws IOException {
final TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> result =
TransportBulkShardOperationsAction.shardOperationOnPrimary(followerPrimary.shardId(), operations, followerPrimary, logger);

try (Translog.Snapshot snapshot = followerPrimary.newTranslogSnapshotFromMinSeqNo(0)) {
try (Translog.Snapshot snapshot = followerPrimary.getHistoryOperations("test", 0)) {
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
Expand Down

0 comments on commit 6fb5436

Please sign in to comment.