Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement trace_continuation_strategy #1564

Merged
merged 4 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ endif::[]
* Add instrumentation for https://kafka-python.readthedocs.io/en/master/[`kafka-python`] {pull}1555[#1555]
* Add API for span links, and implement span link support for OpenTelemetry bridge {pull}1562[#1562]
* Add specific instrumentation for SQS delete/batch-delete {pull}1567[#1567]
* Add `trace_continuation_strategy` setting {pull}1564[#1564]

[float]
===== Bug fixes
Expand Down
30 changes: 30 additions & 0 deletions docs/configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1174,6 +1174,36 @@ These headers (`traceparent` and `tracestate`) are defined in the https://www.w3

Additionally, when this setting is set to `True`, the agent will set `elasticapm-traceparent` for backwards compatibility.

[float]
[[config-trace-continuation-strategy]]
==== `trace_continuation_strategy`

[options="header"]
|============
| Environment | Django/Flask | Default
| `ELASTIC_APM_TRACE_CONTINUATION_STRATEGY` | `TRACE_CONTINUATION_STRATEGY` | `continue`
|============

This option allows some control on how the APM agent handles W3C trace-context headers on incoming requests.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole section is lifted from the nodejs agent docs

By default, the `traceparent` and `tracestate` headers are used per W3C spec for distributed tracing.
However, in certain cases it can be helpful to *not* use the incoming `traceparent` header.
Some example use cases:

- An Elastic-monitored service is receiving requests with `traceparent` headers from *unmonitored* services.
- An Elastic-monitored service is publicly exposed, and does not want tracing data (trace-ids, sampling decisions) to possibly be spoofed by user requests.

Valid values are:

- `'continue'`: The default behavior. An incoming `traceparent` value is used to continue the trace and determine the sampling decision.
- `'restart'`: Always ignores the `traceparent` header of incoming requests.
A new trace-id will be generated and the sampling decision will be made based on <<config-transaction-sample-rate,`transaction_sample_rate`>>.
A *span link* will be made to the incoming traceparent.
- `'restart_external'`: If an incoming request includes the `es` vendor flag in `tracestate`, then any 'traceparent' will be considered internal and will be handled as described for `'continue'` above.
Otherwise, any `'traceparent'` is considered external and will be handled as described for `'restart'` above.

Starting with Elastic Observability 8.2, span links will be visible in trace
views.

[float]
[[config-use-elastic-excepthook]]
==== `use_elastic_excepthook`
Expand Down
15 changes: 14 additions & 1 deletion elasticapm/conf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import threading
from datetime import timedelta

from elasticapm.conf.constants import BASE_SANITIZE_FIELD_NAMES
from elasticapm.conf.constants import BASE_SANITIZE_FIELD_NAMES, TRACE_CONTINUATION_STRATEGY
from elasticapm.utils import compat, starmatch_to_regex
from elasticapm.utils.logging import get_logger
from elasticapm.utils.threading import IntervalTimer, ThreadManager
Expand Down Expand Up @@ -671,6 +671,19 @@ class Config(_ConfigBase):
callbacks=[_log_ecs_reformatting_callback],
default="off",
)
trace_continuation_strategy = _ConfigValue(
"TRACE_CONTINUATION_STRATEGY",
validators=[
EnumerationValidator(
[
TRACE_CONTINUATION_STRATEGY.CONTINUE,
TRACE_CONTINUATION_STRATEGY.RESTART,
TRACE_CONTINUATION_STRATEGY.RESTART_EXTERNAL,
]
)
],
default=TRACE_CONTINUATION_STRATEGY.CONTINUE,
)

@property
def is_recording(self):
Expand Down
5 changes: 5 additions & 0 deletions elasticapm/conf/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,8 @@ def _starmatch_to_regex(pattern):
LABEL_TYPES = (bool, int, float, decimal.Decimal)

TRACESTATE = namedtuple("TRACESTATE", ["SAMPLE_RATE"])(SAMPLE_RATE="s")
TRACE_CONTINUATION_STRATEGY = namedtuple("TRACE_CONTINUATION_STRATEGY", ["CONTINUE", "RESTART", "RESTART_EXTERNAL"])(
CONTINUE="continue",
RESTART="restart",
RESTART_EXTERNAL="restart_external",
)
26 changes: 16 additions & 10 deletions elasticapm/traces.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from elasticapm.context import init_execution_context
from elasticapm.metrics.base_metrics import Timer
from elasticapm.utils import encoding, get_name_from_func, nested_key, url_to_destination_resource
from elasticapm.utils.disttracing import TraceParent, TracingOptions
from elasticapm.utils.disttracing import TraceParent
from elasticapm.utils.logging import get_logger
from elasticapm.utils.time import time_to_perf_counter

Expand Down Expand Up @@ -204,12 +204,7 @@ def __init__(
"""
self.id = self.get_dist_tracing_id()
if not trace_parent:
trace_parent = TraceParent(
constants.TRACE_CONTEXT_VERSION,
"%032x" % random.getrandbits(128),
self.id,
TracingOptions(recorded=is_sampled),
)
trace_parent = TraceParent.new(self.id, is_sampled)

self.trace_parent: TraceParent = trace_parent
self.timestamp = start if start is not None else time.time()
Expand Down Expand Up @@ -894,18 +889,29 @@ def begin_transaction(
start: Optional[float] = None,
auto_activate: bool = True,
links: Optional[Sequence[TraceParent]] = None,
):
) -> Transaction:
"""
Start a new transactions and bind it in a thread-local variable

:param transaction_type: type of the transaction, e.g. "request"
:param trace_parent: an optional TraceParent object
:param start: override the start timestamp, mostly useful for testing
:param auto_activate: whether to set this transaction in execution_context
:param list of traceparents to causally link this transaction to

:param links: list of traceparents to causally link this transaction to
:returns the Transaction object
"""
links = links if links else []
continuation_strategy = self.config.trace_continuation_strategy

# we restart the trace if continuation strategy is "restart", or if it is "restart_external" and our
# "es" key is not in the tracestate header. In both cases, the original TraceParent is added to trace links.
if trace_parent and continuation_strategy != constants.TRACE_CONTINUATION_STRATEGY.CONTINUE:
if continuation_strategy == constants.TRACE_CONTINUATION_STRATEGY.RESTART or (
continuation_strategy == constants.TRACE_CONTINUATION_STRATEGY.RESTART_EXTERNAL
and not trace_parent.tracestate_dict
):
links.append(trace_parent)
trace_parent = None
if trace_parent:
is_sampled = bool(trace_parent.trace_options.recorded)
sample_rate = trace_parent.tracestate_dict.get(constants.TRACESTATE.SAMPLE_RATE)
Expand Down
66 changes: 47 additions & 19 deletions elasticapm/utils/disttracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import binascii
import ctypes
import itertools
import random
import re
from typing import Dict, Optional

from elasticapm.conf import constants
from elasticapm.utils.logging import get_logger
Expand All @@ -42,16 +44,31 @@
class TraceParent(object):
__slots__ = ("version", "trace_id", "span_id", "trace_options", "tracestate", "tracestate_dict", "is_legacy")

def __init__(self, version, trace_id, span_id, trace_options, tracestate=None, is_legacy=False):
self.version = version
self.trace_id = trace_id
self.span_id = span_id
self.trace_options = trace_options
self.is_legacy = is_legacy
self.tracestate = tracestate
def __init__(
self,
version: int,
trace_id: str,
span_id: str,
trace_options: "TracingOptions",
tracestate: Optional[str] = None,
is_legacy: bool = False,
):
self.version: int = version
self.trace_id: str = trace_id
self.span_id: str = span_id
self.trace_options: TracingOptions = trace_options
self.is_legacy: bool = is_legacy
self.tracestate: Optional[str] = tracestate
self.tracestate_dict = self._parse_tracestate(tracestate)

def copy_from(self, version=None, trace_id=None, span_id=None, trace_options=None, tracestate=None):
def copy_from(
self,
version: int = None,
trace_id: str = None,
span_id: str = None,
trace_options: "TracingOptions" = None,
tracestate: str = None,
):
return TraceParent(
version or self.version,
trace_id or self.trace_id,
Expand All @@ -60,13 +77,13 @@ def copy_from(self, version=None, trace_id=None, span_id=None, trace_options=Non
tracestate or self.tracestate,
)

def to_string(self):
def to_string(self) -> str:
return "{:02x}-{}-{}-{:02x}".format(self.version, self.trace_id, self.span_id, self.trace_options.asByte)

def to_ascii(self):
def to_ascii(self) -> bytes:
return self.to_string().encode("ascii")

def to_binary(self):
def to_binary(self) -> bytes:
return b"".join(
[
(self.version).to_bytes(1, byteorder="big"),
Expand All @@ -80,7 +97,18 @@ def to_binary(self):
)

@classmethod
def from_string(cls, traceparent_string, tracestate_string=None, is_legacy=False):
def new(cls, transaction_id: str, is_sampled: bool) -> "TraceParent":
return cls(
version=constants.TRACE_CONTEXT_VERSION,
trace_id="%032x" % random.getrandbits(128),
span_id=transaction_id,
trace_options=TracingOptions(recorded=is_sampled),
)

@classmethod
def from_string(
cls, traceparent_string: str, tracestate_string: Optional[str] = None, is_legacy: bool = False
) -> Optional["TraceParent"]:
try:
parts = traceparent_string.split("-")
version, trace_id, span_id, trace_flags = parts[:4]
Expand All @@ -105,11 +133,11 @@ def from_string(cls, traceparent_string, tracestate_string=None, is_legacy=False
@classmethod
def from_headers(
cls,
headers,
header_name=constants.TRACEPARENT_HEADER_NAME,
legacy_header_name=constants.TRACEPARENT_LEGACY_HEADER_NAME,
tracestate_header_name=constants.TRACESTATE_HEADER_NAME,
):
headers: dict,
header_name: str = constants.TRACEPARENT_HEADER_NAME,
legacy_header_name: str = constants.TRACEPARENT_LEGACY_HEADER_NAME,
tracestate_header_name: str = constants.TRACESTATE_HEADER_NAME,
) -> Optional["TraceParent"]:
tracestate = cls.merge_duplicate_headers(headers, tracestate_header_name)
if header_name in headers:
return TraceParent.from_string(headers[header_name], tracestate, is_legacy=False)
Expand All @@ -119,7 +147,7 @@ def from_headers(
return None

@classmethod
def from_binary(cls, data):
def from_binary(cls, data: bytes) -> Optional["TraceParent"]:
if len(data) != 29:
logger.debug("Invalid binary traceparent format, length is %d, should be 29, value %r", len(data), data)
return
Expand Down Expand Up @@ -162,7 +190,7 @@ def merge_duplicate_headers(cls, headers, key):
return ",".join([item[1] for item in headers if item[0] == key])
return headers.get(key)

def _parse_tracestate(self, tracestate):
def _parse_tracestate(self, tracestate) -> Dict[str, str]:
"""
Tracestate can contain data from any vendor, made distinct by vendor
keys. Vendors are comma-separated. The elastic (es) tracestate data is
Expand Down
41 changes: 41 additions & 0 deletions tests/client/transaction_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,3 +466,44 @@ def test_transaction_span_links(elasticapm_client):
assert span["links"][0]["span_id"] == "0011223344556677"
assert span["links"][1]["trace_id"] == "00112233445566778899aabbccddeeff"
assert span["links"][1]["span_id"] == "aabbccddeeff0011"


def test_transaction_trace_continuation_continue(elasticapm_client):
elasticapm_client.config.update("1", trace_continuation_strategy=constants.TRACE_CONTINUATION_STRATEGY.CONTINUE)
tp = TraceParent.from_string("00-aabbccddeeff00112233445566778899-0011223344556677-01")
elasticapm_client.begin_transaction("a", trace_parent=tp)
elasticapm_client.end_transaction("foo")
transaction = elasticapm_client.events[constants.TRANSACTION][0]
assert transaction["trace_id"] == tp.trace_id
assert "links" not in transaction


def test_transaction_trace_continuation_restart(elasticapm_client):
elasticapm_client.config.update("1", trace_continuation_strategy=constants.TRACE_CONTINUATION_STRATEGY.RESTART)
tp = TraceParent.from_string("00-aabbccddeeff00112233445566778899-0011223344556677-01")
elasticapm_client.begin_transaction("a", trace_parent=tp)
elasticapm_client.end_transaction("foo")
transaction = elasticapm_client.events[constants.TRANSACTION][0]
assert transaction["trace_id"] != tp.trace_id
assert transaction["links"][0]["trace_id"] == tp.trace_id
assert transaction["links"][0]["span_id"] == tp.span_id


def test_transaction_trace_continuation_restart_external(elasticapm_client):
elasticapm_client.config.update(
"1", trace_continuation_strategy=constants.TRACE_CONTINUATION_STRATEGY.RESTART_EXTERNAL
)
tp = TraceParent.from_string("00-aabbccddeeff00112233445566778899-0011223344556677-01")
elasticapm_client.begin_transaction("a", trace_parent=tp)
elasticapm_client.end_transaction("foo")
transaction = elasticapm_client.events[constants.TRANSACTION][0]
assert transaction["trace_id"] != tp.trace_id
assert transaction["links"][0]["trace_id"] == tp.trace_id
assert transaction["links"][0]["span_id"] == tp.span_id

tp.add_tracestate("foo", "bar")
elasticapm_client.begin_transaction("a", trace_parent=tp)
elasticapm_client.end_transaction("foo")
transaction = elasticapm_client.events[constants.TRANSACTION][1]
assert transaction["trace_id"] == tp.trace_id
assert "links" not in transaction