Skip to content

Commit

Permalink
Allow execution of each Watcher action in array (#4068)
Browse files Browse the repository at this point in the history
* Allow execution of each Watcher action in array

Relates: #4001

This commit introduces Foreach on Watcher actions, to allow execution of the Watcher
action on each element in an array specified by the path assigned to foreach.

Add condition to Watcher actions and ensure condition and transform are serialized.
Add integration test for Action foreach, transform and condition.
  • Loading branch information
russcam authored Sep 4, 2019
1 parent 6284623 commit 8acde7e
Show file tree
Hide file tree
Showing 3 changed files with 258 additions and 2 deletions.
91 changes: 89 additions & 2 deletions src/Nest/XPack/Watcher/Action/ActionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,57 @@

namespace Nest
{
/// <summary>
/// A Watcher action
/// </summary>
[InterfaceDataContract]
public interface IAction
{
/// <summary>
/// The type of action
/// </summary>
[IgnoreDataMember]
ActionType ActionType { get; }

/// <summary>
/// The name of the action
/// </summary>
[IgnoreDataMember]
string Name { get; set; }

/// <summary>
/// Limit how often an action is executed, after it has been executed.
/// When a throttling period is set, repeated executions of the action are prevented if it has already
/// executed within the throttling period time frame (now - throttling period).
/// </summary>
[IgnoreDataMember]
Time ThrottlePeriod { get; set; }

[DataMember(Name = "transform")]
/// <summary>
/// Trigger the configured action for every element within an array
/// defined by the path assigned.
/// <para />
/// Valid only in Elasticsearch 7.3.0+
/// </summary>
[IgnoreDataMember]
string Foreach { get; set; }

/// <summary>
/// Transforms the payload before executing the action. The transformation is only applied
/// for the payload for this action.
/// </summary>
[IgnoreDataMember]
TransformContainer Transform { get; set; }

/// <summary>
/// A condition for the action. Allows a single watch to specify multiple actions, but
/// further control when each action will be executed.
/// </summary>
[IgnoreDataMember]
ConditionContainer Condition { get; set; }
}

/// <inheritdoc />
public abstract class ActionBase : IAction
{
internal ActionBase() { }
Expand All @@ -31,12 +66,21 @@ internal ActionBase() { }

public abstract ActionType ActionType { get; }

/// <inheritdoc />
public string Name { get; set; }

/// <inheritdoc />
public Time ThrottlePeriod { get; set; }

/// <inheritdoc />
public string Foreach { get; set; }

/// <inheritdoc />
public TransformContainer Transform { get; set; }

/// <inheritdoc />
public ConditionContainer Condition { get; set; }

public static bool operator false(ActionBase a) => false;

public static bool operator true(ActionBase a) => false;
Expand Down Expand Up @@ -78,7 +122,10 @@ internal class ActionsFormatter : IJsonFormatter<Actions>
{ "index", 3 },
{ "logging", 4 },
{ "slack", 5 },
{ "pagerduty", 6 }
{ "pagerduty", 6 },
{ "foreach", 7 },
{ "transform", 8 },
{ "condition", 9 }
};

public Actions Deserialize(ref JsonReader reader, IJsonFormatterResolver formatterResolver)
Expand All @@ -92,6 +139,10 @@ public Actions Deserialize(ref JsonReader reader, IJsonFormatterResolver formatt

Time throttlePeriod = null;
IAction action = null;
string @foreach = null;
TransformContainer transform = null;
ConditionContainer condition = null;

while (reader.ReadIsInObject(ref actionCount))
{
var propertyName = reader.ReadPropertyNameSegmentRaw();
Expand Down Expand Up @@ -128,6 +179,17 @@ public Actions Deserialize(ref JsonReader reader, IJsonFormatterResolver formatt
action = formatterResolver.GetFormatter<PagerDutyAction>()
.Deserialize(ref reader, formatterResolver);
break;
case 7:
@foreach = reader.ReadString();
break;
case 8:
transform = formatterResolver.GetFormatter<TransformContainer>()
.Deserialize(ref reader, formatterResolver);
break;
case 9:
condition = formatterResolver.GetFormatter<ConditionContainer>()
.Deserialize(ref reader, formatterResolver);
break;
}
}
else
Expand All @@ -138,6 +200,9 @@ public Actions Deserialize(ref JsonReader reader, IJsonFormatterResolver formatt
{
action.Name = name;
action.ThrottlePeriod = throttlePeriod;
action.Foreach = @foreach;
action.Transform = transform;
action.Condition = condition;
dictionary.Add(name, action);
}
}
Expand Down Expand Up @@ -166,6 +231,28 @@ public void Serialize(ref JsonWriter writer, Actions value, IJsonFormatterResolv
timeFormatter.Serialize(ref writer, action.ThrottlePeriod, formatterResolver);
writer.WriteValueSeparator();
}

if (!string.IsNullOrEmpty(action.Foreach))
{
writer.WritePropertyName("foreach");
writer.WriteString(action.Foreach);
writer.WriteValueSeparator();
}

if (action.Transform != null)
{
writer.WritePropertyName("transform");
formatterResolver.GetFormatter<TransformContainer>().Serialize(ref writer, action.Transform, formatterResolver);
writer.WriteValueSeparator();
}

if (action.Condition != null)
{
writer.WritePropertyName("condition");
formatterResolver.GetFormatter<ConditionContainer>().Serialize(ref writer, action.Condition, formatterResolver);
writer.WriteValueSeparator();
}

writer.WritePropertyName(kvp.Value.ActionType.GetStringValue());

switch (action.ActionType)
Expand Down
12 changes: 12 additions & 0 deletions src/Nest/XPack/Watcher/Action/ActionsDescriptorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Nest
{
/// <inheritdoc cref="IAction"/>
public abstract class ActionsDescriptorBase<TDescriptor, TInterface>
: DescriptorBase<TDescriptor, TInterface>, IAction
where TDescriptor : DescriptorBase<TDescriptor, TInterface>, TInterface
Expand Down Expand Up @@ -29,10 +30,21 @@ string IAction.Name

Time IAction.ThrottlePeriod { get; set; }
TransformContainer IAction.Transform { get; set; }
ConditionContainer IAction.Condition { get; set; }
string IAction.Foreach { get; set; }

/// <inheritdoc cref="IAction.Transform"/>
public TDescriptor Transform(Func<TransformDescriptor, TransformContainer> selector) =>
Assign(selector.InvokeOrDefault(new TransformDescriptor()), (a, v) => a.Transform = v);

/// <inheritdoc cref="IAction.Condition"/>
public TDescriptor Condition(Func<ConditionDescriptor, ConditionContainer> selector) =>
Assign(selector.InvokeOrDefault(new ConditionDescriptor()), (a, v) => a.Condition = v);

/// <inheritdoc cref="IAction.ThrottlePeriod"/>
public TDescriptor ThrottlePeriod(Time throttlePeriod) => Assign(throttlePeriod, (a, v) => a.ThrottlePeriod = v);

/// <inheritdoc cref="IAction.Foreach"/>
public TDescriptor Foreach(string @foreach) => Assign(@foreach, (a, v) => a.Foreach = v);
}
}
157 changes: 157 additions & 0 deletions src/Tests/Tests/XPack/Watcher/PutWatch/PutWatchApiTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Elastic.Xunit.XunitPlumbing;
using Elasticsearch.Net;
using FluentAssertions;
using Nest;
Expand Down Expand Up @@ -710,4 +711,160 @@ protected override void ExpectResponse(PutWatchResponse response)
response.Id.Should().Be(CallIsolatedValue);
}
}

