Skip to content

Commit

Permalink
[jaeger-v2] Add remotesampling extension (jaegertracing#5389)
Browse files Browse the repository at this point in the history
## Description of the changes
- Part of jaegertracing#5531
- adds static sampling support for otel-based jaeger-v2.
- supports hot reload
- added unit tests

## How was this change tested?
- `go run -tags=ui ./cmd/jaeger --config
./cmd/jaeger/config-badger.yaml`
- `make test`

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [ ] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: pushkarm029 <pushkarmishra029@gmail.com>
Signed-off-by: Yuri Shkuro <github@ysh.us>
Signed-off-by: Pushkar Mishra <pushkarmishra029@gmail.com>
Co-authored-by: Yuri Shkuro <yurishkuro@users.noreply.github.com>
Co-authored-by: Yuri Shkuro <github@ysh.us>
  • Loading branch information
3 people authored and FlamingSaint committed Jul 20, 2024
1 parent ed90e30 commit cd5c1f7
Show file tree
Hide file tree
Showing 25 changed files with 1,506 additions and 63 deletions.
17 changes: 15 additions & 2 deletions cmd/jaeger/config.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
service:
extensions: [jaeger_storage, jaeger_query]
extensions: [jaeger_storage, jaeger_query, remote_sampling]
pipelines:
traces:
receivers: [otlp, jaeger, zipkin]
processors: [batch]
processors: [batch, adaptive_sampling]
exporters: [jaeger_storage_exporter]

extensions:
Expand All @@ -25,6 +25,16 @@ extensions:
memstore_archive:
max_traces: 100000

remote_sampling:
# You can either use file or adaptive sampling strategy in remote_sampling
# file:
# path: ./cmd/jaeger/sampling-strategies.json
adaptive:
sampling_store: memstore
initial_sampling_probability: 0.1
http:
grpc:

receivers:
otlp:
protocols:
Expand All @@ -42,6 +52,9 @@ receivers:

processors:
batch:
# Adaptive Sampling Processor is required to support adaptive sampling.
# It expects remote_sampling extension with `adaptive:` config to be enabled.
adaptive_sampling:

exporters:
jaeger_storage_exporter:
Expand Down
6 changes: 4 additions & 2 deletions cmd/jaeger/internal/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import (
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/exporters/storageexporter"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerquery"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/storagecleaner"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/processors/adaptivesampling"
)

type builders struct {
Expand Down Expand Up @@ -63,7 +65,7 @@ func (b builders) build() (otelcol.Factories, error) {
jaegerquery.NewFactory(),
jaegerstorage.NewFactory(),
storagecleaner.NewFactory(),
// TODO add adaptive sampling
remotesampling.NewFactory(),
)
if err != nil {
return otelcol.Factories{}, err
Expand Down Expand Up @@ -101,7 +103,7 @@ func (b builders) build() (otelcol.Factories, error) {
batchprocessor.NewFactory(),
memorylimiterprocessor.NewFactory(),
// add-ons
// TODO add adaptive sampling
adaptivesampling.NewFactory(),
)
if err != nil {
return otelcol.Factories{}, err
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func GetStorageFactory(name string, host component.Host) (storage.Factory, error
f, ok := comp.(Extension).Factory(name)
if !ok {
return nil, fmt.Errorf(
"cannot find storage '%s' declared by '%s' extension",
"cannot find definition of storage '%s' in the configuration for extension '%s'",
name, componentType,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestStorageFactoryBadHostError(t *testing.T) {
func TestStorageFactoryBadNameError(t *testing.T) {
host := storageHost{t: t, ext: startStorageExtension(t, "foo")}
_, err := GetStorageFactory("bar", host)
require.ErrorContains(t, err, "cannot find storage 'bar'")
require.ErrorContains(t, err, "cannot find definition of storage 'bar'")
}

func TestStorageFactoryBadShutdownError(t *testing.T) {
Expand Down
91 changes: 91 additions & 0 deletions cmd/jaeger/internal/extension/remotesampling/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package remotesampling

import (
"errors"

"github.com/asaskevich/govalidator"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/confmap"

"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
)

var (
errNoProvider = errors.New("no sampling strategy provider specified, expecting 'adaptive' or 'file'")
errMultipleProviders = errors.New("only one sampling strategy provider can be specified, 'adaptive' or 'file'")
)

var (
_ component.Config = (*Config)(nil)
_ component.ConfigValidator = (*Config)(nil)
_ confmap.Unmarshaler = (*Config)(nil)
)

type Config struct {
File *FileConfig `mapstructure:"file"`
Adaptive *AdaptiveConfig `mapstructure:"adaptive"`
HTTP *confighttp.ServerConfig `mapstructure:"http"`
GRPC *configgrpc.ServerConfig `mapstructure:"grpc"`
}

type FileConfig struct {
// File specifies a local file as the source of sampling strategies.
Path string `valid:"required" mapstructure:"path"`
}

type AdaptiveConfig struct {
// SamplingStore is the name of the storage defined in the jaegerstorage extension.
SamplingStore string `valid:"required" mapstructure:"sampling_store"`

adaptive.Options `mapstructure:",squash"`
}

// Unmarshal is a custom unmarshaler that allows the factory to provide default values
// for nested configs (like GRPC endpoint) yes still reset the pointers to nil if the
// config did not contain the corresponding sections.
// This is a workaround for the lack of opional fields support in OTEL confmap.
// Issue: https://github.com/open-telemetry/opentelemetry-collector/issues/10266
func (cfg *Config) Unmarshal(conf *confmap.Conf) error {
// first load the config normally
err := conf.Unmarshal(cfg)
if err != nil {
return err
}

// use string names of fields to see if they are set in the confmap
if !conf.IsSet("file") {
cfg.File = nil
}

if !conf.IsSet("adaptive") {
cfg.Adaptive = nil
}

if !conf.IsSet("grpc") {
cfg.GRPC = nil
}

if !conf.IsSet("http") {
cfg.HTTP = nil
}

return nil
}

func (cfg *Config) Validate() error {
if cfg.File == nil && cfg.Adaptive == nil {
return errNoProvider
}

if cfg.File != nil && cfg.Adaptive != nil {
return errMultipleProviders
}

_, err := govalidator.ValidateStruct(cfg)
return err
}
134 changes: 134 additions & 0 deletions cmd/jaeger/internal/extension/remotesampling/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package remotesampling

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/confmap"
)

func Test_Validate(t *testing.T) {
tests := []struct {
name string
config *Config
expectedErr string
}{
{
name: "No provider specified",
config: &Config{},
expectedErr: "no sampling strategy provider specified, expecting 'adaptive' or 'file'",
},
{
name: "Both providers specified",
config: &Config{
File: &FileConfig{Path: "test-path"},
Adaptive: &AdaptiveConfig{SamplingStore: "test-store"},
},
expectedErr: "only one sampling strategy provider can be specified, 'adaptive' or 'file'",
},
{
name: "Only File provider specified",
config: &Config{
File: &FileConfig{Path: "test-path"},
},
expectedErr: "",
},
{
name: "Only Adaptive provider specified",
config: &Config{
Adaptive: &AdaptiveConfig{SamplingStore: "test-store"},
},
expectedErr: "",
},
{
name: "Invalid File provider",
config: &Config{
File: &FileConfig{Path: ""},
},
expectedErr: "File.Path: non zero value required",
},
{
name: "Invalid Adaptive provider",
config: &Config{
Adaptive: &AdaptiveConfig{SamplingStore: ""},
},
expectedErr: "Adaptive.SamplingStore: non zero value required",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.config.Validate()
if tt.expectedErr == "" {
require.NoError(t, err)
} else {
assert.Equal(t, tt.expectedErr, err.Error())
}
})
}
}

func Test_Unmarshal(t *testing.T) {
tests := []struct {
name string
input map[string]any
expectedCfg *Config
expectedErr string
}{
{
name: "Valid config with File",
input: map[string]any{
"file": map[string]any{
"path": "test-path",
},
},
expectedCfg: &Config{
File: &FileConfig{Path: "test-path"},
},
expectedErr: "",
},
{
name: "Valid config with Adaptive",
input: map[string]any{
"adaptive": map[string]any{
"sampling_store": "test-store",
},
},
expectedCfg: &Config{
Adaptive: &AdaptiveConfig{SamplingStore: "test-store"},
},
expectedErr: "",
},
{
name: "Empty config",
input: map[string]any{},
expectedCfg: &Config{},
expectedErr: "",
},
{
name: "Invalid config",
input: map[string]any{
"foo": "bar",
},
expectedErr: "invalid keys: foo", // sensitive to lib implementation
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conf := confmap.NewFromStringMap(tt.input)
var cfg Config
err := cfg.Unmarshal(conf)
if tt.expectedErr == "" {
require.NoError(t, err)
assert.Equal(t, tt.expectedCfg, &cfg)
} else {
assert.ErrorContains(t, err, tt.expectedErr)
}
})
}
}
Loading

0 comments on commit cd5c1f7

Please sign in to comment.