Report exponential_avg_bucket_processing_time which gives more weight to recent buckets (elastic#43189)
przemekwitek committed Jun 16, 2019
1 parent c3f1e6a commit f459bcb
Showing 15 changed files with 299 additions and 104 deletions.
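The new statistic is kept as an exponential moving average with a fixed smoothing factor (the ALPHA = 0.01 constant introduced in this commit), so each incoming bucket processing time contributes 1% of the new value and the previous average contributes 99%. A minimal standalone sketch of that update rule; only the formula and the ALPHA value come from the commit, the class, variable names, and sample values below are illustrative assumptions:

// Illustrative sketch of the exponential moving average (EMA) update used for
// exponential_average_bucket_processing_time_ms.
public class EmaSketch {

    private static final double ALPHA = 0.01; // weight given to the most recent bucket

    public static void main(String[] args) {
        double[] bucketTimesMs = {10.0, 12.0, 110.0}; // hypothetical bucket processing times
        Double ema = null;
        for (double t : bucketTimesMs) {
            // The first observation seeds the average; afterwards the newest value
            // contributes ALPHA and the previous average contributes (1 - ALPHA).
            ema = (ema == null) ? t : (1 - ALPHA) * ema + ALPHA * t;
            System.out.printf("after %.1f ms -> exponential average = %.4f ms%n", t, ema);
        }
    }
}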
@@ -42,38 +42,45 @@ public class TimingStats implements ToXContentObject {
public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms");
public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms");
public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms");
public static final ParseField EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS =
new ParseField("exponential_average_bucket_processing_time_ms");

public static final ConstructingObjectParser<TimingStats, Void> PARSER =
new ConstructingObjectParser<>(
"timing_stats",
true,
args -> new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4]));
args ->
new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4], (Double) args[5]));

static {
PARSER.declareString(constructorArg(), Job.ID);
PARSER.declareLong(constructorArg(), BUCKET_COUNT);
PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS);
}

private final String jobId;
private long bucketCount;
private Double minBucketProcessingTimeMs;
private Double maxBucketProcessingTimeMs;
private Double avgBucketProcessingTimeMs;
private Double exponentialAvgBucketProcessingTimeMs;

public TimingStats(
String jobId,
long bucketCount,
@Nullable Double minBucketProcessingTimeMs,
@Nullable Double maxBucketProcessingTimeMs,
@Nullable Double avgBucketProcessingTimeMs) {
@Nullable Double avgBucketProcessingTimeMs,
@Nullable Double exponentialAvgBucketProcessingTimeMs) {
this.jobId = jobId;
this.bucketCount = bucketCount;
this.minBucketProcessingTimeMs = minBucketProcessingTimeMs;
this.maxBucketProcessingTimeMs = maxBucketProcessingTimeMs;
this.avgBucketProcessingTimeMs = avgBucketProcessingTimeMs;
this.exponentialAvgBucketProcessingTimeMs = exponentialAvgBucketProcessingTimeMs;
}

public String getJobId() {
@@ -96,6 +103,10 @@ public Double getAvgBucketProcessingTimeMs() {
return avgBucketProcessingTimeMs;
}

public Double getExponentialAvgBucketProcessingTimeMs() {
return exponentialAvgBucketProcessingTimeMs;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
@@ -110,6 +121,9 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
if (avgBucketProcessingTimeMs != null) {
builder.field(AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), avgBucketProcessingTimeMs);
}
if (exponentialAvgBucketProcessingTimeMs != null) {
builder.field(EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), exponentialAvgBucketProcessingTimeMs);
}
builder.endObject();
return builder;
}
@@ -123,12 +137,19 @@ public boolean equals(Object o) {
&& this.bucketCount == that.bucketCount
&& Objects.equals(this.minBucketProcessingTimeMs, that.minBucketProcessingTimeMs)
&& Objects.equals(this.maxBucketProcessingTimeMs, that.maxBucketProcessingTimeMs)
&& Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs);
&& Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs)
&& Objects.equals(this.exponentialAvgBucketProcessingTimeMs, that.exponentialAvgBucketProcessingTimeMs);
}

@Override
public int hashCode() {
return Objects.hash(jobId, bucketCount, minBucketProcessingTimeMs, maxBucketProcessingTimeMs, avgBucketProcessingTimeMs);
return Objects.hash(
jobId,
bucketCount,
minBucketProcessingTimeMs,
maxBucketProcessingTimeMs,
avgBucketProcessingTimeMs,
exponentialAvgBucketProcessingTimeMs);
}

@Override
@@ -22,6 +22,7 @@
import org.elasticsearch.test.AbstractXContentTestCase;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {

@@ -33,6 +34,7 @@ public static TimingStats createTestInstance(String jobId) {
randomLong(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble());
}

@@ -52,39 +54,41 @@ protected boolean supportsUnknownFields() {
}

public void testConstructor() {
TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);

assertThat(stats.getJobId(), equalTo(JOB_ID));
assertThat(stats.getBucketCount(), equalTo(7L));
assertThat(stats.getMinBucketProcessingTimeMs(), equalTo(1.0));
assertThat(stats.getMaxBucketProcessingTimeMs(), equalTo(2.0));
assertThat(stats.getAvgBucketProcessingTimeMs(), equalTo(1.23));
assertThat(stats.getExponentialAvgBucketProcessingTimeMs(), equalTo(7.89));
}

public void testConstructor_NullValues() {
TimingStats stats = new TimingStats(JOB_ID, 7, null, null, null);
TimingStats stats = new TimingStats(JOB_ID, 7, null, null, null, null);

assertThat(stats.getJobId(), equalTo(JOB_ID));
assertThat(stats.getBucketCount(), equalTo(7L));
assertNull(stats.getMinBucketProcessingTimeMs());
assertNull(stats.getMaxBucketProcessingTimeMs());
assertNull(stats.getAvgBucketProcessingTimeMs());
assertThat(stats.getMinBucketProcessingTimeMs(), nullValue());
assertThat(stats.getMaxBucketProcessingTimeMs(), nullValue());
assertThat(stats.getAvgBucketProcessingTimeMs(), nullValue());
assertThat(stats.getExponentialAvgBucketProcessingTimeMs(), nullValue());
}

public void testEquals() {
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23);
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23, 7.89);

assertTrue(stats1.equals(stats1));
assertTrue(stats1.equals(stats2));
assertFalse(stats2.equals(stats3));
}

public void testHashCode() {
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23);
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23, 7.89);

assertEquals(stats1.hashCode(), stats1.hashCode());
assertEquals(stats1.hashCode(), stats2.hashCode());
9 changes: 8 additions & 1 deletion docs/reference/ml/apis/get-job-stats.asciidoc
@@ -105,7 +105,14 @@ The API returns the following results:
"log_time": 1491948163000,
"timestamp": 1455234600000
},
"state": "closed"
"state": "closed",
"timing_stats": {
"job_id": "farequote",
"minimum_bucket_processing_time_ms": 0.0,
"maximum_bucket_processing_time_ms": 15.0,
"average_bucket_processing_time_ms": 8.75,
"exponential_average_bucket_processing_time_ms": 6.1435899
}
}
]
}
33 changes: 31 additions & 2 deletions docs/reference/ml/apis/jobcounts.asciidoc
@@ -19,11 +19,15 @@ progress of a job.

`model_size_stats`::
(object) An object that provides information about the size and contents of the model.
See <<ml-modelsizestats,model size stats objects>>
See <<ml-modelsizestats,model size stats objects>>.

`forecasts_stats`::
(object) An object that provides statistical information about forecasts
of this job. See <<ml-forecastsstats, forecasts stats objects>>
of this job. See <<ml-forecastsstats, forecasts stats objects>>.

`timing_stats`::
(object) An object that provides statistical information about the timing aspects
of this job. See <<ml-timingstats, timing stats objects>>.

`node`::
(object) For open jobs only, contains information about the node where the
@@ -209,6 +213,31 @@ The `forecasts_stats` object shows statistics about forecasts. It has the follow
NOTE: `memory_bytes`, `records`, `processing_time_ms` and `status` require at least 1 forecast, otherwise
these fields are omitted.

[float]
[[ml-timingstats]]
==== Timing Stats Objects

The `timing_stats` object shows timing-related statistics about the job's progress. It has the following properties:

`job_id`::
(string) A string that uniquely identifies the job.

`bucket_count`::
(long) The number of buckets processed.

`minimum_bucket_processing_time_ms`::
(double) Minimum among all bucket processing times in milliseconds.

`maximum_bucket_processing_time_ms`::
(double) Maximum among all bucket processing times in milliseconds.

`average_bucket_processing_time_ms`::
(double) Average of all bucket processing times in milliseconds.

`exponential_average_bucket_processing_time_ms`::
(double) Exponential moving average of all bucket processing times in milliseconds.
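
For intuition, an illustrative calculation that is not part of the documentation change: with the smoothing factor of 0.01 used by this commit, if 1,000 buckets each took 10 ms and the next bucket takes 110 ms, the plain average moves only to (1000 × 10 + 110) / 1001 ≈ 10.1 ms, while the exponential average moves to 0.99 × 10 + 0.01 × 110 = 11.0 ms, because the most recent bucket carries roughly ten times more weight.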


[float]
[[ml-stats-node]]
==== Node Objects
@@ -863,6 +863,9 @@ private static void addTimingStatsExceptBucketCountMapping(XContentBuilder build
.endObject()
.startObject(TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(TimingStats.EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName())
.field(TYPE, DOUBLE)
.endObject();
}

@@ -31,21 +31,25 @@ public class TimingStats implements ToXContentObject, Writeable {
public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms");
public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms");
public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms");
public static final ParseField EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS =
new ParseField("exponential_average_bucket_processing_time_ms");

public static final ParseField TYPE = new ParseField("timing_stats");

public static final ConstructingObjectParser<TimingStats, Void> PARSER =
new ConstructingObjectParser<>(
TYPE.getPreferredName(),
true,
args -> new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4]));
args ->
new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4], (Double) args[5]));

static {
PARSER.declareString(constructorArg(), Job.ID);
PARSER.declareLong(constructorArg(), BUCKET_COUNT);
PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS);
}

public static String documentId(String jobId) {
@@ -57,26 +61,35 @@ public static String documentId(String jobId) {
private Double minBucketProcessingTimeMs;
private Double maxBucketProcessingTimeMs;
private Double avgBucketProcessingTimeMs;
private Double exponentialAvgBucketProcessingTimeMs;

public TimingStats(
String jobId,
long bucketCount,
@Nullable Double minBucketProcessingTimeMs,
@Nullable Double maxBucketProcessingTimeMs,
@Nullable Double avgBucketProcessingTimeMs) {
@Nullable Double avgBucketProcessingTimeMs,
@Nullable Double exponentialAvgBucketProcessingTimeMs) {
this.jobId = jobId;
this.bucketCount = bucketCount;
this.minBucketProcessingTimeMs = minBucketProcessingTimeMs;
this.maxBucketProcessingTimeMs = maxBucketProcessingTimeMs;
this.avgBucketProcessingTimeMs = avgBucketProcessingTimeMs;
this.exponentialAvgBucketProcessingTimeMs = exponentialAvgBucketProcessingTimeMs;
}

public TimingStats(String jobId) {
this(jobId, 0, null, null, null);
this(jobId, 0, null, null, null, null);
}

public TimingStats(TimingStats lhs) {
this(lhs.jobId, lhs.bucketCount, lhs.minBucketProcessingTimeMs, lhs.maxBucketProcessingTimeMs, lhs.avgBucketProcessingTimeMs);
this(
lhs.jobId,
lhs.bucketCount,
lhs.minBucketProcessingTimeMs,
lhs.maxBucketProcessingTimeMs,
lhs.avgBucketProcessingTimeMs,
lhs.exponentialAvgBucketProcessingTimeMs);
}

public TimingStats(StreamInput in) throws IOException {
@@ -85,6 +98,7 @@ public TimingStats(StreamInput in) throws IOException {
this.minBucketProcessingTimeMs = in.readOptionalDouble();
this.maxBucketProcessingTimeMs = in.readOptionalDouble();
this.avgBucketProcessingTimeMs = in.readOptionalDouble();
this.exponentialAvgBucketProcessingTimeMs = in.readOptionalDouble();
}

public String getJobId() {
@@ -107,12 +121,16 @@ public Double getAvgBucketProcessingTimeMs() {
return avgBucketProcessingTimeMs;
}

public Double getExponentialAvgBucketProcessingTimeMs() {
return exponentialAvgBucketProcessingTimeMs;
}

/**
* Updates the statistics (min, max, avg, exponential avg) for the given data point (bucket processing time).
*/
public void updateStats(double bucketProcessingTimeMs) {
if (bucketProcessingTimeMs < 0.0) {
throw new IllegalArgumentException("bucketProcessingTimeMs must be positive, was: " + bucketProcessingTimeMs);
throw new IllegalArgumentException("bucketProcessingTimeMs must be non-negative, was: " + bucketProcessingTimeMs);
}
if (minBucketProcessingTimeMs == null || bucketProcessingTimeMs < minBucketProcessingTimeMs) {
minBucketProcessingTimeMs = bucketProcessingTimeMs;
@@ -127,16 +145,29 @@ public void updateStats(double bucketProcessingTimeMs) {
// bucket processing times.
avgBucketProcessingTimeMs = (bucketCount * avgBucketProcessingTimeMs + bucketProcessingTimeMs) / (bucketCount + 1);
}
if (exponentialAvgBucketProcessingTimeMs == null) {
exponentialAvgBucketProcessingTimeMs = bucketProcessingTimeMs;
} else {
// Calculate the exponential moving average (see https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average) of
// bucket processing times.
exponentialAvgBucketProcessingTimeMs = (1 - ALPHA) * exponentialAvgBucketProcessingTimeMs + ALPHA * bucketProcessingTimeMs;
}
bucketCount++;
}

/**
* Constant smoothing factor used for calculating exponential moving average. Represents the degree of weighting decrease.
*/
private static final double ALPHA = 0.01;

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
out.writeLong(bucketCount);
out.writeOptionalDouble(minBucketProcessingTimeMs);
out.writeOptionalDouble(maxBucketProcessingTimeMs);
out.writeOptionalDouble(avgBucketProcessingTimeMs);
out.writeOptionalDouble(exponentialAvgBucketProcessingTimeMs);
}

@Override
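
A hedged usage sketch of the public API visible in this diff (the constructor, updateStats, and the new getter); the job id and the timing values are illustrative, not from the commit:

// Illustrative only: exercises the constructor, updateStats, and getter shown above.
TimingStats stats = new TimingStats("my-job");    // starts with bucketCount == 0 and null averages
stats.updateStats(10.0);                          // seeds min, max, avg, and the exponential average at 10.0
stats.updateStats(110.0);                         // exponential average becomes 0.99 * 10.0 + 0.01 * 110.0 = 11.0
double expAvg = stats.getExponentialAvgBucketProcessingTimeMs();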
@@ -153,6 +184,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (avgBucketProcessingTimeMs != null) {
builder.field(AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), avgBucketProcessingTimeMs);
}
if (exponentialAvgBucketProcessingTimeMs != null) {
builder.field(EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName(), exponentialAvgBucketProcessingTimeMs);
}
builder.endObject();
return builder;
}
@@ -166,12 +200,19 @@ public boolean equals(Object o) {
&& this.bucketCount == that.bucketCount
&& Objects.equals(this.minBucketProcessingTimeMs, that.minBucketProcessingTimeMs)
&& Objects.equals(this.maxBucketProcessingTimeMs, that.maxBucketProcessingTimeMs)
&& Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs);
&& Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs)
&& Objects.equals(this.exponentialAvgBucketProcessingTimeMs, that.exponentialAvgBucketProcessingTimeMs);
}

@Override
public int hashCode() {
return Objects.hash(jobId, bucketCount, minBucketProcessingTimeMs, maxBucketProcessingTimeMs, avgBucketProcessingTimeMs);
return Objects.hash(
jobId,
bucketCount,
minBucketProcessingTimeMs,
maxBucketProcessingTimeMs,
avgBucketProcessingTimeMs,
exponentialAvgBucketProcessingTimeMs);
}

@Override
@@ -185,7 +226,8 @@ public String toString() {
public static boolean differSignificantly(TimingStats stats1, TimingStats stats2) {
return differSignificantly(stats1.minBucketProcessingTimeMs, stats2.minBucketProcessingTimeMs)
|| differSignificantly(stats1.maxBucketProcessingTimeMs, stats2.maxBucketProcessingTimeMs)
|| differSignificantly(stats1.avgBucketProcessingTimeMs, stats2.avgBucketProcessingTimeMs);
|| differSignificantly(stats1.avgBucketProcessingTimeMs, stats2.avgBucketProcessingTimeMs)
|| differSignificantly(stats1.exponentialAvgBucketProcessingTimeMs, stats2.exponentialAvgBucketProcessingTimeMs);
}

/**
