Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support wait_for_completion for force-merge operations #1731

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -917,8 +917,19 @@ Properties

* ``index`` (optional, defaults to the indices defined in the ``indices`` section or the data streams defined in the ``data-streams`` section. If neither are defined defaults to ``_all``.): The name of the index or data stream that should be force-merged.
* ``mode`` (optional, default to ``blocking``): In the default ``blocking`` mode the Elasticsearch client blocks until the operation returns or times out as dictated by the :ref:`client-options <clr_client_options>`. In mode `polling` the client timeout is ignored. Instead, the api call is given 1s to complete. If the operation has not finished, the operator will poll every ``poll-period`` until all force merges are complete.
* ``poll-period`` (defaults to 10s): Only applicable if ``mode`` is set to ``polling``. Determines the internal at which a check is performed that all force merge operations are complete.
* ``max-num-segments`` (optional) The number of segments the index should be merged into. Defaults to simply checking if a merge needs to execute, and if so, executes it.
* ``poll-period`` (defaults to 10s): Only applicable if ``mode`` is set to ``polling``. Determines the interval at which a check is performed that all force merge operations are complete.
* ``max-num-segments`` (optional): The number of segments the index should be merged into. Defaults to simply checking if a merge needs to execute, and if so, executes it.
* ``wait-for-completion`` (optional boolean): This parameter sets the ``wait_for_completion`` query parameter effectively controlling the blocking behavior of the force merge API request at the target. The default Elasticsearch behavior is to block until the force merge is complete. ``wait-for-completion: false`` differs from polling mode in that Rally will not block the ``force-merge`` track operation, allowing the load driver to move on with the next operation. Only use ``wait-for-completion: false`` in scenarios where running a background force merge is desired, or if the next operation blocks the benchmark until all merges are finished. For example:

{
"operation-type": "index-stats",
"index": "_all",
"condition": {
"path": "_all.total.merges.current",
"expected-value": 0
},
"retry-until-success": true
}

This is an administrative operation. Metrics are not reported by default. If reporting is forced by setting ``include-in-reporting`` to ``true``, then throughput is reported as the number of completed force-merge operations per second.

Expand Down
21 changes: 17 additions & 4 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,18 +663,31 @@ async def __call__(self, es, params):
# pylint: disable=import-outside-toplevel
import elasticsearch

es_info = await es.info()
es_version = Version.from_string(es_info["version"]["number"])
max_num_segments = params.get("max-num-segments")
wait_for_completion = params.get("wait-for-completion")
mode = params.get("mode")
merge_params = self._default_kw_params(params)
if max_num_segments:
merge_params["max_num_segments"] = max_num_segments
# _forcemerge supports wait_for_completion as of 8.1.0
# https://github.com/elastic/elasticsearch/pull/80463
if (es_version.major, es_version.minor) >= (8, 1):
if isinstance(wait_for_completion, bool):
merge_params["wait_for_completion"] = wait_for_completion
if mode == "polling":
complete = False
try:
if (es_version.major, es_version.minor) >= (8, 1):
# Explicitely set wait_for_completion to false for polling mode.
merge_params["wait_for_completion"] = False
await es.indices.forcemerge(**merge_params)
complete = True
except elasticsearch.ConnectionTimeout:
pass
else:
try:
await es.indices.forcemerge(**merge_params)
complete = True
except elasticsearch.ConnectionTimeout:
pass
while not complete:
await asyncio.sleep(params.get("poll-period"))
tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"})
Expand Down
2 changes: 2 additions & 0 deletions esrally/track/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -829,13 +829,15 @@ def __init__(self, track, params, **kwargs):
self._max_num_segments = params.get("max-num-segments")
self._poll_period = params.get("poll-period", 10)
self._mode = params.get("mode", "blocking")
self._wait_for_completion = params.get("wait-for-completion")

def params(self):
parsed_params = {
"index": self._target_name,
"max-num-segments": self._max_num_segments,
"mode": self._mode,
"poll-period": self._poll_period,
"wait-for-completion": self._wait_for_completion,
}
parsed_params.update(self._client_params())
return parsed_params
Expand Down
70 changes: 70 additions & 0 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1398,6 +1398,76 @@ async def test_force_merge_with_polling_and_params(self, es):
)
es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, request_timeout=50000)

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_force_merge_with_wait_for_completion_false(self, es):
es.indices.forcemerge = mock.AsyncMock()
force_merge = runner.ForceMerge()
await force_merge(es, params={"index": "_all", "max-num-segments": 1, "wait-for-completion": False})
es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, wait_for_completion=False)

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_force_merge_with_wait_for_completion_true_with_polling(self, es):
es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout"))
es.tasks.list = mock.AsyncMock(
side_effect=[
{
"nodes": {
"Ap3OfntPT7qL4CBeKvamxg": {
"name": "instance-0000000001",
"transport_address": "10.46.79.231:19693",
"host": "10.46.79.231",
"ip": "10.46.79.231:19693",
"roles": ["data", "ingest", "master", "remote_cluster_client", "transform"],
"attributes": {
"logical_availability_zone": "zone-1",
"server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526",
"availability_zone": "us-east4-a",
"xpack.installed": "true",
"instance_configuration": "gcp.data.highio.1",
"transform.node": "true",
"region": "unknown-region",
},
"tasks": {
"Ap3OfntPT7qL4CBeKvamxg:417009036": {
"node": "Ap3OfntPT7qL4CBeKvamxg",
"id": 417009036,
"type": "transport",
"action": "indices:admin/forcemerge",
"start_time_in_millis": 1598018980850,
"running_time_in_nanos": 3659821411,
"cancellable": False,
"headers": {},
}
},
}
}
},
{
"nodes": {},
},
]
)
force_merge = runner.ForceMerge()
await force_merge(
es,
params={
"index": "_all",
"max-num-segments": 1,
"request-timeout": 50000,
"wait-for-completion": True,
"mode": "polling",
"poll-period": 0,
},
)
es.indices.forcemerge.assert_awaited_once_with(
index="_all",
max_num_segments=1,
request_timeout=50000,
wait_for_completion=True,
)


class TestIndicesStatsRunner:
@mock.patch("elasticsearch.Elasticsearch")
Expand Down
2 changes: 2 additions & 0 deletions tests/track/params_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2895,6 +2895,7 @@ def test_force_merge_all_params(self):
"max-num-segments": 1,
"polling-period": 20,
"mode": "polling",
"wait-for-completion": False,
},
)

Expand All @@ -2903,6 +2904,7 @@ def test_force_merge_all_params(self):
assert p["request-timeout"] == 30
assert p["max-num-segments"] == 1
assert p["mode"] == "polling"
assert p["wait-for-completion"] is False


class TestDownsampleParamSource:
Expand Down