From e731d066751218acac0d149b0e421ce8cf7be566 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 9 Apr 2019 18:41:49 +0300 Subject: [PATCH] Ensure analytics can be resumed --- .../dataframe/DataFrameAnalyticsManager.java | 8 +- .../DataFrameDataExtractorFactory.java | 127 ++-------------- .../extractor/ExtractedFieldsDetector.java | 143 ++++++++++++++++++ ...java => ExtractedFieldsDetectorTests.java} | 118 +++++++++++---- 4 files changed, 243 insertions(+), 153 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java rename x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/{DataFrameDataExtractorFactoryTests.java => ExtractedFieldsDetectorTests.java} (61%) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index 4319bcada0bbd..02369adfd785d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -101,7 +101,7 @@ public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState current // The task has fully reindexed the documents and we should continue on with our analyses case ANALYZING: // TODO apply previously stored model state if applicable - startAnalytics(task, config); + startAnalytics(task, config, true); break; // If we are already at REINDEXING, we are not 100% sure if we reindexed ALL the docs. // We will delete the destination index, recreate, reindex @@ -139,7 +139,7 @@ public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState current private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) { // Reindexing is complete; start analytics ActionListener refreshListener = ActionListener.wrap( - refreshResponse -> startAnalytics(task, config), + refreshResponse -> startAnalytics(task, config, false), task::markAsFailed ); @@ -177,7 +177,7 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF createDestinationIndex(config.getSource().getIndex(), config.getDest().getIndex(), config.getHeaders(), copyIndexCreatedListener); } - private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) { + private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, boolean isTaskRestarting) { // Update state to ANALYZING and start process ActionListener dataExtractorFactoryListener = ActionListener.wrap( dataExtractorFactory -> { @@ -201,7 +201,7 @@ private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfi // TODO This could fail with errors. In that case we get stuck with the copied index. // We could delete the index in case of failure or we could try building the factory before reindexing // to catch the error early on. - DataFrameDataExtractorFactory.create(client, config, dataExtractorFactoryListener); + DataFrameDataExtractorFactory.create(client, config, isTaskRestarting, dataExtractorFactoryListener); } private void createDestinationIndex(String sourceIndex, String destinationIndex, Map headers, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java index 1d45c4055510e..f7fc0faf0b011 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java @@ -7,59 +7,22 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.fieldcaps.FieldCapabilities; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.regex.Regex; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; -import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest; -import org.elasticsearch.xpack.core.ml.job.messages.Messages; -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.core.ml.utils.NameResolver; -import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField; import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; public class DataFrameDataExtractorFactory { - /** - * Fields to ignore. These are mostly internal meta fields. - */ - private static final List IGNORE_FIELDS = Arrays.asList("_id", "_field_names", "_index", "_parent", "_routing", "_seq_no", - "_source", "_type", "_uid", "_version", "_feature", "_ignored"); - - /** - * The types supported by data frames - */ - private static final Set COMPATIBLE_FIELD_TYPES; - - static { - Set compatibleTypes = Stream.of(NumberFieldMapper.NumberType.values()) - .map(NumberFieldMapper.NumberType::typeName) - .collect(Collectors.toSet()); - compatibleTypes.add("scaled_float"); // have to add manually since scaled_float is in a module - - COMPATIBLE_FIELD_TYPES = Collections.unmodifiableSet(compatibleTypes); - } - private final Client client; private final String analyticsId; private final String index; @@ -95,14 +58,15 @@ public DataFrameDataExtractor newExtractor(boolean includeSource) { * * @param client ES Client used to make calls against the cluster * @param config The config from which to create the extractor factory + * @param isTaskRestarting Whether the task is restarting * @param listener The listener to notify on creation or failure */ public static void create(Client client, DataFrameAnalyticsConfig config, + boolean isTaskRestarting, ActionListener listener) { - validateIndexAndExtractFields(client, config.getHeaders(), config.getDest().getIndex(), config.getDest().getResultsField(), - config.getAnalysesFields(), ActionListener.wrap( - extractedFields -> listener.onResponse(new DataFrameDataExtractorFactory( + validateIndexAndExtractFields(client, config.getDest().getIndex(), config, isTaskRestarting, + ActionListener.wrap(extractedFields -> listener.onResponse(new DataFrameDataExtractorFactory( client, config.getId(), config.getDest().getIndex(), extractedFields, config.getHeaders())), listener::onFailure )); @@ -118,8 +82,7 @@ public static void create(Client client, public static void validateConfigAndSourceIndex(Client client, DataFrameAnalyticsConfig config, ActionListener listener) { - validateIndexAndExtractFields(client, config.getHeaders(), config.getSource().getIndex(), config.getDest().getResultsField(), - config.getAnalysesFields(), ActionListener.wrap( + validateIndexAndExtractFields(client, config.getSource().getIndex(), config, false, ActionListener.wrap( fields -> { config.getSource().getParsedQuery(); // validate query is acceptable listener.onResponse(config); @@ -128,86 +91,15 @@ public static void validateConfigAndSourceIndex(Client client, )); } - // Visible for testing - static ExtractedFields detectExtractedFields(String index, - String resultsField, - FetchSourceContext desiredFields, - FieldCapabilitiesResponse fieldCapabilitiesResponse) { - Set fields = fieldCapabilitiesResponse.get().keySet(); - fields.removeAll(IGNORE_FIELDS); - - if (fields.contains(resultsField)) { - throw ExceptionsHelper.badRequestException("Index [{}] already has a field that matches the {}.{} [{}];" + - " please set a different {}", index, DataFrameAnalyticsConfig.DEST.getPreferredName(), - DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName(), resultsField, - DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName()); - } - - removeFieldsWithIncompatibleTypes(fields, fieldCapabilitiesResponse); - includeAndExcludeFields(fields, desiredFields, index); - List sortedFields = new ArrayList<>(fields); - // We sort the fields to ensure the checksum for each document is deterministic - Collections.sort(sortedFields); - ExtractedFields extractedFields = ExtractedFields.build(sortedFields, Collections.emptySet(), fieldCapabilitiesResponse) - .filterFields(ExtractedField.ExtractionMethod.DOC_VALUE); - if (extractedFields.getAllFields().isEmpty()) { - throw ExceptionsHelper.badRequestException("No compatible fields could be detected in index [{}]", index); - } - return extractedFields; - } - - private static void removeFieldsWithIncompatibleTypes(Set fields, FieldCapabilitiesResponse fieldCapabilitiesResponse) { - Iterator fieldsIterator = fields.iterator(); - while (fieldsIterator.hasNext()) { - String field = fieldsIterator.next(); - Map fieldCaps = fieldCapabilitiesResponse.getField(field); - if (fieldCaps == null || COMPATIBLE_FIELD_TYPES.containsAll(fieldCaps.keySet()) == false) { - fieldsIterator.remove(); - } - } - } - - private static void includeAndExcludeFields(Set fields, FetchSourceContext desiredFields, String index) { - if (desiredFields == null) { - return; - } - String includes = desiredFields.includes().length == 0 ? "*" : Strings.arrayToCommaDelimitedString(desiredFields.includes()); - String excludes = Strings.arrayToCommaDelimitedString(desiredFields.excludes()); - - if (Regex.isMatchAllPattern(includes) && excludes.isEmpty()) { - return; - } - try { - // If the inclusion set does not match anything, that means the user's desired fields cannot be found in - // the collection of supported field types. We should let the user know. - Set includedSet = NameResolver.newUnaliased(fields, - (ex) -> new ResourceNotFoundException(Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER, index, ex))) - .expand(includes, false); - // If the exclusion set does not match anything, that means the fields are already not present - // no need to raise if nothing matched - Set excludedSet = NameResolver.newUnaliased(fields, - (ex) -> new ResourceNotFoundException(Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER, index, ex))) - .expand(excludes, true); - - fields.retainAll(includedSet); - fields.removeAll(excludedSet); - } catch (ResourceNotFoundException ex) { - // Re-wrap our exception so that we throw the same exception type when there are no fields. - throw ExceptionsHelper.badRequestException(ex.getMessage()); - } - - } - private static void validateIndexAndExtractFields(Client client, - Map headers, String index, - String resultsField, - FetchSourceContext desiredFields, + DataFrameAnalyticsConfig config, + boolean isTaskRestarting, ActionListener listener) { // Step 2. Extract fields (if possible) and notify listener ActionListener fieldCapabilitiesHandler = ActionListener.wrap( fieldCapabilitiesResponse -> listener.onResponse( - detectExtractedFields(index, resultsField, desiredFields, fieldCapabilitiesResponse)), + new ExtractedFieldsDetector(index, config, isTaskRestarting, fieldCapabilitiesResponse).detect()), e -> { if (e instanceof IndexNotFoundException) { listener.onFailure(new ResourceNotFoundException("cannot retrieve data because index " @@ -222,10 +114,11 @@ private static void validateIndexAndExtractFields(Client client, FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest(); fieldCapabilitiesRequest.indices(index); fieldCapabilitiesRequest.fields("*"); - ClientHelper.executeWithHeaders(headers, ClientHelper.ML_ORIGIN, client, () -> { + ClientHelper.executeWithHeaders(config.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> { client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler); // This response gets discarded - the listener handles the real response return null; }); } + } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java new file mode 100644 index 0000000000000..1c363b62bfaef --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java @@ -0,0 +1,143 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.dataframe.extractor; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.fieldcaps.FieldCapabilities; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.NameResolver; +import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField; +import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class ExtractedFieldsDetector { + + /** + * Fields to ignore. These are mostly internal meta fields. + */ + private static final List IGNORE_FIELDS = Arrays.asList("_id", "_field_names", "_index", "_parent", "_routing", "_seq_no", + "_source", "_type", "_uid", "_version", "_feature", "_ignored"); + + /** + * The types supported by data frames + */ + private static final Set COMPATIBLE_FIELD_TYPES; + + static { + Set compatibleTypes = Stream.of(NumberFieldMapper.NumberType.values()) + .map(NumberFieldMapper.NumberType::typeName) + .collect(Collectors.toSet()); + compatibleTypes.add("scaled_float"); // have to add manually since scaled_float is in a module + + COMPATIBLE_FIELD_TYPES = Collections.unmodifiableSet(compatibleTypes); + } + + private final String index; + private final DataFrameAnalyticsConfig config; + private final boolean isTaskRestarting; + private final FieldCapabilitiesResponse fieldCapabilitiesResponse; + + ExtractedFieldsDetector(String index, DataFrameAnalyticsConfig config, boolean isTaskRestarting, + FieldCapabilitiesResponse fieldCapabilitiesResponse) { + this.index = Objects.requireNonNull(index); + this.config = Objects.requireNonNull(config); + this.isTaskRestarting = isTaskRestarting; + this.fieldCapabilitiesResponse = Objects.requireNonNull(fieldCapabilitiesResponse); + } + + public ExtractedFields detect() { + Set fields = fieldCapabilitiesResponse.get().keySet(); + fields.removeAll(IGNORE_FIELDS); + + checkResultsFieldIsNotPresent(fields, index); + + // Ignore fields under the results object + fields.removeIf(field -> field.startsWith(config.getDest().getResultsField() + ".")); + + removeFieldsWithIncompatibleTypes(fields); + includeAndExcludeFields(fields, index); + List sortedFields = new ArrayList<>(fields); + // We sort the fields to ensure the checksum for each document is deterministic + Collections.sort(sortedFields); + ExtractedFields extractedFields = ExtractedFields.build(sortedFields, Collections.emptySet(), fieldCapabilitiesResponse) + .filterFields(ExtractedField.ExtractionMethod.DOC_VALUE); + if (extractedFields.getAllFields().isEmpty()) { + throw ExceptionsHelper.badRequestException("No compatible fields could be detected in index [{}]", index); + } + return extractedFields; + } + + private void checkResultsFieldIsNotPresent(Set fields, String index) { + // If the task is restarting we do not mind the index containing the results field, we will overwrite all docs + if (isTaskRestarting == false && fields.contains(config.getDest().getResultsField())) { + throw ExceptionsHelper.badRequestException("Index [{}] already has a field that matches the {}.{} [{}];" + + " please set a different {}", index, DataFrameAnalyticsConfig.DEST.getPreferredName(), + DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName(), config.getDest().getResultsField(), + DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName()); + } + } + + private void removeFieldsWithIncompatibleTypes(Set fields) { + Iterator fieldsIterator = fields.iterator(); + while (fieldsIterator.hasNext()) { + String field = fieldsIterator.next(); + Map fieldCaps = fieldCapabilitiesResponse.getField(field); + if (fieldCaps == null || COMPATIBLE_FIELD_TYPES.containsAll(fieldCaps.keySet()) == false) { + fieldsIterator.remove(); + } + } + } + + private void includeAndExcludeFields(Set fields, String index) { + FetchSourceContext analysesFields = config.getAnalysesFields(); + if (analysesFields == null) { + return; + } + String includes = analysesFields.includes().length == 0 ? "*" : Strings.arrayToCommaDelimitedString(analysesFields.includes()); + String excludes = Strings.arrayToCommaDelimitedString(analysesFields.excludes()); + + if (Regex.isMatchAllPattern(includes) && excludes.isEmpty()) { + return; + } + try { + // If the inclusion set does not match anything, that means the user's desired fields cannot be found in + // the collection of supported field types. We should let the user know. + Set includedSet = NameResolver.newUnaliased(fields, + (ex) -> new ResourceNotFoundException(Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER, index, ex))) + .expand(includes, false); + // If the exclusion set does not match anything, that means the fields are already not present + // no need to raise if nothing matched + Set excludedSet = NameResolver.newUnaliased(fields, + (ex) -> new ResourceNotFoundException(Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER, index, ex))) + .expand(excludes, true); + + fields.retainAll(includedSet); + fields.removeAll(excludedSet); + } catch (ResourceNotFoundException ex) { + // Re-wrap our exception so that we throw the same exception type when there are no fields. + throw ExceptionsHelper.badRequestException(ex.getMessage()); + } + } + +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactoryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java similarity index 61% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactoryTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java index fa6e3e44de24a..905349aa72840 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactoryTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java @@ -10,6 +10,10 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalysisConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField; import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields; @@ -26,64 +30,71 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class DataFrameDataExtractorFactoryTests extends ESTestCase { +public class ExtractedFieldsDetectorTests extends ESTestCase { - private static final String INDEX = "source_index"; + private static final String SOURCE_INDEX = "source_index"; + private static final String DEST_INDEX = "dest_index"; private static final String RESULTS_FIELD = "ml"; - private static final FetchSourceContext EMPTY_CONTEXT = new FetchSourceContext(true, new String[0], new String[0]); - public void testDetectExtractedFields_GivenFloatField() { + public void testDetect_GivenFloatField() { FieldCapabilitiesResponse fieldCapabilities= new MockFieldCapsResponseBuilder() .addAggregatableField("some_float", "float").build(); - ExtractedFields extractedFields = - DataFrameDataExtractorFactory.detectExtractedFields(INDEX, RESULTS_FIELD, EMPTY_CONTEXT, fieldCapabilities); + ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( + SOURCE_INDEX, buildAnalyticsConfig(), false, fieldCapabilities); + ExtractedFields extractedFields = extractedFieldsDetector.detect(); List allFields = extractedFields.getAllFields(); assertThat(allFields.size(), equalTo(1)); assertThat(allFields.get(0).getName(), equalTo("some_float")); } - public void testDetectExtractedFields_GivenNumericFieldWithMultipleTypes() { + public void testDetect_GivenNumericFieldWithMultipleTypes() { FieldCapabilitiesResponse fieldCapabilities= new MockFieldCapsResponseBuilder() .addAggregatableField("some_number", "long", "integer", "short", "byte", "double", "float", "half_float", "scaled_float") .build(); - ExtractedFields extractedFields = - DataFrameDataExtractorFactory.detectExtractedFields(INDEX, RESULTS_FIELD, EMPTY_CONTEXT, fieldCapabilities); + ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( + SOURCE_INDEX, buildAnalyticsConfig(), false, fieldCapabilities); + ExtractedFields extractedFields = extractedFieldsDetector.detect(); List allFields = extractedFields.getAllFields(); assertThat(allFields.size(), equalTo(1)); assertThat(allFields.get(0).getName(), equalTo("some_number")); } - public void testDetectExtractedFields_GivenNonNumericField() { + public void testDetect_GivenNonNumericField() { FieldCapabilitiesResponse fieldCapabilities= new MockFieldCapsResponseBuilder() .addAggregatableField("some_keyword", "keyword").build(); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> DataFrameDataExtractorFactory.detectExtractedFields(INDEX, RESULTS_FIELD, EMPTY_CONTEXT, fieldCapabilities)); + ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( + SOURCE_INDEX, buildAnalyticsConfig(), false, fieldCapabilities); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]")); } - public void testDetectExtractedFields_GivenFieldWithNumericAndNonNumericTypes() { + public void testDetect_GivenFieldWithNumericAndNonNumericTypes() { FieldCapabilitiesResponse fieldCapabilities= new MockFieldCapsResponseBuilder() .addAggregatableField("indecisive_field", "float", "keyword").build(); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> DataFrameDataExtractorFactory.detectExtractedFields(INDEX, RESULTS_FIELD, EMPTY_CONTEXT, fieldCapabilities)); + ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( + SOURCE_INDEX, buildAnalyticsConfig(), false, fieldCapabilities); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]")); } - public void testDetectExtractedFields_GivenMultipleFields() { + public void testDetect_GivenMultipleFields() { FieldCapabilitiesResponse fieldCapabilities= new MockFieldCapsResponseBuilder() .addAggregatableField("some_float", "float") .addAggregatableField("some_long", "long") .addAggregatableField("some_keyword", "keyword") .build(); - ExtractedFields extractedFields = - DataFrameDataExtractorFactory.detectExtractedFields(INDEX, RESULTS_FIELD, EMPTY_CONTEXT, fieldCapabilities); + ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( + SOURCE_INDEX, buildAnalyticsConfig(), false, fieldCapabilities); + ExtractedFields extractedFields = extractedFieldsDetector.detect(); List allFields = extractedFields.getAllFields(); assertThat(allFields.size(), equalTo(2)); @@ -91,16 +102,18 @@ public void testDetectExtractedFields_GivenMultipleFields() { containsInAnyOrder("some_float", "some_long")); } - public void testDetectExtractedFields_GivenIgnoredField() { + public void testDetect_GivenIgnoredField() { FieldCapabilitiesResponse fieldCapabilities= new MockFieldCapsResponseBuilder() .addAggregatableField("_id", "float").build(); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> DataFrameDataExtractorFactory.detectExtractedFields(INDEX, RESULTS_FIELD, EMPTY_CONTEXT, fieldCapabilities)); + ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( + SOURCE_INDEX, buildAnalyticsConfig(), false, fieldCapabilities); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]")); } - public void testDetectExtractedFields_ShouldSortFieldsAlphabetically() { + public void testDetect_ShouldSortFieldsAlphabetically() { int fieldCount = randomIntBetween(10, 20); List fields = new ArrayList<>(); for (int i = 0; i < fieldCount; i++) { @@ -115,8 +128,9 @@ public void testDetectExtractedFields_ShouldSortFieldsAlphabetically() { } FieldCapabilitiesResponse fieldCapabilities = mockFieldCapsResponseBuilder.build(); - ExtractedFields extractedFields = - DataFrameDataExtractorFactory.detectExtractedFields(INDEX, RESULTS_FIELD, EMPTY_CONTEXT, fieldCapabilities); + ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( + SOURCE_INDEX, buildAnalyticsConfig(), false, fieldCapabilities); + ExtractedFields extractedFields = extractedFieldsDetector.detect(); List extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName) .collect(Collectors.toList()); @@ -130,8 +144,11 @@ public void testDetectedExtractedFields_GivenIncludeWithMissingField() { .build(); FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your_field1", "my*"}, new String[0]); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> DataFrameDataExtractorFactory.detectExtractedFields(INDEX, RESULTS_FIELD, desiredFields, fieldCapabilities)); + + ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( + SOURCE_INDEX, buildAnalyticsConfig(desiredFields), false, fieldCapabilities); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index] with name [your_field1]")); } @@ -142,8 +159,10 @@ public void testDetectedExtractedFields_GivenExcludeAllValidFields() { .build(); FetchSourceContext desiredFields = new FetchSourceContext(true, new String[0], new String[]{"my_*"}); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> DataFrameDataExtractorFactory.detectExtractedFields(INDEX, RESULTS_FIELD, desiredFields, fieldCapabilities)); + + ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( + SOURCE_INDEX, buildAnalyticsConfig(desiredFields), false, fieldCapabilities); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]")); } @@ -156,8 +175,11 @@ public void testDetectedExtractedFields_GivenInclusionsAndExclusions() { .build(); FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"}); - ExtractedFields extractedFields = - DataFrameDataExtractorFactory.detectExtractedFields(INDEX, RESULTS_FIELD, desiredFields, fieldCapabilities); + + ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( + SOURCE_INDEX, buildAnalyticsConfig(desiredFields), false, fieldCapabilities); + ExtractedFields extractedFields = extractedFieldsDetector.detect(); + List extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName) .collect(Collectors.toList()); assertThat(extractedFieldNames, equalTo(Arrays.asList("my_field1", "your_field2"))); @@ -171,13 +193,45 @@ public void testDetectedExtractedFields_GivenIndexContainsResultsField() { .addAggregatableField("your_keyword", "keyword") .build(); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> DataFrameDataExtractorFactory.detectExtractedFields(INDEX, RESULTS_FIELD, EMPTY_CONTEXT, fieldCapabilities)); + ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( + SOURCE_INDEX, buildAnalyticsConfig(), false, fieldCapabilities); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("Index [source_index] already has a field that matches the dest.results_field [ml]; " + "please set a different results_field")); } + public void testDetectedExtractedFields_GivenIndexContainsResultsFieldAndTaskIsRestarting() { + FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder() + .addAggregatableField(RESULTS_FIELD + ".outlier_score", "float") + .addAggregatableField("my_field1", "float") + .addAggregatableField("your_field2", "float") + .addAggregatableField("your_keyword", "keyword") + .build(); + + ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( + SOURCE_INDEX, buildAnalyticsConfig(), true, fieldCapabilities); + ExtractedFields extractedFields = extractedFieldsDetector.detect(); + + List extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName) + .collect(Collectors.toList()); + assertThat(extractedFieldNames, equalTo(Arrays.asList("my_field1", "your_field2"))); + } + + private static DataFrameAnalyticsConfig buildAnalyticsConfig() { + return buildAnalyticsConfig(null); + } + + private static DataFrameAnalyticsConfig buildAnalyticsConfig(FetchSourceContext analysesFields) { + return new DataFrameAnalyticsConfig.Builder("foo") + .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null)) + .setDest(new DataFrameAnalyticsDest(DEST_INDEX, null)) + .setAnalysesFields(analysesFields) + .setAnalyses(Collections.singletonList(new DataFrameAnalysisConfig( + Collections.singletonMap("outlier_detection", Collections.emptyMap())))) + .build(); + } + private static class MockFieldCapsResponseBuilder { private final Map> fieldCaps = new HashMap<>();