diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc
index 7c14d41724acc..1fb8534a19cde 100644
--- a/docs/CHANGELOG.asciidoc
+++ b/docs/CHANGELOG.asciidoc
@@ -109,6 +109,10 @@ Do not ignore request analysis/similarity settings on index resize operations wh
 
 Fix NPE when CumulativeSum agg encounters null value/empty bucket ({pull}29641[#29641])
 
+Machine Learning::
+
+* Account for gaps in data counts after job is reopened ({pull}30294[#30294])
+
 //[float]
 //=== Regressions
 
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java
index 80223027e8ee0..d906ccf2f7a9b 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java
@@ -82,7 +82,7 @@ public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobData
         totalRecordStats = counts;
         incrementalRecordStats = new DataCounts(job.getId());
 
-        diagnostics = new DataStreamDiagnostics(job);
+        diagnostics = new DataStreamDiagnostics(job, counts);
 
         acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings);
         acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings);
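The essence of the fix is in this hunk and the BucketDiagnostics change that follows: the persisted DataCounts are now threaded into the diagnostics, and the bucket window is seeded with the latest record timestamp, so buckets skipped while the job was closed are counted as empty once new data arrives. A minimal, self-contained sketch of that seeding idea (hypothetical class and numbers, not the actual BucketDiagnostics implementation):

// Hypothetical illustration of the gap-accounting idea; not Elasticsearch code.
public class GapAwareBucketWindowDemo {

    static final long BUCKET_SPAN_MS = 3_600_000L; // 1h, matching ReopenJobWithGapIT below

    static long latestBucketStartMs = -1;
    static long emptyBucketCount = 0;

    static void addRecord(long recordTimeMs) {
        long bucketStart = (recordTimeMs / BUCKET_SPAN_MS) * BUCKET_SPAN_MS;
        if (latestBucketStartMs >= 0 && bucketStart > latestBucketStartMs) {
            // Buckets strictly between the previous and the current record saw no data.
            emptyBucketCount += (bucketStart - latestBucketStartMs) / BUCKET_SPAN_MS - 1;
        }
        latestBucketStartMs = Math.max(latestBucketStartMs, bucketStart);
    }

    public static void main(String[] args) {
        // On reopen, seed the window with the latest persisted record timestamp
        // (the fix); without the seed, the first post-reopen record would start
        // a fresh window and the gap would go uncounted.
        addRecord(9 * BUCKET_SPAN_MS);
        addRecord(20 * BUCKET_SPAN_MS); // first record after an 11-bucket jump
        System.out.println(emptyBucketCount); // prints 10: buckets 10..19 were empty
    }
}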
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java
index c61926dfb0426..a4497653497ea 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java
@@ -6,8 +6,11 @@
 package org.elasticsearch.xpack.ml.job.process.diagnostics;
 
 import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.core.ml.utils.Intervals;
 
+import java.util.Date;
+
 /**
  * A moving window of buckets that allows keeping
  * track of some statistics like the bucket count,
@@ -33,12 +36,17 @@ class BucketDiagnostics {
     private long latestFlushedBucketStartMs = -1;
     private final BucketFlushListener bucketFlushListener;
 
-    BucketDiagnostics(Job job, BucketFlushListener bucketFlushListener) {
+    BucketDiagnostics(Job job, DataCounts dataCounts, BucketFlushListener bucketFlushListener) {
         bucketSpanMs = job.getAnalysisConfig().getBucketSpan().millis();
         latencyMs = job.getAnalysisConfig().getLatency() == null ? 0 : job.getAnalysisConfig().getLatency().millis();
         maxSize = Math.max((int) (Intervals.alignToCeil(latencyMs, bucketSpanMs) / bucketSpanMs), MIN_BUCKETS);
         buckets = new long[maxSize];
         this.bucketFlushListener = bucketFlushListener;
+
+        Date latestRecordTimestamp = dataCounts.getLatestRecordTimeStamp();
+        if (latestRecordTimestamp != null) {
+            addRecord(latestRecordTimestamp.getTime());
+        }
     }
 
     void addRecord(long recordTimestampMs) {
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java
index a19f6eba02367..a225587d0bb75 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java
@@ -8,6 +8,7 @@
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
 
 import java.util.Date;
 
@@ -32,8 +33,8 @@ public class DataStreamDiagnostics {
     private long sparseBucketCount = 0;
     private long latestSparseBucketTime = -1;
 
-    public DataStreamDiagnostics(Job job) {
-        bucketDiagnostics = new BucketDiagnostics(job, createBucketFlushListener());
+    public DataStreamDiagnostics(Job job, DataCounts dataCounts) {
+        bucketDiagnostics = new BucketDiagnostics(job, dataCounts, createBucketFlushListener());
     }
 
     private BucketDiagnostics.BucketFlushListener createBucketFlushListener() {
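The `maxSize` line in the constructor above decides how many buckets the moving window retains: enough to cover the configured latency, with a floor of MIN_BUCKETS. Restated as standalone arithmetic (assuming MIN_BUCKETS is 10, which the latency test names below suggest; alignToCeil is re-implemented here rather than imported from Intervals):

// Standalone restatement of the window sizing in the constructor above.
public class WindowSizeDemo {

    static final int MIN_BUCKETS = 10; // assumed value of BucketDiagnostics.MIN_BUCKETS

    // Same contract as Intervals.alignToCeil: round up to a multiple of interval.
    static long alignToCeil(long value, long interval) {
        long aligned = (value / interval) * interval;
        return aligned == value ? aligned : aligned + interval;
    }

    static int maxSize(long latencyMs, long bucketSpanMs) {
        return Math.max((int) (alignToCeil(latencyMs, bucketSpanMs) / bucketSpanMs), MIN_BUCKETS);
    }

    public static void main(String[] args) {
        long span = 60_000L; // BUCKET_SPAN in DataStreamDiagnosticsTests
        System.out.println(maxSize(3 * span, span));            // 10: latency below ten buckets
        System.out.println(maxSize(13 * span + 10_000L, span)); // 14: latency above ten buckets
    }
}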
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java
index 19f7f88c38fef..0d9c52a28bd33 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java
@@ -11,6 +11,7 @@
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.config.Detector;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
 import org.junit.Before;
 
 import java.util.Arrays;
@@ -20,6 +21,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
 
     private static final long BUCKET_SPAN = 60000;
     private Job job;
+    private DataCounts dataCounts;
 
     @Before
     public void setUpMocks() {
@@ -32,10 +34,11 @@ public void setUpMocks() {
         builder.setAnalysisConfig(acBuilder);
         builder.setDataDescription(new DataDescription.Builder());
         job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), null);
+        dataCounts = new DataCounts(job.getId());
     }
 
     public void testIncompleteBuckets() {
-        DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+        DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
 
         d.checkRecord(1000);
         d.checkRecord(2000);
@@ -81,7 +84,7 @@
     }
 
     public void testSimple() {
-        DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+        DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
 
         d.checkRecord(70000);
         d.checkRecord(130000);
@@ -103,7 +106,7 @@
     }
 
     public void testSimpleReverse() {
-        DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+        DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
 
         d.checkRecord(610000);
         d.checkRecord(550000);
@@ -126,7 +129,7 @@
 
     public void testWithLatencyLessThanTenBuckets() {
         job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), TimeValue.timeValueMillis(3 * BUCKET_SPAN));
-        DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+        DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
 
         long timestamp = 70000;
         while (timestamp < 70000 + 20 * BUCKET_SPAN) {
@@ -141,7 +144,7 @@ public void testWithLatencyLessThanTenBuckets() {
 
     public void testWithLatencyGreaterThanTenBuckets() {
         job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), TimeValue.timeValueMillis(13 * BUCKET_SPAN + 10000));
-        DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+        DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
 
         long timestamp = 70000;
         while (timestamp < 70000 + 20 * BUCKET_SPAN) {
@@ -155,7 +158,7 @@
     }
 
     public void testEmptyBuckets() {
-        DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+        DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
 
         d.checkRecord(10000);
         d.checkRecord(70000);
@@ -177,7 +180,7 @@
     }
 
     public void testEmptyBucketsStartLater() {
-        DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+        DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
 
         d.checkRecord(1110000);
         d.checkRecord(1170000);
@@ -199,7 +202,7 @@
     }
 
     public void testSparseBuckets() {
-        DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+        DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
 
         sendManyDataPoints(d, 10000, 69000, 1000);
         sendManyDataPoints(d, 70000, 129000, 1200);
@@ -227,7 +230,7 @@
      * signal
      */
     public void testSparseBucketsLast() {
-        DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+        DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
 
         sendManyDataPoints(d, 10000, 69000, 1000);
         sendManyDataPoints(d, 70000, 129000, 1200);
@@ -255,7 +258,7 @@
      * signal on the 2nd to last
      */
     public void testSparseBucketsLastTwo() {
-        DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+        DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
 
         sendManyDataPoints(d, 10000, 69000, 1000);
         sendManyDataPoints(d, 70000, 129000, 1200);
@@ -280,7 +283,7 @@
     }
 
     public void testMixedEmptyAndSparseBuckets() {
-        DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+        DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
 
         sendManyDataPoints(d, 10000, 69000, 1000);
         sendManyDataPoints(d, 70000, 129000, 1200);
@@ -308,7 +311,7 @@
      * whether counts are right.
      */
     public void testEmptyBucketsLongerOutage() {
-        DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+        DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
 
         d.checkRecord(10000);
         d.checkRecord(70000);
@@ -336,7 +339,7 @@
      * The number of sparse buckets should not be too much, it could be normal.
      */
     public void testSparseBucketsLongerPeriod() {
-        DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+        DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
 
         sendManyDataPoints(d, 10000, 69000, 1000);
         sendManyDataPoints(d, 70000, 129000, 1200);
@@ -374,7 +377,7 @@ private static Job createJob(TimeValue bucketSpan, TimeValue latency) {
     }
 
     public void testFlushAfterZeroRecords() {
-        DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+        DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
         d.flush();
         assertEquals(0, d.getBucketCount());
    }
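The unit-test updates above are mechanical: every construction site now passes a DataCounts. Behavior is unchanged because a freshly created DataCounts has no latest record timestamp, so the new seeding branch in BucketDiagnostics is skipped. A tiny sketch of that guard (hypothetical mirror, not the real classes):

import java.util.Date;

// Hypothetical mirror of the seeding guard; not the real BucketDiagnostics.
public class FreshCountsDemo {

    private Date latestRecordTimeStamp; // stays null until data is seen, as in DataCounts

    public static void main(String[] args) {
        FreshCountsDemo counts = new FreshCountsDemo();
        // Mirrors the constructor guard: only seed when a timestamp was persisted.
        if (counts.latestRecordTimeStamp != null) {
            throw new AssertionError("a fresh DataCounts would never seed the window");
        }
        System.out.println("fresh DataCounts -> no seeding, existing tests unaffected");
    }
}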
diff --git a/x-pack/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java b/x-pack/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java
index 3b84994f5acca..e7381050260c4 100644
--- a/x-pack/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java
+++ b/x-pack/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java
@@ -241,7 +241,7 @@ public void testMiniFarequoteReopen() throws Exception {
         assertEquals(0, responseBody.get("invalid_date_count"));
         assertEquals(0, responseBody.get("missing_field_count"));
         assertEquals(0, responseBody.get("out_of_order_timestamp_count"));
-        assertEquals(0, responseBody.get("bucket_count"));
+        assertEquals(1000, responseBody.get("bucket_count"));
 
         // unintuitive: should return the earliest record timestamp of this feed???
         assertEquals(null, responseBody.get("earliest_record_timestamp"));
@@ -266,7 +266,7 @@ public void testMiniFarequoteReopen() throws Exception {
         assertEquals(0, dataCountsDoc.get("invalid_date_count"));
         assertEquals(0, dataCountsDoc.get("missing_field_count"));
         assertEquals(0, dataCountsDoc.get("out_of_order_timestamp_count"));
-        assertEquals(0, dataCountsDoc.get("bucket_count"));
+        assertEquals(1000, dataCountsDoc.get("bucket_count"));
 
         assertEquals(1403481600000L, dataCountsDoc.get("earliest_record_timestamp"));
         assertEquals(1407082000000L, dataCountsDoc.get("latest_record_timestamp"));
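The new `bucket_count` expectation of 1000 is consistent with the record timestamps asserted just above, under the assumption that the farequote job uses a one-hour bucket span:

// Back-of-the-envelope check of the updated expectation above.
public class BucketCountCheck {
    public static void main(String[] args) {
        long earliest = 1403481600000L; // earliest_record_timestamp asserted above
        long latest = 1407082000000L;   // latest_record_timestamp asserted above
        long bucketSpanMs = 3_600_000L; // assumption: the job uses a 1h bucket span
        System.out.println((latest - earliest) / bucketSpanMs); // 1000 finalized buckets
    }
}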
diff --git a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobWithGapIT.java b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobWithGapIT.java
new file mode 100644
index 0000000000000..993f370723774
--- /dev/null
+++ b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobWithGapIT.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.integration;
+
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
+import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
+import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
+import org.elasticsearch.xpack.core.ml.job.config.Detector;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
+import org.junit.After;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ * Tests that after reopening a job and sending more
+ * data after a gap, data counts are reported correctly.
+ */
+public class ReopenJobWithGapIT extends MlNativeAutodetectIntegTestCase {
+
+    private static final String JOB_ID = "reopen-job-with-gap-test";
+    private static final long BUCKET_SPAN_SECONDS = 3600;
+
+    @After
+    public void cleanUpTest() {
+        cleanUp();
+    }
+
+    public void test() throws Exception {
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(
+                Collections.singletonList(new Detector.Builder("count", null).build()));
+        analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(BUCKET_SPAN_SECONDS));
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+        dataDescription.setTimeFormat("epoch");
+        Job.Builder job = new Job.Builder(JOB_ID);
+        job.setAnalysisConfig(analysisConfig);
+        job.setDataDescription(dataDescription);
+
+        registerJob(job);
+        putJob(job);
+        openJob(job.getId());
+
+        long timestamp = 1483228800L; // 2017-01-01T00:00:00Z
+        List<String> data = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            data.add(createJsonRecord(createRecord(timestamp)));
+            timestamp += BUCKET_SPAN_SECONDS;
+        }
+
+        postData(job.getId(), data.stream().collect(Collectors.joining()));
+        flushJob(job.getId(), true);
+        closeJob(job.getId());
+
+        GetBucketsAction.Request request = new GetBucketsAction.Request(job.getId());
+        request.setExcludeInterim(true);
+        assertThat(client().execute(GetBucketsAction.INSTANCE, request).actionGet().getBuckets().count(), equalTo(9L));
+        assertThat(getJobStats(job.getId()).get(0).getDataCounts().getBucketCount(), equalTo(9L));
+
+        timestamp += 10 * BUCKET_SPAN_SECONDS;
+        data = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            data.add(createJsonRecord(createRecord(timestamp)));
+            timestamp += BUCKET_SPAN_SECONDS;
+        }
+
+        openJob(job.getId());
+        postData(job.getId(), data.stream().collect(Collectors.joining()));
+        flushJob(job.getId(), true);
+        closeJob(job.getId());
+
+        assertThat(client().execute(GetBucketsAction.INSTANCE, request).actionGet().getBuckets().count(), equalTo(29L));
+        DataCounts dataCounts = getJobStats(job.getId()).get(0).getDataCounts();
+        assertThat(dataCounts.getBucketCount(), equalTo(29L));
+        assertThat(dataCounts.getEmptyBucketCount(), equalTo(10L));
+    }
+
+    private static Map<String, Object> createRecord(long timestamp) {
+        Map<String, Object> record = new HashMap<>();
+        record.put("time", timestamp);
+        return record;
+    }
+}
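For review, the arithmetic behind the new integration test's assertions, restated in plain Java mirroring the timestamps in the test: the first batch fills buckets 0..9, the gap spans buckets 10..19, and the second batch fills buckets 20..29; only buckets before the one holding the latest record are finalized, hence 9, then 29, with 10 empty.

// Arithmetic behind the assertions in ReopenJobWithGapIT above.
public class ReopenGapArithmetic {
    public static void main(String[] args) {
        long span = 3600L;                        // BUCKET_SPAN_SECONDS
        long start = 1483228800L;                 // first record, 2017-01-01T00:00:00Z
        long lastOfFirstBatch = start + 9 * span; // 10 records, one per bucket
        // After the first loop the cursor sits one span past the last record,
        // and the test then skips another 10 bucket spans before the second batch.
        long firstOfSecondBatch = lastOfFirstBatch + 11 * span;
        long lastOfSecondBatch = firstOfSecondBatch + 9 * span;

        System.out.println((lastOfFirstBatch - start) / span);  // 9 non-interim buckets at first close
        System.out.println((lastOfSecondBatch - start) / span); // 29 at second close
        System.out.println((firstOfSecondBatch - lastOfFirstBatch) / span - 1); // 10 empty buckets in the gap
    }
}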