Skip to content

Commit

Permalink
[CCR] Add total fetch time leader stat (#34577)
Browse files Browse the repository at this point in the history
Add total fetch time leader stat, that
keeps track how much time was spent on fetches
from the leader cluster perspective.
  • Loading branch information
martijnvg authored Oct 23, 2018
1 parent ca6808e commit e6d87cc
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.elasticsearch.action.ValidateActions.addValidationError;
Expand Down Expand Up @@ -67,6 +68,8 @@ public static class Request extends SingleShardRequest<Request> {
private TimeValue pollTimeout = TransportResumeFollowAction.DEFAULT_POLL_TIMEOUT;
private ByteSizeValue maxBatchSize = TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE;

private long relativeStartNanos;

public Request(ShardId shardId, String expectedHistoryUUID) {
super(shardId.getIndexName());
this.shardId = shardId;
Expand Down Expand Up @@ -142,6 +145,9 @@ public void readFrom(StreamInput in) throws IOException {
expectedHistoryUUID = in.readString();
pollTimeout = in.readTimeValue();
maxBatchSize = new ByteSizeValue(in);

// Starting the clock in order to know how much time is spent on fetching operations:
relativeStartNanos = System.nanoTime();
}

@Override
Expand Down Expand Up @@ -220,6 +226,12 @@ public Translog.Operation[] getOperations() {
return operations;
}

private long tookInMillis;

public long getTookInMillis() {
return tookInMillis;
}

Response() {
}

Expand All @@ -228,13 +240,15 @@ public Translog.Operation[] getOperations() {
final long globalCheckpoint,
final long maxSeqNo,
final long maxSeqNoOfUpdatesOrDeletes,
final Translog.Operation[] operations) {
final Translog.Operation[] operations,
final long tookInMillis) {

this.mappingVersion = mappingVersion;
this.globalCheckpoint = globalCheckpoint;
this.maxSeqNo = maxSeqNo;
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
this.operations = operations;
this.tookInMillis = tookInMillis;
}

@Override
Expand All @@ -245,6 +259,7 @@ public void readFrom(final StreamInput in) throws IOException {
maxSeqNo = in.readZLong();
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
tookInMillis = in.readVLong();
}

@Override
Expand All @@ -255,6 +270,7 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeZLong(maxSeqNo);
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
out.writeArray(Translog.Operation::writeOperation, operations);
out.writeVLong(tookInMillis);
}

@Override
Expand All @@ -266,12 +282,14 @@ public boolean equals(final Object o) {
globalCheckpoint == that.globalCheckpoint &&
maxSeqNo == that.maxSeqNo &&
maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes &&
Arrays.equals(operations, that.operations);
Arrays.equals(operations, that.operations) &&
tookInMillis == that.tookInMillis;
}

@Override
public int hashCode() {
return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes, Arrays.hashCode(operations));
return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes,
Arrays.hashCode(operations), tookInMillis);
}
}

Expand Down Expand Up @@ -308,7 +326,7 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
request.getMaxBatchSize());
// must capture after after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations);
return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations, request.relativeStartNanos);
}

@Override
Expand Down Expand Up @@ -373,7 +391,8 @@ private void globalCheckpointAdvancementFailure(
clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
final SeqNoStats latestSeqNoStats = indexShard.seqNoStats();
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY));
listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY,
request.relativeStartNanos));
} catch (final Exception caught) {
caught.addSuppressed(e);
listener.onFailure(caught);
Expand Down Expand Up @@ -459,8 +478,11 @@ static Translog.Operation[] getOperations(
}

static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats,
final long maxSeqNoOfUpdates, final Translog.Operation[] operations) {
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates, operations);
final long maxSeqNoOfUpdates, final Translog.Operation[] operations, long relativeStartNanos) {
long tookInNanos = System.nanoTime() - relativeStartNanos;
long tookInMillis = TimeUnit.NANOSECONDS.toMillis(tookInNanos);
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates,
operations, tookInMillis);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private int numConcurrentReads = 0;
private int numConcurrentWrites = 0;
private long currentMappingVersion = 0;
private long totalFetchTookTimeMillis = 0;
private long totalFetchTimeMillis = 0;
private long numberOfSuccessfulFetches = 0;
private long numberOfFailedFetches = 0;
Expand Down Expand Up @@ -238,6 +239,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
fetchExceptions.remove(from);
if (response.getOperations().length > 0) {
// do not count polls against fetch stats
totalFetchTookTimeMillis += response.getTookInMillis();
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
numberOfSuccessfulFetches++;
operationsReceived += response.getOperations().length;
Expand Down Expand Up @@ -449,6 +451,7 @@ public synchronized ShardFollowNodeTaskStatus getStatus() {
buffer.size(),
currentMappingVersion,
totalFetchTimeMillis,
totalFetchTookTimeMillis,
numberOfSuccessfulFetches,
numberOfFailedFetches,
operationsReceived,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ protected ShardChangesAction.Response createTestInstance() {
leaderGlobalCheckpoint,
leaderMaxSeqNo,
maxSeqNoOfUpdatesOrDeletes,
operations
operations,
randomNonNegativeLong()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
final long globalCheckpoint = tracker.getCheckpoint();
final long maxSeqNo = tracker.getMaxSeqNo();
handler.accept(new ShardChangesAction.Response(
0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), new Translog.Operation[0]));
0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), new Translog.Operation[0], 1L));
}
};
threadPool.generic().execute(task);
Expand Down Expand Up @@ -233,7 +233,8 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion,
nextGlobalCheckPoint,
nextGlobalCheckPoint,
randomNonNegativeLong(),
ops.toArray(EMPTY))
ops.toArray(EMPTY),
randomNonNegativeLong())
)
);
responses.put(prevGlobalCheckpoint, item);
Expand All @@ -256,7 +257,8 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion,
prevGlobalCheckpoint,
prevGlobalCheckpoint,
randomNonNegativeLong(),
EMPTY
EMPTY,
randomNonNegativeLong()
);
item.add(new TestResponse(null, mappingVersion, response));
}
Expand All @@ -273,7 +275,8 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion,
localLeaderGCP,
localLeaderGCP,
randomNonNegativeLong(),
ops.toArray(EMPTY)
ops.toArray(EMPTY),
randomNonNegativeLong()
);
item.add(new TestResponse(null, mappingVersion, response));
responses.put(fromSeqNo, Collections.unmodifiableList(item));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ protected ShardFollowNodeTaskStatus createTestInstance() {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomReadExceptions(),
randomLong(),
randomBoolean() ? new ElasticsearchException("fatal error") : null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ public void testReceiveNothingExpectedSomething() {
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));

