Skip to content

Commit

Permalink
[Enhancement] optimize sort rewrite logic
Browse files Browse the repository at this point in the history
Rewrite sort as DSL by using first and last paramater in Composite aggregation.

Signed-off-by: penghuo <penghuo@gmail.com>
  • Loading branch information
penghuo committed Mar 3, 2022
1 parent 0fd7f2f commit 52d3fd2
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 113 deletions.
43 changes: 4 additions & 39 deletions docs/user/optimization/optimization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ The Aggregation operator will merge into OpenSearch Aggregation::
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"order\":\"asc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false)"
},
"children": []
}
Expand All @@ -313,49 +313,14 @@ The Sort operator will merge into OpenSearch Aggregation.::
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"order\":\"desc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"last\",\"order\":\"desc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false)"
},
"children": []
}
]
}
}

Because the OpenSearch Composite Aggregation order doesn't support separate NULL_FIRST/NULL_LAST option. only the default sort option (ASC NULL_FIRST/DESC NULL_LAST) will be supported for push down to OpenSearch Aggregation, otherwise it will fall back to the default memory based operator::

sh$ curl -sS -H 'Content-Type: application/json' \
... -X POST localhost:9200/_plugins/_sql/_explain \
... -d '{"query" : "SELECT gender, avg(age) FROM accounts GROUP BY gender ORDER BY gender ASC NULLS LAST"}'
{
"root": {
"name": "ProjectOperator",
"description": {
"fields": "[gender, avg(age)]"
},
"children": [
{
"name": "SortOperator",
"description": {
"sortList": {
"gender": {
"sortOrder": "ASC",
"nullOrder": "NULL_LAST"
}
}
},
"children": [
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"order\":\"asc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false)"
},
"children": []
}
]
}
]
}
}

Because the OpenSearch Composite Aggregation doesn't support order by metrics field, then if the sort list include fields which refer to metrics aggregation, then the sort operator can't be push down to OpenSearch Aggregation::

Expand Down Expand Up @@ -383,7 +348,7 @@ Because the OpenSearch Composite Aggregation doesn't support order by metrics fi
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"order\":\"asc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false)"
"request": "OpenSearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg(age)\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone=false)"
},
"children": []
}
Expand All @@ -408,4 +373,4 @@ At the moment there is no optimization to merge similar sort operators to avoid

