Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Snapshot Logic Write Metadata after Segments #45689

Merged
merged 12 commits
Aug 30, 2019
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Met
@Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
Map<String, Object> userMetadata) {
MetaData metaData, Map<String, Object> userMetadata) {
return in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, userMetadata);
includeGlobalState, metaData, userMetadata);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@
* <p>
* To perform a snapshot:
* <ul>
* <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
* with list of indices that will be included into the snapshot</li>
* <li>Data nodes call {@link Repository#snapshotShard}
* for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
Expand Down Expand Up @@ -116,7 +114,11 @@ default Repository create(RepositoryMetaData metaData, Function<String, Reposito
* @param snapshotId snapshot id
* @param indices list of indices to be snapshotted
* @param metaData cluster metadata
*
* @deprecated this method is only used when taking snapshots in a mixed version cluster where a master node older than
* {@link org.elasticsearch.snapshots.SnapshotsService#NO_REPO_INITIALIZE_VERSION} is present.
*/
@Deprecated
void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData);

/**
Expand All @@ -136,7 +138,7 @@ default Repository create(RepositoryMetaData metaData, Function<String, Reposito
*/
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
Map<String, Object> userMetadata);
MetaData clusterMetaData, Map<String, Object> userMetadata);

/**
* Deletes snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.InvalidSnapshotNameException;
import org.elasticsearch.snapshots.SnapshotCreationException;
import org.elasticsearch.snapshots.SnapshotException;
import org.elasticsearch.snapshots.SnapshotId;
Expand Down Expand Up @@ -143,7 +142,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private static final String TESTS_FILE = "tests-";

private static final String METADATA_PREFIX = "meta-";
public static final String METADATA_PREFIX = "meta-";

public static final String METADATA_NAME_FORMAT = METADATA_PREFIX + "%s.dat";

Expand Down Expand Up @@ -358,23 +357,13 @@ public RepositoryMetaData getMetadata() {

@Override
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetaData) {
if (isReadOnly()) {
throw new RepositoryException(metadata.name(), "cannot create snapshot in a readonly repository");
}
try {
final String snapshotName = snapshotId.getName();
// check if the snapshot name already exists in the repository
final RepositoryData repositoryData = getRepositoryData();
if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
throw new InvalidSnapshotNameException(metadata.name(), snapshotId.getName(), "snapshot with the same name already exists");
}

// Write Global MetaData
globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID());
globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), true);

// write the index metadata for each index in the snapshot
for (IndexId index : indices) {
indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID());
indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), true);
}
} catch (IOException ex) {
throw new SnapshotCreationException(metadata.name(), snapshotId, ex);
Expand Down Expand Up @@ -610,14 +599,34 @@ public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId,
final List<SnapshotShardFailure> shardFailures,
final long repositoryStateId,
final boolean includeGlobalState,
final MetaData clusterMetaData,
final Map<String, Object> userMetadata) {
SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
indices.stream().map(IndexId::getName).collect(Collectors.toList()),
startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
includeGlobalState, userMetadata);

try {
// We ignore all FileAlreadyExistsException here since otherwise a master failover while in this method will
// mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the
// index or global metadata will be compatible with the segments written in this snapshot as well.
// Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way that
// decrements the generation it points at

// Write Global MetaData
globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false);

// write the index metadata for each index in the snapshot
for (IndexId index : indices) {
indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false);
}
} catch (IOException ex) {
throw new SnapshotException(metadata.name(), snapshotId, "failed to write metadata for snapshot", ex);
}

try {
final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices);
snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID());
snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID(), false);
writeIndexGen(updatedRepositoryData, repositoryStateId);
} catch (FileAlreadyExistsException ex) {
// if another master was elected and took over finalizing the snapshot, it is possible
Expand Down Expand Up @@ -995,7 +1004,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s

logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
try {
indexShardSnapshotFormat.write(snapshot, shardContainer, snapshotId.getUUID());
indexShardSnapshotFormat.write(snapshot, shardContainer, snapshotId.getUUID(), false);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,16 @@ public void writeAtomic(T obj, BlobContainer blobContainer, String name) throws
* <p>
* The blob will be compressed and checksum will be written if required.
*
* @param obj object to be serialized
* @param blobContainer blob container
* @param name blob name
* @param obj object to be serialized
* @param blobContainer blob container
* @param name blob name
* @param failIfAlreadyExists Whether to fail if the blob already exists
*/
public void write(T obj, BlobContainer blobContainer, String name) throws IOException {
public void write(T obj, BlobContainer blobContainer, String name, boolean failIfAlreadyExists) throws IOException {
final String blobName = blobName(name);
writeTo(obj, blobName, bytesArray -> {
try (InputStream stream = bytesArray.streamInput()) {
blobContainer.writeBlob(blobName, stream, bytesArray.length(), true);
blobContainer.writeBlob(blobName, stream, bytesArray.length(), failIfAlreadyExists);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,19 @@
*
* <p>Creating a snapshot in the repository happens in the three steps described in detail below.</p>
*
* <h3>Initializing a Snapshot in the Repository</h3>
* <h3>Initializing a Snapshot in the Repository (Mixed Version Clusters only)</h3>
*
* <p>Creating a snapshot in the repository starts with a call to {@link org.elasticsearch.repositories.Repository#initializeSnapshot} which
* the blob store repository implements via the following actions:</p>
* <p>In mixed version clusters that contain a node older than
* {@link org.elasticsearch.snapshots.SnapshotsService#NO_REPO_INITIALIZE_VERSION}, creating a snapshot in the repository starts with a
* call to {@link org.elasticsearch.repositories.Repository#initializeSnapshot} which the blob store repository implements via the
* following actions:</p>
* <ol>
* <li>Verify that no snapshot by the requested name exists.</li>
* <li>Write a blob containing the cluster metadata to the root of the blob store repository at {@code /meta-${snapshot-uuid}.dat}</li>
* <li>Write the metadata for each index to a blob in that index's directory at
* {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}.dat}</li>
* </ol>
* TODO: This behavior is problematic, adjust these docs once https://github.com/elastic/elasticsearch/issues/41581 is fixed
* TODO: Remove this section once BwC logic it references is removed
*
* <h3>Writing Shard Data (Segments)</h3>
*
Expand Down Expand Up @@ -164,6 +166,9 @@
* to finalizing the snapshot by invoking {@link org.elasticsearch.repositories.Repository#finalizeSnapshot}. This method executes the
* following actions in order:</p>
* <ol>
* <li>Write a blob containing the cluster metadata to the root of the blob store repository at {@code /meta-${snapshot-uuid}.dat}</li>
* <li>Write the metadata for each index to a blob in that index's directory at
* {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}.dat}</li>
* <li>Write the {@link org.elasticsearch.snapshots.SnapshotInfo} blob for the given snapshot to the key {@code /snap-${snapshot-uuid}.dat}
* directly under the repository root.</li>
* <li>Write an updated {@code RepositoryData} blob to the key {@code /index-${N+1}} using the {@code N} determined when initializing the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
Expand Down Expand Up @@ -68,6 +69,7 @@
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -108,12 +110,19 @@
* the {@link SnapshotShardsService#sendSnapshotShardUpdate(Snapshot, ShardId, ShardSnapshotStatus)} method</li>
* <li>When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot
* as completed</li>
* <li>After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry)} finalizes snapshot in the repository,
* <li>After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry, MetaData)} finalizes snapshot in the repository,
* notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls
* {@link #removeSnapshotFromClusterState(Snapshot, SnapshotInfo, Exception)} to remove snapshot from cluster state</li>
* </ul>
*/
public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier {

/**
* Minimum node version which does not use {@link Repository#initializeSnapshot(SnapshotId, List, MetaData)} to write snapshot metadata
* when starting a snapshot.
*/
public static final Version NO_REPO_INITIALIZE_VERSION = Version.V_8_0_0;

private static final Logger logger = LogManager.getLogger(SnapshotsService.class);

private final ClusterService clusterService;
Expand Down Expand Up @@ -398,24 +407,29 @@ protected void doRun() {
assert initializingSnapshots.contains(snapshot.snapshot());
Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository());

MetaData metaData = clusterState.metaData();
if (!snapshot.includeGlobalState()) {
// Remove global state from the cluster state
MetaData.Builder builder = MetaData.builder();
for (IndexId index : snapshot.indices()) {
builder.put(metaData.index(index.getName()), false);
}
metaData = builder.build();
if (repository.isReadOnly()) {
throw new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository");
}
final String snapshotName = snapshot.snapshot().getSnapshotId().getName();
// check if the snapshot name already exists in the repository
if (repository.getRepositoryData().getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
throw new InvalidSnapshotNameException(
repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
}
if (clusterState.nodes().getMinNodeVersion().onOrAfter(NO_REPO_INITIALIZE_VERSION) == false) {
// In mixed version clusters we initialize the snapshot in the repository so that in case of a master failover to an
// older version master node snapshot finalization (that assumes initializeSnapshot was called) produces a valid
// snapshot.
repository.initializeSnapshot(
snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaDataForSnapshot(snapshot, clusterState.metaData()));
}

repository.initializeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaData);
snapshotCreated = true;

logger.info("snapshot [{}] started", snapshot.snapshot());
if (snapshot.indices().isEmpty()) {
// No indices in this snapshot - we are done
userCreateSnapshotListener.onResponse(snapshot.snapshot());
endSnapshot(snapshot);
endSnapshot(snapshot, clusterState.metaData());
return;
}
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
Expand Down Expand Up @@ -498,7 +512,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
assert snapshotsInProgress != null;
final SnapshotsInProgress.Entry entry = snapshotsInProgress.snapshot(snapshot.snapshot());
assert entry != null;
endSnapshot(entry);
endSnapshot(entry, newState.metaData());
}
}
});
Expand Down Expand Up @@ -556,6 +570,7 @@ private void cleanupAfterError(Exception exception) {
Collections.emptyList(),
snapshot.getRepositoryStateId(),
snapshot.includeGlobalState(),
metaDataForSnapshot(snapshot, clusterService.state().metaData()),
snapshot.userMetadata());
} catch (Exception inner) {
inner.addSuppressed(exception);
Expand All @@ -565,7 +580,18 @@ private void cleanupAfterError(Exception exception) {
}
userCreateSnapshotListener.onFailure(e);
}
}

private static MetaData metaDataForSnapshot(SnapshotsInProgress.Entry snapshot, MetaData metaData) {
if (snapshot.includeGlobalState() == false) {
// Remove global state from the cluster state
MetaData.Builder builder = MetaData.builder();
for (IndexId index : snapshot.indices()) {
builder.put(metaData.index(index.getName()), false);
}
metaData = builder.build();
}
return metaData;
}

private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) {
Expand Down Expand Up @@ -713,7 +739,7 @@ public void applyClusterState(ClusterChangedEvent event) {
entry -> entry.state().completed()
|| initializingSnapshots.contains(entry.snapshot()) == false
&& (entry.state() == State.INIT || completed(entry.shards().values()))
).forEach(this::endSnapshot);
).forEach(entry -> endSnapshot(entry, event.state().metaData()));
}
if (newMaster) {
finalizeSnapshotDeletionFromPreviousMaster(event);
Expand Down Expand Up @@ -960,7 +986,7 @@ private static Tuple<Set<String>, Set<String>> indicesWithMissingShards(
*
* @param entry snapshot
*/
private void endSnapshot(final SnapshotsInProgress.Entry entry) {
private void endSnapshot(SnapshotsInProgress.Entry entry, MetaData metaData) {
if (endingSnapshots.add(entry.snapshot()) == false) {
return;
}
Expand Down Expand Up @@ -988,6 +1014,7 @@ protected void doRun() {
unmodifiableList(shardFailures),
entry.getRepositoryStateId(),
entry.includeGlobalState(),
metaDataForSnapshot(entry, metaData),
entry.userMetadata());
removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Met
@Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, Map<String, Object> userMetadata) {
boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata) {
return null;
}

Expand Down
Loading