Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Support aggregate window functions #946

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Here is a documentation list with features only available in this improved SQL q
* [Aggregations](./docs/user/dql/aggregations.rst): aggregation over expression and more other features
* [Complex queries](./docs/user/dql/complex.rst)
* Improvement on Subqueries in FROM clause
* [Window functions](./docs/user/dql/window.rst): ranking window function support
* [Window functions](./docs/user/dql/window.rst): ranking and aggregate window function support

To avoid impact on your side, normally you won't see any difference in query response. If you want to check if and why your query falls back to be handled by old SQL engine, please explain your query and check Elasticsearch log for "Request is falling back to old SQL engine due to ...".

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,15 @@
import com.amazon.opendistroforelasticsearch.sql.expression.DSL;
import com.amazon.opendistroforelasticsearch.sql.expression.Expression;
import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression;
import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.AggregationState;
import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.Aggregator;
import com.amazon.opendistroforelasticsearch.sql.expression.conditional.cases.CaseClause;
import com.amazon.opendistroforelasticsearch.sql.expression.conditional.cases.WhenClause;
import com.amazon.opendistroforelasticsearch.sql.expression.function.BuiltinFunctionName;
import com.amazon.opendistroforelasticsearch.sql.expression.function.BuiltinFunctionRepository;
import com.amazon.opendistroforelasticsearch.sql.expression.function.FunctionName;
import com.amazon.opendistroforelasticsearch.sql.expression.window.aggregation.AggregateWindowFunction;
import com.amazon.opendistroforelasticsearch.sql.expression.window.ranking.RankingWindowFunction;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -166,9 +169,15 @@ public Expression visitFunction(Function node, AnalysisContext context) {
return (Expression) repository.compile(functionName, arguments);
}