[SkipVersion("<7.3.0", "Foreach introduced in 7.3.0")]
public class PutWatchApiWithForeachTests : ApiIntegrationTestBase<XPackCluster, PutWatchResponse, IPutWatchRequest, PutWatchDescriptor, PutWatchRequest>
{
public PutWatchApiWithForeachTests(XPackCluster cluster, EndpointUsage usage) : base(cluster, usage) { }

protected override bool ExpectIsValid => true;

protected override object ExpectJson =>
new
{
input = new
{
search = new
{
request = new
{
indices = new[] { "project" },
body = new
{
query = new
{
range = new
{
numberOfCommits = new
{
gt = 10.0
}
}
}
}
}
}
},
trigger = new
{
schedule = new
{
interval = "5m"
}
},
actions = new
{
log_hits = new
{
@foreach = "ctx.payload.hits.hits",
logging = new
{
text = "Found id {{ctx.payload._id}} with field {{ctx.payload._source.numberOfCommits}}"
},
transform = new
{
script = new
{
source = "return [ 'time' : ctx.trigger.scheduled_time ]"
}
},
condition = new
{
always = new {}
}
}
}
};

protected override int ExpectStatusCode => 201;

protected override HttpMethod HttpMethod => HttpMethod.PUT;

protected override Func<PutWatchDescriptor, IPutWatchRequest> Fluent => p => p
.Input(i => i
.Search(s => s
.Request(si => si
.Indices<Project>()
.Body<Project>(b => b
.Query(q => q
.Range(r => r
.Field(f => f.NumberOfCommits)
.GreaterThan(10)
)
)
)
)
)
)
.Trigger(t => t
.Schedule(s => s
.Interval(new Interval(5, IntervalUnit.Minute))
)
)
.Actions(a => a
.Logging("log_hits", i => i
.Foreach("ctx.payload.hits.hits")
.Text("Found id {{ctx.payload._id}} with field {{ctx.payload._source.numberOfCommits}}")
.Transform(t => t
.Script(st =>st
.Source("return [ 'time' : ctx.trigger.scheduled_time ]")
)
)
.Condition(c => c
.Always()
)
)
);

protected override PutWatchRequest Initializer =>
new PutWatchRequest(CallIsolatedValue)
{
Input = new SearchInput
{
Request = new SearchInputRequest
{
Indices = new IndexName[] { typeof(Project) },
Body = new SearchRequest<Project>
{
Query = new NumericRangeQuery
{
Field = Infer.Field<Project>(f => f.NumberOfCommits),
GreaterThan = 10
}
}
}
},
Trigger = new ScheduleContainer
{
Interval = new Interval(5, IntervalUnit.Minute)
},
Actions = new LoggingAction("log_hits")
{
Foreach = "ctx.payload.hits.hits",
Text = "Found id {{ctx.payload._id}} with field {{ctx.payload._source.numberOfCommits}}",
Transform = new InlineScriptTransform("return [ 'time' : ctx.trigger.scheduled_time ]"),
Condition = new AlwaysCondition()
}
};

protected override bool SupportsDeserialization => false;

protected override string UrlPath => $"/_watcher/watch/{CallIsolatedValue}";

protected override LazyResponses ClientUsage() => Calls(
(client, f) => client.Watcher.Put(CallIsolatedValue, f),
(client, f) => client.Watcher.PutAsync(CallIsolatedValue, f),
(client, r) => client.Watcher.Put(r),
(client, r) => client.Watcher.PutAsync(r)
);

protected override PutWatchDescriptor NewDescriptor() => new PutWatchDescriptor(CallIsolatedValue);

protected override void ExpectResponse(PutWatchResponse response)
{
response.Created.Should().BeTrue();
response.Version.Should().Be(1);
response.Id.Should().Be(CallIsolatedValue);
}
}
}

0 comments on commit 8acde7e

Please sign in to comment.