From 9040f6f0440d3cecbd86c7c08bdcff36f2389845 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Wed, 7 Aug 2024 10:43:38 -0700 Subject: [PATCH] remove circular dependency Signed-off-by: bowenlan-amzn --- .../filterrewrite/AggregatorBridge.java | 24 ++++++++++++------- .../DateHistogramAggregatorBridge.java | 24 ++++++++++++------- .../FilterRewriteOptimizationContext.java | 11 ++++----- .../filterrewrite/RangeAggregatorBridge.java | 14 +++++++---- 4 files changed, 44 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/AggregatorBridge.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/AggregatorBridge.java index ba8859625faa3..46494157db6dd 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/AggregatorBridge.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/AggregatorBridge.java @@ -14,6 +14,7 @@ import java.io.IOException; import java.util.function.BiConsumer; +import java.util.function.Consumer; /** * This interface provides a bridge between an aggregator and the optimization context, allowing @@ -31,18 +32,17 @@ */ public abstract class AggregatorBridge { - /** - * The optimization context associated with this aggregator bridge. - */ - FilterRewriteOptimizationContext filterRewriteOptimizationContext; - /** * The field type associated with this aggregator bridge. */ MappedFieldType fieldType; - void setOptimizationContext(FilterRewriteOptimizationContext context) { - this.filterRewriteOptimizationContext = context; + Consumer setRanges; + BiConsumer setRangesFromSegment; + + void setRangesConsumer(Consumer setRanges, BiConsumer setRangesFromSegment) { + this.setRanges = setRanges; + this.setRangesFromSegment = setRangesFromSegment; } /** @@ -72,7 +72,13 @@ void setOptimizationContext(FilterRewriteOptimizationContext context) { * * @param values the point values (index structure for numeric values) for a segment * @param incrementDocCount a consumer to increment the document count for a range bucket. The First parameter is document count, the second is the key of the bucket - * @param leafOrd + * @param consumeDebugInfo + * @param ranges */ - protected abstract void tryOptimize(PointValues values, BiConsumer incrementDocCount, int leafOrd) throws IOException; + abstract void tryOptimize( + PointValues values, + BiConsumer incrementDocCount, + Consumer consumeDebugInfo, + Ranges ranges + ) throws IOException; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/DateHistogramAggregatorBridge.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/DateHistogramAggregatorBridge.java index ca3df11b3596c..26a90c7e6af90 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/DateHistogramAggregatorBridge.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/DateHistogramAggregatorBridge.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.OptionalLong; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Function; import static org.opensearch.search.aggregations.bucket.filterrewrite.PointTreeTraversal.multiRangesTraverse; @@ -32,6 +33,8 @@ */ public abstract class DateHistogramAggregatorBridge extends AggregatorBridge { + int maxRewriteFilters; + protected boolean canOptimize(ValuesSourceConfig config) { if (config.script() == null && config.missing() == null) { MappedFieldType fieldType = config.fieldType(); @@ -47,16 +50,17 @@ protected boolean canOptimize(ValuesSourceConfig config) { protected void buildRanges(SearchContext context) throws IOException { long[] bounds = Helper.getDateHistoAggBounds(context, fieldType.name()); - filterRewriteOptimizationContext.setRanges(buildRanges(bounds)); + this.maxRewriteFilters = context.maxAggRewriteFilters(); + setRanges.accept(buildRanges(bounds, maxRewriteFilters)); } @Override protected void prepareFromSegment(LeafReaderContext leaf) throws IOException { long[] bounds = Helper.getSegmentBounds(leaf, fieldType.name()); - filterRewriteOptimizationContext.setRangesFromSegment(leaf.ord, buildRanges(bounds)); + setRangesFromSegment.accept(leaf.ord, buildRanges(bounds, maxRewriteFilters)); } - private Ranges buildRanges(long[] bounds) { + private Ranges buildRanges(long[] bounds, int maxRewriteFilters) { bounds = processHardBounds(bounds); if (bounds == null) { return null; @@ -79,7 +83,7 @@ private Ranges buildRanges(long[] bounds) { getRoundingPrepared(), bounds[0], bounds[1], - filterRewriteOptimizationContext.maxAggRewriteFilters + maxRewriteFilters ); } @@ -123,11 +127,15 @@ protected int getSize() { } @Override - protected final void tryOptimize(PointValues values, BiConsumer incrementDocCount, int leafOrd) throws IOException { + final void tryOptimize( + PointValues values, + BiConsumer incrementDocCount, + Consumer consumeDebugInfo, + Ranges ranges + ) throws IOException { int size = getSize(); DateFieldMapper.DateFieldType fieldType = getFieldType(); - Ranges ranges = filterRewriteOptimizationContext.getRanges(leafOrd); BiConsumer incrementFunc = (activeIndex, docCount) -> { long rangeStart = LongPoint.decodeDimension(ranges.lowers[activeIndex], 0); rangeStart = fieldType.convertNanosToMillis(rangeStart); @@ -135,9 +143,7 @@ protected final void tryOptimize(PointValues values, BiConsumer incr incrementDocCount.accept(bucketOrd, (long) docCount); }; - filterRewriteOptimizationContext.consumeDebugInfo( - multiRangesTraverse(values.getPointTree(), filterRewriteOptimizationContext.getRanges(leafOrd), incrementFunc, size) - ); + consumeDebugInfo.accept(multiRangesTraverse(values.getPointTree(), ranges, incrementFunc, size)); } private static long getBucketOrd(long bucketOrd) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java index 8811346b24b77..c3e22db1bc6ac 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteOptimizationContext.java @@ -39,7 +39,6 @@ public final class FilterRewriteOptimizationContext { private boolean preparedAtShardLevel = false; private final AggregatorBridge aggregatorBridge; - int maxAggRewriteFilters; private String shardId; private Ranges ranges; @@ -72,8 +71,8 @@ private boolean canOptimize(final Object parent, final int subAggLength, SearchC boolean canOptimize = aggregatorBridge.canOptimize(); if (canOptimize) { - aggregatorBridge.setOptimizationContext(this); - this.maxAggRewriteFilters = context.maxAggRewriteFilters(); + aggregatorBridge.setRangesConsumer(this::setRanges, this::setRangesFromSegment); + this.shardId = context.indexShard().shardId().toString(); assert ranges == null : "Ranges should only be built once at shard level, but they are already built"; @@ -139,7 +138,7 @@ public boolean tryOptimize(final LeafReaderContext leafCtx, final BiConsumer add) { if (optimizedSegments > 0) { add.accept("optimized_segments", optimizedSegments); add.accept("unoptimized_segments", segments - optimizedSegments); - add.accept("leaf_node_visited", leafNodeVisited); - add.accept("inner_node_visited", innerNodeVisited); + add.accept("leaf_visited", leafNodeVisited); + add.accept("inner_visited", innerNodeVisited); } } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/RangeAggregatorBridge.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/RangeAggregatorBridge.java index ab3e6e6b89a69..8a7fc5306c4bf 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/RangeAggregatorBridge.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/RangeAggregatorBridge.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Function; import static org.opensearch.search.aggregations.bucket.filterrewrite.PointTreeTraversal.multiRangesTraverse; @@ -65,7 +66,7 @@ protected void buildRanges(RangeAggregator.Range[] ranges) { uppers[i] = upper; } - filterRewriteOptimizationContext.setRanges(new Ranges(lowers, uppers)); + setRanges.accept(new Ranges(lowers, uppers)); } @Override @@ -74,7 +75,12 @@ public void prepareFromSegment(LeafReaderContext leaf) { } @Override - protected final void tryOptimize(PointValues values, BiConsumer incrementDocCount, int leafOrd) throws IOException { + final void tryOptimize( + PointValues values, + BiConsumer incrementDocCount, + Consumer consumeDebugInfo, + Ranges ranges + ) throws IOException { int size = Integer.MAX_VALUE; BiConsumer incrementFunc = (activeIndex, docCount) -> { @@ -82,9 +88,7 @@ protected final void tryOptimize(PointValues values, BiConsumer incr incrementDocCount.accept(bucketOrd, (long) docCount); }; - filterRewriteOptimizationContext.consumeDebugInfo( - multiRangesTraverse(values.getPointTree(), filterRewriteOptimizationContext.getRanges(leafOrd), incrementFunc, size) - ); + consumeDebugInfo.accept(multiRangesTraverse(values.getPointTree(), ranges, incrementFunc, size)); } /**