diff --git a/server/src/main/java/org/elasticsearch/search/sort/BucketedSort.java b/server/src/main/java/org/elasticsearch/search/sort/BucketedSort.java index 5e0e8b3d48963..79c015791641a 100644 --- a/server/src/main/java/org/elasticsearch/search/sort/BucketedSort.java +++ b/server/src/main/java/org/elasticsearch/search/sort/BucketedSort.java @@ -171,7 +171,7 @@ public int getBucketSize() { */ @FunctionalInterface public interface ResultBuilder { - T build(long index, SortValue sortValue); + T build(long index, SortValue sortValue) throws IOException; } /** @@ -180,7 +180,7 @@ public interface ResultBuilder { * @param builder builds results. See {@link ExtraData} for how to store * data along side the sort for this to extract. */ - public final > List getValues(long bucket, ResultBuilder builder) { + public final > List getValues(long bucket, ResultBuilder builder) throws IOException { long rootIndex = bucket * bucketSize; if (rootIndex >= values().size()) { // We've never seen this bucket. @@ -201,7 +201,7 @@ public final > List getValues(long bucket, ResultBuil * Get the values for a bucket if it has been collected. If it hasn't * then returns an empty array. */ - public final List getValues(long bucket) { + public final List getValues(long bucket) throws IOException { return getValues(bucket, (i, sv) -> sv); } diff --git a/server/src/main/java/org/elasticsearch/search/sort/SortValue.java b/server/src/main/java/org/elasticsearch/search/sort/SortValue.java index 92fbe3cfc4875..e19f80de33391 100644 --- a/server/src/main/java/org/elasticsearch/search/sort/SortValue.java +++ b/server/src/main/java/org/elasticsearch/search/sort/SortValue.java @@ -19,6 +19,8 @@ package org.elasticsearch.search.sort; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; @@ -48,13 +50,22 @@ public static SortValue from(long l) { return new LongSortValue(l); } + /** + * Get a {@linkplain SortValue} for bytes. Callers should be sure that they + * have a {@link BytesRef#deepCopyOf} of any mutable references. + */ + public static SortValue from(BytesRef bytes) { + return new BytesSortValue(bytes); + } + /** * Get the list of {@linkplain NamedWriteable}s that this class needs. */ public static List namedWriteables() { return Arrays.asList( new NamedWriteableRegistry.Entry(SortValue.class, DoubleSortValue.NAME, DoubleSortValue::new), - new NamedWriteableRegistry.Entry(SortValue.class, LongSortValue.NAME, LongSortValue::new)); + new NamedWriteableRegistry.Entry(SortValue.class, LongSortValue.NAME, LongSortValue::new), + new NamedWriteableRegistry.Entry(SortValue.class, BytesSortValue.NAME, BytesSortValue::new)); } private SortValue() { @@ -200,7 +211,7 @@ private static class LongSortValue extends SortValue { this.key = key; } - LongSortValue(StreamInput in) throws IOException { + private LongSortValue(StreamInput in) throws IOException { key = in.readLong(); } @@ -259,4 +270,80 @@ public Number numberValue() { return key; } } + + private static class BytesSortValue extends SortValue { + public static final String NAME = "bytes"; + + private final BytesRef key; + + BytesSortValue(BytesRef key) { + this.key = key; + } + + private BytesSortValue(StreamInput in) throws IOException { + key = in.readBytesRef(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + if (out.getVersion().before(Version.V_7_11_0)) { + throw new IllegalArgumentException( + "versions of Elasticsearch before 7.11.0 can't handle non-numeric sort values and attempted to send to [" + + out.getVersion() + + "]" + ); + } + out.writeBytesRef(key); + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public Object getKey() { + return key; + } + + @Override + public String format(DocValueFormat format) { + return format.format(key).toString(); + } + + @Override + protected XContentBuilder rawToXContent(XContentBuilder builder) throws IOException { + return builder.value(key.utf8ToString()); + } + + @Override + protected int compareToSameType(SortValue obj) { + BytesSortValue other = (BytesSortValue) obj; + return key.compareTo(other.key); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || false == getClass().equals(obj.getClass())) { + return false; + } + BytesSortValue other = (BytesSortValue) obj; + return key.equals(other.key); + } + + @Override + public int hashCode() { + return key.hashCode(); + } + + @Override + public String toString() { + return key.toString(); + } + + @Override + public Number numberValue() { + throw new UnsupportedOperationException(); + } + } } diff --git a/server/src/test/java/org/elasticsearch/search/sort/BucketedSortTestCase.java b/server/src/test/java/org/elasticsearch/search/sort/BucketedSortTestCase.java index 43e648929e9a2..80d711cde0f3a 100644 --- a/server/src/test/java/org/elasticsearch/search/sort/BucketedSortTestCase.java +++ b/server/src/test/java/org/elasticsearch/search/sort/BucketedSortTestCase.java @@ -73,7 +73,7 @@ private T build(SortOrder order, int bucketSize, double[] values) { return build(order, format, bucketSize, BucketedSort.NOOP_EXTRA_DATA, values); } - public final void testNeverCalled() { + public final void testNeverCalled() throws IOException { SortOrder order = randomFrom(SortOrder.values()); DocValueFormat format = randomFrom(DocValueFormat.RAW, DocValueFormat.BINARY, DocValueFormat.BOOLEAN); try (T sort = build(order, format, 1, BucketedSort.NOOP_EXTRA_DATA, new double[] {})) { diff --git a/server/src/test/java/org/elasticsearch/search/sort/SortValueTests.java b/server/src/test/java/org/elasticsearch/search/sort/SortValueTests.java index 6287490dc5c76..d354c00717ce6 100644 --- a/server/src/test/java/org/elasticsearch/search/sort/SortValueTests.java +++ b/server/src/test/java/org/elasticsearch/search/sort/SortValueTests.java @@ -19,14 +19,19 @@ package org.elasticsearch.search.sort; +import org.apache.lucene.document.InetAddressPoint; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.Version; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.test.AbstractNamedWriteableTestCase; +import org.elasticsearch.test.VersionUtils; import java.io.IOException; import java.time.ZoneId; @@ -51,7 +56,12 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() { @Override protected SortValue createTestInstance() { - return randomBoolean() ? SortValue.from(randomDouble()) : SortValue.from(randomLong()); + switch (between(0, 2)) { + case 0: return SortValue.from(randomDouble()); + case 1: return SortValue.from(randomLong()); + case 2: return SortValue.from(new BytesRef(randomAlphaOfLength(5))); + default: throw new AssertionError(); + } } @Override @@ -81,11 +91,23 @@ public void testToXContentLong() { assertThat(toXContent(SortValue.from(1), STRICT_DATE_TIME), equalTo("{\"test\":\"1970-01-01T00:00:00.001Z\"}")); } + public void testToXContentBytes() { + assertThat(toXContent(SortValue.from(new BytesRef("cat")), DocValueFormat.RAW), equalTo("{\"test\":\"cat\"}")); + assertThat( + toXContent(SortValue.from(new BytesRef(InetAddressPoint.encode(InetAddresses.forString("127.0.0.1")))), DocValueFormat.IP), + equalTo("{\"test\":\"127.0.0.1\"}") + ); + } + public void testCompareDifferentTypes() { assertThat(SortValue.from(1.0), lessThan(SortValue.from(1))); assertThat(SortValue.from(Double.MAX_VALUE), lessThan(SortValue.from(Long.MIN_VALUE))); assertThat(SortValue.from(1), greaterThan(SortValue.from(1.0))); assertThat(SortValue.from(Long.MIN_VALUE), greaterThan(SortValue.from(Double.MAX_VALUE))); + assertThat(SortValue.from(new BytesRef("cat")), lessThan(SortValue.from(1))); + assertThat(SortValue.from(1), greaterThan(SortValue.from(new BytesRef("cat")))); + assertThat(SortValue.from(new BytesRef("cat")), lessThan(SortValue.from(1.0))); + assertThat(SortValue.from(1.0), greaterThan(SortValue.from(new BytesRef("cat")))); } public void testCompareDoubles() { @@ -102,6 +124,25 @@ public void testCompareLongs() { assertThat(SortValue.from(r), greaterThan(SortValue.from(r - 1))); } + public void testBytes() { + String r = randomAlphaOfLength(5); + assertThat(SortValue.from(new BytesRef(r)), equalTo(SortValue.from(new BytesRef(r)))); + assertThat(SortValue.from(new BytesRef(r)), lessThan(SortValue.from(new BytesRef(r + "with_suffix")))); + assertThat(SortValue.from(new BytesRef(r)), greaterThan(SortValue.from(new BytesRef(new byte[] {})))); + } + + public void testSerializeBytesToOldVersion() { + SortValue value = SortValue.from(new BytesRef("can't send me!")); + Version version = VersionUtils.randomVersionBetween(random(), Version.V_7_0_0, Version.V_7_10_1); + Exception e = expectThrows(IllegalArgumentException.class, () -> copyInstance(value, version)); + assertThat( + e.getMessage(), + equalTo( + "versions of Elasticsearch before 7.11.0 can't handle non-numeric sort values and attempted to send to [" + version + "]" + ) + ); + } + public String toXContent(SortValue sortValue, DocValueFormat format) { return Strings.toString(new ToXContentFragment() { @Override diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java index 8fb6fc6e63b13..1cd024b2698f5 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java @@ -114,7 +114,8 @@ public List getAggregations() { TopMetricsAggregationBuilder.NAME, TopMetricsAggregationBuilder::new, usage.track(AnalyticsStatsAction.Item.TOP_METRICS, checkLicense(TopMetricsAggregationBuilder.PARSER))) - .addResultReader(InternalTopMetrics::new), + .addResultReader(InternalTopMetrics::new) + .setAggregatorRegistrar(TopMetricsAggregationBuilder::registerAggregators), new AggregationSpec( TTestAggregationBuilder.NAME, TTestAggregationBuilder::new, diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregationBuilder.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregationBuilder.java index 94c84cb3ffbe9..02947343d9411 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregationBuilder.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregationBuilder.java @@ -18,7 +18,10 @@ import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig; +import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; +import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry.RegistryKey; import org.elasticsearch.search.sort.SortBuilder; import java.io.IOException; @@ -34,6 +37,28 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder REGISTRY_KEY = new RegistryKey<>( + TopMetricsAggregationBuilder.NAME, + TopMetricsAggregator.MetricValuesSupplier.class + ); + + public static void registerAggregators(ValuesSourceRegistry.Builder registry) { + registry.registerUsage(NAME); + registry.register(REGISTRY_KEY, List.of(CoreValuesSourceType.NUMERIC), TopMetricsAggregator::buildNumericMetricValues, false); + registry.register( + REGISTRY_KEY, + List.of(CoreValuesSourceType.BOOLEAN, CoreValuesSourceType.DATE), + TopMetricsAggregator.LongMetricValues::new, + false + ); + registry.register( + REGISTRY_KEY, + List.of(CoreValuesSourceType.BYTES, CoreValuesSourceType.IP), + TopMetricsAggregator.GlobalOrdsValues::new, + false + ); + } + /** * Default to returning only a single top metric. */ diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregator.java index b42154d46a83e..2df02aff67312 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregator.java @@ -8,8 +8,10 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.search.Scorable; import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.BigArrays; @@ -24,6 +26,8 @@ import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator; import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.sort.BucketedSort; import org.elasticsearch.search.sort.SortBuilder; @@ -37,6 +41,7 @@ import java.util.Map; import static java.util.stream.Collectors.toList; +import static org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregationBuilder.REGISTRY_KEY; /** * Collects the {@code top_metrics} aggregation, which functions like a memory @@ -57,17 +62,24 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue { private final BucketedSort sort; private final Metrics metrics; - TopMetricsAggregator(String name, SearchContext context, Aggregator parent, Map metadata, int size, - SortBuilder sort, List metricSources) throws IOException { + TopMetricsAggregator( + String name, + SearchContext context, + Aggregator parent, + Map metadata, + int size, + SortBuilder sort, + MetricValues[] metricValues + ) throws IOException { super(name, context, parent, metadata); this.size = size; - metrics = new Metrics(size, context.getQueryShardContext().bigArrays(), metricSources); + this.metrics = new TopMetricsAggregator.Metrics(metricValues); /* * If we're only collecting a single value then only provided *that* * value to the sort so that swaps and loads are just a little faster * in that *very* common case. */ - BucketedSort.ExtraData values = metricSources.size() == 1 ? metrics.values[0] : metrics; + BucketedSort.ExtraData values = metrics.values.length == 1 ? metrics.values[0] : metrics; this.sort = sort.buildBucketedSort(context.getQueryShardContext(), size, values); } @@ -77,7 +89,7 @@ public boolean hasMetric(String name) { throw new IllegalArgumentException("[top_metrics] can only the be target if [size] is [1] but was [" + size + "]"); } for (MetricValues values : metrics.values) { - if (values.name().equals(name)) { + if (values.name.equals(name)) { return true; } } @@ -138,49 +150,11 @@ public void doClose() { Releasables.close(sort, metrics); } - /** - * Information about each metric that this {@link Aggregator} uses to - * load and format metric values. - */ - static class MetricSource { - private final String name; - private final DocValueFormat format; - private final ValuesSource.Numeric valuesSource; - - MetricSource(String name, DocValueFormat format, ValuesSource.Numeric valuesSource) { - this.name = name; - this.format = format; - this.valuesSource = valuesSource; - } - - String getName() { - return name; - } - - DocValueFormat getFormat() { - return format; - } - } - static class Metrics implements BucketedSort.ExtraData, Releasable { private final MetricValues[] values; - Metrics(int size, BigArrays bigArrays, List sources) { - values = new MetricValues[sources.size()]; - int i = 0; - for (MetricSource source : sources) { - values[i++] = valuesFor(size, bigArrays, source); - } - } - - private static MetricValues valuesFor(int size, BigArrays bigArrays, MetricSource source) { - if (source.valuesSource == null) { - return new AlwaysNullMetricValues(source); - } - if (source.valuesSource.isFloatingPoint()) { - return new DoubleMetricValues(size, bigArrays, source); - } - return new LongMetricValues(size, bigArrays, source); + Metrics(MetricValues[] values) { + this.values = values; } boolean needsScores() { @@ -194,7 +168,7 @@ boolean needsScores() { double metric(String name, long index) { for (MetricValues value : values) { - if (value.name().equals(name)) { + if (value.name.equals(name)) { return value.doubleValue(index); } } @@ -212,7 +186,7 @@ BucketedSort.ResultBuilder resultBuilder(DocValueF } List names() { - return Arrays.stream(values).map(MetricValues::name).collect(toList()); + return Arrays.stream(values).map(v -> v.name).collect(toList()); } @Override @@ -241,44 +215,72 @@ public void close() { } } - private abstract static class MetricValues implements BucketedSort.ExtraData, Releasable { - protected final MetricSource source; + @FunctionalInterface + interface MetricValuesSupplier { + MetricValues build(int size, BigArrays bigArrays, String name, ValuesSourceConfig config); + } - MetricValues(MetricSource source) { - this.source = source; - } + abstract static class MetricValues implements BucketedSort.ExtraData, Releasable { + protected final String name; - final String name() { - return source.name; + MetricValues(String name) { + this.name = name; } abstract boolean needsScores(); abstract double doubleValue(long index); - abstract InternalTopMetrics.MetricValue metricValue(long index); + abstract InternalTopMetrics.MetricValue metricValue(long index) throws IOException; } private abstract static class CollectingMetricValues extends MetricValues { protected final BigArrays bigArrays; + protected final ValuesSourceConfig config; - CollectingMetricValues(BigArrays bigArrays, MetricSource source) { - super(source); + CollectingMetricValues(BigArrays bigArrays, String name, ValuesSourceConfig config) { + super(name); this.bigArrays = bigArrays; + this.config = config; } @Override public final boolean needsScores() { - return source.valuesSource.needsScores(); + return config.getValuesSource().needsScores(); } } + + static MetricValues buildMetricValues( + ValuesSourceRegistry registry, + BigArrays bigArrays, + int size, + String name, + ValuesSourceConfig config + ) { + if (false == config.hasValues()) { + // `config` doesn't have the name if the + return new AlwaysNullMetricValues(name); + } + MetricValuesSupplier supplier = registry.getAggregator(REGISTRY_KEY, config); + return supplier.build(size, bigArrays, name, config); + } + + static MetricValues buildNumericMetricValues(int size, BigArrays bigArrays, String name, ValuesSourceConfig config) { + ValuesSource.Numeric numeric = (ValuesSource.Numeric) config.getValuesSource(); + if (numeric.isFloatingPoint()) { + return new DoubleMetricValues(size, bigArrays, name, config); + } + return new LongMetricValues(size, bigArrays, name, config); + } /** * Loads metrics for floating point numbers. */ static class DoubleMetricValues extends CollectingMetricValues { + private final ValuesSource.Numeric valuesSource; private DoubleArray values; - DoubleMetricValues(int size, BigArrays bigArrays, MetricSource source) { - super(bigArrays, source); + DoubleMetricValues(int size, BigArrays bigArrays, String name, ValuesSourceConfig config) { + super(bigArrays, name, config); + valuesSource = (ValuesSource.Numeric) config.getValuesSource(); values = bigArrays.newDoubleArray(size, false); } @@ -294,7 +296,7 @@ public MetricValue metricValue(long index) { // Use NaN as a sentinel for "missing" return null; } - return new MetricValue(source.format, SortValue.from(value)); + return new MetricValue(config.format(), SortValue.from(value)); } @Override @@ -307,7 +309,7 @@ public void swap(long lhs, long rhs) { @Override public Loader loader(LeafReaderContext ctx) throws IOException { // TODO allow configuration of value mode - NumericDoubleValues metricValues = MultiValueMode.AVG.select(source.valuesSource.doubleValues(ctx)); + NumericDoubleValues metricValues = MultiValueMode.AVG.select(valuesSource.doubleValues(ctx)); return (index, doc) -> { if (index >= values.size()) { values = bigArrays.grow(values, index + 1); @@ -328,6 +330,7 @@ public void close() { * Loads metrics for whole numbers. */ static class LongMetricValues extends CollectingMetricValues { + private final ValuesSource.Numeric valuesSource; /** * Tracks "missing" values in a {@link BitArray}. Unlike * {@link DoubleMetricValues}, we there isn't a sentinel value @@ -338,8 +341,9 @@ static class LongMetricValues extends CollectingMetricValues { private final MissingHelper empty; private LongArray values; - LongMetricValues(int size, BigArrays bigArrays, MetricSource source) { - super(bigArrays, source); + LongMetricValues(int size, BigArrays bigArrays, String name, ValuesSourceConfig config) { + super(bigArrays, name, config); + valuesSource = (ValuesSource.Numeric) config.getValuesSource(); empty = new MissingHelper(bigArrays); values = bigArrays.newLongArray(size, false); } @@ -357,7 +361,7 @@ public MetricValue metricValue(long index) { if (empty.isEmpty(index)) { return null; } - return new MetricValue(source.format, SortValue.from(values.get(index))); + return new MetricValue(config.format(), SortValue.from(values.get(index))); } @Override @@ -371,7 +375,7 @@ public void swap(long lhs, long rhs) { @Override public Loader loader(LeafReaderContext ctx) throws IOException { // TODO allow configuration of value mode - NumericDocValues metricValues = MultiValueMode.AVG.select(source.valuesSource.longValues(ctx)); + NumericDocValues metricValues = MultiValueMode.AVG.select(valuesSource.longValues(ctx)); return (index, doc) -> { if (false == metricValues.advanceExact(doc)) { empty.markMissing(index); @@ -391,13 +395,77 @@ public void close() { } } + /** + * Loads metrics for whole numbers. + */ + static class GlobalOrdsValues extends CollectingMetricValues { + private final ValuesSource.Bytes.WithOrdinals valuesSource; + private SortedSetDocValues globalOrds; + private LongArray values; + + GlobalOrdsValues(int size, BigArrays bigArrays, String name, ValuesSourceConfig config) { + super(bigArrays, name, config); + if (false == config.hasGlobalOrdinals()) { + throw new IllegalArgumentException("top_metrics can only collect bytes that have global ordinals"); + } + this.valuesSource = (ValuesSource.Bytes.WithOrdinals) config.getValuesSource(); + values = bigArrays.newLongArray(size, false); + } + + @Override + public double doubleValue(long index) { + throw new IllegalArgumentException("pipeline aggregations may not refer to non-numeric metrics collected by top_metrics"); + } + + @Override + public MetricValue metricValue(long index) throws IOException { + if (globalOrds == null) { + // We didn't collect a single segment. + return null; + } + long ord = values.get(index); + if (ord == -1) { + return null; + } + return new MetricValue(config.format(), SortValue.from(BytesRef.deepCopyOf(globalOrds.lookupOrd(ord)))); + } + + @Override + public void swap(long lhs, long rhs) { + long tmp = values.get(lhs); + values.set(lhs, values.get(rhs)); + values.set(rhs, tmp); + } + + @Override + public Loader loader(LeafReaderContext ctx) throws IOException { + globalOrds = valuesSource.globalOrdinalsValues(ctx); + // For now just return the value that sorts first. + return (index, doc) -> { + if (false == globalOrds.advanceExact(doc)) { + values.set(index, -1); + return; + } + if (index >= values.size()) { + values = bigArrays.grow(values, index + 1); + } + values.set(index, globalOrds.nextOrd()); + }; + } + + @Override + public void close() { + Releasables.close(values); + } + } + /** * {@linkplain MetricValues} implementation for unmapped fields * that always returns {@code null} or {@code NaN}. */ static class AlwaysNullMetricValues extends MetricValues { - AlwaysNullMetricValues(MetricSource source) { - super(source); + AlwaysNullMetricValues(String name) { + super(name); } @Override diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorFactory.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorFactory.java index 2c200dc677783..bd642696d985d 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorFactory.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorFactory.java @@ -15,17 +15,17 @@ import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig; -import org.elasticsearch.search.aggregations.support.ValueType; -import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.sort.SortBuilder; +import org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregator.MetricValues; +import org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregator.MetricValuesSupplier; import java.io.IOException; import java.util.List; import java.util.Map; -import static java.util.stream.Collectors.toList; +import static org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregationBuilder.REGISTRY_KEY; public class TopMetricsAggregatorFactory extends AggregatorFactory { /** @@ -58,14 +58,22 @@ protected TopMetricsAggregator createInternal(SearchContext searchContext, Aggre + "]. This limit can be set by changing the [" + MAX_BUCKET_SIZE.getKey() + "] index level setting."); } - List metricSources = metricFields.stream().map(config -> { - ValuesSourceConfig resolved = ValuesSourceConfig.resolve( - context, ValueType.NUMERIC, - config.getFieldName(), config.getScript(), config.getMissing(), config.getTimeZone(), null, - CoreValuesSourceType.NUMERIC); - return new TopMetricsAggregator.MetricSource(config.getFieldName(), resolved.format(), - (ValuesSource.Numeric) resolved.getValuesSource()); - }).collect(toList()); - return new TopMetricsAggregator(name, searchContext, parent, metadata, size, sortBuilders.get(0), metricSources); + MetricValues[] metricValues = new MetricValues[metricFields.size()]; + for (int i = 0; i < metricFields.size(); i++) { + MultiValuesSourceFieldConfig config = metricFields.get(i); + ValuesSourceConfig vsConfig = ValuesSourceConfig.resolve( + context, + null, + config.getFieldName(), + config.getScript(), + config.getMissing(), + config.getTimeZone(), + null, + CoreValuesSourceType.NUMERIC + ); + MetricValuesSupplier supplier = context.getValuesSourceRegistry().getAggregator(REGISTRY_KEY, vsConfig); + metricValues[i] = supplier.build(size, context.bigArrays(), config.getFieldName(), vsConfig); + } + return new TopMetricsAggregator(name, searchContext, parent, metadata, size, sortBuilders.get(0), metricValues); } } diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorMetricsTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorMetricsTests.java index f9e2e34140f5d..4f4dbaa9950fe 100644 --- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorMetricsTests.java +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorMetricsTests.java @@ -7,20 +7,29 @@ package org.elasticsearch.xpack.analytics.topmetrics; import org.apache.lucene.index.SortedNumericDocValues; -import org.elasticsearch.common.CheckedBiConsumer; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; +import org.elasticsearch.search.aggregations.support.FieldContext; import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; +import org.elasticsearch.search.aggregations.support.ValuesSourceType; import org.elasticsearch.search.sort.SortValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.analytics.topmetrics.InternalTopMetrics.MetricValue; import org.elasticsearch.xpack.analytics.topmetrics.InternalTopMetrics.TopMetric; -import org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregator.MetricSource; +import org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregator.MetricValues; import org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregator.Metrics; import java.io.IOException; -import java.util.ArrayList; +import java.time.ZoneOffset; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -38,29 +47,50 @@ import static org.mockito.Mockito.when; public class TopMetricsAggregatorMetricsTests extends ESTestCase { - public void testUnmapped() throws IOException { - withMetric(null, (m, source) -> { + private static final ValuesSourceRegistry REGISTRY; + static { + ValuesSourceRegistry.Builder registry = new ValuesSourceRegistry.Builder(); + TopMetricsAggregationBuilder.registerAggregators(registry); + REGISTRY = registry.build(); + } + + public void testNoNumbers() throws IOException { + assertNoValues(toConfig(null, CoreValuesSourceType.NUMERIC, DocValueFormat.RAW, false)); + } + + public void testNoDates() throws IOException { + assertNoValues(toConfig(null, CoreValuesSourceType.DATE, DocValueFormat.RAW, false)); + } + + public void testNoStrings() throws IOException { + assertNoValues(toConfig(null, CoreValuesSourceType.BYTES, DocValueFormat.RAW, false)); + } + + private void assertNoValues(ValuesSourceConfig config) throws IOException { + withMetric(config, m -> { // Load from doc is a noop m.loader(null).loadFromDoc(0, randomInt()); - assertNullMetric(m, source, randomInt()); + assertNullMetric(m, config, randomInt(), true); }); } public void testEmptyLong() throws IOException { SortedNumericDocValues values = mock(SortedNumericDocValues.class); when(values.advanceExact(0)).thenReturn(false); - withMetric(valuesSource(values), (m, source) -> { + ValuesSourceConfig config = toConfig(values); + withMetric(config, m -> { m.loader(null).loadFromDoc(0, 0); - assertNullMetric(m, source, 0); + assertNullMetric(m, config, 0, true); }); } public void testEmptyDouble() throws IOException { SortedNumericDoubleValues values = mock(SortedNumericDoubleValues.class); when(values.advanceExact(0)).thenReturn(false); - withMetric(valuesSource(values), (m, source) -> { + ValuesSourceConfig config = toConfig(values); + withMetric(config, m -> { m.loader(null).loadFromDoc(0, 0); - assertNullMetric(m, source, 0); + assertNullMetric(m, config, 0, true); }); } @@ -70,9 +100,10 @@ public void testLoadLong() throws IOException { when(values.advanceExact(0)).thenReturn(true); when(values.docValueCount()).thenReturn(1); when(values.nextValue()).thenReturn(value); - withMetric(valuesSource(values), (m, source) -> { + ValuesSourceConfig config = toConfig(values); + withMetric(config, m -> { m.loader(null).loadFromDoc(0, 0); - assertMetricValue(m, 0, source, SortValue.from(value)); + assertMetricValue(m, 0, config, SortValue.from(value), true); }); } @@ -82,9 +113,24 @@ public void testLoadDouble() throws IOException { when(values.advanceExact(0)).thenReturn(true); when(values.docValueCount()).thenReturn(1); when(values.nextValue()).thenReturn(value); - withMetric(valuesSource(values), (m, source) -> { + ValuesSourceConfig config = toConfig(values); + withMetric(config, m -> { m.loader(null).loadFromDoc(0, 0); - assertMetricValue(m, 0, source, SortValue.from(value)); + assertMetricValue(m, 0, config, SortValue.from(value), true); + }); + } + + public void testLoadString() throws IOException { + BytesRef value = new BytesRef(randomAlphaOfLength(5)); + SortedSetDocValues values = mock(SortedSetDocValues.class); + when(values.advanceExact(0)).thenReturn(true); + when(values.getValueCount()).thenReturn(1L); + when(values.nextOrd()).thenReturn(0L); + when(values.lookupOrd(0L)).thenReturn(value); + ValuesSourceConfig config = toConfig(values); + withMetric(config, m -> { + m.loader(null).loadFromDoc(0, 0); + assertMetricValue(m, 0, config, SortValue.from(value), false); }); } @@ -96,9 +142,8 @@ public void testLoadAndSwapLong() throws IOException { when(values.advanceExact(1)).thenReturn(true); when(values.docValueCount()).thenReturn(1); when(values.nextValue()).thenReturn(firstValue, secondValue); - withMetric(valuesSource(values), (m, source) -> { - assertLoadTwoAndSwap(m, source, SortValue.from(firstValue), SortValue.from(secondValue)); - }); + ValuesSourceConfig config = toConfig(values); + withMetric(config, m -> assertLoadTwoAndSwap(m, config, SortValue.from(firstValue), SortValue.from(secondValue), true)); } public void testLoadAndSwapDouble() throws IOException { @@ -109,105 +154,162 @@ public void testLoadAndSwapDouble() throws IOException { when(values.advanceExact(1)).thenReturn(true); when(values.docValueCount()).thenReturn(1); when(values.nextValue()).thenReturn(firstValue, secondValue); - withMetric(valuesSource(values), (m, source) -> { - assertLoadTwoAndSwap(m, source, SortValue.from(firstValue), SortValue.from(secondValue)); - }); + ValuesSourceConfig config = toConfig(values); + withMetric(config, m -> assertLoadTwoAndSwap(m, config, SortValue.from(firstValue), SortValue.from(secondValue), true)); + } + + public void testLoadAndSwapString() throws IOException { + BytesRef firstValue = new BytesRef(randomAlphaOfLength(5)); + BytesRef secondValue = new BytesRef(randomAlphaOfLength(5)); + SortedSetDocValues values = mock(SortedSetDocValues.class); + when(values.advanceExact(0)).thenReturn(true); + when(values.advanceExact(1)).thenReturn(true); + when(values.getValueCount()).thenReturn(1L); + when(values.nextOrd()).thenReturn(0L, 1L); + when(values.lookupOrd(0L)).thenReturn(firstValue); + when(values.lookupOrd(1L)).thenReturn(secondValue); + ValuesSourceConfig config = toConfig(values); + withMetric(config, m -> assertLoadTwoAndSwap(m, config, SortValue.from(firstValue), SortValue.from(secondValue), false)); } public void testManyValues() throws IOException { long[] values = IntStream.range(0, between(2, 100)).mapToLong(i -> randomLong()).toArray(); - List valuesSources = Arrays.stream(values) - .mapToObj(v -> { - try { - SortedNumericDocValues docValues = mock(SortedNumericDocValues.class); - when(docValues.advanceExact(0)).thenReturn(true); - when(docValues.docValueCount()).thenReturn(1); - when(docValues.nextValue()).thenReturn(v); - return valuesSource(docValues); - } catch (IOException e) { - throw new AssertionError(e); - } - }) - .collect(toList()); - withMetrics(valuesSources, (m, sources) -> { + List configs = Arrays.stream(values).mapToObj(v -> { + try { + SortedNumericDocValues docValues = mock(SortedNumericDocValues.class); + when(docValues.advanceExact(0)).thenReturn(true); + when(docValues.docValueCount()).thenReturn(1); + when(docValues.nextValue()).thenReturn(v); + return toConfig(docValues); + } catch (IOException e) { + throw new AssertionError(e); + } + }).collect(toList()); + withMetrics(configs, m -> { m.loader(null).loadFromDoc(0, 0); TopMetric metric = m.resultBuilder(DocValueFormat.RAW).build(0, SortValue.from(1)); assertThat(metric.getMetricValues(), hasSize(values.length)); for (int i = 0; i < values.length; i++) { - MetricSource source = sources.get(i); - assertThat(m.metric(source.getName(), 0), equalTo((double) values[i])); - assertThat(metric.getMetricValues(), - hasItem(new MetricValue(source.getFormat(), SortValue.from(values[i])))); + ValuesSourceConfig config = configs.get(i); + assertThat(m.metric(config.fieldContext().field(), 0), equalTo((double) values[i])); + assertThat(metric.getMetricValues(), hasItem(new MetricValue(config.format(), SortValue.from(values[i])))); } }); } - private ValuesSource.Numeric valuesSource(SortedNumericDocValues values) throws IOException { + private ValuesSourceConfig toConfig(SortedNumericDocValues values) throws IOException { ValuesSource.Numeric source = mock(ValuesSource.Numeric.class); when(source.isFloatingPoint()).thenReturn(false); when(source.longValues(null)).thenReturn(values); - return source; + if (randomBoolean()) { + return toConfig(source, CoreValuesSourceType.NUMERIC, randomWholeNumberDocValuesFormat(), true); + } + DocValueFormat dateFormatter = new DocValueFormat.DateTime( + DateFormatter.forPattern(randomDateFormatterPattern()), + ZoneOffset.UTC, + DateFieldMapper.Resolution.MILLISECONDS + ); + return toConfig(source, CoreValuesSourceType.DATE, randomFrom(DocValueFormat.RAW, dateFormatter), true); } - private ValuesSource.Numeric valuesSource(SortedNumericDoubleValues values) throws IOException { + private ValuesSourceConfig toConfig(SortedNumericDoubleValues values) throws IOException { ValuesSource.Numeric source = mock(ValuesSource.Numeric.class); when(source.isFloatingPoint()).thenReturn(true); when(source.doubleValues(null)).thenReturn(values); - return source; + return toConfig( + source, + CoreValuesSourceType.NUMERIC, + randomFrom(DocValueFormat.RAW, new DocValueFormat.Decimal("####.####")), + true + ); } - private void withMetric(ValuesSource.Numeric valuesSource, - CheckedBiConsumer consumer) throws IOException { - withMetrics(singletonList(valuesSource), (m, sources) -> consumer.accept(m, sources.get(0))); + private ValuesSourceConfig toConfig(SortedSetDocValues values) throws IOException { + ValuesSource.Bytes.WithOrdinals source = mock(ValuesSource.Bytes.WithOrdinals.class); + when(source.globalOrdinalsValues(null)).thenReturn(values); + ValuesSourceConfig config = toConfig(source, CoreValuesSourceType.BYTES, DocValueFormat.RAW, true); + when(config.hasGlobalOrdinals()).thenReturn(true); + return config; } - private void withMetrics(List valuesSources, - CheckedBiConsumer, IOException> consumer) throws IOException { - Set names = new HashSet<>(); - List sources = new ArrayList<>(valuesSources.size()); - for (ValuesSource.Numeric valuesSource : valuesSources) { - String name = randomValueOtherThanMany(names::contains, () -> randomAlphaOfLength(5)); - names.add(name); - sources.add(new MetricSource(name, randomDocValueFormat(), valuesSource)); + private final Set names = new HashSet<>(); + + private ValuesSourceConfig toConfig(ValuesSource source, ValuesSourceType type, DocValueFormat format, boolean hasValues) { + String name = randomValueOtherThanMany(names::contains, () -> randomAlphaOfLength(5)); + names.add(name); + ValuesSourceConfig config = mock(ValuesSourceConfig.class); + when(config.fieldContext()).thenReturn(new FieldContext(name, null, null)); + when(config.valueSourceType()).thenReturn(type); + when(config.format()).thenReturn(format); + when(config.getValuesSource()).thenReturn(source); + when(config.hasValues()).thenReturn(hasValues); + return config; + } + + private void withMetric(ValuesSourceConfig config, CheckedConsumer consumer) throws IOException { + withMetrics(singletonList(config), consumer); + } + + private void withMetrics(List configs, CheckedConsumer consumer) throws IOException { + MetricValues[] values = new MetricValues[configs.size()]; + for (int i = 0; i < configs.size(); i++) { + values[i] = TopMetricsAggregator.buildMetricValues( + REGISTRY, + BigArrays.NON_RECYCLING_INSTANCE, + 1, + configs.get(i).fieldContext().field(), + configs.get(i) + ); } - try (Metrics m = new Metrics(1, BigArrays.NON_RECYCLING_INSTANCE, sources)) { - consumer.accept(m, sources); + try (Metrics m = new Metrics(values)) { + consumer.accept(m); } } - private void assertNullMetric(Metrics m, MetricSource source, long index) { - DocValueFormat sortFormat = randomDocValueFormat(); - assertThat(m.metric(source.getName(), index), notANumber()); + private void assertNullMetric(Metrics m, ValuesSourceConfig config, long index, boolean assertSortValue) throws IOException { + if (assertSortValue) { + assertThat(m.metric(config.fieldContext().field(), index), notANumber()); + } + DocValueFormat sortFormat = randomWholeNumberDocValuesFormat(); TopMetric metric = m.resultBuilder(sortFormat).build(index, SortValue.from(1)); assertThat(metric.getSortFormat(), sameInstance(sortFormat)); assertThat(metric.getMetricValues(), equalTo(singletonList(null))); } - private void assertMetricValue(Metrics m, long index, MetricSource source, SortValue value) { - DocValueFormat sortFormat = randomDocValueFormat(); - assertThat(m.metric(source.getName(), index), equalTo(value.numberValue().doubleValue())); + private void assertMetricValue(Metrics m, long index, ValuesSourceConfig config, SortValue value, boolean assertSortValue) + throws IOException { + if (assertSortValue) { + assertThat(m.metric(config.fieldContext().field(), index), equalTo(value.numberValue().doubleValue())); + } + DocValueFormat sortFormat = randomWholeNumberDocValuesFormat(); TopMetric metric = m.resultBuilder(sortFormat).build(index, SortValue.from(1)); assertThat(metric.getSortValue(), equalTo(SortValue.from(1))); assertThat(metric.getSortFormat(), sameInstance(sortFormat)); - assertThat(metric.getMetricValues(), equalTo(singletonList(new MetricValue(source.getFormat(), value)))); + assertThat(metric.getMetricValues(), equalTo(singletonList(new MetricValue(config.format(), value)))); } - private void assertLoadTwoAndSwap(Metrics m, MetricSource source, SortValue firstValue, SortValue secondValue) throws IOException { + private void assertLoadTwoAndSwap( + Metrics m, + ValuesSourceConfig config, + SortValue firstValue, + SortValue secondValue, + boolean assertSortValue + ) throws IOException { m.loader(null).loadFromDoc(0, 0); m.loader(null).loadFromDoc(1, 1); - assertMetricValue(m, 0, source, firstValue); - assertMetricValue(m, 1, source, secondValue); + assertMetricValue(m, 0, config, firstValue, assertSortValue); + assertMetricValue(m, 1, config, secondValue, assertSortValue); m.swap(0, 1); - assertMetricValue(m, 0, source, secondValue); - assertMetricValue(m, 1, source, firstValue); + assertMetricValue(m, 0, config, secondValue, assertSortValue); + assertMetricValue(m, 1, config, firstValue, assertSortValue); m.loader(null).loadFromDoc(2, 2); // 2 is empty - assertNullMetric(m, source, 2); + assertNullMetric(m, config, 2, assertSortValue); m.swap(0, 2); - assertNullMetric(m, source, 0); - assertMetricValue(m, 2, source, secondValue); + assertNullMetric(m, config, 0, assertSortValue); + assertMetricValue(m, 2, config, secondValue, assertSortValue); } - private DocValueFormat randomDocValueFormat() { - return randomFrom(DocValueFormat.RAW, DocValueFormat.BINARY, DocValueFormat.BOOLEAN); + private DocValueFormat randomWholeNumberDocValuesFormat() { + return randomFrom(DocValueFormat.RAW, new DocValueFormat.Decimal("####")); } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/top_metrics.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/top_metrics.yml index 3d77e98912be8..dd2305c858e24 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/top_metrics.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/top_metrics.yml @@ -550,3 +550,36 @@ - match: { aggregations.tm.top.1.sort: [2] } - match: { aggregations.tm.top.2.metrics.v: 3.1414999961853027 } - match: { aggregations.tm.top.2.sort: [1] } + +--- +"fetch keyword": + - do: + bulk: + index: test + refresh: true + body: + - '{"index": {}}' + - '{"s": 1, "animal": "cat"}' + - '{"index": {}}' + - '{"s": 2, "animal": "dog"}' + - '{"index": {}}' + - '{"s": 3, "animal": "chicken"}' + + - do: + search: + size: 0 + body: + aggs: + tm: + top_metrics: + metrics: + field: animal.keyword + sort: + s: asc + size: 3 + - match: { aggregations.tm.top.0.metrics.animal\.keyword: cat} + - match: { aggregations.tm.top.0.sort: [1] } + - match: { aggregations.tm.top.1.metrics.animal\.keyword: dog} + - match: { aggregations.tm.top.1.sort: [2] } + - match: { aggregations.tm.top.2.metrics.animal\.keyword: chicken} + - match: { aggregations.tm.top.2.sort: [3] }