Skip to content

Commit

Permalink
Make Timestamps Returned by Snapshot APIs Consistent (#43148) (#44261)
Browse files Browse the repository at this point in the history
* We don't have to calculate the start and end times form the shards for the status API, we have the start time available from the CS or the `SnapshotInfo` in the repo and can either take the end time form the `SnapshotInfo` or
take the most recent time from the shard stats for in progress snapshots
* Closes #43074
  • Loading branch information
original-brownbear authored Jul 12, 2019
1 parent e490ecb commit 9e920f9
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class SnapshotIndexStatus implements Iterable<SnapshotIndexShardStatus>,
stats = new SnapshotStats();
for (SnapshotIndexShardStatus shard : shards) {
indexShards.put(shard.getShardId().getId(), shard);
stats.add(shard.getStats());
stats.add(shard.getStats(), true);
}
shardsStats = new SnapshotShardsStats(shards);
this.indexShards = unmodifiableMap(indexShards);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,12 @@ public static SnapshotStats fromXContent(XContentParser parser) throws IOExcepti
processedSize);
}

void add(SnapshotStats stats) {
/**
* Add stats instance to the total
* @param stats Stats instance to add
* @param updateTimestamps Whether or not start time and duration should be updated
*/
void add(SnapshotStats stats, boolean updateTimestamps) {
incrementalFileCount += stats.incrementalFileCount;
totalFileCount += stats.totalFileCount;
processedFileCount += stats.processedFileCount;
Expand All @@ -317,7 +322,7 @@ void add(SnapshotStats stats) {
// First time here
startTime = stats.startTime;
time = stats.time;
} else {
} else if (updateTimestamps) {
// The time the last snapshot ends
long endTime = Math.max(startTime + time, stats.startTime + stats.time);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
Expand Down Expand Up @@ -72,14 +72,14 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
@Nullable
private Boolean includeGlobalState;

SnapshotStatus(final Snapshot snapshot, final State state, final List<SnapshotIndexShardStatus> shards,
final Boolean includeGlobalState) {
SnapshotStatus(Snapshot snapshot, State state, List<SnapshotIndexShardStatus> shards, Boolean includeGlobalState,
long startTime, long time) {
this.snapshot = Objects.requireNonNull(snapshot);
this.state = Objects.requireNonNull(state);
this.shards = Objects.requireNonNull(shards);
this.includeGlobalState = includeGlobalState;
shardsStats = new SnapshotShardsStats(shards);
updateShardStats();
updateShardStats(startTime, time);
}

private SnapshotStatus(Snapshot snapshot, State state, List<SnapshotIndexShardStatus> shards,
Expand Down Expand Up @@ -172,7 +172,16 @@ public void readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
includeGlobalState = in.readOptionalBoolean();
}
updateShardStats();
final long startTime;
final long time;
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
startTime = in.readLong();
time = in.readLong();
} else {
startTime = 0L;
time = 0L;
}
updateShardStats(startTime, time);
}

@Override
Expand All @@ -186,6 +195,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
out.writeOptionalBoolean(includeGlobalState);
}
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeLong(stats.getStartTime());
out.writeLong(stats.getTime());
}
}

/**
Expand Down Expand Up @@ -286,11 +299,12 @@ public static SnapshotStatus fromXContent(XContentParser parser) throws IOExcept
return PARSER.parse(parser, null);
}

private void updateShardStats() {
stats = new SnapshotStats();
private void updateShardStats(long startTime, long time) {
stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, 0, 0);
shardsStats = new SnapshotShardsStats(shards);
for (SnapshotIndexShardStatus shard : shards) {
stats.add(shard.getStats());
// BWC: only update timestamps when we did not get a start time from an old node
stats.add(shard.getStats(), startTime == 0L);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, Li
shardStatusBuilder.add(shardStatus);
}
builder.add(new SnapshotStatus(entry.snapshot(), entry.state(),
Collections.unmodifiableList(shardStatusBuilder), entry.includeGlobalState()));
Collections.unmodifiableList(shardStatusBuilder), entry.includeGlobalState(), entry.startTime(),
Math.max(threadPool.absoluteTimeInMillis() - entry.startTime(), 0L)));
}
}
// Now add snapshots on disk that are not currently running
Expand Down Expand Up @@ -236,8 +237,10 @@ private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, Li
default:
throw new IllegalArgumentException("Unknown snapshot state " + snapshotInfo.state());
}
final long startTime = snapshotInfo.startTime();
builder.add(new SnapshotStatus(new Snapshot(repositoryName, snapshotId), state,
Collections.unmodifiableList(shardStatusBuilder), snapshotInfo.includeGlobalState()));
Collections.unmodifiableList(shardStatusBuilder), snapshotInfo.includeGlobalState(),
startTime, snapshotInfo.endTime() - startTime));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testToString() throws Exception {
List<SnapshotIndexShardStatus> snapshotIndexShardStatuses = new ArrayList<>();
snapshotIndexShardStatuses.add(snapshotIndexShardStatus);
boolean includeGlobalState = randomBoolean();
SnapshotStatus status = new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState);
SnapshotStatus status = new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState, 0L, 0L);

int initializingShards = 0;
int startedShards = 0;
Expand Down Expand Up @@ -166,7 +166,7 @@ protected SnapshotStatus createTestInstance() {
snapshotIndexShardStatuses.add(snapshotIndexShardStatus);
}
boolean includeGlobalState = randomBoolean();
return new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState);
return new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState, 0L, 0L);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.snapshots;

import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

import java.util.List;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {

public void testStatusApiConsistency() {
Client client = client();

logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo").setType("fs").setSettings(
Settings.builder().put("location", randomRepoPath()).build()));

createIndex("test-idx-1", "test-idx-2", "test-idx-3");
ensureGreen();

logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index("test-idx-1", "_doc", Integer.toString(i), "foo", "bar" + i);
index("test-idx-2", "_doc", Integer.toString(i), "foo", "baz" + i);
index("test-idx-3", "_doc", Integer.toString(i), "foo", "baz" + i);
}
refresh();

logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

List<SnapshotInfo> snapshotInfos = client.admin().cluster().prepareGetSnapshots("test-repo").get().getSnapshots();
assertThat(snapshotInfos.size(), equalTo(1));
SnapshotInfo snapshotInfo = snapshotInfos.get(0);
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.version(), equalTo(Version.CURRENT));

final List<SnapshotStatus> snapshotStatus = client.admin().cluster().snapshotsStatus(
new SnapshotsStatusRequest("test-repo", new String[]{"test-snap"})).actionGet().getSnapshots();
assertThat(snapshotStatus.size(), equalTo(1));
final SnapshotStatus snStatus = snapshotStatus.get(0);
assertEquals(snStatus.getStats().getStartTime(), snapshotInfo.startTime());
assertEquals(snStatus.getStats().getTime(), snapshotInfo.endTime() - snapshotInfo.startTime());
}
}

0 comments on commit 9e920f9

Please sign in to comment.