Skip to content

Commit

Permalink
Add support for AsyncElasticsearch (elastic#843)
Browse files Browse the repository at this point in the history
* Add support for AsyncElasticsearch

* Add to changelog

Co-authored-by: Colton Myers <colton.myers@gmail.com>
  • Loading branch information
2 people authored and beniwohli committed Sep 14, 2021
1 parent 38c4325 commit 99d7dda
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ endif::[]
===== Features
* capture number of affected rows for INSERT/UPDATE/DELETE SQL queries {pull}614[#614]
* Added instrumentation for AsyncElasticsearch {pull}843[#843]
[[release-notes-5.x]]
Expand Down
22 changes: 20 additions & 2 deletions elasticapm/instrumentation/packages/asyncio/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@

import elasticapm
from elasticapm.instrumentation.packages.asyncio.base import AsyncAbstractInstrumentedModule
from elasticapm.instrumentation.packages.elasticsearch import ElasticSearchConnectionMixin
from elasticapm.instrumentation.packages.elasticsearch import ElasticSearchConnectionMixin, ElasticsearchInstrumentation


class ElasticSearchAsyncConnection(ElasticSearchConnectionMixin, AsyncAbstractInstrumentedModule):
name = "elasticsearch_connection"

instrument_list = [("elasticsearch_async.connection", "AIOHttpConnection.perform_request")]
instrument_list = [
("elasticsearch_async.connection", "AIOHttpConnection.perform_request"),
("elasticsearch._async.http_aiohttp", "AIOHttpConnection.perform_request"),
]

async def call(self, module, method, wrapped, instance, args, kwargs):
signature = self.get_signature(args, kwargs)
Expand All @@ -52,3 +55,18 @@ async def call(self, module, method, wrapped, instance, args, kwargs):
leaf=True,
):
return await wrapped(*args, **kwargs)


class AsyncElasticsearchInstrumentation(ElasticsearchInstrumentation, AsyncAbstractInstrumentedModule):
    """Instrument selected methods of the ``AsyncElasticsearch`` client.

    Each wrapped call has its keyword arguments passed through
    ``inject_apm_params`` before the wrapped coroutine is awaited.
    """

    name = "elasticsearch"

    # (module path, "Class.method") pairs that get wrapped.
    instrument_list = [
        ("elasticsearch._async.client", "AsyncElasticsearch.delete_by_query"),
        ("elasticsearch._async.client", "AsyncElasticsearch.search"),
        ("elasticsearch._async.client", "AsyncElasticsearch.count"),
        ("elasticsearch._async.client", "AsyncElasticsearch.update"),
    ]

    async def call(self, module, method, wrapped, instance, args, kwargs):
        """Await ``wrapped`` with the keyword arguments returned by ``inject_apm_params``."""
        instrumented_kwargs = self.inject_apm_params(method, kwargs)
        return await wrapped(*args, **instrumented_kwargs)
14 changes: 9 additions & 5 deletions elasticapm/instrumentation/packages/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@


class ElasticSearchConnectionMixin(object):
query_methods = ("Elasticsearch.search", "Elasticsearch.count", "Elasticsearch.delete_by_query")
query_methods = ("search", "count", "delete_by_query")

def get_signature(self, args, kwargs):
args_len = len(args)
Expand Down Expand Up @@ -74,7 +74,7 @@ def get_context(self, instance, args, kwargs):
query.append(json.dumps(body["query"], default=compat.text_type))
if query:
context["db"]["statement"] = "\n\n".join(query)
elif api_method == "Elasticsearch.update":
elif api_method == "update":
if isinstance(body, dict) and "script" in body:
# only get the `script` field from the body
context["db"]["statement"] = json.dumps({"script": body["script"]})
Expand Down Expand Up @@ -135,17 +135,21 @@ def instrument(self):
super(ElasticsearchInstrumentation, self).instrument()

def call(self, module, method, wrapped, instance, args, kwargs):
kwargs = self.inject_apm_params(method, kwargs)
return wrapped(*args, **kwargs)

def inject_apm_params(self, method, kwargs):
params = kwargs.pop("params", {})

# make a copy of params in case the caller reuses them for some reason
params = params.copy() if params is not None else {}

cls_name, method_name = method.split(".", 1)
method_name = method.partition(".")[-1]

# store a reference to the non-serialized body so we can use it in the connection layer
body = kwargs.get("body")
params[BODY_REF_NAME] = body
params[API_METHOD_KEY_NAME] = method
params[API_METHOD_KEY_NAME] = method_name

kwargs["params"] = params
return wrapped(*args, **kwargs)
return kwargs
1 change: 1 addition & 0 deletions elasticapm/instrumentation/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
"elasticapm.instrumentation.packages.asyncio.sleep.AsyncIOSleepInstrumentation",
"elasticapm.instrumentation.packages.asyncio.aiohttp_client.AioHttpClientInstrumentation",
"elasticapm.instrumentation.packages.asyncio.elasticsearch.ElasticSearchAsyncConnection",
"elasticapm.instrumentation.packages.asyncio.elasticsearch.AsyncElasticsearchInstrumentation",
"elasticapm.instrumentation.packages.asyncio.aiopg.AioPGInstrumentation",
"elasticapm.instrumentation.packages.tornado.TornadoRequestExecuteInstrumentation",
"elasticapm.instrumentation.packages.tornado.TornadoHandleRequestExceptionInstrumentation",
Expand Down
193 changes: 193 additions & 0 deletions tests/instrumentation/asyncio/async_elasticsearch_client_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
# BSD 3-Clause License
#
# Copyright (c) 2019, Elasticsearch BV
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import pytest # isort:skip

pytest.importorskip("elasticsearch._async") # isort:skip

import os

from elasticsearch import VERSION as ES_VERSION
from elasticsearch import AsyncElasticsearch

from elasticapm.conf.constants import TRANSACTION

# Marks applied to every test in this module.
pytestmark = [pytest.mark.elasticsearch, pytest.mark.asyncio, pytest.mark.integrationtest]

# Without an ES_URL there is no server to connect to, so skip the whole module.
if os.environ.get("ES_URL") is None:
    pytestmark.append(pytest.mark.skip("Skipping elasticsearch test, no ES_URL environment variable"))


# Document type name used by the tests; it depends on the installed client's major version.
if ES_VERSION[0] >= 6:
    document_type = "_doc"
else:
    document_type = "doc"


@pytest.fixture
async def async_elasticsearch(request):
    """Yield an ``AsyncElasticsearch`` client built from the ``ES_URL`` environment variable.

    On teardown every index is deleted and the client is closed.
    """
    client = AsyncElasticsearch(hosts=os.environ["ES_URL"])
    try:
        yield client
    finally:
        try:
            await client.indices.delete(index="*")
        finally:
            # Close the client even when index cleanup raises, so a failed
            # teardown does not leave the client open.
            await client.close()


async def test_ping(instrument, elasticapm_client, async_elasticsearch):
    """A ``ping()`` inside a transaction is captured as one ``ES HEAD /`` span."""
    elasticapm_client.begin_transaction("test")
    await async_elasticsearch.ping()
    elasticapm_client.end_transaction("test", "OK")

    recorded_transaction = elasticapm_client.events[TRANSACTION][0]
    recorded_spans = elasticapm_client.spans_for_transaction(recorded_transaction)
    assert len(recorded_spans) == 1

    ping_span = recorded_spans[0]
    assert ping_span["name"] == "ES HEAD /"
    assert ping_span["type"] == "db"
    assert ping_span["subtype"] == "elasticsearch"
    assert ping_span["action"] == "query"
    assert ping_span["sync"] is False


async def test_info(instrument, elasticapm_client, async_elasticsearch):
    """An ``info()`` inside a transaction is captured as one ``ES GET /`` span."""
    elasticapm_client.begin_transaction("test")
    await async_elasticsearch.info()
    elasticapm_client.end_transaction("test", "OK")

    recorded_transaction = elasticapm_client.events[TRANSACTION][0]
    recorded_spans = elasticapm_client.spans_for_transaction(recorded_transaction)
    assert len(recorded_spans) == 1

    info_span = recorded_spans[0]
    assert info_span["name"] == "ES GET /"
    assert info_span["type"] == "db"
    assert info_span["subtype"] == "elasticsearch"
    assert info_span["action"] == "query"
    assert info_span["sync"] is False


async def test_create(instrument, elasticapm_client, async_elasticsearch):
    """Two ``create()`` calls produce two ``ES PUT`` spans that carry no db statement."""
    major_version = ES_VERSION[0]

    elasticapm_client.begin_transaction("test")
    # The first document is created with positional arguments, whose order
    # depends on the client's major version.
    if major_version < 5:
        await async_elasticsearch.create("tweets", document_type, {"user": "kimchy", "text": "hola"}, 1)
    elif major_version < 7:
        await async_elasticsearch.create("tweets", document_type, 1, body={"user": "kimchy", "text": "hola"})
    else:
        await async_elasticsearch.create("tweets", 1, body={"user": "kimchy", "text": "hola"})
    # The second document is created with keyword arguments only.
    await async_elasticsearch.create(
        index="tweets", doc_type=document_type, id=2, body={"user": "kimchy", "text": "hola"}, refresh=True
    )
    elasticapm_client.end_transaction("test", "OK")

    recorded_transaction = elasticapm_client.events[TRANSACTION][0]
    recorded_spans = elasticapm_client.spans_for_transaction(recorded_transaction)
    assert len(recorded_spans) == 2

    # Span order matches creation order, so the n-th span belongs to document id n.
    for doc_id, create_span in enumerate(recorded_spans, start=1):
        if major_version >= 5:
            assert create_span["name"] in (
                "ES PUT /tweets/%s/%d/_create" % (document_type, doc_id),
                "ES PUT /tweets/_create/%d" % doc_id,
            )
        else:
            assert create_span["name"] == "ES PUT /tweets/%s/%d" % (document_type, doc_id)
        assert create_span["type"] == "db"
        assert create_span["subtype"] == "elasticsearch"
        assert create_span["action"] == "query"
        assert create_span["context"]["db"]["type"] == "elasticsearch"
        assert "statement" not in create_span["context"]["db"]


async def test_search_body(instrument, elasticapm_client, async_elasticsearch):
    """A ``search()`` with a body records the ``query`` part of the body as the db statement."""
    # Seed one document before the transaction starts so its span is not recorded.
    await async_elasticsearch.create(
        index="tweets", doc_type=document_type, id=1, body={"user": "kimchy", "text": "hola"}, refresh=True
    )

    elasticapm_client.begin_transaction("test")
    search_query = {"query": {"term": {"user": "kimchy"}}}
    # NOTE(review): params=None is passed explicitly, presumably to exercise the
    # instrumentation's handling of a ``None`` params argument - confirm.
    response = await async_elasticsearch.search(body=search_query, params=None)
    elasticapm_client.end_transaction("test", "OK")

    recorded_transaction = elasticapm_client.events[TRANSACTION][0]
    assert response["hits"]["hits"][0]["_source"] == {"user": "kimchy", "text": "hola"}

    recorded_spans = elasticapm_client.spans_for_transaction(recorded_transaction)
    assert len(recorded_spans) == 1

    search_span = recorded_spans[0]
    # Depending on ES_VERSION, could be /_all/_search or /_search, and GET or POST
    assert search_span["name"] in ("ES GET /_search", "ES GET /_all/_search", "ES POST /_search")
    assert search_span["type"] == "db"
    assert search_span["subtype"] == "elasticsearch"
    assert search_span["action"] == "query"
    assert search_span["context"]["db"]["type"] == "elasticsearch"
    assert search_span["context"]["db"]["statement"] == '{"term": {"user": "kimchy"}}'
    assert search_span["sync"] is False


async def test_count_body(instrument, elasticapm_client, async_elasticsearch):
    """A ``count()`` with a body records the ``query`` part of the body as the db statement."""
    # Seed one document before the transaction starts so its span is not recorded.
    await async_elasticsearch.create(
        index="tweets", doc_type=document_type, id=1, body={"user": "kimchy", "text": "hola"}, refresh=True
    )

    elasticapm_client.begin_transaction("test")
    count_query = {"query": {"term": {"user": "kimchy"}}}
    response = await async_elasticsearch.count(body=count_query)
    elasticapm_client.end_transaction("test", "OK")

    recorded_transaction = elasticapm_client.events[TRANSACTION][0]
    assert response["count"] == 1

    recorded_spans = elasticapm_client.spans_for_transaction(recorded_transaction)
    assert len(recorded_spans) == 1

    count_span = recorded_spans[0]
    # The HTTP method and path vary between client versions.
    assert count_span["name"] in ("ES GET /_count", "ES POST /_count", "ES GET /_all/_count")
    assert count_span["type"] == "db"
    assert count_span["subtype"] == "elasticsearch"
    assert count_span["action"] == "query"
    assert count_span["context"]["db"]["type"] == "elasticsearch"
    assert count_span["context"]["db"]["statement"] == '{"term": {"user": "kimchy"}}'
    assert count_span["sync"] is False


async def test_delete_by_query_body(instrument, elasticapm_client, async_elasticsearch):
    """A ``delete_by_query()`` with a body records the ``query`` part of the body as the db statement."""
    # Seed one document before the transaction starts so its span is not recorded.
    await async_elasticsearch.create(
        index="tweets", doc_type=document_type, id=1, body={"user": "kimchy", "text": "hola"}, refresh=True
    )

    elasticapm_client.begin_transaction("test")
    await async_elasticsearch.delete_by_query(index="tweets", body={"query": {"term": {"user": "kimchy"}}})
    elasticapm_client.end_transaction("test", "OK")

    recorded_transaction = elasticapm_client.events[TRANSACTION][0]
    recorded_spans = elasticapm_client.spans_for_transaction(recorded_transaction)

    # Only the first recorded span is checked; the span count is not asserted here.
    delete_span = recorded_spans[0]
    assert delete_span["name"] == "ES POST /tweets/_delete_by_query"
    assert delete_span["type"] == "db"
    assert delete_span["subtype"] == "elasticsearch"
    assert delete_span["action"] == "query"
    assert delete_span["context"]["db"]["type"] == "elasticsearch"
    assert delete_span["context"]["db"]["statement"] == '{"term": {"user": "kimchy"}}'
    assert delete_span["sync"] is False
1 change: 1 addition & 0 deletions tests/requirements/reqs-elasticsearch-7.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
elasticsearch>=7.0,<8.0
elasticsearch-async ; python_version >= '3.7'
aiohttp ; python_version >= '3.6'
-r reqs-base.txt

0 comments on commit 99d7dda

Please sign in to comment.