Skip to content

Commit

Permalink
[FEATURE][ML] Write data frame configuration to process (#35914)
Browse files Browse the repository at this point in the history
  • Loading branch information
dimitris-athanasiou authored Nov 27, 2018
1 parent 6e3f832 commit 3f49eef
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
new BlackHoleAutodetectProcess(job.getId());
// factor of 1.0 makes renormalization a no-op
normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0);
analyticsProcessFactory = (jobId, executorService) -> null;
analyticsProcessFactory = (jobId, analyticsProcessConfig, executorService) -> null;
}
NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory,
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand All @@ -25,6 +27,7 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSortConfig;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.script.Script;
Expand Down Expand Up @@ -114,18 +117,23 @@ private boolean isMlNode(DiscoveryNode node) {
private void reindexDataframeAndStartAnalysis(String index, ActionListener<AcknowledgedResponse> listener) {
final String destinationIndex = index + "_copy";

ActionListener<BulkByScrollResponse> reindexCompletedListener = ActionListener.wrap(
bulkResponse -> {
client.execute(RefreshAction.INSTANCE, new RefreshRequest(destinationIndex), ActionListener.wrap(
refreshResponse -> {
runPipelineAnalytics(destinationIndex, listener);
}, listener::onFailure
));
}, listener::onFailure
);

ActionListener<CreateIndexResponse> copyIndexCreatedListener = ActionListener.wrap(
createIndexResponse -> {
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(index);
reindexRequest.setDestIndex(destinationIndex);
reindexRequest.setScript(new Script("ctx._source." + DataFrameFields.ID + " = ctx._id"));
client.execute(ReindexAction.INSTANCE, reindexRequest, ActionListener.wrap(
bulkResponse -> {
runPipelineAnalytics(destinationIndex, listener);
},
listener::onFailure
));
client.execute(ReindexAction.INSTANCE, reindexRequest, reindexCompletedListener);
}, listener::onFailure
);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.analytics;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

import java.io.IOException;

/**
 * Describes the analysis to run on a data frame, identified by its name.
 * Serializes as {@code {"name": "<name>"}} for inclusion in the analytics
 * process configuration.
 */
public class DataFrameAnalysis implements ToXContentObject {

    private static final ParseField NAME = new ParseField("name");

    private final String name;

    /**
     * @param name the analysis name; must be non-null
     */
    public DataFrameAnalysis(String name) {
        this.name = ExceptionsHelper.requireNonNull(name, NAME.getPreferredName());
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        // XContentBuilder is fluent; emit the single-field object in one chain
        return builder.startObject()
            .field(NAME.getPreferredName(), name)
            .endObject();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,4 +189,25 @@ public String[] getFieldNamesArray() {
List<String> fieldNames = getFieldNames();
return fieldNames.toArray(new String[fieldNames.size()]);
}

/**
 * Computes the dimensions of the data frame: the number of rows (matching
 * documents) and the number of columns (extracted fields).
 *
 * @return a {@link DataSummary} holding the row and column counts
 */
public DataSummary collectDataSummary() {
    // size 0: we only need the total hit count, not the documents themselves
    SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
        .setIndices(context.indices)
        .setSize(0)
        .setQuery(context.query);

    SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
    // rows = total hits for the query; cols = count of all extracted fields
    return new DataSummary(searchResponse.getHits().getTotalHits(), context.extractedFields.getAllFields().size());
}

/**
 * Immutable holder for the dimensions of a data frame.
 */
public static class DataSummary {

    // Number of documents (rows) in the data frame
    public final long rows;

    // Number of extracted fields (columns) in the data frame
    public final long cols;

    public DataSummary(long rowCount, long columnCount) {
        this.rows = rowCount;
        this.cols = columnCount;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,19 @@
*/
package org.elasticsearch.xpack.ml.analytics.process;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.ProcessPipes;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand All @@ -19,13 +28,21 @@ public class AnalyticsBuilder {
private static final String ANALYTICS_PATH = "./" + ANALYTICS;

private static final String LENGTH_ENCODED_INPUT_ARG = "--lengthEncodedInput";
private static final String CONFIG_ARG = "--config=";

private final Environment env;
private final NativeController nativeController;
private final ProcessPipes processPipes;
private final AnalyticsProcessConfig config;
private final List<Path> filesToDelete;

public AnalyticsBuilder(NativeController nativeController, ProcessPipes processPipes) {
/**
 * @param env              the node environment, used to locate the temp directory for the config file
 * @param nativeController controller used to launch the native analytics process
 * @param processPipes     named pipes used to communicate with the process
 * @param config           the analytics process configuration to write to disk
 * @param filesToDelete    collector for temp files that must be deleted when the process finishes
 */
public AnalyticsBuilder(Environment env, NativeController nativeController, ProcessPipes processPipes, AnalyticsProcessConfig config,
                        List<Path> filesToDelete) {
    this.env = Objects.requireNonNull(env);
    this.nativeController = Objects.requireNonNull(nativeController);
    this.processPipes = Objects.requireNonNull(processPipes);
    this.config = Objects.requireNonNull(config);
    this.filesToDelete = Objects.requireNonNull(filesToDelete);
}

public void build() throws IOException {
Expand All @@ -34,10 +51,24 @@ public void build() throws IOException {
nativeController.startProcess(command);
}

List<String> buildAnalyticsCommand() {
/**
 * Assembles the command line used to launch the native analytics process.
 *
 * @return the command: executable path, length-encoded-input flag, and the config file argument
 * @throws IOException if writing the temporary config file fails
 */
List<String> buildAnalyticsCommand() throws IOException {
    List<String> args = new ArrayList<>();
    args.add(ANALYTICS_PATH);
    args.add(LENGTH_ENCODED_INPUT_ARG);
    // writes the config to a temp file and appends the --config= argument
    addConfigFile(args);
    return args;
}

/**
 * Writes the process configuration as JSON to a temp file and appends the
 * corresponding {@code --config=} argument to the command.
 * The temp file is registered in {@code filesToDelete} for cleanup after the
 * process finishes.
 */
private void addConfigFile(List<String> command) throws IOException {
    Path configFile = Files.createTempFile(env.tmpFile(), "analysis", ".conf");
    filesToDelete.add(configFile);
    try (OutputStreamWriter writer = new OutputStreamWriter(Files.newOutputStream(configFile), StandardCharsets.UTF_8)) {
        try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) {
            config.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
            writer.write(Strings.toString(jsonBuilder));
        }
    }
    command.add(CONFIG_ARG + configFile.toString());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.analytics.process;

import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.ml.analytics.DataFrameAnalysis;

import java.io.IOException;
import java.util.Objects;

/**
 * The configuration written to the native analytics process: the data frame
 * dimensions, resource limits, and the analysis to perform.
 */
public class AnalyticsProcessConfig implements ToXContentObject {

    private static final String ROWS = "rows";
    private static final String COLS = "cols";
    private static final String MEMORY_LIMIT = "memory_limit";
    private static final String THREADS = "threads";
    private static final String ANALYSIS = "analysis";

    private final long rows;
    private final long cols;
    private final ByteSizeValue memoryLimit;
    private final int threads;
    private final DataFrameAnalysis analysis;

    /**
     * @param rows        number of rows in the data frame
     * @param cols        number of columns in the data frame
     * @param memoryLimit memory limit for the process; must be non-null
     * @param threads     number of threads the process may use
     * @param analysis    the analysis to run; must be non-null
     */
    public AnalyticsProcessConfig(long rows, long cols, ByteSizeValue memoryLimit, int threads, DataFrameAnalysis analysis) {
        this.rows = rows;
        this.cols = cols;
        this.memoryLimit = Objects.requireNonNull(memoryLimit);
        this.threads = threads;
        this.analysis = Objects.requireNonNull(analysis);
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        // memory limit is serialized as a raw byte count
        return builder.startObject()
            .field(ROWS, rows)
            .field(COLS, cols)
            .field(MEMORY_LIMIT, memoryLimit.getBytes())
            .field(THREADS, threads)
            .field(ANALYSIS, analysis)
            .endObject();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ public interface AnalyticsProcessFactory {
* Create an implementation of {@link AnalyticsProcess}
*
* @param jobId The job id
* @param analyticsProcessConfig The process configuration
* @param executorService Executor service used to start the async tasks a job needs to operate the analytical process
* @return The process
*/
AnalyticsProcess createAnalyticsProcess(String jobId, ExecutorService executorService);
AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, ExecutorService executorService);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.analytics.DataFrameAnalysis;
import org.elasticsearch.xpack.ml.analytics.DataFrameDataExtractor;

import java.io.IOException;
Expand Down Expand Up @@ -39,7 +42,7 @@ public AnalyticsProcessManager(Client client, Environment environment, ThreadPoo

public void processData(String jobId, DataFrameDataExtractor dataExtractor) {
threadPool.generic().execute(() -> {
AnalyticsProcess process = createProcess(jobId);
AnalyticsProcess process = createProcess(jobId, dataExtractor);
try {
// Fake header
process.writeRecord(dataExtractor.getFieldNamesArray());
Expand Down Expand Up @@ -69,13 +72,20 @@ public void processData(String jobId, DataFrameDataExtractor dataExtractor) {
});
}

private AnalyticsProcess createProcess(String jobId) {
/**
 * Creates and starts the analytics process for the given job, configured from
 * a summary of the data frame. Fails fast if the process did not start.
 *
 * @throws org.elasticsearch.ElasticsearchException if the process is not alive after creation
 */
private AnalyticsProcess createProcess(String jobId, DataFrameDataExtractor dataExtractor) {
    // TODO We should rename the thread pool to reflect its more general use now, e.g. JOB_PROCESS_THREAD_POOL_NAME
    ExecutorService executor = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME);
    AnalyticsProcess analyticsProcess = processFactory.createAnalyticsProcess(jobId, createProcessConfig(dataExtractor), executor);
    if (analyticsProcess.isProcessAlive() == false) {
        throw ExceptionsHelper.serverError("Failed to start analytics process");
    }
    return analyticsProcess;
}

/**
 * Builds the process configuration from a summary of the data frame.
 */
private AnalyticsProcessConfig createProcessConfig(DataFrameDataExtractor dataExtractor) {
    // Count rows/cols so the native process can size its buffers
    DataFrameDataExtractor.DataSummary dataSummary = dataExtractor.collectDataSummary();
    // NOTE(review): memory limit (1GB), thread count (1) and analysis name ("outliers")
    // are hard-coded here — presumably placeholders until the job config carries them; verify.
    return new AnalyticsProcessConfig(dataSummary.rows, dataSummary.cols,
        new ByteSizeValue(1, ByteSizeUnit.GB), 1, new DataFrameAnalysis("outliers"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;

import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;

Expand All @@ -37,15 +39,17 @@ public NativeAnalyticsProcessFactory(Environment env, NativeController nativeCon
}

@Override
public AnalyticsProcess createAnalyticsProcess(String jobId, ExecutorService executorService) {
public AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig,
ExecutorService executorService) {
List<Path> filesToDelete = new ArrayList<>();
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId,
true, false, true, true, false, false);
true, false, true, true, false, false);

createNativeProcess(jobId, processPipes);
createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes);

NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, processPipes.getLogStream().get(),
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), null, 0,
Collections.emptyList(), () -> {});
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), null, 0,
filesToDelete, () -> {});


try {
Expand All @@ -61,8 +65,10 @@ public AnalyticsProcess createAnalyticsProcess(String jobId, ExecutorService exe
}
}

private void createNativeProcess(String jobId, ProcessPipes processPipes) {
AnalyticsBuilder analyticsBuilder = new AnalyticsBuilder(nativeController, processPipes);
private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, List<Path> filesToDelete,
ProcessPipes processPipes) {
AnalyticsBuilder analyticsBuilder = new AnalyticsBuilder(env, nativeController, processPipes, analyticsProcessConfig,
filesToDelete);
try {
analyticsBuilder.build();
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
Expand Down

0 comments on commit 3f49eef

Please sign in to comment.