Skip to content

Commit

Permalink
Expand central config support
Browse files Browse the repository at this point in the history
Add central config support for ELASTIC_APM_CAPTURE_BODY
and ELASTIC_APM_TRANSACTION_MAX_SPANS.

The initial code for central config contained some hacks
specifically around the initially supported config for
controlling sampling. The code has been refactored to
support additional config attributes. In doing so, we
now have a single "instrumentationConfig" object on the
Tracer, which gets swapped out atomically when a config
change occurs, either via the API or via central config.
This has the added benefit of being slightly faster, as
we're no longer using RWMutexes, and also neater as we
now have a single immutable config object (excluding
tracer-internal config).
  • Loading branch information
axw committed Oct 10, 2019
1 parent 693514a commit e1ef314
Show file tree
Hide file tree
Showing 8 changed files with 480 additions and 355 deletions.
4 changes: 1 addition & 3 deletions capturebody.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ func (t *Tracer) CaptureHTTPRequestBody(req *http.Request) *BodyCapturer {
if req.Body == nil {
return nil
}
t.captureBodyMu.RLock()
captureBody := t.captureBody
t.captureBodyMu.RUnlock()
captureBody := t.instrumentationConfig().captureBody
if captureBody == CaptureBodyOff {
return nil
}
Expand Down
158 changes: 157 additions & 1 deletion env.go → config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"runtime"
"strconv"
"strings"
"sync/atomic"
"time"
"unsafe"

"github.com/pkg/errors"

Expand Down Expand Up @@ -203,6 +205,10 @@ func initialCaptureBody() (CaptureBodyMode, error) {
if value == "" {
return defaultCaptureBody, nil
}
return parseCaptureBody(envCaptureBody, value)
}

func parseCaptureBody(name, value string) (CaptureBodyMode, error) {
switch strings.TrimSpace(strings.ToLower(value)) {
case "all":
return CaptureBodyAll, nil
Expand All @@ -213,7 +219,7 @@ func initialCaptureBody() (CaptureBodyMode, error) {
case "off":
return CaptureBodyOff, nil
}
return -1, errors.Errorf("invalid %s value %q", envCaptureBody, value)
return -1, errors.Errorf("invalid %s value %q", name, value)
}

func initialService() (name, version, environment string) {
Expand Down Expand Up @@ -261,3 +267,153 @@ func initialCentralConfigEnabled() (bool, error) {
func initialBreakdownMetricsEnabled() (bool, error) {
return configutil.ParseBoolEnv(envBreakdownMetrics, true)
}

// updateRemoteConfig updates t and cfg with changes held in "attrs", and reverts to local
// config for config attributes that have been removed (exist in old but not in attrs).
//
// On return from updateRemoteConfig, unapplied config will have been removed from attrs.
func (t *Tracer) updateRemoteConfig(logger WarningLogger, old, attrs map[string]string) {
warningf := func(string, ...interface{}) {}
debugf := func(string, ...interface{}) {}
errorf := func(string, ...interface{}) {}
if logger != nil {
warningf = logger.Warningf
debugf = logger.Debugf
errorf = logger.Errorf
}
envName := func(k string) string {
return "ELASTIC_APM_" + strings.ToUpper(k)
}

var updates []func(cfg *instrumentationConfig)
for k, v := range attrs {
if oldv, ok := old[k]; ok && oldv == v {
continue
}
switch envName(k) {
case envCaptureBody:
value, err := parseCaptureBody(k, v)
if err != nil {
errorf("central config failure: %s", err)
delete(attrs, k)
continue
} else {
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.captureBody = value
})
}
case envMaxSpans:
value, err := strconv.Atoi(v)
if err != nil {
errorf("central config failure: failed to parse %s: %s", k, err)
delete(attrs, k)
continue
} else {
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.maxSpans = value
})
}
case envTransactionSampleRate:
sampler, err := parseSampleRate(k, v)
if err != nil {
errorf("central config failure: %s", err)
delete(attrs, k)
continue
} else {
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.sampler = sampler
})
}
default:
warningf("central config failure: unsupported config: %s", k)
delete(attrs, k)
continue
}
debugf("central config update: updated %s to %s", k, v)
}
for k := range old {
if _, ok := attrs[k]; ok {
continue
}
updates = append(updates, func(cfg *instrumentationConfig) {
if f, ok := cfg.local[envName(k)]; ok {
f(&cfg.instrumentationConfigValues)
}
})
debugf("central config update: reverted %s to local config", k)
}
if updates != nil {
remote := make(map[string]struct{})
for k := range attrs {
remote[envName(k)] = struct{}{}
}
t.updateInstrumentationConfig(func(cfg *instrumentationConfig) {
cfg.remote = remote
for _, update := range updates {
update(cfg)
}
})
}
}

// instrumentationConfig returns the current instrumentationConfig.
//
// The returned value is immutable.
func (t *Tracer) instrumentationConfig() *instrumentationConfig {
config := atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&t.instrumentationConfigInternal)))
return (*instrumentationConfig)(config)
}

// setLocalInstrumentationConfig sets local transaction configuration with
// the specified environment variable key.
func (t *Tracer) setLocalInstrumentationConfig(envKey string, f func(cfg *instrumentationConfigValues)) {
t.updateInstrumentationConfig(func(cfg *instrumentationConfig) {
cfg.local[envKey] = f
if _, ok := cfg.remote[envKey]; !ok {
f(&cfg.instrumentationConfigValues)
}
})
}

func (t *Tracer) updateInstrumentationConfig(f func(cfg *instrumentationConfig)) {
for {
oldConfig := t.instrumentationConfig()
newConfig := *oldConfig
f(&newConfig)
if atomic.CompareAndSwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&t.instrumentationConfigInternal)),
unsafe.Pointer(oldConfig),
unsafe.Pointer(&newConfig),
) {
return
}
}
}

// instrumentationConfig holds current configuration values, as well as information
// required to revert from remote to local configuration.
type instrumentationConfig struct {
instrumentationConfigValues

// local holds functions for setting instrumentationConfigValues to the most
// recently, locally specified configuration.
local map[string]func(*instrumentationConfigValues)

// remote holds the environment variable keys for applied remote config.
remote map[string]struct{}
}

// instrumentationConfigValues holds configuration that is accessible outside of the
// tracer loop, for instrumentation: StartTransaction, StartSpan, CaptureError, etc.
//
// NOTE(axw) when adding configuration here, you must also update `newTracer` to
// set the initial entry in instrumentationConfig.local, in order to properly reset
// to the local value, even if the default is the zero value.
type instrumentationConfigValues struct {
captureBody CaptureBodyMode
captureHeaders bool
maxSpans int
sampler Sampler
spanFramesMinDuration time.Duration
stackTraceLimit int
}
Loading

0 comments on commit e1ef314

Please sign in to comment.