Skip to content

Commit

Permalink
Move ML Optimistic Concurrency Control to Seq No (#38278)
Browse files Browse the repository at this point in the history
This commit moves the usage of internal versioning for CAS operations to use sequence numbers and primary terms

Relates to #36148
Relates to #10708
  • Loading branch information
bleskes authored Feb 4, 2019
1 parent 1d82a6d commit ff13a43
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ public void onTimeout(TimeValue timeout) {
private void clearJobFinishedTime(String jobId, ActionListener<AcknowledgedResponse> listener) {
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();

jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
jobConfigProvider.updateJob(jobId, update, null, clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap(
job -> listener.onResponse(new AcknowledgedResponse(true)),
e -> {
logger.error("[" + jobId + "] Failed to clear finished_time", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected void masterOperation(UpdateDatafeedAction.Request request, ClusterStat

CheckedConsumer<Boolean, Exception> updateConsumer = ok -> {
datafeedConfigProvider.updateDatefeedConfig(request.getUpdate().getId(), request.getUpdate(), headers,
jobConfigProvider::validateDatafeedJob,
jobConfigProvider::validateDatafeedJob, clusterService.state().nodes().getMinNodeVersion(),
ActionListener.wrap(
updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
listener::onFailure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.action;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
Expand All @@ -17,6 +18,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
Expand Down Expand Up @@ -52,14 +54,16 @@ public class TransportUpdateFilterAction extends HandledTransportAction<UpdateFi

private final Client client;
private final JobManager jobManager;
private final ClusterService clusterService;

@Inject
public TransportUpdateFilterAction(TransportService transportService, ActionFilters actionFilters, Client client,
JobManager jobManager) {
JobManager jobManager, ClusterService clusterService) {
super(UpdateFilterAction.NAME, transportService, actionFilters,
(Supplier<UpdateFilterAction.Request>) UpdateFilterAction.Request::new);
this.client = client;
this.jobManager = jobManager;
this.clusterService = clusterService;
}

@Override
Expand Down Expand Up @@ -95,13 +99,20 @@ private void updateFilter(FilterWithVersion filterWithVersion, UpdateFilterActio
}

MlFilter updatedFilter = MlFilter.builder(filter.getId()).setDescription(description).setItems(items).build();
indexUpdatedFilter(updatedFilter, filterWithVersion.version, request, listener);
indexUpdatedFilter(
updatedFilter, filterWithVersion.version, filterWithVersion.seqNo, filterWithVersion.primaryTerm, request, listener);
}

private void indexUpdatedFilter(MlFilter filter, long version, UpdateFilterAction.Request request,
private void indexUpdatedFilter(MlFilter filter, final long version, final long seqNo, final long primaryTerm,
UpdateFilterAction.Request request,
ActionListener<PutFilterAction.Response> listener) {
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, filter.documentId());
indexRequest.version(version);
if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0)) {
indexRequest.setIfSeqNo(seqNo);
indexRequest.setIfPrimaryTerm(primaryTerm);
} else {
indexRequest.version(version);
}
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
Expand Down Expand Up @@ -146,7 +157,7 @@ public void onResponse(GetResponse getDocResponse) {
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
MlFilter filter = MlFilter.LENIENT_PARSER.apply(parser, null).build();
listener.onResponse(new FilterWithVersion(filter, getDocResponse.getVersion()));
listener.onResponse(new FilterWithVersion(filter, getDocResponse));
}
} else {
this.onFailure(new ResourceNotFoundException(Messages.getMessage(Messages.FILTER_NOT_FOUND, filterId)));
Expand All @@ -167,10 +178,15 @@ private static class FilterWithVersion {

private final MlFilter filter;
private final long version;
private final long seqNo;
private final long primaryTerm;

private FilterWithVersion(MlFilter filter, long version) {
private FilterWithVersion(MlFilter filter, GetResponse getDocResponse) {
this.filter = filter;
this.version = version;
this.version = getDocResponse.getVersion();
this.seqNo = getDocResponse.getSeqNo();
this.primaryTerm = getDocResponse.getPrimaryTerm();

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
Expand All @@ -19,6 +20,7 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
Expand Down Expand Up @@ -262,10 +264,12 @@ public void onFailure(Exception e) {
* @param headers Datafeed headers applied with the update
* @param validator BiConsumer that accepts the updated config and can perform
* extra validations. {@code validator} must call the passed listener
* @param minClusterNodeVersion minimum version of nodes in cluster
* @param updatedConfigListener Updated datafeed config listener
*/
public void updateDatefeedConfig(String datafeedId, DatafeedUpdate update, Map<String, String> headers,
BiConsumer<DatafeedConfig, ActionListener<Boolean>> validator,
Version minClusterNodeVersion,
ActionListener<DatafeedConfig> updatedConfigListener) {
GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(datafeedId));
Expand All @@ -277,7 +281,9 @@ public void onResponse(GetResponse getResponse) {
updatedConfigListener.onFailure(ExceptionsHelper.missingDatafeedException(datafeedId));
return;
}
long version = getResponse.getVersion();
final long version = getResponse.getVersion();
final long seqNo = getResponse.getSeqNo();
final long primaryTerm = getResponse.getPrimaryTerm();
BytesReference source = getResponse.getSourceAsBytesRef();
DatafeedConfig.Builder configBuilder;
try {
Expand All @@ -298,7 +304,7 @@ public void onResponse(GetResponse getResponse) {

ActionListener<Boolean> validatedListener = ActionListener.wrap(
ok -> {
indexUpdatedConfig(updatedConfig, version, ActionListener.wrap(
indexUpdatedConfig(updatedConfig, version, seqNo, primaryTerm, minClusterNodeVersion, ActionListener.wrap(
indexResponse -> {
assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED;
updatedConfigListener.onResponse(updatedConfig);
Expand All @@ -318,17 +324,23 @@ public void onFailure(Exception e) {
});
}

private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, ActionListener<IndexResponse> listener) {
private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, long seqNo, long primaryTerm,
Version minClusterNodeVersion, ActionListener<IndexResponse> listener) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder updatedSource = updatedConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(updatedConfig.getId()))
.setSource(updatedSource)
.setVersion(version)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.request();
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

if (minClusterNodeVersion.onOrAfter(Version.V_6_7_0)) {
indexRequest.setIfSeqNo(seqNo);
indexRequest.setIfPrimaryTerm(primaryTerm);
} else {
indexRequest.setVersion(version);
}

executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, listener);
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest.request(), listener);

} catch (IOException e) {
listener.onFailure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ public void updateJob(UpdateJobAction.Request request, ActionListener<PutJobActi

Runnable doUpdate = () -> {
jobConfigProvider.updateJobWithValidation(request.getJobId(), request.getJobUpdate(), maxModelMemoryLimit,
this::validate, ActionListener.wrap(
this::validate, clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap(
updatedJob -> postJobUpdate(request, updatedJob, actionListener),
actionListener::onFailure
));
Expand Down Expand Up @@ -603,8 +603,8 @@ public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionList
.setModelSnapshotId(modelSnapshot.getSnapshotId())
.build();

jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, ActionListener.wrap(
job -> {
jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, clusterService.state().nodes().getMinNodeVersion(),
ActionListener.wrap(job -> {
auditor.info(request.getJobId(),
Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription()));
updateHandler.accept(Boolean.TRUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
Expand All @@ -21,6 +22,7 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
Expand Down Expand Up @@ -225,9 +227,12 @@ public void onFailure(Exception e) {
* @param maxModelMemoryLimit The maximum model memory allowed. This can be {@code null}
* if the job's {@link org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits}
* are not changed.
* @param minClusterNodeVersion the minimum version of nodes in the cluster
* @param updatedJobListener Updated job listener
*/
public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, ActionListener<Job> updatedJobListener) {
public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit,
Version minClusterNodeVersion,
ActionListener<Job> updatedJobListener) {
GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId));

Expand All @@ -239,7 +244,9 @@ public void onResponse(GetResponse getResponse) {
return;
}

long version = getResponse.getVersion();
final long version = getResponse.getVersion();
final long seqNo = getResponse.getSeqNo();
final long primaryTerm = getResponse.getPrimaryTerm();
BytesReference source = getResponse.getSourceAsBytesRef();
Job.Builder jobBuilder;
try {
Expand All @@ -259,7 +266,7 @@ public void onResponse(GetResponse getResponse) {
return;
}

indexUpdatedJob(updatedJob, version, updatedJobListener);
indexUpdatedJob(updatedJob, version, seqNo, primaryTerm, minClusterNodeVersion, updatedJobListener);
}

@Override
Expand All @@ -280,17 +287,18 @@ public interface UpdateValidator {
}

/**
* Similar to {@link #updateJob(String, JobUpdate, ByteSizeValue, ActionListener)} but
* Similar to {@link #updateJob(String, JobUpdate, ByteSizeValue, Version, ActionListener)} but
* with an extra validation step which is called before the updated is applied.
*
* @param jobId The Id of the job to update
* @param update The job update
* @param maxModelMemoryLimit The maximum model memory allowed
* @param validator The job update validator
* @param minClusterNodeVersion the minimum version of a node ifn the cluster
* @param updatedJobListener Updated job listener
*/
public void updateJobWithValidation(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit,
UpdateValidator validator, ActionListener<Job> updatedJobListener) {
UpdateValidator validator, Version minClusterNodeVersion, ActionListener<Job> updatedJobListener) {
GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId));

Expand All @@ -302,7 +310,9 @@ public void onResponse(GetResponse getResponse) {
return;
}

long version = getResponse.getVersion();
final long version = getResponse.getVersion();
final long seqNo = getResponse.getSeqNo();
final long primaryTerm = getResponse.getPrimaryTerm();
BytesReference source = getResponse.getSourceAsBytesRef();
Job originalJob;
try {
Expand All @@ -324,7 +334,7 @@ public void onResponse(GetResponse getResponse) {
return;
}

indexUpdatedJob(updatedJob, version, updatedJobListener);
indexUpdatedJob(updatedJob, version, seqNo, primaryTerm, minClusterNodeVersion, updatedJobListener);
},
updatedJobListener::onFailure
));
Expand All @@ -337,17 +347,22 @@ public void onFailure(Exception e) {
});
}

private void indexUpdatedJob(Job updatedJob, long version, ActionListener<Job> updatedJobListener) {
private void indexUpdatedJob(Job updatedJob, long version, long seqNo, long primaryTerm, Version minClusterNodeVersion,
ActionListener<Job> updatedJobListener) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder updatedSource = updatedJob.toXContent(builder, ToXContent.EMPTY_PARAMS);
IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, Job.documentId(updatedJob.getId()))
.setSource(updatedSource)
.setVersion(version)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.request();
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
if (minClusterNodeVersion.onOrAfter(Version.V_6_7_0)) {
indexRequest.setIfSeqNo(seqNo);
indexRequest.setIfPrimaryTerm(primaryTerm);
} else {
indexRequest.setVersion(version);
}

executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest.request(), ActionListener.wrap(
indexResponse -> {
assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED;
updatedJobListener.onResponse(updatedJob);
Expand Down
Loading

0 comments on commit ff13a43

Please sign in to comment.