Skip to content

Commit

Permalink
adding response_hook to elastic instrumentation (#670)
Browse files Browse the repository at this point in the history
  • Loading branch information
ItayGibel-helios authored Sep 14, 2021
1 parent b47328e commit 291e508
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 3 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-sdk-extension-aws` Release AWS Python SDK Extension as 1.0.0
([#667](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/667))

### Added
- `opentelemetry-instrumentation-elasticsearch` Added `response_hook` and `request_hook` callbacks
([#670](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/670))

### Changed
- `opentelemetry-instrumentation-botocore` Unpatch botocore Endpoint.prepare_request on uninstrument
([#664](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/664))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,41 @@
.. code-block:: python
ElasticsearchInstrumentor("my-custom-prefix").instrument()
The `instrument` method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider
request_hook (Callable) - a function with extra user-defined logic to be performed before performing the request
this function signature is:
def request_hook(span: Span, method: str, url: str, kwargs)
response_hook (Callable) - a function with extra user-defined logic to be performed after performing the request
this function signature is:
def response_hook(span: Span, response: dict)
for example:
.. code: python
from opentelemetry.instrumentation.elasticsearch import ElasticsearchInstrumentor
import elasticsearch
def request_hook(span, method, url, kwargs):
if span and span.is_recording():
span.set_attribute("custom_user_attribute_from_request_hook", "some-value")
def response_hook(span, response):
if span and span.is_recording():
span.set_attribute("custom_user_attribute_from_response_hook", "some-value")
# instrument elasticsearch with request and response hooks
ElasticsearchInstrumentor().instrument(request_hook=request_hook, response_hook=response_hook)
# Using elasticsearch as normal now will automatically generate spans,
# including user custom attributes added from the hooks
es = elasticsearch.Elasticsearch()
es.index(index='my-index', doc_type='my-type', id=1, body={'my': 'data', 'timestamp': datetime.now()})
es.get(index='my-index', doc_type='my-type', id=1)
"""

from logging import getLogger
Expand Down Expand Up @@ -97,17 +132,23 @@ def _instrument(self, **kwargs):
"""
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, __version__, tracer_provider)
request_hook = kwargs.get("request_hook")
response_hook = kwargs.get("response_hook")
_wrap(
elasticsearch,
"Transport.perform_request",
_wrap_perform_request(tracer, self._span_name_prefix),
_wrap_perform_request(
tracer, self._span_name_prefix, request_hook, response_hook
),
)

def _uninstrument(self, **kwargs):
unwrap(elasticsearch.Transport, "perform_request")


def _wrap_perform_request(tracer, span_name_prefix):
def _wrap_perform_request(
tracer, span_name_prefix, request_hook=None, response_hook=None
):
# pylint: disable=R0912
def wrapper(wrapped, _, args, kwargs):
method = url = None
Expand All @@ -127,6 +168,10 @@ def wrapper(wrapped, _, args, kwargs):
with tracer.start_as_current_span(
op_name, kind=SpanKind.CLIENT,
) as span:

if callable(request_hook):
request_hook(span, method, url, kwargs)

if span.is_recording():
attributes = {
SpanAttributes.DB_SYSTEM: "elasticsearch",
Expand All @@ -150,6 +195,9 @@ def wrapper(wrapped, _, args, kwargs):
"elasticsearch.{0}".format(member),
str(rv[member]),
)

if callable(response_hook):
response_hook(span, rv)
return rv

return wrapper
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import threading
from ast import literal_eval
Expand Down Expand Up @@ -316,3 +316,101 @@ def test_dsl_index(self, request_mock):
"title": "About searching",
},
)

def test_request_hook(self, request_mock):
request_hook_method_attribute = "request_hook.method"
request_hook_url_attribute = "request_hook.url"
request_hook_kwargs_attribute = "request_hook.kwargs"

def request_hook(span, method, url, kwargs):

attributes = {
request_hook_method_attribute: method,
request_hook_url_attribute: url,
request_hook_kwargs_attribute: json.dumps(kwargs),
}

if span and span.is_recording():
span.set_attributes(attributes)

ElasticsearchInstrumentor().uninstrument()
ElasticsearchInstrumentor().instrument(request_hook=request_hook)

request_mock.return_value = (
1,
{},
'{"found": false, "timed_out": true, "took": 7}',
)
es = Elasticsearch()
index = "test-index"
doc_type = "tweet"
doc_id = 1
kwargs = {"params": {"test": True}}
es.get(index=index, doc_type=doc_type, id=doc_id, **kwargs)

spans = self.get_finished_spans()

self.assertEqual(1, len(spans))
self.assertEqual(
"GET", spans[0].attributes[request_hook_method_attribute]
)
self.assertEqual(
f"/{index}/{doc_type}/{doc_id}",
spans[0].attributes[request_hook_url_attribute],
)
self.assertEqual(
json.dumps(kwargs),
spans[0].attributes[request_hook_kwargs_attribute],
)

def test_response_hook(self, request_mock):
response_attribute_name = "db.query_result"

def response_hook(span, response):
if span and span.is_recording():
span.set_attribute(
response_attribute_name, json.dumps(response)
)

ElasticsearchInstrumentor().uninstrument()
ElasticsearchInstrumentor().instrument(response_hook=response_hook)

response_payload = {
"took": 9,
"timed_out": False,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0,
},
"hits": {
"total": {"value": 1, "relation": "eq"},
"max_score": 0.18232156,
"hits": [
{
"_index": "test-index",
"_type": "tweet",
"_id": "1",
"_score": 0.18232156,
"_source": {"name": "tester"},
}
],
},
}

request_mock.return_value = (
1,
{},
json.dumps(response_payload),
)
es = Elasticsearch()
es.get(index="test-index", doc_type="tweet", id=1)

spans = self.get_finished_spans()

self.assertEqual(1, len(spans))
self.assertEqual(
json.dumps(response_payload),
spans[0].attributes[response_attribute_name],
)

0 comments on commit 291e508

Please sign in to comment.