Skip to content

Commit

Permalink
[extension/jaegerremotesampling] Tie in the strategy storages (#8818)
Browse files Browse the repository at this point in the history
* Tie in the strategy storages

This change adds support for the strategy stores, previously referenced as client config managers.

This implements both the local file strategy store and the remote (gRPC) store.

Fixes #6695

Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>

* Add changelog

Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>

* Improved readme

Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>
  • Loading branch information
jpkrohling authored Apr 6, 2022
1 parent 94386ed commit 0ed3674
Show file tree
Hide file tree
Showing 13 changed files with 726 additions and 150 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
generated code (#5270)
- Add `make crosslink` target to ensure replace statements are included in `go.mod` for all transitive dependencies within repository (#8822)
- `filestorageextension`: Change bbolt DB settings for better performance (#9004)
- `jaegerremotesamplingextension`: Add local and remote sampling stores (#8818)

### 🛑 Breaking changes 🛑

Expand Down
15 changes: 10 additions & 5 deletions extension/jaegerremotesampling/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,26 @@ Note that the port `14250` will clash with the Jaeger Receiver. When both are us

Although this extension is derived from Jaeger, it can be used by any clients who can consume this standard, such as the [OpenTelemetry Java SDK](https://github.com/open-telemetry/opentelemetry-java/tree/v1.9.1/sdk-extensions/jaeger-remote-sampler).

At this moment, the `reload_interval` option is only effective for the `file` source. In the future, this property will be used to control a local cache for a `remote` source.

The `file` source can be used to load files from the local file system or from remote HTTP/S sources. The `remote` source must be used with a gRPC server that provides a Jaeger remote sampling service.

## Configuration

```yaml
extensions:
jaegerremotesampling:
grpc:
endpoint: :15251
source:
remote:
endpoint: jaeger-collector:14250
jaegerremotesampling/1:
http:
endpoint: :5878
source:
file: /etc/otel/sampling_strategies.json
reload_interval: 1s
file: /etc/otelcol/sampling_strategies.json
jaegerremotesampling/2:
source:
reload_interval: 1s
file: http://jaeger.example.com/sampling_strategies.json
```
A sampling strategy file could look like:
Expand Down
4 changes: 3 additions & 1 deletion extension/jaegerremotesampling/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package jaegerremotesampling
import (
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -60,7 +61,8 @@ func TestLoadConfig(t *testing.T) {
HTTPServerSettings: &confighttp.HTTPServerSettings{Endpoint: ":5778"},
GRPCServerSettings: &configgrpc.GRPCServerSettings{NetAddr: confignet.NetAddr{Endpoint: ":14250"}},
Source: Source{
File: "/etc/otel/sampling_strategies.json",
ReloadInterval: time.Second,
File: "/etc/otelcol/sampling_strategies.json",
},
},
ext1)
Expand Down
86 changes: 70 additions & 16 deletions extension/jaegerremotesampling/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,102 @@ package jaegerremotesampling // import "github.com/open-telemetry/opentelemetry-

import (
"context"
"fmt"

grpcStore "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/grpc"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal"
)

var _ component.Extension = (*jrsExtension)(nil)

type jrsExtension struct {
httpServer component.Component
cfg *Config
telemetry component.TelemetrySettings

httpServer component.Component
samplingStore strategystore.StrategyStore

closers []func() error
}

func newExtension(cfg *Config, telemetry component.TelemetrySettings) (*jrsExtension, error) {
// TODO(jpkroehling): get a proper instance
cfgMgr := internal.NewClientConfigManager()
ext := &jrsExtension{}
func newExtension(cfg *Config, telemetry component.TelemetrySettings) *jrsExtension {
jrse := &jrsExtension{
cfg: cfg,
telemetry: telemetry,
}
return jrse
}

if cfg.HTTPServerSettings != nil {
httpServer, err := internal.NewHTTP(telemetry, *cfg.HTTPServerSettings, cfgMgr)
func (jrse *jrsExtension) Start(ctx context.Context, host component.Host) error {
// the config validation will take care of ensuring we have one and only one of the following about the
// source of the sampling config:
// - remote (gRPC)
// - local file
// we can then use a simplified logic here to assign the appropriate store
if jrse.cfg.Source.File != "" {
opts := static.Options{
StrategiesFile: jrse.cfg.Source.File,
ReloadInterval: jrse.cfg.Source.ReloadInterval,
}
ss, err := static.NewStrategyStore(opts, jrse.telemetry.Logger)
if err != nil {
return nil, err
return fmt.Errorf("failed to create the local file strategy store: %v", err)
}
ext.httpServer = httpServer

// there's a Close function on the concrete type, which is not visible to us...
// how can we close it then?
jrse.samplingStore = ss
}

return ext, nil
}
if jrse.cfg.Source.Remote != nil {
opts, err := jrse.cfg.Source.Remote.ToDialOptions(host, jrse.telemetry)
if err != nil {
return fmt.Errorf("error while setting up the remote sampling source: %v", err)
}
conn, err := grpc.Dial(jrse.cfg.Source.Remote.Endpoint, opts...)
if err != nil {
return fmt.Errorf("error while connecting to the remote sampling source: %v", err)
}

jrse.samplingStore = grpcStore.NewConfigManager(conn)
jrse.closers = append(jrse.closers, func() error {
return conn.Close()
})
}

if jrse.cfg.HTTPServerSettings != nil {
httpServer, err := internal.NewHTTP(jrse.telemetry, *jrse.cfg.HTTPServerSettings, jrse.samplingStore)
if err != nil {
return fmt.Errorf("error while creating the HTTP server: %v", err)
}
jrse.httpServer = httpServer
}

func (jrse *jrsExtension) Start(ctx context.Context, host component.Host) error {
// then we start our own server interfaces, starting with the HTTP one
err := jrse.httpServer.Start(ctx, host)
if err != nil {
return err
return fmt.Errorf("error while starting the HTTP server: %v", err)
}

return nil
}

func (jrse *jrsExtension) Shutdown(ctx context.Context) error {
err := jrse.httpServer.Shutdown(ctx)
if err != nil {
return err
// we probably don't want to break whenever an error occurs, we want to continue and close the other resources
if err := jrse.httpServer.Shutdown(ctx); err != nil {
jrse.telemetry.Logger.Error("error while shutting down the HTTP server", zap.Error(err))
}

for _, closer := range jrse.closers {
if err := closer(); err != nil {
jrse.telemetry.Logger.Error("error while shutting down the sampling store", zap.Error(err))
}
}

return nil
Expand Down
96 changes: 43 additions & 53 deletions extension/jaegerremotesampling/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,89 +16,79 @@ package jaegerremotesampling

import (
"context"
"errors"
"fmt"
"net"
"path/filepath"
"testing"

"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"google.golang.org/grpc"
)

func TestNewExtension(t *testing.T) {
// test
e, err := newExtension(createDefaultConfig().(*Config), componenttest.NewNopTelemetrySettings())
require.NoError(t, err)
cfg := createDefaultConfig().(*Config)
cfg.Source.File = filepath.Join("testdata", "strategy.json")
e := newExtension(cfg, componenttest.NewNopTelemetrySettings())

// verify
assert.NotNil(t, e)
}

func TestStartAndShutdown(t *testing.T) {
func TestStartAndShutdownLocalFile(t *testing.T) {
// prepare
e, err := newExtension(createDefaultConfig().(*Config), componenttest.NewNopTelemetrySettings())
cfg := createDefaultConfig().(*Config)
cfg.Source.File = filepath.Join("testdata", "strategy.json")

e := newExtension(cfg, componenttest.NewNopTelemetrySettings())
require.NotNil(t, e)
require.NoError(t, err)
require.NoError(t, e.Start(context.Background(), componenttest.NewNopHost()))

// test and verify
assert.NoError(t, e.Shutdown(context.Background()))
}

func TestFailedToStartHTTPServer(t *testing.T) {
// prepare
errBooBoo := errors.New("the server made a boo boo")

e, err := newExtension(createDefaultConfig().(*Config), componenttest.NewNopTelemetrySettings())
require.NotNil(t, e)
func TestStartAndShutdownRemote(t *testing.T) {
// prepare the socket the mock server will listen at
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)

e.httpServer = &mockComponent{
StartFunc: func(_ context.Context, _ component.Host) error {
return errBooBoo
},
// create the mock server
server := grpc.NewServer()
go func() {
err = server.Serve(lis)
require.NoError(t, err)
}()

// register the service
api_v2.RegisterSamplingManagerServer(server, &samplingServer{})

// create the config, pointing to the mock server
cfg := createDefaultConfig().(*Config)
cfg.Source.Remote = &configgrpc.GRPCClientSettings{
Endpoint: fmt.Sprintf("localhost:%d", lis.Addr().(*net.TCPAddr).Port),
WaitForReady: true,
}

// test and verify
assert.Equal(t, errBooBoo, e.Start(context.Background(), componenttest.NewNopHost()))
}

func TestFailedToShutdownHTTPServer(t *testing.T) {
// prepare
errBooBoo := errors.New("the server made a boo boo")

e, err := newExtension(createDefaultConfig().(*Config), componenttest.NewNopTelemetrySettings())
// create the extension
e := newExtension(cfg, componenttest.NewNopTelemetrySettings())
require.NotNil(t, e)
require.NoError(t, err)

e.httpServer = &mockComponent{
ShutdownFunc: func(_ context.Context) error {
return errBooBoo
},
}
require.NoError(t, e.Start(context.Background(), componenttest.NewNopHost()))

// test and verify
assert.Equal(t, errBooBoo, e.Shutdown(context.Background()))
}

type mockComponent struct {
StartFunc func(_ context.Context, _ component.Host) error
ShutdownFunc func(_ context.Context) error
// test
assert.NoError(t, e.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, e.Shutdown(context.Background()))
}

func (s *mockComponent) Start(ctx context.Context, host component.Host) error {
if s.StartFunc == nil {
return nil
}

return s.StartFunc(ctx, host)
type samplingServer struct {
api_v2.UnimplementedSamplingManagerServer
}

func (s *mockComponent) Shutdown(ctx context.Context) error {
if s.ShutdownFunc == nil {
return nil
}

return s.ShutdownFunc(ctx)
func (s samplingServer) GetSamplingStrategy(ctx context.Context, param *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) {
return &api_v2.SamplingStrategyResponse{
StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC,
}, nil
}
3 changes: 2 additions & 1 deletion extension/jaegerremotesampling/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ func createDefaultConfig() config.Extension {
Endpoint: ":14250",
},
},
Source: Source{},
}
}

func createExtension(_ context.Context, set component.ExtensionCreateSettings, cfg config.Extension) (component.Extension, error) {
return newExtension(cfg.(*Config), set.TelemetrySettings)
return newExtension(cfg.(*Config), set.TelemetrySettings), nil
}
Loading

0 comments on commit 0ed3674

Please sign in to comment.