Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move derivative agg to module #91014

Merged
merged 5 commits into from
Oct 31, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.aggregations.bucket.adjacency.ParsedAdjacencyMatrix;
import org.elasticsearch.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.elasticsearch.aggregations.bucket.histogram.ParsedAutoDateHistogram;
import org.elasticsearch.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.client.analytics.ParsedStringStats;
import org.elasticsearch.client.analytics.ParsedTopMetrics;
import org.elasticsearch.client.analytics.StringStatsAggregationBuilder;
Expand Down Expand Up @@ -151,7 +152,6 @@
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.WeightedAvgAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalBucketMetricValue;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations.pipeline;
package org.elasticsearch.aggregations.pipeline;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.aggregations.AggregationsPlugin;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.MockScriptPlugin;
Expand All @@ -35,7 +36,6 @@
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.bucketSelector;
import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.derivative;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
Expand All @@ -60,7 +60,7 @@ public class BucketSelectorIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singleton(CustomScriptPlugin.class);
return List.of(CustomScriptPlugin.class, AggregationsPlugin.class);
}

public static class CustomScriptPlugin extends MockScriptPlugin {
Expand Down Expand Up @@ -570,7 +570,9 @@ public void testEmptyBuckets() {
.interval(1)
.extendedBounds(1L, 4L)
.minDocCount(0)
.subAggregation(derivative("derivative", "_count").gapPolicy(GapPolicy.INSERT_ZEROS))
.subAggregation(
new DerivativePipelineAggregationBuilder("derivative", "_count").gapPolicy(GapPolicy.INSERT_ZEROS)
)
)
)
.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations.pipeline;
package org.elasticsearch.aggregations.pipeline;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.aggregations.AggregationsPlugin;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.search.aggregations.metrics.Sum;
import org.elasticsearch.search.aggregations.pipeline.SimpleValue;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.test.ESIntegTestCase;
import org.hamcrest.Matcher;
Expand All @@ -31,11 +34,11 @@
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.derivative;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.closeTo;
Expand All @@ -56,6 +59,11 @@ private ZonedDateTime date(int month, int day) {
return ZonedDateTime.of(2012, month, day, 0, 0, 0, 0, ZoneOffset.UTC);
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(AggregationsPlugin.class);
}

private static IndexRequestBuilder indexDoc(String idx, ZonedDateTime date, int value) throws Exception {
return client().prepareIndex(idx).setSource(jsonBuilder().startObject().timeField("date", date).field("value", value).endObject());
}
Expand Down Expand Up @@ -113,7 +121,7 @@ public void testSingleValuedField() throws Exception {
dateHistogram("histo").field("date")
.calendarInterval(DateHistogramInterval.MONTH)
.minDocCount(0)
.subAggregation(derivative("deriv", "_count"))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "_count"))
)
.get();

Expand Down Expand Up @@ -158,7 +166,7 @@ public void testSingleValuedFieldNormalised() throws Exception {
dateHistogram("histo").field("date")
.calendarInterval(DateHistogramInterval.MONTH)
.minDocCount(0)
.subAggregation(derivative("deriv", "_count").unit(DateHistogramInterval.DAY))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "_count").unit(DateHistogramInterval.DAY))
)
.get();

Expand Down Expand Up @@ -223,7 +231,7 @@ public void testSingleValuedFieldNormalised_timeZone_CET_DstStart() throws Excep
.calendarInterval(DateHistogramInterval.DAY)
.timeZone(timezone)
.minDocCount(0)
.subAggregation(derivative("deriv", "_count").unit(DateHistogramInterval.HOUR))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "_count").unit(DateHistogramInterval.HOUR))
)
.get();

Expand Down Expand Up @@ -281,7 +289,7 @@ public void testSingleValuedFieldNormalised_timeZone_CET_DstEnd() throws Excepti
.calendarInterval(DateHistogramInterval.DAY)
.timeZone(timezone)
.minDocCount(0)
.subAggregation(derivative("deriv", "_count").unit(DateHistogramInterval.HOUR))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "_count").unit(DateHistogramInterval.HOUR))
)
.get();

