From 4c68e3aafbcc0647b70486a0c52c01bd66049df1 Mon Sep 17 00:00:00 2001 From: Ugo Sangiorgi Date: Thu, 17 Oct 2024 11:14:10 -0500 Subject: [PATCH 01/14] fixes composite operation failing when track_total_hits is false --- esrally/driver/runner.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 137bf4a4d..a1d663272 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -2501,7 +2501,17 @@ async def __call__(self, es, params): # id may be None if the operation has already returned search_id = response.get("id") CompositeContext.put(op_name, search_id) - + + #track_total_hits is true by default + CompositeContext.put("track_total_hits", True) + + if "track_total_hits" in params.get("body").keys(): + CompositeContext.put("track_total_hits", bool(eval(params.get("body").get("track_total_hits").capitalize()))) + + if "track_total_hits" in request_params.keys(): + CompositeContext.put("track_total_hits", bool(eval(request_params.get("track_total_hits").capitalize()))) + + def __repr__(self, *args, **kwargs): return "submit-async-search" @@ -2527,11 +2537,14 @@ async def __call__(self, es, params): success = success and not is_running if not is_running: stats[search] = { - "hits": response["response"]["hits"]["total"]["value"], - "hits_relation": response["response"]["hits"]["total"]["relation"], "timed_out": response["response"]["timed_out"], "took": response["response"]["took"], } + + if CompositeContext.get("track_total_hits"): + stats[search]["hits"] = response["response"]["hits"]["total"]["value"] + stats[search]["hits_relation"] = response["response"]["hits"]["total"]["relation"] + return { # only count completed searches - there is one key per search id in `stats` From 1086eafdbc62541765e1d4467d202ac07690450f Mon Sep 17 00:00:00 2001 From: Ugo Sangiorgi Date: Thu, 17 Oct 2024 11:40:39 -0500 Subject: [PATCH 02/14] adding True as default for track_total_hits --- esrally/driver/runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index a1d663272..86d261cbf 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -2506,10 +2506,10 @@ async def __call__(self, es, params): CompositeContext.put("track_total_hits", True) if "track_total_hits" in params.get("body").keys(): - CompositeContext.put("track_total_hits", bool(eval(params.get("body").get("track_total_hits").capitalize()))) + CompositeContext.put("track_total_hits", bool(eval(params.get("body").get("track_total_hits", True).capitalize()))) if "track_total_hits" in request_params.keys(): - CompositeContext.put("track_total_hits", bool(eval(request_params.get("track_total_hits").capitalize()))) + CompositeContext.put("track_total_hits", bool(eval(request_params.get("track_total_hits", True).capitalize()))) def __repr__(self, *args, **kwargs): From 69990978272ec7615a227d96882b3c6fe915c52c Mon Sep 17 00:00:00 2001 From: Ugo Sangiorgi Date: Fri, 18 Oct 2024 13:26:58 -0500 Subject: [PATCH 03/14] enforcing wait_for_completion_timeout = 0 so there is always an id associated with the async search --- esrally/driver/runner.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 86d261cbf..a5633809c 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -2495,6 +2495,9 @@ def __repr__(self, *args, **kwargs): class SubmitAsyncSearch(Runner): async def __call__(self, es, params): request_params = params.get("request-params", {}) + + #enforce wait_for_completion_timeout = 0 so there is always an id associated with the async search + request_params['wait_for_completion_timeout'] = 0 response = await es.async_search.submit(body=mandatory(params, "body", self), index=params.get("index"), params=request_params) op_name = mandatory(params, "name", self) From 5746dc3b4eaf3896aa20b82d4b949855061570de Mon Sep 17 00:00:00 2001 From: Ugo Sangiorgi Date: Fri, 18 Oct 2024 13:27:44 -0500 Subject: [PATCH 04/14] checking if total is reported (track_total_hits) --- esrally/driver/runner.py | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index a5633809c..e060aa46e 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -2501,18 +2501,9 @@ async def __call__(self, es, params): response = await es.async_search.submit(body=mandatory(params, "body", self), index=params.get("index"), params=request_params) op_name = mandatory(params, "name", self) - # id may be None if the operation has already returned + search_id = response.get("id") CompositeContext.put(op_name, search_id) - - #track_total_hits is true by default - CompositeContext.put("track_total_hits", True) - - if "track_total_hits" in params.get("body").keys(): - CompositeContext.put("track_total_hits", bool(eval(params.get("body").get("track_total_hits", True).capitalize()))) - - if "track_total_hits" in request_params.keys(): - CompositeContext.put("track_total_hits", bool(eval(request_params.get("track_total_hits", True).capitalize()))) def __repr__(self, *args, **kwargs): @@ -2523,7 +2514,7 @@ def async_search_ids(op_names): subjects = [op_names] if isinstance(op_names, str) else op_names for subject in subjects: subject_id = CompositeContext.get(subject) - # skip empty ids, searches have already completed + if subject_id: yield subject_id, subject @@ -2534,6 +2525,7 @@ async def __call__(self, es, params): searches = mandatory(params, "retrieve-results-for", self) request_params = params.get("request-params", {}) stats = {} + for search_id, search in async_search_ids(searches): response = await es.async_search.get(id=search_id, params=request_params) is_running = response["is_running"] @@ -2544,10 +2536,10 @@ async def __call__(self, es, params): "took": response["response"]["took"], } - if CompositeContext.get("track_total_hits"): + if "total" in response["response"]["hits"].keys(): stats[search]["hits"] = response["response"]["hits"]["total"]["value"] stats[search]["hits_relation"] = response["response"]["hits"]["total"]["relation"] - + return { # only count completed searches - there is one key per search id in `stats` From 69441bb14ac6cc53de6b1c70ba23e2164a99710b Mon Sep 17 00:00:00 2001 From: Ugo Sangiorgi Date: Fri, 18 Oct 2024 14:46:11 -0500 Subject: [PATCH 05/14] Squashed commit of the following: commit 5746dc3b4eaf3896aa20b82d4b949855061570de Author: Ugo Sangiorgi Date: Fri Oct 18 13:27:44 2024 -0500 checking if total is reported (track_total_hits) commit 69990978272ec7615a227d96882b3c6fe915c52c Author: Ugo Sangiorgi Date: Fri Oct 18 13:26:58 2024 -0500 enforcing wait_for_completion_timeout = 0 so there is always an id associated with the async search commit 1086eafdbc62541765e1d4467d202ac07690450f Author: Ugo Sangiorgi Date: Thu Oct 17 11:40:39 2024 -0500 adding True as default for track_total_hits commit 4c68e3aafbcc0647b70486a0c52c01bd66049df1 Author: Ugo Sangiorgi Date: Thu Oct 17 11:14:10 2024 -0500 fixes composite operation failing when track_total_hits is false --- esrally/driver/runner.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 137bf4a4d..e060aa46e 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -2495,13 +2495,17 @@ def __repr__(self, *args, **kwargs): class SubmitAsyncSearch(Runner): async def __call__(self, es, params): request_params = params.get("request-params", {}) + + #enforce wait_for_completion_timeout = 0 so there is always an id associated with the async search + request_params['wait_for_completion_timeout'] = 0 response = await es.async_search.submit(body=mandatory(params, "body", self), index=params.get("index"), params=request_params) op_name = mandatory(params, "name", self) - # id may be None if the operation has already returned + search_id = response.get("id") CompositeContext.put(op_name, search_id) + def __repr__(self, *args, **kwargs): return "submit-async-search" @@ -2510,7 +2514,7 @@ def async_search_ids(op_names): subjects = [op_names] if isinstance(op_names, str) else op_names for subject in subjects: subject_id = CompositeContext.get(subject) - # skip empty ids, searches have already completed + if subject_id: yield subject_id, subject @@ -2521,17 +2525,21 @@ async def __call__(self, es, params): searches = mandatory(params, "retrieve-results-for", self) request_params = params.get("request-params", {}) stats = {} + for search_id, search in async_search_ids(searches): response = await es.async_search.get(id=search_id, params=request_params) is_running = response["is_running"] success = success and not is_running if not is_running: stats[search] = { - "hits": response["response"]["hits"]["total"]["value"], - "hits_relation": response["response"]["hits"]["total"]["relation"], "timed_out": response["response"]["timed_out"], "took": response["response"]["took"], } + + if "total" in response["response"]["hits"].keys(): + stats[search]["hits"] = response["response"]["hits"]["total"]["value"] + stats[search]["hits_relation"] = response["response"]["hits"]["total"]["relation"] + return { # only count completed searches - there is one key per search id in `stats` From def0158fbec7e99e823902a4fd035a014310bb9c Mon Sep 17 00:00:00 2001 From: Ugo Sangiorgi Date: Mon, 21 Oct 2024 13:23:52 -0500 Subject: [PATCH 06/14] lint --- esrally/driver/runner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index e060aa46e..43570884d 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -3050,3 +3050,4 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): def __repr__(self, *args, **kwargs): return "retryable %s" % repr(self.delegate) + From 2cd7426d05fc89aa810c3b9ec8f6044ef557676d Mon Sep 17 00:00:00 2001 From: Ugo Sangiorgi Date: Mon, 21 Oct 2024 13:24:08 -0500 Subject: [PATCH 07/14] test must include 'wait_for_completion_timeout': 0 --- tests/driver/runner_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 561404752..0838c4402 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -6119,7 +6119,7 @@ async def test_submit_async_search(self, es): # search id is registered in context assert runner.CompositeContext.get("search-1") == "12345" - es.async_search.submit.assert_awaited_once_with(body={"query": {"match_all": {}}}, index="_all", params={}) + es.async_search.submit.assert_awaited_once_with(body={"query": {"match_all": {}}}, index="_all", params={'wait_for_completion_timeout': 0}) class TestGetAsyncSearch: From 27976e38760eb243ef36df07392eab0f09ad7edc Mon Sep 17 00:00:00 2001 From: Ugo Sangiorgi Date: Mon, 21 Oct 2024 13:29:39 -0500 Subject: [PATCH 08/14] lint --- esrally/driver/runner.py | 15 ++++++--------- tests/driver/runner_test.py | 4 +++- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 43570884d..364f2d72d 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -2495,17 +2495,16 @@ def __repr__(self, *args, **kwargs): class SubmitAsyncSearch(Runner): async def __call__(self, es, params): request_params = params.get("request-params", {}) - - #enforce wait_for_completion_timeout = 0 so there is always an id associated with the async search - request_params['wait_for_completion_timeout'] = 0 + + # enforce wait_for_completion_timeout = 0 so there is always an id associated with the async search + request_params["wait_for_completion_timeout"] = 0 response = await es.async_search.submit(body=mandatory(params, "body", self), index=params.get("index"), params=request_params) op_name = mandatory(params, "name", self) - + search_id = response.get("id") CompositeContext.put(op_name, search_id) - def __repr__(self, *args, **kwargs): return "submit-async-search" @@ -2525,7 +2524,7 @@ async def __call__(self, es, params): searches = mandatory(params, "retrieve-results-for", self) request_params = params.get("request-params", {}) stats = {} - + for search_id, search in async_search_ids(searches): response = await es.async_search.get(id=search_id, params=request_params) is_running = response["is_running"] @@ -2535,12 +2534,11 @@ async def __call__(self, es, params): "timed_out": response["response"]["timed_out"], "took": response["response"]["took"], } - + if "total" in response["response"]["hits"].keys(): stats[search]["hits"] = response["response"]["hits"]["total"]["value"] stats[search]["hits_relation"] = response["response"]["hits"]["total"]["relation"] - return { # only count completed searches - there is one key per search id in `stats` "weight": len(stats), @@ -3050,4 +3048,3 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): def __repr__(self, *args, **kwargs): return "retryable %s" % repr(self.delegate) - diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 0838c4402..22d60ddad 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -6119,7 +6119,9 @@ async def test_submit_async_search(self, es): # search id is registered in context assert runner.CompositeContext.get("search-1") == "12345" - es.async_search.submit.assert_awaited_once_with(body={"query": {"match_all": {}}}, index="_all", params={'wait_for_completion_timeout': 0}) + es.async_search.submit.assert_awaited_once_with( + body={"query": {"match_all": {}}}, index="_all", params={"wait_for_completion_timeout": 0} + ) class TestGetAsyncSearch: From be7fcf8693d0a560630b290596b56e14c84f0288 Mon Sep 17 00:00:00 2001 From: Ugo Sangiorgi Date: Tue, 22 Oct 2024 12:50:44 -0500 Subject: [PATCH 09/14] defaulting wait_for_completion_timeout instead of enforcing --- esrally/driver/runner.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 364f2d72d..7982d130b 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -2496,8 +2496,10 @@ class SubmitAsyncSearch(Runner): async def __call__(self, es, params): request_params = params.get("request-params", {}) - # enforce wait_for_completion_timeout = 0 so there is always an id associated with the async search - request_params["wait_for_completion_timeout"] = 0 + # defaults wait_for_completion_timeout = 0 so there is always an id associated with the async search + if "wait_for_completion_timeout" not in request_params: + request_params["wait_for_completion_timeout"] = 0 + response = await es.async_search.submit(body=mandatory(params, "body", self), index=params.get("index"), params=request_params) op_name = mandatory(params, "name", self) From c81cdf03ebaa1b401952f9afb3215479bd7ca179 Mon Sep 17 00:00:00 2001 From: Ugo Sangiorgi Date: Tue, 22 Oct 2024 12:52:27 -0500 Subject: [PATCH 10/14] Update esrally/driver/runner.py Co-authored-by: Gareth Ellis --- esrally/driver/runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 7982d130b..0ad131945 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -2503,7 +2503,6 @@ async def __call__(self, es, params): response = await es.async_search.submit(body=mandatory(params, "body", self), index=params.get("index"), params=request_params) op_name = mandatory(params, "name", self) - search_id = response.get("id") CompositeContext.put(op_name, search_id) From c4ce9a7a9b221e519cc509ecc097ccbb95883425 Mon Sep 17 00:00:00 2001 From: Ugo Sangiorgi Date: Tue, 22 Oct 2024 12:52:34 -0500 Subject: [PATCH 11/14] Update esrally/driver/runner.py Co-authored-by: Gareth Ellis --- esrally/driver/runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 0ad131945..1f7873cb7 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -2525,7 +2525,6 @@ async def __call__(self, es, params): searches = mandatory(params, "retrieve-results-for", self) request_params = params.get("request-params", {}) stats = {} - for search_id, search in async_search_ids(searches): response = await es.async_search.get(id=search_id, params=request_params) is_running = response["is_running"] From c3e7586e7acfa9036381996649a40d42c7d1a855 Mon Sep 17 00:00:00 2001 From: Ugo Sangiorgi Date: Tue, 22 Oct 2024 12:52:45 -0500 Subject: [PATCH 12/14] Update esrally/driver/runner.py Co-authored-by: Gareth Ellis --- esrally/driver/runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 1f7873cb7..2ac3ada5c 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -2514,7 +2514,6 @@ def async_search_ids(op_names): subjects = [op_names] if isinstance(op_names, str) else op_names for subject in subjects: subject_id = CompositeContext.get(subject) - if subject_id: yield subject_id, subject From 167b914b1aff3c9e4395c8221e5ac8a5d0f4862a Mon Sep 17 00:00:00 2001 From: Ugo Sangiorgi Date: Tue, 22 Oct 2024 12:57:12 -0500 Subject: [PATCH 13/14] lint --- esrally/driver/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 2ac3ada5c..068f74da4 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -2499,7 +2499,7 @@ async def __call__(self, es, params): # defaults wait_for_completion_timeout = 0 so there is always an id associated with the async search if "wait_for_completion_timeout" not in request_params: request_params["wait_for_completion_timeout"] = 0 - + response = await es.async_search.submit(body=mandatory(params, "body", self), index=params.get("index"), params=request_params) op_name = mandatory(params, "name", self) From 56b145c4d7fffe876d3e93f8429ff9b67d2da4f8 Mon Sep 17 00:00:00 2001 From: Ugo Sangiorgi Date: Tue, 22 Oct 2024 14:34:58 -0500 Subject: [PATCH 14/14] better comment --- esrally/driver/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 068f74da4..1b5018f05 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -2496,7 +2496,7 @@ class SubmitAsyncSearch(Runner): async def __call__(self, es, params): request_params = params.get("request-params", {}) - # defaults wait_for_completion_timeout = 0 so there is always an id associated with the async search + # defaults wait_for_completion_timeout = 0 to avoid sync fallback for fast searches if "wait_for_completion_timeout" not in request_params: request_params["wait_for_completion_timeout"] = 0