Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move derivative agg to module #91014

Merged
merged 5 commits into from
Oct 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.aggregations.bucket.adjacency.ParsedAdjacencyMatrix;
import org.elasticsearch.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.elasticsearch.aggregations.bucket.histogram.ParsedAutoDateHistogram;
import org.elasticsearch.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.client.analytics.ParsedStringStats;
import org.elasticsearch.client.analytics.ParsedTopMetrics;
import org.elasticsearch.client.analytics.StringStatsAggregationBuilder;
Expand Down Expand Up @@ -151,7 +152,6 @@
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.WeightedAvgAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalBucketMetricValue;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations.pipeline;
package org.elasticsearch.aggregations.pipeline;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.aggregations.AggregationsPlugin;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.MockScriptPlugin;
Expand All @@ -35,7 +36,6 @@
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.bucketSelector;
import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.derivative;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
Expand All @@ -60,7 +60,7 @@ public class BucketSelectorIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singleton(CustomScriptPlugin.class);
return List.of(CustomScriptPlugin.class, AggregationsPlugin.class);
}

public static class CustomScriptPlugin extends MockScriptPlugin {
Expand Down Expand Up @@ -570,7 +570,9 @@ public void testEmptyBuckets() {
.interval(1)
.extendedBounds(1L, 4L)
.minDocCount(0)
.subAggregation(derivative("derivative", "_count").gapPolicy(GapPolicy.INSERT_ZEROS))
.subAggregation(
new DerivativePipelineAggregationBuilder("derivative", "_count").gapPolicy(GapPolicy.INSERT_ZEROS)
)
)
)
.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations.pipeline;
package org.elasticsearch.aggregations.pipeline;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.aggregations.AggregationsPlugin;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.search.aggregations.metrics.Sum;
import org.elasticsearch.search.aggregations.pipeline.SimpleValue;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.test.ESIntegTestCase;
import org.hamcrest.Matcher;
Expand All @@ -31,11 +34,11 @@
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.derivative;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.closeTo;
Expand All @@ -56,6 +59,11 @@ private ZonedDateTime date(int month, int day) {
return ZonedDateTime.of(2012, month, day, 0, 0, 0, 0, ZoneOffset.UTC);
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(AggregationsPlugin.class);
}

private static IndexRequestBuilder indexDoc(String idx, ZonedDateTime date, int value) throws Exception {
return client().prepareIndex(idx).setSource(jsonBuilder().startObject().timeField("date", date).field("value", value).endObject());
}
Expand Down Expand Up @@ -113,7 +121,7 @@ public void testSingleValuedField() throws Exception {
dateHistogram("histo").field("date")
.calendarInterval(DateHistogramInterval.MONTH)
.minDocCount(0)
.subAggregation(derivative("deriv", "_count"))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "_count"))
)
.get();

Expand Down Expand Up @@ -158,7 +166,7 @@ public void testSingleValuedFieldNormalised() throws Exception {
dateHistogram("histo").field("date")
.calendarInterval(DateHistogramInterval.MONTH)
.minDocCount(0)
.subAggregation(derivative("deriv", "_count").unit(DateHistogramInterval.DAY))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "_count").unit(DateHistogramInterval.DAY))
)
.get();

Expand Down Expand Up @@ -223,7 +231,7 @@ public void testSingleValuedFieldNormalised_timeZone_CET_DstStart() throws Excep
.calendarInterval(DateHistogramInterval.DAY)
.timeZone(timezone)
.minDocCount(0)
.subAggregation(derivative("deriv", "_count").unit(DateHistogramInterval.HOUR))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "_count").unit(DateHistogramInterval.HOUR))
)
.get();

Expand Down Expand Up @@ -281,7 +289,7 @@ public void testSingleValuedFieldNormalised_timeZone_CET_DstEnd() throws Excepti
.calendarInterval(DateHistogramInterval.DAY)
.timeZone(timezone)
.minDocCount(0)
.subAggregation(derivative("deriv", "_count").unit(DateHistogramInterval.HOUR))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "_count").unit(DateHistogramInterval.HOUR))
)
.get();

Expand Down Expand Up @@ -341,7 +349,7 @@ public void testSingleValuedFieldNormalised_timeZone_AsiaKathmandu() throws Exce
.calendarInterval(DateHistogramInterval.HOUR)
.timeZone(timezone)
.minDocCount(0)
.subAggregation(derivative("deriv", "_count").unit(DateHistogramInterval.MINUTE))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "_count").unit(DateHistogramInterval.MINUTE))
)
.get();

Expand Down Expand Up @@ -409,7 +417,7 @@ public void testSingleValuedFieldWithSubAggregation() throws Exception {
.calendarInterval(DateHistogramInterval.MONTH)
.minDocCount(0)
.subAggregation(sum("sum").field("value"))
.subAggregation(derivative("deriv", "sum"))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "sum"))
)
.get();

Expand Down Expand Up @@ -492,7 +500,7 @@ public void testMultiValuedField() throws Exception {
dateHistogram("histo").field("dates")
.calendarInterval(DateHistogramInterval.MONTH)
.minDocCount(0)
.subAggregation(derivative("deriv", "_count"))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "_count"))
)
.get();

