Skip to content

Commit

Permalink
Add timeout parameter to bulk operation (#1362)
Browse files Browse the repository at this point in the history
This change adds timeout parameter that can be used in bulk operation to define server-side timeout (for applying mappings updates, automatic index creation etc)
  • Loading branch information
probakowski authored Oct 19, 2021
1 parent 35de44e commit 6a16ec6
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,8 @@ Properties
* ``on-conflict`` (optional, defaults to ``index``): Determines whether Rally should use the action ``index`` or ``update`` on id conflicts.
* ``recency`` (optional, defaults to 0): A number between [0,1] indicating whether to bias conflicting ids towards more recent ids (``recency`` towards 1) or whether to consider all ids for id conflicts (``recency`` towards 0). See the diagram below for details.
* ``detailed-results`` (optional, defaults to ``false``): Records more detailed meta-data for bulk requests. As it analyzes the corresponding bulk response in more detail, this might incur additional overhead which can skew measurement results. See the section below for the meta-data that are returned.
* ``timeout`` (optional, defaults to ``1m``): Defines the `time period that Elasticsearch will wait per action <https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-query-params>`_ until it has finished processing the following operations: automatic index creation, dynamic mapping updates, waiting for active shards.


The image below shows how Rally behaves with a ``recency`` set to 0.5. Internally, Rally uses the blue function for its calculations but to understand the behavior we will focus on red function (which is just the inverse). Suppose we have already generated ids from 1 to 100 and we are about to simulate an id conflict. Rally will randomly choose a value on the y-axis, e.g. 0.8 which is mapped to 0.1 on the x-axis. This means that in 80% of all cases, Rally will choose an id within the most recent 10%, i.e. between 90 and 100. With 20% probability the id will be between 1 and 89. The closer ``recency`` gets to zero, the "flatter" the red curve gets and the more likely Rally will choose less recent ids.

Expand Down
3 changes: 3 additions & 0 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,13 +470,16 @@ async def __call__(self, es, params):
is in the single digit microsecond range when this feature is disabled and in the single digit millisecond range when this feature
is enabled; numbers based on a bulk size of 500 elements and no errors). For details please refer to the respective benchmarks
in ``benchmarks/driver``.
* ``timeout``: a time unit value indicating the server-side timeout for the operation
* ``request-timeout``: a non-negative float indicating the client-side timeout for the operation. If not present, defaults to
``None`` and potentially falls back to the global timeout setting.
"""
detailed_results = params.get("detailed-results", False)
api_kwargs = self._default_kw_params(params)

bulk_params = {}
if "timeout" in params:
bulk_params["timeout"] = params["timeout"]
if "pipeline" in params:
bulk_params["pipeline"] = params["pipeline"]

Expand Down
38 changes: 38 additions & 0 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,44 @@ async def test_bulk_index_missing_params(self, es):
ctx.exception.args[0],
)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_bulk_index_success_with_timeout(self, es):
bulk_response = {
"errors": False,
"took": 8,
}
es.bulk.return_value = as_future(io.StringIO(json.dumps(bulk_response)))

bulk = runner.BulkIndex()

bulk_params = {
"body": _build_bulk_body(
"action_meta_data",
"index_line",
"action_meta_data",
"index_line",
"action_meta_data",
"index_line",
),
"action-metadata-present": True,
"bulk-size": 3,
"unit": "docs",
"timeout": "1m",
}

result = await bulk(es, bulk_params)

self.assertEqual(8, result["took"])
self.assertIsNone(result["index"])
self.assertEqual(3, result["weight"])
self.assertEqual("docs", result["unit"])
self.assertEqual(True, result["success"])
self.assertEqual(0, result["error-count"])
self.assertFalse("error-type" in result)

es.bulk.assert_called_with(body=bulk_params["body"], params={"timeout": "1m"})

@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_bulk_index_success_with_metadata(self, es):
Expand Down

0 comments on commit 6a16ec6

Please sign in to comment.