Skip to content

Commit

Permalink
JSON response support for everything.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
  • Loading branch information
Yury-Fridlyand committed Mar 17, 2023
1 parent 12bbb3c commit d22bb07
Show file tree
Hide file tree
Showing 15 changed files with 177 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import lombok.Builder;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.experimental.Accessors;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprType;
Expand Down Expand Up @@ -58,7 +60,7 @@ class QueryResponse {
private final List<ExprValue> results;
private final long total;
private final Cursor cursor;
private final String rawResponse;
private final ResponseMetadata responseMetadata;

/**
* Constructor for Query Response.
Expand All @@ -71,7 +73,7 @@ public QueryResponse(Schema schema, List<ExprValue> results) {
this.results = results;
this.total = 0;
this.cursor = null;
this.rawResponse = "";
this.responseMetadata = new ResponseMetadata();
}
}

Expand All @@ -88,19 +90,26 @@ public static class Column {
}

@Data
//@Builder
@Accessors(chain = true)
class ResponseMetadata {
private long took = 0;
private boolean timeOut = false;
private long maxScore = 1; //or double?
private Shards shards = new Shards();

@Data
//@Builder
@Accessors(chain = true)
public static class Shards {
private final long total = 0;
private final long successful = 0;
private final long skipped = 0;
private final long failed = 0;
private long total = 0;
private long successful = 0;
private long skipped = 0;
private long failed = 0;

public Shards() {
}
}

public ResponseMetadata() {
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package org.opensearch.sql.planner.physical;

import java.util.Iterator;
import java.util.List;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.planner.PlanNode;
Expand Down Expand Up @@ -58,9 +57,9 @@ public long getTotalHits() {
return getChild().stream().mapToLong(PhysicalPlan::getTotalHits).max().orElse(0);
}

public String getRawResponse() {
return getChild().stream().map(PhysicalPlan::getRawResponse)
.filter(r -> r != null && !r.isEmpty()).findFirst().orElse("");
public ExecutionEngine.ResponseMetadata getResponseMetadata() {
return getChild().stream().map(PhysicalPlan::getResponseMetadata)
.findFirst().orElse(new ExecutionEngine.ResponseMetadata());
}

public String toCursor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.sql.protocol.response.format.Format;
import org.opensearch.sql.protocol.response.format.JdbcResponseFormatter;
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
import org.opensearch.sql.protocol.response.format.OpenSearchJsonResponseFormatter;
import org.opensearch.sql.protocol.response.format.RawResponseFormatter;
import org.opensearch.sql.protocol.response.format.ResponseFormatter;
import org.opensearch.sql.sql.SQLService;
Expand Down Expand Up @@ -169,17 +170,7 @@ private ResponseListener<QueryResponse> createQueryResponseListener(
} else if (format.equals(Format.RAW)) {
formatter = new RawResponseFormatter();
} else if (format.equals(Format.JSON)) {
return new ResponseListener<>() {
@Override
public void onResponse(QueryResponse response) {
sendResponse(channel, OK, response.getRawResponse());
}

@Override
public void onFailure(Exception e) {
errorHandler.accept(channel, e);
}
};
formatter = new OpenSearchJsonResponseFormatter();
} else {
formatter = new JdbcResponseFormatter(PRETTY);
}
Expand All @@ -188,7 +179,7 @@ public void onFailure(Exception e) {
public void onResponse(QueryResponse response) {
sendResponse(channel, OK,
formatter.format(new QueryResult(response.getSchema(), response.getResults(),
response.getCursor(), response.getTotal())));
response.getCursor(), response.getTotal(), response.getResponseMetadata())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import org.opensearch.sql.executor.PaginatedPlanCache;
import org.opensearch.sql.opensearch.client.OpenSearchClient;
import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector;
import org.opensearch.sql.opensearch.storage.scan.OpenSearchIndexScan;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.ProjectOperator;
import org.opensearch.sql.storage.TableScanOperator;

/** OpenSearch execution engine implementation. */
Expand Down Expand Up @@ -53,11 +51,11 @@ public void execute(PhysicalPlan physicalPlan, ExecutionContext context,
result.add(plan.next());
}

String rawResponse = plan.getRawResponse();
var responseMetadata = plan.getResponseMetadata();
Cursor qc = paginatedPlanCache.convertToCursor(plan);

QueryResponse response = new QueryResponse(physicalPlan.schema(), result,
plan.getTotalHits(), qc, rawResponse);
plan.getTotalHits(), qc, responseMetadata);
listener.onResponse(response);
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.monitor.ResourceMonitor;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor;
Expand Down Expand Up @@ -60,8 +61,8 @@ public void open() {
}

