diff --git a/src/Nest/XPack/Watcher/Action/ActionBase.cs b/src/Nest/XPack/Watcher/Action/ActionBase.cs
index 54d79eee1b4..22fe451ebd9 100644
--- a/src/Nest/XPack/Watcher/Action/ActionBase.cs
+++ b/src/Nest/XPack/Watcher/Action/ActionBase.cs
@@ -7,22 +7,57 @@
namespace Nest
{
+ ///
+ /// A Watcher action
+ ///
[InterfaceDataContract]
public interface IAction
{
+ ///
+ /// The type of action
+ ///
[IgnoreDataMember]
ActionType ActionType { get; }
+ ///
+ /// The name of the action
+ ///
[IgnoreDataMember]
string Name { get; set; }
+ ///
+ /// Limit how often an action is executed, after it has been executed.
+ /// When a throttling period is set, repeated executions of the action are prevented if it has already
+ /// executed within the throttling period time frame (now - throttling period).
+ ///
[IgnoreDataMember]
Time ThrottlePeriod { get; set; }
- [DataMember(Name = "transform")]
+ ///
+ /// Trigger the configured action for every element within an array
+ /// defined by the path assigned.
+ ///
+ /// Valid only in Elasticsearch 7.3.0+
+ ///
+ [IgnoreDataMember]
+ string Foreach { get; set; }
+
+ ///
+ /// Transforms the payload before executing the action. The transformation is only applied
+ /// for the payload for this action.
+ ///
+ [IgnoreDataMember]
TransformContainer Transform { get; set; }
+
+ ///
+ /// A condition for the action. Allows a single watch to specify multiple actions, but
+ /// further control when each action will be executed.
+ ///
+ [IgnoreDataMember]
+ ConditionContainer Condition { get; set; }
}
+ ///
public abstract class ActionBase : IAction
{
internal ActionBase() { }
@@ -31,12 +66,21 @@ internal ActionBase() { }
public abstract ActionType ActionType { get; }
+ ///
public string Name { get; set; }
+ ///
public Time ThrottlePeriod { get; set; }
+ ///
+ public string Foreach { get; set; }
+
+ ///
public TransformContainer Transform { get; set; }
+ ///
+ public ConditionContainer Condition { get; set; }
+
public static bool operator false(ActionBase a) => false;
public static bool operator true(ActionBase a) => false;
@@ -78,7 +122,10 @@ internal class ActionsFormatter : IJsonFormatter
{ "index", 3 },
{ "logging", 4 },
{ "slack", 5 },
- { "pagerduty", 6 }
+ { "pagerduty", 6 },
+ { "foreach", 7 },
+ { "transform", 8 },
+ { "condition", 9 }
};
public Actions Deserialize(ref JsonReader reader, IJsonFormatterResolver formatterResolver)
@@ -92,6 +139,10 @@ public Actions Deserialize(ref JsonReader reader, IJsonFormatterResolver formatt
Time throttlePeriod = null;
IAction action = null;
+ string @foreach = null;
+ TransformContainer transform = null;
+ ConditionContainer condition = null;
+
while (reader.ReadIsInObject(ref actionCount))
{
var propertyName = reader.ReadPropertyNameSegmentRaw();
@@ -128,6 +179,17 @@ public Actions Deserialize(ref JsonReader reader, IJsonFormatterResolver formatt
action = formatterResolver.GetFormatter()
.Deserialize(ref reader, formatterResolver);
break;
+ case 7:
+ @foreach = reader.ReadString();
+ break;
+ case 8:
+ transform = formatterResolver.GetFormatter()
+ .Deserialize(ref reader, formatterResolver);
+ break;
+ case 9:
+ condition = formatterResolver.GetFormatter()
+ .Deserialize(ref reader, formatterResolver);
+ break;
}
}
else
@@ -138,6 +200,9 @@ public Actions Deserialize(ref JsonReader reader, IJsonFormatterResolver formatt
{
action.Name = name;
action.ThrottlePeriod = throttlePeriod;
+ action.Foreach = @foreach;
+ action.Transform = transform;
+ action.Condition = condition;
dictionary.Add(name, action);
}
}
@@ -166,6 +231,28 @@ public void Serialize(ref JsonWriter writer, Actions value, IJsonFormatterResolv
timeFormatter.Serialize(ref writer, action.ThrottlePeriod, formatterResolver);
writer.WriteValueSeparator();
}
+
+ if (!string.IsNullOrEmpty(action.Foreach))
+ {
+ writer.WritePropertyName("foreach");
+ writer.WriteString(action.Foreach);
+ writer.WriteValueSeparator();
+ }
+
+ if (action.Transform != null)
+ {
+ writer.WritePropertyName("transform");
+ formatterResolver.GetFormatter().Serialize(ref writer, action.Transform, formatterResolver);
+ writer.WriteValueSeparator();
+ }
+
+ if (action.Condition != null)
+ {
+ writer.WritePropertyName("condition");
+ formatterResolver.GetFormatter().Serialize(ref writer, action.Condition, formatterResolver);
+ writer.WriteValueSeparator();
+ }
+
writer.WritePropertyName(kvp.Value.ActionType.GetStringValue());
switch (action.ActionType)
diff --git a/src/Nest/XPack/Watcher/Action/ActionsDescriptorBase.cs b/src/Nest/XPack/Watcher/Action/ActionsDescriptorBase.cs
index c4216b78314..d1fc3898bc6 100644
--- a/src/Nest/XPack/Watcher/Action/ActionsDescriptorBase.cs
+++ b/src/Nest/XPack/Watcher/Action/ActionsDescriptorBase.cs
@@ -2,6 +2,7 @@
namespace Nest
{
+ ///
public abstract class ActionsDescriptorBase
: DescriptorBase, IAction
where TDescriptor : DescriptorBase, TInterface
@@ -29,10 +30,21 @@ string IAction.Name
Time IAction.ThrottlePeriod { get; set; }
TransformContainer IAction.Transform { get; set; }
+ ConditionContainer IAction.Condition { get; set; }
+ string IAction.Foreach { get; set; }
+ ///
public TDescriptor Transform(Func selector) =>
Assign(selector.InvokeOrDefault(new TransformDescriptor()), (a, v) => a.Transform = v);
+ ///
+ public TDescriptor Condition(Func selector) =>
+ Assign(selector.InvokeOrDefault(new ConditionDescriptor()), (a, v) => a.Condition = v);
+
+ ///
public TDescriptor ThrottlePeriod(Time throttlePeriod) => Assign(throttlePeriod, (a, v) => a.ThrottlePeriod = v);
+
+ ///
+ public TDescriptor Foreach(string @foreach) => Assign(@foreach, (a, v) => a.Foreach = v);
}
}
diff --git a/src/Tests/Tests/XPack/Watcher/PutWatch/PutWatchApiTests.cs b/src/Tests/Tests/XPack/Watcher/PutWatch/PutWatchApiTests.cs
index b51f694eb16..9d8c0e5aeb3 100644
--- a/src/Tests/Tests/XPack/Watcher/PutWatch/PutWatchApiTests.cs
+++ b/src/Tests/Tests/XPack/Watcher/PutWatch/PutWatchApiTests.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using Elastic.Xunit.XunitPlumbing;
using Elasticsearch.Net;
using FluentAssertions;
using Nest;
@@ -710,4 +711,160 @@ protected override void ExpectResponse(PutWatchResponse response)
response.Id.Should().Be(CallIsolatedValue);
}
}
+
+ [SkipVersion("<7.3.0", "Foreach introduced in 7.3.0")]
+ public class PutWatchApiWithForeachTests : ApiIntegrationTestBase
+ {
+ public PutWatchApiWithForeachTests(XPackCluster cluster, EndpointUsage usage) : base(cluster, usage) { }
+
+ protected override bool ExpectIsValid => true;
+
+ protected override object ExpectJson =>
+ new
+ {
+ input = new
+ {
+ search = new
+ {
+ request = new
+ {
+ indices = new[] { "project" },
+ body = new
+ {
+ query = new
+ {
+ range = new
+ {
+ numberOfCommits = new
+ {
+ gt = 10.0
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ trigger = new
+ {
+ schedule = new
+ {
+ interval = "5m"
+ }
+ },
+ actions = new
+ {
+ log_hits = new
+ {
+ @foreach = "ctx.payload.hits.hits",
+ logging = new
+ {
+ text = "Found id {{ctx.payload._id}} with field {{ctx.payload._source.numberOfCommits}}"
+ },
+ transform = new
+ {
+ script = new
+ {
+ source = "return [ 'time' : ctx.trigger.scheduled_time ]"
+ }
+ },
+ condition = new
+ {
+ always = new {}
+ }
+ }
+ }
+ };
+
+ protected override int ExpectStatusCode => 201;
+
+ protected override HttpMethod HttpMethod => HttpMethod.PUT;
+
+ protected override Func Fluent => p => p
+ .Input(i => i
+ .Search(s => s
+ .Request(si => si
+ .Indices()
+ .Body(b => b
+ .Query(q => q
+ .Range(r => r
+ .Field(f => f.NumberOfCommits)
+ .GreaterThan(10)
+ )
+ )
+ )
+ )
+ )
+ )
+ .Trigger(t => t
+ .Schedule(s => s
+ .Interval(new Interval(5, IntervalUnit.Minute))
+ )
+ )
+ .Actions(a => a
+ .Logging("log_hits", i => i
+ .Foreach("ctx.payload.hits.hits")
+ .Text("Found id {{ctx.payload._id}} with field {{ctx.payload._source.numberOfCommits}}")
+ .Transform(t => t
+ .Script(st =>st
+ .Source("return [ 'time' : ctx.trigger.scheduled_time ]")
+ )
+ )
+ .Condition(c => c
+ .Always()
+ )
+ )
+ );
+
+ protected override PutWatchRequest Initializer =>
+ new PutWatchRequest(CallIsolatedValue)
+ {
+ Input = new SearchInput
+ {
+ Request = new SearchInputRequest
+ {
+ Indices = new IndexName[] { typeof(Project) },
+ Body = new SearchRequest
+ {
+ Query = new NumericRangeQuery
+ {
+ Field = Infer.Field(f => f.NumberOfCommits),
+ GreaterThan = 10
+ }
+ }
+ }
+ },
+ Trigger = new ScheduleContainer
+ {
+ Interval = new Interval(5, IntervalUnit.Minute)
+ },
+ Actions = new LoggingAction("log_hits")
+ {
+ Foreach = "ctx.payload.hits.hits",
+ Text = "Found id {{ctx.payload._id}} with field {{ctx.payload._source.numberOfCommits}}",
+ Transform = new InlineScriptTransform("return [ 'time' : ctx.trigger.scheduled_time ]"),
+ Condition = new AlwaysCondition()
+ }
+ };
+
+ protected override bool SupportsDeserialization => false;
+
+ protected override string UrlPath => $"/_watcher/watch/{CallIsolatedValue}";
+
+ protected override LazyResponses ClientUsage() => Calls(
+ (client, f) => client.Watcher.Put(CallIsolatedValue, f),
+ (client, f) => client.Watcher.PutAsync(CallIsolatedValue, f),
+ (client, r) => client.Watcher.Put(r),
+ (client, r) => client.Watcher.PutAsync(r)
+ );
+
+ protected override PutWatchDescriptor NewDescriptor() => new PutWatchDescriptor(CallIsolatedValue);
+
+ protected override void ExpectResponse(PutWatchResponse response)
+ {
+ response.Created.Should().BeTrue();
+ response.Version.Should().Be(1);
+ response.Id.Should().Be(CallIsolatedValue);
+ }
+ }
}