Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not load global checkpoint to ReplicationTracker in local recovery step #44781

Merged
merged 2 commits into from
Jul 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -1407,7 +1408,7 @@ public long recoverLocallyUpToGlobalCheckpoint() {
recoveryState.getTranslog().totalLocal(recoveredOps); // adjust the total local to reflect the actual count
return recoveredOps;
};
innerOpenEngineAndTranslog();
innerOpenEngineAndTranslog(() -> globalCheckpoint);
getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint);
logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
} finally {
Expand Down Expand Up @@ -1533,6 +1534,15 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat
return opsRecovered;
}

private void loadGlobalCheckpointToReplicationTracker() throws IOException {
// we have to set it before we open an engine and recover from the translog because
// acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
// and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
}

/**
* opens the engine on top of the existing lucene engine and translog.
* Operations from the translog will be replayed to bring lucene up to date.
Expand All @@ -1548,7 +1558,8 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
return runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
translogRecoveryStats::incrementRecoveredOperations);
};
innerOpenEngineAndTranslog();
loadGlobalCheckpointToReplicationTracker();
innerOpenEngineAndTranslog(replicationTracker);
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
}

Expand All @@ -1559,25 +1570,20 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
public void openEngineAndSkipTranslogRecovery() throws IOException {
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryState.getStage() + "]";
innerOpenEngineAndTranslog();
loadGlobalCheckpointToReplicationTracker();
innerOpenEngineAndTranslog(replicationTracker);
getEngine().skipTranslogRecovery();
}

private void innerOpenEngineAndTranslog() throws IOException {
private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
final EngineConfig config = newEngineConfig();
final EngineConfig config = newEngineConfig(globalCheckpointSupplier);

// we disable deletes since we allow for operations to be executed against the shard while recovering
// but we need to make sure we don't loose deletes until we are done recovering
config.setEnableGcDeletes(false);
// we have to set it before we open an engine and recover from the translog because
// acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
// and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
updateRetentionLeasesOnReplica(loadRetentionLeases());
assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty()
: "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
Expand Down Expand Up @@ -2646,7 +2652,7 @@ private DocumentMapperForType docMapper(String type) {
mapperService.resolveDocumentType(type));
}

private EngineConfig newEngineConfig() {
private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
Sort indexSort = indexSortSupplier.get();
return new EngineConfig(shardId, shardRouting.allocationId().getId(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
Expand All @@ -2656,7 +2662,7 @@ private EngineConfig newEngineConfig() {
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Collections.singletonList(refreshListeners),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
indexSort, circuitBreakerService, replicationTracker, replicationTracker::getRetentionLeases,
indexSort, circuitBreakerService, globalCheckpointSupplier, replicationTracker::getRetentionLeases,
() -> getOperationPrimaryTerm(), tombstoneDocSupplier());
}

Expand Down Expand Up @@ -3293,7 +3299,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
// we must create both new read-only engine and new read-write engine under mutex to ensure snapshotStoreMetadata,
// acquireXXXCommit and close works.
final Engine readOnlyEngine =
new ReadOnlyEngine(newEngineConfig(), seqNoStats, translogStats, false, Function.identity()) {
new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity()) {
@Override
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
synchronized (mutex) {
Expand Down Expand Up @@ -3322,7 +3328,7 @@ public void close() throws IOException {
}
};
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig()));
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
onNewEngine(newEngineReference.get());
}
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -51,6 +52,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.MockEngineFactoryPlugin;
import org.elasticsearch.index.analysis.AbstractTokenFilterFactory;
import org.elasticsearch.index.analysis.TokenFilterFactory;
import org.elasticsearch.index.engine.Engine;
Expand All @@ -76,6 +78,7 @@
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.engine.MockEngineSupport;
import org.elasticsearch.test.store.MockFSIndexStore;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.test.transport.StubbableTransport;
Expand All @@ -84,7 +87,6 @@
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.junit.After;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -137,12 +139,16 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
MockFSIndexStore.TestPlugin.class,
RecoverySettingsChunkSizePlugin.class,
TestAnalysisPlugin.class,
InternalSettingsPlugin.class);
InternalSettingsPlugin.class,
MockEngineFactoryPlugin.class);
}

@After
public void assertConsistentHistoryInLuceneIndex() throws Exception {
@Override
protected void beforeIndexDeletion() throws Exception {
super.beforeIndexDeletion();
internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
internalCluster().assertSeqNos();
internalCluster().assertSameDocIdsOnShards();
}

private void assertRecoveryStateWithoutStage(RecoveryState state, int shardId, RecoverySource recoverySource, boolean primary,
Expand Down Expand Up @@ -1049,7 +1055,8 @@ public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception {
for (RecoveryState recoveryState : client().admin().indices().prepareRecoveries().get().shardRecoveryStates().get(indexName)) {
if (startRecoveryRequest.targetNode().equals(recoveryState.getTargetNode())) {
assertThat("total recovered translog operations must include both local and remote recovery",
recoveryState.getTranslog().recoveredOperations(), equalTo(Math.toIntExact(maxSeqNo - localCheckpointOfSafeCommit)));
recoveryState.getTranslog().recoveredOperations(),
greaterThanOrEqualTo(Math.toIntExact(maxSeqNo - localCheckpointOfSafeCommit)));
}
}
for (String node : nodes) {
Expand Down Expand Up @@ -1116,4 +1123,30 @@ public void testRepeatedRecovery() throws Exception {
ensureGreen(indexName);
}

public void testAllocateEmptyPrimaryResetsGlobalCheckpoint() throws Exception {
internalCluster().startMasterOnlyNode(Settings.EMPTY);
final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
final Settings randomNodeDataPathSettings = internalCluster().dataPathSettings(randomFrom(dataNodes));
final String indexName = "test";
assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder()
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)
.put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), randomBoolean())).get());
final List<IndexRequestBuilder> indexRequests = IntStream.range(0, between(10, 500))
.mapToObj(n -> client().prepareIndex(indexName, "type").setSource("foo", "bar"))
.collect(Collectors.toList());
indexRandom(randomBoolean(), true, true, indexRequests);
ensureGreen();
internalCluster().stopRandomDataNode();
internalCluster().stopRandomDataNode();
final String nodeWithoutData = internalCluster().startDataOnlyNode();
assertAcked(client().admin().cluster().prepareReroute()
.add(new AllocateEmptyPrimaryAllocationCommand(indexName, 0, nodeWithoutData, true)).get());
internalCluster().startDataOnlyNode(randomNodeDataPathSettings);
ensureGreen();
for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()) {
assertThat(shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
assertThat(shardStats.getSeqNoStats().getLocalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
assertThat(shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,22 +161,31 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
assertThat(shard.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(shard.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
assertThat(shard.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
assertThat(shard.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
closeShards(shard);

// good copy
shard = newStartedShard(false);
long globalCheckpoint = populateData.apply(shard);
Optional<SequenceNumbers.CommitInfo> safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint);
assertTrue(safeCommit.isPresent());
int expectedTotalLocal = 0;
try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshotFromMinSeqNo(safeCommit.get().localCheckpoint + 1)) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
if (op.seqNo() <= globalCheckpoint) {
expectedTotalLocal++;
}
}
}
IndexShard replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
RecoverySource.PeerRecoverySource.INSTANCE));
replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
replica.prepareForIndexRecovery();
assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(globalCheckpoint + 1));
assertThat(replica.recoveryState().getTranslog().totalLocal(),
equalTo(Math.toIntExact(globalCheckpoint - safeCommit.get().localCheckpoint)));
assertThat(replica.recoveryState().getTranslog().recoveredOperations(),
equalTo(Math.toIntExact(globalCheckpoint - safeCommit.get().localCheckpoint)));
assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(expectedTotalLocal));
assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(expectedTotalLocal));
assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
closeShards(replica);

// corrupted copy
Expand All @@ -192,6 +201,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
closeShards(replica);

// copy with truncated translog
Expand All @@ -213,6 +223,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
}
assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
closeShards(replica);
}
}