Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report exponential_avg_bucket_processing_time which gives more weight to recent buckets #43189

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,38 +42,45 @@ public class TimingStats implements ToXContentObject {
public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms");
public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms");
public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms");
public static final ParseField EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS =
new ParseField("exponential_average_bucket_processing_time_ms");

public static final ConstructingObjectParser<TimingStats, Void> PARSER =
new ConstructingObjectParser<>(
"timing_stats",
true,
args -> new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4]));
args ->
new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4], (Double) args[5]));

static {
// NOTE: declaration order defines the constructor argument indexes (args[0]..args[5])
// used by the PARSER lambda above — keep it in sync with the TimingStats constructor.
PARSER.declareString(constructorArg(), Job.ID);
PARSER.declareLong(constructorArg(), BUCKET_COUNT);
// The per-bucket timing statistics are optional: they are absent until at least
// one bucket has been processed.
PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS);
}

private final String jobId;
private long bucketCount;
private Double minBucketProcessingTimeMs;
private Double maxBucketProcessingTimeMs;
private Double avgBucketProcessingTimeMs;
private Double exponentialAvgBucketProcessingTimeMs;

/**
 * Creates timing statistics for the given job.
 *
 * @param jobId job these statistics belong to
 * @param bucketCount number of buckets processed so far
 * @param minBucketProcessingTimeMs minimum bucket processing time in ms, or {@code null} if no bucket was processed yet
 * @param maxBucketProcessingTimeMs maximum bucket processing time in ms, or {@code null} if no bucket was processed yet
 * @param avgBucketProcessingTimeMs arithmetic average bucket processing time in ms, or {@code null} if no bucket was processed yet
 * @param exponentialAvgBucketProcessingTimeMs exponential moving average bucket processing time in ms,
 *        or {@code null} if no bucket was processed yet
 */
public TimingStats(
        String jobId,
        long bucketCount,
        @Nullable Double minBucketProcessingTimeMs,
        @Nullable Double maxBucketProcessingTimeMs,
        @Nullable Double avgBucketProcessingTimeMs,
        @Nullable Double exponentialAvgBucketProcessingTimeMs) {
    this.jobId = jobId;
    this.bucketCount = bucketCount;
    this.minBucketProcessingTimeMs = minBucketProcessingTimeMs;
    this.maxBucketProcessingTimeMs = maxBucketProcessingTimeMs;
    this.avgBucketProcessingTimeMs = avgBucketProcessingTimeMs;
    this.exponentialAvgBucketProcessingTimeMs = exponentialAvgBucketProcessingTimeMs;
}

public String getJobId() {
Expand All @@ -96,6 +103,10 @@ public Double getAvgBucketProcessingTimeMs() {
return avgBucketProcessingTimeMs;
}

/**
 * @return the exponential moving average of bucket processing times in milliseconds,
 *         or {@code null} if no bucket has been processed yet
 */
public Double getExponentialAvgBucketProcessingTimeMs() {
    return this.exponentialAvgBucketProcessingTimeMs;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
Expand All @@ -110,6 +121,9 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
if (avgBucketProcessingTimeMs != null) {
builder.field(AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), avgBucketProcessingTimeMs);
}
if (exponentialAvgBucketProcessingTimeMs != null) {
builder.field(EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), exponentialAvgBucketProcessingTimeMs);
}
builder.endObject();
return builder;
}
Expand All @@ -123,12 +137,19 @@ public boolean equals(Object o) {
&& this.bucketCount == that.bucketCount
&& Objects.equals(this.minBucketProcessingTimeMs, that.minBucketProcessingTimeMs)
&& Objects.equals(this.maxBucketProcessingTimeMs, that.maxBucketProcessingTimeMs)
&& Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs);
&& Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs)
&& Objects.equals(this.exponentialAvgBucketProcessingTimeMs, that.exponentialAvgBucketProcessingTimeMs);
}

