Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework push down page size #1499

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.opensearch.sql.planner.optimizer.rule.CreatePagingTableScanBuilder;
import org.opensearch.sql.planner.optimizer.rule.MergeFilterAndFilter;
import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort;
import org.opensearch.sql.planner.optimizer.rule.PushPageSize;
import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder;
import org.opensearch.sql.planner.optimizer.rule.read.TableScanPushDown;
import org.opensearch.sql.planner.optimizer.rule.write.CreateTableWriteBuilder;
Expand Down Expand Up @@ -75,8 +74,8 @@ public static LogicalPlanOptimizer paginationCreate() {
/*
* Phase 2: Transformations that rely on data source push down capability
*/
new PushPageSize(),
new CreatePagingTableScanBuilder(),
TableScanPushDown.PUSH_DOWN_PAGINATION,
TableScanPushDown.PUSH_DOWN_FILTER,
TableScanPushDown.PUSH_DOWN_AGGREGATION,
TableScanPushDown.PUSH_DOWN_SORT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,24 @@ public static <T extends LogicalPlan> Pattern<LogicalHighlight> highlight(Patter
return Pattern.typeOf(LogicalHighlight.class).with(source(pattern));
}

/**
* Logical paginate operator with a given pattern on inner field.
*/
public static <T extends LogicalPlan> Pattern<LogicalPaginate> paginate(Pattern<T> pattern) {
return Pattern.typeOf(LogicalPaginate.class).with(source(pattern));
}

/**
* Logical project operator with a given pattern on inner field.
*/
public static <T extends LogicalPlan> Pattern<LogicalProject> project(Pattern<T> pattern) {
return Pattern.typeOf(LogicalProject.class).with(source(pattern));
}

public static <T extends LogicalPlan> Pattern<LogicalProject> project() {
return Pattern.typeOf(LogicalProject.class).capturedAs(Capture.newCapture());
}

/**
* Pattern for {@link TableScanBuilder} and capture it meanwhile.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ public CreatePagingTableScanBuilder() {

@Override
public LogicalPlan apply(LogicalRelation plan, Captures captures) {
TableScanBuilder scanBuilder = captures.get(capture)
.createPagedScanBuilder(plan.getPageSize());
TableScanBuilder scanBuilder = captures.get(capture).createPagedScanBuilder();
// TODO: Remove this after Prometheus refactored to new table scan builder too
return (scanBuilder == null) ? plan : scanBuilder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.filter;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.highlight;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.limit;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.paginate;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.project;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.scanBuilder;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.sort;
Expand Down Expand Up @@ -74,6 +75,14 @@ public class TableScanPushDown<T extends LogicalPlan> implements Rule<T> {
scanBuilder()))
.apply((highlight, scanBuilder) -> scanBuilder.pushDownHighlight(highlight));

public static final Rule<?> PUSH_DOWN_PAGINATION =
match(
paginate(
project(
scanBuilder()
)))
.apply2((paginate, /*project?*/ scanBuilder) -> scanBuilder.pushDownPagination(paginate));


/** Pattern that matches a plan node. */
private final WithPattern<T> pattern;
Expand All @@ -87,12 +96,20 @@ public class TableScanPushDown<T extends LogicalPlan> implements Rule<T> {

@SuppressWarnings("unchecked")
private TableScanPushDown(WithPattern<T> pattern,
BiFunction<T, TableScanBuilder, Boolean> pushDownFunction) {
BiFunction<T, TableScanBuilder, Boolean> pushDownFunction) {
this.pattern = pattern;
this.capture = ((CapturePattern<TableScanBuilder>) pattern.getPattern()).capture();
this.pushDownFunction = pushDownFunction;
}

@SuppressWarnings("unchecked")
private TableScanPushDown(WithPattern<T> parentPattern, WithPattern<T> pattern,
BiFunction<T, TableScanBuilder, Boolean> pushDownFunction) {
this.pattern = parentPattern;
this.capture = ((CapturePattern<TableScanBuilder>) pattern.getPattern()).capture();
this.pushDownFunction = pushDownFunction;
}

@Override
public Pattern<T> pattern() {
return pattern;
Expand Down Expand Up @@ -125,5 +142,10 @@ public TableScanPushDown<T> apply(
BiFunction<T, TableScanBuilder, Boolean> pushDownFunction) {
return new TableScanPushDown<>(pattern, pushDownFunction);
}

public TableScanPushDown<T> apply2(
BiFunction<T, TableScanBuilder, Boolean> pushDownFunction) {
return new TableScanPushDown<>(pattern, (WithPattern<T>) pattern.getPattern(), pushDownFunction);
}
}
}
2 changes: 1 addition & 1 deletion core/src/main/java/org/opensearch/sql/storage/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ default StreamingSource asStreamingSource() {
throw new UnsupportedOperationException();
}

default TableScanBuilder createPagedScanBuilder(int pageSize) {
default TableScanBuilder createPagedScanBuilder() {
var error = String.format("'%s' does not support pagination", getClass().toString());
throw new UnsupportedOperationException(error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalHighlight;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor;
import org.opensearch.sql.planner.logical.LogicalProject;
Expand Down Expand Up @@ -104,6 +105,17 @@ public boolean pushDownHighlight(LogicalHighlight highlight) {
return false;
}

/**
* Can a page size be pushed down to table scan builder. Assume no such support
* by default unless subclass override this.
*
* @param paginate logical paginate operator
* @return true if pushed down, otherwise false
*/
public boolean pushDownPagination(LogicalPaginate paginate) {
return false;
}

@Override
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitTableScanBuilder(this, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
Expand All @@ -38,15 +39,13 @@ public class InitialPageRequestBuilder extends PagedRequestBuilder {
*/
// TODO accept indexName as string (same way as `OpenSearchRequestBuilder` does)?
public InitialPageRequestBuilder(OpenSearchRequest.IndexName indexName,
int pageSize,
Settings settings,
OpenSearchExprValueFactory exprValueFactory) {
this.indexName = indexName;
this.exprValueFactory = exprValueFactory;
this.scrollTimeout = settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE);
this.sourceBuilder = new SearchSourceBuilder()
.from(0)
.size(pageSize)
.timeout(DEFAULT_QUERY_TIMEOUT);
}

Expand All @@ -68,4 +67,9 @@ public void pushDownProjects(Set<ReferenceExpression> projects) {
public void pushTypeMapping(Map<String, OpenSearchDataType> typeMapping) {
exprValueFactory.extendTypeMapping(typeMapping);
}

@Override
public void pushDownPageSize(int pageSize) {
sourceBuilder.size(pageSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.Getter;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.sort.SortBuilder;
import org.opensearch.sql.ast.expression.Literal;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser;

public interface PushDownRequestBuilder {
Expand Down Expand Up @@ -60,4 +57,8 @@ default void pushDownProjects(Set<ReferenceExpression> projects) {
default void pushTypeMapping(Map<String, OpenSearchDataType> typeMapping) {
throw new UnsupportedOperationException(throwUnsupported("type mapping"));
}
}

default void pushDownPageSize(int pageSize) {
throw new UnsupportedOperationException(throwUnsupported("type mapping"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ public TableScanBuilder createScanBuilder() {
}

@Override
public TableScanBuilder createPagedScanBuilder(int pageSize) {
var requestBuilder = new InitialPageRequestBuilder(indexName, pageSize, settings,
public TableScanBuilder createPagedScanBuilder() {
var requestBuilder = new InitialPageRequestBuilder(indexName, settings,
new OpenSearchExprValueFactory(getFieldOpenSearchTypes()));
var indexScan = new OpenSearchPagedIndexScan(client, requestBuilder);
return new OpenSearchPagedIndexScanBuilder(indexScan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Collections;
import java.util.Iterator;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.apache.commons.lang3.NotImplementedException;
import org.opensearch.sql.data.model.ExprValue;
Expand All @@ -21,6 +22,7 @@
@ToString(onlyExplicitlyIncluded = true)
public class OpenSearchPagedIndexScan extends TableScanOperator {
private final OpenSearchClient client;
@Getter
private final PagedRequestBuilder requestBuilder;
@EqualsAndHashCode.Include
@ToString.Include
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.sql.opensearch.storage.scan;

import lombok.EqualsAndHashCode;
import org.opensearch.sql.opensearch.request.InitialPageRequestBuilder;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;

Expand All @@ -22,6 +24,18 @@ public OpenSearchPagedIndexScanBuilder(OpenSearchPagedIndexScan indexScan) {
this.indexScan = indexScan;
}

@Override
public boolean pushDownPagination(LogicalPaginate paginate) {
var builder = indexScan.getRequestBuilder();
if (builder instanceof InitialPageRequestBuilder) {
builder.pushDownPageSize(paginate.getPageSize());
return false;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I return true, PaginateOperator is being removed from the tree alongs with ProjectOperator.

} else {
// should never happen actually
throw new UnsupportedOperationException("Trying to set page size not on the first request");
}
}

@Override
public TableScanOperator build() {
return indexScan;
Expand Down