Skip to content

Commit

Permalink
[target-allocator] Add a pre-hook to the allocator to filter out drop…
Browse files Browse the repository at this point in the history
…ped targets (#1127)

* Adding prehook allocator filter to reduce assigned targets

* add metrics to track number of targets kept after filtering

* add smaller interfaces local to packages using them

* address review feedback

* remove outdated references
  • Loading branch information
moh-osman3 authored Nov 4, 2022
1 parent ca5cd5a commit 854eb67
Show file tree
Hide file tree
Showing 15 changed files with 495 additions and 11 deletions.
5 changes: 5 additions & 0 deletions apis/v1alpha1/opentelemetrycollector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ type OpenTelemetryTargetAllocator struct {
// The current options are least-weighted and consistent-hashing. The default option is least-weighted
// +optional
AllocationStrategy string `json:"allocationStrategy,omitempty"`
// FilterStrategy determines how to filter targets before allocating them among the collectors.
// The only current option is relabel-config (drops targets based on prom relabel_config).
// Filtering is disabled by default.
// +optional
FilterStrategy string `json:"filterStrategy,omitempty"`
// ServiceAccount indicates the name of an existing service account to use with this instance.
// +optional
ServiceAccount string `json:"serviceAccount,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1701,6 +1701,12 @@ spec:
description: Enabled indicates whether to use a target allocation
mechanism for Prometheus targets or not.
type: boolean
filterStrategy:
description: FilterStrategy determines how to filter targets before
allocating them among the collectors. The only current option
is relabel-config (drops targets based on prom relabel_config).
Filtering is disabled by default.
type: string
image:
description: Image indicates the container image to use for the
OpenTelemetry TargetAllocator.
Expand Down
21 changes: 19 additions & 2 deletions cmd/otel-allocator/allocation/consistent_hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,34 @@ type consistentHashingAllocator struct {
targetItems map[string]*target.Item

log logr.Logger

filter Filter
}

func newConsistentHashingAllocator(log logr.Logger) Allocator {
func newConsistentHashingAllocator(log logr.Logger, opts ...AllocationOption) Allocator {
config := consistent.Config{
PartitionCount: 1061,
ReplicationFactor: 5,
Load: 1.1,
Hasher: hasher{},
}
consistentHasher := consistent.New(nil, config)
return &consistentHashingAllocator{
chAllocator := &consistentHashingAllocator{
consistentHasher: consistentHasher,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*target.Item),
log: log,
}
for _, opt := range opts {
opt(chAllocator)
}

return chAllocator
}

// SetFilter sets the filtering hook to use.
func (c *consistentHashingAllocator) SetFilter(filter Filter) {
c.filter = filter
}

// addTargetToTargetItems assigns a target to the collector based on its hash and adds it to the allocator's targetItems
Expand Down Expand Up @@ -140,6 +152,11 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item)
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", consistentHashingStrategyName))
defer timer.ObserveDuration()

if c.filter != nil {
targets = c.filter.Apply(targets)
}
RecordTargetsKeptPerJob(targets)

c.m.Lock()
defer c.m.Unlock()

Expand Down
22 changes: 20 additions & 2 deletions cmd/otel-allocator/allocation/least_weighted.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ type leastWeightedAllocator struct {
targetItems map[string]*target.Item

log logr.Logger

filter Filter
}

// SetFilter sets the filtering hook to use.
func (allocator *leastWeightedAllocator) SetFilter(filter Filter) {
allocator.filter = filter
}

// TargetItems returns a shallow copy of the targetItems map.
Expand Down Expand Up @@ -157,6 +164,11 @@ func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*target.I
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", leastWeightedStrategyName))
defer timer.ObserveDuration()

if allocator.filter != nil {
targets = allocator.filter.Apply(targets)
}
RecordTargetsKeptPerJob(targets)

allocator.m.Lock()
defer allocator.m.Unlock()

Expand Down Expand Up @@ -195,10 +207,16 @@ func (allocator *leastWeightedAllocator) SetCollectors(collectors map[string]*Co
}
}

func newLeastWeightedAllocator(log logr.Logger) Allocator {
return &leastWeightedAllocator{
func newLeastWeightedAllocator(log logr.Logger, opts ...AllocationOption) Allocator {
lwAllocator := &leastWeightedAllocator{
log: log,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*target.Item),
}

for _, opt := range opts {
opt(lwAllocator)
}

return lwAllocator
}
37 changes: 34 additions & 3 deletions cmd/otel-allocator/allocation/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)

type AllocatorProvider func(log logr.Logger) Allocator
type AllocatorProvider func(log logr.Logger, opts ...AllocationOption) Allocator

var (
registry = map[string]AllocatorProvider{}
Expand All @@ -45,11 +45,41 @@ var (
Name: "opentelemetry_allocator_time_to_allocate",
Help: "The time it takes to allocate",
}, []string{"method", "strategy"})
targetsKeptPerJob = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "opentelemetry_allocator_targets_kept",
Help: "Number of targets kept after filtering.",
}, []string{"job_name"})
)

func New(name string, log logr.Logger) (Allocator, error) {
type AllocationOption func(Allocator)

type Filter interface {
Apply(map[string]*target.Item) map[string]*target.Item
}

func WithFilter(filter Filter) AllocationOption {
return func(allocator Allocator) {
allocator.SetFilter(filter)
}
}

func RecordTargetsKeptPerJob(targets map[string]*target.Item) map[string]float64 {
targetsPerJob := make(map[string]float64)

for _, tItem := range targets {
targetsPerJob[tItem.JobName] += 1
}

for jName, numTargets := range targetsPerJob {
targetsKeptPerJob.WithLabelValues(jName).Set(numTargets)
}

return targetsPerJob
}

func New(name string, log logr.Logger, opts ...AllocationOption) (Allocator, error) {
if p, ok := registry[name]; ok {
return p(log), nil
return p(log, opts...), nil
}
return nil, fmt.Errorf("unregistered strategy: %s", name)
}
Expand All @@ -67,6 +97,7 @@ type Allocator interface {
SetTargets(targets map[string]*target.Item)
TargetItems() map[string]*target.Item
Collectors() map[string]*Collector
SetFilter(filter Filter)
}

var _ consistent.Member = Collector{}
Expand Down
8 changes: 8 additions & 0 deletions cmd/otel-allocator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Config struct {
LabelSelector map[string]string `yaml:"label_selector,omitempty"`
Config *promconfig.Config `yaml:"config"`
AllocationStrategy *string `yaml:"allocation_strategy,omitempty"`
FilterStrategy *string `yaml:"filter_strategy,omitempty"`
}

func (c Config) GetAllocationStrategy() string {
Expand All @@ -51,6 +52,13 @@ func (c Config) GetAllocationStrategy() string {
return "least-weighted"
}

func (c Config) GetTargetsFilterStrategy() string {
if c.FilterStrategy != nil {
return *c.FilterStrategy
}
return ""
}

type PrometheusCRWatcherConfig struct {
Enabled *bool
}
Expand Down
15 changes: 14 additions & 1 deletion cmd/otel-allocator/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/model/relabel"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher"
Expand All @@ -42,9 +43,14 @@ type Manager struct {
logger log.Logger
close chan struct{}
configsMap map[allocatorWatcher.EventSource]*config.Config
hook discoveryHook
}

func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, options ...func(*discovery.Manager)) *Manager {
type discoveryHook interface {
SetConfig(map[string][]*relabel.Config)
}

func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, hook discoveryHook, options ...func(*discovery.Manager)) *Manager {
manager := discovery.NewManager(ctx, logger, options...)

go func() {
Expand All @@ -58,6 +64,7 @@ func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, options
logger: logger,
close: make(chan struct{}),
configsMap: make(map[allocatorWatcher.EventSource]*config.Config),
hook: hook,
}
}

Expand All @@ -75,12 +82,18 @@ func (m *Manager) ApplyConfig(source allocatorWatcher.EventSource, cfg *config.C
m.configsMap[source] = cfg

discoveryCfg := make(map[string]discovery.Configs)
relabelCfg := make(map[string][]*relabel.Config)

for _, value := range m.configsMap {
for _, scrapeConfig := range value.ScrapeConfigs {
discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
relabelCfg[scrapeConfig.JobName] = scrapeConfig.RelabelConfigs
}
}

if m.hook != nil {
m.hook.SetConfig(relabelCfg)
}
return m.manager.ApplyConfig(discoveryCfg)
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/otel-allocator/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestMain(m *testing.M) {
fmt.Printf("failed to load config file: %v", err)
os.Exit(1)
}
manager = NewManager(ctrl.Log.WithName("test"), context.Background(), gokitlog.NewNopLogger())
manager = NewManager(ctrl.Log.WithName("test"), context.Background(), gokitlog.NewNopLogger(), nil)

results = make(chan []string)
manager.Watch(func(targets map[string]*target.Item) {
Expand Down
11 changes: 9 additions & 2 deletions cmd/otel-allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/collector"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config"
lbdiscovery "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/discovery"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/prehook"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher"
)
Expand Down Expand Up @@ -72,11 +73,16 @@ func main() {

log := ctrl.Log.WithName("allocator")

allocator, err := allocation.New(cfg.GetAllocationStrategy(), log)
// allocatorPrehook will be nil if filterStrategy is not set or
// unrecognized. No filtering will be used in this case.
allocatorPrehook := prehook.New(cfg.GetTargetsFilterStrategy(), log)

allocator, err := allocation.New(cfg.GetAllocationStrategy(), log, allocation.WithFilter(allocatorPrehook))
if err != nil {
setupLog.Error(err, "Unable to initialize allocation strategy")
os.Exit(1)
}

watcher, err := allocatorWatcher.NewWatcher(setupLog, cliConf, allocator)
if err != nil {
setupLog.Error(err, "Can't start the watchers")
Expand All @@ -90,8 +96,9 @@ func main() {
}()

// creates a new discovery manager
discoveryManager := lbdiscovery.NewManager(log, ctx, gokitlog.NewNopLogger())
discoveryManager := lbdiscovery.NewManager(log, ctx, gokitlog.NewNopLogger(), allocatorPrehook)
defer discoveryManager.Close()

discoveryManager.Watch(allocator.SetTargets)

k8sclient, err := configureFileDiscovery(log, allocator, discoveryManager, context.Background(), cliConf)
Expand Down
57 changes: 57 additions & 0 deletions cmd/otel-allocator/prehook/prehook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prehook

import (
"errors"

"github.com/go-logr/logr"
"github.com/prometheus/prometheus/model/relabel"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)

const (
relabelConfigTargetFilterName = "relabel-config"
)

type Hook interface {
Apply(map[string]*target.Item) map[string]*target.Item
SetConfig(map[string][]*relabel.Config)
GetConfig() map[string][]*relabel.Config
}

type HookProvider func(log logr.Logger) Hook

var (
registry = map[string]HookProvider{}
)

func New(name string, log logr.Logger) Hook {
if p, ok := registry[name]; ok {
return p(log.WithName("Prehook").WithName(name))
}

log.Info("Unrecognized filter strategy; filtering disabled")
return nil
}

func Register(name string, provider HookProvider) error {
if _, ok := registry[name]; ok {
return errors.New("already registered")
}
registry[name] = provider
return nil
}
Loading

0 comments on commit 854eb67

Please sign in to comment.