Expand Down Expand Up @@ -341,7 +349,7 @@ public void testSingleValuedFieldNormalised_timeZone_AsiaKathmandu() throws Exce
.calendarInterval(DateHistogramInterval.HOUR)
.timeZone(timezone)
.minDocCount(0)
.subAggregation(derivative("deriv", "_count").unit(DateHistogramInterval.MINUTE))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "_count").unit(DateHistogramInterval.MINUTE))
)
.get();

Expand Down Expand Up @@ -409,7 +417,7 @@ public void testSingleValuedFieldWithSubAggregation() throws Exception {
.calendarInterval(DateHistogramInterval.MONTH)
.minDocCount(0)
.subAggregation(sum("sum").field("value"))
.subAggregation(derivative("deriv", "sum"))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "sum"))
)
.get();

Expand Down Expand Up @@ -492,7 +500,7 @@ public void testMultiValuedField() throws Exception {
dateHistogram("histo").field("dates")
.calendarInterval(DateHistogramInterval.MONTH)
.minDocCount(0)
.subAggregation(derivative("deriv", "_count"))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "_count"))
)
.get();

Expand Down Expand Up @@ -550,7 +558,7 @@ public void testUnmapped() throws Exception {
dateHistogram("histo").field("date")
.calendarInterval(DateHistogramInterval.MONTH)
.minDocCount(0)
.subAggregation(derivative("deriv", "_count"))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "_count"))
)
.get();

