From 3f49eef360605b19710279fbd6a58b743842dd54 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 27 Nov 2018 15:15:42 +0000 Subject: [PATCH] [FEATURE][ML] Write data frame configuration to process (#35914) --- .../xpack/ml/MachineLearning.java | 2 +- .../action/TransportRunAnalyticsAction.java | 20 +++++--- .../xpack/ml/analytics/DataFrameAnalysis.java | 32 ++++++++++++ .../ml/analytics/DataFrameDataExtractor.java | 21 ++++++++ .../analytics/process/AnalyticsBuilder.java | 35 ++++++++++++- .../process/AnalyticsProcessConfig.java | 50 +++++++++++++++++++ .../process/AnalyticsProcessFactory.java | 3 +- .../process/AnalyticsProcessManager.java | 16 ++++-- .../NativeAnalyticsProcessFactory.java | 22 +++++--- 9 files changed, 180 insertions(+), 21 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameAnalysis.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessConfig.java diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index d2e2a94fbec21..841fff2449f3b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -406,7 +406,7 @@ public Collection createComponents(Client client, ClusterService cluster new BlackHoleAutodetectProcess(job.getId()); // factor of 1.0 makes renormalization a no-op normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0); - analyticsProcessFactory = (jobId, executorService) -> null; + analyticsProcessFactory = (jobId, analyticsProcessConfig, executorService) -> null; } NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory, threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRunAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRunAnalyticsAction.java index 0c2bea17c7849..2a6199486a2ac 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRunAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRunAnalyticsAction.java @@ -10,6 +10,8 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.refresh.RefreshAction; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -25,6 +27,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSortConfig; +import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.ReindexAction; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.script.Script; @@ -114,18 +117,23 @@ private boolean isMlNode(DiscoveryNode node) { private void reindexDataframeAndStartAnalysis(String index, ActionListener listener) { final String destinationIndex = index + "_copy"; + ActionListener reindexCompletedListener = ActionListener.wrap( + bulkResponse -> { + client.execute(RefreshAction.INSTANCE, new RefreshRequest(destinationIndex), ActionListener.wrap( + refreshResponse -> { + runPipelineAnalytics(destinationIndex, listener); + }, listener::onFailure + )); + }, listener::onFailure + ); + ActionListener copyIndexCreatedListener = ActionListener.wrap( createIndexResponse -> { ReindexRequest reindexRequest = new ReindexRequest(); reindexRequest.setSourceIndices(index); reindexRequest.setDestIndex(destinationIndex); reindexRequest.setScript(new Script("ctx._source." + DataFrameFields.ID + " = ctx._id")); - client.execute(ReindexAction.INSTANCE, reindexRequest, ActionListener.wrap( - bulkResponse -> { - runPipelineAnalytics(destinationIndex, listener); - }, - listener::onFailure - )); + client.execute(ReindexAction.INSTANCE, reindexRequest, reindexCompletedListener); }, listener::onFailure ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameAnalysis.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameAnalysis.java new file mode 100644 index 0000000000000..1b06e77e31b5a --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameAnalysis.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.analytics; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; + +import java.io.IOException; + +public class DataFrameAnalysis implements ToXContentObject { + + private static final ParseField NAME = new ParseField("name"); + + private final String name; + + public DataFrameAnalysis(String name) { + this.name = ExceptionsHelper.requireNonNull(name, NAME.getPreferredName()); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(NAME.getPreferredName(), name); + builder.endObject(); + return builder; + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractor.java index f9d50f1c3343e..c35ee278b4e8e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractor.java @@ -189,4 +189,25 @@ public String[] getFieldNamesArray() { List fieldNames = getFieldNames(); return fieldNames.toArray(new String[fieldNames.size()]); } + + public DataSummary collectDataSummary() { + SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE) + .setIndices(context.indices) + .setSize(0) + .setQuery(context.query); + + SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder); + return new DataSummary(searchResponse.getHits().getTotalHits(), context.extractedFields.getAllFields().size()); + } + + public static class DataSummary { + + public final long rows; + public final long cols; + + public DataSummary(long rows, long cols) { + this.rows = rows; + this.cols = cols; + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsBuilder.java index 4f9627a402b64..e2b81ff547cc6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsBuilder.java @@ -5,10 +5,19 @@ */ package org.elasticsearch.xpack.ml.analytics.process; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.ProcessPipes; import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -19,13 +28,21 @@ public class AnalyticsBuilder { private static final String ANALYTICS_PATH = "./" + ANALYTICS; private static final String LENGTH_ENCODED_INPUT_ARG = "--lengthEncodedInput"; + private static final String CONFIG_ARG = "--config="; + private final Environment env; private final NativeController nativeController; private final ProcessPipes processPipes; + private final AnalyticsProcessConfig config; + private final List filesToDelete; - public AnalyticsBuilder(NativeController nativeController, ProcessPipes processPipes) { + public AnalyticsBuilder(Environment env, NativeController nativeController, ProcessPipes processPipes, AnalyticsProcessConfig config, + List filesToDelete) { + this.env = Objects.requireNonNull(env); this.nativeController = Objects.requireNonNull(nativeController); this.processPipes = Objects.requireNonNull(processPipes); + this.config = Objects.requireNonNull(config); + this.filesToDelete = Objects.requireNonNull(filesToDelete); } public void build() throws IOException { @@ -34,10 +51,24 @@ public void build() throws IOException { nativeController.startProcess(command); } - List buildAnalyticsCommand() { + List buildAnalyticsCommand() throws IOException { List command = new ArrayList<>(); command.add(ANALYTICS_PATH); command.add(LENGTH_ENCODED_INPUT_ARG); + addConfigFile(command); return command; } + + private void addConfigFile(List command) throws IOException { + Path configFile = Files.createTempFile(env.tmpFile(), "analysis", ".conf"); + filesToDelete.add(configFile); + try (OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(configFile),StandardCharsets.UTF_8); + XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) { + + config.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); + osw.write(Strings.toString(jsonBuilder)); + } + + command.add(CONFIG_ARG + configFile.toString()); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessConfig.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessConfig.java new file mode 100644 index 0000000000000..62e97189d0dbf --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessConfig.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.analytics.process; + +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.ml.analytics.DataFrameAnalysis; + +import java.io.IOException; +import java.util.Objects; + +public class AnalyticsProcessConfig implements ToXContentObject { + + private static final String ROWS = "rows"; + private static final String COLS = "cols"; + private static final String MEMORY_LIMIT = "memory_limit"; + private static final String THREADS = "threads"; + private static final String ANALYSIS = "analysis"; + + private final long rows; + private final long cols; + private final ByteSizeValue memoryLimit; + private final int threads; + private final DataFrameAnalysis analysis; + + + public AnalyticsProcessConfig(long rows, long cols, ByteSizeValue memoryLimit, int threads, DataFrameAnalysis analysis) { + this.rows = rows; + this.cols = cols; + this.memoryLimit = Objects.requireNonNull(memoryLimit); + this.threads = threads; + this.analysis = Objects.requireNonNull(analysis); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(ROWS, rows); + builder.field(COLS, cols); + builder.field(MEMORY_LIMIT, memoryLimit.getBytes()); + builder.field(THREADS, threads); + builder.field(ANALYSIS, analysis); + builder.endObject(); + return builder; + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessFactory.java index 330738cb69f7e..d0eb7a414074f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessFactory.java @@ -13,8 +13,9 @@ public interface AnalyticsProcessFactory { * Create an implementation of {@link AnalyticsProcess} * * @param jobId The job id + * @param analyticsProcessConfig The process configuration * @param executorService Executor service used to start the async tasks a job needs to operate the analytical process * @return The process */ - AnalyticsProcess createAnalyticsProcess(String jobId, ExecutorService executorService); + AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, ExecutorService executorService); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessManager.java index 4a24408dfc144..e0b939ad420cf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessManager.java @@ -8,10 +8,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.analytics.DataFrameAnalysis; import org.elasticsearch.xpack.ml.analytics.DataFrameDataExtractor; import java.io.IOException; @@ -39,7 +42,7 @@ public AnalyticsProcessManager(Client client, Environment environment, ThreadPoo public void processData(String jobId, DataFrameDataExtractor dataExtractor) { threadPool.generic().execute(() -> { - AnalyticsProcess process = createProcess(jobId); + AnalyticsProcess process = createProcess(jobId, dataExtractor); try { // Fake header process.writeRecord(dataExtractor.getFieldNamesArray()); @@ -69,13 +72,20 @@ public void processData(String jobId, DataFrameDataExtractor dataExtractor) { }); } - private AnalyticsProcess createProcess(String jobId) { + private AnalyticsProcess createProcess(String jobId, DataFrameDataExtractor dataExtractor) { // TODO We should rename the thread pool to reflect its more general use now, e.g. JOB_PROCESS_THREAD_POOL_NAME ExecutorService executorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME); - AnalyticsProcess process = processFactory.createAnalyticsProcess(jobId, executorService); + AnalyticsProcess process = processFactory.createAnalyticsProcess(jobId, createProcessConfig(dataExtractor), executorService); if (process.isProcessAlive() == false) { throw ExceptionsHelper.serverError("Failed to start analytics process"); } return process; } + + private AnalyticsProcessConfig createProcessConfig(DataFrameDataExtractor dataExtractor) { + DataFrameDataExtractor.DataSummary dataSummary = dataExtractor.collectDataSummary(); + AnalyticsProcessConfig config = new AnalyticsProcessConfig(dataSummary.rows, dataSummary.cols, + new ByteSizeValue(1, ByteSizeUnit.GB), 1, new DataFrameAnalysis("outliers")); + return config; + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/NativeAnalyticsProcessFactory.java index 125e8d12d90ec..7d8d1557ad967 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/NativeAnalyticsProcessFactory.java @@ -16,8 +16,10 @@ import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; import java.io.IOException; +import java.nio.file.Path; import java.time.Duration; -import java.util.Collections; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutorService; @@ -37,15 +39,17 @@ public NativeAnalyticsProcessFactory(Environment env, NativeController nativeCon } @Override - public AnalyticsProcess createAnalyticsProcess(String jobId, ExecutorService executorService) { + public AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, + ExecutorService executorService) { + List filesToDelete = new ArrayList<>(); ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId, - true, false, true, true, false, false); + true, false, true, true, false, false); - createNativeProcess(jobId, processPipes); + createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes); NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, processPipes.getLogStream().get(), - processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), null, 0, - Collections.emptyList(), () -> {}); + processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), null, 0, + filesToDelete, () -> {}); try { @@ -61,8 +65,10 @@ public AnalyticsProcess createAnalyticsProcess(String jobId, ExecutorService exe } } - private void createNativeProcess(String jobId, ProcessPipes processPipes) { - AnalyticsBuilder analyticsBuilder = new AnalyticsBuilder(nativeController, processPipes); + private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, List filesToDelete, + ProcessPipes processPipes) { + AnalyticsBuilder analyticsBuilder = new AnalyticsBuilder(env, nativeController, processPipes, analyticsProcessConfig, + filesToDelete); try { analyticsBuilder.build(); processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);