Skip to content

Commit

Permalink
Redis Cache Module - 2 - Add Cache Spans (#3075)
Browse files Browse the repository at this point in the history
* Some work to use the same function for the queries and caches modules

* Moved functions to better place

* Added tests

* Fix

* Tests and linting

* That's important for Python 3.6

* Fixed some tests

* Removed ipdb

* more fixing

* Cleanup

* Async cache spans

* Added async tests

* Fixed async tests

* Guard against running async tests when there is no async fakeredis for that Python version

* linting

* Use new names for ops

* Renamed for consistency

* fix _get_op()

* Cleaning up unused properties/parameters

* Use _get_safe_key in Django integration

* Fixed typing

* More tests

* Only return the keys in set_many, makes more sense

* Linting

* Cleanup

* fix(clickhouse): `_sentry_span` might be missing (#3096)

We started auto-enabling the ClickHouse integration in 2.0+. This led to it getting auto-enabled also for folks using ClickHouse with Django via `django-clickhouse-backend`, but it turns out that the integration doesn't work properly with `django-clickhouse-backend` and leads to `AttributeError: 'Connection' object has no attribute '_sentry_span'`.

* Make _get_safe_key work for all multi key methods in django and redis

* Fixed kwargs case and updated tests

* Updated tests

* cache.set should be cache.put

* Fix `cohere` testsuite for new release of `cohere`. (#3098)

* Check for new class to signal end of stream

---------

Co-authored-by: Ivana Kellyerova <ivana.kellyerova@sentry.io>
  • Loading branch information
antonpirker and sentrivana committed May 23, 2024
1 parent 8f23a9a commit 8c33a76
Show file tree
Hide file tree
Showing 15 changed files with 680 additions and 92 deletions.
2 changes: 1 addition & 1 deletion sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ class SPANDATA:
class OP:
ANTHROPIC_MESSAGES_CREATE = "ai.messages.create.anthropic"
CACHE_GET = "cache.get"
CACHE_SET = "cache.set"
CACHE_PUT = "cache.put"
COHERE_CHAT_COMPLETIONS_CREATE = "ai.chat_completions.create.cohere"
COHERE_EMBEDDINGS_CREATE = "ai.embeddings.create.cohere"
DB = "db"
Expand Down
15 changes: 8 additions & 7 deletions sentry_sdk/integrations/clickhouse_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def _wrap_end(f: Callable[P, T]) -> Callable[P, T]:
def _inner_end(*args: P.args, **kwargs: P.kwargs) -> T:
res = f(*args, **kwargs)
instance = args[0]
span = instance.connection._sentry_span # type: ignore[attr-defined]
span = getattr(instance.connection, "_sentry_span", None) # type: ignore[attr-defined]

if span is not None:
if res is not None and should_send_default_pii():
Expand All @@ -129,14 +129,15 @@ def _wrap_send_data(f: Callable[P, T]) -> Callable[P, T]:
def _inner_send_data(*args: P.args, **kwargs: P.kwargs) -> T:
instance = args[0] # type: clickhouse_driver.client.Client
data = args[2]
span = instance.connection._sentry_span
span = getattr(instance.connection, "_sentry_span", None)

_set_db_data(span, instance.connection)
if span is not None:
_set_db_data(span, instance.connection)

if should_send_default_pii():
db_params = span._data.get("db.params", [])
db_params.extend(data)
span.set_data("db.params", db_params)
if should_send_default_pii():
db_params = span._data.get("db.params", [])
db_params.extend(data)
span.set_data("db.params", db_params)

return f(*args, **kwargs)

Expand Down
10 changes: 8 additions & 2 deletions sentry_sdk/integrations/cohere.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
try:
from cohere.client import Client
from cohere.base_client import BaseCohere
from cohere import ChatStreamEndEvent, NonStreamedChatResponse
from cohere import (
ChatStreamEndEvent,
NonStreamedChatResponse,
StreamedChatResponse_StreamEnd,
)

if TYPE_CHECKING:
from cohere import StreamedChatResponse
Expand Down Expand Up @@ -181,7 +185,9 @@ def new_iterator():

with capture_internal_exceptions():
for x in old_iterator:
if isinstance(x, ChatStreamEndEvent):
if isinstance(x, ChatStreamEndEvent) or isinstance(
x, StreamedChatResponse_StreamEnd
):
collect_chat_response_fields(
span,
x.response,
Expand Down
30 changes: 6 additions & 24 deletions sentry_sdk/integrations/django/caching.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import functools
from typing import TYPE_CHECKING
from sentry_sdk.integrations.redis.utils import _get_safe_key
from urllib3.util import parse_url as urlparse

from django import VERSION as DJANGO_VERSION
Expand All @@ -8,7 +9,6 @@
import sentry_sdk
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.utils import (
SENSITIVE_DATA_SUBSTITUTE,
capture_internal_exceptions,
ensure_integration_enabled,
)
Expand All @@ -28,27 +28,9 @@
]


def _get_key(args, kwargs):
# type: (list[Any], dict[str, Any]) -> str
key = ""

if args is not None and len(args) >= 1:
key = args[0]
elif kwargs is not None and "key" in kwargs:
key = kwargs["key"]

if isinstance(key, dict):
# Do not leak sensitive data
# `set_many()` has a dict {"key1": "value1", "key2": "value2"} as first argument.
# Those values could include sensitive data so we replace them with a placeholder
key = {x: SENSITIVE_DATA_SUBSTITUTE for x in key}

return str(key)


def _get_span_description(method_name, args, kwargs):
# type: (str, list[Any], dict[str, Any]) -> str
return _get_key(args, kwargs)
# type: (str, tuple[Any], dict[str, Any]) -> str
return _get_safe_key(method_name, args, kwargs)


def _patch_cache_method(cache, method_name, address, port):
Expand All @@ -61,11 +43,11 @@ def _patch_cache_method(cache, method_name, address, port):
def _instrument_call(
cache, method_name, original_method, args, kwargs, address, port
):
# type: (CacheHandler, str, Callable[..., Any], list[Any], dict[str, Any], Optional[str], Optional[int]) -> Any
# type: (CacheHandler, str, Callable[..., Any], tuple[Any, ...], dict[str, Any], Optional[str], Optional[int]) -> Any
is_set_operation = method_name.startswith("set")
is_get_operation = not is_set_operation

op = OP.CACHE_SET if is_set_operation else OP.CACHE_GET
op = OP.CACHE_PUT if is_set_operation else OP.CACHE_GET
description = _get_span_description(method_name, args, kwargs)

with sentry_sdk.start_span(op=op, description=description) as span:
Expand All @@ -78,7 +60,7 @@ def _instrument_call(
if port is not None:
span.set_data(SPANDATA.NETWORK_PEER_PORT, port)

key = _get_key(args, kwargs)
key = _get_safe_key(method_name, args, kwargs)
if key != "":
span.set_data(SPANDATA.CACHE_KEY, key)

Expand Down
9 changes: 7 additions & 2 deletions sentry_sdk/integrations/redis/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.integrations.redis.consts import _DEFAULT_MAX_DATA_SIZE
from sentry_sdk.integrations.redis.rb import _patch_rb
Expand All @@ -6,13 +7,17 @@
from sentry_sdk.integrations.redis.redis_py_cluster_legacy import _patch_rediscluster
from sentry_sdk.utils import logger

if TYPE_CHECKING:
from typing import Optional


class RedisIntegration(Integration):
identifier = "redis"

def __init__(self, max_data_size=_DEFAULT_MAX_DATA_SIZE):
# type: (int) -> None
def __init__(self, max_data_size=_DEFAULT_MAX_DATA_SIZE, cache_prefixes=None):
# type: (int, Optional[list[str]]) -> None
self.max_data_size = max_data_size
self.cache_prefixes = cache_prefixes if cache_prefixes is not None else []

@staticmethod
def setup_once():
Expand Down
47 changes: 40 additions & 7 deletions sentry_sdk/integrations/redis/_async_common.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk.consts import OP
from sentry_sdk.integrations.redis.modules.caches import (
_compile_cache_span_properties,
_set_cache_data,
)
from sentry_sdk.integrations.redis.modules.queries import _compile_db_span_properties
from sentry_sdk.integrations.redis.utils import (
_get_span_description,
_set_client_data,
_set_pipeline_data,
)
Expand Down Expand Up @@ -56,15 +60,44 @@ def patch_redis_async_client(cls, is_cluster, set_db_data_fn):

async def _sentry_execute_command(self, name, *args, **kwargs):
# type: (Any, str, *Any, **Any) -> Any
if sentry_sdk.get_client().get_integration(RedisIntegration) is None:
integration = sentry_sdk.get_client().get_integration(RedisIntegration)
if integration is None:
return await old_execute_command(self, name, *args, **kwargs)

description = _get_span_description(name, *args)
cache_properties = _compile_cache_span_properties(
name,
args,
kwargs,
integration,
)

with sentry_sdk.start_span(op=OP.DB_REDIS, description=description) as span:
set_db_data_fn(span, self)
_set_client_data(span, is_cluster, name, *args)
cache_span = None
if cache_properties["is_cache_key"] and cache_properties["op"] is not None:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
description=cache_properties["description"],
)
cache_span.__enter__()

return await old_execute_command(self, name, *args, **kwargs)
db_properties = _compile_db_span_properties(integration, name, args)

db_span = sentry_sdk.start_span(
op=db_properties["op"],
description=db_properties["description"],
)
db_span.__enter__()

set_db_data_fn(db_span, self)
_set_client_data(db_span, is_cluster, name, *args)

value = await old_execute_command(self, name, *args, **kwargs)

db_span.__exit__(None, None, None)

if cache_span:
_set_cache_data(cache_span, self, cache_properties, value)
cache_span.__exit__(None, None, None)

return value

cls.execute_command = _sentry_execute_command # type: ignore
47 changes: 37 additions & 10 deletions sentry_sdk/integrations/redis/_sync_common.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk.consts import OP
from sentry_sdk.integrations.redis.modules.caches import (
_compile_cache_span_properties,
_set_cache_data,
)
from sentry_sdk.integrations.redis.modules.queries import _compile_db_span_properties
from sentry_sdk.integrations.redis.utils import (
_get_span_description,
_set_client_data,
_set_pipeline_data,
)
from sentry_sdk.tracing import Span
from sentry_sdk.utils import capture_internal_exceptions
import sentry_sdk


if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any
Expand Down Expand Up @@ -64,18 +69,40 @@ def sentry_patched_execute_command(self, name, *args, **kwargs):
if integration is None:
return old_execute_command(self, name, *args, **kwargs)

description = _get_span_description(name, *args)
cache_properties = _compile_cache_span_properties(
name,
args,
kwargs,
integration,
)

cache_span = None
if cache_properties["is_cache_key"] and cache_properties["op"] is not None:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
description=cache_properties["description"],
)
cache_span.__enter__()

db_properties = _compile_db_span_properties(integration, name, args)

data_should_be_truncated = (
integration.max_data_size and len(description) > integration.max_data_size
db_span = sentry_sdk.start_span(
op=db_properties["op"],
description=db_properties["description"],
)
if data_should_be_truncated:
description = description[: integration.max_data_size - len("...")] + "..."
db_span.__enter__()

with sentry_sdk.start_span(op=OP.DB_REDIS, description=description) as span:
set_db_data_fn(span, self)
_set_client_data(span, is_cluster, name, *args)
set_db_data_fn(db_span, self)
_set_client_data(db_span, is_cluster, name, *args)

return old_execute_command(self, name, *args, **kwargs)
value = old_execute_command(self, name, *args, **kwargs)

db_span.__exit__(None, None, None)

if cache_span:
_set_cache_data(cache_span, self, cache_properties, value)
cache_span.__exit__(None, None, None)

return value

cls.execute_command = sentry_patched_execute_command
7 changes: 6 additions & 1 deletion sentry_sdk/integrations/redis/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
["decr", "decrby", "get", "incr", "incrby", "pttl", "set", "setex", "setnx", "ttl"],
)
_MULTI_KEY_COMMANDS = frozenset(
["del", "touch", "unlink"],
[
"del",
"touch",
"unlink",
"mget",
],
)
_COMMANDS_INCLUDING_SENSITIVE_DATA = [
"auth",
Expand Down
Loading

0 comments on commit 8c33a76

Please sign in to comment.