@SuppressWarnings("unchecked")
@Override
public Expression visitWindowFunction(WindowFunction node, AnalysisContext context) {
return visitFunction(node.getFunction(), context);
Expression expr = node.getFunction().accept(this, context);
// Wrap regular aggregator by aggregate window function to adapt window operator use
if (expr instanceof Aggregator) {
return new AggregateWindowFunction((Aggregator<AggregationState>) expr);
}
return expr;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.amazon.opendistroforelasticsearch.sql.expression.Expression;
import com.amazon.opendistroforelasticsearch.sql.expression.ExpressionNodeVisitor;
import com.amazon.opendistroforelasticsearch.sql.expression.FunctionExpression;
import com.amazon.opendistroforelasticsearch.sql.expression.NamedExpression;
import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression;
import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.Aggregator;
import com.amazon.opendistroforelasticsearch.sql.expression.conditional.cases.CaseClause;
Expand Down Expand Up @@ -89,6 +90,14 @@ public Expression visitAggregator(Aggregator<?> node, AnalysisContext context) {
return expressionMap.getOrDefault(node, node);
}

@Override
public Expression visitNamed(NamedExpression node, AnalysisContext context) {
if (expressionMap.containsKey(node)) {
return expressionMap.get(node);
}
return node.getDelegated().accept(this, context);
}

/**
* Implement this because Case/When is not registered in function repository.
*/
Expand Down Expand Up @@ -145,7 +154,7 @@ public Void visitAggregation(LogicalAggregation plan, Void context) {
public Void visitWindow(LogicalWindow plan, Void context) {
Expression windowFunc = plan.getWindowFunction();
expressionMap.put(windowFunc,
new ReferenceExpression(windowFunc.toString(), windowFunc.type()));
new ReferenceExpression(((NamedExpression) windowFunc).getName(), windowFunc.type()));
return visitNode(plan, context);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,15 @@ public List<NamedExpression> visitAlias(Alias node, AnalysisContext context) {
private Expression referenceIfSymbolDefined(Alias expr,
AnalysisContext context) {
UnresolvedExpression delegatedExpr = expr.getDelegated();
return optimizer.optimize(delegatedExpr.accept(expressionAnalyzer, context), context);

// Pass named expression because expression like window function loses full name
// (OVER clause) and thus depends on name in alias to be replaced correctly
return optimizer.optimize(
DSL.named(
expr.getName(),
delegatedExpr.accept(expressionAnalyzer, context),
expr.getAlias()),
context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.amazon.opendistroforelasticsearch.sql.ast.expression.WindowFunction;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Sort.SortOption;
import com.amazon.opendistroforelasticsearch.sql.expression.Expression;
import com.amazon.opendistroforelasticsearch.sql.expression.NamedExpression;
import com.amazon.opendistroforelasticsearch.sql.expression.window.WindowDefinition;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalSort;
Expand Down Expand Up @@ -68,19 +69,26 @@ public LogicalPlan analyze(UnresolvedExpression projectItem, AnalysisContext con

@Override
public LogicalPlan visitAlias(Alias node, AnalysisContext context) {
return node.getDelegated().accept(this, context);
}
if (!(node.getDelegated() instanceof WindowFunction)) {
return null;
}

WindowFunction unresolved = (WindowFunction) node.getDelegated();
Expression windowFunction = expressionAnalyzer.analyze(unresolved, context);
List<Expression> partitionByList = analyzePartitionList(unresolved, context);
List<Pair<SortOption, Expression>> sortList = analyzeSortList(unresolved, context);

@Override
public LogicalPlan visitWindowFunction(WindowFunction node, AnalysisContext context) {
Expression windowFunction = expressionAnalyzer.analyze(node, context);
List<Expression> partitionByList = analyzePartitionList(node, context);
List<Pair<SortOption, Expression>> sortList = analyzeSortList(node, context);
WindowDefinition windowDefinition = new WindowDefinition(partitionByList, sortList);
NamedExpression namedWindowFunction =
new NamedExpression(node.getName(), windowFunction, node.getAlias());
List<Pair<SortOption, Expression>> allSortItems = windowDefinition.getAllSortItems();

if (allSortItems.isEmpty()) {
return new LogicalWindow(child, namedWindowFunction, windowDefinition);
}
return new LogicalWindow(
new LogicalSort(child, windowDefinition.getAllSortItems()),
windowFunction,
new LogicalSort(child, allSortItems),
namedWindowFunction,
windowDefinition);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public When when(UnresolvedExpression condition, UnresolvedExpression result) {
return new When(condition, result);
}

public UnresolvedExpression window(Function function,
public UnresolvedExpression window(UnresolvedExpression function,
List<UnresolvedExpression> partitionByList,
List<Pair<SortOption, UnresolvedExpression>> sortList) {
return new WindowFunction(function, partitionByList, sortList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.amazon.opendistroforelasticsearch.sql.ast.AbstractNodeVisitor;
import com.amazon.opendistroforelasticsearch.sql.ast.Node;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Sort.SortOption;
import java.util.Collections;
import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
Expand All @@ -35,13 +35,17 @@
@ToString
public class WindowFunction extends UnresolvedExpression {

private final Function function;
private final UnresolvedExpression function;
private List<UnresolvedExpression> partitionByList;
private List<Pair<SortOption, UnresolvedExpression>> sortList;

@Override
public List<? extends Node> getChild() {
return Collections.singletonList(function);
ImmutableList.Builder<UnresolvedExpression> children = ImmutableList.builder();
children.add(function);
children.addAll(partitionByList);
sortList.forEach(pair -> children.add(pair.getRight()));
return children.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.opendistroforelasticsearch.sql.expression.window;

import com.amazon.opendistroforelasticsearch.sql.expression.Expression;
import com.amazon.opendistroforelasticsearch.sql.expression.window.frame.WindowFrame;

/**
* Window function abstraction.
*/
public interface WindowFunctionExpression extends Expression {

/**
* Create specific window frame based on window definition and what's current window function.
* For now two types of cumulative window frame is returned:
* 1. Ranking window functions: ignore frame definition and always operates on
* previous and current row.
* 2. Aggregate window functions: frame partition into peers and sliding window is not supported.
*
* @param definition window definition
* @return window frame
*/
WindowFrame createWindowFrame(WindowDefinition definition);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.opendistroforelasticsearch.sql.expression.window.aggregation;

import com.amazon.opendistroforelasticsearch.sql.data.model.ExprValue;
import com.amazon.opendistroforelasticsearch.sql.data.type.ExprType;
import com.amazon.opendistroforelasticsearch.sql.expression.Expression;
import com.amazon.opendistroforelasticsearch.sql.expression.ExpressionNodeVisitor;
import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.AggregationState;
import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.Aggregator;
import com.amazon.opendistroforelasticsearch.sql.expression.env.Environment;
import com.amazon.opendistroforelasticsearch.sql.expression.window.WindowDefinition;
import com.amazon.opendistroforelasticsearch.sql.expression.window.WindowFunctionExpression;
import com.amazon.opendistroforelasticsearch.sql.expression.window.frame.PeerRowsWindowFrame;
import com.amazon.opendistroforelasticsearch.sql.expression.window.frame.WindowFrame;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;

/**
* Aggregate function adapter that adapts Aggregator for window operator use.
*/
@EqualsAndHashCode
@RequiredArgsConstructor
public class AggregateWindowFunction implements WindowFunctionExpression {

private final Aggregator<AggregationState> aggregator;
private AggregationState state;

@Override
public WindowFrame createWindowFrame(WindowDefinition definition) {
return new PeerRowsWindowFrame(definition);
}

@Override
public ExprValue valueOf(Environment<Expression, ExprValue> valueEnv) {
PeerRowsWindowFrame frame = (PeerRowsWindowFrame) valueEnv;
if (frame.isNewPartition()) {
state = aggregator.create();
}

List<ExprValue> peers = frame.next();
for (ExprValue peer : peers) {
state = aggregator.iterate(peer.bindingTuples(), state);
}
return state.result();
}

@Override
public ExprType type() {
return aggregator.type();
}

@Override
public <T, C> T accept(ExpressionNodeVisitor<T, C> visitor, C context) {
return aggregator.accept(visitor, context);
}

@Override
public String toString() {
return aggregator.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
*
*/

package com.amazon.opendistroforelasticsearch.sql.expression.window;
package com.amazon.opendistroforelasticsearch.sql.expression.window.frame;

import com.amazon.opendistroforelasticsearch.sql.data.model.ExprTupleValue;
import com.amazon.opendistroforelasticsearch.sql.data.model.ExprValue;
import com.amazon.opendistroforelasticsearch.sql.expression.Expression;
import com.amazon.opendistroforelasticsearch.sql.expression.env.Environment;
import com.amazon.opendistroforelasticsearch.sql.expression.window.frame.WindowFrame;
import com.amazon.opendistroforelasticsearch.sql.expression.window.WindowDefinition;
import com.google.common.collect.PeekingIterator;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
Expand All @@ -30,22 +31,21 @@
import lombok.ToString;

/**
* Cumulative window frame that accumulates data row incrementally as window operator iterates
* input rows. Conceptually, cumulative window frame should hold all seen rows till next partition.
* Conceptually, cumulative window frame should hold all seen rows till next partition.
* This class is actually an optimized version that only hold previous and current row. This is
* efficient and sufficient for ranking and aggregate window function support for now, though need
* to add "real" cumulative frame implementation in future as needed.
*/
@EqualsAndHashCode
@Getter
@RequiredArgsConstructor
@ToString
public class CumulativeWindowFrame implements WindowFrame {
public class CurrentRowWindowFrame implements WindowFrame {

@Getter
private final WindowDefinition windowDefinition;

private ExprTupleValue previous;
private ExprTupleValue current;
private ExprValue previous;
private ExprValue current;

@Override
public boolean isNewPartition() {
Expand All @@ -61,30 +61,39 @@ public boolean isNewPartition() {
}

@Override
public int currentIndex() {
// Current row index is always 1 since only 2 rows maintained
return 1;
public void load(PeekingIterator<ExprValue> it) {
previous = current;
current = it.next();
}

@Override
public void add(ExprTupleValue row) {
previous = current;
current = row;
public ExprValue current() {
return current;
}

@Override
public ExprTupleValue get(int index) {
if (index != 0 && index != 1) {
throw new IndexOutOfBoundsException("Index is out of boundary of window frame: " + index);
}
return (index == 0) ? previous : current;
public ExprValue previous() {
return previous;
}

private List<ExprValue> resolve(List<Expression> expressions, ExprTupleValue row) {
private List<ExprValue> resolve(List<Expression> expressions, ExprValue row) {
Environment<Expression, ExprValue> valueEnv = row.bindingTuples();
return expressions.stream()
.map(expr -> expr.valueOf(valueEnv))
.collect(Collectors.toList());
}

/**
* Current row window frame won't pre-fetch any row ahead.
* So always return false as nothing "cached" in frame.
*/
@Override
public boolean hasNext() {
return false;
}

@Override
public List<ExprValue> next() {
return Collections.emptyList();
}

}
Loading