diff --git a/tests/__init__.py b/tests/__init__.py index cd971cc7c..8e1ab0021 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -29,22 +29,3 @@ def async_wrapper(*args, **kwargs): asyncio.run(t(*args, **kwargs), debug=True) return async_wrapper - - -def as_future(result=None, exception=None): - """ - - Helper to create a future that completes immediately either with a result or exceptionally. - - :param result: Regular result. - :param exception: Exceptional result. - :return: The corresponding future. - """ - f = asyncio.get_running_loop().create_future() - if exception and result: - raise AssertionError("Specify a result or an exception but not both") - if exception: - f.set_exception(exception) - else: - f.set_result(result) - return f diff --git a/tests/driver/driver_test.py b/tests/driver/driver_test.py index f7b86faf9..7ca8603f4 100644 --- a/tests/driver/driver_test.py +++ b/tests/driver/driver_test.py @@ -29,7 +29,7 @@ from esrally import config, exceptions, metrics, track from esrally.driver import driver, runner, scheduler from esrally.track import params -from tests import as_future, run_async +from tests import run_async class DriverTestParamSource: @@ -1369,7 +1369,7 @@ async def test_execute_schedule_in_throughput_mode(self, es): task_start = time.perf_counter() es.new_request_context.return_value = AsyncExecutorTests.StaticRequestTiming(task_start=task_start) - es.bulk.return_value = as_future(io.StringIO('{"errors": false, "took": 8}')) + es.bulk = mock.AsyncMock(return_value=io.StringIO('{"errors": false, "took": 8}')) params.register_param_source_for_name("driver-test-param-source", DriverTestParamSource) test_track = track.Track(name="unittest", description="unittest track", indices=None, challenges=None) @@ -1566,8 +1566,8 @@ async def test_execute_schedule_runner_overrides_times(self, es): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_execute_schedule_throughput_throttled(self, es): - def perform_request(*args, **kwargs): - return as_future() + async def perform_request(*args, **kwargs): + return None es.init_request_context.return_value = {"request_start": 0, "request_end": 10} # as this method is called several times we need to return a fresh instance every time as the previous @@ -1635,7 +1635,7 @@ def perform_request(*args, **kwargs): @run_async async def test_cancel_execute_schedule(self, es): es.init_request_context.return_value = {"request_start": 0, "request_end": 10} - es.bulk.return_value = as_future(io.StringIO('{"errors": false, "took": 8}')) + es.bulk = mock.AsyncMock(return_value=io.StringIO('{"errors": false, "took": 8}')) params.register_param_source_for_name("driver-test-param-source", DriverTestParamSource) test_track = track.Track(name="unittest", description="unittest track", indices=None, challenges=None) @@ -1742,8 +1742,7 @@ async def __call__(self): async def test_execute_single_no_return_value(self): es = None params = None - runner = mock.Mock() - runner.return_value = as_future() + runner = mock.AsyncMock() ops, unit, request_meta_data = await driver.execute_single(self.context_managed(runner), es, params, on_error="continue") @@ -1755,8 +1754,7 @@ async def test_execute_single_no_return_value(self): async def test_execute_single_tuple(self): es = None params = None - runner = mock.Mock() - runner.return_value = as_future(result=(500, "MB")) + runner = mock.AsyncMock(return_value=(500, "MB")) ops, unit, request_meta_data = await driver.execute_single(self.context_managed(runner), es, params, on_error="continue") @@ -1768,9 +1766,8 @@ async def test_execute_single_tuple(self): async def test_execute_single_dict(self): es = None params = None - runner = mock.Mock() - runner.return_value = as_future( - { + runner = mock.AsyncMock( + return_value={ "weight": 50, "unit": "docs", "some-custom-meta-data": "valid", @@ -1798,7 +1795,7 @@ async def test_execute_single_with_connection_error_always_aborts(self): es = None params = None # ES client uses pseudo-status "N/A" in this case... - runner = mock.Mock(side_effect=as_future(exception=elasticsearch.ConnectionError("N/A", "no route to host", None))) + runner = mock.AsyncMock(side_effect=elasticsearch.ConnectionError("N/A", "no route to host", None)) with self.assertRaises(exceptions.RallyAssertionError) as ctx: await driver.execute_single(self.context_managed(runner), es, params, on_error=on_error) @@ -1808,9 +1805,7 @@ async def test_execute_single_with_connection_error_always_aborts(self): async def test_execute_single_with_http_400_aborts_when_specified(self): es = None params = None - runner = mock.Mock( - side_effect=as_future(exception=elasticsearch.NotFoundError(404, "not found", "the requested document could not be found")) - ) + runner = mock.AsyncMock(side_effect=elasticsearch.NotFoundError(404, "not found", "the requested document could not be found")) with self.assertRaises(exceptions.RallyAssertionError) as ctx: await driver.execute_single(self.context_managed(runner), es, params, on_error="abort") @@ -1823,9 +1818,7 @@ async def test_execute_single_with_http_400_aborts_when_specified(self): async def test_execute_single_with_http_400(self): es = None params = None - runner = mock.Mock( - side_effect=as_future(exception=elasticsearch.NotFoundError(404, "not found", "the requested document could not be found")) - ) + runner = mock.AsyncMock(side_effect=elasticsearch.NotFoundError(404, "not found", "the requested document could not be found")) ops, unit, request_meta_data = await driver.execute_single(self.context_managed(runner), es, params, on_error="continue") @@ -1845,7 +1838,7 @@ async def test_execute_single_with_http_400(self): async def test_execute_single_with_http_413(self): es = None params = None - runner = mock.Mock(side_effect=as_future(exception=elasticsearch.NotFoundError(413, b"", b""))) + runner = mock.AsyncMock(side_effect=elasticsearch.NotFoundError(413, b"", b"")) ops, unit, request_meta_data = await driver.execute_single(self.context_managed(runner), es, params, on_error="continue") diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 03a40945b..578faf32f 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -27,7 +27,7 @@ from esrally import client, exceptions from esrally.driver import runner -from tests import as_future, run_async +from tests import run_async class BaseUnitTestContextManagerRunner: @@ -163,8 +163,7 @@ async def test_asserts_equal_succeeds(self): }, }, } - delegate = mock.MagicMock() - delegate.return_value = as_future(response) + delegate = mock.AsyncMock(return_value=response) r = runner.AssertingRunner(delegate) async with r: final_response = await r( @@ -191,8 +190,7 @@ async def test_asserts_equal_fails(self): }, }, } - delegate = mock.MagicMock() - delegate.return_value = as_future(response) + delegate = mock.AsyncMock(return_value=response) r = runner.AssertingRunner(delegate) with self.assertRaisesRegex( exceptions.RallyTaskAssertionError, r"Expected \[hits.hits.relation\] in \[test-task\] to be == \[eq\] but was \[gte\]." @@ -213,8 +211,7 @@ async def test_asserts_equal_fails(self): async def test_skips_asserts_for_non_dicts(self): es = None response = (1, "ops") - delegate = mock.MagicMock() - delegate.return_value = as_future(response) + delegate = mock.AsyncMock(return_value=response) r = runner.AssertingRunner(delegate) async with r: final_response = await r( @@ -361,7 +358,7 @@ async def test_bulk_index_missing_params(self, es): "errors": False, "took": 8, } - es.bulk.return_value = as_future(io.StringIO(json.dumps(bulk_response))) + es.bulk = mock.AsyncMock(return_value=io.StringIO(json.dumps(bulk_response))) bulk = runner.BulkIndex() @@ -391,7 +388,7 @@ async def test_bulk_index_success_with_timeout(self, es): "errors": False, "took": 8, } - es.bulk.return_value = as_future(io.StringIO(json.dumps(bulk_response))) + es.bulk = mock.AsyncMock(return_value=io.StringIO(json.dumps(bulk_response))) bulk = runner.BulkIndex() @@ -420,7 +417,7 @@ async def test_bulk_index_success_with_timeout(self, es): self.assertEqual(0, result["error-count"]) self.assertFalse("error-type" in result) - es.bulk.assert_called_with(body=bulk_params["body"], params={"timeout": "1m"}) + es.bulk.assert_awaited_with(body=bulk_params["body"], params={"timeout": "1m"}) @mock.patch("elasticsearch.Elasticsearch") @run_async @@ -429,7 +426,7 @@ async def test_bulk_index_success_with_metadata(self, es): "errors": False, "took": 8, } - es.bulk.return_value = as_future(io.StringIO(json.dumps(bulk_response))) + es.bulk = mock.AsyncMock(return_value=io.StringIO(json.dumps(bulk_response))) bulk = runner.BulkIndex() @@ -457,7 +454,7 @@ async def test_bulk_index_success_with_metadata(self, es): self.assertEqual(0, result["error-count"]) self.assertFalse("error-type" in result) - es.bulk.assert_called_with(body=bulk_params["body"], params={}) + es.bulk.assert_awaited_with(body=bulk_params["body"], params={}) @mock.patch("elasticsearch.Elasticsearch") @run_async @@ -466,7 +463,7 @@ async def test_simple_bulk_with_timeout_and_headers(self, es): "errors": False, "took": 8, } - es.bulk.return_value = as_future(io.StringIO(json.dumps(bulk_response))) + es.bulk = mock.AsyncMock(return_value=io.StringIO(json.dumps(bulk_response))) bulk = runner.BulkIndex() @@ -495,7 +492,7 @@ async def test_simple_bulk_with_timeout_and_headers(self, es): self.assertEqual(0, result["error-count"]) self.assertFalse("error-type" in result) - es.bulk.assert_called_with( + es.bulk.assert_awaited_with( doc_type="_doc", params={}, body="index_line\nindex_line\nindex_line\n", @@ -512,7 +509,7 @@ async def test_bulk_index_success_without_metadata_with_doc_type(self, es): "errors": False, "took": 8, } - es.bulk.return_value = as_future(io.StringIO(json.dumps(bulk_response))) + es.bulk = mock.AsyncMock(return_value=io.StringIO(json.dumps(bulk_response))) bulk = runner.BulkIndex() bulk_params = { @@ -538,7 +535,7 @@ async def test_bulk_index_success_without_metadata_with_doc_type(self, es): self.assertEqual(0, result["error-count"]) self.assertFalse("error-type" in result) - es.bulk.assert_called_with(body=bulk_params["body"], index="test-index", doc_type="_doc", params={}) + es.bulk.assert_awaited_with(body=bulk_params["body"], index="test-index", doc_type="_doc", params={}) @mock.patch("elasticsearch.Elasticsearch") @run_async @@ -547,7 +544,7 @@ async def test_bulk_index_success_without_metadata_and_without_doc_type(self, es "errors": False, "took": 8, } - es.bulk.return_value = as_future(io.StringIO(json.dumps(bulk_response))) + es.bulk = mock.AsyncMock(return_value=io.StringIO(json.dumps(bulk_response))) bulk = runner.BulkIndex() bulk_params = { @@ -572,7 +569,7 @@ async def test_bulk_index_success_without_metadata_and_without_doc_type(self, es self.assertEqual(0, result["error-count"]) self.assertFalse("error-type" in result) - es.bulk.assert_called_with(body=bulk_params["body"], index="test-index", doc_type=None, params={}) + es.bulk.assert_awaited_with(body=bulk_params["body"], index="test-index", doc_type=None, params={}) @mock.patch("elasticsearch.Elasticsearch") @run_async @@ -587,7 +584,7 @@ async def test_bulk_index_error(self, es): ], } - es.bulk.return_value = as_future(io.StringIO(json.dumps(bulk_response))) + es.bulk = mock.AsyncMock(return_value=io.StringIO(json.dumps(bulk_response))) bulk = runner.BulkIndex() @@ -616,7 +613,7 @@ async def test_bulk_index_error(self, es): self.assertEqual(2, result["error-count"]) self.assertEqual("bulk", result["error-type"]) - es.bulk.assert_called_with(body=bulk_params["body"], params={}) + es.bulk.assert_awaited_with(body=bulk_params["body"], params={}) @mock.patch("elasticsearch.Elasticsearch") @run_async @@ -655,7 +652,7 @@ async def test_bulk_index_error_no_shards(self, es): ], } - es.bulk.return_value = as_future(io.StringIO(json.dumps(bulk_response))) + es.bulk = mock.AsyncMock(return_value=io.StringIO(json.dumps(bulk_response))) bulk = runner.BulkIndex() @@ -685,7 +682,7 @@ async def test_bulk_index_error_no_shards(self, es): self.assertEqual(3, result["error-count"]) self.assertEqual("bulk", result["error-type"]) - es.bulk.assert_called_with(body=bulk_params["body"], params={}) + es.bulk.assert_awaited_with(body=bulk_params["body"], params={}) @mock.patch("elasticsearch.Elasticsearch") @run_async @@ -747,7 +744,7 @@ async def test_mixed_bulk_with_simple_stats(self, es): }, ], } - es.bulk.return_value = as_future(io.StringIO(json.dumps(bulk_response))) + es.bulk = mock.AsyncMock(return_value=io.StringIO(json.dumps(bulk_response))) bulk = runner.BulkIndex() bulk_params = { @@ -779,13 +776,13 @@ async def test_mixed_bulk_with_simple_stats(self, es): self.assertEqual(2, result["error-count"]) self.assertEqual("bulk", result["error-type"]) - es.bulk.assert_called_with(body=bulk_params["body"], params={}) + es.bulk.assert_awaited_with(body=bulk_params["body"], params={}) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_mixed_bulk_with_detailed_stats_body_as_string(self, es): - es.bulk.return_value = as_future( - { + es.bulk = mock.AsyncMock( + return_value={ "took": 30, "ingest_took": 20, "errors": True, @@ -921,17 +918,17 @@ async def test_mixed_bulk_with_detailed_stats_body_as_string(self, es): self.assertEqual(582, result["bulk-request-size-bytes"]) self.assertEqual(234, result["total-document-size-bytes"]) - es.bulk.assert_called_with(body=bulk_params["body"], params={}) + es.bulk.assert_awaited_with(body=bulk_params["body"], params={}) - es.bulk.return_value.result().pop("ingest_took") + es.bulk.return_value.pop("ingest_took") result = await bulk(es, bulk_params) self.assertNotIn("ingest_took", result) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_simple_bulk_with_detailed_stats_body_as_list(self, es): - es.bulk.return_value = as_future( - { + es.bulk = mock.AsyncMock( + return_value={ "took": 30, "ingest_took": 20, "errors": False, @@ -993,17 +990,17 @@ async def test_simple_bulk_with_detailed_stats_body_as_list(self, es): self.assertEqual(93, result["bulk-request-size-bytes"]) self.assertEqual(39, result["total-document-size-bytes"]) - es.bulk.assert_called_with(body=bulk_params["body"], params={}) + es.bulk.assert_awaited_with(body=bulk_params["body"], params={}) - es.bulk.return_value.result().pop("ingest_took") + es.bulk.return_value.pop("ingest_took") result = await bulk(es, bulk_params) self.assertNotIn("ingest_took", result) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_simple_bulk_with_detailed_stats_body_as_unrecognized_type(self, es): - es.bulk.return_value = as_future( - { + es.bulk = mock.AsyncMock( + return_value={ "took": 30, "ingest_took": 20, "errors": False, @@ -1043,63 +1040,65 @@ async def test_simple_bulk_with_detailed_stats_body_as_unrecognized_type(self, e with self.assertRaisesRegex(exceptions.DataError, "bulk body is neither string nor list"): await bulk(es, bulk_params) - es.bulk.assert_called_with(body=bulk_params["body"], params={}) + es.bulk.assert_awaited_with(body=bulk_params["body"], params={}) class ForceMergeRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_force_merge_with_defaults(self, es): - es.indices.forcemerge.return_value = as_future() + es.indices.forcemerge = mock.AsyncMock() force_merge = runner.ForceMerge() await force_merge(es, params={"index": "_all"}) - es.indices.forcemerge.assert_called_once_with(index="_all") + es.indices.forcemerge.assert_awaited_once_with(index="_all") @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_force_merge_with_timeout_and_headers(self, es): - es.indices.forcemerge.return_value = as_future() + es.indices.forcemerge = mock.AsyncMock() force_merge = runner.ForceMerge() await force_merge(es, params={"index": "_all", "opaque-id": "test-id", "request-timeout": 3.0, "headers": {"header1": "value1"}}) - es.indices.forcemerge.assert_called_once_with(headers={"header1": "value1"}, index="_all", opaque_id="test-id", request_timeout=3.0) + es.indices.forcemerge.assert_awaited_once_with( + headers={"header1": "value1"}, index="_all", opaque_id="test-id", request_timeout=3.0 + ) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_force_merge_override_request_timeout(self, es): - es.indices.forcemerge.return_value = as_future() + es.indices.forcemerge = mock.AsyncMock() force_merge = runner.ForceMerge() await force_merge(es, params={"index": "_all", "request-timeout": 50000}) - es.indices.forcemerge.assert_called_once_with(index="_all", request_timeout=50000) + es.indices.forcemerge.assert_awaited_once_with(index="_all", request_timeout=50000) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_force_merge_with_params(self, es): - es.indices.forcemerge.return_value = as_future() + es.indices.forcemerge = mock.AsyncMock() force_merge = runner.ForceMerge() await force_merge(es, params={"index": "_all", "max-num-segments": 1, "request-timeout": 50000}) - es.indices.forcemerge.assert_called_once_with(index="_all", max_num_segments=1, request_timeout=50000) + es.indices.forcemerge.assert_awaited_with(index="_all", max_num_segments=1, request_timeout=50000) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_force_merge_with_polling_no_timeout(self, es): - es.indices.forcemerge.return_value = as_future() + es.indices.forcemerge = mock.AsyncMock() force_merge = runner.ForceMerge() await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0}) - es.indices.forcemerge.assert_called_once_with(index="_all") + es.indices.forcemerge.assert_awaited_once_with(index="_all") @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_force_merge_with_polling(self, es): - es.indices.forcemerge.return_value = as_future(exception=elasticsearch.ConnectionTimeout()) - es.tasks.list.side_effect = [ - as_future( + es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout()) + es.tasks.list = mock.AsyncMock( + side_effect=[ { "nodes": { "Ap3OfntPT7qL4CBeKvamxg": { @@ -1131,20 +1130,22 @@ async def test_force_merge_with_polling(self, es): }, } } - } - ), - as_future({"nodes": {}}), - ] + }, + { + "nodes": {}, + }, + ] + ) force_merge = runner.ForceMerge() await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0}) - es.indices.forcemerge.assert_called_once_with(index="_all") + es.indices.forcemerge.assert_awaited_once_with(index="_all") @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_force_merge_with_polling_and_params(self, es): - es.indices.forcemerge.return_value = as_future(exception=elasticsearch.ConnectionTimeout()) - es.tasks.list.side_effect = [ - as_future( + es.indices.forcemerge = mock.AsyncMock(return_value=elasticsearch.ConnectionTimeout()) + es.tasks.list = mock.AsyncMock( + side_effect=[ { "nodes": { "Ap3OfntPT7qL4CBeKvamxg": { @@ -1176,10 +1177,12 @@ async def test_force_merge_with_polling_and_params(self, es): }, } } - } - ), - as_future({"nodes": {}}), - ] + }, + { + "nodes": {}, + }, + ] + ) force_merge = runner.ForceMerge() # request-timeout should be ignored as mode:polling await force_merge( @@ -1192,26 +1195,26 @@ async def test_force_merge_with_polling_and_params(self, es): "poll-period": 0, }, ) - es.indices.forcemerge.assert_called_once_with(index="_all", max_num_segments=1, request_timeout=50000) + es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, request_timeout=50000) class IndicesStatsRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_indices_stats_without_parameters(self, es): - es.indices.stats.return_value = as_future({}) + es.indices.stats = mock.AsyncMock(return_value={}) indices_stats = runner.IndicesStats() result = await indices_stats(es, params={}) self.assertEqual(1, result["weight"]) self.assertEqual("ops", result["unit"]) self.assertTrue(result["success"]) - es.indices.stats.assert_called_once_with(index="_all", metric="_all") + es.indices.stats.assert_awaited_once_with(index="_all", metric="_all") @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_indices_stats_with_timeout_and_headers(self, es): - es.indices.stats.return_value = as_future({}) + es.indices.stats = mock.AsyncMock(return_value={}) indices_stats = runner.IndicesStats() result = await indices_stats( es, @@ -1225,15 +1228,15 @@ async def test_indices_stats_with_timeout_and_headers(self, es): self.assertEqual("ops", result["unit"]) self.assertTrue(result["success"]) - es.indices.stats.assert_called_once_with( + es.indices.stats.assert_awaited_once_with( index="_all", metric="_all", headers={"header1": "value1"}, opaque_id="test-id1", request_timeout=3.0 ) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_indices_stats_with_failed_condition(self, es): - es.indices.stats.return_value = as_future( - { + es.indices.stats = mock.AsyncMock( + return_value={ "_all": { "total": { "merges": { @@ -1262,13 +1265,13 @@ async def test_indices_stats_with_failed_condition(self, es): result["condition"], ) - es.indices.stats.assert_called_once_with(index="logs-*", metric="_all") + es.indices.stats.assert_awaited_once_with(index="logs-*", metric="_all") @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_indices_stats_with_successful_condition(self, es): - es.indices.stats.return_value = as_future( - { + es.indices.stats = mock.AsyncMock( + return_value={ "_all": { "total": { "merges": { @@ -1304,12 +1307,12 @@ async def test_indices_stats_with_successful_condition(self, es): result["condition"], ) - es.indices.stats.assert_called_once_with(index="logs-*", metric="_all") + es.indices.stats.assert_awaited_once_with(index="logs-*", metric="_all") @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_indices_stats_with_non_existing_path(self, es): - es.indices.stats.return_value = as_future({"indices": {"total": {"docs": {"current": 0}}}}) + es.indices.stats = mock.AsyncMock(return_value={"indices": {"total": {"docs": {"current": 0}}}}) indices_stats = runner.IndicesStats() @@ -1336,7 +1339,7 @@ async def test_indices_stats_with_non_existing_path(self, es): result["condition"], ) - es.indices.stats.assert_called_once_with(index="logs-*", metric="_all") + es.indices.stats.assert_awaited_once_with(index="logs-*", metric="_all") class QueryRunnerTests(TestCase): @@ -1357,7 +1360,7 @@ async def test_query_match_only_request_body_defined(self, es): ], }, } - es.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(search_response))) + es.transport.perform_request = mock.AsyncMock(return_value=io.StringIO(json.dumps(search_response))) query_runner = runner.Query() @@ -1383,7 +1386,7 @@ async def test_query_match_only_request_body_defined(self, es): self.assertEqual(5, result["took"]) self.assertFalse("error-type" in result) - es.transport.perform_request.assert_called_once_with( + es.transport.perform_request.assert_awaited_once_with( "GET", "/_all/_search", params={"request_cache": "true"}, body=params["body"], headers=None ) es.clear_scroll.assert_not_called() @@ -1402,7 +1405,7 @@ async def test_query_with_timeout_and_headers(self, es): ], }, } - es.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(search_response))) + es.transport.perform_request = mock.AsyncMock(return_value=io.StringIO(json.dumps(search_response))) query_runner = runner.Query() @@ -1427,7 +1430,7 @@ async def test_query_with_timeout_and_headers(self, es): self.assertEqual(5, result["took"]) self.assertFalse("error-type" in result) - es.transport.perform_request.assert_called_once_with( + es.transport.perform_request.assert_awaited_once_with( "GET", "/_all/_search", params={"request_timeout": 3.0, "request_cache": "true"}, @@ -1453,7 +1456,7 @@ async def test_query_match_using_request_params(self, es): ], }, } - es.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(response))) + es.transport.perform_request = mock.AsyncMock(return_value=io.StringIO(json.dumps(response))) query_runner = runner.Query() params = { @@ -1477,7 +1480,7 @@ async def test_query_match_using_request_params(self, es): self.assertEqual(62, result["took"]) self.assertFalse("error-type" in result) - es.transport.perform_request.assert_called_once_with( + es.transport.perform_request.assert_awaited_once_with( "GET", "/_all/_search", params={ @@ -1506,7 +1509,7 @@ async def test_query_no_detailed_results(self, es): ], }, } - es.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(response))) + es.transport.perform_request = mock.AsyncMock(return_value=io.StringIO(json.dumps(response))) query_runner = runner.Query() params = { @@ -1527,7 +1530,7 @@ async def test_query_no_detailed_results(self, es): self.assertNotIn("took", result) self.assertNotIn("error-type", result) - es.transport.perform_request.assert_called_once_with( + es.transport.perform_request.assert_awaited_once_with( "GET", "/_all/_search", params={ @@ -1552,7 +1555,7 @@ async def test_query_hits_total_as_number(self, es): ], }, } - es.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(search_response))) + es.transport.perform_request = mock.AsyncMock(return_value=io.StringIO(json.dumps(search_response))) query_runner = runner.Query() @@ -1578,7 +1581,7 @@ async def test_query_hits_total_as_number(self, es): self.assertEqual(5, result["took"]) self.assertFalse("error-type" in result) - es.transport.perform_request.assert_called_once_with( + es.transport.perform_request.assert_awaited_once_with( "GET", "/_all/_search", params={ @@ -1606,7 +1609,7 @@ async def test_query_match_all(self, es): ], }, } - es.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(search_response))) + es.transport.perform_request = mock.AsyncMock(return_value=io.StringIO(json.dumps(search_response))) query_runner = runner.Query() @@ -1632,7 +1635,7 @@ async def test_query_match_all(self, es): self.assertEqual(5, result["took"]) self.assertFalse("error-type" in result) - es.transport.perform_request.assert_called_once_with( + es.transport.perform_request.assert_awaited_once_with( "GET", "/unittest/_search", params={}, @@ -1658,7 +1661,7 @@ async def test_query_match_all_doc_type_fallback(self, es): }, } - es.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(search_response))) + es.transport.perform_request = mock.AsyncMock(return_value=io.StringIO(json.dumps(search_response))) query_runner = runner.Query() @@ -1685,7 +1688,7 @@ async def test_query_match_all_doc_type_fallback(self, es): self.assertEqual(5, result["took"]) self.assertFalse("error-type" in result) - es.transport.perform_request.assert_called_once_with( + es.transport.perform_request.assert_awaited_once_with( "GET", "/unittest/type/_search", body=params["body"], @@ -1711,8 +1714,8 @@ async def test_scroll_query_only_one_page(self, es): }, } - es.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(search_response))) - es.clear_scroll.return_value = as_future(io.StringIO('{"acknowledged": true}')) + es.transport.perform_request = mock.AsyncMock(return_value=io.StringIO(json.dumps(search_response))) + es.clear_scroll = mock.AsyncMock(return_value=io.StringIO('{"acknowledged": true}')) query_runner = runner.Query() @@ -1740,7 +1743,7 @@ async def test_scroll_query_only_one_page(self, es): self.assertFalse(results["timed_out"]) self.assertFalse("error-type" in results) - es.transport.perform_request.assert_called_once_with( + es.transport.perform_request.assert_awaited_once_with( "GET", "/unittest/_search", params={ @@ -1752,7 +1755,7 @@ async def test_scroll_query_only_one_page(self, es): body=params["body"], headers=None, ) - es.clear_scroll.assert_called_once_with(body={"scroll_id": ["some-scroll-id"]}) + es.clear_scroll.assert_awaited_once_with(body={"scroll_id": ["some-scroll-id"]}) @mock.patch("elasticsearch.Elasticsearch") @run_async @@ -1771,8 +1774,8 @@ async def test_scroll_query_no_request_cache(self, es): }, } - es.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(search_response))) - es.clear_scroll.return_value = as_future(io.StringIO('{"acknowledged": true}')) + es.transport.perform_request = mock.AsyncMock(return_value=io.StringIO(json.dumps(search_response))) + es.clear_scroll = mock.AsyncMock(return_value=io.StringIO('{"acknowledged": true}')) query_runner = runner.Query() @@ -1800,14 +1803,14 @@ async def test_scroll_query_no_request_cache(self, es): self.assertFalse(results["timed_out"]) self.assertFalse("error-type" in results) - es.transport.perform_request.assert_called_once_with( + es.transport.perform_request.assert_awaited_once_with( "GET", "/unittest/_search", params={"sort": "_doc", "scroll": "10s", "size": 100}, body=params["body"], headers={"Accept-Encoding": "identity"}, ) - es.clear_scroll.assert_called_once_with(body={"scroll_id": ["some-scroll-id"]}) + es.clear_scroll.assert_awaited_once_with(body={"scroll_id": ["some-scroll-id"]}) @mock.patch("elasticsearch.Elasticsearch") @run_async @@ -1826,8 +1829,8 @@ async def test_scroll_query_only_one_page_only_request_body_defined(self, es): }, } - es.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(search_response))) - es.clear_scroll.return_value = as_future(io.StringIO('{"acknowledged": true}')) + es.transport.perform_request = mock.AsyncMock(return_value=io.StringIO(json.dumps(search_response))) + es.clear_scroll = mock.AsyncMock(return_value=io.StringIO('{"acknowledged": true}')) query_runner = runner.Query() @@ -1854,7 +1857,7 @@ async def test_scroll_query_only_one_page_only_request_body_defined(self, es): self.assertFalse(results["timed_out"]) self.assertFalse("error-type" in results) - es.transport.perform_request.assert_called_once_with( + es.transport.perform_request.assert_awaited_once_with( "GET", "/_all/_search", params={ @@ -1866,7 +1869,7 @@ async def test_scroll_query_only_one_page_only_request_body_defined(self, es): headers=None, ) - es.clear_scroll.assert_called_once_with(body={"scroll_id": ["some-scroll-id"]}) + es.clear_scroll.assert_awaited_once_with(body={"scroll_id": ["some-scroll-id"]}) @mock.patch("elasticsearch.Elasticsearch") @run_async @@ -1901,12 +1904,14 @@ async def test_scroll_query_with_explicit_number_of_pages(self, es): }, } - es.transport.perform_request.side_effect = [ - as_future(io.StringIO(json.dumps(search_response))), - as_future(io.StringIO(json.dumps(scroll_response))), - ] + es.transport.perform_request = mock.AsyncMock( + side_effect=[ + io.StringIO(json.dumps(search_response)), + io.StringIO(json.dumps(scroll_response)), + ] + ) - es.clear_scroll.return_value = as_future(io.StringIO('{"acknowledged": true}')) + es.clear_scroll = mock.AsyncMock(return_value=io.StringIO('{"acknowledged": true}')) query_runner = runner.Query() @@ -1934,7 +1939,7 @@ async def test_scroll_query_with_explicit_number_of_pages(self, es): self.assertTrue(results["timed_out"]) self.assertFalse("error-type" in results) - es.clear_scroll.assert_called_once_with(body={"scroll_id": ["some-scroll-id"]}) + es.clear_scroll.assert_awaited_once_with(body={"scroll_id": ["some-scroll-id"]}) @mock.patch("elasticsearch.Elasticsearch") @run_async @@ -1952,8 +1957,8 @@ async def test_scroll_query_cannot_clear_scroll(self, es): }, } - es.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(search_response))) - es.clear_scroll.return_value = as_future(exception=elasticsearch.ConnectionTimeout()) + es.transport.perform_request = mock.AsyncMock(return_value=io.StringIO(json.dumps(search_response))) + es.clear_scroll = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout()) query_runner = runner.Query() @@ -1980,7 +1985,7 @@ async def test_scroll_query_cannot_clear_scroll(self, es): self.assertEqual(53, results["took"]) self.assertFalse("error-type" in results) - es.clear_scroll.assert_called_once_with(body={"scroll_id": ["some-scroll-id"]}) + es.clear_scroll.assert_awaited_once_with(body={"scroll_id": ["some-scroll-id"]}) @mock.patch("elasticsearch.Elasticsearch") @run_async @@ -2011,11 +2016,13 @@ async def test_scroll_query_request_all_pages(self, es): }, } - es.transport.perform_request.side_effect = [ - as_future(io.StringIO(json.dumps(search_response))), - as_future(io.StringIO(json.dumps(scroll_response))), - ] - es.clear_scroll.return_value = as_future(io.StringIO('{"acknowledged": true}')) + es.transport.perform_request = mock.AsyncMock( + side_effect=[ + io.StringIO(json.dumps(search_response)), + io.StringIO(json.dumps(scroll_response)), + ] + ) + es.clear_scroll = mock.AsyncMock(return_value=io.StringIO('{"acknowledged": true}')) query_runner = runner.Query() @@ -2043,14 +2050,14 @@ async def test_scroll_query_request_all_pages(self, es): self.assertFalse(results["timed_out"]) self.assertFalse("error-type" in results) - es.clear_scroll.assert_called_once_with(body={"scroll_id": ["some-scroll-id"]}) + es.clear_scroll.assert_awaited_once_with(body={"scroll_id": ["some-scroll-id"]}) class PutPipelineRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_create_pipeline(self, es): - es.ingest.put_pipeline.return_value = as_future() + es.ingest.put_pipeline = mock.AsyncMock() r = runner.PutPipeline() @@ -2068,12 +2075,12 @@ async def test_create_pipeline(self, es): await r(es, params) - es.ingest.put_pipeline.assert_called_once_with(id="rename", body=params["body"], master_timeout=None, timeout=None) + es.ingest.put_pipeline.assert_awaited_once_with(id="rename", body=params["body"], master_timeout=None, timeout=None) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_param_body_mandatory(self, es): - es.ingest.put_pipeline.return_value = as_future() + es.ingest.put_pipeline = mock.AsyncMock() r = runner.PutPipeline() @@ -2085,12 +2092,12 @@ async def test_param_body_mandatory(self, es): ): await r(es, params) - self.assertEqual(0, es.ingest.put_pipeline.call_count) + self.assertEqual(0, es.ingest.put_pipeline.await_count) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_param_id_mandatory(self, es): - es.ingest.put_pipeline.return_value = as_future() + es.ingest.put_pipeline = mock.AsyncMock() r = runner.PutPipeline() @@ -2102,14 +2109,14 @@ async def test_param_id_mandatory(self, es): ): await r(es, params) - self.assertEqual(0, es.ingest.put_pipeline.call_count) + self.assertEqual(0, es.ingest.put_pipeline.await_count) class ClusterHealthRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_waits_for_expected_cluster_status(self, es): - es.cluster.health.return_value = as_future({"status": "green", "relocating_shards": 0}) + es.cluster.health = mock.AsyncMock(return_value={"status": "green", "relocating_shards": 0}) r = runner.ClusterHealth() params = {"request-params": {"wait_for_status": "green"}} @@ -2127,12 +2134,12 @@ async def test_waits_for_expected_cluster_status(self, es): result, ) - es.cluster.health.assert_called_once_with(params={"wait_for_status": "green"}) + es.cluster.health.assert_awaited_once_with(params={"wait_for_status": "green"}) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_accepts_better_cluster_status(self, es): - es.cluster.health.return_value = as_future({"status": "green", "relocating_shards": 0}) + es.cluster.health = mock.AsyncMock(return_value={"status": "green", "relocating_shards": 0}) r = runner.ClusterHealth() params = {"request-params": {"wait_for_status": "yellow"}} @@ -2150,12 +2157,12 @@ async def test_accepts_better_cluster_status(self, es): result, ) - es.cluster.health.assert_called_once_with(params={"wait_for_status": "yellow"}) + es.cluster.health.assert_awaited_once_with(params={"wait_for_status": "yellow"}) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_cluster_health_with_timeout_and_headers(self, es): - es.cluster.health.return_value = as_future({"status": "green", "relocating_shards": 0}) + es.cluster.health = mock.AsyncMock(return_value={"status": "green", "relocating_shards": 0}) cluster_health_runner = runner.ClusterHealth() params = { @@ -2178,14 +2185,14 @@ async def test_cluster_health_with_timeout_and_headers(self, es): result, ) - es.cluster.health.assert_called_once_with( + es.cluster.health.assert_awaited_once_with( headers={"header1": "value1"}, opaque_id="testid-1", params={"wait_for_status": "yellow"}, request_timeout=3.0 ) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_rejects_relocating_shards(self, es): - es.cluster.health.return_value = as_future({"status": "yellow", "relocating_shards": 3}) + es.cluster.health = mock.AsyncMock(return_value={"status": "yellow", "relocating_shards": 3}) r = runner.ClusterHealth() params = { @@ -2209,12 +2216,12 @@ async def test_rejects_relocating_shards(self, es): result, ) - es.cluster.health.assert_called_once_with(index="logs-*", params={"wait_for_status": "red", "wait_for_no_relocating_shards": True}) + es.cluster.health.assert_awaited_once_with(index="logs-*", params={"wait_for_status": "red", "wait_for_no_relocating_shards": True}) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_rejects_unknown_cluster_status(self, es): - es.cluster.health.return_value = as_future({"status": None, "relocating_shards": 0}) + es.cluster.health = mock.AsyncMock(return_value={"status": None, "relocating_shards": 0}) r = runner.ClusterHealth() params = {"request-params": {"wait_for_status": "green"}} @@ -2232,14 +2239,14 @@ async def test_rejects_unknown_cluster_status(self, es): result, ) - es.cluster.health.assert_called_once_with(params={"wait_for_status": "green"}) + es.cluster.health.assert_awaited_once_with(params={"wait_for_status": "green"}) class CreateIndexRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_creates_multiple_indices(self, es): - es.indices.create.return_value = as_future() + es.indices.create = mock.AsyncMock() r = runner.CreateIndex() @@ -2257,7 +2264,7 @@ async def test_creates_multiple_indices(self, es): self.assertDictEqual({"weight": 2, "unit": "ops", "success": True}, result) - es.indices.create.assert_has_calls( + es.indices.create.assert_has_awaits( [ mock.call(index="indexA", body={"settings": {}}, params=request_params), mock.call(index="indexB", body={"settings": {}}, params=request_params), @@ -2267,7 +2274,7 @@ async def test_creates_multiple_indices(self, es): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_create_with_timeout_and_headers(self, es): - es.indices.create.return_value = as_future() + es.indices.create = mock.AsyncMock() create_index_runner = runner.CreateIndex() @@ -2287,7 +2294,7 @@ async def test_create_with_timeout_and_headers(self, es): self.assertDictEqual({"weight": 1, "unit": "ops", "success": True}, result) - es.indices.create.assert_called_once_with( + es.indices.create.assert_awaited_once_with( index="indexA", body={"settings": {}}, headers={"header1": "value1"}, @@ -2299,7 +2306,7 @@ async def test_create_with_timeout_and_headers(self, es): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_ignore_invalid_params(self, es): - es.indices.create.return_value = as_future() + es.indices.create = mock.AsyncMock() r = runner.CreateIndex() @@ -2318,12 +2325,12 @@ async def test_ignore_invalid_params(self, es): self.assertDictEqual({"weight": 1, "unit": "ops", "success": True}, result) - es.indices.create.assert_called_once_with(index="indexA", body={"settings": {}}, params={"wait_for_active_shards": "true"}) + es.indices.create.assert_awaited_once_with(index="indexA", body={"settings": {}}, params={"wait_for_active_shards": "true"}) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_param_indices_mandatory(self, es): - es.indices.create.return_value = as_future() + es.indices.create = mock.AsyncMock() r = runner.CreateIndex() @@ -2335,14 +2342,14 @@ async def test_param_indices_mandatory(self, es): ): await r(es, params) - self.assertEqual(0, es.indices.create.call_count) + self.assertEqual(0, es.indices.create.await_count) class CreateDataStreamRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_creates_multiple_data_streams(self, es): - es.indices.create_data_stream.return_value = as_future() + es.indices.create_data_stream = mock.AsyncMock() r = runner.CreateDataStream() @@ -2360,7 +2367,7 @@ async def test_creates_multiple_data_streams(self, es): self.assertDictEqual({"weight": 2, "unit": "ops", "success": True}, result) - es.indices.create_data_stream.assert_has_calls( + es.indices.create_data_stream.assert_has_awaits( [ mock.call("data-stream-A", params=request_params), mock.call("data-stream-B", params=request_params), @@ -2370,7 +2377,7 @@ async def test_creates_multiple_data_streams(self, es): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_param_data_streams_mandatory(self, es): - es.indices.create_data_stream.return_value = as_future() + es.indices.create_data_stream = mock.AsyncMock() r = runner.CreateDataStream() @@ -2382,17 +2389,17 @@ async def test_param_data_streams_mandatory(self, es): ): await r(es, params) - self.assertEqual(0, es.indices.create_data_stream.call_count) + self.assertEqual(0, es.indices.create_data_stream.await_count) class DeleteIndexRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_deletes_existing_indices(self, es): - es.indices.exists.side_effect = [as_future(False), as_future(True)] - es.indices.delete.return_value = as_future() - es.cluster.get_settings.return_value = as_future({"persistent": {}, "transient": {"action.destructive_requires_name": True}}) - es.cluster.put_settings.return_value = as_future() + es.indices.exists = mock.AsyncMock(side_effect=[False, True]) + es.indices.delete = mock.AsyncMock() + es.cluster.get_settings = mock.AsyncMock(return_value={"persistent": {}, "transient": {"action.destructive_requires_name": True}}) + es.cluster.put_settings = mock.AsyncMock() r = runner.DeleteIndex() params = {"indices": ["indexA", "indexB"], "only-if-exists": True} @@ -2401,20 +2408,20 @@ async def test_deletes_existing_indices(self, es): self.assertDictEqual({"weight": 1, "unit": "ops", "success": True}, result) - es.cluster.put_settings.assert_has_calls( + es.cluster.put_settings.assert_has_awaits( [ mock.call(body={"transient": {"action.destructive_requires_name": False}}), mock.call(body={"transient": {"action.destructive_requires_name": True}}), ] ) - es.indices.delete.assert_called_once_with(index="indexB", params={}) + es.indices.delete.assert_awaited_once_with(index="indexB", params={}) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_deletes_all_indices(self, es): - es.indices.delete.return_value = as_future() - es.cluster.get_settings.return_value = as_future({"persistent": {}, "transient": {}}) - es.cluster.put_settings.return_value = as_future() + es.indices.delete = mock.AsyncMock() + es.cluster.get_settings = mock.AsyncMock(return_value={"persistent": {}, "transient": {}}) + es.cluster.put_settings = mock.AsyncMock() r = runner.DeleteIndex() params = { @@ -2427,14 +2434,17 @@ async def test_deletes_all_indices(self, es): self.assertDictEqual({"weight": 2, "unit": "ops", "success": True}, result) - es.cluster.put_settings.assert_has_calls( + es.cluster.put_settings.assert_has_awaits( [ mock.call(body={"transient": {"action.destructive_requires_name": False}}), mock.call(body={"transient": {"action.destructive_requires_name": None}}), ] ) - es.indices.delete.assert_has_calls( - [mock.call(index="indexA", params=params["request-params"]), mock.call(index="indexB", params=params["request-params"])] + es.indices.delete.assert_has_awaits( + [ + mock.call(index="indexA", params=params["request-params"]), + mock.call(index="indexB", params=params["request-params"]), + ] ) self.assertEqual(0, es.indices.exists.call_count) @@ -2443,8 +2453,8 @@ class DeleteDataStreamRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_deletes_existing_data_streams(self, es): - es.indices.exists.side_effect = [as_future(False), as_future(True)] - es.indices.delete_data_stream.return_value = as_future() + es.indices.exists = mock.AsyncMock(side_effect=[False, True]) + es.indices.delete_data_stream = mock.AsyncMock() r = runner.DeleteDataStream() @@ -2454,12 +2464,13 @@ async def test_deletes_existing_data_streams(self, es): self.assertDictEqual({"weight": 1, "unit": "ops", "success": True}, result) - es.indices.delete_data_stream.assert_called_once_with("data-stream-B", params={}) + es.indices.delete_data_stream.assert_awaited_once_with("data-stream-B", params={}) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_deletes_all_data_streams(self, es): - es.indices.delete_data_stream.return_value = as_future() + es.indices.delete_data_stream = mock.AsyncMock() + es.indices.exists = mock.AsyncMock() r = runner.DeleteDataStream() @@ -2473,20 +2484,20 @@ async def test_deletes_all_data_streams(self, es): self.assertDictEqual({"weight": 2, "unit": "ops", "success": True}, result) - es.indices.delete_data_stream.assert_has_calls( + es.indices.delete_data_stream.assert_has_awaits( [ mock.call("data-stream-A", ignore=[404], params=params["request-params"]), mock.call("data-stream-B", ignore=[404], params=params["request-params"]), ] ) - self.assertEqual(0, es.indices.exists.call_count) + self.assertEqual(0, es.indices.exists.await_count) class CreateIndexTemplateRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_create_index_templates(self, es): - es.indices.put_template.return_value = as_future() + es.indices.put_template = mock.AsyncMock() r = runner.CreateIndexTemplate() @@ -2502,7 +2513,7 @@ async def test_create_index_templates(self, es): self.assertDictEqual({"weight": 2, "unit": "ops", "success": True}, result) - es.indices.put_template.assert_has_calls( + es.indices.put_template.assert_has_awaits( [ mock.call(name="templateA", body={"settings": {}}, params=params["request-params"]), mock.call(name="templateB", body={"settings": {}}, params=params["request-params"]), @@ -2512,7 +2523,7 @@ async def test_create_index_templates(self, es): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_param_templates_mandatory(self, es): - es.indices.put_template.return_value = as_future() + es.indices.put_template = mock.AsyncMock() r = runner.CreateIndexTemplate() @@ -2524,15 +2535,15 @@ async def test_param_templates_mandatory(self, es): ): await r(es, params) - self.assertEqual(0, es.indices.put_template.call_count) + self.assertEqual(0, es.indices.put_template.await_count) class DeleteIndexTemplateRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_deletes_all_index_templates(self, es): - es.indices.delete_template.return_value = as_future() - es.indices.delete.return_value = as_future() + es.indices.delete_template = mock.AsyncMock() + es.indices.delete = mock.AsyncMock() r = runner.DeleteIndexTemplate() @@ -2548,16 +2559,17 @@ async def test_deletes_all_index_templates(self, es): # 2 times delete index template, one time delete matching indices self.assertDictEqual({"weight": 3, "unit": "ops", "success": True}, result) - es.indices.delete_template.assert_has_calls( + es.indices.delete_template.assert_has_awaits( [mock.call(name="templateA", params=params["request-params"]), mock.call(name="templateB", params=params["request-params"])] ) - es.indices.delete.assert_called_once_with(index="logs-*") + es.indices.delete.assert_awaited_once_with(index="logs-*") @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_deletes_only_existing_index_templates(self, es): - es.indices.exists_template.side_effect = [as_future(False), as_future(True)] - es.indices.delete_template.return_value = as_future() + es.indices.exists_template = mock.AsyncMock(side_effect=[False, True]) + es.indices.delete_template = mock.AsyncMock() + es.indices.delete = mock.AsyncMock() r = runner.DeleteIndexTemplate() @@ -2575,13 +2587,14 @@ async def test_deletes_only_existing_index_templates(self, es): # 2 times delete index template, one time delete matching indices self.assertDictEqual({"weight": 1, "unit": "ops", "success": True}, result) - es.indices.delete_template.assert_called_once_with(name="templateB", params=params["request-params"]) + es.indices.delete_template.assert_awaited_once_with(name="templateB", params=params["request-params"]) # not called because the matching index is empty. - self.assertEqual(0, es.indices.delete.call_count) + self.assertEqual(0, es.indices.delete.await_count) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_param_templates_mandatory(self, es): + es.indices.delete_template = mock.AsyncMock() r = runner.DeleteIndexTemplate() params = {} @@ -2592,14 +2605,14 @@ async def test_param_templates_mandatory(self, es): ): await r(es, params) - self.assertEqual(0, es.indices.delete_template.call_count) + self.assertEqual(0, es.indices.delete_template.await_count) class CreateComponentTemplateRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_create_index_templates(self, es): - es.cluster.put_component_template.return_value = as_future() + es.cluster.put_component_template = mock.AsyncMock() r = runner.CreateComponentTemplate() params = { "templates": [ @@ -2611,7 +2624,7 @@ async def test_create_index_templates(self, es): result = await r(es, params) self.assertDictEqual({"weight": 2, "unit": "ops", "success": True}, result) - es.cluster.put_component_template.assert_has_calls( + es.cluster.put_component_template.assert_has_awaits( [ mock.call( name="templateA", @@ -2629,7 +2642,7 @@ async def test_create_index_templates(self, es): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_param_templates_mandatory(self, es): - es.cluster.put_component_template.return_value = as_future() + es.cluster.put_component_template = mock.AsyncMock() r = runner.CreateComponentTemplate() @@ -2641,15 +2654,14 @@ async def test_param_templates_mandatory(self, es): ): await r(es, params) - self.assertEqual(0, es.cluster.put_component_template.call_count) + self.assertEqual(0, es.cluster.put_component_template.await_count) class DeleteComponentTemplateRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_deletes_all_index_templates(self, es): - es.cluster.delete_component_template.return_value = as_future() - es.cluster.delete_component_template.return_value = as_future() + es.cluster.delete_component_template = mock.AsyncMock() r = runner.DeleteComponentTemplate() @@ -2664,7 +2676,7 @@ async def test_deletes_all_index_templates(self, es): result = await r(es, params) self.assertDictEqual({"weight": 2, "unit": "ops", "success": True}, result) - es.cluster.delete_component_template.assert_has_calls( + es.cluster.delete_component_template.assert_has_awaits( [ mock.call(name="templateA", params=params["request-params"], ignore=[404]), mock.call(name="templateB", params=params["request-params"], ignore=[404]), @@ -2674,13 +2686,12 @@ async def test_deletes_all_index_templates(self, es): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_deletes_only_existing_index_templates(self, es): - def _side_effect(http_method, path): + async def _side_effect(http_method, path): if http_method == "HEAD": - return as_future(path == "/_component_template/templateB") - return as_future() + return path == "/_component_template/templateB" - es.transport.perform_request.side_effect = _side_effect - es.cluster.delete_component_template.return_value = as_future() + es.transport.perform_request = mock.AsyncMock(side_effect=_side_effect) + es.cluster.delete_component_template = mock.AsyncMock() r = runner.DeleteComponentTemplate() @@ -2696,11 +2707,12 @@ def _side_effect(http_method, path): self.assertDictEqual({"weight": 1, "unit": "ops", "success": True}, result) - es.cluster.delete_component_template.assert_called_once_with(name="templateB", params=params["request-params"]) + es.cluster.delete_component_template.assert_awaited_once_with(name="templateB", params=params["request-params"]) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_param_templates_mandatory(self, es): + es.indices.delete_template = mock.AsyncMock() r = runner.DeleteComponentTemplate() params = {} @@ -2711,14 +2723,14 @@ async def test_param_templates_mandatory(self, es): ): await r(es, params) - self.assertEqual(0, es.indices.delete_template.call_count) + self.assertEqual(0, es.indices.delete_template.await_count) class CreateComposableTemplateRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_create_index_templates(self, es): - es.indices.put_index_template.return_value = as_future() + es.indices.put_index_template = mock.AsyncMock() r = runner.CreateComposableTemplate() params = { "templates": [ @@ -2751,7 +2763,7 @@ async def test_create_index_templates(self, es): }, result, ) - es.indices.put_index_template.assert_has_calls( + es.indices.put_index_template.assert_has_awaits( [ mock.call( name="templateA", @@ -2785,7 +2797,7 @@ async def test_create_index_templates(self, es): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_param_templates_mandatory(self, es): - es.indices.put_index_template.return_value = as_future() + es.indices.put_index_template = mock.AsyncMock() r = runner.CreateComposableTemplate() @@ -2797,15 +2809,15 @@ async def test_param_templates_mandatory(self, es): ): await r(es, params) - self.assertEqual(0, es.indices.put_index_template.call_count) + self.assertEqual(0, es.indices.put_index_template.await_count) class DeleteComposableTemplateRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_deletes_all_index_templates(self, es): - es.indices.delete_index_template.return_value = as_future() - es.indices.delete.return_value = as_future() + es.indices.delete_index_template = mock.AsyncMock() + es.indices.delete = mock.AsyncMock() r = runner.DeleteComposableTemplate() @@ -2822,19 +2834,19 @@ async def test_deletes_all_index_templates(self, es): # 2 times delete index template, one time delete matching indices self.assertDictEqual({"weight": 3, "unit": "ops", "success": True}, result) - es.indices.delete_index_template.assert_has_calls( + es.indices.delete_index_template.assert_has_awaits( [ mock.call(name="templateA", params=params["request-params"], ignore=[404]), mock.call(name="templateB", params=params["request-params"], ignore=[404]), ] ) - es.indices.delete.assert_called_once_with(index="logs-*") + es.indices.delete.assert_awaited_once_with(index="logs-*") @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_deletes_only_existing_index_templates(self, es): - es.indices.exists_index_template.side_effect = [as_future(False), as_future(True)] - es.indices.delete_index_template.return_value = as_future() + es.indices.exists_index_template = mock.AsyncMock(side_effect=[False, True]) + es.indices.delete_index_template = mock.AsyncMock() r = runner.DeleteComposableTemplate() @@ -2852,7 +2864,7 @@ async def test_deletes_only_existing_index_templates(self, es): # 2 times delete index template, one time delete matching indices self.assertDictEqual({"weight": 1, "unit": "ops", "success": True}, result) - es.indices.delete_index_template.assert_called_once_with(name="templateB", params=params["request-params"]) + es.indices.delete_index_template.assert_awaited_once_with(name="templateB", params=params["request-params"]) # not called because the matching index is empty. self.assertEqual(0, es.indices.delete.call_count) @@ -2876,20 +2888,20 @@ class CreateMlDatafeedTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_create_ml_datafeed(self, es): - es.xpack.ml.put_datafeed.return_value = as_future() + es.xpack.ml.put_datafeed = mock.AsyncMock() params = {"datafeed-id": "some-data-feed", "body": {"job_id": "total-requests", "indices": ["server-metrics"]}} r = runner.CreateMlDatafeed() await r(es, params) - es.xpack.ml.put_datafeed.assert_called_once_with(datafeed_id=params["datafeed-id"], body=params["body"]) + es.xpack.ml.put_datafeed.assert_awaited_once_with(datafeed_id=params["datafeed-id"], body=params["body"]) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_create_ml_datafeed_fallback(self, es): - es.xpack.ml.put_datafeed.side_effect = as_future(exception=elasticsearch.TransportError(400, "Bad Request")) - es.transport.perform_request.return_value = as_future() + es.xpack.ml.put_datafeed = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) + es.transport.perform_request = mock.AsyncMock() datafeed_id = "some-data-feed" body = {"job_id": "total-requests", "indices": ["server-metrics"]} params = {"datafeed-id": datafeed_id, "body": body} @@ -2897,14 +2909,14 @@ async def test_create_ml_datafeed_fallback(self, es): r = runner.CreateMlDatafeed() await r(es, params) - es.transport.perform_request.assert_called_once_with("PUT", f"/_xpack/ml/datafeeds/{datafeed_id}", body=body) + es.transport.perform_request.assert_awaited_once_with("PUT", f"/_xpack/ml/datafeeds/{datafeed_id}", body=body) class DeleteMlDatafeedTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_delete_ml_datafeed(self, es): - es.xpack.ml.delete_datafeed.return_value = as_future() + es.xpack.ml.delete_datafeed = mock.AsyncMock() datafeed_id = "some-data-feed" params = {"datafeed-id": datafeed_id} @@ -2912,13 +2924,13 @@ async def test_delete_ml_datafeed(self, es): r = runner.DeleteMlDatafeed() await r(es, params) - es.xpack.ml.delete_datafeed.assert_called_once_with(datafeed_id=datafeed_id, force=False, ignore=[404]) + es.xpack.ml.delete_datafeed.assert_awaited_once_with(datafeed_id=datafeed_id, force=False, ignore=[404]) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_delete_ml_datafeed_fallback(self, es): - es.xpack.ml.delete_datafeed.side_effect = as_future(exception=elasticsearch.TransportError(400, "Bad Request")) - es.transport.perform_request.return_value = as_future() + es.xpack.ml.delete_datafeed = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) + es.transport.perform_request = mock.AsyncMock() datafeed_id = "some-data-feed" params = { "datafeed-id": datafeed_id, @@ -2927,7 +2939,7 @@ async def test_delete_ml_datafeed_fallback(self, es): r = runner.DeleteMlDatafeed() await r(es, params) - es.transport.perform_request.assert_called_once_with( + es.transport.perform_request.assert_awaited_once_with( "DELETE", f"/_xpack/ml/datafeeds/{datafeed_id}", params={"force": "false", "ignore": 404} ) @@ -2936,33 +2948,33 @@ class StartMlDatafeedTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_start_ml_datafeed_with_body(self, es): - es.xpack.ml.start_datafeed.return_value = as_future() + es.xpack.ml.start_datafeed = mock.AsyncMock() params = {"datafeed-id": "some-data-feed", "body": {"end": "now"}} r = runner.StartMlDatafeed() await r(es, params) - es.xpack.ml.start_datafeed.assert_called_once_with( + es.xpack.ml.start_datafeed.assert_awaited_once_with( datafeed_id=params["datafeed-id"], body=params["body"], start=None, end=None, timeout=None ) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_start_ml_datafeed_with_body_fallback(self, es): - es.xpack.ml.start_datafeed.side_effect = as_future(exception=elasticsearch.TransportError(400, "Bad Request")) - es.transport.perform_request.return_value = as_future() + es.xpack.ml.start_datafeed = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) + es.transport.perform_request = mock.AsyncMock() body = {"end": "now"} params = {"datafeed-id": "some-data-feed", "body": body} r = runner.StartMlDatafeed() await r(es, params) - es.transport.perform_request.assert_called_once_with("POST", f"/_xpack/ml/datafeeds/{params['datafeed-id']}/_start", body=body) + es.transport.perform_request.assert_awaited_once_with("POST", f"/_xpack/ml/datafeeds/{params['datafeed-id']}/_start", body=body) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_start_ml_datafeed_with_params(self, es): - es.xpack.ml.start_datafeed.return_value = as_future() + es.xpack.ml.start_datafeed = mock.AsyncMock() params = { "datafeed-id": "some-data-feed", "start": "2017-01-01T01:00:00Z", @@ -2973,7 +2985,7 @@ async def test_start_ml_datafeed_with_params(self, es): r = runner.StartMlDatafeed() await r(es, params) - es.xpack.ml.start_datafeed.assert_called_once_with( + es.xpack.ml.start_datafeed.assert_awaited_once_with( datafeed_id=params["datafeed-id"], body=None, start=params["start"], end=params["end"], timeout=params["timeout"] ) @@ -2982,7 +2994,7 @@ class StopMlDatafeedTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_stop_ml_datafeed(self, es): - es.xpack.ml.stop_datafeed.return_value = as_future() + es.xpack.ml.stop_datafeed = mock.AsyncMock() params = { "datafeed-id": "some-data-feed", "force": random.choice([False, True]), @@ -2992,15 +3004,15 @@ async def test_stop_ml_datafeed(self, es): r = runner.StopMlDatafeed() await r(es, params) - es.xpack.ml.stop_datafeed.assert_called_once_with( + es.xpack.ml.stop_datafeed.assert_awaited_once_with( datafeed_id=params["datafeed-id"], force=params["force"], timeout=params["timeout"] ) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_stop_ml_datafeed_fallback(self, es): - es.xpack.ml.stop_datafeed.side_effect = as_future(exception=elasticsearch.TransportError(400, "Bad Request")) - es.transport.perform_request.return_value = as_future() + es.xpack.ml.stop_datafeed = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) + es.transport.perform_request = mock.AsyncMock() params = { "datafeed-id": "some-data-feed", @@ -3011,7 +3023,7 @@ async def test_stop_ml_datafeed_fallback(self, es): r = runner.StopMlDatafeed() await r(es, params) - es.transport.perform_request.assert_called_once_with( + es.transport.perform_request.assert_awaited_once_with( "POST", f"/_xpack/ml/datafeeds/{params['datafeed-id']}/_stop", params={"force": str(params["force"]).lower(), "timeout": params["timeout"]}, @@ -3022,7 +3034,7 @@ class CreateMlJobTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_create_ml_job(self, es): - es.xpack.ml.put_job.return_value = as_future() + es.xpack.ml.put_job = mock.AsyncMock() params = { "job-id": "an-ml-job", @@ -3045,13 +3057,13 @@ async def test_create_ml_job(self, es): r = runner.CreateMlJob() await r(es, params) - es.xpack.ml.put_job.assert_called_once_with(job_id=params["job-id"], body=params["body"]) + es.xpack.ml.put_job.assert_awaited_once_with(job_id=params["job-id"], body=params["body"]) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_create_ml_job_fallback(self, es): - es.xpack.ml.put_job.side_effect = as_future(exception=elasticsearch.TransportError(400, "Bad Request")) - es.transport.perform_request.return_value = as_future() + es.xpack.ml.put_job = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) + es.transport.perform_request = mock.AsyncMock() body = { "description": "Total sum of requests", @@ -3072,14 +3084,14 @@ async def test_create_ml_job_fallback(self, es): r = runner.CreateMlJob() await r(es, params) - es.transport.perform_request.assert_called_once_with("PUT", f"/_xpack/ml/anomaly_detectors/{params['job-id']}", body=body) + es.transport.perform_request.assert_awaited_once_with("PUT", f"/_xpack/ml/anomaly_detectors/{params['job-id']}", body=body) class DeleteMlJobTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_delete_ml_job(self, es): - es.xpack.ml.delete_job.return_value = as_future() + es.xpack.ml.delete_job = mock.AsyncMock() job_id = "an-ml-job" params = {"job-id": job_id} @@ -3087,13 +3099,13 @@ async def test_delete_ml_job(self, es): r = runner.DeleteMlJob() await r(es, params) - es.xpack.ml.delete_job.assert_called_once_with(job_id=job_id, force=False, ignore=[404]) + es.xpack.ml.delete_job.assert_awaited_once_with(job_id=job_id, force=False, ignore=[404]) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_delete_ml_job_fallback(self, es): - es.xpack.ml.delete_job.side_effect = as_future(exception=elasticsearch.TransportError(400, "Bad Request")) - es.transport.perform_request.return_value = as_future() + es.xpack.ml.delete_job = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) + es.transport.perform_request = mock.AsyncMock() job_id = "an-ml-job" params = {"job-id": job_id} @@ -3101,7 +3113,7 @@ async def test_delete_ml_job_fallback(self, es): r = runner.DeleteMlJob() await r(es, params) - es.transport.perform_request.assert_called_once_with( + es.transport.perform_request.assert_awaited_once_with( "DELETE", f"/_xpack/ml/anomaly_detectors/{params['job-id']}", params={"force": "false", "ignore": 404} ) @@ -3110,7 +3122,7 @@ class OpenMlJobTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_open_ml_job(self, es): - es.xpack.ml.open_job.return_value = as_future() + es.xpack.ml.open_job = mock.AsyncMock() job_id = "an-ml-job" params = {"job-id": job_id} @@ -3118,13 +3130,13 @@ async def test_open_ml_job(self, es): r = runner.OpenMlJob() await r(es, params) - es.xpack.ml.open_job.assert_called_once_with(job_id=job_id) + es.xpack.ml.open_job.assert_awaited_once_with(job_id=job_id) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_open_ml_job_fallback(self, es): - es.xpack.ml.open_job.side_effect = as_future(exception=elasticsearch.TransportError(400, "Bad Request")) - es.transport.perform_request.return_value = as_future() + es.xpack.ml.open_job = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) + es.transport.perform_request = mock.AsyncMock() job_id = "an-ml-job" params = {"job-id": job_id} @@ -3132,14 +3144,14 @@ async def test_open_ml_job_fallback(self, es): r = runner.OpenMlJob() await r(es, params) - es.transport.perform_request.assert_called_once_with("POST", f"/_xpack/ml/anomaly_detectors/{params['job-id']}/_open") + es.transport.perform_request.assert_awaited_once_with("POST", f"/_xpack/ml/anomaly_detectors/{params['job-id']}/_open") class CloseMlJobTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_close_ml_job(self, es): - es.xpack.ml.close_job.return_value = as_future() + es.xpack.ml.close_job = mock.AsyncMock() params = { "job-id": "an-ml-job", "force": random.choice([False, True]), @@ -3149,13 +3161,13 @@ async def test_close_ml_job(self, es): r = runner.CloseMlJob() await r(es, params) - es.xpack.ml.close_job.assert_called_once_with(job_id=params["job-id"], force=params["force"], timeout=params["timeout"]) + es.xpack.ml.close_job.assert_awaited_once_with(job_id=params["job-id"], force=params["force"], timeout=params["timeout"]) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_close_ml_job_fallback(self, es): - es.xpack.ml.close_job.side_effect = as_future(exception=elasticsearch.TransportError(400, "Bad Request")) - es.transport.perform_request.return_value = as_future() + es.xpack.ml.close_job = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) + es.transport.perform_request = mock.AsyncMock() params = { "job-id": "an-ml-job", @@ -3166,7 +3178,7 @@ async def test_close_ml_job_fallback(self, es): r = runner.CloseMlJob() await r(es, params) - es.transport.perform_request.assert_called_once_with( + es.transport.perform_request.assert_awaited_once_with( "POST", f"/_xpack/ml/anomaly_detectors/{params['job-id']}/_close", params={"force": str(params["force"]).lower(), "timeout": params["timeout"]}, @@ -3177,7 +3189,7 @@ class RawRequestRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_raises_missing_slash(self, es): - es.transport.perform_request.return_value = as_future() + es.transport.perform_request = mock.AsyncMock() r = runner.RawRequest() params = {"path": "_cat/count"} @@ -3193,7 +3205,7 @@ async def test_raises_missing_slash(self, es): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_issue_request_with_defaults(self, es): - es.transport.perform_request.return_value = as_future() + es.transport.perform_request = mock.AsyncMock() r = runner.RawRequest() params = {"path": "/_cat/count"} @@ -3204,7 +3216,7 @@ async def test_issue_request_with_defaults(self, es): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_issue_delete_index(self, es): - es.transport.perform_request.return_value = as_future() + es.transport.perform_request = mock.AsyncMock() r = runner.RawRequest() params = { @@ -3224,7 +3236,7 @@ async def test_issue_delete_index(self, es): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_issue_create_index(self, es): - es.transport.perform_request.return_value = as_future() + es.transport.perform_request = mock.AsyncMock() r = runner.RawRequest() params = { @@ -3247,7 +3259,7 @@ async def test_issue_create_index(self, es): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_issue_msearch(self, es): - es.transport.perform_request.return_value = as_future() + es.transport.perform_request = mock.AsyncMock() r = runner.RawRequest() params = { @@ -3278,7 +3290,7 @@ async def test_issue_msearch(self, es): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_raw_with_timeout_and_opaqueid(self, es): - es.transport.perform_request.return_value = as_future() + es.transport.perform_request = mock.AsyncMock() r = runner.RawRequest() params = { @@ -3346,7 +3358,7 @@ class DeleteSnapshotRepositoryTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_delete_snapshot_repository(self, es): - es.snapshot.delete_repository.return_value = as_future() + es.snapshot.delete_repository = mock.AsyncMock() params = {"repository": "backups"} r = runner.DeleteSnapshotRepository() @@ -3359,7 +3371,7 @@ class CreateSnapshotRepositoryTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_create_snapshot_repository(self, es): - es.snapshot.create_repository.return_value = as_future() + es.snapshot.create_repository = mock.AsyncMock() params = { "repository": "backups", "body": { @@ -3382,7 +3394,7 @@ class CreateSnapshotTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_create_snapshot_no_wait(self, es): - es.snapshot.create.return_value = as_future({}) + es.snapshot.create = mock.AsyncMock(return_value={}) params = { "repository": "backups", @@ -3395,7 +3407,7 @@ async def test_create_snapshot_no_wait(self, es): r = runner.CreateSnapshot() await r(es, params) - es.snapshot.create.assert_called_once_with( + es.snapshot.create.assert_awaited_once_with( repository="backups", snapshot="snapshot-001", body={"indices": "logs-*"}, @@ -3406,8 +3418,8 @@ async def test_create_snapshot_no_wait(self, es): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_create_snapshot_wait_for_completion(self, es): - es.snapshot.create.return_value = as_future( - { + es.snapshot.create = mock.AsyncMock( + return_value={ "snapshot": { "snapshot": "snapshot-001", "uuid": "wjt6zFEIRua_-jutT5vrAw", @@ -3438,7 +3450,7 @@ async def test_create_snapshot_wait_for_completion(self, es): r = runner.CreateSnapshot() await r(es, params) - es.snapshot.create.assert_called_once_with( + es.snapshot.create.assert_awaited_once_with( repository="backups", snapshot="snapshot-001", body={"indices": "logs-*"}, @@ -3451,11 +3463,11 @@ class WaitForSnapshotCreateTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_wait_for_snapshot_create_entire_lifecycle(self, es): - es.snapshot.status.side_effect = [ - # empty response - as_future({}), - # active snapshot - as_future( + es.snapshot.status = mock.AsyncMock( + side_effect=[ + # empty response + {}, + # active snapshot { "snapshots": [ { @@ -3493,10 +3505,8 @@ async def test_wait_for_snapshot_create_entire_lifecycle(self, es): }, } ] - } - ), - # completed - as_future( + }, + # completed { "snapshots": [ { @@ -3530,9 +3540,9 @@ async def test_wait_for_snapshot_create_entire_lifecycle(self, es): }, } ] - } - ), - ] + }, + ] + ) basic_params = { "repository": "restore_speed", @@ -3543,7 +3553,7 @@ async def test_wait_for_snapshot_create_entire_lifecycle(self, es): r = runner.WaitForSnapshotCreate() result = await r(es, basic_params) - es.snapshot.status.assert_called_with(repository="restore_speed", snapshot="restore_speed_snapshot", ignore_unavailable=True) + es.snapshot.status.assert_awaited_with(repository="restore_speed", snapshot="restore_speed_snapshot", ignore_unavailable=True) self.assertDictEqual( { @@ -3559,13 +3569,13 @@ async def test_wait_for_snapshot_create_entire_lifecycle(self, es): result, ) - self.assertEqual(3, es.snapshot.status.call_count) + self.assertEqual(3, es.snapshot.status.await_count) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_wait_for_snapshot_create_immediate_success(self, es): - es.snapshot.status.return_value = as_future( - { + es.snapshot.status = mock.AsyncMock( + return_value={ "snapshots": [ { "snapshot": "snapshot-001", @@ -3608,7 +3618,7 @@ async def test_wait_for_snapshot_create_immediate_success(self, es): result, ) - es.snapshot.status.assert_called_once_with(repository="backups", snapshot="snapshot-001", ignore_unavailable=True) + es.snapshot.status.assert_awaited_once_with(repository="backups", snapshot="snapshot-001", ignore_unavailable=True) @mock.patch("elasticsearch.Elasticsearch") @run_async @@ -3623,7 +3633,7 @@ async def test_wait_for_snapshot_create_failure(self, es): }, ] } - es.snapshot.status.return_value = as_future(snapshot_status) + es.snapshot.status = mock.AsyncMock(return_value=snapshot_status) params = { "repository": "backups", @@ -3645,7 +3655,7 @@ class RestoreSnapshotTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_restore_snapshot(self, es): - es.snapshot.restore.return_value = as_future() + es.snapshot.restore = mock.AsyncMock() params = { "repository": "backups", @@ -3657,14 +3667,14 @@ async def test_restore_snapshot(self, es): r = runner.RestoreSnapshot() await r(es, params) - es.snapshot.restore.assert_called_once_with( + es.snapshot.restore.assert_awaited_once_with( repository="backups", snapshot="snapshot-001", wait_for_completion=True, params={"request_timeout": 7200} ) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_restore_snapshot_with_body(self, es): - es.snapshot.restore.return_value = as_future() + es.snapshot.restore = mock.AsyncMock() params = { "repository": "backups", "snapshot": "snapshot-001", @@ -3682,7 +3692,7 @@ async def test_restore_snapshot_with_body(self, es): r = runner.RestoreSnapshot() await r(es, params) - es.snapshot.restore.assert_called_once_with( + es.snapshot.restore.assert_awaited_once_with( repository="backups", snapshot="snapshot-001", body={ @@ -3702,11 +3712,11 @@ class IndicesRecoveryTests(TestCase): @run_async async def test_waits_for_ongoing_indices_recovery(self, es): # empty response - es.indices.recovery.side_effect = [ - # recovery did not yet start - as_future({}), - # recovery about to be started - as_future( + es.indices.recovery = mock.AsyncMock( + side_effect=[ + # recovery did not yet start + {}, + # recovery about to be started { "index1": { "shards": [ @@ -3743,10 +3753,8 @@ async def test_waits_for_ongoing_indices_recovery(self, es): }, ] } - } - ), - # active recovery - one shard is not yet finished - as_future( + }, + # active recovery - one shard is not yet finished { "index1": { "shards": [ @@ -3784,10 +3792,8 @@ async def test_waits_for_ongoing_indices_recovery(self, es): }, ] } - } - ), - # completed - as_future( + }, + # completed { "index1": { "shards": [ @@ -3825,9 +3831,9 @@ async def test_waits_for_ongoing_indices_recovery(self, es): }, ] } - } - ), - ] + }, + ] + ) r = runner.IndicesRecovery() @@ -3842,9 +3848,9 @@ async def test_waits_for_ongoing_indices_recovery(self, es): self.assertEqual(1393244155000, result["start_time_millis"]) self.assertEqual(1393244160000, result["stop_time_millis"]) - es.indices.recovery.assert_called_with(index="index1") + es.indices.recovery.assert_awaited_with(index="index1") # retries four times - self.assertEqual(4, es.indices.recovery.call_count) + self.assertEqual(4, es.indices.recovery.await_count) class ShrinkIndexTests(TestCase): @@ -3853,11 +3859,11 @@ class ShrinkIndexTests(TestCase): @mock.patch("asyncio.sleep") @run_async async def test_shrink_index_with_shrink_node(self, sleep, es): - es.indices.get.return_value = as_future({"src": {}}) + es.indices.get = mock.AsyncMock(return_value={"src": {}}) # cluster health API - es.cluster.health.return_value = as_future({"status": "green", "relocating_shards": 0}) - es.indices.put_settings.return_value = as_future() - es.indices.shrink.return_value = as_future() + es.cluster.health = mock.AsyncMock(return_value={"status": "green", "relocating_shards": 0}) + es.indices.put_settings = mock.AsyncMock() + es.indices.shrink = mock.AsyncMock() r = runner.ShrinkIndex() params = { @@ -3874,7 +3880,7 @@ async def test_shrink_index_with_shrink_node(self, sleep, es): await r(es, params) - es.indices.put_settings.assert_called_once_with( + es.indices.put_settings.assert_awaited_once_with( index="src", body={ "settings": { @@ -3885,14 +3891,14 @@ async def test_shrink_index_with_shrink_node(self, sleep, es): preserve_existing=True, ) - es.cluster.health.assert_has_calls( + es.cluster.health.assert_has_awaits( [ mock.call(index="src", params={"wait_for_no_relocating_shards": "true"}), mock.call(index="target", params={"wait_for_no_relocating_shards": "true"}), ] ) - es.indices.shrink.assert_called_once_with( + es.indices.shrink.assert_awaited_once_with( index="src", target="target", body={ @@ -3910,11 +3916,11 @@ async def test_shrink_index_with_shrink_node(self, sleep, es): @mock.patch("asyncio.sleep") @run_async async def test_shrink_index_derives_shrink_node(self, sleep, es): - es.indices.get.return_value = as_future({"src": {}}) + es.indices.get = mock.AsyncMock(return_value={"src": {}}) # cluster health API - es.cluster.health.return_value = as_future({"status": "green", "relocating_shards": 0}) - es.nodes.info.return_value = as_future( - { + es.cluster.health = mock.AsyncMock(return_value={"status": "green", "relocating_shards": 0}) + es.nodes.info = mock.AsyncMock( + return_value={ "_nodes": {"total": 3, "successful": 3, "failed": 0}, "cluster_name": "elasticsearch", "nodes": { @@ -3924,8 +3930,8 @@ async def test_shrink_index_derives_shrink_node(self, sleep, es): }, } ) - es.indices.put_settings.return_value = as_future() - es.indices.shrink.return_value = as_future() + es.indices.put_settings = mock.AsyncMock() + es.indices.shrink = mock.AsyncMock() r = runner.ShrinkIndex() params = { @@ -3936,7 +3942,7 @@ async def test_shrink_index_derives_shrink_node(self, sleep, es): await r(es, params) - es.indices.put_settings.assert_called_once_with( + es.indices.put_settings.assert_awaited_once_with( index="src", body={ "settings": { @@ -3948,14 +3954,14 @@ async def test_shrink_index_derives_shrink_node(self, sleep, es): preserve_existing=True, ) - es.cluster.health.assert_has_calls( + es.cluster.health.assert_has_awaits( [ mock.call(index="src", params={"wait_for_no_relocating_shards": "true"}), mock.call(index="target", params={"wait_for_no_relocating_shards": "true"}), ] ) - es.indices.shrink.assert_called_once_with( + es.indices.shrink.assert_awaited_once_with( index="src", target="target", body={ @@ -3973,11 +3979,11 @@ async def test_shrink_index_derives_shrink_node(self, sleep, es): @mock.patch("asyncio.sleep") @run_async async def test_shrink_index_pattern_with_shrink_node(self, sleep, es): - es.indices.get.return_value = as_future({"src1": {}, "src2": {}, "src-2020": {}}) + es.indices.get = mock.AsyncMock(return_value={"src1": {}, "src2": {}, "src-2020": {}}) # cluster health API - es.cluster.health.return_value = as_future({"status": "green", "relocating_shards": 0}) - es.indices.put_settings.return_value = as_future() - es.indices.shrink.return_value = as_future() + es.cluster.health = mock.AsyncMock(return_value={"status": "green", "relocating_shards": 0}) + es.indices.put_settings = mock.AsyncMock() + es.indices.shrink = mock.AsyncMock() r = runner.ShrinkIndex() params = { @@ -3989,7 +3995,7 @@ async def test_shrink_index_pattern_with_shrink_node(self, sleep, es): await r(es, params) - es.indices.put_settings.assert_has_calls( + es.indices.put_settings.assert_has_awaits( [ mock.call( index="src1", @@ -4009,7 +4015,7 @@ async def test_shrink_index_pattern_with_shrink_node(self, sleep, es): ] ) - es.cluster.health.assert_has_calls( + es.cluster.health.assert_has_awaits( [ mock.call(index="src1", params={"wait_for_no_relocating_shards": "true"}), mock.call(index="target1", params={"wait_for_no_relocating_shards": "true"}), @@ -4020,7 +4026,7 @@ async def test_shrink_index_pattern_with_shrink_node(self, sleep, es): ] ) - es.indices.shrink.assert_has_calls( + es.indices.shrink.assert_has_awaits( [ mock.call( index="src1", @@ -4066,20 +4072,20 @@ class PutSettingsTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_put_settings(self, es): - es.cluster.put_settings.return_value = as_future() + es.cluster.put_settings = mock.AsyncMock() params = {"body": {"transient": {"indices.recovery.max_bytes_per_sec": "20mb"}}} r = runner.PutSettings() await r(es, params) - es.cluster.put_settings.assert_called_once_with(body={"transient": {"indices.recovery.max_bytes_per_sec": "20mb"}}) + es.cluster.put_settings.assert_awaited_once_with(body={"transient": {"indices.recovery.max_bytes_per_sec": "20mb"}}) class CreateTransformTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_create_transform(self, es): - es.transform.put_transform.return_value = as_future() + es.transform.put_transform = mock.AsyncMock() params = { "transform-id": "a-transform", @@ -4098,7 +4104,7 @@ async def test_create_transform(self, es): r = runner.CreateTransform() await r(es, params) - es.transform.put_transform.assert_called_once_with( + es.transform.put_transform.assert_awaited_once_with( transform_id=params["transform-id"], body=params["body"], defer_validation=params["defer-validation"] ) @@ -4107,7 +4113,7 @@ class StartTransformTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_start_transform(self, es): - es.transform.start_transform.return_value = as_future() + es.transform.start_transform = mock.AsyncMock() transform_id = "a-transform" params = {"transform-id": transform_id, "timeout": "5s"} @@ -4115,14 +4121,14 @@ async def test_start_transform(self, es): r = runner.StartTransform() await r(es, params) - es.transform.start_transform.assert_called_once_with(transform_id=transform_id, timeout=params["timeout"]) + es.transform.start_transform.assert_awaited_once_with(transform_id=transform_id, timeout=params["timeout"]) class WaitForTransformTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_wait_for_transform(self, es): - es.transform.stop_transform.return_value = as_future() + es.transform.stop_transform = mock.AsyncMock() transform_id = "a-transform" params = { "transform-id": transform_id, @@ -4132,8 +4138,8 @@ async def test_wait_for_transform(self, es): "wait-for-checkpoint": random.choice([False, True]), } - es.transform.get_transform_stats.return_value = as_future( - { + es.transform.get_transform_stats = mock.AsyncMock( + return_value={ "count": 1, "transforms": [ { @@ -4173,7 +4179,7 @@ async def test_wait_for_transform(self, es): self.assertEqual(2, result["weight"], 2) self.assertEqual(result["unit"], "docs") - es.transform.stop_transform.assert_called_once_with( + es.transform.stop_transform.assert_awaited_once_with( transform_id=transform_id, force=params["force"], timeout=params["timeout"], @@ -4184,7 +4190,7 @@ async def test_wait_for_transform(self, es): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_wait_for_transform_progress(self, es): - es.transform.stop_transform.return_value = as_future() + es.transform.stop_transform = mock.AsyncMock() transform_id = "a-transform" params = { "transform-id": transform_id, @@ -4193,8 +4199,8 @@ async def test_wait_for_transform_progress(self, es): } # return 4 times, simulating progress - es.transform.get_transform_stats.side_effect = [ - as_future( + es.transform.get_transform_stats = mock.AsyncMock( + side_effect=[ { "count": 1, "transforms": [ @@ -4225,9 +4231,7 @@ async def test_wait_for_transform_progress(self, es): }, } ], - } - ), - as_future( + }, { "count": 1, "transforms": [ @@ -4258,9 +4262,7 @@ async def test_wait_for_transform_progress(self, es): }, } ], - } - ), - as_future( + }, { "count": 1, "transforms": [ @@ -4291,9 +4293,7 @@ async def test_wait_for_transform_progress(self, es): }, } ], - } - ), - as_future( + }, { "count": 1, "transforms": [ @@ -4320,9 +4320,9 @@ async def test_wait_for_transform_progress(self, es): "checkpointing": {"last": {"checkpoint": 1, "timestamp_millis": 16}, "changes_last_detected_at": 16}, } ], - } - ), - ] + }, + ] + ) r = runner.WaitForTransform() self.assertFalse(r.completed) @@ -4341,7 +4341,7 @@ async def test_wait_for_transform_progress(self, es): self.assertEqual(result["weight"], 60000) self.assertEqual(result["unit"], "docs") - es.transform.stop_transform.assert_called_once_with( + es.transform.stop_transform.assert_awaited_once_with( transform_id=transform_id, force=params["force"], timeout=params["timeout"], wait_for_completion=False, wait_for_checkpoint=True ) @@ -4350,7 +4350,7 @@ class DeleteTransformTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_delete_transform(self, es): - es.transform.delete_transform.return_value = as_future() + es.transform.delete_transform = mock.AsyncMock() transform_id = "a-transform" params = {"transform-id": transform_id, "force": random.choice([False, True])} @@ -4358,14 +4358,14 @@ async def test_delete_transform(self, es): r = runner.DeleteTransform() await r(es, params) - es.transform.delete_transform.assert_called_once_with(transform_id=transform_id, force=params["force"], ignore=[404]) + es.transform.delete_transform.assert_awaited_once_with(transform_id=transform_id, force=params["force"], ignore=[404]) class TransformStatsRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_transform_stats_with_timeout_and_headers(self, es): - es.transform.get_transform_stats.return_value = as_future({}) + es.transform.get_transform_stats = mock.AsyncMock(return_value={}) transform_stats = runner.TransformStats() transform_id = "a-transform" result = await transform_stats( @@ -4381,7 +4381,7 @@ async def test_transform_stats_with_timeout_and_headers(self, es): self.assertEqual("ops", result["unit"]) self.assertTrue(result["success"]) - es.transform.get_transform_stats.assert_called_once_with( + es.transform.get_transform_stats.assert_awaited_once_with( transform_id=transform_id, headers={"header1": "value1"}, opaque_id="test-id1", @@ -4392,8 +4392,8 @@ async def test_transform_stats_with_timeout_and_headers(self, es): @run_async async def test_transform_stats_with_failed_condition(self, es): transform_id = "a-transform" - es.transform.get_transform_stats.return_value = as_future( - { + es.transform.get_transform_stats = mock.AsyncMock( + return_value={ "count": 3, "transforms": [ { @@ -4433,14 +4433,14 @@ async def test_transform_stats_with_failed_condition(self, es): result["condition"], ) - es.transform.get_transform_stats.assert_called_once_with(transform_id=transform_id) + es.transform.get_transform_stats.assert_awaited_once_with(transform_id=transform_id) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_transform_stats_with_successful_condition(self, es): transform_id = "a-transform" - es.transform.get_transform_stats.return_value = as_future( - { + es.transform.get_transform_stats = mock.AsyncMock( + return_value={ "count": 3, "transforms": [ { @@ -4479,14 +4479,14 @@ async def test_transform_stats_with_successful_condition(self, es): result["condition"], ) - es.transform.get_transform_stats.assert_called_once_with(transform_id=transform_id) + es.transform.get_transform_stats.assert_awaited_once_with(transform_id=transform_id) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_transform_stats_with_non_existing_path(self, es): transform_id = "a-transform" - es.transform.get_transform_stats.return_value = as_future( - { + es.transform.get_transform_stats = mock.AsyncMock( + return_value={ "count": 3, "transforms": [ { @@ -4525,7 +4525,7 @@ async def test_transform_stats_with_non_existing_path(self, es): result["condition"], ) - es.transform.get_transform_stats.assert_called_once_with(transform_id=transform_id) + es.transform.get_transform_stats.assert_awaited_once_with(transform_id=transform_id) class CreateIlmPolicyRunner(TestCase): @@ -4543,21 +4543,21 @@ class CreateIlmPolicyRunner(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_create_ilm_policy_with_request_params(self, es): - es.ilm.put_lifecycle.return_value = as_future({}) + es.ilm.put_lifecycle = mock.AsyncMock(return_value={}) create_ilm_policy = runner.CreateIlmPolicy() result = await create_ilm_policy(es, params=self.params) self.assertEqual(1, result["weight"]) self.assertEqual("ops", result["unit"]) self.assertTrue(result["success"]) - es.ilm.put_lifecycle.assert_called_once_with( + es.ilm.put_lifecycle.assert_awaited_once_with( policy=self.params["policy-name"], body=self.params["body"], params=self.params["request-params"] ) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_create_ilm_policy_without_request_params(self, es): - es.ilm.put_lifecycle.return_value = as_future({}) + es.ilm.put_lifecycle = mock.AsyncMock(return_value={}) create_ilm_policy = runner.CreateIlmPolicy() params = copy.deepcopy(self.params) del params["request-params"] @@ -4566,7 +4566,7 @@ async def test_create_ilm_policy_without_request_params(self, es): self.assertEqual("ops", result["unit"]) self.assertTrue(result["success"]) - es.ilm.put_lifecycle.assert_called_once_with(policy=params["policy-name"], body=params["body"], params={}) + es.ilm.put_lifecycle.assert_awaited_once_with(policy=params["policy-name"], body=params["body"], params={}) class DeleteIlmPolicyRunner(TestCase): @@ -4576,19 +4576,19 @@ class DeleteIlmPolicyRunner(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_delete_ilm_policy_with_request_params(self, es): - es.ilm.delete_lifecycle.return_value = as_future({}) + es.ilm.delete_lifecycle = mock.AsyncMock(return_value={}) delete_ilm_policy = runner.DeleteIlmPolicy() result = await delete_ilm_policy(es, params=self.params) self.assertEqual(1, result["weight"]) self.assertEqual("ops", result["unit"]) self.assertTrue(result["success"]) - es.ilm.delete_lifecycle.assert_called_once_with(policy=self.params["policy-name"], params=self.params["request-params"]) + es.ilm.delete_lifecycle.assert_awaited_once_with(policy=self.params["policy-name"], params=self.params["request-params"]) @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_delete_ilm_policy_without_request_params(self, es): - es.ilm.delete_lifecycle.return_value = as_future({}) + es.ilm.delete_lifecycle = mock.AsyncMock(return_value={}) delete_ilm_policy = runner.DeleteIlmPolicy() params = copy.deepcopy(self.params) del params["request-params"] @@ -4597,14 +4597,14 @@ async def test_delete_ilm_policy_without_request_params(self, es): self.assertEqual("ops", result["unit"]) self.assertTrue(result["success"]) - es.ilm.delete_lifecycle.assert_called_once_with(policy=params["policy-name"], params={}) + es.ilm.delete_lifecycle.assert_awaited_once_with(policy=params["policy-name"], params={}) class SubmitAsyncSearchTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_submit_async_search(self, es): - es.async_search.submit.return_value = as_future({"id": "12345"}) + es.async_search.submit = mock.AsyncMock(return_value={"id": "12345"}) r = runner.SubmitAsyncSearch() params = { "name": "search-1", @@ -4621,15 +4621,15 @@ async def test_submit_async_search(self, es): # search id is registered in context self.assertEqual("12345", runner.CompositeContext.get("search-1")) - es.async_search.submit.assert_called_once_with(body={"query": {"match_all": {}}}, index="_all", params={}) + es.async_search.submit.assert_awaited_once_with(body={"query": {"match_all": {}}}, index="_all", params={}) class GetAsyncSearchTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_get_async_search(self, es): - es.async_search.get.return_value = as_future( - { + es.async_search.get = mock.AsyncMock( + return_value={ "is_running": False, "response": { "took": 1122, @@ -4666,14 +4666,14 @@ async def test_get_async_search(self, es): }, ) - es.async_search.get.assert_called_once_with(id="12345", params={}) + es.async_search.get.assert_awaited_once_with(id="12345", params={}) class DeleteAsyncSearchTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_delete_async_search(self, es): - es.async_search.delete.side_effect = [as_future({}), as_future({})] + es.async_search.delete = mock.AsyncMock(side_effect=[{}, {}]) r = runner.DeleteAsyncSearch() params = {"delete-results-for": ["search-1", "search-2", "search-3"]} @@ -4683,7 +4683,7 @@ async def test_delete_async_search(self, es): runner.CompositeContext.put("search-3", "6789") await r(es, params) - es.async_search.delete.assert_has_calls( + es.async_search.delete.assert_has_awaits( [ mock.call(id="12345"), mock.call(id="6789"), @@ -4698,7 +4698,7 @@ async def test_creates_point_in_time(self, es): pit_id = "0123456789abcdef" params = {"name": "open-pit-test", "index": "test-index"} - es.open_point_in_time.return_value = as_future({"id": pit_id}) + es.open_point_in_time = mock.AsyncMock(return_value={"id": pit_id}) r = runner.OpenPointInTime() async with runner.CompositeContext(): @@ -4711,7 +4711,7 @@ async def test_can_only_be_run_in_composite(self, es): pit_id = "0123456789abcdef" params = {"name": "open-pit-test", "index": "test-index"} - es.open_point_in_time.return_value = as_future({"id": pit_id}) + es.open_point_in_time = mock.AsyncMock(return_value={"id": pit_id}) r = runner.OpenPointInTime() with self.assertRaises(exceptions.RallyAssertionError) as ctx: @@ -4729,13 +4729,13 @@ async def test_closes_point_in_time(self, es): "name": "close-pit-test", "with-point-in-time-from": "open-pit-task1", } - es.close_point_in_time.return_value = as_future() + es.close_point_in_time = mock.AsyncMock() r = runner.ClosePointInTime() async with runner.CompositeContext(): runner.CompositeContext.put("open-pit-task1", pit_id) await r(es, params) - es.close_point_in_time.assert_called_once_with(body={"id": "0123456789abcdef"}, params={}, headers=None) + es.close_point_in_time.assert_awaited_once_with(body={"id": "0123456789abcdef"}, params={}, headers=None) class QueryWithSearchAfterScrollTests(TestCase): @@ -4786,10 +4786,12 @@ async def test_search_after_with_pit(self, es): }, } - es.transport.perform_request.side_effect = [ - as_future(io.BytesIO(json.dumps(page_1).encode())), - as_future(io.BytesIO(json.dumps(page_2).encode())), - ] + es.transport.perform_request = mock.AsyncMock( + side_effect=[ + io.BytesIO(json.dumps(page_1).encode()), + io.BytesIO(json.dumps(page_2).encode()), + ] + ) r = runner.Query() @@ -4799,7 +4801,7 @@ async def test_search_after_with_pit(self, es): # make sure pit_id is updated afterward self.assertEqual("fedcba9876543211", runner.CompositeContext.get(pit_op)) - es.transport.perform_request.assert_has_calls( + es.transport.perform_request.assert_has_awaits( [ mock.call( "GET", @@ -4894,14 +4896,16 @@ async def test_search_after_without_pit(self, es): }, } - es.transport.perform_request.side_effect = [ - as_future(io.BytesIO(json.dumps(page_1).encode())), - as_future(io.BytesIO(json.dumps(page_2).encode())), - ] + es.transport.perform_request = mock.AsyncMock( + side_effect=[ + io.BytesIO(json.dumps(page_1).encode()), + io.BytesIO(json.dumps(page_2).encode()), + ] + ) r = runner.Query() await r(es, params) - es.transport.perform_request.assert_has_calls( + es.transport.perform_request.assert_has_awaits( [ mock.call( "GET", @@ -5090,11 +5094,11 @@ def tearDown(self): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_execute_multiple_streams(self, es): - es.transport.perform_request.side_effect = [ - # raw-request - as_future(), - # search - as_future( + es.transport.perform_request = mock.AsyncMock( + side_effect=[ + # raw-request + None, + # search io.StringIO( json.dumps( { @@ -5107,8 +5111,8 @@ async def test_execute_multiple_streams(self, es): }, ), ), - ), - ] + ] + ) params = { "max-connections": 4, @@ -5153,7 +5157,7 @@ async def test_execute_multiple_streams(self, es): r = runner.Composite() await r(es, params) - es.transport.perform_request.assert_has_calls( + es.transport.perform_request.assert_has_awaits( [ mock.call(method="GET", url="/", headers=None, body={}, params={}), mock.call("GET", "/test/_search", params={}, body={"query": {"match_all": {}}}, headers=None), @@ -5163,9 +5167,9 @@ async def test_execute_multiple_streams(self, es): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_propagates_violated_assertions(self, es): - es.transport.perform_request.side_effect = [ - # search - as_future( + es.transport.perform_request = mock.AsyncMock( + side_effect=[ + # search io.StringIO( json.dumps( { @@ -5178,8 +5182,8 @@ async def test_propagates_violated_assertions(self, es): } ) ) - ) - ] + ] + ) params = { "max-connections": 4, @@ -5212,7 +5216,7 @@ async def test_propagates_violated_assertions(self, es): with self.assertRaisesRegex(exceptions.RallyTaskAssertionError, r"Expected \[hits\] to be > \[0\] but was \[0\]."): await r(es, params) - es.transport.perform_request.assert_has_calls( + es.transport.perform_request.assert_has_awaits( [ mock.call( "GET", @@ -5231,7 +5235,7 @@ async def test_propagates_violated_assertions(self, es): @mock.patch("elasticsearch.Elasticsearch") @run_async async def test_executes_tasks_in_specified_order(self, es): - es.transport.perform_request.return_value = as_future() + es.transport.perform_request = mock.AsyncMock() params = { "requests": [ @@ -5456,7 +5460,7 @@ async def test_merges_timing_info(self, es): multi_cluster_client = {"default": es} es.new_request_context.return_value = RequestTimingTests.StaticRequestTiming(task_start=2) - delegate = mock.Mock(return_value=as_future({"weight": 5, "unit": "ops", "success": True})) + delegate = mock.AsyncMock(return_value={"weight": 5, "unit": "ops", "success": True}) params = {"name": "unit-test-operation", "operation-type": "test-op"} timer = runner.RequestTiming(delegate) @@ -5483,7 +5487,7 @@ async def test_creates_new_timing_info(self, es): es.new_request_context.return_value = RequestTimingTests.StaticRequestTiming(task_start=2) # a simple runner without a return value - delegate = mock.Mock(return_value=as_future()) + delegate = mock.AsyncMock() params = {"name": "unit-test-operation", "operation-type": "test-op"} timer = runner.RequestTiming(delegate) @@ -5509,7 +5513,7 @@ async def test_creates_new_timing_info(self, es): class RetryTests(TestCase): @run_async async def test_is_transparent_on_success_when_no_retries(self): - delegate = mock.Mock(return_value=as_future()) + delegate = mock.AsyncMock() es = None params = { # no retries @@ -5522,7 +5526,7 @@ async def test_is_transparent_on_success_when_no_retries(self): @run_async async def test_is_transparent_on_exception_when_no_retries(self): - delegate = mock.Mock(side_effect=as_future(exception=elasticsearch.ConnectionError("N/A", "no route to host"))) + delegate = mock.AsyncMock(side_effect=elasticsearch.ConnectionError("N/A", "no route to host")) es = None params = { # no retries @@ -5538,7 +5542,7 @@ async def test_is_transparent_on_exception_when_no_retries(self): async def test_is_transparent_on_application_error_when_no_retries(self): original_return_value = {"weight": 1, "unit": "ops", "success": False} - delegate = mock.Mock(return_value=as_future(original_return_value)) + delegate = mock.AsyncMock(return_value=original_return_value) es = None params = { # no retries @@ -5552,7 +5556,7 @@ async def test_is_transparent_on_application_error_when_no_retries(self): @run_async async def test_is_does_not_retry_on_success(self): - delegate = mock.Mock(return_value=as_future()) + delegate = mock.AsyncMock() es = None params = {"retries": 3, "retry-wait-period": 0.1, "retry-on-timeout": True, "retry-on-error": True} retrier = runner.Retry(delegate) @@ -5563,12 +5567,12 @@ async def test_is_does_not_retry_on_success(self): @run_async async def test_retries_on_timeout_if_wanted_and_raises_if_no_recovery(self): - delegate = mock.Mock( + delegate = mock.AsyncMock( side_effect=[ - as_future(exception=elasticsearch.ConnectionError("N/A", "no route to host")), - as_future(exception=elasticsearch.ConnectionError("N/A", "no route to host")), - as_future(exception=elasticsearch.ConnectionError("N/A", "no route to host")), - as_future(exception=elasticsearch.ConnectionError("N/A", "no route to host")), + elasticsearch.ConnectionError("N/A", "no route to host"), + elasticsearch.ConnectionError("N/A", "no route to host"), + elasticsearch.ConnectionError("N/A", "no route to host"), + elasticsearch.ConnectionError("N/A", "no route to host"), ] ) es = None @@ -5590,8 +5594,11 @@ async def test_retries_on_timeout_if_wanted_and_raises_if_no_recovery(self): async def test_retries_on_timeout_if_wanted_and_returns_first_call(self): failed_return_value = {"weight": 1, "unit": "ops", "success": False} - delegate = mock.Mock( - side_effect=[as_future(exception=elasticsearch.ConnectionError("N/A", "no route to host")), as_future(failed_return_value)] + delegate = mock.AsyncMock( + side_effect=[ + elasticsearch.ConnectionError("N/A", "no route to host"), + failed_return_value, + ] ) es = None params = {"retries": 3, "retry-wait-period": 0.01, "retry-on-timeout": True, "retry-on-error": False} @@ -5615,14 +5622,14 @@ async def test_retries_mixed_timeout_and_application_errors(self): failed_return_value = {"weight": 1, "unit": "ops", "success": False} success_return_value = {"weight": 1, "unit": "ops", "success": False} - delegate = mock.Mock( + delegate = mock.AsyncMock( side_effect=[ - as_future(exception=connection_error), - as_future(failed_return_value), - as_future(exception=connection_error), - as_future(exception=connection_error), - as_future(failed_return_value), - as_future(success_return_value), + connection_error, + failed_return_value, + connection_error, + connection_error, + failed_return_value, + success_return_value, ] ) es = None @@ -5657,7 +5664,7 @@ async def test_retries_mixed_timeout_and_application_errors(self): @run_async async def test_does_not_retry_on_timeout_if_not_wanted(self): - delegate = mock.Mock(side_effect=as_future(exception=elasticsearch.ConnectionTimeout(408, "timed out"))) + delegate = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(408, "timed out")) es = None params = {"retries": 3, "retry-wait-period": 0.01, "retry-on-timeout": False, "retry-on-error": True} retrier = runner.Retry(delegate) @@ -5672,7 +5679,7 @@ async def test_retries_on_application_error_if_wanted(self): failed_return_value = {"weight": 1, "unit": "ops", "success": False} success_return_value = {"weight": 1, "unit": "ops", "success": True} - delegate = mock.Mock(side_effect=[as_future(failed_return_value), as_future(success_return_value)]) + delegate = mock.AsyncMock(side_effect=[failed_return_value, success_return_value]) es = None params = {"retries": 3, "retry-wait-period": 0.01, "retry-on-timeout": False, "retry-on-error": True} retrier = runner.Retry(delegate) @@ -5693,7 +5700,7 @@ async def test_retries_on_application_error_if_wanted(self): async def test_does_not_retry_on_application_error_if_not_wanted(self): failed_return_value = {"weight": 1, "unit": "ops", "success": False} - delegate = mock.Mock(return_value=as_future(failed_return_value)) + delegate = mock.AsyncMock(return_value=failed_return_value) es = None params = {"retries": 3, "retry-wait-period": 0.01, "retry-on-timeout": True, "retry-on-error": False} retrier = runner.Retry(delegate) @@ -5706,7 +5713,7 @@ async def test_does_not_retry_on_application_error_if_not_wanted(self): @run_async async def test_assumes_success_if_runner_returns_non_dict(self): - delegate = mock.Mock(return_value=as_future(result=(1, "ops"))) + delegate = mock.AsyncMock(return_value=(1, "ops")) es = None params = {"retries": 3, "retry-wait-period": 0.01, "retry-on-timeout": True, "retry-on-error": True} retrier = runner.Retry(delegate) @@ -5725,10 +5732,10 @@ async def test_retries_until_success(self): success_return_value = {"weight": 1, "unit": "ops", "success": True} responses = [] - responses += failure_count * [as_future(failed_return_value)] - responses += [as_future(success_return_value)] + responses += failure_count * [failed_return_value] + responses += [success_return_value] - delegate = mock.Mock(side_effect=responses) + delegate = mock.AsyncMock(side_effect=responses) es = None params = {"retry-until-success": True, "retry-wait-period": 0.01} retrier = runner.Retry(delegate)