Skip to content

Commit

Permalink
Move GlobalCheckpointTracker and remove SequenceNumbersService (#27837)
Browse files Browse the repository at this point in the history
This commit moves GlobalCheckpointTracker from the engine to IndexShard, where it better fits logically: Tracking the global checkpoint based on the local checkpoints of all shards in the replication group is not a property of the engine, but rather a property fulfilled by the current primary shard. The LocalCheckpointTracker on the other hand is driven by the contents of the local translog. By moving GlobalCheckpointTracker to IndexShard, it makes little sense to keep the SequenceNumbersService class around - it would only wrap the LocalCheckpointTracker. This commit therefore removes the class and replaces occurrences of SequenceNumbersService in the engine directly by LocalCheckpointTracker.
  • Loading branch information
ywelsch authored Dec 18, 2017
1 parent af3f636 commit a5e8a22
Show file tree
Hide file tree
Showing 25 changed files with 330 additions and 611 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -400,14 +399,14 @@ void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable
public interface ReplicaResponse {

/**
* The local checkpoint for the shard. See {@link SequenceNumbersService#getLocalCheckpoint()}.
* The local checkpoint for the shard.
*
* @return the local checkpoint
**/
long localCheckpoint();

/**
* The global checkpoint for the shard. See {@link SequenceNumbersService#getGlobalCheckpoint()}.
* The global checkpoint for the shard.
*
* @return the global checkpoint
**/
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -567,7 +567,7 @@ public CommitStats commitStats() {
*
* @return the sequence number service
*/
public abstract SequenceNumbersService seqNoService();
public abstract LocalCheckpointTracker getLocalCheckpointTracker();

/**
* Read the last segments info from the commit pointed to by the searcher manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import java.io.IOException;
import java.util.List;
import java.util.function.LongSupplier;

/*
* Holds all the configuration that is used to create an {@link Engine}.
Expand Down Expand Up @@ -78,6 +79,7 @@ public final class EngineConfig {
private final TranslogRecoveryRunner translogRecoveryRunner;
@Nullable
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;

/**
* Index setting to change the low level lucene codec used for writing new segments.
Expand Down Expand Up @@ -124,7 +126,8 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr
boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService) {
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier) {
if (openMode == null) {
throw new IllegalArgumentException("openMode must not be null");
}
Expand Down Expand Up @@ -155,6 +158,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr
this.indexSort = indexSort;
this.translogRecoveryRunner = translogRecoveryRunner;
this.circuitBreakerService = circuitBreakerService;
this.globalCheckpointSupplier = globalCheckpointSupplier;
}

/**
Expand Down Expand Up @@ -227,6 +231,13 @@ public Store getStore() {
return store;
}

/**
* Returns the global checkpoint tracker
*/
public LongSupplier getGlobalCheckpointSupplier() {
return globalCheckpointSupplier;
}

/**
* Returns the {@link org.apache.lucene.index.MergePolicy} for the engines {@link org.apache.lucene.index.IndexWriter}
*/
Expand Down
102 changes: 46 additions & 56 deletions core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
Expand All @@ -67,9 +68,8 @@
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -123,7 +123,7 @@ public class InternalEngine extends Engine {

private final IndexThrottle throttle;

private final SequenceNumbersService seqNoService;
private final LocalCheckpointTracker localCheckpointTracker;

private final String uidField;

Expand All @@ -150,12 +150,12 @@ public class InternalEngine extends Engine {
private final String historyUUID;

public InternalEngine(EngineConfig engineConfig) {
this(engineConfig, InternalEngine::sequenceNumberService);
this(engineConfig, LocalCheckpointTracker::new);
}

InternalEngine(
final EngineConfig engineConfig,
final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> seqNoServiceSupplier) {
final BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
super(engineConfig);
openMode = engineConfig.getOpenMode();
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
Expand All @@ -179,14 +179,12 @@ public InternalEngine(EngineConfig engineConfig) {
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
try {
final SeqNoStats seqNoStats = loadSeqNoStats(openMode);
logger.trace("recovered [{}]", seqNoStats);
this.seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
translog = openTranslog(engineConfig, translogDeletionPolicy, seqNoService::getGlobalCheckpoint);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
assert translog.getGeneration() != null;
this.translog = translog;
this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint)
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier())
);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
Expand Down Expand Up @@ -237,6 +235,27 @@ public InternalEngine(EngineConfig engineConfig) {
logger.trace("created new InternalEngine");
}

private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
final long maxSeqNo;
final long localCheckpoint;
switch (openMode) {
case CREATE_INDEX_AND_TRANSLOG:
maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
localCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
break;
case OPEN_INDEX_AND_TRANSLOG:
case OPEN_INDEX_CREATE_TRANSLOG:
final Tuple<Long, Long> seqNoStats = store.loadSeqNoInfo();
maxSeqNo = seqNoStats.v1();
localCheckpoint = seqNoStats.v2();
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
break;
default: throw new IllegalArgumentException("unknown type: " + openMode);
}
return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
}

/**
* This reference manager delegates all it's refresh calls to another (internal) SearcherManager
* The main purpose for this is that if we have external refreshes happening we don't issue extra
Expand Down Expand Up @@ -310,12 +329,12 @@ protected int getRefCount(IndexSearcher reference) {
public void restoreLocalCheckpointFromTranslog() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
final long localCheckpoint = seqNoService.getLocalCheckpoint();
final long localCheckpoint = localCheckpointTracker.getCheckpoint();
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() > localCheckpoint) {
seqNoService.markSeqNoAsCompleted(operation.seqNo());
localCheckpointTracker.markSeqNoAsCompleted(operation.seqNo());
}
}
}
Expand All @@ -326,17 +345,17 @@ public void restoreLocalCheckpointFromTranslog() throws IOException {
public int fillSeqNoGaps(long primaryTerm) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
final long localCheckpoint = seqNoService.getLocalCheckpoint();
final long maxSeqNo = seqNoService.getMaxSeqNo();
final long localCheckpoint = localCheckpointTracker.getCheckpoint();
final long maxSeqNo = localCheckpointTracker.getMaxSeqNo();
int numNoOpsAdded = 0;
for (
long seqNo = localCheckpoint + 1;
seqNo <= maxSeqNo;
seqNo = seqNoService.getLocalCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) {
seqNo = localCheckpointTracker.getCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) {
innerNoOp(new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling gaps"));
numNoOpsAdded++;
assert seqNo <= seqNoService.getLocalCheckpoint()
: "local checkpoint did not advance; was [" + seqNo + "], now [" + seqNoService.getLocalCheckpoint() + "]";
assert seqNo <= localCheckpointTracker.getCheckpoint()
: "local checkpoint did not advance; was [" + seqNo + "], now [" + localCheckpointTracker.getCheckpoint() + "]";

}
return numNoOpsAdded;
Expand All @@ -354,35 +373,6 @@ private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) {
maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp));
}

static SequenceNumbersService sequenceNumberService(
final EngineConfig engineConfig,
final SeqNoStats seqNoStats) {
return new SequenceNumbersService(
engineConfig.getShardId(),
engineConfig.getAllocationId(),
engineConfig.getIndexSettings(),
seqNoStats.getMaxSeqNo(),
seqNoStats.getLocalCheckpoint(),
seqNoStats.getGlobalCheckpoint());
}

private SeqNoStats loadSeqNoStats(EngineConfig.OpenMode openMode) throws IOException {
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
return store.loadSeqNoStats(globalCheckpoint);
case OPEN_INDEX_CREATE_TRANSLOG:
return store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
case CREATE_INDEX_AND_TRANSLOG:
return new SeqNoStats(
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.UNASSIGNED_SEQ_NO);
default:
throw new IllegalArgumentException(openMode.toString());
}
}

@Override
public InternalEngine recoverFromTranslog() throws IOException {
flushLock.lock();
Expand Down Expand Up @@ -732,7 +722,7 @@ private long generateSeqNoForOperation(final Operation operation) {
* @return the sequence number
*/
protected long doGenerateSeqNoForOperation(final Operation operation) {
return seqNoService.generateSeqNo();
return localCheckpointTracker.generateSeqNo();
}

@Override
Expand Down Expand Up @@ -803,7 +793,7 @@ public IndexResult index(Index index) throws IOException {
indexResult.setTranslogLocation(location);
}
if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
seqNoService.markSeqNoAsCompleted(indexResult.getSeqNo());
localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo());
}
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
Expand Down Expand Up @@ -834,7 +824,7 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return false for the created flag in favor of code simplicity
final OpVsLuceneDocStatus opVsLucene;
if (index.seqNo() <= seqNoService.getLocalCheckpoint()){
if (index.seqNo() <= localCheckpointTracker.getCheckpoint()){
// the operation seq# is lower then the current local checkpoint and thus was already put into lucene
// this can happen during recovery where older operations are sent from the translog that are already
// part of the lucene commit (either from a peer recovery or a local translog)
Expand Down Expand Up @@ -1100,7 +1090,7 @@ public DeleteResult delete(Delete delete) throws IOException {
deleteResult.setTranslogLocation(location);
}
if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
seqNoService.markSeqNoAsCompleted(deleteResult.getSeqNo());
localCheckpointTracker.markSeqNoAsCompleted(deleteResult.getSeqNo());
}
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
Expand All @@ -1126,7 +1116,7 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOExcept
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return true for the found flag in favor of code simplicity
final OpVsLuceneDocStatus opVsLucene;
if (delete.seqNo() <= seqNoService.getLocalCheckpoint()) {
if (delete.seqNo() <= localCheckpointTracker.getCheckpoint()) {
// the operation seq# is lower then the current local checkpoint and thus was already put into lucene
// this can happen during recovery where older operations are sent from the translog that are already
// part of the lucene commit (either from a peer recovery or a local translog)
Expand Down Expand Up @@ -1273,7 +1263,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
return noOpResult;
} finally {
if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
seqNoService.markSeqNoAsCompleted(seqNo);
localCheckpointTracker.markSeqNoAsCompleted(seqNo);
}
}
}
Expand Down Expand Up @@ -2017,7 +2007,7 @@ private void maybeDie(final String maybeMessage, final Throwable maybeFatal) {
protected void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
ensureCanFlush();
try {
final long localCheckpoint = seqNoService.getLocalCheckpoint();
final long localCheckpoint = localCheckpointTracker.getCheckpoint();
final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
Expand All @@ -2040,7 +2030,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService.getMaxSeqNo()));
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
logger.trace("committing writer with commit data [{}]", commitData);
Expand Down Expand Up @@ -2104,8 +2094,8 @@ public MergeStats getMergeStats() {
return mergeScheduler.stats();
}

public final SequenceNumbersService seqNoService() {
return seqNoService;
public final LocalCheckpointTracker getLocalCheckpointTracker() {
return localCheckpointTracker;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Set;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
Expand All @@ -55,7 +56,7 @@
* <p>
* The global checkpoint is maintained by the primary shard and is replicated to all the replicas (via {@link GlobalCheckpointSyncAction}).
*/
public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
public class GlobalCheckpointTracker extends AbstractIndexShardComponent implements LongSupplier {

/**
* The allocation ID for the shard to which this tracker is a component of.
Expand Down Expand Up @@ -214,7 +215,7 @@ public int hashCode() {
*
* @return a map from allocation ID to the local knowledge of the global checkpoint for that allocation ID
*/
synchronized ObjectLongMap<String> getInSyncGlobalCheckpoints() {
public synchronized ObjectLongMap<String> getInSyncGlobalCheckpoints() {
assert primaryMode;
assert handoffInProgress == false;
final ObjectLongMap<String> globalCheckpoints = new ObjectLongHashMap<>(checkpoints.size()); // upper bound on the size
Expand Down Expand Up @@ -329,7 +330,7 @@ private static long inSyncCheckpointStates(
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
*/
GlobalCheckpointTracker(
public GlobalCheckpointTracker(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
Expand Down Expand Up @@ -374,6 +375,11 @@ public synchronized long getGlobalCheckpoint() {
return cps.globalCheckpoint;
}

@Override
public long getAsLong() {
return getGlobalCheckpoint();
}

/**
* Updates the global checkpoint on a replica shard after it has been updated by the primary.
*
Expand Down
Loading

0 comments on commit a5e8a22

Please sign in to comment.