Skip to content

Commit

Permalink
[ML] write warning if configured memory limit is too low for analytic…
Browse files Browse the repository at this point in the history
…s job (#61505)

Having `_start` fail when the configured memory limit is too low can be frustrating. 

We should instead warn the user that their job might not run properly if their configured limit is too low. 

It might be that our estimate is too high, and their configured limit works just fine.
  • Loading branch information
benwtrent authored Aug 25, 2020
1 parent 35b3514 commit 44d667b
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public final class Messages {
public static final String DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE_WITH_REASON =
"Updated analytics task state to [{0}] with reason [{1}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE = "Estimated memory usage for this analytics to be [{0}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE_HIGHER_THAN_CONFIGURED =
"Configured model memory limit [{0}] is lower than the expected memory usage [{1}]. " +
"The analytics job may fail due to configured memory constraints.";
public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX = "Creating destination index [{0}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX = "Using existing destination index [{0}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_REINDEXING = "Started reindexing to destination index [{0}]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.index.query.QueryBuilder;
Expand Down Expand Up @@ -777,6 +779,22 @@ public void testUpdateAnalytics() throws Exception {
assertThat(getOnlyElement(getAnalytics(jobId)).getDescription(), is(equalTo("updated-description-2")));
}

public void testTooLowConfiguredMemoryStillStarts() throws Exception {
initialize("low_memory_analysis");
indexData(sourceIndex, 10_000, 0, NESTED_FIELD);

DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder(
buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(NESTED_FIELD)))
.setModelMemoryLimit(new ByteSizeValue(1, ByteSizeUnit.KB))
.build();
putAnalytics(config);
// Shouldn't throw
startAnalytics(jobId);
// It could be marked as failed...
forceStopAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);
}

private static <T> T getOnlyElement(List<T> list) {
assertThat(list, hasSize(1));
return list.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
Expand All @@ -20,7 +19,6 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
Expand All @@ -46,7 +44,6 @@
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;

public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTestCase {

Expand Down Expand Up @@ -520,17 +517,11 @@ public void testModelMemoryLimitLowerThanEstimatedMemoryUsage() throws Exception

putAnalytics(config);
assertIsStopped(id);

ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> startAnalytics(id));
assertThat(exception.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(
exception.getMessage(),
startsWith("Cannot start because the configured model memory limit [" + modelMemoryLimit +
"] is lower than the expected memory usage"));

assertThatAuditMessagesMatch(id,
"Created analytics with analysis type [outlier_detection]",
"Estimated memory usage for this analytics to be");
//should not throw
startAnalytics(id);
// Might have been marked as failed
forceStopAnalytics(id);
waitUntilAnalyticsIsStopped(id);
}

public void testLazyAssignmentWithModelMemoryLimitTooHighForAssignment() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -216,14 +217,16 @@ private void estimateMemoryUsageAndUpdateMemoryTracker(StartContext startContext
auditor.info(jobId,
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE, expectedMemoryWithoutDisk));
// Validate that model memory limit is sufficient to run the analysis
// We will only warn the caller if the configured limit is too low.
if (startContext.config.getModelMemoryLimit()
.compareTo(expectedMemoryWithoutDisk) < 0) {
ElasticsearchStatusException e =
ExceptionsHelper.badRequestException(
"Cannot start because the configured model memory limit [{}] is lower than the expected memory usage [{}]",
startContext.config.getModelMemoryLimit(), expectedMemoryWithoutDisk);
listener.onFailure(e);
return;
String warning = Messages.getMessage(
Messages.DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE_HIGHER_THAN_CONFIGURED,
startContext.config.getModelMemoryLimit(),
expectedMemoryWithoutDisk);
auditor.warning(jobId, warning);
logger.warn("[{}] {}", jobId, warning);
HeaderWarning.addWarning(warning);
}
// Refresh memory requirement for jobs
memoryTracker.addDataFrameAnalyticsJobMemoryAndRefreshAllOthers(
Expand Down

0 comments on commit 44d667b

Please sign in to comment.