@Override
public String getRawResponse() {
return delegate.getRawResponse();
public ExecutionEngine.ResponseMetadata getResponseMetadata() {
return delegate.getResponseMetadata();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.model.ExprValueUtils;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;

/**
Expand All @@ -44,9 +45,10 @@ public class OpenSearchResponse implements Iterable<ExprValue> {
*/
@EqualsAndHashCode.Exclude
private final OpenSearchExprValueFactory exprValueFactory;

@EqualsAndHashCode.Exclude
@Getter
private String rawResponse;
private ExecutionEngine.ResponseMetadata responseMetadata;

/**
* Constructor of OpenSearchResponse.
Expand All @@ -56,7 +58,13 @@ public OpenSearchResponse(SearchResponse searchResponse,
this.hits = searchResponse.getHits();
this.aggregations = searchResponse.getAggregations();
this.exprValueFactory = exprValueFactory;
this.rawResponse = searchResponse.toString();
this.responseMetadata = new ExecutionEngine.ResponseMetadata().setShards(
new ExecutionEngine.ResponseMetadata.Shards()
.setTotal(searchResponse.getTotalShards())
.setSuccessful(searchResponse.getSuccessfulShards())
.setSkipped(searchResponse.getSkippedShards())
.setFailed(searchResponse.getFailedShards())
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import lombok.ToString;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.opensearch.client.OpenSearchClient;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
Expand Down Expand Up @@ -52,7 +53,7 @@ public class OpenSearchIndexScan extends TableScanOperator {
/** Search response for current batch. */
private transient Iterator<ExprValue> iterator;

private String rawResponse;
private ExecutionEngine.ResponseMetadata responseMetadata;

/**
* Constructor.
Expand Down Expand Up @@ -87,7 +88,7 @@ public void open() {
request = requestBuilder.build();
iterator = Collections.emptyIterator();
queryCount = 0;
rawResponse = fetchNextBatch().getRawResponse();
responseMetadata = fetchNextBatch().getResponseMetadata();
}

@Override
Expand All @@ -113,8 +114,8 @@ public long getTotalHits() {
}

@Override
public String getRawResponse() {
return rawResponse;
public ExecutionEngine.ResponseMetadata getResponseMetadata() {
return responseMetadata;
}

protected OpenSearchResponse fetchNextBatch() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import lombok.ToString;
import org.apache.commons.lang3.NotImplementedException;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.opensearch.client.OpenSearchClient;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
import org.opensearch.sql.opensearch.request.PagedRequestBuilder;
Expand All @@ -27,7 +28,7 @@ public class OpenSearchPagedIndexScan extends TableScanOperator {
private OpenSearchRequest request;
private Iterator<ExprValue> iterator;
private long totalHits = 0;
private String rawResponse;
private ExecutionEngine.ResponseMetadata responseMetadata;

public OpenSearchPagedIndexScan(OpenSearchClient client,
PagedRequestBuilder requestBuilder) {
Expand Down Expand Up @@ -55,7 +56,7 @@ public void open() {
super.open();
request = requestBuilder.build();
OpenSearchResponse response = client.search(request);
rawResponse = response.getRawResponse();
responseMetadata = response.getResponseMetadata();
if (!response.isEmpty()) {
iterator = response.iterator();
totalHits = response.getTotalHits();
Expand All @@ -71,8 +72,8 @@ public void close() {
}

@Override
public String getRawResponse() {
return rawResponse;
public ExecutionEngine.ResponseMetadata getResponseMetadata() {
return responseMetadata;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,9 @@ public class OpenSearchSystemIndexScan extends TableScanOperator {

private long totalHits = 0;

private List<ExprValue> rawResponse = List.of();

@Override
public void open() {
rawResponse = request.search();
var rawResponse = request.search();
totalHits = rawResponse.size();
iterator = rawResponse.iterator();
}
Expand All @@ -61,12 +59,6 @@ public long getTotalHits() {
return totalHits;
}

@Override
public String getRawResponse() {
return rawResponse.stream().map(ExprValueUtils::jsonify)
.collect(Collectors.joining(", ", "[ ", " ]"));
}

@Override
public String explain() {
return request.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private ResponseListener<ExecutionEngine.QueryResponse> createListener(
public void onResponse(ExecutionEngine.QueryResponse response) {
String responseContent =
formatter.format(new QueryResult(response.getSchema(), response.getResults(),
response.getCursor(), response.getTotal()));
response.getCursor(), response.getTotal(), response.getResponseMetadata()));
listener.onResponse(new TransportPPLQueryResponse(responseContent));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ public class PrometheusMetricScan extends TableScanOperator {

private static final Logger LOG = LogManager.getLogger();

private String rawResponse;

/**
* Constructor.
*
Expand All @@ -71,7 +69,6 @@ public void open() {
JSONObject responseObject = prometheusClient.queryRange(
request.getPromQl(),
request.getStartTime(), request.getEndTime(), request.getStep());
rawResponse = responseObject.toString();
return new PrometheusResponse(responseObject, prometheusResponseFieldNames,
isQueryRangeFunctionScan).iterator();
} catch (IOException e) {
Expand All @@ -81,11 +78,6 @@ public void open() {
});
}

@Override
public String getRawResponse() {
return rawResponse;
}

@Override
public boolean hasNext() {
return iterator.hasNext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,16 @@ public class PrometheusSystemTableScan extends TableScanOperator {

private Iterator<ExprValue> iterator;

private List<ExprValue> rawResponse = List.of();

@Override
public void open() {
rawResponse = request.search();
iterator = rawResponse.iterator();
iterator = request.search().iterator();
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public String getRawResponse() {
return rawResponse.stream().map(ExprValueUtils::jsonify)
.collect(Collectors.joining(", ", "[ ", " ]"));
}

@Override
public ExprValue next() {
return iterator.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class QueryResult implements Iterable<Object[]> {
/**
* Results which are collection of expression.
*/
@Getter
private final Collection<ExprValue> exprValues;

@Getter
Expand All @@ -39,9 +40,12 @@ public class QueryResult implements Iterable<Object[]> {
@Getter
private final long total;

@Getter
private final ExecutionEngine.ResponseMetadata responseMetadata;

public QueryResult(ExecutionEngine.Schema schema, Collection<ExprValue> exprValues) {
this(schema, exprValues, Cursor.None, exprValues.size());
this(schema, exprValues, Cursor.None, exprValues.size(),
new ExecutionEngine.ResponseMetadata());
}

/**
Expand Down
Loading

0 comments on commit d22bb07

Please sign in to comment.