Ensure analytics can be resumed
dimitris-athanasiou committed Apr 11, 2019
1 parent 70e593f commit e731d06
Showing 4 changed files with 243 additions and 153 deletions.
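
In short (a summary of the hunks below, not text from the commit itself): a new `isTaskRestarting` flag is threaded from `DataFrameAnalyticsManager` through `DataFrameDataExtractorFactory.create(...)` into a new `ExtractedFieldsDetector` class, so that field validation tolerates a destination index that already contains the results field when a stopped task resumes. A condensed, illustrative model of the resume decision follows; the case labels and method names mirror the diff, while the enum and stub bodies are scaffolding added here:

```java
// Illustrative sketch only: names follow the hunks below, the enum and
// stubs are scaffolding and not part of the commit.
public class ResumeDecisionSketch {

    enum DataFrameAnalyticsState { REINDEXING, ANALYZING }

    void execute(DataFrameAnalyticsState currentState) {
        switch (currentState) {
            case ANALYZING:
                // Reindexing completed before the task stopped: resume the
                // analysis and tolerate an existing results field.
                startAnalytics(/* isTaskRestarting */ true);
                break;
            case REINDEXING:
                // Not 100% sure ALL docs were reindexed: delete the
                // destination index, recreate it, reindex, and eventually
                // start analytics with isTaskRestarting == false.
                reindexDataframeAndStartAnalysis();
                break;
        }
    }

    void startAnalytics(boolean isTaskRestarting) { /* see the first file below */ }

    void reindexDataframeAndStartAnalysis() { /* see the first file below */ }
}
```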
DataFrameAnalyticsManager.java
@@ -101,7 +101,7 @@ public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState current
// The task has fully reindexed the documents and we should continue on with our analyses
case ANALYZING:
// TODO apply previously stored model state if applicable
startAnalytics(task, config);
startAnalytics(task, config, true);
break;
// If we are already at REINDEXING, we are not 100% sure if we reindexed ALL the docs.
// We will delete the destination index, recreate, reindex
@@ -139,7 +139,7 @@ public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState current
private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) {
// Reindexing is complete; start analytics
ActionListener<RefreshResponse> refreshListener = ActionListener.wrap(
refreshResponse -> startAnalytics(task, config),
refreshResponse -> startAnalytics(task, config, false),
task::markAsFailed
);

@@ -177,7 +177,7 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
createDestinationIndex(config.getSource().getIndex(), config.getDest().getIndex(), config.getHeaders(), copyIndexCreatedListener);
}

private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) {
private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, boolean isTaskRestarting) {
// Update state to ANALYZING and start process
ActionListener<DataFrameDataExtractorFactory> dataExtractorFactoryListener = ActionListener.wrap(
dataExtractorFactory -> {
@@ -201,7 +201,7 @@ private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfi
// TODO This could fail with errors. In that case we get stuck with the copied index.
// We could delete the index in case of failure or we could try building the factory before reindexing
// to catch the error early on.
DataFrameDataExtractorFactory.create(client, config, dataExtractorFactoryListener);
DataFrameDataExtractorFactory.create(client, config, isTaskRestarting, dataExtractorFactoryListener);
}

private void createDestinationIndex(String sourceIndex, String destinationIndex, Map<String, String> headers,
DataFrameDataExtractorFactory.java
@@ -7,59 +7,22 @@

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.NameResolver;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DataFrameDataExtractorFactory {

/**
* Fields to ignore. These are mostly internal meta fields.
*/
private static final List<String> IGNORE_FIELDS = Arrays.asList("_id", "_field_names", "_index", "_parent", "_routing", "_seq_no",
"_source", "_type", "_uid", "_version", "_feature", "_ignored");

/**
* The types supported by data frames
*/
private static final Set<String> COMPATIBLE_FIELD_TYPES;

static {
Set<String> compatibleTypes = Stream.of(NumberFieldMapper.NumberType.values())
.map(NumberFieldMapper.NumberType::typeName)
.collect(Collectors.toSet());
compatibleTypes.add("scaled_float"); // have to add manually since scaled_float is in a module

COMPATIBLE_FIELD_TYPES = Collections.unmodifiableSet(compatibleTypes);
}

private final Client client;
private final String analyticsId;
private final String index;
@@ -95,14 +58,15 @@ public DataFrameDataExtractor newExtractor(boolean includeSource) {
*
* @param client ES Client used to make calls against the cluster
* @param config The config from which to create the extractor factory
* @param isTaskRestarting Whether the task is restarting
* @param listener The listener to notify on creation or failure
*/
public static void create(Client client,
DataFrameAnalyticsConfig config,
boolean isTaskRestarting,
ActionListener<DataFrameDataExtractorFactory> listener) {
validateIndexAndExtractFields(client, config.getHeaders(), config.getDest().getIndex(), config.getDest().getResultsField(),
config.getAnalysesFields(), ActionListener.wrap(
extractedFields -> listener.onResponse(new DataFrameDataExtractorFactory(
validateIndexAndExtractFields(client, config.getDest().getIndex(), config, isTaskRestarting,
ActionListener.wrap(extractedFields -> listener.onResponse(new DataFrameDataExtractorFactory(
client, config.getId(), config.getDest().getIndex(), extractedFields, config.getHeaders())),
listener::onFailure
));
@@ -118,8 +82,7 @@ public static void create(Client client,
public static void validateConfigAndSourceIndex(Client client,
DataFrameAnalyticsConfig config,
ActionListener<DataFrameAnalyticsConfig> listener) {
validateIndexAndExtractFields(client, config.getHeaders(), config.getSource().getIndex(), config.getDest().getResultsField(),
config.getAnalysesFields(), ActionListener.wrap(
validateIndexAndExtractFields(client, config.getSource().getIndex(), config, false, ActionListener.wrap(
fields -> {
config.getSource().getParsedQuery(); // validate query is acceptable
listener.onResponse(config);
@@ -128,86 +91,15 @@ public static void validateConfigAndSourceIndex(Client client,
));
}

// Visible for testing
static ExtractedFields detectExtractedFields(String index,
String resultsField,
FetchSourceContext desiredFields,
FieldCapabilitiesResponse fieldCapabilitiesResponse) {
Set<String> fields = fieldCapabilitiesResponse.get().keySet();
fields.removeAll(IGNORE_FIELDS);

if (fields.contains(resultsField)) {
throw ExceptionsHelper.badRequestException("Index [{}] already has a field that matches the {}.{} [{}];" +
" please set a different {}", index, DataFrameAnalyticsConfig.DEST.getPreferredName(),
DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName(), resultsField,
DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName());
}

removeFieldsWithIncompatibleTypes(fields, fieldCapabilitiesResponse);
includeAndExcludeFields(fields, desiredFields, index);
List<String> sortedFields = new ArrayList<>(fields);
// We sort the fields to ensure the checksum for each document is deterministic
Collections.sort(sortedFields);
ExtractedFields extractedFields = ExtractedFields.build(sortedFields, Collections.emptySet(), fieldCapabilitiesResponse)
.filterFields(ExtractedField.ExtractionMethod.DOC_VALUE);
if (extractedFields.getAllFields().isEmpty()) {
throw ExceptionsHelper.badRequestException("No compatible fields could be detected in index [{}]", index);
}
return extractedFields;
}

private static void removeFieldsWithIncompatibleTypes(Set<String> fields, FieldCapabilitiesResponse fieldCapabilitiesResponse) {
Iterator<String> fieldsIterator = fields.iterator();
while (fieldsIterator.hasNext()) {
String field = fieldsIterator.next();
Map<String, FieldCapabilities> fieldCaps = fieldCapabilitiesResponse.getField(field);
if (fieldCaps == null || COMPATIBLE_FIELD_TYPES.containsAll(fieldCaps.keySet()) == false) {
fieldsIterator.remove();
}
}
}

private static void includeAndExcludeFields(Set<String> fields, FetchSourceContext desiredFields, String index) {
if (desiredFields == null) {
return;
}
String includes = desiredFields.includes().length == 0 ? "*" : Strings.arrayToCommaDelimitedString(desiredFields.includes());
String excludes = Strings.arrayToCommaDelimitedString(desiredFields.excludes());

if (Regex.isMatchAllPattern(includes) && excludes.isEmpty()) {
return;
}
try {
// If the inclusion set does not match anything, that means the user's desired fields cannot be found in
// the collection of supported field types. We should let the user know.
Set<String> includedSet = NameResolver.newUnaliased(fields,
(ex) -> new ResourceNotFoundException(Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER, index, ex)))
.expand(includes, false);
// If the exclusion set does not match anything, that means the fields are already not present
// no need to raise if nothing matched
Set<String> excludedSet = NameResolver.newUnaliased(fields,
(ex) -> new ResourceNotFoundException(Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER, index, ex)))
.expand(excludes, true);

fields.retainAll(includedSet);
fields.removeAll(excludedSet);
} catch (ResourceNotFoundException ex) {
// Re-wrap our exception so that we throw the same exception type when there are no fields.
throw ExceptionsHelper.badRequestException(ex.getMessage());
}

}

private static void validateIndexAndExtractFields(Client client,
Map<String, String> headers,
String index,
String resultsField,
FetchSourceContext desiredFields,
DataFrameAnalyticsConfig config,
boolean isTaskRestarting,
ActionListener<ExtractedFields> listener) {
// Step 2. Extract fields (if possible) and notify listener
ActionListener<FieldCapabilitiesResponse> fieldCapabilitiesHandler = ActionListener.wrap(
fieldCapabilitiesResponse -> listener.onResponse(
detectExtractedFields(index, resultsField, desiredFields, fieldCapabilitiesResponse)),
new ExtractedFieldsDetector(index, config, isTaskRestarting, fieldCapabilitiesResponse).detect()),
e -> {
if (e instanceof IndexNotFoundException) {
listener.onFailure(new ResourceNotFoundException("cannot retrieve data because index "
Expand All @@ -222,10 +114,11 @@ private static void validateIndexAndExtractFields(Client client,
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest();
fieldCapabilitiesRequest.indices(index);
fieldCapabilitiesRequest.fields("*");
ClientHelper.executeWithHeaders(headers, ClientHelper.ML_ORIGIN, client, () -> {
ClientHelper.executeWithHeaders(config.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> {
client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler);
// This response gets discarded - the listener handles the real response
return null;
});
}

}
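
A hypothetical call site for the widened `create(...)` signature, for orientation; the signature and `newExtractor(boolean includeSource)` appear in the diff above, while `client`, `config` and `task` are assumed to be in scope and `process(...)` stands in for whatever consumes the extractor:

```java
// Fragment, not compilable in isolation: client, config and task are
// assumed; process(...) is a hypothetical consumer of the extractor.
DataFrameDataExtractorFactory.create(client, config, /* isTaskRestarting */ true,
    ActionListener.wrap(
        factory -> process(factory.newExtractor(false)), // includeSource == false
        task::markAsFailed));
```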
ExtractedFieldsDetector.java (new file)
@@ -0,0 +1,143 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.dataframe.extractor;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.NameResolver;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ExtractedFieldsDetector {

/**
* Fields to ignore. These are mostly internal meta fields.
*/
private static final List<String> IGNORE_FIELDS = Arrays.asList("_id", "_field_names", "_index", "_parent", "_routing", "_seq_no",
"_source", "_type", "_uid", "_version", "_feature", "_ignored");

/**
* The types supported by data frames
*/
private static final Set<String> COMPATIBLE_FIELD_TYPES;

static {
Set<String> compatibleTypes = Stream.of(NumberFieldMapper.NumberType.values())
.map(NumberFieldMapper.NumberType::typeName)
.collect(Collectors.toSet());
compatibleTypes.add("scaled_float"); // have to add manually since scaled_float is in a module

COMPATIBLE_FIELD_TYPES = Collections.unmodifiableSet(compatibleTypes);
}

private final String index;
private final DataFrameAnalyticsConfig config;
private final boolean isTaskRestarting;
private final FieldCapabilitiesResponse fieldCapabilitiesResponse;

ExtractedFieldsDetector(String index, DataFrameAnalyticsConfig config, boolean isTaskRestarting,
FieldCapabilitiesResponse fieldCapabilitiesResponse) {
this.index = Objects.requireNonNull(index);
this.config = Objects.requireNonNull(config);
this.isTaskRestarting = isTaskRestarting;
this.fieldCapabilitiesResponse = Objects.requireNonNull(fieldCapabilitiesResponse);
}

public ExtractedFields detect() {
Set<String> fields = fieldCapabilitiesResponse.get().keySet();
fields.removeAll(IGNORE_FIELDS);

checkResultsFieldIsNotPresent(fields, index);

// Ignore fields under the results object
fields.removeIf(field -> field.startsWith(config.getDest().getResultsField() + "."));

removeFieldsWithIncompatibleTypes(fields);
includeAndExcludeFields(fields, index);
List<String> sortedFields = new ArrayList<>(fields);
// We sort the fields to ensure the checksum for each document is deterministic
Collections.sort(sortedFields);
ExtractedFields extractedFields = ExtractedFields.build(sortedFields, Collections.emptySet(), fieldCapabilitiesResponse)
.filterFields(ExtractedField.ExtractionMethod.DOC_VALUE);
if (extractedFields.getAllFields().isEmpty()) {
throw ExceptionsHelper.badRequestException("No compatible fields could be detected in index [{}]", index);
}
return extractedFields;
}

private void checkResultsFieldIsNotPresent(Set<String> fields, String index) {
// If the task is restarting we do not mind the index containing the results field, we will overwrite all docs
if (isTaskRestarting == false && fields.contains(config.getDest().getResultsField())) {
throw ExceptionsHelper.badRequestException("Index [{}] already has a field that matches the {}.{} [{}];" +
" please set a different {}", index, DataFrameAnalyticsConfig.DEST.getPreferredName(),
DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName(), config.getDest().getResultsField(),
DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName());
}
}

private void removeFieldsWithIncompatibleTypes(Set<String> fields) {
Iterator<String> fieldsIterator = fields.iterator();
while (fieldsIterator.hasNext()) {
String field = fieldsIterator.next();
Map<String, FieldCapabilities> fieldCaps = fieldCapabilitiesResponse.getField(field);
if (fieldCaps == null || COMPATIBLE_FIELD_TYPES.containsAll(fieldCaps.keySet()) == false) {
fieldsIterator.remove();
}
}
}

private void includeAndExcludeFields(Set<String> fields, String index) {
FetchSourceContext analysesFields = config.getAnalysesFields();
if (analysesFields == null) {
return;
}
String includes = analysesFields.includes().length == 0 ? "*" : Strings.arrayToCommaDelimitedString(analysesFields.includes());
String excludes = Strings.arrayToCommaDelimitedString(analysesFields.excludes());

if (Regex.isMatchAllPattern(includes) && excludes.isEmpty()) {
return;
}
try {
// If the inclusion set does not match anything, that means the user's desired fields cannot be found in
// the collection of supported field types. We should let the user know.
Set<String> includedSet = NameResolver.newUnaliased(fields,
(ex) -> new ResourceNotFoundException(Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER, index, ex)))
.expand(includes, false);
// If the exclusion set does not match anything, that means the fields are already not present
// no need to raise if nothing matched
Set<String> excludedSet = NameResolver.newUnaliased(fields,
(ex) -> new ResourceNotFoundException(Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER, index, ex)))
.expand(excludes, true);

fields.retainAll(includedSet);
fields.removeAll(excludedSet);
} catch (ResourceNotFoundException ex) {
// Re-wrap our exception so that we throw the same exception type when there are no fields.
throw ExceptionsHelper.badRequestException(ex.getMessage());
}
}

}
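
Finally, a sketch of how the detector is driven end to end; the constructor and `detect()` are defined above, and obtaining the `FieldCapabilitiesResponse` via a field capabilities request is shown in `validateIndexAndExtractFields(...)` in the previous file, so it is simply assumed here:

```java
// Fragment: index, config and fieldCapsResponse are assumed to be in scope,
// with fieldCapsResponse obtained from a field_caps request over the index.
ExtractedFieldsDetector detector =
    new ExtractedFieldsDetector(index, config, /* isTaskRestarting */ false, fieldCapsResponse);
// detect() drops ignored/incompatible fields, applies the include/exclude
// filters, sorts fields for deterministic checksums, and throws a bad-request
// exception if nothing usable remains, or if a non-restarting task would
// clobber the results field.
ExtractedFields extractedFields = detector.detect();
```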