Skip to content

Commit

Permalink
Adding prehook allocator filter to reduce assigned targets
Browse files Browse the repository at this point in the history
  • Loading branch information
moh-osman3 committed Oct 4, 2022
1 parent 10404eb commit cbc48bf
Show file tree
Hide file tree
Showing 15 changed files with 527 additions and 10 deletions.
5 changes: 5 additions & 0 deletions apis/v1alpha1/opentelemetrycollector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ type OpenTelemetryTargetAllocator struct {
// The current options are least-weighted and consistent-hashing. The default option is least-weighted
// +optional
AllocationStrategy string `json:"allocationStrategy,omitempty"`
// FilterStrategy determines how to filter targets before allocating them among the collectors.
// The current options are no-op (no filtering) and relabel-config (drops targets based on prom relabel_config)
// The default is no-op
// +optional
FilterStrategy string `json:"filterStrategy,omitempty"`
// ServiceAccount indicates the name of an existing service account to use with this instance.
// +optional
ServiceAccount string `json:"serviceAccount,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,12 @@ spec:
description: Enabled indicates whether to use a target allocation
mechanism for Prometheus targets or not.
type: boolean
filterStrategy:
description: FilterStrategy determines how to filter targets before
allocating them among the collectors. The current options are
no-op (no filtering) and relabel-config (drops targets based
on prom relabel_config) The default is no-op
type: string
image:
description: Image indicates the container image to use for the
OpenTelemetry TargetAllocator.
Expand Down
12 changes: 7 additions & 5 deletions cmd/otel-allocator/allocation/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"
)

type AllocatorProvider func(log logr.Logger) Allocator
Expand Down Expand Up @@ -70,11 +71,12 @@ type Allocator interface {
}

type TargetItem struct {
JobName string
Link LinkJSON
TargetURL string
Label model.LabelSet
CollectorName string
JobName string
Link LinkJSON
TargetURL string
Label model.LabelSet
CollectorName string
RelabelConfigs []*relabel.Config
}

func NewTargetItem(jobName string, targetURL string, label model.LabelSet, collectorName string) *TargetItem {
Expand Down
8 changes: 8 additions & 0 deletions cmd/otel-allocator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Config struct {
LabelSelector map[string]string `yaml:"label_selector,omitempty"`
Config *promconfig.Config `yaml:"config"`
AllocationStrategy *string `yaml:"allocation_strategy,omitempty"`
FilterStrategy *string `yaml:"filter_strategy,omitempty"`
}

func (c Config) GetAllocationStrategy() string {
Expand All @@ -51,6 +52,13 @@ func (c Config) GetAllocationStrategy() string {
return "least-weighted"
}

func (c Config) GetTargetsFilterStrategy() string {
if c.FilterStrategy != nil {
return *c.FilterStrategy
}
return "no-op"
}

type PrometheusCRWatcherConfig struct {
Enabled *bool
}
Expand Down
21 changes: 17 additions & 4 deletions cmd/otel-allocator/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/model/relabel"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation"
allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher"
Expand Down Expand Up @@ -84,6 +85,17 @@ func (m *Manager) ApplyConfig(source allocatorWatcher.EventSource, cfg *config.C
return m.manager.ApplyConfig(discoveryCfg)
}

func (m *Manager) CreateRelabelConfigsMap() map[string][]*relabel.Config {
relabelConfigs := make(map[string][]*relabel.Config)
for _, value := range m.configsMap {
for _, scrapeConfig := range value.ScrapeConfigs {
relabelConfigs[scrapeConfig.JobName] = scrapeConfig.RelabelConfigs
}
}

return relabelConfigs
}

func (m *Manager) Watch(fn func(targets map[string]*allocation.TargetItem)) {
log := m.log.WithValues("component", "opentelemetry-targetallocator")

Expand All @@ -95,16 +107,17 @@ func (m *Manager) Watch(fn func(targets map[string]*allocation.TargetItem)) {
return
case tsets := <-m.manager.SyncCh():
targets := map[string]*allocation.TargetItem{}

relabelConfigs := m.CreateRelabelConfigsMap()
for jobName, tgs := range tsets {
var count float64 = 0
for _, tg := range tgs {
for _, t := range tg.Targets {
count++
item := &allocation.TargetItem{
JobName: jobName,
TargetURL: string(t[model.AddressLabel]),
Label: t.Merge(tg.Labels),
JobName: jobName,
TargetURL: string(t[model.AddressLabel]),
Label: t.Merge(tg.Labels),
RelabelConfigs: relabelConfigs[jobName],
}
targets[item.Hash()] = item
}
Expand Down
11 changes: 10 additions & 1 deletion cmd/otel-allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/collector"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config"
lbdiscovery "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/discovery"
prehook "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/prehooktargetfilter"
allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher"
)

Expand Down Expand Up @@ -76,6 +77,13 @@ func main() {
setupLog.Error(err, "Unable to initialize allocation strategy")
os.Exit(1)
}

allocatorPrehook, err := prehook.New(cfg.GetTargetsFilterStrategy(), log, allocator)
if err != nil {
setupLog.Error(err, "Unable to initialize targets filter strategy")
os.Exit(1)
}

watcher, err := allocatorWatcher.NewWatcher(setupLog, cliConf, allocator)
if err != nil {
setupLog.Error(err, "Can't start the watchers")
Expand All @@ -91,7 +99,8 @@ func main() {
// creates a new discovery manager
discoveryManager := lbdiscovery.NewManager(log, ctx, gokitlog.NewNopLogger())
defer discoveryManager.Close()
discoveryManager.Watch(allocator.SetTargets)

discoveryManager.Watch(allocatorPrehook.SetTargets)

k8sclient, err := configureFileDiscovery(log, allocator, discoveryManager, context.Background(), cliConf)
if err != nil {
Expand Down
66 changes: 66 additions & 0 deletions cmd/otel-allocator/prehooktargetfilter/allocator_prehook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prehooktargetfilter

import (
"errors"
"fmt"

"github.com/go-logr/logr"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation"
)

const (
noOpTargetFilterName = "no-op"
relabelConfigTargetFilterName = "relabel-config"
)

type AllocatorPrehook interface {
SetTargets(targets map[string]*allocation.TargetItem)
TargetItems() map[string]*allocation.TargetItem
}

type AllocatorPrehookProvider func(log logr.Logger, allocator allocation.Allocator) AllocatorPrehook

var (
registry = map[string]AllocatorPrehookProvider{}
)

func New(name string, log logr.Logger, allocator allocation.Allocator) (AllocatorPrehook, error) {
if p, ok := registry[name]; ok {
return p(log, allocator), nil
}
return nil, fmt.Errorf("unregistered filtering strategy: %s", name)
}

func Register(name string, provider AllocatorPrehookProvider) error {
if _, ok := registry[name]; ok {
return errors.New("already registered")
}
registry[name] = provider
return nil
}

func init() {
err := Register(noOpTargetFilterName, NewNoOpTargetFilter)
if err != nil {
panic(err)
}
err = Register(relabelConfigTargetFilterName, NewRelabelConfigTargetFilter)
if err != nil {
panic(err)
}
}
41 changes: 41 additions & 0 deletions cmd/otel-allocator/prehooktargetfilter/no_op_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prehooktargetfilter

import (
"github.com/go-logr/logr"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation"
)

type NoOpTargetFilter struct {
log logr.Logger
allocator allocation.Allocator
}

func NewNoOpTargetFilter(log logr.Logger, allocator allocation.Allocator) AllocatorPrehook {
return &NoOpTargetFilter{
log: log,
allocator: allocator,
}
}

func (tf *NoOpTargetFilter) SetTargets(targets map[string]*allocation.TargetItem) {
tf.allocator.SetTargets(targets)
}

func (tf *NoOpTargetFilter) TargetItems() map[string]*allocation.TargetItem {
return tf.allocator.TargetItems()
}
57 changes: 57 additions & 0 deletions cmd/otel-allocator/prehooktargetfilter/no_op_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prehooktargetfilter

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation"
)

var _ allocation.Allocator = &mockAllocator{}

type mockAllocator struct {
targetItems map[string]*allocation.TargetItem
}

func (allocator mockAllocator) SetTargets(targets map[string]*allocation.TargetItem) {
for k, v := range targets {
allocator.targetItems[k] = v
}
}

func (allocator mockAllocator) TargetItems() map[string]*allocation.TargetItem {
return allocator.targetItems
}

func (allocator mockAllocator) SetCollectors(collectors map[string]*allocation.Collector) {
}
func (allocator mockAllocator) Collectors() map[string]*allocation.Collector {
return nil
}

func TestNoOpSetTargets(t *testing.T) {
allocator := mockAllocator{targetItems: make(map[string]*allocation.TargetItem)}

allocatorPrehook, err := New("no-op", logger, allocator)
assert.Nil(t, err)

targets, _ := makeNNewTargets(numTargets, 3, 0)
allocatorPrehook.SetTargets(targets)
remainingTargetItems := allocatorPrehook.TargetItems()
assert.Len(t, remainingTargetItems, numTargets)
}
Loading

0 comments on commit cbc48bf

Please sign in to comment.