From 3f9cae3463e570278d7b20dd701954e0d292ab64 Mon Sep 17 00:00:00 2001 From: Jason Bryan Date: Wed, 7 Jun 2023 12:59:01 -0400 Subject: [PATCH 1/4] Add async support for forcemerge * Accept wait-for-completion (default: None) as a force-merge operation parameter * Use wait-for-completion only when configured as a boolean --- esrally/driver/runner.py | 3 +++ esrally/track/params.py | 2 ++ 2 files changed, 5 insertions(+) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 0664506a2..edf06a1ca 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -664,10 +664,13 @@ async def __call__(self, es, params): import elasticsearch max_num_segments = params.get("max-num-segments") + wait_for_completion = params.get("wait-for-completion") mode = params.get("mode") merge_params = self._default_kw_params(params) if max_num_segments: merge_params["max_num_segments"] = max_num_segments + if isinstance(wait_for_completion, bool): + merge_params["wait_for_completion"] = wait_for_completion if mode == "polling": complete = False try: diff --git a/esrally/track/params.py b/esrally/track/params.py index 141cf2977..3f6aa2bf3 100644 --- a/esrally/track/params.py +++ b/esrally/track/params.py @@ -829,6 +829,7 @@ def __init__(self, track, params, **kwargs): self._max_num_segments = params.get("max-num-segments") self._poll_period = params.get("poll-period", 10) self._mode = params.get("mode", "blocking") + self._wait_for_completion = params.get("wait-for-completion") def params(self): parsed_params = { @@ -836,6 +837,7 @@ def params(self): "max-num-segments": self._max_num_segments, "mode": self._mode, "poll-period": self._poll_period, + "wait-for-completion": self._wait_for_completion, } parsed_params.update(self._client_params()) return parsed_params From 27a9200e5729bfe21e545f2c566dbdff1568b986 Mon Sep 17 00:00:00 2001 From: Jason Bryan Date: Sat, 10 Jun 2023 13:16:57 -0400 Subject: [PATCH 2/4] Add wait_for_completion tests --- tests/driver/runner_test.py | 70 +++++++++++++++++++++++++++++++++++++ tests/track/params_test.py | 2 ++ 2 files changed, 72 insertions(+) diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 065fd79dc..97ce4c43a 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -1398,6 +1398,76 @@ async def test_force_merge_with_polling_and_params(self, es): ) es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, request_timeout=50000) + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_force_merge_with_wait_for_completion_false(self, es): + es.indices.forcemerge = mock.AsyncMock() + force_merge = runner.ForceMerge() + await force_merge(es, params={"index": "_all", "max-num-segments": 1, "wait-for-completion": False}) + es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, wait_for_completion=False) + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_force_merge_with_wait_for_completion_true_with_polling(self, es): + es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout")) + es.tasks.list = mock.AsyncMock( + side_effect=[ + { + "nodes": { + "Ap3OfntPT7qL4CBeKvamxg": { + "name": "instance-0000000001", + "transport_address": "10.46.79.231:19693", + "host": "10.46.79.231", + "ip": "10.46.79.231:19693", + "roles": ["data", "ingest", "master", "remote_cluster_client", "transform"], + "attributes": { + "logical_availability_zone": "zone-1", + "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", + "availability_zone": "us-east4-a", + "xpack.installed": "true", + "instance_configuration": "gcp.data.highio.1", + "transform.node": "true", + "region": "unknown-region", + }, + "tasks": { + "Ap3OfntPT7qL4CBeKvamxg:417009036": { + "node": "Ap3OfntPT7qL4CBeKvamxg", + "id": 417009036, + "type": "transport", + "action": "indices:admin/forcemerge", + "start_time_in_millis": 1598018980850, + "running_time_in_nanos": 3659821411, + "cancellable": False, + "headers": {}, + } + }, + } + } + }, + { + "nodes": {}, + }, + ] + ) + force_merge = runner.ForceMerge() + await force_merge( + es, + params={ + "index": "_all", + "max-num-segments": 1, + "request-timeout": 50000, + "wait-for-completion": True, + "mode": "polling", + "poll-period": 0, + }, + ) + es.indices.forcemerge.assert_awaited_once_with( + index="_all", + max_num_segments=1, + request_timeout=50000, + wait_for_completion=True, + ) + class TestIndicesStatsRunner: @mock.patch("elasticsearch.Elasticsearch") diff --git a/tests/track/params_test.py b/tests/track/params_test.py index 2ca5d21cc..b2b907fd8 100644 --- a/tests/track/params_test.py +++ b/tests/track/params_test.py @@ -2895,6 +2895,7 @@ def test_force_merge_all_params(self): "max-num-segments": 1, "polling-period": 20, "mode": "polling", + "wait-for-completion": False, }, ) @@ -2903,6 +2904,7 @@ def test_force_merge_all_params(self): assert p["request-timeout"] == 30 assert p["max-num-segments"] == 1 assert p["mode"] == "polling" + assert p["wait-for-completion"] is False class TestDownsampleParamSource: From 77f3b90c8636d2f69674aeae17d2863c5b7d0a47 Mon Sep 17 00:00:00 2001 From: Jason Bryan Date: Thu, 15 Jun 2023 12:14:23 -0400 Subject: [PATCH 3/4] Use wait_for_completion for polling mode --- docs/track.rst | 15 +++++++++++++-- esrally/driver/runner.py | 13 ++++++++----- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/docs/track.rst b/docs/track.rst index 8fe63fdba..80fd7bd4c 100644 --- a/docs/track.rst +++ b/docs/track.rst @@ -917,8 +917,19 @@ Properties * ``index`` (optional, defaults to the indices defined in the ``indices`` section or the data streams defined in the ``data-streams`` section. If neither are defined defaults to ``_all``.): The name of the index or data stream that should be force-merged. * ``mode`` (optional, default to ``blocking``): In the default ``blocking`` mode the Elasticsearch client blocks until the operation returns or times out as dictated by the :ref:`client-options `. In mode `polling` the client timeout is ignored. Instead, the api call is given 1s to complete. If the operation has not finished, the operator will poll every ``poll-period`` until all force merges are complete. -* ``poll-period`` (defaults to 10s): Only applicable if ``mode`` is set to ``polling``. Determines the internal at which a check is performed that all force merge operations are complete. -* ``max-num-segments`` (optional) The number of segments the index should be merged into. Defaults to simply checking if a merge needs to execute, and if so, executes it. +* ``poll-period`` (defaults to 10s): Only applicable if ``mode`` is set to ``polling``. Determines the interval at which a check is performed that all force merge operations are complete. +* ``max-num-segments`` (optional): The number of segments the index should be merged into. Defaults to simply checking if a merge needs to execute, and if so, executes it. +* ``wait-for-completion`` (optional boolean): This parameter sets the ``wait_for_completion`` query parameter effectively controlling the blocking behavior of the force merge API request at the target. The default Elasticsearch behavior is to block until the force merge is complete. ``wait-for-completion: false`` differs from polling mode in that Rally will not block the ``force-merge`` track operation, allowing the load driver to move on with the next operation. Only use ``wait-for-completion: false`` in scenarios where running a background force merge is desired, or if the next operation blocks the benchmark until all merges are finished. For example: + + { + "operation-type": "index-stats", + "index": "_all", + "condition": { + "path": "_all.total.merges.current", + "expected-value": 0 + }, + "retry-until-success": true + } This is an administrative operation. Metrics are not reported by default. If reporting is forced by setting ``include-in-reporting`` to ``true``, then throughput is reported as the number of completed force-merge operations per second. diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index edf06a1ca..bb965ba5a 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -672,12 +672,15 @@ async def __call__(self, es, params): if isinstance(wait_for_completion, bool): merge_params["wait_for_completion"] = wait_for_completion if mode == "polling": + # Explicitely set wait_for_completion to false for polling mode. + merge_params["wait_for_completion"] = False complete = False - try: - await es.indices.forcemerge(**merge_params) - complete = True - except elasticsearch.ConnectionTimeout: - pass + # try: + # await es.indices.forcemerge(**merge_params) + # complete = True + # except elasticsearch.ConnectionTimeout: + # pass + await es.indices.forcemerge(**merge_params) while not complete: await asyncio.sleep(params.get("poll-period")) tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"}) From b8ddf972fcbd5b6178fe56525da19f7f7601aa78 Mon Sep 17 00:00:00 2001 From: Jason Bryan Date: Wed, 21 Jun 2023 17:11:14 -0400 Subject: [PATCH 4/4] Add ES version check --- esrally/driver/runner.py | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index bb965ba5a..851f34870 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -663,24 +663,31 @@ async def __call__(self, es, params): # pylint: disable=import-outside-toplevel import elasticsearch + es_info = await es.info() + es_version = Version.from_string(es_info["version"]["number"]) max_num_segments = params.get("max-num-segments") wait_for_completion = params.get("wait-for-completion") mode = params.get("mode") merge_params = self._default_kw_params(params) if max_num_segments: merge_params["max_num_segments"] = max_num_segments - if isinstance(wait_for_completion, bool): - merge_params["wait_for_completion"] = wait_for_completion + # _forcemerge supports wait_for_completion as of 8.1.0 + # https://github.com/elastic/elasticsearch/pull/80463 + if (es_version.major, es_version.minor) >= (8, 1): + if isinstance(wait_for_completion, bool): + merge_params["wait_for_completion"] = wait_for_completion if mode == "polling": - # Explicitely set wait_for_completion to false for polling mode. - merge_params["wait_for_completion"] = False complete = False - # try: - # await es.indices.forcemerge(**merge_params) - # complete = True - # except elasticsearch.ConnectionTimeout: - # pass - await es.indices.forcemerge(**merge_params) + if (es_version.major, es_version.minor) >= (8, 1): + # Explicitely set wait_for_completion to false for polling mode. + merge_params["wait_for_completion"] = False + await es.indices.forcemerge(**merge_params) + else: + try: + await es.indices.forcemerge(**merge_params) + complete = True + except elasticsearch.ConnectionTimeout: + pass while not complete: await asyncio.sleep(params.get("poll-period")) tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"})