diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java index 13bcfabe74..c08facd5b5 100644 --- a/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java @@ -16,7 +16,6 @@ import org.opensearch.sql.planner.optimizer.rule.CreatePagingTableScanBuilder; import org.opensearch.sql.planner.optimizer.rule.MergeFilterAndFilter; import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort; -import org.opensearch.sql.planner.optimizer.rule.PushPageSize; import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder; import org.opensearch.sql.planner.optimizer.rule.read.TableScanPushDown; import org.opensearch.sql.planner.optimizer.rule.write.CreateTableWriteBuilder; @@ -75,8 +74,8 @@ public static LogicalPlanOptimizer paginationCreate() { /* * Phase 2: Transformations that rely on data source push down capability */ - new PushPageSize(), new CreatePagingTableScanBuilder(), + TableScanPushDown.PUSH_DOWN_PAGINATION, TableScanPushDown.PUSH_DOWN_FILTER, TableScanPushDown.PUSH_DOWN_AGGREGATION, TableScanPushDown.PUSH_DOWN_SORT, diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java index 6e54897506..e75d83d0cd 100644 --- a/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java @@ -66,6 +66,13 @@ public static Pattern highlight(Patter return Pattern.typeOf(LogicalHighlight.class).with(source(pattern)); } + /** + * Logical paginate operator with a given pattern on inner field. + */ + public static Pattern paginate(Pattern pattern) { + return Pattern.typeOf(LogicalPaginate.class).with(source(pattern)); + } + /** * Logical project operator with a given pattern on inner field. */ @@ -73,6 +80,10 @@ public static Pattern project(Pattern return Pattern.typeOf(LogicalProject.class).with(source(pattern)); } + public static Pattern project() { + return Pattern.typeOf(LogicalProject.class).capturedAs(Capture.newCapture()); + } + /** * Pattern for {@link TableScanBuilder} and capture it meanwhile. */ diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/CreatePagingTableScanBuilder.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/CreatePagingTableScanBuilder.java index 22079ed9ca..e78e03b98f 100644 --- a/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/CreatePagingTableScanBuilder.java +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/CreatePagingTableScanBuilder.java @@ -41,8 +41,7 @@ public CreatePagingTableScanBuilder() { @Override public LogicalPlan apply(LogicalRelation plan, Captures captures) { - TableScanBuilder scanBuilder = captures.get(capture) - .createPagedScanBuilder(plan.getPageSize()); + TableScanBuilder scanBuilder = captures.get(capture).createPagedScanBuilder(); // TODO: Remove this after Prometheus refactored to new table scan builder too return (scanBuilder == null) ? plan : scanBuilder; } diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/read/TableScanPushDown.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/read/TableScanPushDown.java index 556a12bb34..bf4f877a44 100644 --- a/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/read/TableScanPushDown.java +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/read/TableScanPushDown.java @@ -9,6 +9,7 @@ import static org.opensearch.sql.planner.optimizer.pattern.Patterns.filter; import static org.opensearch.sql.planner.optimizer.pattern.Patterns.highlight; import static org.opensearch.sql.planner.optimizer.pattern.Patterns.limit; +import static org.opensearch.sql.planner.optimizer.pattern.Patterns.paginate; import static org.opensearch.sql.planner.optimizer.pattern.Patterns.project; import static org.opensearch.sql.planner.optimizer.pattern.Patterns.scanBuilder; import static org.opensearch.sql.planner.optimizer.pattern.Patterns.sort; @@ -74,6 +75,14 @@ public class TableScanPushDown implements Rule { scanBuilder())) .apply((highlight, scanBuilder) -> scanBuilder.pushDownHighlight(highlight)); + public static final Rule PUSH_DOWN_PAGINATION = + match( + paginate( + project( + scanBuilder() + ))) + .apply2((paginate, /*project?*/ scanBuilder) -> scanBuilder.pushDownPagination(paginate)); + /** Pattern that matches a plan node. */ private final WithPattern pattern; @@ -87,12 +96,20 @@ public class TableScanPushDown implements Rule { @SuppressWarnings("unchecked") private TableScanPushDown(WithPattern pattern, - BiFunction pushDownFunction) { + BiFunction pushDownFunction) { this.pattern = pattern; this.capture = ((CapturePattern) pattern.getPattern()).capture(); this.pushDownFunction = pushDownFunction; } + @SuppressWarnings("unchecked") + private TableScanPushDown(WithPattern parentPattern, WithPattern pattern, + BiFunction pushDownFunction) { + this.pattern = parentPattern; + this.capture = ((CapturePattern) pattern.getPattern()).capture(); + this.pushDownFunction = pushDownFunction; + } + @Override public Pattern pattern() { return pattern; @@ -125,5 +142,10 @@ public TableScanPushDown apply( BiFunction pushDownFunction) { return new TableScanPushDown<>(pattern, pushDownFunction); } + + public TableScanPushDown apply2( + BiFunction pushDownFunction) { + return new TableScanPushDown<>(pattern, (WithPattern) pattern.getPattern(), pushDownFunction); + } } } diff --git a/core/src/main/java/org/opensearch/sql/storage/Table.java b/core/src/main/java/org/opensearch/sql/storage/Table.java index a7f2b606ca..8117e2cc30 100644 --- a/core/src/main/java/org/opensearch/sql/storage/Table.java +++ b/core/src/main/java/org/opensearch/sql/storage/Table.java @@ -93,7 +93,7 @@ default StreamingSource asStreamingSource() { throw new UnsupportedOperationException(); } - default TableScanBuilder createPagedScanBuilder(int pageSize) { + default TableScanBuilder createPagedScanBuilder() { var error = String.format("'%s' does not support pagination", getClass().toString()); throw new UnsupportedOperationException(error); } diff --git a/core/src/main/java/org/opensearch/sql/storage/read/TableScanBuilder.java b/core/src/main/java/org/opensearch/sql/storage/read/TableScanBuilder.java index c0fdf36e70..74ec397b6e 100644 --- a/core/src/main/java/org/opensearch/sql/storage/read/TableScanBuilder.java +++ b/core/src/main/java/org/opensearch/sql/storage/read/TableScanBuilder.java @@ -10,6 +10,7 @@ import org.opensearch.sql.planner.logical.LogicalFilter; import org.opensearch.sql.planner.logical.LogicalHighlight; import org.opensearch.sql.planner.logical.LogicalLimit; +import org.opensearch.sql.planner.logical.LogicalPaginate; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor; import org.opensearch.sql.planner.logical.LogicalProject; @@ -104,6 +105,17 @@ public boolean pushDownHighlight(LogicalHighlight highlight) { return false; } + /** + * Can a page size be pushed down to table scan builder. Assume no such support + * by default unless subclass override this. + * + * @param paginate logical paginate operator + * @return true if pushed down, otherwise false + */ + public boolean pushDownPagination(LogicalPaginate paginate) { + return false; + } + @Override public R accept(LogicalPlanNodeVisitor visitor, C context) { return visitor.visitTableScanBuilder(this, context); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/InitialPageRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/InitialPageRequestBuilder.java index 8023a86006..a247efd51e 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/InitialPageRequestBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/InitialPageRequestBuilder.java @@ -13,6 +13,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.expression.ReferenceExpression; import org.opensearch.sql.opensearch.data.type.OpenSearchDataType; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; @@ -38,7 +39,6 @@ public class InitialPageRequestBuilder extends PagedRequestBuilder { */ // TODO accept indexName as string (same way as `OpenSearchRequestBuilder` does)? public InitialPageRequestBuilder(OpenSearchRequest.IndexName indexName, - int pageSize, Settings settings, OpenSearchExprValueFactory exprValueFactory) { this.indexName = indexName; @@ -46,7 +46,6 @@ public InitialPageRequestBuilder(OpenSearchRequest.IndexName indexName, this.scrollTimeout = settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE); this.sourceBuilder = new SearchSourceBuilder() .from(0) - .size(pageSize) .timeout(DEFAULT_QUERY_TIMEOUT); } @@ -68,4 +67,9 @@ public void pushDownProjects(Set projects) { public void pushTypeMapping(Map typeMapping) { exprValueFactory.extendTypeMapping(typeMapping); } + + @Override + public void pushDownPageSize(int pageSize) { + sourceBuilder.size(pageSize); + } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PushDownRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PushDownRequestBuilder.java index ab1805ce4e..2509704217 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PushDownRequestBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PushDownRequestBuilder.java @@ -8,17 +8,14 @@ import java.util.List; import java.util.Map; import java.util.Set; -import lombok.Getter; import org.apache.commons.lang3.tuple.Pair; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.QueryBuilder; import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.sort.SortBuilder; import org.opensearch.sql.ast.expression.Literal; -import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.expression.ReferenceExpression; import org.opensearch.sql.opensearch.data.type.OpenSearchDataType; -import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser; public interface PushDownRequestBuilder { @@ -60,4 +57,8 @@ default void pushDownProjects(Set projects) { default void pushTypeMapping(Map typeMapping) { throw new UnsupportedOperationException(throwUnsupported("type mapping")); } -} \ No newline at end of file + + default void pushDownPageSize(int pageSize) { + throw new UnsupportedOperationException(throwUnsupported("type mapping")); + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index 110d3d640f..bd1cfbab5e 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -164,8 +164,8 @@ public TableScanBuilder createScanBuilder() { } @Override - public TableScanBuilder createPagedScanBuilder(int pageSize) { - var requestBuilder = new InitialPageRequestBuilder(indexName, pageSize, settings, + public TableScanBuilder createPagedScanBuilder() { + var requestBuilder = new InitialPageRequestBuilder(indexName, settings, new OpenSearchExprValueFactory(getFieldOpenSearchTypes())); var indexScan = new OpenSearchPagedIndexScan(client, requestBuilder); return new OpenSearchPagedIndexScanBuilder(indexScan); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScan.java index e9d3fd52d3..bc8a2eb74f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScan.java @@ -8,6 +8,7 @@ import java.util.Collections; import java.util.Iterator; import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.ToString; import org.apache.commons.lang3.NotImplementedException; import org.opensearch.sql.data.model.ExprValue; @@ -21,6 +22,7 @@ @ToString(onlyExplicitlyIncluded = true) public class OpenSearchPagedIndexScan extends TableScanOperator { private final OpenSearchClient client; + @Getter private final PagedRequestBuilder requestBuilder; @EqualsAndHashCode.Include @ToString.Include diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScanBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScanBuilder.java index 779df4ebec..0b63945a42 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScanBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScanBuilder.java @@ -6,6 +6,8 @@ package org.opensearch.sql.opensearch.storage.scan; import lombok.EqualsAndHashCode; +import org.opensearch.sql.opensearch.request.InitialPageRequestBuilder; +import org.opensearch.sql.planner.logical.LogicalPaginate; import org.opensearch.sql.storage.TableScanOperator; import org.opensearch.sql.storage.read.TableScanBuilder; @@ -22,6 +24,18 @@ public OpenSearchPagedIndexScanBuilder(OpenSearchPagedIndexScan indexScan) { this.indexScan = indexScan; } + @Override + public boolean pushDownPagination(LogicalPaginate paginate) { + var builder = indexScan.getRequestBuilder(); + if (builder instanceof InitialPageRequestBuilder) { + builder.pushDownPageSize(paginate.getPageSize()); + return false; + } else { + // should never happen actually + throw new UnsupportedOperationException("Trying to set page size not on the first request"); + } + } + @Override public TableScanOperator build() { return indexScan;