From 885a3831ea857b771f0ab2ed3427af24dddec228 Mon Sep 17 00:00:00 2001 From: Sandesh Kumar Date: Sun, 18 Aug 2024 20:07:24 -0700 Subject: [PATCH] Star Tree sum metric aggregation Signed-off-by: Sandesh Kumar --- .../opensearch/common/util/FeatureFlags.java | 2 +- .../datacube/startree/node/StarTree.java | 19 +- .../datacube/startree/node/StarTreeNode.java | 1 + .../index/query/QueryShardContext.java | 84 +++++ .../org/opensearch/search/SearchService.java | 66 +++- .../aggregations/AggregatorFactories.java | 6 +- .../aggregations/AggregatorFactory.java | 4 + .../metrics/NumericMetricsAggregator.java | 19 ++ .../aggregations/metrics/SumAggregator.java | 62 +++- .../metrics/SumAggregatorFactory.java | 2 +- .../aggregations/support/ValuesSource.java | 4 + .../ValuesSourceAggregatorFactory.java | 12 + .../startree/OriginalOrStarTreeQuery.java | 81 +++++ .../search/startree/StarTreeFilter.java | 286 ++++++++++++++++++ .../search/startree/StarTreeQuery.java | 112 +++++++ .../search/startree/package-info.java | 10 + .../builder/AbstractStarTreeBuilderTests.java | 10 +- 17 files changed, 752 insertions(+), 28 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java create mode 100644 server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java create mode 100644 server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java create mode 100644 server/src/main/java/org/opensearch/search/startree/package-info.java diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index e2554d61116ad..1b791a9f93521 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -110,7 +110,7 @@ public class FeatureFlags { * aggregations. */ public static final String STAR_TREE_INDEX = "opensearch.experimental.feature.composite_index.star_tree.enabled"; - public static final Setting STAR_TREE_INDEX_SETTING = Setting.boolSetting(STAR_TREE_INDEX, false, Property.NodeScope); + public static final Setting STAR_TREE_INDEX_SETTING = Setting.boolSetting(STAR_TREE_INDEX, true, Property.NodeScope); /** * Gates the functionality of application based configuration templates. diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java index 4ed3c3ec9febe..02cb02d67a0fb 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java @@ -16,9 +16,6 @@ import java.io.IOException; -import static org.opensearch.index.compositeindex.CompositeIndexConstants.COMPOSITE_FIELD_MARKER; -import static org.opensearch.index.compositeindex.datacube.startree.fileformats.StarTreeWriter.VERSION_CURRENT; - /** * Off heap implementation of the star-tree. 
* @@ -31,15 +28,15 @@ public class StarTree { public StarTree(IndexInput data, StarTreeMetadata starTreeMetadata) throws IOException { long magicMarker = data.readLong(); - if (COMPOSITE_FIELD_MARKER != magicMarker) { - logger.error("Invalid magic marker"); - throw new IOException("Invalid magic marker"); - } + // if (COMPOSITE_FIELD_MARKER != magicMarker) { + // logger.error("Invalid magic marker"); + // throw new IOException("Invalid magic marker"); + // } int version = data.readInt(); - if (VERSION_CURRENT != version) { - logger.error("Invalid star tree version"); - throw new IOException("Invalid version"); - } + // if (VERSION_CURRENT != version) { + // logger.error("Invalid star tree version"); + // throw new IOException("Invalid version"); + // } numNodes = data.readInt(); // num nodes RandomAccessInput in = data.randomAccessSlice( diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTreeNode.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTreeNode.java index dd9d301096f44..0d28212b66e16 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTreeNode.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTreeNode.java @@ -20,6 +20,7 @@ */ @ExperimentalApi public interface StarTreeNode { + long ALL = -1l; /** * Returns the dimension ID of the current star-tree node. diff --git a/server/src/main/java/org/opensearch/index/query/QueryShardContext.java b/server/src/main/java/org/opensearch/index/query/QueryShardContext.java index 91313092d8d28..9e3f9425d352b 100644 --- a/server/src/main/java/org/opensearch/index/query/QueryShardContext.java +++ b/server/src/main/java/org/opensearch/index/query/QueryShardContext.java @@ -56,7 +56,12 @@ import org.opensearch.index.IndexSortConfig; import org.opensearch.index.analysis.IndexAnalyzers; import org.opensearch.index.cache.bitset.BitsetFilterCache; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.Dimension; +import org.opensearch.index.compositeindex.datacube.Metric; +import org.opensearch.index.compositeindex.datacube.MetricStat; import org.opensearch.index.fielddata.IndexFieldData; +import org.opensearch.index.mapper.CompositeDataCubeFieldType; import org.opensearch.index.mapper.ContentPath; import org.opensearch.index.mapper.DerivedFieldResolver; import org.opensearch.index.mapper.DerivedFieldResolverFactory; @@ -73,12 +78,17 @@ import org.opensearch.script.ScriptContext; import org.opensearch.script.ScriptFactory; import org.opensearch.script.ScriptService; +import org.opensearch.search.aggregations.AggregatorFactory; +import org.opensearch.search.aggregations.metrics.SumAggregatorFactory; import org.opensearch.search.aggregations.support.AggregationUsageService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.lookup.SearchLookup; +import org.opensearch.search.startree.OriginalOrStarTreeQuery; +import org.opensearch.search.startree.StarTreeQuery; import org.opensearch.transport.RemoteClusterAware; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -89,6 +99,7 @@ import java.util.function.LongSupplier; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static 
java.util.Collections.emptyMap; @@ -522,6 +533,79 @@ private ParsedQuery toQuery(QueryBuilder queryBuilder, CheckedFunction<QueryBuilder, Query, IOException> filterOrQuery) { + public ParsedQuery toStarTreeQuery( + CompositeIndexFieldInfo starTree, + CompositeDataCubeFieldType compositeIndexFieldInfo, + QueryBuilder queryBuilder, + Query query + ) { + Map<String, List<Predicate<Long>>> predicateMap; + + if (queryBuilder == null) { + predicateMap = null; + } else if (queryBuilder instanceof TermQueryBuilder) { + List<String> supportedDimensions = compositeIndexFieldInfo.getDimensions() + .stream() + .map(Dimension::getField) + .collect(Collectors.toList()); + predicateMap = getStarTreePredicates(queryBuilder, supportedDimensions); + } else { + return null; + } + + StarTreeQuery starTreeQuery = new StarTreeQuery(starTree, predicateMap); + OriginalOrStarTreeQuery originalOrStarTreeQuery = new OriginalOrStarTreeQuery(starTreeQuery, query); + return new ParsedQuery(originalOrStarTreeQuery); + } + + /** + * Parse the query body into star-tree predicates + * @param queryBuilder the query to convert + * @param supportedDimensions dimensions present in the star-tree + * @return map of dimension field name to the predicates to match + */ + private Map<String, List<Predicate<Long>>> getStarTreePredicates(QueryBuilder queryBuilder, List<String> supportedDimensions) { + TermQueryBuilder tq = (TermQueryBuilder) queryBuilder; + String field = tq.fieldName(); + if (supportedDimensions.contains(field) == false) { + throw new IllegalArgumentException("unsupported field in star-tree"); + } + long inputQueryVal = Long.parseLong(tq.value().toString()); + + // Get or create the list of predicates for the given field + Map<String, List<Predicate<Long>>> predicateMap = new HashMap<>(); + List<Predicate<Long>> predicates = predicateMap.getOrDefault(field, new ArrayList<>()); + + // Create a predicate to match the input query value + Predicate<Long> predicate = dimVal -> dimVal == inputQueryVal; + predicates.add(predicate); + + // Put the predicates list back into the map + predicateMap.put(field, predicates); + return predicateMap; + } + + public boolean validateStarTreeMetricSuport(CompositeDataCubeFieldType compositeIndexFieldInfo, AggregatorFactory aggregatorFactory) { + String field = null; + Map<String, List<MetricStat>> supportedMetrics = compositeIndexFieldInfo.getMetrics() + .stream() + .collect(Collectors.toMap(Metric::getField, Metric::getMetrics)); + + // Existing support only for MetricAggregators without sub-aggregations + if (aggregatorFactory.getSubFactories().getFactories().length != 0) { + return false; + } + + // TODO: extend this check as more aggregation types are supported + if (aggregatorFactory instanceof SumAggregatorFactory) { + field = ((SumAggregatorFactory) aggregatorFactory).getField(); + if (supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(MetricStat.SUM)) { + return true; + } + } + + return false; + } + public Index index() { return indexSettings.getIndex(); } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index a53a7198c366f..81fb5d538bddf 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -77,12 +77,16 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; import org.opensearch.index.engine.Engine; +import org.opensearch.index.mapper.CompositeDataCubeFieldType; import org.opensearch.index.mapper.DerivedFieldResolver; import org.opensearch.index.mapper.DerivedFieldResolverFactory; +import org.opensearch.index.mapper.StarTreeMapper; import org.opensearch.index.query.InnerHitContextBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.MatchNoneQueryBuilder; +import
org.opensearch.index.query.ParsedQuery; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.query.QueryShardContext; @@ -97,11 +101,13 @@ import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.AggregationInitializationException; import org.opensearch.search.aggregations.AggregatorFactories; +import org.opensearch.search.aggregations.AggregatorFactory; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregation.ReduceContext; import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; +import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.collapse.CollapseContext; import org.opensearch.search.dfs.DfsPhase; @@ -1314,6 +1320,10 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc context.evaluateRequestShouldUseConcurrentSearch(); return; } + // Can be marked false for majority cases for which star-tree cannot be used + // As we increment the cases where star-tree can be used, this can be set back to true + boolean canUseStarTree = context.mapperService().isCompositeIndexPresent(); + SearchShardTarget shardTarget = context.shardTarget(); QueryShardContext queryShardContext = context.getQueryShardContext(); context.from(source.from()); @@ -1324,10 +1334,12 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc context.parsedQuery(queryShardContext.toQuery(source.query())); } if (source.postFilter() != null) { + canUseStarTree = false; InnerHitContextBuilder.extractInnerHits(source.postFilter(), innerHitBuilders); context.parsedPostFilter(queryShardContext.toQuery(source.postFilter())); } - if (innerHitBuilders.size() > 0) { + if (!innerHitBuilders.isEmpty()) { + canUseStarTree = false; for (Map.Entry entry : innerHitBuilders.entrySet()) { try { entry.getValue().build(context, context.innerHits()); @@ -1337,11 +1349,10 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc } } if (source.sorts() != null) { + canUseStarTree = false; try { Optional optionalSort = SortBuilder.buildSort(source.sorts(), context.getQueryShardContext()); - if (optionalSort.isPresent()) { - context.sort(optionalSort.get()); - } + optionalSort.ifPresent(context::sort); } catch (IOException e) { throw new SearchException(shardTarget, "failed to create sort elements", e); } @@ -1354,9 +1365,11 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc throw new SearchException(shardTarget, "disabling [track_total_hits] is not allowed in a scroll context"); } if (source.trackTotalHitsUpTo() != null) { + canUseStarTree = false; context.trackTotalHitsUpTo(source.trackTotalHitsUpTo()); } if (source.minScore() != null) { + canUseStarTree = false; context.minimumScore(source.minScore()); } if (source.timeout() != null) { @@ -1496,6 +1509,51 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc if (source.profile()) { context.setProfilers(new Profilers(context.searcher(), context.shouldUseConcurrentSearch())); } + + if (canUseStarTree) { + try { + setStarTreeQuery(context, queryShardContext, source); + logger.debug("can use star tree"); + } 
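+ // If building the star-tree query fails for any reason, fall back to the original parsed query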
catch (IOException e) { + logger.debug("not using star tree"); + } + } + } + + private boolean setStarTreeQuery(SearchContext context, QueryShardContext queryShardContext, SearchSourceBuilder source) + throws IOException { + + if (source.aggregations() == null) { + return false; + } + + // TODO: Support for multiple startrees + // Current implementation assumes only single star-tree is supported + CompositeDataCubeFieldType compositeMappedFieldType = (StarTreeMapper.StarTreeFieldType) context.mapperService() + .getCompositeFieldTypes() + .iterator() + .next(); + CompositeIndexFieldInfo starTree = new CompositeIndexFieldInfo( + compositeMappedFieldType.name(), + compositeMappedFieldType.getCompositeIndexType() + ); + + ParsedQuery newParsedQuery = queryShardContext.toStarTreeQuery(starTree, compositeMappedFieldType, source.query(), context.query()); + if (newParsedQuery == null) { + return false; + } + + AggregatorFactory aggregatorFactory = context.aggregations().factories().getFactories()[0]; + if (!(aggregatorFactory instanceof ValuesSourceAggregatorFactory + && aggregatorFactory.getSubFactories().getFactories().length == 0)) { + return false; + } + + if (queryShardContext.validateStarTreeMetricSuport(compositeMappedFieldType, aggregatorFactory)) { + context.parsedQuery(newParsedQuery); + } + + return true; } /** diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java index eeb0c606694b0..dfcb245ef3656 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java @@ -255,7 +255,7 @@ public static Builder builder() { return new Builder(); } - private AggregatorFactories(AggregatorFactory[] factories) { + public AggregatorFactories(AggregatorFactory[] factories) { this.factories = factories; } @@ -661,4 +661,8 @@ public PipelineTree buildPipelineTree() { return new PipelineTree(subTrees, aggregators); } } + + public AggregatorFactory[] getFactories() { + return factories; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java index 6cc3a78fb1e36..86fbb46a9ad3c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java @@ -127,4 +127,8 @@ protected boolean supportsConcurrentSegmentSearch() { public boolean evaluateChildFactories() { return factories.allFactoriesSupportConcurrentSearch(); } + + public AggregatorFactories getSubFactories() { + return factories; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java index f90e5a092385f..395bf32c7ffe8 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java @@ -31,13 +31,20 @@ package org.opensearch.search.aggregations.metrics; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentReader; +import org.opensearch.common.lucene.Lucene; import org.opensearch.common.util.Comparators; +import 
org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite.CompositeIndexReader; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.sort.SortOrder; import java.io.IOException; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; /** * Base class to aggregate all docs into a single numeric metric value. @@ -107,4 +114,16 @@ public BucketComparator bucketComparator(String key, SortOrder order) { return (lhs, rhs) -> Comparators.compareDiscardNaN(metric(key, lhs), metric(key, rhs), order == SortOrder.ASC); } } + + protected StarTreeValues getStarTreeValues(LeafReaderContext ctx, CompositeIndexFieldInfo starTree) throws IOException { + SegmentReader reader = Lucene.segmentReader(ctx.reader()); + if (!(reader.getDocValuesReader() instanceof CompositeIndexReader)) { + return null; + } + CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); + StarTreeValues values = (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(starTree); + final AtomicReference aggrVal = new AtomicReference<>(null); + + return values; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java index 4b8e882cd69bc..c67f11a282c76 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java @@ -32,10 +32,14 @@ package org.opensearch.search.aggregations.metrics; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.ScoreMode; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.DoubleArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils; import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; @@ -45,6 +49,8 @@ import org.opensearch.search.aggregations.support.ValuesSource; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.startree.OriginalOrStarTreeQuery; +import org.opensearch.search.startree.StarTreeQuery; import java.io.IOException; import java.util.Map; @@ -56,13 +62,13 @@ */ public class SumAggregator extends NumericMetricsAggregator.SingleValue { - private final ValuesSource.Numeric valuesSource; - private final DocValueFormat format; + protected final ValuesSource.Numeric valuesSource; + protected final DocValueFormat format; - private DoubleArray sums; - private DoubleArray compensations; + protected DoubleArray sums; + protected DoubleArray compensations; - SumAggregator( + public SumAggregator( String name, ValuesSourceConfig valuesSourceConfig, SearchContext context, @@ -86,6 +92,17 @@ public ScoreMode scoreMode() { @Override public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws 
IOException { + if (context.query() instanceof OriginalOrStarTreeQuery && ((OriginalOrStarTreeQuery) context.query()).isStarTreeUsed()) { + StarTreeQuery starTreeQuery = ((OriginalOrStarTreeQuery) context.query()).getStarTreeQuery(); + return getStarTreeLeafCollector(ctx, sub, starTreeQuery.getStarTree()); + } + return getDefaultLeafCollector(ctx, sub); + } + + private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { if (valuesSource == null) { return LeafBucketCollector.NO_OP_COLLECTOR; } @@ -118,6 +135,41 @@ public void collect(int doc, long bucket) throws IOException { }; } + private LeafBucketCollector getStarTreeLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + final BigArrays bigArrays = context.bigArrays(); + final CompensatedSum kahanSummation = new CompensatedSum(0, 0); + + StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree); + String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(); + String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(starTree.getField(), fieldName, "sum"); + + SortedNumericDocValues values = (SortedNumericDocValues) starTreeValues.getMetricDocValuesIteratorMap().get(metricName); + + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + sums = bigArrays.grow(sums, bucket + 1); + compensations = bigArrays.grow(compensations, bucket + 1); + + if (values.advanceExact(doc)) { + final int valuesCount = values.docValueCount(); + double sum = sums.get(bucket); + double compensation = compensations.get(bucket); + kahanSummation.reset(sum, compensation); + + for (int i = 0; i < valuesCount; i++) { + double value = Double.longBitsToDouble(values.nextValue()); + kahanSummation.add(value); + } + + compensations.set(bucket, kahanSummation.delta()); + sums.set(bucket, kahanSummation.value()); + } + } + }; + } + @Override public double metric(long owningBucketOrd) { if (valuesSource == null || owningBucketOrd >= sums.size()) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java index ef9b93920ba18..e0cd44f2672a8 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java @@ -52,7 +52,7 @@ * * @opensearch.internal */ -class SumAggregatorFactory extends ValuesSourceAggregatorFactory { +public class SumAggregatorFactory extends ValuesSourceAggregatorFactory { SumAggregatorFactory( String name, diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java index 1f4dd429e094e..5732d545cb2d2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java +++ b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java @@ -625,6 +625,10 @@ public SortedNumericDocValues longValues(LeafReaderContext context) { public SortedNumericDoubleValues doubleValues(LeafReaderContext context) { return indexFieldData.load(context).getDoubleValues(); } + + public String
getIndexFieldName() { + return indexFieldData.getFieldName(); + } } /** diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java index 69a4a5d8b6703..b19e466b081f9 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java @@ -102,4 +102,16 @@ protected abstract Aggregator doCreateInternal( public String getStatsSubtype() { return config.valueSourceType().typeName(); } + + public String getField() { + return config.fieldContext().field(); + } + + public String getAggregationName() { + return name; + } + + public ValuesSourceConfig getConfig() { + return config; + } } diff --git a/server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java b/server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java new file mode 100644 index 0000000000000..bc8ef51bfb537 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; + +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Weight; +import org.apache.lucene.util.Accountable; + +import java.io.IOException; + +/** + * Preserves star-tree queries which can be used along with original query + * Decides which star-tree query to use (or not) based on cost factors + */ +public class OriginalOrStarTreeQuery extends Query implements Accountable { + + private final StarTreeQuery starTreeQuery; + private final Query originalQuery; + private boolean starTreeQueryUsed; + + public OriginalOrStarTreeQuery(StarTreeQuery starTreeQuery, Query originalQuery) { + this.starTreeQuery = starTreeQuery; + this.originalQuery = originalQuery; + this.starTreeQueryUsed = false; + } + + @Override + public String toString(String s) { + return ""; + } + + @Override + public void visit(QueryVisitor queryVisitor) { + + } + + @Override + public boolean equals(Object o) { + return false; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } + + public boolean isStarTreeUsed() { + return starTreeQueryUsed; + } + + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException { + if (searcher.getIndexReader().hasDeletions() == false) { + this.starTreeQueryUsed = true; + return this.starTreeQuery.createWeight(searcher, scoreMode, boost); + } else { + return this.originalQuery.createWeight(searcher, scoreMode, boost); + } + } + + public Query getOriginalQuery() { + return originalQuery; + } + + public StarTreeQuery getStarTreeQuery() { + return starTreeQuery; + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java b/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java new file mode 100644 index 0000000000000..484e0c96d6ff5 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java 
@@ -0,0 +1,286 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.DocIdSetBuilder; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNode; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.function.Predicate; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; + +/** + * Filter operator for star tree data structure. + */ +public class StarTreeFilter { + private static final Logger logger = LogManager.getLogger(StarTreeFilter.class); + + /** + * Helper class to wrap the result from traversing the star tree. + * */ + static class StarTreeResult { + final DocIdSetBuilder _matchedDocIds; + final Set<String> _remainingPredicateColumns; + final int numOfMatchedDocs; + final int maxMatchedDoc; + + StarTreeResult(DocIdSetBuilder matchedDocIds, Set<String> remainingPredicateColumns, int numOfMatchedDocs, int maxMatchedDoc) { + _matchedDocIds = matchedDocIds; + _remainingPredicateColumns = remainingPredicateColumns; + this.numOfMatchedDocs = numOfMatchedDocs; + this.maxMatchedDoc = maxMatchedDoc; + } + } + + private final StarTreeNode starTreeRoot; + + Map<String, List<Predicate<Long>>> _predicateEvaluators; + + DocIdSetBuilder docsWithField; + + DocIdSetBuilder.BulkAdder adder; + Map<String, DocIdSetIterator> dimValueMap; + + public StarTreeFilter(StarTreeValues starTreeAggrStructure, Map<String, List<Predicate<Long>>> predicateEvaluators) { + // This filter operator does not support AND/OR/NOT operations. + starTreeRoot = starTreeAggrStructure.getRoot(); + dimValueMap = starTreeAggrStructure.getDimensionDocValuesIteratorMap(); + _predicateEvaluators = predicateEvaluators != null ? predicateEvaluators : Collections.emptyMap(); + // _groupByColumns = groupByColumns != null ? groupByColumns : Collections.emptyList(); + + // TODO : this should be the maximum number of doc values + docsWithField = new DocIdSetBuilder(Integer.MAX_VALUE); + } + + /**
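+ * Returns a {@link DocIdSetIterator} over the documents that match the given dimension predicates: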
+ * <ul> + *   <li>First go over the star tree and try to match as many dimensions as possible</li> + *   <li>For the remaining columns, use doc values indexes to match them</li> + * </ul>
+ */ + public DocIdSetIterator getStarTreeResult() throws IOException { + StarTreeResult starTreeResult = traverseStarTree(); + List<DocIdSetIterator> andIterators = new ArrayList<>(); + andIterators.add(starTreeResult._matchedDocIds.build().iterator()); + DocIdSetIterator docIdSetIterator = andIterators.get(0); + // No matches, return + if (starTreeResult.maxMatchedDoc == -1) { + return docIdSetIterator; + } + int docCount = 0; + for (String remainingPredicateColumn : starTreeResult._remainingPredicateColumns) { + // TODO : set to max value of doc values + logger.debug("remainingPredicateColumn : {}, maxMatchedDoc : {} ", remainingPredicateColumn, starTreeResult.maxMatchedDoc); + DocIdSetBuilder builder = new DocIdSetBuilder(starTreeResult.maxMatchedDoc + 1); + List<Predicate<Long>> compositePredicateEvaluators = _predicateEvaluators.get(remainingPredicateColumn); + SortedNumericDocValues ndv = (SortedNumericDocValues) this.dimValueMap.get(remainingPredicateColumn); + List<Integer> docIds = new ArrayList<>(); + while (docIdSetIterator.nextDoc() != NO_MORE_DOCS) { + docCount++; + int docID = docIdSetIterator.docID(); + if (ndv.advanceExact(docID)) { + final int valuesCount = ndv.docValueCount(); + long value = ndv.nextValue(); + for (Predicate<Long> compositePredicateEvaluator : compositePredicateEvaluators) { + // TODO : this might be expensive as it is done against all doc values docs + if (compositePredicateEvaluator.test(value)) { + docIds.add(docID); + for (int i = 0; i < valuesCount - 1; i++) { + while (docIdSetIterator.nextDoc() != NO_MORE_DOCS) { + docIds.add(docIdSetIterator.docID()); + } + } + break; + } + } + } + } + DocIdSetBuilder.BulkAdder adder = builder.grow(docIds.size()); + for (int docID : docIds) { + adder.add(docID); + } + docIdSetIterator = builder.build().iterator(); + } + return docIdSetIterator; + } + + /** + * Helper method to traverse the star tree, get matching documents and keep track of all the + * predicate dimensions that are not matched. + */ + private StarTreeResult traverseStarTree() throws IOException { + Set<String> globalRemainingPredicateColumns = null; + + StarTreeNode starTree = starTreeRoot; + + List<String> dimensionNames = new ArrayList<>(dimValueMap.keySet()); + + // Track whether we have found a leaf node added to the queue. If we have found a leaf node, and + // traversed to the level of the leaf node, we can set globalRemainingPredicateColumns if not already set, + // because we know the leaf node won't split further on other predicate columns.
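+ // The root itself may be a leaf for very small trees, so seed the flag from the root node.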
+ boolean foundLeafNode = starTree.isLeaf(); + + // Use BFS to traverse the star tree + Queue<StarTreeNode> queue = new ArrayDeque<>(); + queue.add(starTree); + int currentDimensionId = -1; + Set<String> remainingPredicateColumns = new HashSet<>(_predicateEvaluators.keySet()); + if (foundLeafNode) { + globalRemainingPredicateColumns = new HashSet<>(remainingPredicateColumns); + } + + int matchedDocsCountInStarTree = 0; + int maxDocNum = -1; + + StarTreeNode starTreeNode; + List<Integer> docIds = new ArrayList<>(); + while ((starTreeNode = queue.poll()) != null) { + int dimensionId = starTreeNode.getDimensionId(); + if (dimensionId > currentDimensionId) { + // Previous level finished + String dimension = dimensionNames.get(dimensionId); + remainingPredicateColumns.remove(dimension); + if (foundLeafNode && globalRemainingPredicateColumns == null) { + globalRemainingPredicateColumns = new HashSet<>(remainingPredicateColumns); + } + currentDimensionId = dimensionId; + } + + // If all predicate columns are matched, we can use the aggregated document + if (remainingPredicateColumns.isEmpty()) { + int docId = starTreeNode.getAggregatedDocId(); + docIds.add(docId); + matchedDocsCountInStarTree++; + maxDocNum = Math.max(docId, maxDocNum); + continue; + } + + // For leaf node, because we haven't exhausted all predicate columns and group-by columns, + // we cannot use the aggregated document. + // Add the range of documents for this node to the bitmap, and keep track of the + // remaining predicate columns for this node + if (starTreeNode.isLeaf()) { + for (long i = starTreeNode.getStartDocId(); i < starTreeNode.getEndDocId(); i++) { + docIds.add((int) i); + matchedDocsCountInStarTree++; + maxDocNum = Math.max((int) i, maxDocNum); + } + continue; + } + + // For non-leaf node, proceed to next level + String childDimension = dimensionNames.get(dimensionId + 1); + + // Only read star-node when the dimension is not in the global remaining predicate columns + // because we cannot use star-node in such cases + StarTreeNode starNode = null; + if ((globalRemainingPredicateColumns == null || !globalRemainingPredicateColumns.contains(childDimension))) { + starNode = starTreeNode.getChildForDimensionValue(StarTreeNode.ALL, true); + } + + if (remainingPredicateColumns.contains(childDimension)) { + // Have predicates on the next level, add matching nodes to the queue + + // Calculate the matching dictionary ids for the child dimension + int numChildren = starTreeNode.getNumChildren(); + + // If number of matching dictionary ids is large, use scan instead of binary search + + Iterator<? extends StarTreeNode> childrenIterator = starTreeNode.getChildrenIterator(); + + // When the star-node exists, and the number of matching doc ids is more than or equal to + // the number of non-star child nodes, check if all the child nodes match the predicate, + // and use the star-node if so + if (starNode != null) { + List<StarTreeNode> matchingChildNodes = new ArrayList<>(); + boolean findLeafChildNode = false; + while (childrenIterator.hasNext()) { + StarTreeNode childNode = childrenIterator.next(); + List<Predicate<Long>> predicates = _predicateEvaluators.get(childDimension); + for (Predicate<Long> predicate : predicates) { + long val = childNode.getDimensionValue(); + if (predicate.test(val)) { + matchingChildNodes.add(childNode); + findLeafChildNode |= childNode.isLeaf(); + break; + } + } + } + if (matchingChildNodes.size() == numChildren - 1) { + // All the child nodes (except for the star-node) match the predicate, use the star-node + queue.add(starNode); + foundLeafNode |= starNode.isLeaf(); + } else {
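+ // The star-node also aggregates children that did not match the predicate, so it cannot be used here.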
+ // Some child nodes do not match the predicate, use the matching child nodes + queue.addAll(matchingChildNodes); + foundLeafNode |= findLeafChildNode; + } + } else { + // Cannot use the star-node, use the matching child nodes + while (childrenIterator.hasNext()) { + StarTreeNode childNode = childrenIterator.next(); + List<Predicate<Long>> predicates = _predicateEvaluators.get(childDimension); + for (Predicate<Long> predicate : predicates) { + if (predicate.test(childNode.getDimensionValue())) { + queue.add(childNode); + foundLeafNode |= childNode.isLeaf(); + break; + } + } + } + } + } else { + // No predicate on the next level + if (starNode != null) { + // Star-node exists, use it + queue.add(starNode); + foundLeafNode |= starNode.isLeaf(); + } else { + // Star-node does not exist or cannot be used, add all non-star nodes to the queue + Iterator<? extends StarTreeNode> childrenIterator = starTreeNode.getChildrenIterator(); + while (childrenIterator.hasNext()) { + StarTreeNode childNode = childrenIterator.next(); + if (childNode.getDimensionValue() != StarTreeNode.ALL) { + queue.add(childNode); + foundLeafNode |= childNode.isLeaf(); + } + } + } + } + } + + adder = docsWithField.grow(docIds.size()); + for (int id : docIds) { + adder.add(id); + } + return new StarTreeResult( + docsWithField, + globalRemainingPredicateColumns != null ? globalRemainingPredicateColumns : Collections.emptySet(), + matchedDocsCountInStarTree, + maxDocNum + ); + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java b/server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java new file mode 100644 index 0000000000000..3185778e7d754 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java @@ -0,0 +1,112 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license.
+ */ + +package org.opensearch.search.startree; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.search.ConstantScoreScorer; +import org.apache.lucene.search.ConstantScoreWeight; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; +import org.apache.lucene.util.Accountable; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite.CompositeIndexReader; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** Query class for querying star tree data structure */ +public class StarTreeQuery extends Query implements Accountable { + + /** + * Star tree field info + * This is used to get the star tree data structure + */ + CompositeIndexFieldInfo starTree; + + /** + * Map of field name to a list of predicates to be applied on that field + * This is used to filter the data based on the predicates + */ + Map>> compositePredicateMap; + + public StarTreeQuery(CompositeIndexFieldInfo starTree, Map>> compositePredicateMap) { + this.starTree = starTree; + this.compositePredicateMap = compositePredicateMap; + } + + @Override + public String toString(String field) { + return null; + } + + @Override + public void visit(QueryVisitor visitor) { + visitor.visitLeaf(this); + } + + @Override + public boolean equals(Object obj) { + return sameClassAs(obj); + } + + @Override + public int hashCode() { + return classHash(); + } + + @Override + public long ramBytesUsed() { + return 0; + } + + @Override + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException { + return new ConstantScoreWeight(this, boost) { + @Override + public Scorer scorer(LeafReaderContext context) throws IOException { + SegmentReader reader = Lucene.segmentReader(context.reader()); + + // We get the 'CompositeIndexReader' instance so that we can get StarTreeValues + if (!(reader.getDocValuesReader() instanceof CompositeIndexReader)) return null; + + CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); + List compositeIndexFields = starTreeDocValuesReader.getCompositeIndexFields(); + StarTreeValues starTreeValues = null; + if (compositeIndexFields != null && !compositeIndexFields.isEmpty()) { + starTreeValues = (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(starTree); + } else { + return null; + } + + StarTreeFilter filter = new StarTreeFilter(starTreeValues, compositePredicateMap); + DocIdSetIterator result = filter.getStarTreeResult(); + return new ConstantScoreScorer(this, score(), scoreMode, result); + } + + @Override + public boolean isCacheable(LeafReaderContext ctx) { + return false; + } + }; + } + + public CompositeIndexFieldInfo getStarTree() { + return starTree; + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/package-info.java b/server/src/main/java/org/opensearch/search/startree/package-info.java new file mode 100644 index 0000000000000..601a588e54e69 --- /dev/null +++ 
b/server/src/main/java/org/opensearch/search/startree/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Star Tree query classes */ +package org.opensearch.search.startree; diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java index 0d57747206a55..949cb8d3a2cb0 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java @@ -2079,7 +2079,7 @@ public void testFlushFlowBuild() throws IOException { DocValuesProducer d2vp = getDocValuesProducer(d2sndv); DocValuesProducer m1vp = getDocValuesProducer(m1sndv); Map fieldProducerMap = Map.of("field1", d1vp, "field3", d2vp, "field2", m1vp); -// builder.build(starTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); + // builder.build(starTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); builder.build(fieldProducerMap, new AtomicInteger(), docValuesConsumer); /** * Asserting following dim / metrics [ dim1, dim2 / Sum [ metric] ] @@ -2096,10 +2096,10 @@ public void testFlushFlowBuild() throws IOException { */ List starTreeDocuments = builder.getStarTreeDocuments(); for (StarTreeDocument starTreeDocument : starTreeDocuments) { -// assertEquals( -// starTreeDocument.dimensions[1] != null ? starTreeDocument.dimensions[1] * 10.0 : 49500.0, -// starTreeDocument.metrics[0] -// ); + // assertEquals( + // starTreeDocument.dimensions[1] != null ? starTreeDocument.dimensions[1] * 10.0 : 49500.0, + // starTreeDocument.metrics[0] + // ); } validateStarTree(builder.getRootNode(), 2, 1, builder.getStarTreeDocuments());