Skip to content

Commit

Permalink
Add Close Cursor API in v2. (#1660)
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
  • Loading branch information
Yury-Fridlyand authored May 30, 2023
1 parent 9ebdda5 commit bc5bede
Show file tree
Hide file tree
Showing 35 changed files with 876 additions and 114 deletions.
19 changes: 13 additions & 6 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.FetchCursor;
Expand Down Expand Up @@ -83,6 +84,7 @@
import org.opensearch.sql.expression.parse.ParseExpression;
import org.opensearch.sql.planner.logical.LogicalAD;
import org.opensearch.sql.planner.logical.LogicalAggregation;
import org.opensearch.sql.planner.logical.LogicalCloseCursor;
import org.opensearch.sql.planner.logical.LogicalDedupe;
import org.opensearch.sql.planner.logical.LogicalEval;
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
Expand Down Expand Up @@ -572,6 +574,17 @@ public LogicalPlan visitPaginate(Paginate paginate, AnalysisContext context) {
return new LogicalPaginate(paginate.getPageSize(), List.of(child));
}

@Override
public LogicalPlan visitFetchCursor(FetchCursor cursor, AnalysisContext context) {
return new LogicalFetchCursor(cursor.getCursor(),
dataSourceService.getDataSource(DEFAULT_DATASOURCE_NAME).getStorageEngine());
}

@Override
public LogicalPlan visitCloseCursor(CloseCursor closeCursor, AnalysisContext context) {
return new LogicalCloseCursor(closeCursor.getChild().get(0).accept(this, context));
}

/**
* The first argument is always "asc", others are optional.
* Given nullFirst argument, use its value. Otherwise just use DEFAULT_ASC/DESC.
Expand All @@ -587,10 +600,4 @@ private SortOption analyzeSortOption(List<Argument> fieldArgs) {
}
return asc ? SortOption.DEFAULT_ASC : SortOption.DEFAULT_DESC;
}

@Override
public LogicalPlan visitFetchCursor(FetchCursor cursor, AnalysisContext context) {
return new LogicalFetchCursor(cursor.getCursor(),
dataSourceService.getDataSource(DEFAULT_DATASOURCE_NAME).getStorageEngine());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.FetchCursor;
Expand Down Expand Up @@ -302,6 +303,10 @@ public T visitPaginate(Paginate paginate, C context) {
}

public T visitFetchCursor(FetchCursor cursor, C context) {
return visit(cursor, context);
return visitChildren(cursor, context);
}

public T visitCloseCursor(CloseCursor closeCursor, C context) {
return visitChildren(closeCursor, context);
}
}
38 changes: 38 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/CloseCursor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import java.util.List;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;

/**
* AST node to represent close cursor operation.
* Actually a wrapper to the AST.
*/
public class CloseCursor extends UnresolvedPlan {

/**
* An instance of {@link FetchCursor}.
*/
private UnresolvedPlan cursor;

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitCloseCursor(this, context);
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
this.cursor = child;
return this;
}

@Override
public List<? extends Node> getChild() {
return List.of(cursor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sql.executor.execution;

import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;

/**
* Query plan which does not reflect a search query being executed.
* It contains a command or an action, for example, a DDL query.
*/
public class CommandPlan extends AbstractPlan {

/**
* The query plan ast.
*/
protected final UnresolvedPlan plan;

/**
* Query service.
*/
protected final QueryService queryService;

protected final ResponseListener<ExecutionEngine.QueryResponse> listener;

/** Constructor. */
public CommandPlan(QueryId queryId, UnresolvedPlan plan, QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
super(queryId);
this.plan = plan;
this.queryService = queryService;
this.listener = listener;
}

@Override
public void execute() {
queryService.execute(plan, listener);
}

@Override
public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) {
throw new UnsupportedOperationException("CommandPlan does not support explain");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import org.opensearch.sql.executor.QueryService;

/**
* Query plan. Which includes.
*
* <p>select query.
* Query plan which includes a <em>select</em> query.
*/
public class QueryPlan extends AbstractPlan {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.sql.ast.statement.Explain;
import org.opensearch.sql.ast.statement.Query;
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.FetchCursor;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
Expand Down Expand Up @@ -88,6 +89,15 @@ boolean canConvertToCursor(UnresolvedPlan plan) {
return plan.accept(new CanPaginateVisitor(), null);
}

/**
* Creates a {@link CloseCursor} command on a cursor.
*/
public AbstractPlan createCloseCursor(String cursor,
ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener) {
return new CommandPlan(QueryId.queryId(), new CloseCursor().attach(new FetchCursor(cursor)),
queryService, queryResponseListener);
}

@Override
public AbstractPlan visitQuery(
Query node,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/


package org.opensearch.sql.planner;

import org.opensearch.sql.executor.pagination.PlanSerializer;
import org.opensearch.sql.planner.logical.LogicalAggregation;
import org.opensearch.sql.planner.logical.LogicalCloseCursor;
import org.opensearch.sql.planner.logical.LogicalDedupe;
import org.opensearch.sql.planner.logical.LogicalEval;
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
Expand All @@ -25,6 +25,7 @@
import org.opensearch.sql.planner.logical.LogicalValues;
import org.opensearch.sql.planner.logical.LogicalWindow;
import org.opensearch.sql.planner.physical.AggregationOperator;
import org.opensearch.sql.planner.physical.CursorCloseOperator;
import org.opensearch.sql.planner.physical.DedupeOperator;
import org.opensearch.sql.planner.physical.EvalOperator;
import org.opensearch.sql.planner.physical.FilterOperator;
Expand Down Expand Up @@ -155,6 +156,11 @@ public PhysicalPlan visitFetchCursor(LogicalFetchCursor plan, C context) {
return new PlanSerializer(plan.getEngine()).convertToPlan(plan.getCursor());
}

@Override
public PhysicalPlan visitCloseCursor(LogicalCloseCursor node, C context) {
return new CursorCloseOperator(visitChild(node, context));
}

protected PhysicalPlan visitChild(LogicalPlan node, C context) {
// Logical operators visited here must have a single child
return node.getChild().get(0).accept(this, context);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.logical;

import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
* A logical plan node which wraps {@link org.opensearch.sql.planner.LogicalCursor}
* and represent a cursor close operation.
*/
@ToString
@EqualsAndHashCode(callSuper = false)
public class LogicalCloseCursor extends LogicalPlan {

public LogicalCloseCursor(LogicalPlan child) {
super(List.of(child));
}

@Override
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitCloseCursor(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor;
import org.opensearch.sql.storage.StorageEngine;

/**
* A plan node which represents operation of fetching a next page from the cursor.
*/
@EqualsAndHashCode(callSuper = false)
@ToString
public class LogicalFetchCursor extends LogicalPlan {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,8 @@ public R visitPaginate(LogicalPaginate plan, C context) {
public R visitFetchCursor(LogicalFetchCursor plan, C context) {
return visitNode(plan, context);
}

public R visitCloseCursor(LogicalCloseCursor plan, C context) {
return visitNode(plan, context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.physical;

import java.util.List;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.executor.ExecutionEngine;

/**
* A plan node which blocks issuing a request in {@link #open} and
* getting results in {@link #hasNext}, but doesn't block releasing resources in {@link #close}.
* Designed to be on top of the deserialized tree.
*/
@RequiredArgsConstructor
public class CursorCloseOperator extends PhysicalPlan {

// Entire deserialized from cursor plan tree
private final PhysicalPlan input;

@Override
public <R, C> R accept(PhysicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitCursorClose(this, context);
}

@Override
public boolean hasNext() {
return false;
}

@Override
public ExprValue next() {
throw new IllegalStateException();
}

@Override
public List<PhysicalPlan> getChild() {
return List.of(input);
}

/**
* Provides an empty schema, because this plan node is always located on the top of the tree.
*/
@Override
public ExecutionEngine.Schema schema() {
return new ExecutionEngine.Schema(List.of());
}

// TODO remove
@Override
public long getTotalHits() {
return 0;
}

@Override
public void open() {
// no-op, no search should be invoked.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,8 @@ public R visitAD(PhysicalPlan node, C context) {
public R visitML(PhysicalPlan node, C context) {
return visitNode(node, context);
}

public R visitCursorClose(CursorCloseOperator node, C context) {
return visitNode(node, context);
}
}
13 changes: 13 additions & 0 deletions core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.opensearch.sql.analysis;

import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -75,6 +76,7 @@
import org.opensearch.sql.ast.expression.ScoreFunction;
import org.opensearch.sql.ast.expression.SpanUnit;
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.FetchCursor;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.ML;
Expand All @@ -91,6 +93,7 @@
import org.opensearch.sql.expression.function.OpenSearchFunctions;
import org.opensearch.sql.expression.window.WindowDefinition;
import org.opensearch.sql.planner.logical.LogicalAD;
import org.opensearch.sql.planner.logical.LogicalCloseCursor;
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalMLCommons;
Expand Down Expand Up @@ -1651,4 +1654,14 @@ void visit_cursor() {
assertEquals(new LogicalFetchCursor("test",
dataSourceService.getDataSource("@opensearch").getStorageEngine()), actual);
}

@Test
public void visit_close_cursor() {
var analyzed = analyze(new CloseCursor().attach(new FetchCursor("pewpew")));
assertAll(
() -> assertTrue(analyzed instanceof LogicalCloseCursor),
() -> assertTrue(analyzed.getChild().get(0) instanceof LogicalFetchCursor),
() -> assertEquals("pewpew", ((LogicalFetchCursor) analyzed.getChild().get(0)).getCursor())
);
}
}
Loading

0 comments on commit bc5bede

Please sign in to comment.