Skip to content

Commit

Permalink
Add Close Cursor API in v2.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
  • Loading branch information
Yury-Fridlyand committed May 26, 2023
1 parent 73f18df commit 9100c62
Show file tree
Hide file tree
Showing 19 changed files with 401 additions and 14 deletions.
7 changes: 7 additions & 0 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Cursor;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.Filter;
Expand Down Expand Up @@ -84,6 +85,7 @@
import org.opensearch.sql.planner.LogicalCursor;
import org.opensearch.sql.planner.logical.LogicalAD;
import org.opensearch.sql.planner.logical.LogicalAggregation;
import org.opensearch.sql.planner.logical.LogicalCloseCursor;
import org.opensearch.sql.planner.logical.LogicalDedupe;
import org.opensearch.sql.planner.logical.LogicalEval;
import org.opensearch.sql.planner.logical.LogicalFilter;
Expand Down Expand Up @@ -578,6 +580,11 @@ public LogicalPlan visitPaginate(Paginate paginate, AnalysisContext context) {
return new LogicalPaginate(paginate.getPageSize(), List.of(child));
}

@Override
public LogicalPlan visitCloseCursor(CloseCursor closeCursor, AnalysisContext context) {
return new LogicalCloseCursor(closeCursor.getChild().get(0).accept(this, context));
}

/**
* The first argument is always "asc", others are optional.
* Given nullFirst argument, use its value. Otherwise just use DEFAULT_ASC/DESC.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Cursor;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.Filter;
Expand Down Expand Up @@ -304,4 +305,8 @@ public T visitPaginate(Paginate paginate, C context) {
public T visitCursor(Cursor cursor, C context) {
return visit(cursor, context);
}

public T visitCloseCursor(CloseCursor closeCursor, C context) {
return visitChildren(closeCursor, context);
}
}
39 changes: 39 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/CloseCursor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;
import org.opensearch.sql.executor.pagination.PlanSerializer;

import java.util.List;

/**
* TODO
*/
public class CloseCursor extends UnresolvedPlan {

/** An instance of {@link Cursor} */
private UnresolvedPlan child;

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitCloseCursor(this, context);
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<? extends Node> getChild() {
return List.of(child);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sql.executor.execution;

import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;

/**
* Query plan which does not reflect a search query being executed.
*/
public class NonQueryPlan extends AbstractPlan {

/**
* The query plan ast.
*/
protected final UnresolvedPlan plan;

/**
* Query service.
*/
protected final QueryService queryService;

protected final ResponseListener<ExecutionEngine.QueryResponse> listener;

public NonQueryPlan(QueryId queryId, UnresolvedPlan plan, QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
super(queryId);
this.plan = plan;
this.queryService = queryService;
this.listener = listener;
}

@Override
public void execute() {
queryService.execute(plan, listener);
}

@Override
public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) {
throw new UnsupportedOperationException("NonQueryPlan does not support explain");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import org.opensearch.sql.executor.QueryService;

/**
* Query plan. Which includes.
*
* <p>select query.
* Query plan which includes a <em>select</em> query.
*/
public class QueryPlan extends AbstractPlan {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.sql.ast.statement.Explain;
import org.opensearch.sql.ast.statement.Query;
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.Cursor;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
Expand Down Expand Up @@ -76,10 +77,15 @@ public AbstractPlan create(
/**
* Creates a ContinuePaginatedPlan from a cursor.
*/
public AbstractPlan create(String cursor, boolean isExplain,
public AbstractPlan create(String cursor, boolean isExplain, boolean isCursorClose,
ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener,
ResponseListener<ExecutionEngine.ExplainResponse> explainListener) {
QueryId queryId = QueryId.queryId();
if (isCursorClose) {
return new NonQueryPlan(queryId, new CloseCursor().attach(new Cursor(cursor)),
queryService, queryResponseListener);
// TODO explain?
}
var plan = new QueryPlan(queryId, new Cursor(cursor), queryService, queryResponseListener);
return isExplain ? new ExplainPlan(queryId, plan, explainListener) : plan;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/


package org.opensearch.sql.planner;

import org.opensearch.sql.executor.pagination.PlanSerializer;
import org.opensearch.sql.planner.logical.LogicalAggregation;
import org.opensearch.sql.planner.logical.LogicalCloseCursor;
import org.opensearch.sql.planner.logical.LogicalDedupe;
import org.opensearch.sql.planner.logical.LogicalEval;
import org.opensearch.sql.planner.logical.LogicalFilter;
Expand All @@ -24,6 +24,7 @@
import org.opensearch.sql.planner.logical.LogicalValues;
import org.opensearch.sql.planner.logical.LogicalWindow;
import org.opensearch.sql.planner.physical.AggregationOperator;
import org.opensearch.sql.planner.physical.CursorCloseOperator;
import org.opensearch.sql.planner.physical.DedupeOperator;
import org.opensearch.sql.planner.physical.EvalOperator;
import org.opensearch.sql.planner.physical.FilterOperator;
Expand Down Expand Up @@ -149,12 +150,16 @@ public PhysicalPlan visitRelation(LogicalRelation node, C context) {
+ "implementing and optimizing logical plan with relation involved");
}


@Override
public PhysicalPlan visitCursor(LogicalCursor plan, C context) {
return new PlanSerializer(plan.getEngine()).convertToPlan(plan.getCursor());
}

@Override
public PhysicalPlan visitCloseCursor(LogicalCloseCursor node, C context) {
return new CursorCloseOperator(visitChild(node, context));
}

protected PhysicalPlan visitChild(LogicalPlan node, C context) {
// Logical operators visited here must have a single child
return node.getChild().get(0).accept(this, context);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.logical;

import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
* TODO
*/
@ToString
@EqualsAndHashCode(callSuper = false)
public class LogicalCloseCursor extends LogicalPlan {

public LogicalCloseCursor(LogicalPlan child) {
super(List.of(child));
}

@Override
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitCloseCursor(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,8 @@ public R visitPaginate(LogicalPaginate plan, C context) {
public R visitCursor(LogicalCursor plan, C context) {
return visitNode(plan, context);
}

public R visitCloseCursor(LogicalCloseCursor plan, C context) {
return visitNode(plan, context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.physical;

import java.util.List;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.storage.TableScanOperator;

// TODO add doc
@RequiredArgsConstructor
public class CursorCloseOperator extends PhysicalPlan {

// Entire deserialized from cursor plan tree
private final PhysicalPlan input;

@Override
public <R, C> R accept(PhysicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitCursorClose(this, context);
}

@Override
public boolean hasNext() {
return false;
}

@Override
public ExprValue next() {
throw new IllegalStateException();
}

@Override
public List<PhysicalPlan> getChild() {
return List.of(input);
}

@Override
public ExecutionEngine.Schema schema() {
return new ExecutionEngine.Schema(List.of());
}

// TODO remove
@Override
public long getTotalHits() {
return 0;
}

@Override
public void open() {
// no-op, no search should be invoked.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,8 @@ public R visitAD(PhysicalPlan node, C context) {
public R visitML(PhysicalPlan node, C context) {
return visitNode(node, context);
}

public R visitCursorClose(PhysicalPlan node, C context) {
return visitNode(node, context);
}
}
Loading

0 comments on commit 9100c62

Please sign in to comment.