Skip to content

Commit

Permalink
Improve pushdown optimization and logical to physical transformation (o…
Browse files Browse the repository at this point in the history
…pensearch-project#1091)

* Add new table scan builder and optimizer rules

Signed-off-by: Chen Dai <daichen@amazon.com>

* Fix jacoco test coverage

Signed-off-by: Chen Dai <daichen@amazon.com>

* Update javadoc with more details

Signed-off-by: Chen Dai <daichen@amazon.com>

* Fix highlight pushdown issue

Signed-off-by: Chen Dai <daichen@amazon.com>

* Rename new class more properly

Signed-off-by: Chen Dai <daichen@amazon.com>

* Fix default sort by doc issue

Signed-off-by: Chen Dai <daichen@amazon.com>

* Rename visit method and javadoc

Signed-off-by: Chen Dai <daichen@amazon.com>

* Move table scan builder and optimize rule to read package

Signed-off-by: Chen Dai <daichen@amazon.com>

* Fix sort push down issue

Signed-off-by: Chen Dai <daichen@amazon.com>

* Move sortByFields to parent scan builder

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add back old test

Signed-off-by: Chen Dai <daichen@amazon.com>

Signed-off-by: Chen Dai <daichen@amazon.com>
  • Loading branch information
dai-chen authored Dec 7, 2022
1 parent f530770 commit 64a3794
Show file tree
Hide file tree
Showing 44 changed files with 1,810 additions and 2,050 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.sql.planner.physical.SortOperator;
import org.opensearch.sql.planner.physical.ValuesOperator;
import org.opensearch.sql.planner.physical.WindowOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;

/**
* Default implementor for implementing logical to physical translation. "Default" here means all
Expand Down Expand Up @@ -123,6 +124,11 @@ public PhysicalPlan visitLimit(LogicalLimit node, C context) {
return new LimitOperator(visitChild(node, context), node.getLimit(), node.getOffset());
}

@Override
public PhysicalPlan visitTableScanBuilder(TableScanBuilder plan, C context) {
return plan.build();
}

@Override
public PhysicalPlan visitRelation(LogicalRelation node, C context) {
throw new UnsupportedOperationException("Storage engine is responsible for "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

package org.opensearch.sql.planner.logical;

import org.opensearch.sql.storage.read.TableScanBuilder;

/**
* The visitor of {@link LogicalPlan}.
*
Expand All @@ -22,6 +24,10 @@ public R visitRelation(LogicalRelation plan, C context) {
return visitNode(plan, context);
}

public R visitTableScanBuilder(TableScanBuilder plan, C context) {
return visitNode(plan, context);
}

public R visitFilter(LogicalFilter plan, C context) {
return visitNode(plan, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.rule.MergeFilterAndFilter;
import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort;
import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder;
import org.opensearch.sql.planner.optimizer.rule.read.TableScanPushDown;

/**
* {@link LogicalPlan} Optimizer.
Expand All @@ -39,8 +41,21 @@ public LogicalPlanOptimizer(List<Rule<?>> rules) {
*/
public static LogicalPlanOptimizer create() {
return new LogicalPlanOptimizer(Arrays.asList(
/*
* Phase 1: Transformations that rely on relational algebra equivalence
*/
new MergeFilterAndFilter(),
new PushFilterUnderSort()));
new PushFilterUnderSort(),
/*
* Phase 2: Transformations that rely on data source push down capability
*/
new CreateTableScanBuilder(),
TableScanPushDown.PUSH_DOWN_FILTER,
TableScanPushDown.PUSH_DOWN_AGGREGATION,
TableScanPushDown.PUSH_DOWN_SORT,
TableScanPushDown.PUSH_DOWN_LIMIT,
TableScanPushDown.PUSH_DOWN_HIGHLIGHT,
TableScanPushDown.PUSH_DOWN_PROJECT));
}

