[ML] only persist progress if it has changed (#62123) (#62180)
* [ML] only persist progress if it has changed

We already search for the previously stored progress document.

For optimization purposes, and to prevent restoring the same
progress after a failed analytics job is stopped, this commit
adds an equality check between the previously stored progress
and the current progress. If the progress is unchanged, the
persistence step is skipped; if it has changed, persistence
continues as normal.
benwtrent authored Sep 9, 2020
1 parent f1522fc commit e181e24
Showing 3 changed files with 17 additions and 4 deletions.
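In essence, the change compares the freshly reported progress against the progress document already stored in the state index and short-circuits the index request when the two are equal. The standalone sketch below illustrates that pattern only; PhaseProgress, StoredProgress, and the persist/onComplete hooks here are simplified stand-ins, not the actual Elasticsearch classes used in the diff that follows.

import java.util.List;
import java.util.Objects;

// Minimal sketch of the "skip persistence if progress is unchanged" pattern.
public class ProgressPersister {

    // Records give us structural equals(), which is what the comparison relies on.
    record PhaseProgress(String phase, int progressPercent) {}

    record StoredProgress(List<PhaseProgress> phases) {}

    public void persistProgress(StoredProgress previous,
                                List<PhaseProgress> currentPhases,
                                Runnable onComplete) {
        StoredProgress toStore = new StoredProgress(currentPhases);
        // Equality check: if nothing changed, do not touch the index at all.
        if (Objects.equals(toStore, previous)) {
            System.out.println("Progress unchanged; skipping storage.");
            onComplete.run();
            return;
        }
        // Otherwise persist as before (the actual index request is elided here).
        System.out.println("Persisting progress: " + currentPhases);
        onComplete.run();
    }

    public static void main(String[] args) {
        ProgressPersister persister = new ProgressPersister();
        StoredProgress previous = new StoredProgress(
            List.of(new PhaseProgress("reindexing", 100), new PhaseProgress("loading_data", 40)));

        // Same progress as stored -> storage is skipped.
        persister.persistProgress(previous,
            List.of(new PhaseProgress("reindexing", 100), new PhaseProgress("loading_data", 40)),
            () -> System.out.println("done"));

        // Changed progress -> persisted as before.
        persister.persistProgress(previous,
            List.of(new PhaseProgress("reindexing", 100), new PhaseProgress("loading_data", 75)),
            () -> System.out.println("done"));
    }
}

The real change takes the same shape: StoredProgress already implements equals(), so the new code builds the document to store, compares it with the previously fetched one, and only issues the IndexRequest when they differ.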
@@ -781,7 +781,6 @@ public void testUpdateAnalytics() throws Exception {
assertThat(getOnlyElement(getAnalytics(jobId)).getDescription(), is(equalTo("updated-description-2")));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/61913")
public void testTooLowConfiguredMemoryStillStarts() throws Exception {
initialize("low_memory_analysis");
indexData(sourceIndex, 10_000, 0, NESTED_FIELD);
@@ -93,7 +93,6 @@ private void cleanUpAnalytics() {
for (DataFrameAnalyticsConfig config : analytics) {
try {
assertThat(deleteAnalytics(config.getId()).isAcknowledged(), is(true));
assertThat(searchStoredProgress(config.getId()).getHits().getTotalHits().value, equalTo(0L));
} catch (Exception e) {
// just log and ignore
logger.error(new ParameterizedMessage("[{}] Could not clean up analytics job config", config.getId()), e);
@@ -49,6 +49,7 @@
import org.elasticsearch.xpack.ml.dataframe.stats.ProgressTracker;
import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import org.elasticsearch.xpack.ml.utils.persistence.MlParserUtils;

import java.util.List;
import java.util.Map;
@@ -309,17 +310,31 @@ void persistProgress(Client client, String jobId, Runnable runnable) {
ActionListener<SearchResponse> searchFormerProgressDocListener = ActionListener.wrap(
searchResponse -> {
String indexOrAlias = AnomalyDetectorsIndex.jobStateIndexWriteAlias();
StoredProgress previous = null;
if (searchResponse.getHits().getHits().length > 0) {
indexOrAlias = searchResponse.getHits().getHits()[0].getIndex();
try {
previous = MlParserUtils.parse(searchResponse.getHits().getHits()[0], StoredProgress.PARSER);
} catch (Exception ex) {
LOGGER.warn(new ParameterizedMessage("[{}] failed to parse previously stored progress", jobId), ex);
}
}

List<PhaseProgress> progress = statsHolder.getProgressTracker().report();
final StoredProgress progressToStore = new StoredProgress(progress);
if (progressToStore.equals(previous)) {
LOGGER.debug("[{}] new progress is the same as previously persisted progress. Skipping storage.", jobId);
runnable.run();
return;
}

IndexRequest indexRequest = new IndexRequest(indexOrAlias)
.id(progressDocId)
.setRequireAlias(AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
List<PhaseProgress> progress = statsHolder.getProgressTracker().report();
try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) {
LOGGER.debug("[{}] Persisting progress is: {}", jobId, progress);
new StoredProgress(progress).toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS);
progressToStore.toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS);
indexRequest.source(jsonBuilder);
}
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, indexProgressDocListener);