From 6277f2c573fbd7b97c6d375c82faf1e01cc9b352 Mon Sep 17 00:00:00 2001 From: Seth Michael Larson Date: Tue, 2 Jun 2020 18:50:06 -0500 Subject: [PATCH] Add support for AsyncElasticsearch (#843) * Add support for AsyncElasticsearch * Add to changelog Co-authored-by: Colton Myers --- CHANGELOG.asciidoc | 1 + .../packages/asyncio/elasticsearch.py | 22 +- .../instrumentation/packages/elasticsearch.py | 14 +- elasticapm/instrumentation/register.py | 1 + .../async_elasticsearch_client_tests.py | 193 ++++++++++++++++++ tests/requirements/reqs-elasticsearch-7.txt | 1 + 6 files changed, 225 insertions(+), 7 deletions(-) create mode 100644 tests/instrumentation/asyncio/async_elasticsearch_client_tests.py diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 4120445df..37eb9b533 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -26,6 +26,7 @@ endif::[] ===== Features * capture number of affected rows for INSERT/UPDATE/DELETE SQL queries {pull}614[#614] + * Added instrumentation for AsyncElasticsearch {pull}843[#843] [[release-notes-5.x]] diff --git a/elasticapm/instrumentation/packages/asyncio/elasticsearch.py b/elasticapm/instrumentation/packages/asyncio/elasticsearch.py index 13b5d3de8..993f90a77 100644 --- a/elasticapm/instrumentation/packages/asyncio/elasticsearch.py +++ b/elasticapm/instrumentation/packages/asyncio/elasticsearch.py @@ -30,13 +30,16 @@ import elasticapm from elasticapm.instrumentation.packages.asyncio.base import AsyncAbstractInstrumentedModule -from elasticapm.instrumentation.packages.elasticsearch import ElasticSearchConnectionMixin +from elasticapm.instrumentation.packages.elasticsearch import ElasticSearchConnectionMixin, ElasticsearchInstrumentation class ElasticSearchAsyncConnection(ElasticSearchConnectionMixin, AsyncAbstractInstrumentedModule): name = "elasticsearch_connection" - instrument_list = [("elasticsearch_async.connection", "AIOHttpConnection.perform_request")] + instrument_list = [ + ("elasticsearch_async.connection", "AIOHttpConnection.perform_request"), + ("elasticsearch._async.http_aiohttp", "AIOHttpConnection.perform_request"), + ] async def call(self, module, method, wrapped, instance, args, kwargs): signature = self.get_signature(args, kwargs) @@ -52,3 +55,18 @@ async def call(self, module, method, wrapped, instance, args, kwargs): leaf=True, ): return await wrapped(*args, **kwargs) + + +class AsyncElasticsearchInstrumentation(ElasticsearchInstrumentation, AsyncAbstractInstrumentedModule): + name = "elasticsearch" + + instrument_list = [ + ("elasticsearch._async.client", "AsyncElasticsearch.delete_by_query"), + ("elasticsearch._async.client", "AsyncElasticsearch.search"), + ("elasticsearch._async.client", "AsyncElasticsearch.count"), + ("elasticsearch._async.client", "AsyncElasticsearch.update"), + ] + + async def call(self, module, method, wrapped, instance, args, kwargs): + kwargs = self.inject_apm_params(method, kwargs) + return await wrapped(*args, **kwargs) diff --git a/elasticapm/instrumentation/packages/elasticsearch.py b/elasticapm/instrumentation/packages/elasticsearch.py index beef020a2..76c69089c 100644 --- a/elasticapm/instrumentation/packages/elasticsearch.py +++ b/elasticapm/instrumentation/packages/elasticsearch.py @@ -45,7 +45,7 @@ class ElasticSearchConnectionMixin(object): - query_methods = ("Elasticsearch.search", "Elasticsearch.count", "Elasticsearch.delete_by_query") + query_methods = ("search", "count", "delete_by_query") def get_signature(self, args, kwargs): args_len = len(args) @@ -74,7 +74,7 @@ def get_context(self, instance, args, kwargs): query.append(json.dumps(body["query"], default=compat.text_type)) if query: context["db"]["statement"] = "\n\n".join(query) - elif api_method == "Elasticsearch.update": + elif api_method == "update": if isinstance(body, dict) and "script" in body: # only get the `script` field from the body context["db"]["statement"] = json.dumps({"script": body["script"]}) @@ -135,17 +135,21 @@ def instrument(self): super(ElasticsearchInstrumentation, self).instrument() def call(self, module, method, wrapped, instance, args, kwargs): + kwargs = self.inject_apm_params(method, kwargs) + return wrapped(*args, **kwargs) + + def inject_apm_params(self, method, kwargs): params = kwargs.pop("params", {}) # make a copy of params in case the caller reuses them for some reason params = params.copy() if params is not None else {} - cls_name, method_name = method.split(".", 1) + method_name = method.partition(".")[-1] # store a reference to the non-serialized body so we can use it in the connection layer body = kwargs.get("body") params[BODY_REF_NAME] = body - params[API_METHOD_KEY_NAME] = method + params[API_METHOD_KEY_NAME] = method_name kwargs["params"] = params - return wrapped(*args, **kwargs) + return kwargs diff --git a/elasticapm/instrumentation/register.py b/elasticapm/instrumentation/register.py index 73f3aa8b8..05a802e31 100644 --- a/elasticapm/instrumentation/register.py +++ b/elasticapm/instrumentation/register.py @@ -68,6 +68,7 @@ "elasticapm.instrumentation.packages.asyncio.sleep.AsyncIOSleepInstrumentation", "elasticapm.instrumentation.packages.asyncio.aiohttp_client.AioHttpClientInstrumentation", "elasticapm.instrumentation.packages.asyncio.elasticsearch.ElasticSearchAsyncConnection", + "elasticapm.instrumentation.packages.asyncio.elasticsearch.AsyncElasticsearchInstrumentation", "elasticapm.instrumentation.packages.asyncio.aiopg.AioPGInstrumentation", "elasticapm.instrumentation.packages.tornado.TornadoRequestExecuteInstrumentation", "elasticapm.instrumentation.packages.tornado.TornadoHandleRequestExceptionInstrumentation", diff --git a/tests/instrumentation/asyncio/async_elasticsearch_client_tests.py b/tests/instrumentation/asyncio/async_elasticsearch_client_tests.py new file mode 100644 index 000000000..2b88ab33b --- /dev/null +++ b/tests/instrumentation/asyncio/async_elasticsearch_client_tests.py @@ -0,0 +1,193 @@ +# BSD 3-Clause License +# +# Copyright (c) 2019, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import pytest # isort:skip + +pytest.importorskip("elasticsearch._async") # isort:skip + +import os + +from elasticsearch import VERSION as ES_VERSION +from elasticsearch import AsyncElasticsearch + +from elasticapm.conf.constants import TRANSACTION + +pytestmark = [pytest.mark.elasticsearch, pytest.mark.asyncio, pytest.mark.integrationtest] + +if "ES_URL" not in os.environ: + pytestmark.append(pytest.mark.skip("Skipping elasticsearch test, no ES_URL environment variable")) + + +document_type = "_doc" if ES_VERSION[0] >= 6 else "doc" + + +@pytest.fixture +async def async_elasticsearch(request): + """AsyncElasticsearch client fixture.""" + client = AsyncElasticsearch(hosts=os.environ["ES_URL"]) + try: + yield client + finally: + await client.indices.delete(index="*") + await client.close() + + +async def test_ping(instrument, elasticapm_client, async_elasticsearch): + elasticapm_client.begin_transaction("test") + result = await async_elasticsearch.ping() + elasticapm_client.end_transaction("test", "OK") + + transaction = elasticapm_client.events[TRANSACTION][0] + spans = elasticapm_client.spans_for_transaction(transaction) + assert len(spans) == 1 + span = spans[0] + assert span["name"] == "ES HEAD /" + assert span["type"] == "db" + assert span["subtype"] == "elasticsearch" + assert span["action"] == "query" + assert span["sync"] is False + + +async def test_info(instrument, elasticapm_client, async_elasticsearch): + elasticapm_client.begin_transaction("test") + result = await async_elasticsearch.info() + elasticapm_client.end_transaction("test", "OK") + + transaction = elasticapm_client.events[TRANSACTION][0] + + spans = elasticapm_client.spans_for_transaction(transaction) + assert len(spans) == 1 + span = spans[0] + assert span["name"] == "ES GET /" + assert span["type"] == "db" + assert span["subtype"] == "elasticsearch" + assert span["action"] == "query" + assert span["sync"] is False + + +async def test_create(instrument, elasticapm_client, async_elasticsearch): + elasticapm_client.begin_transaction("test") + if ES_VERSION[0] < 5: + r1 = await async_elasticsearch.create("tweets", document_type, {"user": "kimchy", "text": "hola"}, 1) + elif ES_VERSION[0] < 7: + r1 = await async_elasticsearch.create("tweets", document_type, 1, body={"user": "kimchy", "text": "hola"}) + else: + r1 = await async_elasticsearch.create("tweets", 1, body={"user": "kimchy", "text": "hola"}) + r2 = await async_elasticsearch.create( + index="tweets", doc_type=document_type, id=2, body={"user": "kimchy", "text": "hola"}, refresh=True + ) + elasticapm_client.end_transaction("test", "OK") + + transaction = elasticapm_client.events[TRANSACTION][0] + + spans = elasticapm_client.spans_for_transaction(transaction) + assert len(spans) == 2 + + for i, span in enumerate(spans): + if ES_VERSION[0] >= 5: + assert span["name"] in ( + "ES PUT /tweets/%s/%d/_create" % (document_type, i + 1), + "ES PUT /tweets/_create/%d" % (i + 1), + ) + else: + assert span["name"] == "ES PUT /tweets/%s/%d" % (document_type, i + 1) + assert span["type"] == "db" + assert span["subtype"] == "elasticsearch" + assert span["action"] == "query" + assert span["context"]["db"]["type"] == "elasticsearch" + assert "statement" not in span["context"]["db"] + + +async def test_search_body(instrument, elasticapm_client, async_elasticsearch): + await async_elasticsearch.create( + index="tweets", doc_type=document_type, id=1, body={"user": "kimchy", "text": "hola"}, refresh=True + ) + elasticapm_client.begin_transaction("test") + search_query = {"query": {"term": {"user": "kimchy"}}} + result = await async_elasticsearch.search(body=search_query, params=None) + elasticapm_client.end_transaction("test", "OK") + + transaction = elasticapm_client.events[TRANSACTION][0] + assert result["hits"]["hits"][0]["_source"] == {"user": "kimchy", "text": "hola"} + spans = elasticapm_client.spans_for_transaction(transaction) + assert len(spans) == 1 + span = spans[0] + # Depending on ES_VERSION, could be /_all/_search or /_search, and GET or POST + assert span["name"] in ("ES GET /_search", "ES GET /_all/_search", "ES POST /_search") + assert span["type"] == "db" + assert span["subtype"] == "elasticsearch" + assert span["action"] == "query" + assert span["context"]["db"]["type"] == "elasticsearch" + assert span["context"]["db"]["statement"] == '{"term": {"user": "kimchy"}}' + assert span["sync"] is False + + +async def test_count_body(instrument, elasticapm_client, async_elasticsearch): + await async_elasticsearch.create( + index="tweets", doc_type=document_type, id=1, body={"user": "kimchy", "text": "hola"}, refresh=True + ) + elasticapm_client.begin_transaction("test") + search_query = {"query": {"term": {"user": "kimchy"}}} + result = await async_elasticsearch.count(body=search_query) + elasticapm_client.end_transaction("test", "OK") + + transaction = elasticapm_client.events[TRANSACTION][0] + assert result["count"] == 1 + spans = elasticapm_client.spans_for_transaction(transaction) + assert len(spans) == 1 + span = spans[0] + assert span["name"] in ("ES GET /_count", "ES POST /_count", "ES GET /_all/_count") + assert span["type"] == "db" + assert span["subtype"] == "elasticsearch" + assert span["action"] == "query" + assert span["context"]["db"]["type"] == "elasticsearch" + assert span["context"]["db"]["statement"] == '{"term": {"user": "kimchy"}}' + assert span["sync"] is False + + +async def test_delete_by_query_body(instrument, elasticapm_client, async_elasticsearch): + await async_elasticsearch.create( + index="tweets", doc_type=document_type, id=1, body={"user": "kimchy", "text": "hola"}, refresh=True + ) + elasticapm_client.begin_transaction("test") + result = await async_elasticsearch.delete_by_query(index="tweets", body={"query": {"term": {"user": "kimchy"}}}) + elasticapm_client.end_transaction("test", "OK") + + transaction = elasticapm_client.events[TRANSACTION][0] + spans = elasticapm_client.spans_for_transaction(transaction) + + span = spans[0] + assert span["name"] == "ES POST /tweets/_delete_by_query" + assert span["type"] == "db" + assert span["subtype"] == "elasticsearch" + assert span["action"] == "query" + assert span["context"]["db"]["type"] == "elasticsearch" + assert span["context"]["db"]["statement"] == '{"term": {"user": "kimchy"}}' + assert span["sync"] is False diff --git a/tests/requirements/reqs-elasticsearch-7.txt b/tests/requirements/reqs-elasticsearch-7.txt index 46988c199..7b30f3499 100644 --- a/tests/requirements/reqs-elasticsearch-7.txt +++ b/tests/requirements/reqs-elasticsearch-7.txt @@ -1,3 +1,4 @@ elasticsearch>=7.0,<8.0 elasticsearch-async ; python_version >= '3.7' +aiohttp ; python_version >= '3.6' -r reqs-base.txt