Expand All @@ -568,7 +576,7 @@ public void testPartiallyUnmapped() throws Exception {
dateHistogram("histo").field("date")
.calendarInterval(DateHistogramInterval.MONTH)
.minDocCount(0)
.subAggregation(derivative("deriv", "_count"))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "_count"))
)
.get();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.elasticsearch.aggregations.bucket.adjacency.InternalAdjacencyMatrix;
import org.elasticsearch.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.elasticsearch.aggregations.bucket.histogram.InternalAutoDateHistogram;
import org.elasticsearch.aggregations.pipeline.Derivative;
import org.elasticsearch.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.aggregations.pipeline.MovFnPipelineAggregationBuilder;
import org.elasticsearch.aggregations.pipeline.MovingFunctionScript;
import org.elasticsearch.plugins.Plugin;
Expand Down Expand Up @@ -42,6 +44,11 @@ public List<AggregationSpec> getAggregations() {
@Override
public List<PipelineAggregationSpec> getPipelineAggregations() {
return List.of(
new PipelineAggregationSpec(
DerivativePipelineAggregationBuilder.NAME,
DerivativePipelineAggregationBuilder::new,
DerivativePipelineAggregationBuilder::parse
).addResultReader(Derivative::new),
new PipelineAggregationSpec(
MovFnPipelineAggregationBuilder.NAME,
MovFnPipelineAggregationBuilder::new,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,31 @@
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations.pipeline;
package org.elasticsearch.aggregations.pipeline;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class InternalDerivative extends InternalSimpleValue implements Derivative {
public class Derivative extends InternalSimpleValue {
private final double normalizationFactor;

InternalDerivative(String name, double value, double normalizationFactor, DocValueFormat formatter, Map<String, Object> metadata) {
Derivative(String name, double value, double normalizationFactor, DocValueFormat formatter, Map<String, Object> metadata) {
super(name, value, formatter, metadata);
this.normalizationFactor = normalizationFactor;
}

/**
* Read from a stream.
*/
public InternalDerivative(StreamInput in) throws IOException {
public Derivative(StreamInput in) throws IOException {
super(in);
normalizationFactor = in.readDouble();
}
Expand All @@ -45,7 +46,12 @@ public String getWriteableName() {
return DerivativePipelineAggregationBuilder.NAME;
}

@Override
/**
* Returns the normalized value. If no normalised factor has been specified
* this method will return {@link #value()}
*
* @return the normalized value
*/
public double normalizedValue() {
return normalizationFactor > 0 ? (value() / normalizationFactor) : value();
}
Expand Down Expand Up @@ -95,7 +101,7 @@ public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
if (super.equals(obj) == false) return false;
InternalDerivative other = (InternalDerivative) obj;
Derivative other = (Derivative) obj;
return Objects.equals(value, other.value) && Objects.equals(normalizationFactor, other.normalizationFactor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations.pipeline;
package org.elasticsearch.aggregations.pipeline;

import org.elasticsearch.Version;
import org.elasticsearch.common.ParsingException;
Expand All @@ -17,7 +17,9 @@
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations.pipeline;
package org.elasticsearch.aggregations.pipeline;

import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
Expand All @@ -16,6 +16,7 @@
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -70,7 +71,7 @@ public InternalAggregation reduce(InternalAggregation aggregation, AggregationRe
final List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
.map((p) -> (InternalAggregation) p)
.collect(Collectors.toCollection(ArrayList::new));
aggs.add(new InternalDerivative(name(), gradient, xDiff, formatter, metadata()));
aggs.add(new Derivative(name(), gradient, xDiff, formatter, metadata()));
Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs));
newBuckets.add(newBucket);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected SearchPlugin registerPlugin() {
protected List<NamedXContentRegistry.Entry> getNamedXContents() {
ContextParser<Object, Aggregation> parser = (p, c) -> ParsedAdjacencyMatrix.fromXContent(p, (String) c);
return CollectionUtils.appendToCopy(
getDefaultNamedXContents(),
super.getNamedXContents(),
new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(AdjacencyMatrixAggregationBuilder.NAME), parser)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.aggregations.AggregationsPlugin;
import org.elasticsearch.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.network.InetAddresses;
Expand Down Expand Up @@ -56,7 +57,6 @@
import org.elasticsearch.search.aggregations.metrics.InternalStats;
import org.elasticsearch.search.aggregations.metrics.Max;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
import org.hamcrest.Matchers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,11 @@ protected SearchPlugin registerPlugin() {
return new AggregationsPlugin();
}

// TODO: the base test class should be able to get this from the search plugin? (^)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we're registering the high level REST client's version which isn't in the SearchPlugin.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should be removing these as we go. We don't need to parse the thing any more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we also had these tests to test the xcontent serialization (similarly how we test the binary serialization)? We lose that if we drop these tests?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(the downside is that we need to maintain the xcontent parsing in tests...)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. That's why I didn't nuke it. It's not obviously right. Before we had the high level REST client we never tested the round trip here - we'd just render xcontent and test in REST tests or specific xcontent renderings. Now we have randomized parsing. But it's quite heavy.

@Override
protected List<NamedXContentRegistry.Entry> getNamedXContents() {
ContextParser<Object, Aggregation> parser = (p, c) -> ParsedAutoDateHistogram.fromXContent(p, (String) c);
return CollectionUtils.appendToCopy(
getDefaultNamedXContents(),
super.getNamedXContents(),
new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(AutoDateHistogramAggregationBuilder.NAME), parser)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations.pipeline;
package org.elasticsearch.aggregations.pipeline;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.NumericDocValuesField;
Expand All @@ -19,11 +19,13 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.elasticsearch.aggregations.AggregationsPlugin;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.InternalAggregation;
Expand All @@ -35,6 +37,8 @@
import org.elasticsearch.search.aggregations.metrics.InternalAvg;
import org.elasticsearch.search.aggregations.metrics.Sum;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.CumulativeSumPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;

import java.io.IOException;
Expand Down Expand Up @@ -65,6 +69,11 @@ public class CumulativeSumAggregatorTests extends AggregatorTestCase {

private static final List<Integer> datasetValues = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

@Override
protected List<SearchPlugin> getSearchPlugins() {
return List.of(new AggregationsPlugin());
}

public void testSimple() throws IOException {
Query query = new MatchAllDocsQuery();

Expand Down
Loading