Skip to content

Commit

Permalink
[FEATURE][ML] Reindex data frame before starting analytics (#35835)
Browse files Browse the repository at this point in the history
With this commit, before we start the analytics, we first reindex
the source data frame into a new index. Note that we should maintain
the settings and the mappings of the source index.
  • Loading branch information
dimitris-athanasiou authored Nov 26, 2018
1 parent 7b47107 commit 6e3f832
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSortConfig;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -19,9 +37,13 @@
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.analytics.DataFrameDataExtractor;
import org.elasticsearch.xpack.ml.analytics.DataFrameDataExtractorFactory;
import org.elasticsearch.xpack.ml.analytics.DataFrameFields;
import org.elasticsearch.xpack.ml.analytics.process.AnalyticsProcessManager;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

Expand All @@ -34,6 +56,17 @@ public class TransportRunAnalyticsAction extends HandledTransportAction<RunAnaly
private final Environment environment;
private final AnalyticsProcessManager analyticsProcessManager;

/**
* Keys of index settings that must be stripped from the source index settings
* before they are reused to create the destination index.
* <p>
* Unfortunately, getting the settings of an index includes internal settings that should
* not be set explicitly, and there is no way to filter those out. Thus, we have to maintain
* a list of them and filter them out manually.
*/
private static final List<String> INTERNAL_SETTINGS = Arrays.asList(
"index.creation_date",
"index.provided_name",
"index.uuid",
"index.version.created"
);

@Inject
public TransportRunAnalyticsAction(ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
Expand All @@ -53,7 +86,7 @@ public TransportRunAnalyticsAction(ThreadPool threadPool, TransportService trans
protected void doExecute(Task task, RunAnalyticsAction.Request request, ActionListener<AcknowledgedResponse> listener) {
DiscoveryNode localNode = clusterService.localNode();
if (isMlNode(localNode)) {
runPipelineAnalytics(request, listener);
reindexDataframeAndStartAnalysis(request.getIndex(), listener);
return;
}

Expand All @@ -78,8 +111,62 @@ private boolean isMlNode(DiscoveryNode node) {
return Boolean.valueOf(enabled);
}

private void runPipelineAnalytics(RunAnalyticsAction.Request request, ActionListener<AcknowledgedResponse> listener) {
String jobId = "ml-analytics-" + request.getIndex();
/**
 * Copies the source index into a destination index named {@code <index>_copy} and, once the
 * copy has completed without failures, starts the analytics on that copy.
 * <p>
 * The steps are chained asynchronously: create the destination index, reindex the source
 * documents into it, then run the analytics. Any failure along the way is reported through
 * {@code listener}.
 *
 * @param index    name of the source index holding the data frame
 * @param listener notified with the outcome of the analytics, or with the first failure
 */
private void reindexDataframeAndStartAnalysis(String index, ActionListener<AcknowledgedResponse> listener) {
    final String destinationIndex = index + "_copy";

    ActionListener<CreateIndexResponse> copyIndexCreatedListener = ActionListener.wrap(
        createIndexResponse -> {
            ReindexRequest reindexRequest = new ReindexRequest();
            reindexRequest.setSourceIndices(index);
            reindexRequest.setDestIndex(destinationIndex);
            // Store each document's id in a regular source field of the copy.
            reindexRequest.setScript(new Script("ctx._source." + DataFrameFields.ID + " = ctx._id"));
            client.execute(ReindexAction.INSTANCE, reindexRequest, ActionListener.wrap(
                bulkResponse -> {
                    // Reindex reports per-document and search failures inside a successful
                    // response. Without this check the analytics would silently run on a
                    // partial copy of the data frame.
                    int bulkFailureCount = bulkResponse.getBulkFailures().size();
                    int searchFailureCount = bulkResponse.getSearchFailures().size();
                    if (bulkFailureCount > 0 || searchFailureCount > 0) {
                        listener.onFailure(new IllegalStateException("Failed to reindex [" + index + "] into ["
                            + destinationIndex + "]: " + bulkFailureCount + " bulk failure(s), "
                            + searchFailureCount + " search failure(s)"));
                        return;
                    }
                    runPipelineAnalytics(destinationIndex, listener);
                },
                listener::onFailure
            ));
        },
        listener::onFailure
    );

    createDestinationIndex(index, destinationIndex, copyIndexCreatedListener);
}

/**
 * Creates the destination index with the settings and mappings of the source index.
 * <p>
 * The source settings are copied, minus the entries in {@code INTERNAL_SETTINGS}, and the
 * destination is configured to be index-sorted ascending on {@link DataFrameFields#ID}.
 * The request fails with {@link IndexNotFoundException} when the source index is not in the
 * cluster state, and with a bad request error unless the source has exactly one mapping type.
 *
 * @param sourceIndex      name of the index to copy settings and mappings from
 * @param destinationIndex name of the index to create
 * @param listener         notified with the create index response or with the failure
 */
private void createDestinationIndex(String sourceIndex, String destinationIndex, ActionListener<CreateIndexResponse> listener) {
    IndexMetaData indexMetaData = clusterService.state().getMetaData().getIndices().get(sourceIndex);
    if (indexMetaData == null) {
        listener.onFailure(new IndexNotFoundException(sourceIndex));
        return;
    }

    int mappingCount = indexMetaData.getMappings().size();
    if (mappingCount == 0) {
        // Previously reported as "multiple types", which is misleading for an unmapped index.
        listener.onFailure(ExceptionsHelper.badRequestException("Does not support indices without mappings"));
        return;
    }
    if (mappingCount > 1) {
        listener.onFailure(ExceptionsHelper.badRequestException("Does not support indices with multiple types"));
        return;
    }

    Settings.Builder settingsBuilder = Settings.builder().put(indexMetaData.getSettings());
    // Internal settings come back when reading index settings but must not be set explicitly.
    INTERNAL_SETTINGS.forEach(settingsBuilder::remove);
    settingsBuilder.put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), DataFrameFields.ID);
    settingsBuilder.put(IndexSortConfig.INDEX_SORT_ORDER_SETTING.getKey(), SortOrder.ASC);

    CreateIndexRequest createIndexRequest = new CreateIndexRequest(destinationIndex, settingsBuilder.build());
    addDestinationIndexMappings(indexMetaData, createIndexRequest);
    client.execute(CreateIndexAction.INSTANCE, createIndexRequest, listener);
}

/**
 * Adds the mappings of the source index to the create index request, extended with a
 * {@code keyword} field named {@link DataFrameFields#ID}.
 * <p>
 * Only the first mapping type of the source index is used; the caller is expected to have
 * verified that there is exactly one.
 *
 * @param indexMetaData      metadata of the source index
 * @param createIndexRequest request for the destination index, updated in place
 */
private static void addDestinationIndexMappings(IndexMetaData indexMetaData, CreateIndexRequest createIndexRequest) {
    ImmutableOpenMap<String, MappingMetaData> mappings = indexMetaData.getMappings();
    Map<String, Object> mappingsAsMap = mappings.valuesIt().next().sourceAsMap();

    // The "properties" section of a mapping is an object keyed by field name.
    @SuppressWarnings("unchecked")
    Map<String, Object> properties = (Map<String, Object>) mappingsAsMap.get("properties");
    if (properties == null) {
        // A type mapping may declare no fields at all; create the section rather than fail with an NPE.
        properties = new HashMap<>();
        mappingsAsMap.put("properties", properties);
    }

    Map<String, Object> idCopyMapping = new HashMap<>();
    idCopyMapping.put("type", "keyword");
    properties.put(DataFrameFields.ID, idCopyMapping);

    createIndexRequest.mapping(mappings.keysIt().next(), mappingsAsMap);
}

private void runPipelineAnalytics(String index, ActionListener<AcknowledgedResponse> listener) {
String jobId = "ml-analytics-" + index;

ActionListener<DataFrameDataExtractorFactory> dataExtractorFactoryListener = ActionListener.wrap(
dataExtractorFactory -> {
Expand All @@ -90,7 +177,6 @@ private void runPipelineAnalytics(RunAnalyticsAction.Request request, ActionList
listener::onFailure
);

DataFrameDataExtractorFactory.create(client, Collections.emptyMap(), request.getIndex(), dataExtractorFactoryListener);
DataFrameDataExtractorFactory.create(client, Collections.emptyMap(), index, dataExtractorFactoryListener);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequest
private SearchRequestBuilder buildSearchRequest() {
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
.setScroll(SCROLL_TIMEOUT)
.addSort("_doc", SortOrder.ASC)
.addSort(DataFrameFields.ID, SortOrder.ASC)
.setIndices(context.indices)
.setSize(context.scrollSize)
.setQuery(context.query)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.analytics;

/**
 * Names of the fields that are added to the reindexed copy of a data frame.
 */
public final class DataFrameFields {

    /** Name of the field that holds a copy of each document's id. */
    public static final String ID = "_id_copy";

    /** Constants holder; not meant to be instantiated. */
    private DataFrameFields() {
    }
}

0 comments on commit 6e3f832

Please sign in to comment.