diff --git a/docs/track.rst b/docs/track.rst index 8fe63fdba..80fd7bd4c 100644 --- a/docs/track.rst +++ b/docs/track.rst @@ -917,8 +917,19 @@ Properties * ``index`` (optional, defaults to the indices defined in the ``indices`` section or the data streams defined in the ``data-streams`` section. If neither are defined defaults to ``_all``.): The name of the index or data stream that should be force-merged. * ``mode`` (optional, default to ``blocking``): In the default ``blocking`` mode the Elasticsearch client blocks until the operation returns or times out as dictated by the :ref:`client-options `. In mode `polling` the client timeout is ignored. Instead, the api call is given 1s to complete. If the operation has not finished, the operator will poll every ``poll-period`` until all force merges are complete. -* ``poll-period`` (defaults to 10s): Only applicable if ``mode`` is set to ``polling``. Determines the internal at which a check is performed that all force merge operations are complete. -* ``max-num-segments`` (optional) The number of segments the index should be merged into. Defaults to simply checking if a merge needs to execute, and if so, executes it. +* ``poll-period`` (defaults to 10s): Only applicable if ``mode`` is set to ``polling``. Determines the interval at which a check is performed that all force merge operations are complete. +* ``max-num-segments`` (optional): The number of segments the index should be merged into. Defaults to simply checking if a merge needs to execute, and if so, executes it. +* ``wait-for-completion`` (optional boolean): This parameter sets the ``wait_for_completion`` query parameter effectively controlling the blocking behavior of the force merge API request at the target. The default Elasticsearch behavior is to block until the force merge is complete. ``wait-for-completion: false`` differs from polling mode in that Rally will not block the ``force-merge`` track operation, allowing the load driver to move on with the next operation. Only use ``wait-for-completion: false`` in scenarios where running a background force merge is desired, or if the next operation blocks the benchmark until all merges are finished. For example: + + { + "operation-type": "index-stats", + "index": "_all", + "condition": { + "path": "_all.total.merges.current", + "expected-value": 0 + }, + "retry-until-success": true + } This is an administrative operation. Metrics are not reported by default. If reporting is forced by setting ``include-in-reporting`` to ``true``, then throughput is reported as the number of completed force-merge operations per second. diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 0664506a2..851f34870 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -663,18 +663,31 @@ async def __call__(self, es, params): # pylint: disable=import-outside-toplevel import elasticsearch + es_info = await es.info() + es_version = Version.from_string(es_info["version"]["number"]) max_num_segments = params.get("max-num-segments") + wait_for_completion = params.get("wait-for-completion") mode = params.get("mode") merge_params = self._default_kw_params(params) if max_num_segments: merge_params["max_num_segments"] = max_num_segments + # _forcemerge supports wait_for_completion as of 8.1.0 + # https://github.com/elastic/elasticsearch/pull/80463 + if (es_version.major, es_version.minor) >= (8, 1): + if isinstance(wait_for_completion, bool): + merge_params["wait_for_completion"] = wait_for_completion if mode == "polling": complete = False - try: + if (es_version.major, es_version.minor) >= (8, 1): + # Explicitely set wait_for_completion to false for polling mode. + merge_params["wait_for_completion"] = False await es.indices.forcemerge(**merge_params) - complete = True - except elasticsearch.ConnectionTimeout: - pass + else: + try: + await es.indices.forcemerge(**merge_params) + complete = True + except elasticsearch.ConnectionTimeout: + pass while not complete: await asyncio.sleep(params.get("poll-period")) tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"}) diff --git a/esrally/track/params.py b/esrally/track/params.py index 141cf2977..3f6aa2bf3 100644 --- a/esrally/track/params.py +++ b/esrally/track/params.py @@ -829,6 +829,7 @@ def __init__(self, track, params, **kwargs): self._max_num_segments = params.get("max-num-segments") self._poll_period = params.get("poll-period", 10) self._mode = params.get("mode", "blocking") + self._wait_for_completion = params.get("wait-for-completion") def params(self): parsed_params = { @@ -836,6 +837,7 @@ def params(self): "max-num-segments": self._max_num_segments, "mode": self._mode, "poll-period": self._poll_period, + "wait-for-completion": self._wait_for_completion, } parsed_params.update(self._client_params()) return parsed_params diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 065fd79dc..97ce4c43a 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -1398,6 +1398,76 @@ async def test_force_merge_with_polling_and_params(self, es): ) es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, request_timeout=50000) + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_force_merge_with_wait_for_completion_false(self, es): + es.indices.forcemerge = mock.AsyncMock() + force_merge = runner.ForceMerge() + await force_merge(es, params={"index": "_all", "max-num-segments": 1, "wait-for-completion": False}) + es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, wait_for_completion=False) + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_force_merge_with_wait_for_completion_true_with_polling(self, es): + es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout")) + es.tasks.list = mock.AsyncMock( + side_effect=[ + { + "nodes": { + "Ap3OfntPT7qL4CBeKvamxg": { + "name": "instance-0000000001", + "transport_address": "10.46.79.231:19693", + "host": "10.46.79.231", + "ip": "10.46.79.231:19693", + "roles": ["data", "ingest", "master", "remote_cluster_client", "transform"], + "attributes": { + "logical_availability_zone": "zone-1", + "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", + "availability_zone": "us-east4-a", + "xpack.installed": "true", + "instance_configuration": "gcp.data.highio.1", + "transform.node": "true", + "region": "unknown-region", + }, + "tasks": { + "Ap3OfntPT7qL4CBeKvamxg:417009036": { + "node": "Ap3OfntPT7qL4CBeKvamxg", + "id": 417009036, + "type": "transport", + "action": "indices:admin/forcemerge", + "start_time_in_millis": 1598018980850, + "running_time_in_nanos": 3659821411, + "cancellable": False, + "headers": {}, + } + }, + } + } + }, + { + "nodes": {}, + }, + ] + ) + force_merge = runner.ForceMerge() + await force_merge( + es, + params={ + "index": "_all", + "max-num-segments": 1, + "request-timeout": 50000, + "wait-for-completion": True, + "mode": "polling", + "poll-period": 0, + }, + ) + es.indices.forcemerge.assert_awaited_once_with( + index="_all", + max_num_segments=1, + request_timeout=50000, + wait_for_completion=True, + ) + class TestIndicesStatsRunner: @mock.patch("elasticsearch.Elasticsearch") diff --git a/tests/track/params_test.py b/tests/track/params_test.py index 2ca5d21cc..b2b907fd8 100644 --- a/tests/track/params_test.py +++ b/tests/track/params_test.py @@ -2895,6 +2895,7 @@ def test_force_merge_all_params(self): "max-num-segments": 1, "polling-period": 20, "mode": "polling", + "wait-for-completion": False, }, ) @@ -2903,6 +2904,7 @@ def test_force_merge_all_params(self): assert p["request-timeout"] == 30 assert p["max-num-segments"] == 1 assert p["mode"] == "polling" + assert p["wait-for-completion"] is False class TestDownsampleParamSource: