Do not load global checkpoint to ReplicationTracker in local recovery step (elastic#44781)

If we force-allocate an empty or stale primary, the global checkpoint on
replicas might be higher than the primary's, because the local recovery step
(introduced in elastic#43463) loads the previous (stale) global checkpoint into
the ReplicationTracker. Retention leases are not an issue, since a new lease
with a higher term will supersede the stale one.

Relates elastic#43463
dnhatn authored Jul 24, 2019
1 parent 06d9be6 commit 6275cd7
Showing 3 changed files with 74 additions and 24 deletions.
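
Before the diff, a minimal self-contained sketch of the idea behind the change (hypothetical class and method names, not the real Elasticsearch types): during the local recovery step the engine now receives a fixed global-checkpoint supplier, so the possibly stale value read from the translog is never loaded into the ReplicationTracker, while the regular recovery paths still load that value into the tracker and pass the tracker itself as the supplier.

import java.util.function.LongSupplier;

public class LocalRecoverySketch {

    static final long UNASSIGNED_SEQ_NO = -2; // placeholder for an unset checkpoint

    // toy stand-in for the real ReplicationTracker
    static class ReplicationTracker implements LongSupplier {
        private long globalCheckpoint = UNASSIGNED_SEQ_NO;

        void updateGlobalCheckpointOnReplica(long checkpoint, String reason) {
            globalCheckpoint = Math.max(globalCheckpoint, checkpoint);
        }

        @Override
        public long getAsLong() {
            return globalCheckpoint;
        }
    }

    private final ReplicationTracker replicationTracker = new ReplicationTracker();

    // Local recovery step: hand the engine a fixed supplier so the stale value
    // read from the translog never reaches the tracker.
    void recoverLocallyUpToGlobalCheckpoint(long globalCheckpointFromTranslog) {
        openEngine(() -> globalCheckpointFromTranslog);
    }

    // Regular store/peer recovery: the tracker is the source of truth, so load
    // the persisted checkpoint into it first and pass the tracker itself.
    void openEngineAndRecoverFromTranslog(long globalCheckpointFromTranslog) {
        replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpointFromTranslog, "read from translog checkpoint");
        openEngine(replicationTracker);
    }

    private void openEngine(LongSupplier globalCheckpointSupplier) {
        // the real code builds an EngineConfig around this supplier
        System.out.println("engine sees global checkpoint " + globalCheckpointSupplier.getAsLong());
    }

    public static void main(String[] args) {
        LocalRecoverySketch shard = new LocalRecoverySketch();
        shard.recoverLocallyUpToGlobalCheckpoint(42);   // tracker still at UNASSIGNED_SEQ_NO
        shard.openEngineAndRecoverFromTranslog(42);     // tracker now holds 42
    }
}

Running the sketch prints the same checkpoint value in both cases, but only the second call changes the tracker's state, which mirrors what this commit does in IndexShard.
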
36 changes: 21 additions & 15 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -171,6 +171,7 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -1407,7 +1408,7 @@ public long recoverLocallyUpToGlobalCheckpoint() {
recoveryState.getTranslog().totalLocal(recoveredOps); // adjust the total local to reflect the actual count
return recoveredOps;
};
- innerOpenEngineAndTranslog();
+ innerOpenEngineAndTranslog(() -> globalCheckpoint);
getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint);
logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
} finally {
@@ -1533,6 +1534,15 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat
return opsRecovered;
}

+ private void loadGlobalCheckpointToReplicationTracker() throws IOException {
+ // we have to set it before we open an engine and recover from the translog because
+ // acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
+ // and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
+ final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
+ final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
+ replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
+ }
+
/**
* opens the engine on top of the existing lucene engine and translog.
* Operations from the translog will be replayed to bring lucene up to date.
@@ -1548,7 +1558,8 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
return runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
translogRecoveryStats::incrementRecoveredOperations);
};
- innerOpenEngineAndTranslog();
+ loadGlobalCheckpointToReplicationTracker();
+ innerOpenEngineAndTranslog(replicationTracker);
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
}

@@ -1559,25 +1570,20 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
public void openEngineAndSkipTranslogRecovery() throws IOException {
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryState.getStage() + "]";
- innerOpenEngineAndTranslog();
+ loadGlobalCheckpointToReplicationTracker();
+ innerOpenEngineAndTranslog(replicationTracker);
getEngine().skipTranslogRecovery();
}

- private void innerOpenEngineAndTranslog() throws IOException {
+ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
- final EngineConfig config = newEngineConfig();
+ final EngineConfig config = newEngineConfig(globalCheckpointSupplier);

// we disable deletes since we allow for operations to be executed against the shard while recovering
// but we need to make sure we don't loose deletes until we are done recovering
config.setEnableGcDeletes(false);
- // we have to set it before we open an engine and recover from the translog because
- // acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
- // and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
- final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
- final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
- replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
updateRetentionLeasesOnReplica(loadRetentionLeases());
assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty()
: "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
@@ -2646,7 +2652,7 @@ private DocumentMapperForType docMapper(String type) {
mapperService.resolveDocumentType(type));
}

- private EngineConfig newEngineConfig() {
+ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
Sort indexSort = indexSortSupplier.get();
return new EngineConfig(shardId, shardRouting.allocationId().getId(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
@@ -2656,7 +2662,7 @@ private EngineConfig newEngineConfig() {
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Collections.singletonList(refreshListeners),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
- indexSort, circuitBreakerService, replicationTracker, replicationTracker::getRetentionLeases,
+ indexSort, circuitBreakerService, globalCheckpointSupplier, replicationTracker::getRetentionLeases,
() -> getOperationPrimaryTerm(), tombstoneDocSupplier());
}

@@ -3293,7 +3299,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
// we must create both new read-only engine and new read-write engine under mutex to ensure snapshotStoreMetadata,
// acquireXXXCommit and close works.
final Engine readOnlyEngine =
- new ReadOnlyEngine(newEngineConfig(), seqNoStats, translogStats, false, Function.identity()) {
+ new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity()) {
@Override
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
synchronized (mutex) {
@@ -3322,7 +3328,7 @@ public void close() throws IOException {
}
};
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
- newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig()));
+ newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
onNewEngine(newEngineReference.get());
}
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
@@ -42,6 +42,7 @@
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
+ import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
@@ -51,6 +52,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
+ import org.elasticsearch.index.MockEngineFactoryPlugin;
import org.elasticsearch.index.analysis.AbstractTokenFilterFactory;
import org.elasticsearch.index.analysis.TokenFilterFactory;
import org.elasticsearch.index.engine.Engine;
@@ -76,6 +78,7 @@
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster;
+ import org.elasticsearch.test.engine.MockEngineSupport;
import org.elasticsearch.test.store.MockFSIndexStore;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.test.transport.StubbableTransport;
@@ -84,7 +87,6 @@
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
- import org.junit.After;

import java.io.IOException;
import java.util.ArrayList;
@@ -137,12 +139,16 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
MockFSIndexStore.TestPlugin.class,
RecoverySettingsChunkSizePlugin.class,
TestAnalysisPlugin.class,
- InternalSettingsPlugin.class);
+ InternalSettingsPlugin.class,
+ MockEngineFactoryPlugin.class);
}

- @After
- public void assertConsistentHistoryInLuceneIndex() throws Exception {
+ @Override
+ protected void beforeIndexDeletion() throws Exception {
+ super.beforeIndexDeletion();
internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
+ internalCluster().assertSeqNos();
+ internalCluster().assertSameDocIdsOnShards();
}

private void assertRecoveryStateWithoutStage(RecoveryState state, int shardId, RecoverySource recoverySource, boolean primary,
@@ -1049,7 +1055,8 @@ public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception {
for (RecoveryState recoveryState : client().admin().indices().prepareRecoveries().get().shardRecoveryStates().get(indexName)) {
if (startRecoveryRequest.targetNode().equals(recoveryState.getTargetNode())) {
assertThat("total recovered translog operations must include both local and remote recovery",
- recoveryState.getTranslog().recoveredOperations(), equalTo(Math.toIntExact(maxSeqNo - localCheckpointOfSafeCommit)));
+ recoveryState.getTranslog().recoveredOperations(),
+ greaterThanOrEqualTo(Math.toIntExact(maxSeqNo - localCheckpointOfSafeCommit)));
}
}
for (String node : nodes) {
@@ -1116,4 +1123,30 @@ public void testRepeatedRecovery() throws Exception {
ensureGreen(indexName);
}

+ public void testAllocateEmptyPrimaryResetsGlobalCheckpoint() throws Exception {
+ internalCluster().startMasterOnlyNode(Settings.EMPTY);
+ final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
+ final Settings randomNodeDataPathSettings = internalCluster().dataPathSettings(randomFrom(dataNodes));
+ final String indexName = "test";
+ assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder()
+ .put("index.number_of_shards", 1).put("index.number_of_replicas", 1)
+ .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), randomBoolean())).get());
+ final List<IndexRequestBuilder> indexRequests = IntStream.range(0, between(10, 500))
+ .mapToObj(n -> client().prepareIndex(indexName, "type").setSource("foo", "bar"))
+ .collect(Collectors.toList());
+ indexRandom(randomBoolean(), true, true, indexRequests);
+ ensureGreen();
+ internalCluster().stopRandomDataNode();
+ internalCluster().stopRandomDataNode();
+ final String nodeWithoutData = internalCluster().startDataOnlyNode();
+ assertAcked(client().admin().cluster().prepareReroute()
+ .add(new AllocateEmptyPrimaryAllocationCommand(indexName, 0, nodeWithoutData, true)).get());
+ internalCluster().startDataOnlyNode(randomNodeDataPathSettings);
+ ensureGreen();
+ for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()) {
+ assertThat(shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
+ assertThat(shardStats.getSeqNoStats().getLocalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
+ assertThat(shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
+ }
+ }
}
@@ -161,22 +161,31 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
assertThat(shard.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(shard.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
assertThat(shard.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
+ assertThat(shard.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
closeShards(shard);

// good copy
shard = newStartedShard(false);
long globalCheckpoint = populateData.apply(shard);
Optional<SequenceNumbers.CommitInfo> safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint);
assertTrue(safeCommit.isPresent());
+ int expectedTotalLocal = 0;
+ try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshotFromMinSeqNo(safeCommit.get().localCheckpoint + 1)) {
+ Translog.Operation op;
+ while ((op = snapshot.next()) != null) {
+ if (op.seqNo() <= globalCheckpoint) {
+ expectedTotalLocal++;
+ }
+ }
+ }
IndexShard replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
RecoverySource.PeerRecoverySource.INSTANCE));
replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
replica.prepareForIndexRecovery();
assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(globalCheckpoint + 1));
- assertThat(replica.recoveryState().getTranslog().totalLocal(),
- equalTo(Math.toIntExact(globalCheckpoint - safeCommit.get().localCheckpoint)));
- assertThat(replica.recoveryState().getTranslog().recoveredOperations(),
- equalTo(Math.toIntExact(globalCheckpoint - safeCommit.get().localCheckpoint)));
+ assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(expectedTotalLocal));
+ assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(expectedTotalLocal));
+ assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
closeShards(replica);

// corrupted copy
@@ -192,6 +201,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
+ assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
closeShards(replica);

// copy with truncated translog
@@ -213,6 +223,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
}
assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
+ assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
closeShards(replica);
}
}
