[processor/tailsampling] Decision cache for "keep" trace IDs (#33533)
**Description:** 

Adds a simple LRU decision cache for sampled trace IDs.

The design makes it easy to add another cache for non-sampled IDs.

It stores nothing other than the sampled trace ID, and it holds only the right
half of the trace ID (as a uint64) in the cache.

By default the cache is a no-op; it becomes active only when the user
configures a cache size.
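
For illustration, a minimal, self-contained sketch of the key derivation described above (assuming only that `pcommon.TraceID` is a 16-byte array, as the new tests rely on). It mirrors the `rightHalfTraceID` helper added in `internal/cache/lru_cache.go`: the cache keys on the lower 8 bytes of the trace ID, read as a uint64.

```go
package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"

	"go.opentelemetry.io/collector/pdata/pcommon"
)

// rightHalf mirrors the rightHalfTraceID helper added in this change:
// the lower 8 bytes of the 16-byte trace ID, read as a uint64 cache key.
func rightHalf(id pcommon.TraceID) uint64 {
	return binary.LittleEndian.Uint64(id[8:])
}

func main() {
	id := pcommon.NewTraceIDEmpty()
	if _, err := hex.Decode(id[:], []byte("12341234123412341234123412341234")); err != nil {
		panic(err)
	}
	fmt.Printf("cache key for trace %x: %x\n", id[:], rightHalf(id))
}
```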

**Link to tracking Issue:**
#31583

**Testing:**
* unit tests on new code 
* a test in `processor_decision_test.go` verifying that a "keep" decision
persists for a trace that was sampled and cached, even after its span data
was dropped.

**Documentation:** Added a description to the README.
jamesmoessis authored Jun 17, 2024
1 parent 97ce47e commit 18dc9ac
Showing 16 changed files with 377 additions and 24 deletions.
6 changes: 6 additions & 0 deletions .chloggen/jmoe_tailsampling-decisioncache.yaml
@@ -0,0 +1,6 @@
change_type: enhancement
component: tailsamplingprocessor
note: Simple LRU Decision Cache for "keep" decisions
issues: [31583]
subtext:
change_logs: []
14 changes: 11 additions & 3 deletions processor/tailsamplingprocessor/README.md
@@ -45,6 +45,11 @@ The following configuration options can also be modified:
- `decision_wait` (default = 30s): Wait time since the first span of a trace before making a sampling decision
- `num_traces` (default = 50000): Number of traces kept in memory.
- `expected_new_traces_per_sec` (default = 0): Expected number of new traces (helps in allocating data structures)
- `decision_cache` (default = `sampled_cache_size: 0`): Configures the number of trace IDs to be kept in an LRU cache,
persisting the "keep" decisions for traces that may have already been released from memory.
By default, the size is 0 and the cache is inactive.
If used, configure this to be much higher than `num_traces` so that decisions for trace IDs are kept
longer than the span data for the trace.

Each policy will result in a decision, and the processor will evaluate them to make a final decision:

@@ -63,6 +68,8 @@ processors:
decision_wait: 10s
num_traces: 100
expected_new_traces_per_sec: 10
decision_cache:
sampled_cache_size: 100000
policies:
[
{
@@ -433,7 +440,7 @@ help resolve this error, at the expense of increased memory usage.

## Monitoring and Tuning

See [metrics.go][metrics_go] for the full list of metrics available for this component and their descriptions.
See [documentation.md][documentation_md] for the full list of metrics available for this component and their descriptions.

### Dropped Traces

@@ -469,7 +476,8 @@ A span's arrival is considered "late" if it arrives after its trace's sampling d

There are two scenarios for late arriving spans:
- Scenario 1: While the sampling decision of the trace remains in the circular buffer of `num_traces` length, the late spans inherit that decision. That means late spans do not influence the trace's sampling decision.
- Scenario 2: After the sampling decision is removed from the buffer, it's as if this component has never seen the trace before: The late spans are buffered for `decision_wait` seconds and then a new sampling decision is made.
- Scenario 2: (Default, no decision cache configured) After the sampling decision is removed from the buffer, it's as if this component has never seen the trace before: The late spans are buffered for `decision_wait` seconds and then a new sampling decision is made.
- Scenario 3: (Decision cache is configured) When a "keep" decision is made for a trace, its trace ID is cached. The component remembers which trace IDs it sampled even after it releases the span data from memory, and it applies the same "keep" decision to late spans unless the trace ID has since been evicted from the cache.

Occurrences of Scenario 1 where late spans are not sampled can be tracked with the below histogram metric.
```
@@ -508,4 +516,4 @@ As a reminder, a policy voting to sample the trace does not guarantee sampling;
sampling_policy_evaluation_error
```

[metrics_go]: ./metrics.go
[documentation_md]: ./documentation.md
10 changes: 10 additions & 0 deletions processor/tailsamplingprocessor/config.go
@@ -220,6 +220,14 @@ type OTTLConditionCfg struct {
SpanEventConditions []string `mapstructure:"spanevent"`
}

type DecisionCacheConfig struct {
// SampledCacheSize specifies the size of the cache that holds the sampled trace IDs.
// This value will be the maximum number of trace IDs that the cache can hold before overwriting previous IDs.
// For effective use, this value should be at least an order of magnitude higher than Config.NumTraces.
// If left as default 0, a no-op DecisionCache will be used.
SampledCacheSize int `mapstructure:"sampled_cache_size"`
}

// Config holds the configuration for tail-based sampling.
type Config struct {
// DecisionWait is the desired wait time from the arrival of the first span of
@@ -234,4 +242,6 @@ type Config struct {
// PolicyCfgs sets the tail-based sampling policy which makes a sampling decision
// for a given trace when requested.
PolicyCfgs []PolicyCfg `mapstructure:"policies"`
// DecisionCache holds configuration for the decision cache(s)
DecisionCache DecisionCacheConfig `mapstructure:"decision_cache"`
}
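
As a rough sketch of how this configuration could be consumed (the processor wiring itself is outside this excerpt, and the `bool` value type and helper name here are hypothetical), a factory might choose between the two `internal/cache` implementations added below:

```go
package tailsamplingprocessor

import (
	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/cache"
)

// newSampledDecisionCache is a hypothetical helper, not part of this change,
// sketching how the configured size could select between the no-op and LRU caches.
func newSampledDecisionCache(cfg DecisionCacheConfig) (cache.Cache[bool], error) {
	if cfg.SampledCacheSize <= 0 {
		// Default size 0: the decision cache stays inactive.
		return cache.NewNopDecisionCache[bool](), nil
	}
	return cache.NewLRUDecisionCache[bool](cfg.SampledCacheSize)
}
```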
1 change: 1 addition & 0 deletions processor/tailsamplingprocessor/config_test.go
@@ -36,6 +36,7 @@ func TestLoadConfig(t *testing.T) {
DecisionWait: 10 * time.Second,
NumTraces: 100,
ExpectedNewTracesPerSec: 10,
DecisionCache: DecisionCacheConfig{SampledCacheSize: 500},
PolicyCfgs: []PolicyCfg{
{
sharedPolicyCfg: sharedPolicyCfg{
8 changes: 8 additions & 0 deletions processor/tailsamplingprocessor/documentation.md
@@ -22,6 +22,14 @@ Count of traces that were sampled or not per sampling policy
| ---- | ----------- | ---------- | --------- |
| {traces} | Sum | Int | true |

### processor_tail_sampling_early_releases_from_cache_decision

Number of spans that were able to be immediately released due to a decision cache hit.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {spans} | Sum | Int | true |

### processor_tail_sampling_global_count_traces_sampled

Global count of traces that were sampled or not by at least one policy
2 changes: 1 addition & 1 deletion processor/tailsamplingprocessor/go.mod
@@ -5,6 +5,7 @@ go 1.21.0
require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.102.0
@@ -36,7 +37,6 @@ require (
github.com/gobwas/glob v0.2.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/iancoleman/strcase v0.3.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
48 changes: 48 additions & 0 deletions processor/tailsamplingprocessor/internal/cache/lru_cache.go
@@ -0,0 +1,48 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package cache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/cache"

import (
"encoding/binary"

lru "github.com/hashicorp/golang-lru/v2"
"go.opentelemetry.io/collector/pdata/pcommon"
)

// lruDecisionCache implements Cache as a simple LRU cache.
// It holds trace IDs that had sampling decisions made on them.
// It does not specify the type of sampling decision that was made, only that
// a decision was made for an ID. You need separate DecisionCaches for caching
// sampled and not sampled trace IDs.
type lruDecisionCache[V any] struct {
cache *lru.Cache[uint64, V]
}

var _ Cache[any] = (*lruDecisionCache[any])(nil)

// NewLRUDecisionCache returns a new lruDecisionCache.
// The size parameter indicates the number of keys the cache will hold before it
// starts evicting the least recently used key.
func NewLRUDecisionCache[V any](size int) (Cache[V], error) {
c, err := lru.New[uint64, V](size)
if err != nil {
return nil, err
}
return &lruDecisionCache[V]{cache: c}, nil
}

func (c *lruDecisionCache[V]) Get(id pcommon.TraceID) (V, bool) {
return c.cache.Get(rightHalfTraceID(id))
}

func (c *lruDecisionCache[V]) Put(id pcommon.TraceID, v V) {
_ = c.cache.Add(rightHalfTraceID(id), v)
}

// Delete is a no-op, since the LRU cache evicts the least recently used key automatically.
func (c *lruDecisionCache[V]) Delete(_ pcommon.TraceID) {}

func rightHalfTraceID(id pcommon.TraceID) uint64 {
return binary.LittleEndian.Uint64(id[8:])
}
83 changes: 83 additions & 0 deletions processor/tailsamplingprocessor/internal/cache/lru_cache_test.go
@@ -0,0 +1,83 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package cache

import (
"encoding/hex"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
)

func TestSinglePut(t *testing.T) {
c, err := NewLRUDecisionCache[int](2)
require.NoError(t, err)
id, err := traceIDFromHex("12341234123412341234123412341234")
require.NoError(t, err)
c.Put(id, 123)
v, ok := c.Get(id)
assert.Equal(t, 123, v)
assert.True(t, ok)
}

func TestExceedsSizeLimit(t *testing.T) {
c, err := NewLRUDecisionCache[bool](2)
require.NoError(t, err)
id1, err := traceIDFromHex("12341234123412341234123412341231")
require.NoError(t, err)
id2, err := traceIDFromHex("12341234123412341234123412341232")
require.NoError(t, err)
id3, err := traceIDFromHex("12341234123412341234123412341233")
require.NoError(t, err)

c.Put(id1, true)
c.Put(id2, true)
c.Put(id3, true)

v, ok := c.Get(id1)
assert.False(t, v) // evicted
assert.False(t, ok) // evicted
v, ok = c.Get(id2)
assert.True(t, v)
assert.True(t, ok)
v, ok = c.Get(id3)
assert.True(t, v)
assert.True(t, ok)
}

func TestLeastRecentlyUsedIsEvicted(t *testing.T) {
c, err := NewLRUDecisionCache[bool](2)
require.NoError(t, err)
id1, err := traceIDFromHex("12341234123412341234123412341231")
require.NoError(t, err)
id2, err := traceIDFromHex("12341234123412341234123412341232")
require.NoError(t, err)
id3, err := traceIDFromHex("12341234123412341234123412341233")
require.NoError(t, err)

c.Put(id1, true)
c.Put(id2, true)
v, ok := c.Get(id1) // use id1
assert.True(t, v)
assert.True(t, ok)
c.Put(id3, true)

v, ok = c.Get(id1)
assert.True(t, v)
assert.True(t, ok)
v, ok = c.Get(id2)
assert.False(t, v) // evicted, returns zero-value
assert.False(t, ok) // evicted, not OK
v, ok = c.Get(id3)
assert.True(t, v)
assert.True(t, ok)
}

func traceIDFromHex(idStr string) (pcommon.TraceID, error) {
id := pcommon.NewTraceIDEmpty()
_, err := hex.Decode(id[:], []byte(idStr))
return id, err
}
24 changes: 24 additions & 0 deletions processor/tailsamplingprocessor/internal/cache/nop_cache.go
@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package cache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/cache"

import "go.opentelemetry.io/collector/pdata/pcommon"

type nopDecisionCache[V any] struct{}

var _ Cache[any] = (*nopDecisionCache[any])(nil)

// NewNopDecisionCache returns a no-op Cache that stores nothing and always reports a miss.
func NewNopDecisionCache[V any]() Cache[V] {
return &nopDecisionCache[V]{}
}

func (n *nopDecisionCache[V]) Get(_ pcommon.TraceID) (V, bool) {
var v V
return v, false
}

func (n *nopDecisionCache[V]) Put(_ pcommon.TraceID, _ V) {
}

func (n *nopDecisionCache[V]) Delete(_ pcommon.TraceID) {}
21 changes: 21 additions & 0 deletions processor/tailsamplingprocessor/internal/cache/nop_cache_test.go
@@ -0,0 +1,21 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package cache

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNopCache(t *testing.T) {
c := NewNopDecisionCache[bool]()
id, err := traceIDFromHex("12341234123412341234123412341234")
require.NoError(t, err)
c.Put(id, true)
v, ok := c.Get(id)
assert.False(t, v)
assert.False(t, ok)
}
17 changes: 17 additions & 0 deletions processor/tailsamplingprocessor/internal/cache/types.go
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package cache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/cache"

import "go.opentelemetry.io/collector/pdata/pcommon"

// Cache is a cache using a pcommon.TraceID as the key and any generic type as the value.
type Cache[V any] interface {
// Get returns the value for the given id, and a boolean to indicate whether the key was found.
// If the key is not present, the zero value is returned.
Get(id pcommon.TraceID) (V, bool)
// Put sets the value for a given id
Put(id pcommon.TraceID, v V)
// Delete deletes the value for the given id
Delete(id pcommon.TraceID)
}
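
For reference, a small usage sketch of this interface with the LRU implementation above (illustrative only: `cache` is an internal package, so this compiles only from within the tailsamplingprocessor module, and the `bool` value type is just an example):

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pcommon"

	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/cache"
)

func main() {
	// Keep up to 1000 sampled trace IDs, evicting the least recently used.
	c, err := cache.NewLRUDecisionCache[bool](1000)
	if err != nil {
		panic(err)
	}

	id := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
	c.Put(id, true) // remember a "keep" decision for this trace ID

	if sampled, ok := c.Get(id); ok && sampled {
		fmt.Println("trace was previously sampled")
	}
}
```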

8 changes: 8 additions & 0 deletions processor/tailsamplingprocessor/metadata.yaml
@@ -98,3 +98,11 @@ telemetry:
enabled: true
gauge:
value_type: int

processor_tail_sampling_early_releases_from_cache_decision:
description: Number of spans that were able to be immediately released due to a decision cache hit.
unit: "{spans}"
enabled: true
sum:
value_type: int
monotonic: true