/**
Expand All @@ -63,7 +78,14 @@ private LogicalPlan internalOptimize(LogicalPlan plan) {
Match match = DEFAULT_MATCHER.match(rule.pattern(), node);
if (match.isPresent()) {
node = rule.apply(match.value(), match.captures());
done = false;

// For new TableScanPushDown impl, pattern match doesn't necessarily cause
// push down to happen. So reiterate all rules against the node only if the node
// is actually replaced by any rule.
// TODO: may need to introduce fixed point or maximum iteration limit in future
if (node != match.value()) {
done = false;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,78 @@

package org.opensearch.sql.planner.optimizer.pattern;

import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Pattern;
import com.facebook.presto.matching.Property;
import com.facebook.presto.matching.PropertyPattern;
import java.util.Optional;
import lombok.experimental.UtilityClass;
import org.opensearch.sql.planner.logical.LogicalAggregation;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalHighlight;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalProject;
import org.opensearch.sql.planner.logical.LogicalRelation;
import org.opensearch.sql.planner.logical.LogicalSort;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.read.TableScanBuilder;

/**
* Pattern helper class.
*/
@UtilityClass
public class Patterns {

/**
* Logical filter with a given pattern on inner field.
*/
public static <T extends LogicalPlan> Pattern<LogicalFilter> filter(Pattern<T> pattern) {
return Pattern.typeOf(LogicalFilter.class).with(source(pattern));
}

/**
* Logical aggregate operator with a given pattern on inner field.
*/
public static <T extends LogicalPlan> Pattern<LogicalAggregation> aggregate(Pattern<T> pattern) {
return Pattern.typeOf(LogicalAggregation.class).with(source(pattern));
}

/**
* Logical sort operator with a given pattern on inner field.
*/
public static <T extends LogicalPlan> Pattern<LogicalSort> sort(Pattern<T> pattern) {
return Pattern.typeOf(LogicalSort.class).with(source(pattern));
}

/**
* Logical limit operator with a given pattern on inner field.
*/
public static <T extends LogicalPlan> Pattern<LogicalLimit> limit(Pattern<T> pattern) {
return Pattern.typeOf(LogicalLimit.class).with(source(pattern));
}

/**
* Logical highlight operator with a given pattern on inner field.
*/
public static <T extends LogicalPlan> Pattern<LogicalHighlight> highlight(Pattern<T> pattern) {
return Pattern.typeOf(LogicalHighlight.class).with(source(pattern));
}

/**
* Logical project operator with a given pattern on inner field.
*/
public static <T extends LogicalPlan> Pattern<LogicalProject> project(Pattern<T> pattern) {
return Pattern.typeOf(LogicalProject.class).with(source(pattern));
}

/**
* Pattern for {@link TableScanBuilder} and capture it meanwhile.
*/
public static Pattern<TableScanBuilder> scanBuilder() {
return Pattern.typeOf(TableScanBuilder.class).capturedAs(Capture.newCapture());
}

/**
* LogicalPlan source {@link Property}.
*/
Expand All @@ -25,4 +86,28 @@ public static Property<LogicalPlan, LogicalPlan> source() {
? Optional.of(plan.getChild().get(0))
: Optional.empty());
}

/**
* Source (children field) with a given pattern.
*/
@SuppressWarnings("unchecked")
public static <T extends LogicalPlan>
PropertyPattern<LogicalPlan, T> source(Pattern<T> pattern) {
Property<LogicalPlan, T> property = Property.optionalProperty("source",
plan -> plan.getChild().size() == 1
? Optional.of((T) plan.getChild().get(0))
: Optional.empty());

return property.matching(pattern);
}

