Migrate scripted metric aggregation scripts to ScriptContext design (#30111)

* Migrate scripted metric aggregation scripts to ScriptContext design #29328

* Rename new script context container class and add clarifying comments to remaining references to params._agg(s)

* Misc cleanup: make mock metric agg script inner classes static

* Move _score to an accessor rather than an arg for scripted metric agg scripts

This causes the score to be evaluated only when it's used.

* Documentation changes for params._agg -> agg

* Migration doc addition for scripted metric aggs _agg object change

* Rename "agg" Scripted Metric Aggregation script context variable to "state"

* Rename a private base class from ...Agg to ...State that I missed in my last commit

* Clean up imports after merge
rationull authored and rjernst committed Jul 2, 2018
1 parent fffcf93 commit 0efe5bc
Showing 12 changed files with 616 additions and 108 deletions.
@@ -13,8 +13,8 @@ Here is an example on how to create the aggregation request:
--------------------------------------------------
ScriptedMetricAggregationBuilder aggregation = AggregationBuilders
.scriptedMetric("agg")
.initScript(new Script("params._agg.heights = []"))
.mapScript(new Script("params._agg.heights.add(doc.gender.value == 'male' ? doc.height.value : -1.0 * doc.height.value)"));
.initScript(new Script("state.heights = []"))
.mapScript(new Script("state.heights.add(doc.gender.value == 'male' ? doc.height.value : -1.0 * doc.height.value)"));
--------------------------------------------------

You can also specify a `combine` script which will be executed on each shard:
@@ -23,9 +23,9 @@ You can also specify a `combine` script which will be executed on each shard:
--------------------------------------------------
ScriptedMetricAggregationBuilder aggregation = AggregationBuilders
.scriptedMetric("agg")
.initScript(new Script("params._agg.heights = []"))
.mapScript(new Script("params._agg.heights.add(doc.gender.value == 'male' ? doc.height.value : -1.0 * doc.height.value)"))
.combineScript(new Script("double heights_sum = 0.0; for (t in params._agg.heights) { heights_sum += t } return heights_sum"));
.initScript(new Script("state.heights = []"))
.mapScript(new Script("state.heights.add(doc.gender.value == 'male' ? doc.height.value : -1.0 * doc.height.value)"))
.combineScript(new Script("double heights_sum = 0.0; for (t in state.heights) { heights_sum += t } return heights_sum"));
--------------------------------------------------

You can also specify a `reduce` script which will be executed on the node which gets the request:
@@ -34,10 +34,10 @@ You can also specify a `reduce` script which will be executed on the node which
--------------------------------------------------
ScriptedMetricAggregationBuilder aggregation = AggregationBuilders
.scriptedMetric("agg")
.initScript(new Script("params._agg.heights = []"))
.mapScript(new Script("params._agg.heights.add(doc.gender.value == 'male' ? doc.height.value : -1.0 * doc.height.value)"))
.combineScript(new Script("double heights_sum = 0.0; for (t in params._agg.heights) { heights_sum += t } return heights_sum"))
.reduceScript(new Script("double heights_sum = 0.0; for (a in params._aggs) { heights_sum += a } return heights_sum"));
.initScript(new Script("state.heights = []"))
.mapScript(new Script("state.heights.add(doc.gender.value == 'male' ? doc.height.value : -1.0 * doc.height.value)"))
.combineScript(new Script("double heights_sum = 0.0; for (t in state.heights) { heights_sum += t } return heights_sum"))
.reduceScript(new Script("double heights_sum = 0.0; for (a in states) { heights_sum += a } return heights_sum"));
--------------------------------------------------


@@ -15,10 +15,10 @@ POST ledger/_search?size=0
"aggs": {
"profit": {
"scripted_metric": {
"init_script" : "params._agg.transactions = []",
"map_script" : "params._agg.transactions.add(doc.type.value == 'sale' ? doc.amount.value : -1 * doc.amount.value)", <1>
"combine_script" : "double profit = 0; for (t in params._agg.transactions) { profit += t } return profit",
"reduce_script" : "double profit = 0; for (a in params._aggs) { profit += a } return profit"
"init_script" : "state.transactions = []",
"map_script" : "state.transactions.add(doc.type.value == 'sale' ? doc.amount.value : -1 * doc.amount.value)", <1>
"combine_script" : "double profit = 0; for (t in state.transactions) { profit += t } return profit",
"reduce_script" : "double profit = 0; for (a in states) { profit += a } return profit"
}
}
}
@@ -67,8 +67,7 @@ POST ledger/_search?size=0
"id": "my_combine_script"
},
"params": {
"field": "amount", <1>
"_agg": {} <2>
"field": "amount" <1>
},
"reduce_script" : {
"id": "my_reduce_script"
@@ -82,8 +81,7 @@ POST ledger/_search?size=0
// TEST[setup:ledger,stored_scripted_metric_script]

<1> script parameters for `init`, `map` and `combine` scripts must be specified
in a global `params` object so that it can be share between the scripts.
<2> if you specify script parameters then you must specify `"_agg": {}`.
in a global `params` object so that it can be shared between the scripts.

////
Verify this response as well but in a hidden block.
@@ -108,7 +106,7 @@ For more details on specifying scripts see <<modules-scripting, script documenta

==== Allowed return types

Whilst any valid script object can be used within a single script, the scripts must return or store in the `_agg` object only the following types:
Whilst any valid script object can be used within a single script, the scripts must return or store in the `state` object only the following types:

* primitive types
* String
@@ -121,10 +119,10 @@ The scripted metric aggregation uses scripts at 4 stages of its execution:

init_script:: Executed prior to any collection of documents. Allows the aggregation to set up any initial state.
+
In the above example, the `init_script` creates an array `transactions` in the `_agg` object.
In the above example, the `init_script` creates an array `transactions` in the `state` object.

map_script:: Executed once per document collected. This is the only required script. If no combine_script is specified, the resulting state
needs to be stored in an object named `_agg`.
needs to be stored in the `state` object.
+
In the above example, the `map_script` checks the value of the type field. If the value is 'sale' the value of the amount field
is added to the transactions array. If the value of the type field is not 'sale' the negated value of the amount field is added
@@ -137,8 +135,8 @@ In the above example, the `combine_script` iterates through all the stored trans
and finally returns `profit`.

reduce_script:: Executed once on the coordinating node after all shards have returned their results. The script is provided with access to a
variable `_aggs` which is an array of the result of the combine_script on each shard. If a reduce_script is not provided
the reduce phase will return the `_aggs` variable.
variable `states` which is an array of the result of the combine_script on each shard. If a reduce_script is not provided
the reduce phase will return the `states` variable.
+
In the above example, the `reduce_script` iterates through the `profit` returned by each shard summing the values before returning the
final combined profit which will be returned in the response of the aggregation.
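The four stages described above can be sketched in plain Java. This is only an illustration of the data flow (`state` per shard, `states` on the coordinating node), not Elasticsearch code: the class `ScriptedMetricSketch`, its methods, the `Doc` type and the `"cost"` document type are all hypothetical, and the amounts are chosen to match the ledger walk-through in this documentation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the four script stages; not Elasticsearch code.
public class ScriptedMetricSketch {

    // One ledger document: its type and amount (assumed shape).
    static final class Doc {
        final String type;
        final double amount;
        Doc(String type, double amount) { this.type = type; this.amount = amount; }
    }

    // init_script: state.transactions = []
    static void init(Map<String, Object> state) {
        state.put("transactions", new ArrayList<Double>());
    }

    // map_script: add the amount for sales, the negated amount otherwise.
    @SuppressWarnings("unchecked")
    static void map(Map<String, Object> state, Doc doc) {
        List<Double> transactions = (List<Double>) state.get("transactions");
        transactions.add("sale".equals(doc.type) ? doc.amount : -1 * doc.amount);
    }

    // combine_script: sum the transactions collected on one shard.
    @SuppressWarnings("unchecked")
    static double combine(Map<String, Object> state) {
        double profit = 0;
        for (double t : (List<Double>) state.get("transactions")) { profit += t; }
        return profit;
    }

    // reduce_script: sum the per-shard results held in states.
    static double reduce(List<Object> states) {
        double profit = 0;
        for (Object a : states) { profit += (Double) a; }
        return profit;
    }

    // Runs init, map and combine for the documents of a single shard.
    static double runShard(List<Doc> docs) {
        Map<String, Object> state = new HashMap<>();
        init(state);
        for (Doc doc : docs) { map(state, doc); }
        return combine(state);
    }

    // Two shards feeding one reduce step on the coordinating node.
    static double runExample() {
        List<Object> states = new ArrayList<>();
        states.add(runShard(Arrays.asList(new Doc("sale", 80), new Doc("cost", 30))));  // shard A
        states.add(runShard(Arrays.asList(new Doc("cost", 10), new Doc("sale", 130)))); // shard B
        return reduce(states);
    }

    public static void main(String[] args) {
        System.out.println(runExample()); // prints 170.0
    }
}
```

Each shard gets its own `state` map, so nothing is shared between shards until the per-shard results are gathered into `states`.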
@@ -166,13 +164,11 @@ at each stage of the example above.

===== Before init_script

No params object was specified so the default params object is used:
`state` is initialized as a new empty object.

[source,js]
--------------------------------------------------
"params" : {
"_agg" : {}
}
"state" : {}
--------------------------------------------------
// NOTCONSOLE

@@ -184,10 +180,8 @@ Shard A::
+
[source,js]
--------------------------------------------------
"params" : {
"_agg" : {
"transactions" : []
}
"state" : {
"transactions" : []
}
--------------------------------------------------
// NOTCONSOLE
@@ -196,10 +190,8 @@ Shard B::
+
[source,js]
--------------------------------------------------
"params" : {
"_agg" : {
"transactions" : []
}
"state" : {
"transactions" : []
}
--------------------------------------------------
// NOTCONSOLE
@@ -212,10 +204,8 @@ Shard A::
+
[source,js]
--------------------------------------------------
"params" : {
"_agg" : {
"transactions" : [ 80, -30 ]
}
"state" : {
"transactions" : [ 80, -30 ]
}
--------------------------------------------------
// NOTCONSOLE
@@ -224,10 +214,8 @@ Shard B::
+
[source,js]
--------------------------------------------------
"params" : {
"_agg" : {
"transactions" : [ -10, 130 ]
}
"state" : {
"transactions" : [ -10, 130 ]
}
--------------------------------------------------
// NOTCONSOLE
@@ -242,11 +230,11 @@ Shard B:: 120

===== After reduce_script

The reduce_script receives an `_aggs` array containing the result of the combine script for each shard:
The reduce_script receives a `states` array containing the result of the combine script for each shard:

[source,js]
--------------------------------------------------
"_aggs" : [
"states" : [
50,
120
]
@@ -279,14 +267,12 @@ params:: Optional. An object whose contents will be passed as variable
+
[source,js]
--------------------------------------------------
"params" : {
"_agg" : {}
}
"params" : {}
--------------------------------------------------
// NOTCONSOLE

==== Empty Buckets

If a parent bucket of the scripted metric aggregation does not collect any documents an empty aggregation response will be returned from the
shard with a `null` value. In this case the `reduce_script`'s `_aggs` variable will contain `null` as a response from that shard.
shard with a `null` value. In this case the `reduce_script`'s `states` variable will contain `null` as a response from that shard.
`reduce_script`'s should therefore expect and deal with `null` responses from shards.
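A reduce step that tolerates `null` entries might look like the following Java sketch. It only mirrors the logic a `reduce_script` would need; the class `NullSafeReduceSketch` and its `reduce` method are hypothetical names, and the assumption is that each non-null entry in `states` is a number returned by a shard's combine step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of a null-tolerant reduce; not Elasticsearch code.
public class NullSafeReduceSketch {

    // Sums per-shard results, skipping null entries from shards
    // whose parent bucket collected no documents.
    static double reduce(List<Object> states) {
        double profit = 0;
        for (Object a : states) {
            if (a == null) {
                continue; // empty bucket on that shard
            }
            profit += ((Number) a).doubleValue();
        }
        return profit;
    }

    public static void main(String[] args) {
        List<Object> states = new ArrayList<>(Arrays.asList(50.0, null, 120.0));
        System.out.println(reduce(states)); // prints 170.0
    }
}
```

Without the `null` check, the unboxing in the loop would fail with a `NullPointerException` for the shard that returned nothing.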
@@ -0,0 +1,126 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.painless;

import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.painless.spi.Whitelist;
import org.elasticsearch.script.ScriptedMetricAggContexts;
import org.elasticsearch.script.ScriptContext;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ScriptedMetricAggContextsTests extends ScriptTestCase {
@Override
protected Map<ScriptContext<?>, List<Whitelist>> scriptContexts() {
Map<ScriptContext<?>, List<Whitelist>> contexts = new HashMap<>();
contexts.put(ScriptedMetricAggContexts.InitScript.CONTEXT, Whitelist.BASE_WHITELISTS);
contexts.put(ScriptedMetricAggContexts.MapScript.CONTEXT, Whitelist.BASE_WHITELISTS);
contexts.put(ScriptedMetricAggContexts.CombineScript.CONTEXT, Whitelist.BASE_WHITELISTS);
contexts.put(ScriptedMetricAggContexts.ReduceScript.CONTEXT, Whitelist.BASE_WHITELISTS);
return contexts;
}

public void testInitBasic() {
ScriptedMetricAggContexts.InitScript.Factory factory = scriptEngine.compile("test",
"state.testField = params.initialVal", ScriptedMetricAggContexts.InitScript.CONTEXT, Collections.emptyMap());

Map<String, Object> params = new HashMap<>();
Map<String, Object> state = new HashMap<>();

params.put("initialVal", 10);

ScriptedMetricAggContexts.InitScript script = factory.newInstance(params, state);
script.execute();

assert(state.containsKey("testField"));
assertEquals(10, state.get("testField"));
}

public void testMapBasic() {
ScriptedMetricAggContexts.MapScript.Factory factory = scriptEngine.compile("test",
"state.testField = 2*_score", ScriptedMetricAggContexts.MapScript.CONTEXT, Collections.emptyMap());

Map<String, Object> params = new HashMap<>();
Map<String, Object> state = new HashMap<>();

Scorer scorer = new Scorer(null) {
@Override
public int docID() { return 0; }

@Override
public float score() { return 0.5f; }

@Override
public DocIdSetIterator iterator() { return null; }
};

ScriptedMetricAggContexts.MapScript.LeafFactory leafFactory = factory.newFactory(params, state, null);
ScriptedMetricAggContexts.MapScript script = leafFactory.newInstance(null);

script.setScorer(scorer);
script.execute();

assert(state.containsKey("testField"));
assertEquals(1.0, state.get("testField"));
}

public void testCombineBasic() {
ScriptedMetricAggContexts.CombineScript.Factory factory = scriptEngine.compile("test",
"state.testField = params.initialVal; return state.testField + params.inc", ScriptedMetricAggContexts.CombineScript.CONTEXT,
Collections.emptyMap());

Map<String, Object> params = new HashMap<>();
Map<String, Object> state = new HashMap<>();

params.put("initialVal", 10);
params.put("inc", 2);

ScriptedMetricAggContexts.CombineScript script = factory.newInstance(params, state);
Object res = script.execute();

assert(state.containsKey("testField"));
assertEquals(10, state.get("testField"));
assertEquals(12, res);
}

public void testReduceBasic() {
ScriptedMetricAggContexts.ReduceScript.Factory factory = scriptEngine.compile("test",
"states[0].testField + states[1].testField", ScriptedMetricAggContexts.ReduceScript.CONTEXT, Collections.emptyMap());

Map<String, Object> params = new HashMap<>();
List<Object> states = new ArrayList<>();

Map<String, Object> state1 = new HashMap<>(), state2 = new HashMap<>();
state1.put("testField", 1);
state2.put("testField", 2);

states.add(state1);
states.add(state2);

ScriptedMetricAggContexts.ReduceScript script = factory.newInstance(params, states);
Object res = script.execute();
assertEquals(3, res);
}
}
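The `testMapBasic` case above sets a scorer on the script and reads `_score` in the script body, and the commit message notes that exposing the score through an accessor means it is evaluated only when used. A minimal sketch of that idea follows. All names here (`LazyScoreSketch`, `MapScriptLike`, `CountingScorer`) are hypothetical and the real classes in this commit may be structured differently.

```java
import java.util.function.DoubleSupplier;

// Hypothetical illustration of a lazily evaluated score accessor.
public class LazyScoreSketch {

    // Scorer that records how often the score is actually computed.
    static final class CountingScorer implements DoubleSupplier {
        int calls = 0;

        @Override
        public double getAsDouble() {
            calls++;
            return 0.5;
        }
    }

    // Script base class: the score is reachable only through an accessor,
    // so it is computed when a script body asks for it and not before.
    abstract static class MapScriptLike {
        private DoubleSupplier scorer;

        void setScorer(DoubleSupplier scorer) { this.scorer = scorer; }

        double getScore() { return scorer.getAsDouble(); }

        abstract double execute();
    }

    public static void main(String[] args) {
        CountingScorer scorer = new CountingScorer();

        // A script that never reads the score.
        MapScriptLike ignoresScore = new MapScriptLike() {
            @Override
            double execute() { return 1.0; }
        };
        ignoresScore.setScorer(scorer);
        ignoresScore.execute();
        System.out.println(scorer.calls); // prints 0

        // A script that reads the score once.
        MapScriptLike usesScore = new MapScriptLike() {
            @Override
            double execute() { return 2 * getScore(); }
        };
        usesScore.setScorer(scorer);
        System.out.println(usesScore.execute()); // prints 1.0
        System.out.println(scorer.calls);        // prints 1
    }
}
```

Had the score been passed as an argument to `execute`, it would have to be computed for every document, even for scripts that ignore it.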
@@ -53,7 +53,11 @@ public class ScriptModule {
SimilarityScript.CONTEXT,
SimilarityWeightScript.CONTEXT,
TemplateScript.CONTEXT,
MovingFunctionScript.CONTEXT
MovingFunctionScript.CONTEXT,
ScriptedMetricAggContexts.InitScript.CONTEXT,
ScriptedMetricAggContexts.MapScript.CONTEXT,
ScriptedMetricAggContexts.CombineScript.CONTEXT,
ScriptedMetricAggContexts.ReduceScript.CONTEXT
).collect(Collectors.toMap(c -> c.name, Function.identity()));
}
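The hunk above collects the registered contexts into a map keyed by `name` using `Collectors.toMap`. The sketch below shows what that implies for the four new contexts: each needs a distinct name, because `Collectors.toMap` without a merge function throws `IllegalStateException` on a duplicate key. The `Ctx` class and the context names used here are assumptions for illustration, not taken from this commit.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration of a name-keyed context registry.
public class ContextRegistrySketch {

    // Stand-in for a script context; only the name matters here.
    static final class Ctx {
        final String name;
        Ctx(String name) { this.name = name; }
    }

    // Builds a name -> context map with the same collector as the hunk above.
    static Map<String, Ctx> register(Ctx... contexts) {
        return Stream.of(contexts)
            .collect(Collectors.toMap(c -> c.name, Function.identity()));
    }

    public static void main(String[] args) {
        // Assumed names for the four scripted metric contexts.
        Map<String, Ctx> byName = register(
            new Ctx("aggs_init"), new Ctx("aggs_map"),
            new Ctx("aggs_combine"), new Ctx("aggs_reduce"));
        System.out.println(byName.size()); // prints 4

        try {
            register(new Ctx("aggs_map"), new Ctx("aggs_map"));
        } catch (IllegalStateException e) {
            System.out.println("duplicate name rejected");
        }
    }
}
```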
