Make Snapshot Logic Write Metadata after Segments
* WIP, this still needs a docs fixup
* Fixes elastic#41581
* Fixes elastic#25281
original-brownbear committed Aug 18, 2019
1 parent e7a8585 commit 51100e2
Showing 18 changed files with 198 additions and 272 deletions.
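The diff below removes Repository#initializeSnapshot and moves the metadata writes it performed into finalizeSnapshot, so snapshot metadata is only written once all segment data is in place. A rough sketch of the before/after blob write order implied by the diff (this outline is not part of the commit; blob names come from the code comments below):

```java
// Before: metadata blobs were written before any segment data existed.
//   1. initializeSnapshot()      -> meta-${uuid}.dat + per-index meta blobs
//   2. snapshotShard() per shard -> segment data
//   3. finalizeSnapshot()        -> snap-${uuid}.dat + repository generation
//
// After: metadata lands only once every shard's segments are written.
//   1. snapshotShard() per shard -> segment data
//   2. finalizeSnapshot()        -> meta-${uuid}.dat + per-index meta blobs,
//                                   then snap-${uuid}.dat + repository generation
```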
==== changed file ====
@@ -73,17 +73,12 @@ public RepositoryData getRepositoryData() {
return in.getRepositoryData();
}

- @Override
- public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {
-     in.initializeSnapshot(snapshotId, indices, metaData);
- }

@Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
- Map<String, Object> userMetadata) {
+ MetaData metaData, Map<String, Object> userMetadata) {
return in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
- includeGlobalState, userMetadata);
+ includeGlobalState, metaData, userMetadata);
}

@Override
==== changed file ====
@@ -48,8 +48,6 @@
* <p>
* To perform a snapshot:
* <ul>
-  * <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
-  * with list of indices that will be included into the snapshot</li>
* <li>Data nodes call {@link Repository#snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
* for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
@@ -110,15 +108,6 @@ default Repository create(RepositoryMetaData metaData, Function<String, Reposito
*/
RepositoryData getRepositoryData();

- /**
-  * Starts snapshotting process
-  *
-  * @param snapshotId snapshot id
-  * @param indices    list of indices to be snapshotted
-  * @param metaData   cluster metadata
-  */
- void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData);
-
/**
* Finalizes snapshotting process
* <p>
@@ -136,7 +125,7 @@ default Repository create(RepositoryMetaData metaData, Function<String, Reposito
*/
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
- Map<String, Object> userMetadata);
+ MetaData clusterMetaData, Map<String, Object> userMetadata);

/**
* Deletes snapshot
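For implementers, a minimal sketch of a repository built against the trimmed interface (the class LoggingRepository is hypothetical; the finalizeSnapshot signature matches the hunk above):

```java
import java.util.List;
import java.util.Map;

import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.repositories.FilterRepository;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;

// Hypothetical delegating repository: there is no initializeSnapshot left to
// override, and the cluster metadata to persist arrives only at finalization.
public class LoggingRepository extends FilterRepository {

    public LoggingRepository(Repository in) {
        super(in);
    }

    @Override
    public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
                                         int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
                                         boolean includeGlobalState, MetaData clusterMetaData,
                                         Map<String, Object> userMetadata) {
        // By the time this runs, the data nodes have already written all segments.
        return super.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures,
            repositoryStateId, includeGlobalState, clusterMetaData, userMetadata);
    }
}
```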
==== changed file ====
@@ -84,7 +84,6 @@
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryVerificationException;
- import org.elasticsearch.snapshots.InvalidSnapshotNameException;
import org.elasticsearch.snapshots.SnapshotCreationException;
import org.elasticsearch.snapshots.SnapshotException;
import org.elasticsearch.snapshots.SnapshotId;
@@ -139,7 +138,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private static final String TESTS_FILE = "tests-";

- private static final String METADATA_PREFIX = "meta-";
+ public static final String METADATA_PREFIX = "meta-";

public static final String METADATA_NAME_FORMAT = METADATA_PREFIX + "%s.dat";

@@ -353,31 +352,6 @@ public RepositoryMetaData getMetadata() {
return metadata;
}

- @Override
- public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetaData) {
-     if (isReadOnly()) {
-         throw new RepositoryException(metadata.name(), "cannot create snapshot in a readonly repository");
-     }
-     try {
-         final String snapshotName = snapshotId.getName();
-         // check if the snapshot name already exists in the repository
-         final RepositoryData repositoryData = getRepositoryData();
-         if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
-             throw new InvalidSnapshotNameException(metadata.name(), snapshotId.getName(), "snapshot with the same name already exists");
-         }
-
-         // Write Global MetaData
-         globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID());
-
-         // write the index metadata for each index in the snapshot
-         for (IndexId index : indices) {
-             indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID());
-         }
-     } catch (IOException ex) {
-         throw new SnapshotCreationException(metadata.name(), snapshotId, ex);
-     }
- }
-
@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener) {
if (isReadOnly()) {
@@ -558,14 +532,34 @@ public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId,
final List<SnapshotShardFailure> shardFailures,
final long repositoryStateId,
final boolean includeGlobalState,
+ final MetaData clusterMetaData,
final Map<String, Object> userMetadata) {
SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
indices.stream().map(IndexId::getName).collect(Collectors.toList()),
startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
includeGlobalState, userMetadata);

+ try {
+     // We ignore all FileAlreadyExistsException here since otherwise a master failover while in this method will
+     // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the
+     // index or global metadata will be compatible with the segments written in this snapshot as well.
+     // Failing on an already existing snap-${uuid}.dat below ensures that the index.latest blob is not updated in a way that
+     // decrements the generation it points at
+
+     // Write Global MetaData
+     globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false);
+
+     // write the index metadata for each index in the snapshot
+     for (IndexId index : indices) {
+         indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false);
+     }
+ } catch (IOException ex) {
+     throw new SnapshotCreationException(metadata.name(), snapshotId, ex);
+ }
+
try {
final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices);
- snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID());
+ snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID(), false);
writeIndexGen(updatedRepositoryData, repositoryStateId);
} catch (FileAlreadyExistsException ex) {
// if another master was elected and took over finalizing the snapshot, it is possible
@@ -941,7 +935,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s

logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
try {
- indexShardSnapshotFormat.write(snapshot, shardContainer, snapshotId.getUUID());
+ indexShardSnapshotFormat.write(snapshot, shardContainer, snapshotId.getUUID(), true);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
}
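The two failIfAlreadyExists values above carry the failover story: metadata writes pass false so a newly elected master retrying finalization can overwrite blobs the previous master already wrote, while the shard-level write in snapshotShard keeps true because a pre-existing shard snapshot blob signals a genuine conflict. A toy illustration of the flag's semantics (ToyBlobContainer is hypothetical, not Elasticsearch's BlobContainer):

```java
import java.nio.file.FileAlreadyExistsException;
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in demonstrating the two write modes used in the hunks above.
final class ToyBlobContainer {
    private final Map<String, byte[]> blobs = new HashMap<>();

    void writeBlob(String name, byte[] data, boolean failIfAlreadyExists) throws FileAlreadyExistsException {
        if (failIfAlreadyExists && blobs.containsKey(name)) {
            // e.g. a shard snapshot blob that unexpectedly already exists
            throw new FileAlreadyExistsException(name);
        }
        // failIfAlreadyExists=false: a master-failover retry overwrites the
        // equivalent metadata blob instead of failing the whole snapshot.
        blobs.put(name, data);
    }
}
```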
==== changed file ====
@@ -175,15 +175,16 @@ public void writeAtomic(T obj, BlobContainer blobContainer, String name) throws
* <p>
* The blob will be compressed and checksum will be written if required.
*
-  * @param obj           object to be serialized
-  * @param blobContainer blob container
-  * @param name          blob name
+  * @param obj                 object to be serialized
+  * @param blobContainer       blob container
+  * @param name                blob name
+  * @param failIfAlreadyExists Whether to fail if the blob already exists
*/
- public void write(T obj, BlobContainer blobContainer, String name) throws IOException {
+ public void write(T obj, BlobContainer blobContainer, String name, boolean failIfAlreadyExists) throws IOException {
final String blobName = blobName(name);
writeTo(obj, blobName, bytesArray -> {
try (InputStream stream = bytesArray.streamInput()) {
- blobContainer.writeBlob(blobName, stream, bytesArray.length(), true);
+ blobContainer.writeBlob(blobName, stream, bytesArray.length(), failIfAlreadyExists);
}
});
}
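Callers now choose the behavior per write. A hypothetical test-style usage (BLOB_CODEC, BlobObj, blobContainer and xContentRegistry() are borrowed from the test class changed further down):

```java
public void testWriteModes() throws IOException {
    ChecksumBlobStoreFormat<BlobObj> format =
        new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent, xContentRegistry(), true);
    format.write(new BlobObj("first"), blobContainer, "blob", true);  // fails if "blob" already exists
    format.write(new BlobObj("retry"), blobContainer, "blob", false); // overwrite is tolerated
}
```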
==== changed file ====
@@ -120,15 +120,7 @@
*
* <h3>Initializing a Snapshot in the Repository</h3>
*
- * <p>Creating a snapshot in the repository starts with a call to {@link org.elasticsearch.repositories.Repository#initializeSnapshot} which
- * the blob store repository implements via the following actions:</p>
- * <ol>
- * <li>Verify that no snapshot by the requested name exists.</li>
- * <li>Write a blob containing the cluster metadata to the root of the blob store repository at {@code /meta-${snapshot-uuid}.dat}</li>
- * <li>Write the metadata for each index to a blob in that index's directory at
- * {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}.dat}</li>
- * </ol>
- * TODO: This behavior is problematic, adjust these docs once https://github.com/elastic/elasticsearch/issues/41581 is fixed
+ * TODO: Adjust these docs for https://github.com/elastic/elasticsearch/issues/41581
*
* <h3>Writing Shard Data (Segments)</h3>
*
==== changed file ====
@@ -67,6 +67,7 @@
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
+ import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.threadpool.ThreadPool;

@@ -99,7 +100,7 @@
* <li>On the master node the {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} is called and makes sure that
* no snapshot is currently running and registers the new snapshot in cluster state</li>
* <li>When cluster state is updated
-  * the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes
+  * the {@link #beginSnapshot(SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes
* the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state</li>
* <li>Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes
* start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(SnapshotsInProgress)} method</li>
@@ -303,7 +304,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
if (newSnapshot != null) {
final Snapshot current = newSnapshot.snapshot();
assert initializingSnapshots.contains(current);
- beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener<>() {
+ beginSnapshot(newSnapshot, request.partial(), new ActionListener<>() {
@Override
public void onResponse(final Snapshot snapshot) {
initializingSnapshots.remove(snapshot);
@@ -372,14 +373,11 @@ private static void validate(final String repositoryName, final String snapshotN
* <p>
* Creates snapshot in repository and updates snapshot metadata record with list of shards that needs to be processed.
*
-  * @param clusterState cluster state
* @param snapshot snapshot meta data
* @param partial allow partial snapshots
* @param userCreateSnapshotListener listener
*/
- private void beginSnapshot(final ClusterState clusterState,
-                            final SnapshotsInProgress.Entry snapshot,
-                            final boolean partial,
+ private void beginSnapshot(final SnapshotsInProgress.Entry snapshot, final boolean partial,
final ActionListener<Snapshot> userCreateSnapshotListener) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {

@@ -392,17 +390,16 @@ protected void doRun() {
assert initializingSnapshots.contains(snapshot.snapshot());
Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository());

- MetaData metaData = clusterState.metaData();
- if (!snapshot.includeGlobalState()) {
-     // Remove global state from the cluster state
-     MetaData.Builder builder = MetaData.builder();
-     for (IndexId index : snapshot.indices()) {
-         builder.put(metaData.index(index.getName()), false);
-     }
-     metaData = builder.build();
- }
+ final String snapshotName = snapshot.snapshot().getSnapshotId().getName();
+ if (repository.isReadOnly()) {
+     throw new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository");
+ }
+ // check if the snapshot name already exists in the repository
+ final RepositoryData repositoryData = repository.getRepositoryData();
+ if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
+     throw new InvalidSnapshotNameException(
+         repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
+ }

- repository.initializeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaData);
snapshotCreated = true;

logger.info("snapshot [{}] started", snapshot.snapshot());
@@ -541,6 +538,9 @@ public void onNoLongerMaster() {
private void cleanupAfterError(Exception exception) {
if(snapshotCreated) {
try {
+ // TODO: Get the metadata from cluster state update to be safe?
+ final MetaData metaData = metaDataForSnapshot(snapshot);
+
repositoriesService.repository(snapshot.snapshot().getRepository())
.finalizeSnapshot(snapshot.snapshot().getSnapshotId(),
snapshot.indices(),
@@ -550,6 +550,7 @@ private void cleanupAfterError(Exception exception) {
Collections.emptyList(),
snapshot.getRepositoryStateId(),
snapshot.includeGlobalState(),
+ metaData,
snapshot.userMetadata());
} catch (Exception inner) {
inner.addSuppressed(exception);
@@ -559,7 +560,19 @@ private void cleanupAfterError(Exception exception) {
}
userCreateSnapshotListener.onFailure(e);
}
}

+ private MetaData metaDataForSnapshot(final SnapshotsInProgress.Entry snapshot) {
+     MetaData metaData = clusterService.state().metaData();
+     if (!snapshot.includeGlobalState()) {
+         // Remove global state from the cluster state
+         MetaData.Builder builder = MetaData.builder();
+         for (IndexId index : snapshot.indices()) {
+             builder.put(metaData.index(index.getName()), false);
+         }
+         metaData = builder.build();
+     }
+     return metaData;
+ }

private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) {
@@ -982,6 +995,7 @@ protected void doRun() {
unmodifiableList(shardFailures),
entry.getRepositoryStateId(),
entry.includeGlobalState(),
+ metaDataForSnapshot(entry),
entry.userMetadata());
removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
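The new metaDataForSnapshot helper captures the cluster metadata at finalization time and, when global state is excluded, keeps only the snapshotted indices' entries. A self-contained toy model of that filtering (the types are stand-ins, not Elasticsearch classes):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MetaDataFilterSketch {
    // Mirrors the shape of metaDataForSnapshot: include everything, or only
    // the metadata of the indices actually contained in the snapshot.
    static Map<String, String> metaDataForSnapshot(Map<String, String> clusterIndexMetaData,
                                                   List<String> snapshotIndices,
                                                   boolean includeGlobalState) {
        if (includeGlobalState) {
            return clusterIndexMetaData; // in the real code this also keeps templates, settings, etc.
        }
        Map<String, String> filtered = new HashMap<>();
        for (String index : snapshotIndices) {
            filtered.put(index, clusterIndexMetaData.get(index));
        }
        return filtered;
    }
}
```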
==== changed file ====
@@ -153,15 +153,10 @@ public RepositoryData getRepositoryData() {
return null;
}

- @Override
- public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {
-
- }
-
@Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
- boolean includeGlobalState, Map<String, Object> userMetadata) {
+ boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata) {
return null;
}

==== changed file ====
@@ -116,8 +116,8 @@ public void testBlobStoreOperations() throws IOException {
xContentRegistry(), true);

// Write blobs in different formats
checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile");
checksumSMILECompressed.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp");
checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", true);
checksumSMILECompressed.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp", true);

// Assert that all checksum blobs can be read by all formats
assertEquals(checksumSMILE.read(blobContainer, "check-smile").getText(), "checksum smile");
@@ -136,8 +136,8 @@ public void testCompressionIsApplied() throws IOException {
ChecksumBlobStoreFormat<BlobObj> checksumFormatComp = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent,
xContentRegistry(), true);
BlobObj blobObj = new BlobObj(veryRedundantText.toString());
checksumFormatComp.write(blobObj, blobContainer, "blob-comp");
checksumFormat.write(blobObj, blobContainer, "blob-not-comp");
checksumFormatComp.write(blobObj, blobContainer, "blob-comp", true);
checksumFormat.write(blobObj, blobContainer, "blob-not-comp", true);
Map<String, BlobMetaData> blobs = blobContainer.listBlobsByPrefix("blob-");
assertEquals(blobs.size(), 2);
assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length()));
@@ -150,7 +150,7 @@ public void testBlobCorruption() throws IOException {
BlobObj blobObj = new BlobObj(testString);
ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent,
xContentRegistry(), randomBoolean());
checksumFormat.write(blobObj, blobContainer, "test-path");
checksumFormat.write(blobObj, blobContainer, "test-path", true);
assertEquals(checksumFormat.read(blobContainer, "test-path").getText(), testString);
randomCorruption(blobContainer, "test-path");
try {
[Remaining changed files of the 18 are not included in this view.]