/**
* Logical relation with table field.
*/
public static Property<LogicalPlan, Table> table() {
return Property.optionalProperty("table",
plan -> plan instanceof LogicalRelation
? Optional.of(((LogicalRelation) plan).getTable())
: Optional.empty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.optimizer.rule.read;

import static org.opensearch.sql.planner.optimizer.pattern.Patterns.table;

import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import lombok.Getter;
import lombok.experimental.Accessors;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalRelation;
import org.opensearch.sql.planner.optimizer.Rule;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.read.TableScanBuilder;

/**
* Rule that replace logical relation operator to {@link TableScanBuilder} for later
* push down optimization. All push down optimization rules that depends on table scan
* builder needs to run after this.
*/
public class CreateTableScanBuilder implements Rule<LogicalRelation> {

/** Capture the table inside matched logical relation operator. */
private final Capture<Table> capture;

/** Pattern that matches logical relation operator. */
@Accessors(fluent = true)
@Getter
private final Pattern<LogicalRelation> pattern;

/**
* Construct create table scan builder rule.
*/
public CreateTableScanBuilder() {
this.capture = Capture.newCapture();
this.pattern = Pattern.typeOf(LogicalRelation.class)
.with(table().capturedAs(capture));
}

@Override
public LogicalPlan apply(LogicalRelation plan, Captures captures) {
TableScanBuilder scanBuilder = captures.get(capture).createScanBuilder();
// TODO: Remove this after Prometheus refactored to new table scan builder too
return (scanBuilder == null) ? plan : scanBuilder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.optimizer.rule.read;

import static org.opensearch.sql.planner.optimizer.pattern.Patterns.aggregate;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.filter;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.highlight;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.limit;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.project;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.scanBuilder;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.sort;
import static org.opensearch.sql.planner.optimizer.rule.read.TableScanPushDown.TableScanPushDownBuilder.match;

import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import com.facebook.presto.matching.pattern.CapturePattern;
import com.facebook.presto.matching.pattern.WithPattern;
import java.util.function.BiFunction;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.Rule;
import org.opensearch.sql.storage.read.TableScanBuilder;

/**
* Rule template for all table scan push down rules. Because all push down optimization rules
* have similar workflow in common, such as a pattern that match an operator on top of table scan
* builder, and action that eliminates the original operator if pushed down, this class helps
* remove redundant code and improve readability.
*
* @param <T> logical plan node type
*/
public class TableScanPushDown<T extends LogicalPlan> implements Rule<T> {

/** Push down optimize rule for filtering condition. */
public static final Rule<?> PUSH_DOWN_FILTER =
match(
filter(
scanBuilder()))
.apply((filter, scanBuilder) -> scanBuilder.pushDownFilter(filter));

/** Push down optimize rule for aggregate operator. */
public static final Rule<?> PUSH_DOWN_AGGREGATION =
match(
aggregate(
scanBuilder()))
.apply((agg, scanBuilder) -> scanBuilder.pushDownAggregation(agg));

/** Push down optimize rule for sort operator. */
public static final Rule<?> PUSH_DOWN_SORT =
match(
sort(
scanBuilder()))
.apply((sort, scanBuilder) -> scanBuilder.pushDownSort(sort));

/** Push down optimize rule for limit operator. */
public static final Rule<?> PUSH_DOWN_LIMIT =
match(
limit(
scanBuilder()))
.apply((limit, scanBuilder) -> scanBuilder.pushDownLimit(limit));

public static final Rule<?> PUSH_DOWN_PROJECT =
match(
project(
scanBuilder()))
.apply((project, scanBuilder) -> scanBuilder.pushDownProject(project));

public static final Rule<?> PUSH_DOWN_HIGHLIGHT =
match(
highlight(
scanBuilder()))
.apply((highlight, scanBuilder) -> scanBuilder.pushDownHighlight(highlight));


/** Pattern that matches a plan node. */
private final WithPattern<T> pattern;

/** Capture table scan builder inside a plan node. */
private final Capture<TableScanBuilder> capture;

/** Push down function applied to the plan node and captured table scan builder. */
private final BiFunction<T, TableScanBuilder, Boolean> pushDownFunction;


@SuppressWarnings("unchecked")
private TableScanPushDown(WithPattern<T> pattern,
BiFunction<T, TableScanBuilder, Boolean> pushDownFunction) {
this.pattern = pattern;
this.capture = ((CapturePattern<TableScanBuilder>) pattern.getPattern()).capture();
this.pushDownFunction = pushDownFunction;
}

@Override
public Pattern<T> pattern() {
return pattern;
}

@Override
public LogicalPlan apply(T plan, Captures captures) {
TableScanBuilder scanBuilder = captures.get(capture);
if (pushDownFunction.apply(plan, scanBuilder)) {
return scanBuilder;
}
return plan;
}

/**
* Custom builder class other than generated by Lombok to provide more readable code.
*/
static class TableScanPushDownBuilder<T extends LogicalPlan> {

private WithPattern<T> pattern;

public static <T extends LogicalPlan>
TableScanPushDownBuilder<T> match(Pattern<T> pattern) {
TableScanPushDownBuilder<T> builder = new TableScanPushDownBuilder<>();
builder.pattern = (WithPattern<T>) pattern;
return builder;
}

public TableScanPushDown<T> apply(
BiFunction<T, TableScanBuilder, Boolean> pushDownFunction) {
return new TableScanPushDown<>(pattern, pushDownFunction);
}
}
}
Loading

0 comments on commit 64a3794

Please sign in to comment.