From fee3e8428ffb9cbad893505607106713926a8a7c Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Mon, 25 Jun 2018 18:25:34 +0200 Subject: [PATCH] Watcher: Fix put watch action (#31524) If no version is specified when putting a watch, the index API should be used instead of the update API, so that the whole watch gets overwritten instead of being merged with the existing one. Merging only happens when a version is specified, so that credentials can be omitted, which is important for the watcher UI. --- ...watch_gets_overwritten_without_version.yml | 73 +++++++++++++++++++ .../actions/put/TransportPutWatchAction.java | 40 +++++++--- 2 files changed, 104 insertions(+), 9 deletions(-) create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/90_ensure_watch_gets_overwritten_without_version.yml diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/90_ensure_watch_gets_overwritten_without_version.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/90_ensure_watch_gets_overwritten_without_version.yml new file mode 100644 index 0000000000000..4bea2f655e624 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/90_ensure_watch_gets_overwritten_without_version.yml @@ -0,0 +1,73 @@ +--- +"Test put watch api without version overwrites watch": + - do: + cluster.health: + wait_for_status: yellow + + - do: + xpack.watcher.put_watch: + id: "my_watch" + body: > + { + "trigger": { + "schedule": { + "hourly": { + "minute": [ 0, 5 ] + } + } + }, + "input": { + "simple": { + "foo": "bar" + } + }, + "actions": { + "logging": { + "logging": { + "text": "yaml test" + } + } + } + } + - match: { _id: "my_watch" } + + - do: + xpack.watcher.get_watch: + id: "my_watch" + - match: { watch.input.simple.foo: "bar" } + + # change the simple input fields, then ensure the old + # field does not exist on get + - do: + xpack.watcher.put_watch: + id: "my_watch" + body: > + { + "trigger": { + "schedule": { + "hourly": { + "minute": [ 0, 5 ] + } + } + }, + "input": { + "simple": { + "spam": "eggs" + } + }, + "actions": { + "logging": { + "logging": { + "text": "yaml test" + } + } + } + } + - match: { _id: "my_watch" } + + - do: + xpack.watcher.get_watch: + id: "my_watch" + - match: { watch.input.simple.spam: "eggs" } + - is_false: watch.input.simple.foo + diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java index d836507596b74..846dda66f6938 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java @@ -7,6 +7,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateRequest; @@ -97,24 +99,44 @@ protected void masterOperation(PutWatchRequest request, ClusterState state, try (XContentBuilder builder = jsonBuilder()) { watch.toXContent(builder, DEFAULT_PARAMS); - UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId()); - updateRequest.docAsUpsert(isUpdate == false); - updateRequest.version(request.getVersion()); - updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - updateRequest.doc(builder); + if (isUpdate) { + UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId()); + updateRequest.version(request.getVersion()); + updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + updateRequest.doc(builder); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest, - ActionListener.wrap(response -> { + executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest, + ActionListener.wrap(response -> { + boolean created = response.getResult() == DocWriteResponse.Result.CREATED; + if (shouldBeTriggeredLocally(request, watch)) { + triggerService.add(watch); + } + listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created)); + }, listener::onFailure), + client::update); + } else { + IndexRequest indexRequest = new IndexRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId()); + indexRequest.source(builder); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, indexRequest, + ActionListener.wrap(response -> { boolean created = response.getResult() == DocWriteResponse.Result.CREATED; - if (localExecute(request) == false && watch.status().state().isActive()) { + // if not yet in distributed mode (mixed 5/6 version in cluster), only trigger on the master node + if (shouldBeTriggeredLocally(request, watch)) { triggerService.add(watch); } listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created)); }, listener::onFailure), - client::update); + client::index); + } } } catch (Exception e) { listener.onFailure(e); } } + + private boolean shouldBeTriggeredLocally(PutWatchRequest request, Watch watch) { + return localExecute(request) == false && + watch.status().state().isActive(); + } }