Expand Down Expand Up @@ -550,7 +558,7 @@ public void testUnmapped() throws Exception {
dateHistogram("histo").field("date")
.calendarInterval(DateHistogramInterval.MONTH)
.minDocCount(0)
.subAggregation(derivative("deriv", "_count"))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "_count"))
)
.get();

Expand All @@ -568,7 +576,7 @@ public void testPartiallyUnmapped() throws Exception {
dateHistogram("histo").field("date")
.calendarInterval(DateHistogramInterval.MONTH)
.minDocCount(0)
.subAggregation(derivative("deriv", "_count"))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "_count"))
)
.get();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.elasticsearch.aggregations.bucket.adjacency.InternalAdjacencyMatrix;
import org.elasticsearch.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.elasticsearch.aggregations.bucket.histogram.InternalAutoDateHistogram;
import org.elasticsearch.aggregations.pipeline.Derivative;
import org.elasticsearch.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.aggregations.pipeline.MovFnPipelineAggregationBuilder;
import org.elasticsearch.aggregations.pipeline.MovingFunctionScript;
import org.elasticsearch.plugins.Plugin;
Expand Down Expand Up @@ -42,6 +44,11 @@ public List<AggregationSpec> getAggregations() {
@Override
public List<PipelineAggregationSpec> getPipelineAggregations() {
return List.of(
new PipelineAggregationSpec(
DerivativePipelineAggregationBuilder.NAME,
DerivativePipelineAggregationBuilder::new,
DerivativePipelineAggregationBuilder::parse
).addResultReader(Derivative::new),
new PipelineAggregationSpec(
MovFnPipelineAggregationBuilder.NAME,
MovFnPipelineAggregationBuilder::new,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,31 @@
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations.pipeline;
package org.elasticsearch.aggregations.pipeline;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class InternalDerivative extends InternalSimpleValue implements Derivative {
public class Derivative extends InternalSimpleValue {
private final double normalizationFactor;

InternalDerivative(String name, double value, double normalizationFactor, DocValueFormat formatter, Map<String, Object> metadata) {
Derivative(String name, double value, double normalizationFactor, DocValueFormat formatter, Map<String, Object> metadata) {
super(name, value, formatter, metadata);
this.normalizationFactor = normalizationFactor;
}

/**
* Read from a stream.
*/
public InternalDerivative(StreamInput in) throws IOException {
public Derivative(StreamInput in) throws IOException {
super(in);
normalizationFactor = in.readDouble();
}
Expand All @@ -45,7 +46,12 @@ public String getWriteableName() {
return DerivativePipelineAggregationBuilder.NAME;
}

@Override
/**
* Returns the normalized value. If no normalised factor has been specified
* this method will return {@link #value()}
*
* @return the normalized value
*/
public double normalizedValue() {
return normalizationFactor > 0 ? (value() / normalizationFactor) : value();
}
Expand Down Expand Up @@ -95,7 +101,7 @@ public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
if (super.equals(obj) == false) return false;
InternalDerivative other = (InternalDerivative) obj;
Derivative other = (Derivative) obj;
return Objects.equals(value, other.value) && Objects.equals(normalizationFactor, other.normalizationFactor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations.pipeline;
package org.elasticsearch.aggregations.pipeline;

import org.elasticsearch.Version;
import org.elasticsearch.common.ParsingException;
Expand All @@ -17,7 +17,9 @@
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations.pipeline;
package org.elasticsearch.aggregations.pipeline;

import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
Expand All @@ -16,6 +16,7 @@
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -70,7 +71,7 @@ public InternalAggregation reduce(InternalAggregation aggregation, AggregationRe
final List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
.map((p) -> (InternalAggregation) p)
.collect(Collectors.toCollection(ArrayList::new));
aggs.add(new InternalDerivative(name(), gradient, xDiff, formatter, metadata()));
aggs.add(new Derivative(name(), gradient, xDiff, formatter, metadata()));
Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs));
newBuckets.add(newBucket);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.aggregations.AggregationsPlugin;
import org.elasticsearch.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.network.InetAddresses;
Expand Down Expand Up @@ -56,7 +57,6 @@
import org.elasticsearch.search.aggregations.metrics.InternalStats;
import org.elasticsearch.search.aggregations.metrics.Max;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
import org.hamcrest.Matchers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations.pipeline;
package org.elasticsearch.aggregations.pipeline;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.NumericDocValuesField;
Expand All @@ -19,11 +19,13 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.elasticsearch.aggregations.AggregationsPlugin;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.InternalAggregation;
Expand All @@ -35,6 +37,8 @@
import org.elasticsearch.search.aggregations.metrics.InternalAvg;
import org.elasticsearch.search.aggregations.metrics.Sum;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.CumulativeSumPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;

import java.io.IOException;
Expand Down Expand Up @@ -65,6 +69,11 @@ public class CumulativeSumAggregatorTests extends AggregatorTestCase {

private static final List<Integer> datasetValues = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

@Override
protected List<SearchPlugin> getSearchPlugins() {
return List.of(new AggregationsPlugin());
}

public void testSimple() throws IOException {
Query query = new MatchAllDocsQuery();

Expand Down
Loading