From 854eb67ffc535c3bc711bb06e06e662e4910244d Mon Sep 17 00:00:00 2001
From: Moh Osman <59479562+moh-osman3@users.noreply.github.com>
Date: Fri, 4 Nov 2022 08:04:09 -0400
Subject: [PATCH] [target-allocator] Add a pre-hook to the allocator to filter
out dropped targets (#1127)
* Adding prehook allocator filter to reduce assigned targets
* add metrics to track number of targets kept after filtering
* add smaller interfaces local to packages using them
* address review feedback
* remove outdated references
---
apis/v1alpha1/opentelemetrycollector_types.go | 5 +
...ntelemetry.io_opentelemetrycollectors.yaml | 6 +
.../allocation/consistent_hashing.go | 21 +-
.../allocation/least_weighted.go | 22 +-
cmd/otel-allocator/allocation/strategy.go | 37 +++-
cmd/otel-allocator/config/config.go | 8 +
cmd/otel-allocator/discovery/discovery.go | 15 +-
.../discovery/discovery_test.go | 2 +-
cmd/otel-allocator/main.go | 11 +-
cmd/otel-allocator/prehook/prehook.go | 57 +++++
cmd/otel-allocator/prehook/relabel.go | 101 +++++++++
cmd/otel-allocator/prehook/relabel_test.go | 203 ++++++++++++++++++
...ntelemetry.io_opentelemetrycollectors.yaml | 6 +
docs/api.md | 7 +
pkg/collector/reconcile/configmap.go | 5 +
15 files changed, 495 insertions(+), 11 deletions(-)
create mode 100644 cmd/otel-allocator/prehook/prehook.go
create mode 100644 cmd/otel-allocator/prehook/relabel.go
create mode 100644 cmd/otel-allocator/prehook/relabel_test.go
diff --git a/apis/v1alpha1/opentelemetrycollector_types.go b/apis/v1alpha1/opentelemetrycollector_types.go
index 99b74dddde..d16abab40f 100644
--- a/apis/v1alpha1/opentelemetrycollector_types.go
+++ b/apis/v1alpha1/opentelemetrycollector_types.go
@@ -159,6 +159,11 @@ type OpenTelemetryTargetAllocator struct {
// The current options are least-weighted and consistent-hashing. The default option is least-weighted
// +optional
AllocationStrategy string `json:"allocationStrategy,omitempty"`
+ // FilterStrategy determines how to filter targets before allocating them among the collectors.
+ // The only current option is relabel-config (drops targets based on prom relabel_config).
+ // Filtering is disabled by default.
+ // +optional
+ FilterStrategy string `json:"filterStrategy,omitempty"`
// ServiceAccount indicates the name of an existing service account to use with this instance.
// +optional
ServiceAccount string `json:"serviceAccount,omitempty"`
diff --git a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml
index 13eab89597..fcdfed60de 100644
--- a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml
+++ b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml
@@ -1701,6 +1701,12 @@ spec:
description: Enabled indicates whether to use a target allocation
mechanism for Prometheus targets or not.
type: boolean
+ filterStrategy:
+ description: FilterStrategy determines how to filter targets before
+ allocating them among the collectors. The only current option
+ is relabel-config (drops targets based on prom relabel_config).
+ Filtering is disabled by default.
+ type: string
image:
description: Image indicates the container image to use for the
OpenTelemetry TargetAllocator.
diff --git a/cmd/otel-allocator/allocation/consistent_hashing.go b/cmd/otel-allocator/allocation/consistent_hashing.go
index f55499f791..bccd8100bf 100644
--- a/cmd/otel-allocator/allocation/consistent_hashing.go
+++ b/cmd/otel-allocator/allocation/consistent_hashing.go
@@ -49,9 +49,11 @@ type consistentHashingAllocator struct {
targetItems map[string]*target.Item
log logr.Logger
+
+ filter Filter
}
-func newConsistentHashingAllocator(log logr.Logger) Allocator {
+func newConsistentHashingAllocator(log logr.Logger, opts ...AllocationOption) Allocator {
config := consistent.Config{
PartitionCount: 1061,
ReplicationFactor: 5,
@@ -59,12 +61,22 @@ func newConsistentHashingAllocator(log logr.Logger) Allocator {
Hasher: hasher{},
}
consistentHasher := consistent.New(nil, config)
- return &consistentHashingAllocator{
+ chAllocator := &consistentHashingAllocator{
consistentHasher: consistentHasher,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*target.Item),
log: log,
}
+ for _, opt := range opts {
+ opt(chAllocator)
+ }
+
+ return chAllocator
+}
+
+// SetFilter sets the filtering hook to use.
+func (c *consistentHashingAllocator) SetFilter(filter Filter) {
+ c.filter = filter
}
// addTargetToTargetItems assigns a target to the collector based on its hash and adds it to the allocator's targetItems
@@ -140,6 +152,11 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item)
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", consistentHashingStrategyName))
defer timer.ObserveDuration()
+ if c.filter != nil {
+ targets = c.filter.Apply(targets)
+ }
+ RecordTargetsKeptPerJob(targets)
+
c.m.Lock()
defer c.m.Unlock()
diff --git a/cmd/otel-allocator/allocation/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted.go
index 033c7bf52b..717c147d9d 100644
--- a/cmd/otel-allocator/allocation/least_weighted.go
+++ b/cmd/otel-allocator/allocation/least_weighted.go
@@ -48,6 +48,13 @@ type leastWeightedAllocator struct {
targetItems map[string]*target.Item
log logr.Logger
+
+ filter Filter
+}
+
+// SetFilter sets the filtering hook to use.
+func (allocator *leastWeightedAllocator) SetFilter(filter Filter) {
+ allocator.filter = filter
}
// TargetItems returns a shallow copy of the targetItems map.
@@ -157,6 +164,11 @@ func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*target.I
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", leastWeightedStrategyName))
defer timer.ObserveDuration()
+ if allocator.filter != nil {
+ targets = allocator.filter.Apply(targets)
+ }
+ RecordTargetsKeptPerJob(targets)
+
allocator.m.Lock()
defer allocator.m.Unlock()
@@ -195,10 +207,16 @@ func (allocator *leastWeightedAllocator) SetCollectors(collectors map[string]*Co
}
}
-func newLeastWeightedAllocator(log logr.Logger) Allocator {
- return &leastWeightedAllocator{
+func newLeastWeightedAllocator(log logr.Logger, opts ...AllocationOption) Allocator {
+ lwAllocator := &leastWeightedAllocator{
log: log,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*target.Item),
}
+
+ for _, opt := range opts {
+ opt(lwAllocator)
+ }
+
+ return lwAllocator
}
diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go
index 24d3b9b4bf..14bcca3e5a 100644
--- a/cmd/otel-allocator/allocation/strategy.go
+++ b/cmd/otel-allocator/allocation/strategy.go
@@ -26,7 +26,7 @@ import (
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)
-type AllocatorProvider func(log logr.Logger) Allocator
+type AllocatorProvider func(log logr.Logger, opts ...AllocationOption) Allocator
var (
registry = map[string]AllocatorProvider{}
@@ -45,11 +45,41 @@ var (
Name: "opentelemetry_allocator_time_to_allocate",
Help: "The time it takes to allocate",
}, []string{"method", "strategy"})
+ targetsKeptPerJob = promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Name: "opentelemetry_allocator_targets_kept",
+ Help: "Number of targets kept after filtering.",
+ }, []string{"job_name"})
)
-func New(name string, log logr.Logger) (Allocator, error) {
+type AllocationOption func(Allocator)
+
+type Filter interface {
+ Apply(map[string]*target.Item) map[string]*target.Item
+}
+
+func WithFilter(filter Filter) AllocationOption {
+ return func(allocator Allocator) {
+ allocator.SetFilter(filter)
+ }
+}
+
+func RecordTargetsKeptPerJob(targets map[string]*target.Item) map[string]float64 {
+ targetsPerJob := make(map[string]float64)
+
+ for _, tItem := range targets {
+ targetsPerJob[tItem.JobName] += 1
+ }
+
+ for jName, numTargets := range targetsPerJob {
+ targetsKeptPerJob.WithLabelValues(jName).Set(numTargets)
+ }
+
+ return targetsPerJob
+}
+
+func New(name string, log logr.Logger, opts ...AllocationOption) (Allocator, error) {
if p, ok := registry[name]; ok {
- return p(log), nil
+ return p(log, opts...), nil
}
return nil, fmt.Errorf("unregistered strategy: %s", name)
}
@@ -67,6 +97,7 @@ type Allocator interface {
SetTargets(targets map[string]*target.Item)
TargetItems() map[string]*target.Item
Collectors() map[string]*Collector
+ SetFilter(filter Filter)
}
var _ consistent.Member = Collector{}
diff --git a/cmd/otel-allocator/config/config.go b/cmd/otel-allocator/config/config.go
index 0fa8879f0e..af26544f0c 100644
--- a/cmd/otel-allocator/config/config.go
+++ b/cmd/otel-allocator/config/config.go
@@ -42,6 +42,7 @@ type Config struct {
LabelSelector map[string]string `yaml:"label_selector,omitempty"`
Config *promconfig.Config `yaml:"config"`
AllocationStrategy *string `yaml:"allocation_strategy,omitempty"`
+ FilterStrategy *string `yaml:"filter_strategy,omitempty"`
}
func (c Config) GetAllocationStrategy() string {
@@ -51,6 +52,13 @@ func (c Config) GetAllocationStrategy() string {
return "least-weighted"
}
+func (c Config) GetTargetsFilterStrategy() string {
+ if c.FilterStrategy != nil {
+ return *c.FilterStrategy
+ }
+ return ""
+}
+
type PrometheusCRWatcherConfig struct {
Enabled *bool
}
diff --git a/cmd/otel-allocator/discovery/discovery.go b/cmd/otel-allocator/discovery/discovery.go
index d8298096a6..a68bebc261 100644
--- a/cmd/otel-allocator/discovery/discovery.go
+++ b/cmd/otel-allocator/discovery/discovery.go
@@ -24,6 +24,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
+ "github.com/prometheus/prometheus/model/relabel"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher"
@@ -42,9 +43,14 @@ type Manager struct {
logger log.Logger
close chan struct{}
configsMap map[allocatorWatcher.EventSource]*config.Config
+ hook discoveryHook
}
-func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, options ...func(*discovery.Manager)) *Manager {
+type discoveryHook interface {
+ SetConfig(map[string][]*relabel.Config)
+}
+
+func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, hook discoveryHook, options ...func(*discovery.Manager)) *Manager {
manager := discovery.NewManager(ctx, logger, options...)
go func() {
@@ -58,6 +64,7 @@ func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, options
logger: logger,
close: make(chan struct{}),
configsMap: make(map[allocatorWatcher.EventSource]*config.Config),
+ hook: hook,
}
}
@@ -75,12 +82,18 @@ func (m *Manager) ApplyConfig(source allocatorWatcher.EventSource, cfg *config.C
m.configsMap[source] = cfg
discoveryCfg := make(map[string]discovery.Configs)
+ relabelCfg := make(map[string][]*relabel.Config)
for _, value := range m.configsMap {
for _, scrapeConfig := range value.ScrapeConfigs {
discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
+ relabelCfg[scrapeConfig.JobName] = scrapeConfig.RelabelConfigs
}
}
+
+ if m.hook != nil {
+ m.hook.SetConfig(relabelCfg)
+ }
return m.manager.ApplyConfig(discoveryCfg)
}
diff --git a/cmd/otel-allocator/discovery/discovery_test.go b/cmd/otel-allocator/discovery/discovery_test.go
index 56fd1aa4e4..0f341849a1 100644
--- a/cmd/otel-allocator/discovery/discovery_test.go
+++ b/cmd/otel-allocator/discovery/discovery_test.go
@@ -44,7 +44,7 @@ func TestMain(m *testing.M) {
fmt.Printf("failed to load config file: %v", err)
os.Exit(1)
}
- manager = NewManager(ctrl.Log.WithName("test"), context.Background(), gokitlog.NewNopLogger())
+ manager = NewManager(ctrl.Log.WithName("test"), context.Background(), gokitlog.NewNopLogger(), nil)
results = make(chan []string)
manager.Watch(func(targets map[string]*target.Item) {
diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go
index 1dffa9d3d7..9dd52a1054 100644
--- a/cmd/otel-allocator/main.go
+++ b/cmd/otel-allocator/main.go
@@ -39,6 +39,7 @@ import (
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/collector"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config"
lbdiscovery "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/discovery"
+ "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/prehook"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher"
)
@@ -72,11 +73,16 @@ func main() {
log := ctrl.Log.WithName("allocator")
- allocator, err := allocation.New(cfg.GetAllocationStrategy(), log)
+ // allocatorPrehook will be nil if filterStrategy is not set or
+ // unrecognized. No filtering will be used in this case.
+ allocatorPrehook := prehook.New(cfg.GetTargetsFilterStrategy(), log)
+
+ allocator, err := allocation.New(cfg.GetAllocationStrategy(), log, allocation.WithFilter(allocatorPrehook))
if err != nil {
setupLog.Error(err, "Unable to initialize allocation strategy")
os.Exit(1)
}
+
watcher, err := allocatorWatcher.NewWatcher(setupLog, cliConf, allocator)
if err != nil {
setupLog.Error(err, "Can't start the watchers")
@@ -90,8 +96,9 @@ func main() {
}()
// creates a new discovery manager
- discoveryManager := lbdiscovery.NewManager(log, ctx, gokitlog.NewNopLogger())
+ discoveryManager := lbdiscovery.NewManager(log, ctx, gokitlog.NewNopLogger(), allocatorPrehook)
defer discoveryManager.Close()
+
discoveryManager.Watch(allocator.SetTargets)
k8sclient, err := configureFileDiscovery(log, allocator, discoveryManager, context.Background(), cliConf)
diff --git a/cmd/otel-allocator/prehook/prehook.go b/cmd/otel-allocator/prehook/prehook.go
new file mode 100644
index 0000000000..ebe41c0970
--- /dev/null
+++ b/cmd/otel-allocator/prehook/prehook.go
@@ -0,0 +1,57 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package prehook
+
+import (
+ "errors"
+
+ "github.com/go-logr/logr"
+ "github.com/prometheus/prometheus/model/relabel"
+
+ "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
+)
+
+const (
+ relabelConfigTargetFilterName = "relabel-config"
+)
+
+type Hook interface {
+ Apply(map[string]*target.Item) map[string]*target.Item
+ SetConfig(map[string][]*relabel.Config)
+ GetConfig() map[string][]*relabel.Config
+}
+
+type HookProvider func(log logr.Logger) Hook
+
+var (
+ registry = map[string]HookProvider{}
+)
+
+func New(name string, log logr.Logger) Hook {
+ if p, ok := registry[name]; ok {
+ return p(log.WithName("Prehook").WithName(name))
+ }
+
+ log.Info("Unrecognized filter strategy; filtering disabled")
+ return nil
+}
+
+func Register(name string, provider HookProvider) error {
+ if _, ok := registry[name]; ok {
+ return errors.New("already registered")
+ }
+ registry[name] = provider
+ return nil
+}
diff --git a/cmd/otel-allocator/prehook/relabel.go b/cmd/otel-allocator/prehook/relabel.go
new file mode 100644
index 0000000000..54059773c6
--- /dev/null
+++ b/cmd/otel-allocator/prehook/relabel.go
@@ -0,0 +1,101 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package prehook
+
+import (
+ "github.com/go-logr/logr"
+ "github.com/prometheus/common/model"
+ "github.com/prometheus/prometheus/model/labels"
+ "github.com/prometheus/prometheus/model/relabel"
+
+ "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
+)
+
+type RelabelConfigTargetFilter struct {
+ log logr.Logger
+ relabelCfg map[string][]*relabel.Config
+}
+
+func NewRelabelConfigTargetFilter(log logr.Logger) Hook {
+ return &RelabelConfigTargetFilter{
+ log: log,
+ relabelCfg: make(map[string][]*relabel.Config),
+ }
+}
+
+// helper function converts from model.LabelSet to []labels.Label.
+func convertLabelToPromLabelSet(lbls model.LabelSet) []labels.Label {
+ newLabels := make([]labels.Label, len(lbls))
+ index := 0
+ for k, v := range lbls {
+ newLabels[index].Name = string(k)
+ newLabels[index].Value = string(v)
+ index++
+ }
+ return newLabels
+}
+
+func (tf *RelabelConfigTargetFilter) Apply(targets map[string]*target.Item) map[string]*target.Item {
+ numTargets := len(targets)
+
+ // need to wait until relabelCfg is set
+ if len(tf.relabelCfg) == 0 {
+ return targets
+ }
+
+ // Note: jobNameKey != tItem.JobName (jobNameKey is hashed)
+ for jobNameKey, tItem := range targets {
+ keepTarget := true
+ lset := convertLabelToPromLabelSet(tItem.Label)
+ for _, cfg := range tf.relabelCfg[tItem.JobName] {
+ if new_lset := relabel.Process(lset, cfg); new_lset == nil {
+ keepTarget = false
+ break // inner loop
+ } else {
+ lset = new_lset
+ }
+ }
+
+ if !keepTarget {
+ delete(targets, jobNameKey)
+ }
+ }
+
+ tf.log.V(2).Info("Filtering complete", "seen", numTargets, "kept", len(targets))
+ return targets
+}
+
+func (tf *RelabelConfigTargetFilter) SetConfig(cfgs map[string][]*relabel.Config) {
+ relabelCfgCopy := make(map[string][]*relabel.Config)
+ for key, val := range cfgs {
+ relabelCfgCopy[key] = val
+ }
+ tf.relabelCfg = relabelCfgCopy
+}
+
+func (tf *RelabelConfigTargetFilter) GetConfig() map[string][]*relabel.Config {
+ relabelCfgCopy := make(map[string][]*relabel.Config)
+ for k, v := range tf.relabelCfg {
+ relabelCfgCopy[k] = v
+ }
+ return relabelCfgCopy
+}
+
+func init() {
+ err := Register(relabelConfigTargetFilterName, NewRelabelConfigTargetFilter)
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/cmd/otel-allocator/prehook/relabel_test.go b/cmd/otel-allocator/prehook/relabel_test.go
new file mode 100644
index 0000000000..151188242f
--- /dev/null
+++ b/cmd/otel-allocator/prehook/relabel_test.go
@@ -0,0 +1,203 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package prehook
+
+import (
+ "crypto/rand"
+ "fmt"
+ "math/big"
+ "strconv"
+ "testing"
+
+ "github.com/prometheus/common/model"
+ "github.com/prometheus/prometheus/model/relabel"
+ "github.com/stretchr/testify/assert"
+ logf "sigs.k8s.io/controller-runtime/pkg/log"
+
+ "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
+)
+
+var (
+ logger = logf.Log.WithName("unit-tests")
+ numTargets = 100
+
+ relabelConfigs = []relabelConfigObj{
+ {
+ cfg: relabel.Config{
+ Action: "replace",
+ Separator: ";",
+ Regex: relabel.MustNewRegexp("(.*)"),
+ Replacement: "$1",
+ },
+ isDrop: false,
+ },
+ {
+ cfg: relabel.Config{
+ SourceLabels: model.LabelNames{"i"},
+ Regex: relabel.MustNewRegexp("(.*)"),
+ Action: "keep",
+ },
+ isDrop: false,
+ },
+ {
+ cfg: relabel.Config{
+ SourceLabels: model.LabelNames{"i"},
+ Regex: relabel.MustNewRegexp("bad.*match"),
+ Action: "drop",
+ },
+ isDrop: false,
+ },
+ {
+ cfg: relabel.Config{
+ SourceLabels: model.LabelNames{"label_not_present"},
+ Regex: relabel.MustNewRegexp("(.*)"),
+ Action: "keep",
+ },
+ isDrop: false,
+ },
+ {
+ cfg: relabel.Config{
+ SourceLabels: model.LabelNames{"i"},
+ Regex: relabel.MustNewRegexp("(.*)"),
+ Action: "drop",
+ },
+ isDrop: true,
+ },
+ {
+ cfg: relabel.Config{
+ SourceLabels: model.LabelNames{"collector"},
+ Regex: relabel.MustNewRegexp("(collector.*)"),
+ Action: "drop",
+ },
+ isDrop: true,
+ },
+ {
+ cfg: relabel.Config{
+ SourceLabels: model.LabelNames{"i"},
+ Regex: relabel.MustNewRegexp("bad.*match"),
+ Action: "keep",
+ },
+ isDrop: true,
+ },
+ {
+ cfg: relabel.Config{
+ SourceLabels: model.LabelNames{"collector"},
+ Regex: relabel.MustNewRegexp("collectors-n"),
+ Action: "keep",
+ },
+ isDrop: true,
+ },
+ }
+
+ DefaultDropRelabelConfig = relabel.Config{
+ SourceLabels: model.LabelNames{"i"},
+ Regex: relabel.MustNewRegexp("(.*)"),
+ Action: "drop",
+ }
+)
+
+type relabelConfigObj struct {
+ cfg relabel.Config
+ isDrop bool
+}
+
+func colIndex(index, numCols int) int {
+ if numCols == 0 {
+ return -1
+ }
+ return index % numCols
+}
+
+func makeNNewTargets(n int, numCollectors int, startingIndex int) (map[string]*target.Item, int, map[string]*target.Item, map[string][]*relabel.Config) {
+ toReturn := map[string]*target.Item{}
+ expectedMap := make(map[string]*target.Item)
+ numItemsRemaining := n
+ relabelConfig := make(map[string][]*relabel.Config)
+ for i := startingIndex; i < n+startingIndex; i++ {
+ collector := fmt.Sprintf("collector-%d", colIndex(i, numCollectors))
+ label := model.LabelSet{
+ "collector": model.LabelValue(collector),
+ "i": model.LabelValue(strconv.Itoa(i)),
+ "total": model.LabelValue(strconv.Itoa(n + startingIndex)),
+ }
+ jobName := fmt.Sprintf("test-job-%d", i)
+ newTarget := target.NewItem(jobName, "test-url", label, collector)
+ // add a single replace, drop, or keep action as relabel_config for targets
+ var index int
+ ind, _ := rand.Int(rand.Reader, big.NewInt(int64(len(relabelConfigs))))
+
+ index = int(ind.Int64())
+
+ relabelConfig[jobName] = []*relabel.Config{
+ &relabelConfigs[index].cfg,
+ }
+
+ targetKey := newTarget.Hash()
+ if relabelConfigs[index].isDrop {
+ numItemsRemaining--
+ } else {
+ expectedMap[targetKey] = newTarget
+ }
+ toReturn[targetKey] = newTarget
+ }
+ return toReturn, numItemsRemaining, expectedMap, relabelConfig
+}
+
+func TestApply(t *testing.T) {
+ allocatorPrehook := New("relabel-config", logger)
+ assert.NotNil(t, allocatorPrehook)
+
+ targets, numRemaining, expectedTargetMap, relabelCfg := makeNNewTargets(numTargets, 3, 0)
+ allocatorPrehook.SetConfig(relabelCfg)
+ remainingItems := allocatorPrehook.Apply(targets)
+ assert.Len(t, remainingItems, numRemaining)
+ assert.Equal(t, remainingItems, expectedTargetMap)
+
+ // clear out relabelCfg to test with empty values
+ for key := range relabelCfg {
+ relabelCfg[key] = nil
+ }
+
+ // cfg = createMockConfig(relabelCfg)
+ allocatorPrehook.SetConfig(relabelCfg)
+ remainingItems = allocatorPrehook.Apply(targets)
+ // relabelCfg is empty so targets should be unfiltered
+ assert.Len(t, remainingItems, len(targets))
+ assert.Equal(t, remainingItems, targets)
+}
+
+func TestApplyEmptyRelabelCfg(t *testing.T) {
+
+ allocatorPrehook := New("relabel-config", logger)
+ assert.NotNil(t, allocatorPrehook)
+
+ targets, _, _, _ := makeNNewTargets(numTargets, 3, 0)
+
+ relabelCfg := map[string][]*relabel.Config{}
+ allocatorPrehook.SetConfig(relabelCfg)
+ remainingItems := allocatorPrehook.Apply(targets)
+ // relabelCfg is empty so targets should be unfiltered
+ assert.Len(t, remainingItems, len(targets))
+ assert.Equal(t, remainingItems, targets)
+}
+
+func TestSetConfig(t *testing.T) {
+ allocatorPrehook := New("relabel-config", logger)
+ assert.NotNil(t, allocatorPrehook)
+
+ _, _, _, relabelCfg := makeNNewTargets(numTargets, 3, 0)
+ allocatorPrehook.SetConfig(relabelCfg)
+ assert.Equal(t, relabelCfg, allocatorPrehook.GetConfig())
+}
diff --git a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml
index 3a33c5ce25..1196232386 100644
--- a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml
+++ b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml
@@ -1699,6 +1699,12 @@ spec:
description: Enabled indicates whether to use a target allocation
mechanism for Prometheus targets or not.
type: boolean
+ filterStrategy:
+ description: FilterStrategy determines how to filter targets before
+ allocating them among the collectors. The only current option
+ is relabel-config (drops targets based on prom relabel_config).
+ Filtering is disabled by default.
+ type: string
image:
description: Image indicates the container image to use for the
OpenTelemetry TargetAllocator.
diff --git a/docs/api.md b/docs/api.md
index b87e3645da..015c87490a 100644
--- a/docs/api.md
+++ b/docs/api.md
@@ -4542,6 +4542,13 @@ TargetAllocator indicates a value which determines whether to spawn a target all
Enabled indicates whether to use a target allocation mechanism for Prometheus targets or not.