@Override
public int hashCode() {
    // Must stay consistent with equals(): hash over exactly the fields compared there.
    return Objects.hash(
        jobId,
        bucketCount,
        minBucketProcessingTimeMs,
        maxBucketProcessingTimeMs,
        avgBucketProcessingTimeMs,
        exponentialAvgBucketProcessingTimeMs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.test.AbstractXContentTestCase;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {

Expand All @@ -33,6 +34,7 @@ public static TimingStats createTestInstance(String jobId) {
randomLong(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble());
}

Expand All @@ -52,39 +54,41 @@ protected boolean supportsUnknownFields() {
}

// Verifies that all six constructor arguments are exposed unchanged via the getters.
public void testConstructor() {
    TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);

    assertThat(stats.getJobId(), equalTo(JOB_ID));
    assertThat(stats.getBucketCount(), equalTo(7L));
    assertThat(stats.getMinBucketProcessingTimeMs(), equalTo(1.0));
    assertThat(stats.getMaxBucketProcessingTimeMs(), equalTo(2.0));
    assertThat(stats.getAvgBucketProcessingTimeMs(), equalTo(1.23));
    assertThat(stats.getExponentialAvgBucketProcessingTimeMs(), equalTo(7.89));
}

// Verifies that all optional timing fields may be null (the state before any bucket is processed).
public void testConstructor_NullValues() {
    TimingStats stats = new TimingStats(JOB_ID, 7, null, null, null, null);

    assertThat(stats.getJobId(), equalTo(JOB_ID));
    assertThat(stats.getBucketCount(), equalTo(7L));
    assertThat(stats.getMinBucketProcessingTimeMs(), nullValue());
    assertThat(stats.getMaxBucketProcessingTimeMs(), nullValue());
    assertThat(stats.getAvgBucketProcessingTimeMs(), nullValue());
    assertThat(stats.getExponentialAvgBucketProcessingTimeMs(), nullValue());
}

// Equality is reflexive, holds for field-identical instances, and fails when any field differs.
public void testEquals() {
    TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
    TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
    TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23, 7.89);

    assertTrue(stats1.equals(stats1));
    assertTrue(stats1.equals(stats2));
    assertFalse(stats2.equals(stats3));
}

public void testHashCode() {
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23);
TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23, 7.89);

assertEquals(stats1.hashCode(), stats1.hashCode());
assertEquals(stats1.hashCode(), stats2.hashCode());
Expand Down
9 changes: 8 additions & 1 deletion docs/reference/ml/apis/get-job-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,14 @@ The API returns the following results:
"log_time": 1491948163000,
"timestamp": 1455234600000
},
"state": "closed"
"state": "closed",
"timing_stats": {
"job_id": "farequote",
"minimum_bucket_processing_time_ms": 0.0,
"maximum_bucket_processing_time_ms": 15.0,
"average_bucket_processing_time_ms": 8.75,
"exponential_average_bucket_processing_time_ms": 6.1435899
}
}
]
}
Expand Down
33 changes: 31 additions & 2 deletions docs/reference/ml/apis/jobcounts.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ progress of a job.

`model_size_stats`::
(object) An object that provides information about the size and contents of the model.
See <<ml-modelsizestats,model size stats objects>>
See <<ml-modelsizestats,model size stats objects>>.

`forecasts_stats`::
(object) An object that provides statistical information about forecasts
of this job. See <<ml-forecastsstats, forecasts stats objects>>
of this job. See <<ml-forecastsstats, forecasts stats objects>>.

`timing_stats`::
(object) An object that provides statistical information about the timing aspects
of this job. See <<ml-timingstats, timing stats objects>>.

`node`::
(object) For open jobs only, contains information about the node where the
Expand Down Expand Up @@ -209,6 +213,31 @@ The `forecasts_stats` object shows statistics about forecasts. It has the follow
NOTE: `memory_bytes`, `records`, `processing_time_ms` and `status` require at least 1 forecast, otherwise
these fields are omitted.

[float]
[[ml-timingstats]]
==== Timing Stats Objects

The `timing_stats` object shows timing-related statistics about the job's progress. It has the following properties:

`job_id`::
(string) A numerical character string that uniquely identifies the job.

`bucket_count`::
(long) The number of buckets processed.

`minimum_bucket_processing_time_ms`::
(double) Minimum among all bucket processing times in milliseconds.

`maximum_bucket_processing_time_ms`::
(double) Maximum among all bucket processing times in milliseconds.

`average_bucket_processing_time_ms`::
(double) Average of all bucket processing times in milliseconds.

`exponential_average_bucket_processing_time_ms`::
(double) Exponential moving average of all bucket processing times in milliseconds.