shardChangesRequests.clear();
task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 100, new Translog.Operation[0]));
task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 100, new Translog.Operation[0], 1L));

assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
Expand Down Expand Up @@ -782,7 +782,8 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con
leaderGlobalCheckpoints.poll(),
maxSeqNos.poll(),
randomNonNegativeLong(),
operations
operations,
1L
);
handler.accept(response);
}
Expand Down Expand Up @@ -813,7 +814,8 @@ private static ShardChangesAction.Response generateShardChangesResponse(long fro
leaderGlobalCheckPoint,
leaderGlobalCheckPoint,
randomNonNegativeLong(),
ops.toArray(new Translog.Operation[0])
ops.toArray(new Translog.Operation[0]),
1L
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
if (from > seqNoStats.getGlobalCheckpoint()) {
handler.accept(ShardChangesAction.getResponse(1L, seqNoStats,
maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY));
maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY, 1L));
return;
}
Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from,
Expand All @@ -440,7 +440,8 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
seqNoStats.getGlobalCheckpoint(),
seqNoStats.getMaxSeqNo(),
maxSeqNoOfUpdatesOrDeletes,
ops
ops,
1L
);
handler.accept(response);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ protected FollowStatsAction.StatsResponses createTestInstance() {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
Collections.emptyNavigableMap(),
randomLong(),
randomBoolean() ? new ElasticsearchException("fatal error") : null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public void testToXContent() throws IOException {
final int numberOfQueuedWrites = randomIntBetween(0, Integer.MAX_VALUE);
final long mappingVersion = randomIntBetween(0, Integer.MAX_VALUE);
final long totalFetchTimeMillis = randomLongBetween(0, 4096);
final long totalFetchTookTimeMillis = randomLongBetween(0, 4096);
final long numberOfSuccessfulFetches = randomNonNegativeLong();
final long numberOfFailedFetches = randomLongBetween(0, 8);
final long operationsReceived = randomNonNegativeLong();
Expand Down Expand Up @@ -122,6 +123,7 @@ public void testToXContent() throws IOException {
numberOfQueuedWrites,
mappingVersion,
totalFetchTimeMillis,
totalFetchTookTimeMillis,
numberOfSuccessfulFetches,
numberOfFailedFetches,
operationsReceived,
Expand Down Expand Up @@ -166,6 +168,7 @@ public void testToXContent() throws IOException {
+ "\"number_of_queued_writes\":" + numberOfQueuedWrites + ","
+ "\"mapping_version\":" + mappingVersion + ","
+ "\"total_fetch_time_millis\":" + totalFetchTimeMillis + ","
+ "\"total_fetch_leader_time_millis\":" + totalFetchTookTimeMillis + ","
+ "\"number_of_successful_fetches\":" + numberOfSuccessfulFetches + ","
+ "\"number_of_failed_fetches\":" + numberOfFailedFetches + ","
+ "\"operations_received\":" + operationsReceived + ","
Expand Down Expand Up @@ -208,6 +211,7 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException {
1,
1,
100,
50,
10,
0,
10,
Expand All @@ -226,7 +230,6 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException {
Map<String, Object> template =
XContentHelper.convertToMap(XContentType.JSON.xContent(), MonitoringTemplateUtils.loadTemplate("es"), false);
Map<?, ?> followStatsMapping = (Map<?, ?>) XContentMapValues.extractValue("mappings.doc.properties.ccr_stats.properties", template);

assertThat(serializedStatus.size(), equalTo(followStatsMapping.size()));
for (Map.Entry<String, Object> entry : serializedStatus.entrySet()) {
String fieldName = entry.getKey();
Expand Down
Loading

0 comments on commit e6d87cc

Please sign in to comment.