From a57512c46b36f1984bc22368e5f9cff349603704 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Thu, 3 May 2018 17:13:32 +0300 Subject: [PATCH] SQL: Fix bug caused by empty composites (#30343) When dealing with filtering, a composite aggregation might return empty buckets (which have been filtered) which gets sent as is to the client. Unfortunately this interprets the response as no more data instead of retrying. This now has changed and the listener keeps retrying until either the query has ended or data passes the filter. Fix #30292 --- .../search/CompositeAggregationCursor.java | 36 +++++++++++++++---- .../xpack/sql/execution/search/Querier.java | 9 ++++- 2 files changed, 38 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java index fe9479f3c1aa4..31d933f9f59d6 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java @@ -113,12 +113,36 @@ public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry re SearchRequest search = Querier.prepareRequest(client, query, cfg.pageTimeout(), indices); - client.search(search, ActionListener.wrap(r -> { - updateCompositeAfterKey(r, query); - CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, r, limit, - serializeQuery(query), indices); - listener.onResponse(rowSet); - }, listener::onFailure)); + client.search(search, new ActionListener() { + @Override + public void onResponse(SearchResponse r) { + try { + // retry + if (shouldRetryDueToEmptyPage(r)) { + CompositeAggregationCursor.updateCompositeAfterKey(r, search.source()); + client.search(search, this); + return; + } + + updateCompositeAfterKey(r, query); + CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, r, limit, serializeQuery(query), indices); + listener.onResponse(rowSet); + } catch (Exception ex) { + listener.onFailure(ex); + } + } + + @Override + public void onFailure(Exception ex) { + listener.onFailure(ex); + } + }); + } + + static boolean shouldRetryDueToEmptyPage(SearchResponse response) { + CompositeAggregation composite = getComposite(response); + // if there are no buckets but a next page, go fetch it instead of sending an empty response to the client + return composite != null && composite.getBuckets().isEmpty() && composite.afterKey() != null && !composite.afterKey().isEmpty(); } static CompositeAggregation getComposite(SearchResponse response) { diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java index 3c10f08c53a8d..62941a5b14f07 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java @@ -206,8 +206,15 @@ static class CompositeActionListener extends BaseAggActionListener { protected void handleResponse(SearchResponse response, ActionListener listener) { // there are some results if (response.getAggregations().asList().size() > 0) { - CompositeAggregationCursor.updateCompositeAfterKey(response, request.source()); + // retry + if (CompositeAggregationCursor.shouldRetryDueToEmptyPage(response)) { + CompositeAggregationCursor.updateCompositeAfterKey(response, request.source()); + client.search(request, this); + return; + } + + CompositeAggregationCursor.updateCompositeAfterKey(response, request.source()); byte[] nextSearch = null; try { nextSearch = CompositeAggregationCursor.serializeQuery(request.source());