From 5bc661b32b2a7623d71702dfcba6f5144fb9548a Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Thu, 26 Oct 2023 23:13:33 +0000 Subject: [PATCH] Compute pipelineStart before building request callback chain Also, IntelliJ suggested refactoring creation of the terminal request callback into a separate method since the existing method was really big. I liked that suggestion. Signed-off-by: Michael Froh --- .../opensearch/search/pipeline/Pipeline.java | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java index fa80cf23b8075..2699f18d6f037 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java @@ -137,18 +137,7 @@ void transformRequest(SearchRequest request, ActionListener reque return; } - long[] pipelineStart = new long[1]; - - ActionListener finalListener = ActionListener.wrap(r -> { - long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]); - afterTransformRequest(took); - requestListener.onResponse(new PipelinedRequest(this, r)); - }, e -> { - long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]); - afterTransformRequest(took); - onTransformRequestFailure(); - requestListener.onFailure(new SearchPipelineProcessingException(e)); - }); + ActionListener finalListener = getTerminalSearchRequestActionListener(requestListener); // Chain listeners back-to-front ActionListener currentListener = finalListener; @@ -183,11 +172,25 @@ void transformRequest(SearchRequest request, ActionListener reque }, finalListener::onFailure); } - pipelineStart[0] = relativeTimeSupplier.getAsLong(); beforeTransformRequest(); currentListener.onResponse(request); } + private ActionListener getTerminalSearchRequestActionListener(ActionListener requestListener) { + final long pipelineStart = relativeTimeSupplier.getAsLong(); + + return ActionListener.wrap(r -> { + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart); + afterTransformRequest(took); + requestListener.onResponse(new PipelinedRequest(this, r)); + }, e -> { + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart); + afterTransformRequest(took); + onTransformRequestFailure(); + requestListener.onFailure(new SearchPipelineProcessingException(e)); + }); + } + ActionListener transformResponseListener(SearchRequest request, ActionListener responseListener) { if (searchResponseProcessors.isEmpty()) { // No response transformation necessary