Sort Push Down
--------------
Without sort push down optimization, the sort operator will sort the result from child operator. By default, only 200 docs will extracted from the source index, `you can change this value by using size_limit setting <../admin/settings.rst#opensearch-query-size-limit>`_.
Without sort push down optimization, the sort operator will sort the result from child operator. By default, only 200 docs will extracted from the source index, `you can change this value by using size_limit setting <../admin/settings.rst#opensearch-query-size-limit>`_.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone\u003dfalse)"
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone\u003dfalse)"
},
"children": []
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone\u003dfalse)"
"request": "OpenSearchQueryRequest(indexName\u003dopensearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}],\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"state\":{\"terms\":{\"field\":\"state.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}},{\"city\":{\"terms\":{\"field\":\"city.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, searchDone\u003dfalse)"
},
"children": []
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public MergeSortAndIndexAgg() {

this.pattern = typeOf(LogicalSort.class)
.matching(OptimizationRuleUtils::sortByFieldsOnly)
.matching(OptimizationRuleUtils::sortByDefaultOptionOnly)
.matching(sort -> {
sortRef.set(sort);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.util.List;
import java.util.Set;
import lombok.experimental.UtilityClass;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.expression.ExpressionNodeVisitor;
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.expression.ReferenceExpression;
Expand All @@ -32,20 +31,6 @@ public static boolean sortByFieldsOnly(LogicalSort logicalSort) {
.reduce(true, Boolean::logicalAnd);
}

/**
* Does the sort list has option other than default options.
* The default sort options are (ASC NULL_FIRST) or (DESC NULL LAST)
*
* @param logicalSort LogicalSort.
* @return true sort list only option default options, otherwise false.
*/
public static boolean sortByDefaultOptionOnly(LogicalSort logicalSort) {
return logicalSort.getSortList().stream()
.map(sort -> Sort.SortOption.DEFAULT_ASC.equals(sort.getLeft())
|| Sort.SortOption.DEFAULT_DESC.equals(sort.getLeft()))
.reduce(true, Boolean::logicalAnd);
}

/**
* Find reference expression from expression.
* @param expressions a list of expression.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.AggregatorFactories;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.data.type.ExprType;
Expand Down Expand Up @@ -92,7 +93,9 @@ public AggregationQueryBuilder(
bucketBuilder.build(
groupByList.stream()
.sorted(groupSortOrder)
.map(expr -> Pair.of(expr, groupSortOrder.apply(expr)))
.map(expr -> Triple.of(expr,
groupSortOrder.sortOrder(expr),
groupSortOrder.missingOrder(expr)))
.collect(Collectors.toList())))
.subAggregations(metrics.getLeft())
.size(AGGREGATION_BUCKET_SIZE)),
Expand All @@ -112,27 +115,37 @@ public Map<String, ExprType> buildTypeMapping(
return builder.build();
}

/**
* Group By field sort order.
*/
@VisibleForTesting
public static class GroupSortOrder implements Comparator<NamedExpression>,
Function<NamedExpression, SortOrder> {
public static class GroupSortOrder implements Comparator<NamedExpression> {

/**
* The default order of group field.
* The order is ASC NULL_FIRST.
* The field should be the last one in the group list.
*/
private static final Pair<SortOrder, Integer> DEFAULT_ORDER =
Pair.of(SortOrder.ASC, Integer.MAX_VALUE);
private static final Pair<Sort.SortOption, Integer> DEFAULT_ORDER =
Pair.of(Sort.SortOption.DEFAULT_ASC, Integer.MAX_VALUE);

/**
* The mapping between {@link Sort.SortOrder} and {@link SortOrder}.
*/
private static final Map<Sort.SortOrder, SortOrder> SORT_MAP =
new ImmutableMap.Builder<Sort.SortOrder, SortOrder>()
.put(Sort.SortOrder.ASC, SortOrder.ASC)
.put(Sort.SortOrder.DESC, SortOrder.DESC).build();

/**
* The mapping betwen {@link Sort.SortOption} and {@link SortOrder}.
* The mapping between {@link Sort.NullOrder} and {@link MissingOrder}.
*/
private static final Map<Sort.SortOption, SortOrder> SORT_MAP =
new ImmutableMap.Builder<Sort.SortOption, SortOrder>()
.put(Sort.SortOption.DEFAULT_ASC, SortOrder.ASC)
.put(Sort.SortOption.DEFAULT_DESC, SortOrder.DESC).build();
private static final Map<Sort.NullOrder, MissingOrder> NULL_MAP =
new ImmutableMap.Builder<Sort.NullOrder, MissingOrder>()
.put(Sort.NullOrder.NULL_FIRST, MissingOrder.FIRST)
.put(Sort.NullOrder.NULL_LAST, MissingOrder.LAST).build();

private final Map<String, Pair<SortOrder, Integer>> map = new HashMap<>();
private final Map<String, Pair<Sort.SortOption, Integer>> map = new HashMap<>();

/**
* Constructor of GroupSortOrder.
Expand All @@ -144,7 +157,7 @@ public GroupSortOrder(List<Pair<Sort.SortOption, Expression>> sortList) {
int pos = 0;
for (Pair<Sort.SortOption, Expression> sortPair : sortList) {
map.put(((ReferenceExpression) sortPair.getRight()).getAttr(),
Pair.of(SORT_MAP.getOrDefault(sortPair.getLeft(), SortOrder.ASC), pos++));
Pair.of(sortPair.getLeft(), pos++));
}
}

Expand All @@ -161,9 +174,9 @@ public GroupSortOrder(List<Pair<Sort.SortOption, Expression>> sortList) {
*/
@Override
public int compare(NamedExpression o1, NamedExpression o2) {
final Pair<SortOrder, Integer> o1Value =
final Pair<Sort.SortOption, Integer> o1Value =
map.getOrDefault(o1.getName(), DEFAULT_ORDER);
final Pair<SortOrder, Integer> o2Value =
final Pair<Sort.SortOption, Integer> o2Value =
map.getOrDefault(o2.getName(), DEFAULT_ORDER);
return o1Value.getRight().compareTo(o2Value.getRight());
}
Expand All @@ -172,8 +185,19 @@ public int compare(NamedExpression o1, NamedExpression o2) {
* Get the {@link SortOrder} for expression.
* By default, the {@link SortOrder} is ASC.
*/
@Override
public SortOrder apply(NamedExpression expression) {
public SortOrder sortOrder(NamedExpression expression) {
return SORT_MAP.get(sortOption(expression).getSortOrder());
}

/**
* Get the {@link MissingOrder} for expression.
* By default, the {@link MissingOrder} is ASC missing first / DESC missing last.
*/
public MissingOrder missingOrder(NamedExpression expression) {
return NULL_MAP.get(sortOption(expression).getNullOrder());
}

private Sort.SortOption sortOption(NamedExpression expression) {
return map.getOrDefault(expression.getName(), DEFAULT_ORDER).getLeft();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@

import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.opensearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder;
import org.opensearch.search.aggregations.bucket.composite.HistogramValuesSourceBuilder;
import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.sql.ast.expression.SpanUnit;
import org.opensearch.sql.expression.NamedExpression;
Expand All @@ -35,50 +36,56 @@ public BucketAggregationBuilder(
* Build the list of CompositeValuesSourceBuilder.
*/
public List<CompositeValuesSourceBuilder<?>> build(
List<Pair<NamedExpression, SortOrder>> groupList) {
List<Triple<NamedExpression, SortOrder, MissingOrder>> groupList) {
ImmutableList.Builder<CompositeValuesSourceBuilder<?>> resultBuilder =
new ImmutableList.Builder<>();
for (Pair<NamedExpression, SortOrder> groupPair : groupList) {
for (Triple<NamedExpression, SortOrder, MissingOrder> groupPair : groupList) {
resultBuilder.add(
buildCompositeValuesSourceBuilder(groupPair.getLeft(), groupPair.getRight()));
buildCompositeValuesSourceBuilder(groupPair.getLeft(),
groupPair.getMiddle(), groupPair.getRight()));
}
return resultBuilder.build();
}

// todo, Expression should implement buildCompositeValuesSourceBuilder() interface.
private CompositeValuesSourceBuilder<?> buildCompositeValuesSourceBuilder(
NamedExpression expr, SortOrder order) {
NamedExpression expr, SortOrder sortOrder, MissingOrder missingOrder) {
if (expr.getDelegated() instanceof SpanExpression) {
SpanExpression spanExpr = (SpanExpression) expr.getDelegated();
return buildHistogram(
expr.getNameOrAlias(),
spanExpr.getField().toString(),
spanExpr.getValue().valueOf(null).doubleValue(),
spanExpr.getUnit());
spanExpr.getUnit(),
missingOrder);
} else {
CompositeValuesSourceBuilder<?> sourceBuilder =
new TermsValuesSourceBuilder(expr.getNameOrAlias()).missingBucket(true).order(order);
new TermsValuesSourceBuilder(expr.getNameOrAlias())
.missingBucket(true)
.missingOrder(missingOrder)
.order(sortOrder);
return helper.build(expr.getDelegated(), sourceBuilder::field, sourceBuilder::script);
}
}

private CompositeValuesSourceBuilder<?> buildHistogram(
String name, String field, Double value, SpanUnit unit) {
String name, String field, Double value, SpanUnit unit, MissingOrder missingOrder) {
switch (unit) {
case NONE:
return new HistogramValuesSourceBuilder(name)
.field(field)
.interval(value)
.missingBucket(true);
.missingBucket(true)
.missingOrder(missingOrder);
case UNKNOWN:
throw new IllegalStateException("Invalid span unit");
default:
return buildDateHistogram(name, field, value.intValue(), unit);
return buildDateHistogram(name, field, value.intValue(), unit, missingOrder);
}
}

private CompositeValuesSourceBuilder<?> buildDateHistogram(
String name, String field, Integer value, SpanUnit unit) {
String name, String field, Integer value, SpanUnit unit, MissingOrder missingOrder) {
String spanValue = value + unit.getName();
switch (unit) {
case MILLISECOND:
Expand All @@ -94,11 +101,13 @@ private CompositeValuesSourceBuilder<?> buildDateHistogram(
return new DateHistogramValuesSourceBuilder(name)
.field(field)
.missingBucket(true)
.missingOrder(missingOrder)
.fixedInterval(new DateHistogramInterval(spanValue));
default:
return new DateHistogramValuesSourceBuilder(name)
.field(field)
.missingBucket(true)
.missingOrder(missingOrder)
.calendarInterval(new DateHistogramInterval(spanValue));
}
}
Expand Down
Loading

0 comments on commit 52d3fd2

Please sign in to comment.