[float]
[[ml-stats-node]]
==== Node Objects
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,9 @@ private static void addTimingStatsExceptBucketCountMapping(XContentBuilder build
.endObject()
.startObject(TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(TimingStats.EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName())
.field(TYPE, DOUBLE)
.endObject();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,25 @@ public class TimingStats implements ToXContentObject, Writeable {
public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms");
public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms");
public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms");
public static final ParseField EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS =
new ParseField("exponential_average_bucket_processing_time_ms");

public static final ParseField TYPE = new ParseField("timing_stats");

public static final ConstructingObjectParser<TimingStats, Void> PARSER =
new ConstructingObjectParser<>(
TYPE.getPreferredName(),
true,
args -> new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4]));
args ->
new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4], (Double) args[5]));

static {
// NOTE: declaration order defines the constructor argument indexes (args[0]..args[5])
// consumed by the PARSER lambda — keep in sync with the TimingStats constructor.
PARSER.declareString(constructorArg(), Job.ID);
PARSER.declareLong(constructorArg(), BUCKET_COUNT);
// Timing fields are optional: absent until at least one bucket has been processed.
PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS);
}

public static String documentId(String jobId) {
Expand All @@ -57,26 +61,35 @@ public static String documentId(String jobId) {
private Double minBucketProcessingTimeMs;
private Double maxBucketProcessingTimeMs;
private Double avgBucketProcessingTimeMs;
private Double exponentialAvgBucketProcessingTimeMs;

/**
 * Creates timing statistics for the given job.
 *
 * @param jobId job these statistics belong to
 * @param bucketCount number of buckets processed so far
 * @param minBucketProcessingTimeMs minimum bucket processing time in ms, or {@code null} if no bucket was processed yet
 * @param maxBucketProcessingTimeMs maximum bucket processing time in ms, or {@code null} if no bucket was processed yet
 * @param avgBucketProcessingTimeMs arithmetic average bucket processing time in ms, or {@code null} if no bucket was processed yet
 * @param exponentialAvgBucketProcessingTimeMs exponential moving average bucket processing time in ms,
 *        or {@code null} if no bucket was processed yet
 */
public TimingStats(
        String jobId,
        long bucketCount,
        @Nullable Double minBucketProcessingTimeMs,
        @Nullable Double maxBucketProcessingTimeMs,
        @Nullable Double avgBucketProcessingTimeMs,
        @Nullable Double exponentialAvgBucketProcessingTimeMs) {
    this.jobId = jobId;
    this.bucketCount = bucketCount;
    this.minBucketProcessingTimeMs = minBucketProcessingTimeMs;
    this.maxBucketProcessingTimeMs = maxBucketProcessingTimeMs;
    this.avgBucketProcessingTimeMs = avgBucketProcessingTimeMs;
    this.exponentialAvgBucketProcessingTimeMs = exponentialAvgBucketProcessingTimeMs;
}

/** Creates empty timing statistics (zero buckets processed) for the given job. */
public TimingStats(String jobId) {
    this(jobId, 0, null, null, null, null);
}

/** Copy constructor: creates an independent snapshot of {@code lhs}. */
public TimingStats(TimingStats lhs) {
    this(
        lhs.jobId,
        lhs.bucketCount,
        lhs.minBucketProcessingTimeMs,
        lhs.maxBucketProcessingTimeMs,
        lhs.avgBucketProcessingTimeMs,
        lhs.exponentialAvgBucketProcessingTimeMs);
}

public TimingStats(StreamInput in) throws IOException {
Expand All @@ -85,6 +98,7 @@ public TimingStats(StreamInput in) throws IOException {
this.minBucketProcessingTimeMs = in.readOptionalDouble();
this.maxBucketProcessingTimeMs = in.readOptionalDouble();
this.avgBucketProcessingTimeMs = in.readOptionalDouble();
this.exponentialAvgBucketProcessingTimeMs = in.readOptionalDouble();
}

public String getJobId() {
Expand All @@ -107,12 +121,16 @@ public Double getAvgBucketProcessingTimeMs() {
return avgBucketProcessingTimeMs;
}

/**
 * @return the exponential moving average of bucket processing times in milliseconds,
 *         or {@code null} if no bucket has been processed yet
 */
public Double getExponentialAvgBucketProcessingTimeMs() {
    return this.exponentialAvgBucketProcessingTimeMs;
}

/**
* Updates the statistics (min, max, avg) for the given data point (bucket processing time).
*/
public void updateStats(double bucketProcessingTimeMs) {
if (bucketProcessingTimeMs < 0.0) {
throw new IllegalArgumentException("bucketProcessingTimeMs must be positive, was: " + bucketProcessingTimeMs);
throw new IllegalArgumentException("bucketProcessingTimeMs must be non-negative, was: " + bucketProcessingTimeMs);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#pedantry
So, is 0.0 considered a negative number? 😂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if you've seen the latest revision of this comment ("bucketProcessingTimeMs must be non-negative, was: ") or not. But now I believe it is correct i.e. if the value is less than 0 we throw an error that it should be non-negative (so 0 or greater than 0).

}
if (minBucketProcessingTimeMs == null || bucketProcessingTimeMs < minBucketProcessingTimeMs) {
minBucketProcessingTimeMs = bucketProcessingTimeMs;
Expand All @@ -127,16 +145,29 @@ public void updateStats(double bucketProcessingTimeMs) {
// bucket processing times.
avgBucketProcessingTimeMs = (bucketCount * avgBucketProcessingTimeMs + bucketProcessingTimeMs) / (bucketCount + 1);
}
if (exponentialAvgBucketProcessingTimeMs == null) {
exponentialAvgBucketProcessingTimeMs = bucketProcessingTimeMs;
} else {
// Calculate the exponential moving average (see https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average) of
// bucket processing times.
exponentialAvgBucketProcessingTimeMs = (1 - ALPHA) * exponentialAvgBucketProcessingTimeMs + ALPHA * bucketProcessingTimeMs;
}
bucketCount++;
}

/**
 * Constant smoothing factor used for calculating the exponential moving average.
 * Represents the degree of weighting decrease: higher values discount older
 * bucket processing times faster.
 */
private static final double ALPHA = 0.01;

@Override
public void writeTo(StreamOutput out) throws IOException {
// Write order must mirror the read order in TimingStats(StreamInput) exactly;
// changing it breaks wire compatibility between nodes.
out.writeString(jobId);
out.writeLong(bucketCount);
// Optional doubles: null until the first bucket has been processed.
out.writeOptionalDouble(minBucketProcessingTimeMs);
out.writeOptionalDouble(maxBucketProcessingTimeMs);
out.writeOptionalDouble(avgBucketProcessingTimeMs);
out.writeOptionalDouble(exponentialAvgBucketProcessingTimeMs);
}

@Override
Expand All @@ -153,6 +184,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (avgBucketProcessingTimeMs != null) {
builder.field(AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), avgBucketProcessingTimeMs);
}
if (exponentialAvgBucketProcessingTimeMs != null) {
builder.field(EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName(), exponentialAvgBucketProcessingTimeMs);
}
builder.endObject();
return builder;
}
Expand All @@ -166,12 +200,19 @@ public boolean equals(Object o) {
&& this.bucketCount == that.bucketCount
&& Objects.equals(this.minBucketProcessingTimeMs, that.minBucketProcessingTimeMs)
&& Objects.equals(this.maxBucketProcessingTimeMs, that.maxBucketProcessingTimeMs)
&& Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs);
&& Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs)
&& Objects.equals(this.exponentialAvgBucketProcessingTimeMs, that.exponentialAvgBucketProcessingTimeMs);
}

@Override
public int hashCode() {
    // Must stay consistent with equals(): hash over exactly the fields compared there.
    return Objects.hash(
        jobId,
        bucketCount,
        minBucketProcessingTimeMs,
        maxBucketProcessingTimeMs,
        avgBucketProcessingTimeMs,
        exponentialAvgBucketProcessingTimeMs);
}

@Override
Expand All @@ -185,7 +226,8 @@ public String toString() {
public static boolean differSignificantly(TimingStats stats1, TimingStats stats2) {
return differSignificantly(stats1.minBucketProcessingTimeMs, stats2.minBucketProcessingTimeMs)
|| differSignificantly(stats1.maxBucketProcessingTimeMs, stats2.maxBucketProcessingTimeMs)
|| differSignificantly(stats1.avgBucketProcessingTimeMs, stats2.avgBucketProcessingTimeMs);
|| differSignificantly(stats1.avgBucketProcessingTimeMs, stats2.avgBucketProcessingTimeMs)
|| differSignificantly(stats1.exponentialAvgBucketProcessingTimeMs, stats2.exponentialAvgBucketProcessingTimeMs);
}

/**
Expand Down
Loading