Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

elasticsearch: tests against elasticsearch 8 #2420

Merged
merged 3 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2461](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2461))
- Remove SDK dependency from opentelemetry-instrumentation-grpc
([#2474](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2474))
- `opentelemetry-instrumentation-elasticsearch` Improved support for version 8
([#2420](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2420))

## Version 1.24.0/0.45b0 (2024-03-28)

Expand Down
2 changes: 1 addition & 1 deletion instrumentation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
| [opentelemetry-instrumentation-confluent-kafka](./opentelemetry-instrumentation-confluent-kafka) | confluent-kafka >= 1.8.2, <= 2.3.0 | No | experimental
| [opentelemetry-instrumentation-dbapi](./opentelemetry-instrumentation-dbapi) | dbapi | No | experimental
| [opentelemetry-instrumentation-django](./opentelemetry-instrumentation-django) | django >= 1.10 | Yes | experimental
| [opentelemetry-instrumentation-elasticsearch](./opentelemetry-instrumentation-elasticsearch) | elasticsearch >= 2.0 | No | experimental
| [opentelemetry-instrumentation-elasticsearch](./opentelemetry-instrumentation-elasticsearch) | elasticsearch >= 6.0 | No | experimental
| [opentelemetry-instrumentation-falcon](./opentelemetry-instrumentation-falcon) | falcon >= 1.4.1, < 4.0.0 | Yes | experimental
| [opentelemetry-instrumentation-fastapi](./opentelemetry-instrumentation-fastapi) | fastapi ~= 0.58 | Yes | experimental
| [opentelemetry-instrumentation-flask](./opentelemetry-instrumentation-flask) | flask >= 1.0 | Yes | migration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def response_hook(span, response):
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind, get_tracer
from opentelemetry.trace import SpanKind, Status, StatusCode, get_tracer

from .utils import sanitize_body

Expand All @@ -103,6 +103,7 @@ def response_hook(span, response):
es_transport_split = elasticsearch.VERSION[0] > 7
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
if es_transport_split:
import elastic_transport
from elastic_transport._models import DefaultType

logger = getLogger(__name__)

Expand Down Expand Up @@ -173,7 +174,12 @@ def _instrument(self, **kwargs):

def _uninstrument(self, **kwargs):
# pylint: disable=no-member
unwrap(elasticsearch.Transport, "perform_request")
transport_class = (
elastic_transport.Transport
if es_transport_split
else elasticsearch.Transport
)
unwrap(transport_class, "perform_request")


_regex_doc_url = re.compile(r"/_doc/([^/]+)")
Expand All @@ -182,6 +188,7 @@ def _uninstrument(self, **kwargs):
_regex_search_url = re.compile(r"/([^/]+)/_search[/]?")


# pylint: disable=too-many-statements
def _wrap_perform_request(
tracer,
span_name_prefix,
Expand Down Expand Up @@ -234,7 +241,22 @@ def wrapper(wrapped, _, args, kwargs):
kind=SpanKind.CLIENT,
) as span:
if callable(request_hook):
request_hook(span, method, url, kwargs)
# elasticsearch 8 changed the parameters quite a bit
if es_transport_split:

def normalize_kwargs(k, v):
if isinstance(v, DefaultType):
v = str(v)
elif isinstance(v, elastic_transport.HttpHeaders):
v = dict(v)
return (k, v)

hook_kwargs = dict(
normalize_kwargs(k, v) for k, v in kwargs.items()
)
else:
hook_kwargs = kwargs
request_hook(span, method, url, hook_kwargs)

if span.is_recording():
attributes = {
Expand All @@ -260,16 +282,41 @@ def wrapper(wrapped, _, args, kwargs):
span.set_attribute(key, value)

rv = wrapped(*args, **kwargs)
xrmx marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(rv, dict) and span.is_recording():

body = rv.body if es_transport_split else rv
if isinstance(body, dict) and span.is_recording():
for member in _ATTRIBUTES_FROM_RESULT:
if member in rv:
if member in body:
span.set_attribute(
f"elasticsearch.{member}",
str(rv[member]),
str(body[member]),
)

# since the transport split the raising of exceptions that set the error status
# are called after this code so need to set error status manually
if es_transport_split and span.is_recording():
if not (method == "HEAD" and rv.meta.status == 404) and (
not 200 <= rv.meta.status < 299
):
exception = elasticsearch.exceptions.HTTP_EXCEPTIONS.get(
rv.meta.status, elasticsearch.exceptions.ApiError
)
message = str(body)
if isinstance(body, dict):
error = body.get("error", message)
if isinstance(error, dict) and "type" in error:
error = error["type"]
message = error

span.set_status(
Status(
status_code=StatusCode.ERROR,
description=f"{exception.__name__}: {message}",
)
)

if callable(response_hook):
response_hook(span, rv)
response_hook(span, body)
return rv

return wrapper
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# limitations under the License.


_instruments = ("elasticsearch >= 2.0",)
_instruments = ("elasticsearch >= 6.0",)
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
asgiref==3.7.2
attrs==23.2.0
Deprecated==1.2.14
elasticsearch==8.12.1
elasticsearch-dsl==8.12.0
elastic-transport==8.12.0
importlib-metadata==6.11.0
iniconfig==2.0.0
packaging==23.2
pluggy==1.4.0
py==1.11.0
py-cpuinfo==9.0.0
pytest==7.1.3
pytest-benchmark==4.0.0
python-dateutil==2.8.2
six==1.16.0
tomli==2.0.1
typing_extensions==4.10.0
urllib3==2.2.1
wrapt==1.16.0
zipp==3.17.0
-e opentelemetry-instrumentation
-e instrumentation/opentelemetry-instrumentation-elasticsearch
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,9 @@ class Index:
dsl_index_span_name = "Elasticsearch/test-index/doc/2"
dsl_index_url = "/test-index/doc/2"
dsl_search_method = "GET"

perform_request_mock_path = "elasticsearch.connection.http_urllib3.Urllib3HttpConnection.perform_request"


def mock_response(body: str, status_code: int = 200):
return (status_code, {}, body)
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,9 @@ class Index:
dsl_index_span_name = "Elasticsearch/test-index/_doc/:id"
dsl_index_url = "/test-index/_doc/2"
dsl_search_method = "POST"

perform_request_mock_path = "elasticsearch.connection.http_urllib3.Urllib3HttpConnection.perform_request"


def mock_response(body: str, status_code: int = 200):
return (status_code, {}, body)
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from elastic_transport import ApiResponseMeta, HttpHeaders
from elastic_transport._node import NodeApiResponse
from elasticsearch_dsl import Document, Keyword, Text


Expand All @@ -36,6 +38,23 @@ class Index:
}
}
dsl_index_result = (1, {}, '{"result": "created"}')
dsl_index_span_name = "Elasticsearch/test-index/_doc/2"
dsl_index_span_name = "Elasticsearch/test-index/_doc/:id"
dsl_index_url = "/test-index/_doc/2"
dsl_search_method = "POST"

perform_request_mock_path = (
"elastic_transport._node._http_urllib3.Urllib3HttpNode.perform_request"
)


def mock_response(body: str, status_code: int = 200):
return NodeApiResponse(
ApiResponseMeta(
status=status_code,
headers=HttpHeaders({}),
duration=100,
http_version="1.1",
node="node",
),
body.encode(),
)
Loading
Loading