diff --git a/apis/v1alpha1/opentelemetrycollector_types.go b/apis/v1alpha1/opentelemetrycollector_types.go index 747736361f..ac79522da0 100644 --- a/apis/v1alpha1/opentelemetrycollector_types.go +++ b/apis/v1alpha1/opentelemetrycollector_types.go @@ -108,6 +108,8 @@ type OpenTelemetryCollectorSpec struct { // OpenTelemetryTargetAllocator defines the configurations for the Prometheus target allocator. type OpenTelemetryTargetAllocator struct { + // AllocationStrategy determines which strategy the target allocator should use for allocation + AllocationStrategy string `json:"allocationStrategy,omitempty"` // ServiceAccount indicates the name of an existing service account to use with this instance. // +optional ServiceAccount string `json:"serviceAccount,omitempty"` diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go index cba3b24b99..776bebf9fd 100644 --- a/apis/v1alpha1/zz_generated.deepcopy.go +++ b/apis/v1alpha1/zz_generated.deepcopy.go @@ -20,7 +20,7 @@ package v1alpha1 import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ) diff --git a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml index da91c7ce09..01dc8f2e2b 100644 --- a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml +++ b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml @@ -683,6 +683,10 @@ spec: description: TargetAllocator indicates a value which determines whether to spawn a target allocation resource or not. properties: + allocationStrategy: + description: AllocationStrategy determines which strategy the + target allocator should use for allocation + type: string enabled: description: Enabled indicates whether to use a target allocation mechanism for Prometheus targets or not. diff --git a/cmd/otel-allocator/allocation/allocator.go b/cmd/otel-allocator/allocation/allocator.go index be4d82d3eb..62dfc8f586 100644 --- a/cmd/otel-allocator/allocation/allocator.go +++ b/cmd/otel-allocator/allocation/allocator.go @@ -1,14 +1,13 @@ package allocation import ( - "fmt" - "net/url" "sync" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/common/model" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" ) var ( @@ -16,10 +15,7 @@ var ( Name: "opentelemetry_allocator_collectors_allocatable", Help: "Number of collectors the allocator is able to allocate to.", }) - targetsPerCollector = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "opentelemetry_allocator_targets_per_collector", - Help: "The number of targets for each collector.", - }, []string{"collector_name"}) + timeToAssign = promauto.NewHistogramVec(prometheus.HistogramOpts{ Name: "opentelemetry_allocator_time_to_allocate", Help: "The time it takes to allocate", @@ -33,121 +29,45 @@ var ( Keep a Map of what each collector currently holds and update it based on new scrape target updates */ -type TargetItem struct { - JobName string - Link LinkJSON - TargetURL string - Label model.LabelSet - Collector *collector -} - -func (t TargetItem) hash() string { - return t.JobName + t.TargetURL + t.Label.Fingerprint().String() -} - -// Create a struct that holds collector - and jobs for that collector -// This struct will be parsed into endpoint with collector and jobs info - -type collector struct { - Name string - NumTargets int -} - // Allocator makes decisions to distribute work among // a number of OpenTelemetry collectors based on the number of targets. // Users need to call SetTargets when they have new targets in their // clusters and call SetCollectors when the collectors have changed. type Allocator struct { // m protects collectors and targetItems for concurrent use. - m sync.RWMutex - collectors map[string]*collector // all current collectors - targetItems map[string]*TargetItem + m sync.RWMutex + state strategy.State - log logr.Logger + log logr.Logger + strategy strategy.Allocator } // TargetItems returns a shallow copy of the targetItems map. -func (allocator *Allocator) TargetItems() map[string]*TargetItem { +func (allocator *Allocator) TargetItems() map[string]strategy.TargetItem { allocator.m.RLock() defer allocator.m.RUnlock() - targetItemsCopy := make(map[string]*TargetItem) - for k, v := range allocator.targetItems { + targetItemsCopy := make(map[string]strategy.TargetItem) + for k, v := range allocator.state.TargetItems() { targetItemsCopy[k] = v } return targetItemsCopy } // Collectors returns a shallow copy of the collectors map. -func (allocator *Allocator) Collectors() map[string]*collector { +func (allocator *Allocator) Collectors() map[string]strategy.Collector { allocator.m.RLock() defer allocator.m.RUnlock() - collectorsCopy := make(map[string]*collector) - for k, v := range allocator.collectors { + collectorsCopy := make(map[string]strategy.Collector) + for k, v := range allocator.state.Collectors() { collectorsCopy[k] = v } return collectorsCopy } -// findNextCollector finds the next collector with fewer number of targets. -// This method is called from within SetTargets and SetCollectors, whose caller -// acquires the needed lock. -func (allocator *Allocator) findNextCollector() *collector { - var col *collector - for _, v := range allocator.collectors { - // If the initial collector is empty, set the initial collector to the first element of map - if col == nil { - col = v - } else { - if v.NumTargets < col.NumTargets { - col = v - } - } - } - return col -} - -// addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems -// This method is called from within SetTargets and SetCollectors, whose caller acquires the needed lock. -// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap -func (allocator *Allocator) addTargetToTargetItems(target *TargetItem) { - chosenCollector := allocator.findNextCollector() - targetItem := TargetItem{ - JobName: target.JobName, - Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))}, - TargetURL: target.TargetURL, - Label: target.Label, - Collector: chosenCollector, - } - allocator.targetItems[targetItem.hash()] = &targetItem - chosenCollector.NumTargets++ - targetsPerCollector.WithLabelValues(chosenCollector.Name).Set(float64(chosenCollector.NumTargets)) -} - -// getCollectorChanges returns the new and removed collectors respectively. -// This method is called from within SetCollectors, which acquires the needed lock. -func (allocator *Allocator) getCollectorChanges(collectors []string) ([]string, []string) { - var newCollectors []string - var removedCollectors []string - // Used as a set to check for removed collectors - tempCollectorMap := map[string]bool{} - for _, s := range collectors { - if _, found := allocator.collectors[s]; !found { - newCollectors = append(newCollectors, s) - } - tempCollectorMap[s] = true - } - for k := range allocator.collectors { - if _, found := tempCollectorMap[k]; !found { - removedCollectors = append(removedCollectors, k) - } - } - return newCollectors, removedCollectors -} - // SetTargets accepts a list of targets that will be used to make // load balancing decisions. This method should be called when there are // new targets discovered or existing targets are shutdown. -func (allocator *Allocator) SetTargets(targets []TargetItem) { +func (allocator *Allocator) SetTargets(targets []strategy.TargetItem) { timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetTargets")) defer timer.ObserveDuration() @@ -155,34 +75,15 @@ func (allocator *Allocator) SetTargets(targets []TargetItem) { defer allocator.m.Unlock() // Make the temp map for access - tempTargetMap := make(map[string]TargetItem, len(targets)) + tempTargetMap := make(map[string]strategy.TargetItem, len(targets)) for _, target := range targets { - tempTargetMap[target.hash()] = target - } - - // Check for removals - for k, target := range allocator.targetItems { - // if the old target is no longer in the new list, remove it - if _, ok := tempTargetMap[k]; !ok { - allocator.collectors[target.Collector.Name].NumTargets-- - delete(allocator.targetItems, k) - targetsPerCollector.WithLabelValues(target.Collector.Name).Set(float64(allocator.collectors[target.Collector.Name].NumTargets)) - } - } - - // Check for additions - for k, target := range tempTargetMap { - // Do nothing if the item is already there - if _, ok := allocator.targetItems[k]; ok { - continue - } else { - // Assign new set of collectors with the one different name - allocator.addTargetToTargetItems(&target) - } + tempTargetMap[target.Hash()] = target } + newState := strategy.NewState(allocator.state.Collectors(), tempTargetMap) + allocator.state = allocator.strategy.Allocate(allocator.state, newState) } -// SetCollectors sets the set of collectors with key=collectorName, value=Collector object. +// SetCollectors sets the set of collectors with key=collectorName, value=CollectorName object. // This method is called when Collectors are added or removed. func (allocator *Allocator) SetCollectors(collectors []string) { log := allocator.log.WithValues("component", "opentelemetry-targetallocator") @@ -197,41 +98,21 @@ func (allocator *Allocator) SetCollectors(collectors []string) { allocator.m.Lock() defer allocator.m.Unlock() - newCollectors, removedCollectors := allocator.getCollectorChanges(collectors) - if len(newCollectors) == 0 && len(removedCollectors) == 0 { - log.Info("No changes to the collectors found") - return - } - - // Clear existing collectors - for _, k := range removedCollectors { - delete(allocator.collectors, k) - targetsPerCollector.WithLabelValues(k).Set(0) - } - // Insert the new collectors - for _, i := range newCollectors { - allocator.collectors[i] = &collector{Name: i, NumTargets: 0} - } - - // find targets which need to be redistributed - var redistribute []*TargetItem - for _, item := range allocator.targetItems { - for _, s := range removedCollectors { - if item.Collector.Name == s { - redistribute = append(redistribute, item) - } + newCollectors := map[string]strategy.Collector{} + for _, s := range collectors { + newCollectors[s] = strategy.Collector{ + Name: s, + NumTargets: 0, } } - // Re-Allocate the existing targets - for _, item := range redistribute { - allocator.addTargetToTargetItems(item) - } + newState := strategy.NewState(newCollectors, allocator.state.TargetItems()) + allocator.state = allocator.strategy.Allocate(allocator.state, newState) } -func NewAllocator(log logr.Logger) *Allocator { +func NewAllocator(log logr.Logger, allocatorStrategy strategy.Allocator) *Allocator { return &Allocator{ - log: log, - collectors: make(map[string]*collector), - targetItems: make(map[string]*TargetItem), + log: log, + state: strategy.NewState(make(map[string]strategy.Collector), make(map[string]strategy.TargetItem)), + strategy: allocatorStrategy, } } diff --git a/cmd/otel-allocator/allocation/allocator_test.go b/cmd/otel-allocator/allocation/allocator_test.go index 0b754cb47a..de8bde1f5d 100644 --- a/cmd/otel-allocator/allocation/allocator_test.go +++ b/cmd/otel-allocator/allocation/allocator_test.go @@ -4,6 +4,9 @@ import ( "math" "testing" + _ "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/least_weighted" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" + "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -11,22 +14,9 @@ import ( var logger = logf.Log.WithName("unit-tests") -// Tests the least connection - The expected collector after running findNextCollector should be the collector with the least amount of workload -func TestFindNextCollector(t *testing.T) { - s := NewAllocator(logger) - - defaultCol := collector{Name: "default-col", NumTargets: 1} - maxCol := collector{Name: "max-col", NumTargets: 2} - leastCol := collector{Name: "least-col", NumTargets: 0} - s.collectors[maxCol.Name] = &maxCol - s.collectors[leastCol.Name] = &leastCol - s.collectors[defaultCol.Name] = &defaultCol - - assert.Equal(t, "least-col", s.findNextCollector().Name) -} - func TestSetCollectors(t *testing.T) { - s := NewAllocator(logger) + allocatorStrategy, _ := strategy.New("least-weighted") + s := NewAllocator(logger, allocatorStrategy) cols := []string{"col-1", "col-2", "col-3"} s.SetCollectors(cols) @@ -42,16 +32,17 @@ func TestSetCollectors(t *testing.T) { func TestAddingAndRemovingTargets(t *testing.T) { // prepare allocator with initial targets and collectors - s := NewAllocator(logger) + allocatorStrategy, _ := strategy.New("least-weighted") + s := NewAllocator(logger, allocatorStrategy) cols := []string{"col-1", "col-2", "col-3"} s.SetCollectors(cols) labels := model.LabelSet{} initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"} - var targetList []TargetItem + var targetList []strategy.TargetItem for _, i := range initTargets { - targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) + targetList = append(targetList, strategy.TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) } // test that targets and collectors are added properly @@ -63,9 +54,9 @@ func TestAddingAndRemovingTargets(t *testing.T) { // prepare second round of targets tar := []string{"prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004"} - var newTargetList []TargetItem + var newTargetList []strategy.TargetItem for _, i := range tar { - newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) + newTargetList = append(newTargetList, strategy.TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) } // test that fewer targets are found - removed @@ -86,7 +77,8 @@ func TestAddingAndRemovingTargets(t *testing.T) { // Tests that two targets with the same target url and job name but different label set are both added func TestAllocationCollision(t *testing.T) { // prepare allocator with initial targets and collectors - s := NewAllocator(logger) + allocatorStrategy, _ := strategy.New("least-weighted") + s := NewAllocator(logger, allocatorStrategy) cols := []string{"col-1", "col-2", "col-3"} s.SetCollectors(cols) @@ -97,7 +89,7 @@ func TestAllocationCollision(t *testing.T) { "test": "test2", } - targetList := []TargetItem{ + targetList := []strategy.TargetItem{ {JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: firstLabels}, {JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: secondLabels}, } @@ -112,28 +104,29 @@ func TestAllocationCollision(t *testing.T) { // verify results map for _, i := range targetList { - _, ok := targetItems[i.hash()] + _, ok := targetItems[i.Hash()] assert.True(t, ok) } } func TestNoCollectorReassignment(t *testing.T) { - s := NewAllocator(logger) + allocatorStrategy, _ := strategy.New("least-weighted") + s := NewAllocator(logger, allocatorStrategy) cols := []string{"col-1", "col-2", "col-3"} s.SetCollectors(cols) labels := model.LabelSet{} expectedColLen := len(cols) - assert.Len(t, s.collectors, expectedColLen) + assert.Len(t, s.Collectors(), expectedColLen) for _, i := range cols { - assert.NotNil(t, s.collectors[i]) + assert.NotNil(t, s.Collectors()[i]) } initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"} - var targetList []TargetItem + var targetList []strategy.TargetItem for _, i := range initTargets { - targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) + targetList = append(targetList, strategy.TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) } // test that targets and collectors are added properly s.SetTargets(targetList) @@ -153,22 +146,23 @@ func TestNoCollectorReassignment(t *testing.T) { } func TestSmartCollectorReassignment(t *testing.T) { - s := NewAllocator(logger) + allocatorStrategy, _ := strategy.New("least-weighted") + s := NewAllocator(logger, allocatorStrategy) cols := []string{"col-1", "col-2", "col-3"} s.SetCollectors(cols) labels := model.LabelSet{} expectedColLen := len(cols) - assert.Len(t, s.collectors, expectedColLen) + assert.Len(t, s.Collectors(), expectedColLen) for _, i := range cols { - assert.NotNil(t, s.collectors[i]) + assert.NotNil(t, s.Collectors()[i]) } initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"} - var targetList []TargetItem + var targetList []strategy.TargetItem for _, i := range initTargets { - targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) + targetList = append(targetList, strategy.TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) } // test that targets and collectors are added properly s.SetTargets(targetList) @@ -187,10 +181,10 @@ func TestSmartCollectorReassignment(t *testing.T) { for key, targetItem := range targetItems { item, ok := newTargetItems[key] assert.True(t, ok, "all target items should be found in new target item list") - if targetItem.Collector.Name != "col-3" { - assert.Equal(t, targetItem.Collector.Name, item.Collector.Name) + if targetItem.CollectorName != "col-3" { + assert.Equal(t, targetItem.CollectorName, item.CollectorName) } else { - assert.Equal(t, "col-4", item.Collector.Name) + assert.Equal(t, "col-4", item.CollectorName) } } } @@ -199,7 +193,8 @@ func TestSmartCollectorReassignment(t *testing.T) { func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { // prepare allocator with 3 collectors and 'random' amount of targets - s := NewAllocator(logger) + allocatorStrategy, _ := strategy.New("least-weighted") + s := NewAllocator(logger, allocatorStrategy) cols := []string{"col-1", "col-2", "col-3"} s.SetCollectors(cols) @@ -207,9 +202,9 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { targets := []string{"prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005", "prometheus:1006", "prometheus:1011", "prometheus:1012", "prometheus:1013", "prometheus:1014", "prometheus:1015", "prometheus:1016", "prometheus:1021", "prometheus:1022", "prometheus:1023", "prometheus:1024", "prometheus:1025", "prometheus:1026"} - var newTargetList []TargetItem + var newTargetList []strategy.TargetItem for _, i := range targets { - newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) + newTargetList = append(newTargetList, strategy.TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) } s.SetTargets(newTargetList) @@ -230,9 +225,9 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { targets = []string{"prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1006", "prometheus:1011", "prometheus:1012", "prometheus:1013", "prometheus:1014", "prometheus:1016", "prometheus:1023", "prometheus:1024", "prometheus:1025", "prometheus:1026"} - newTargetList = []TargetItem{} + newTargetList = []strategy.TargetItem{} for _, i := range targets { - newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) + newTargetList = append(newTargetList, strategy.TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) } s.SetTargets(newTargetList) @@ -249,9 +244,9 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { targets = []string{"prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1006", "prometheus:1011", "prometheus:1012", "prometheus:1001", "prometheus:1014", "prometheus:1016", "prometheus:1023", "prometheus:1024", "prometheus:1025", "prometheus:1126", "prometheus:1227"} - newTargetList = []TargetItem{} + newTargetList = []strategy.TargetItem{} for _, i := range targets { - newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) + newTargetList = append(newTargetList, strategy.TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) } s.SetTargets(newTargetList) diff --git a/cmd/otel-allocator/allocation/http.go b/cmd/otel-allocator/allocation/http.go index ba2602fff9..ac91804cf8 100644 --- a/cmd/otel-allocator/allocation/http.go +++ b/cmd/otel-allocator/allocation/http.go @@ -4,55 +4,43 @@ import ( "fmt" "net/url" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" + "github.com/prometheus/common/model" ) -type LinkJSON struct { - Link string `json:"_link"` -} - -type collectorJSON struct { - Link string `json:"_link"` - Jobs []targetGroupJSON `json:"targets"` -} - -type targetGroupJSON struct { - Targets []string `json:"targets"` - Labels model.LabelSet `json:"labels"` -} - -func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator *Allocator) map[string]collectorJSON { - displayData := make(map[string]collectorJSON) +func GetAllTargetsByJob(job string, cMap map[string][]strategy.TargetItem, allocator *Allocator) map[string]strategy.CollectorJSON { + displayData := make(map[string]strategy.CollectorJSON) for _, j := range allocator.TargetItems() { if j.JobName == job { - var targetList []TargetItem - targetList = append(targetList, cMap[j.Collector.Name+j.JobName]...) + var targetList []strategy.TargetItem + targetList = append(targetList, cMap[j.CollectorName+j.JobName]...) - var targetGroupList []targetGroupJSON + var targetGroupList []strategy.TargetGroupJSON for _, t := range targetList { - targetGroupList = append(targetGroupList, targetGroupJSON{ + targetGroupList = append(targetGroupList, strategy.TargetGroupJSON{ Targets: []string{t.TargetURL}, Labels: t.Label, }) } - displayData[j.Collector.Name] = collectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(j.JobName), j.Collector.Name), Jobs: targetGroupList} + displayData[j.CollectorName] = strategy.CollectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(j.JobName), j.CollectorName), Jobs: targetGroupList} } } return displayData } -func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]TargetItem, allocator *Allocator) []targetGroupJSON { - var tgs []targetGroupJSON +func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]strategy.TargetItem, allocator *Allocator) []strategy.TargetGroupJSON { + var tgs []strategy.TargetGroupJSON group := make(map[string]string) labelSet := make(map[string]model.LabelSet) - for _, col := range allocator.Collectors() { - if col.Name == collector { + for colName, _ := range allocator.Collectors() { + if colName == collector { for _, targetItemArr := range cMap { for _, targetItem := range targetItemArr { - if targetItem.Collector.Name == collector && targetItem.JobName == job { + if targetItem.CollectorName == collector && targetItem.JobName == job { group[targetItem.Label.String()] = targetItem.TargetURL labelSet[targetItem.TargetURL] = targetItem.Label } @@ -62,7 +50,7 @@ func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[strin } for _, v := range group { - tgs = append(tgs, targetGroupJSON{Targets: []string{v}, Labels: labelSet[v]}) + tgs = append(tgs, strategy.TargetGroupJSON{Targets: []string{v}, Labels: labelSet[v]}) } return tgs diff --git a/cmd/otel-allocator/allocation/http_test.go b/cmd/otel-allocator/allocation/http_test.go index ed62c1113f..d716d1463c 100644 --- a/cmd/otel-allocator/allocation/http_test.go +++ b/cmd/otel-allocator/allocation/http_test.go @@ -4,32 +4,35 @@ import ( "reflect" "testing" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" + "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" ) func TestGetAllTargetsByCollectorAndJob(t *testing.T) { - baseAllocator := NewAllocator(logger) + allocatorStrategy, _ := strategy.New("least-weighted") + baseAllocator := NewAllocator(logger, allocatorStrategy) baseAllocator.SetCollectors([]string{"test-collector"}) - statefulAllocator := NewAllocator(logger) + statefulAllocator := NewAllocator(logger, allocatorStrategy) statefulAllocator.SetCollectors([]string{"test-collector-0"}) type args struct { collector string job string - cMap map[string][]TargetItem + cMap map[string][]strategy.TargetItem allocator *Allocator } var tests = []struct { name string args args - want []targetGroupJSON + want []strategy.TargetGroupJSON }{ { name: "Empty target map", args: args{ collector: "test-collector", job: "test-job", - cMap: map[string][]TargetItem{}, + cMap: map[string][]strategy.TargetItem{}, allocator: baseAllocator, }, want: nil, @@ -39,24 +42,21 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { args: args{ collector: "test-collector", job: "test-job", - cMap: map[string][]TargetItem{ + cMap: map[string][]strategy.TargetItem{ "test-collectortest-job": { - TargetItem{ + strategy.TargetItem{ JobName: "test-job", Label: model.LabelSet{ "test-label": "test-value", }, - TargetURL: "test-url", - Collector: &collector{ - Name: "test-collector", - NumTargets: 1, - }, + TargetURL: "test-url", + CollectorName: "test-collector", }, }, }, allocator: baseAllocator, }, - want: []targetGroupJSON{ + want: []strategy.TargetGroupJSON{ { Targets: []string{"test-url"}, Labels: map[model.LabelName]model.LabelValue{ @@ -70,37 +70,31 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { args: args{ collector: "test-collector", job: "test-job", - cMap: map[string][]TargetItem{ + cMap: map[string][]strategy.TargetItem{ "test-collectortest-job": { - TargetItem{ + strategy.TargetItem{ JobName: "test-job", Label: model.LabelSet{ "test-label": "test-value", }, - TargetURL: "test-url", - Collector: &collector{ - Name: "test-collector", - NumTargets: 1, - }, + TargetURL: "test-url", + CollectorName: "test-collector", }, }, "test-collectortest-job2": { - TargetItem{ + strategy.TargetItem{ JobName: "test-job2", Label: model.LabelSet{ "test-label": "test-value", }, - TargetURL: "test-url", - Collector: &collector{ - Name: "test-collector", - NumTargets: 1, - }, + TargetURL: "test-url", + CollectorName: "test-collector", }, }, }, allocator: baseAllocator, }, - want: []targetGroupJSON{ + want: []strategy.TargetGroupJSON{ { Targets: []string{"test-url"}, Labels: map[model.LabelName]model.LabelValue{ @@ -114,38 +108,32 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { args: args{ collector: "test-collector", job: "test-job", - cMap: map[string][]TargetItem{ + cMap: map[string][]strategy.TargetItem{ "test-collectortest-job": { - TargetItem{ + strategy.TargetItem{ JobName: "test-job", Label: model.LabelSet{ "test-label": "test-value", "foo": "bar", }, - TargetURL: "test-url1", - Collector: &collector{ - Name: "test-collector", - NumTargets: 2, - }, + TargetURL: "test-url1", + CollectorName: "test-collector", }, }, "test-collectortest-job2": { - TargetItem{ + strategy.TargetItem{ JobName: "test-job", Label: model.LabelSet{ "test-label": "test-value", }, - TargetURL: "test-url2", - Collector: &collector{ - Name: "test-collector", - NumTargets: 2, - }, + TargetURL: "test-url2", + CollectorName: "test-collector", }, }, }, allocator: baseAllocator, }, - want: []targetGroupJSON{ + want: []strategy.TargetGroupJSON{ { Targets: []string{"test-url1"}, Labels: map[model.LabelName]model.LabelValue{ diff --git a/cmd/otel-allocator/allocation/least_weighted/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted/least_weighted.go new file mode 100644 index 0000000000..6d94b20271 --- /dev/null +++ b/cmd/otel-allocator/allocation/least_weighted/least_weighted.go @@ -0,0 +1,122 @@ +package least_weighted + +import ( + "os" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/utility" +) + +func init() { + err := strategy.Register("least-weighted", NewLeastWeightedStrategy) + if err != nil { + os.Exit(1) + } +} + +type LeastWeightedStrategy struct { +} + +func NewLeastWeightedStrategy() strategy.Allocator { + return &LeastWeightedStrategy{} +} + +// findNextCollector finds the next collector with fewer number of targets. +// This method is called from within SetTargets and SetCollectors, whose caller +// acquires the needed lock. Requires there to be at least one collector set +func (l LeastWeightedStrategy) findNextCollector(state strategy.State) strategy.Collector { + // Set a dummy to be replaced + col := strategy.Collector{NumTargets: -1} + for _, v := range state.Collectors() { + if col.NumTargets == -1 || v.NumTargets < col.NumTargets { + col = v + } + } + return col +} + +// addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems +// This method is called from within SetTargets and SetCollectors, whose caller acquires the needed lock. +// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap +func (l LeastWeightedStrategy) addTargetToTargetItems(target strategy.TargetItem, state strategy.State) strategy.State { + nextState := state + chosenCollector := l.findNextCollector(nextState) + targetItem := strategy.NewTargetItem(target.JobName, target.TargetURL, target.Label, chosenCollector.Name) + chosenCollector.NumTargets++ + nextState.SetTargetItem(targetItem.Hash(), targetItem) + nextState.SetCollector(chosenCollector.Name, chosenCollector) + strategy.TargetsPerCollector.WithLabelValues(chosenCollector.Name).Set(float64(chosenCollector.NumTargets)) + return nextState +} + +func (l LeastWeightedStrategy) handleTargets(targetDiff utility.Changes[strategy.TargetItem], currentState strategy.State) strategy.State { + nextState := currentState + // Check for removals + for k, target := range nextState.TargetItems() { + // if the current target is in the removals list + if _, ok := targetDiff.Removals()[k]; ok { + c := nextState.Collectors()[target.CollectorName] + c.NumTargets-- + nextState.SetCollector(target.CollectorName, c) + nextState.RemoveTargetItem(k) + strategy.TargetsPerCollector.WithLabelValues(target.CollectorName).Set(float64(nextState.Collectors()[target.CollectorName].NumTargets)) + } + } + + // Check for additions + for k, target := range targetDiff.Additions() { + // Do nothing if the item is already there + if _, ok := nextState.TargetItems()[k]; ok { + continue + } else { + // Assign new set of collectors with the one different name + nextState = l.addTargetToTargetItems(target, nextState) + } + } + return nextState +} + +func (l LeastWeightedStrategy) handleCollectors(collectorsDiff utility.Changes[strategy.Collector], currentState strategy.State) strategy.State { + nextState := currentState + // Clear existing collectors + for _, k := range collectorsDiff.Removals() { + nextState.RemoveCollector(k.Name) + strategy.TargetsPerCollector.WithLabelValues(k.Name).Set(0) + } + // Insert the new collectors + for _, i := range collectorsDiff.Additions() { + nextState.SetCollector(i.Name, strategy.Collector{Name: i.Name, NumTargets: 0}) + } + + // find targets which need to be redistributed + var redistribute []strategy.TargetItem + for _, item := range nextState.TargetItems() { + for _, s := range collectorsDiff.Removals() { + if item.CollectorName == s.Name { + redistribute = append(redistribute, item) + } + } + } + // Re-Allocate the existing targets + for _, item := range redistribute { + nextState = l.addTargetToTargetItems(item, nextState) + } + return nextState +} + +func (l LeastWeightedStrategy) Allocate(currentState, newState strategy.State) strategy.State { + nextState := currentState + // Check for target changes + targetsDiff := utility.DiffMaps(currentState.TargetItems(), newState.TargetItems()) + // If there are any additions or removals + if len(targetsDiff.Additions()) != 0 || len(targetsDiff.Removals()) != 0 { + nextState = l.handleTargets(targetsDiff, currentState) + } + // Check for collector changes + collectorsDiff := utility.DiffMaps(currentState.Collectors(), newState.Collectors()) + // If there are any additions or removals + if len(collectorsDiff.Additions()) != 0 || len(collectorsDiff.Removals()) != 0 { + nextState = l.handleCollectors(collectorsDiff, nextState) + } + return nextState +} diff --git a/cmd/otel-allocator/allocation/least_weighted/least_weighted_test.go b/cmd/otel-allocator/allocation/least_weighted/least_weighted_test.go new file mode 100644 index 0000000000..dd909a588f --- /dev/null +++ b/cmd/otel-allocator/allocation/least_weighted/least_weighted_test.go @@ -0,0 +1,137 @@ +package least_weighted + +import ( + "fmt" + "testing" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" + + "github.com/stretchr/testify/assert" +) + +func makeNNewTargets(n int, numCollectors int) map[string]strategy.TargetItem { + toReturn := map[string]strategy.TargetItem{} + for i := 0; i < n; i++ { + collector := fmt.Sprintf("collector-%d", i%numCollectors) + newTarget := strategy.NewTargetItem(fmt.Sprintf("test-job-%d", i), "test-url", nil, collector) + toReturn[newTarget.Hash()] = newTarget + } + return toReturn +} + +func makeNCollectors(n int, targetsForEach int) map[string]strategy.Collector { + toReturn := map[string]strategy.Collector{} + for i := 0; i < n; i++ { + collector := fmt.Sprintf("collector-%d", i) + toReturn[collector] = strategy.Collector{ + Name: collector, + NumTargets: targetsForEach, + } + } + return toReturn +} + +func TestLeastWeightedStrategy_Allocate(t *testing.T) { + type args struct { + currentState strategy.State + newState strategy.State + } + tests := []struct { + name string + args args + want strategy.State + }{ + { + name: "single collector gets a new target", + args: args{ + currentState: strategy.NewState(makeNCollectors(1, 0), makeNNewTargets(0, 1)), + newState: strategy.NewState(makeNCollectors(1, 0), makeNNewTargets(1, 1)), + }, + want: strategy.NewState(makeNCollectors(1, 1), makeNNewTargets(1, 1)), + }, + { + name: "test set new collectors", + args: args{ + currentState: strategy.NewState(makeNCollectors(0, 0), makeNNewTargets(0, 0)), + newState: strategy.NewState(makeNCollectors(3, 0), makeNNewTargets(0, 3)), + }, + want: strategy.NewState(makeNCollectors(3, 0), makeNNewTargets(0, 3)), + }, + { + name: "test remove targets", + args: args{ + currentState: strategy.NewState(makeNCollectors(2, 2), makeNNewTargets(4, 2)), + newState: strategy.NewState(makeNCollectors(2, 2), makeNNewTargets(2, 2)), + }, + want: strategy.NewState(makeNCollectors(2, 1), makeNNewTargets(2, 2)), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l := LeastWeightedStrategy{} + assert.Equalf(t, tt.want, l.Allocate(tt.args.currentState, tt.args.newState), "Allocate(%v, %v)", tt.args.currentState, tt.args.newState) + }) + } +} + +func TestLeastWeightedStrategy_findNextCollector(t *testing.T) { + type args struct { + state strategy.State + } + tests := []struct { + name string + args args + want strategy.Collector + }{ + { + name: "goes to first collector with no targets", + args: args{ + state: strategy.NewState(makeNCollectors(1, 0), makeNNewTargets(0, 1)), + }, + want: strategy.Collector{ + Name: "collector-0", + NumTargets: 0, + }, + }, + { + name: "goes to collector with fewest targets with existing state", + args: args{ + state: strategy.NewState( + map[string]strategy.Collector{ + "collector-0": { + Name: "collector-0", + NumTargets: 0, + }, + "collector-1": { + Name: "collector-1", + NumTargets: 1, + }, + "collector-2": { + Name: "collector-2", + NumTargets: 2, + }, + }, + nil, + ), + }, + want: strategy.Collector{ + Name: "collector-0", + NumTargets: 0, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l := LeastWeightedStrategy{} + assert.Equalf(t, tt.want, l.findNextCollector(tt.args.state), "findNextCollector(%v)", tt.args.state) + }) + } +} + +func BenchmarkLeastWeightedStrategy_AllocateTargets(b *testing.B) { + l := LeastWeightedStrategy{} + emptyState := strategy.NewState(map[string]strategy.Collector{}, map[string]strategy.TargetItem{}) + for i := 0; i < b.N; i++ { + l.Allocate(emptyState, strategy.NewState(makeNCollectors(3, 0), makeNNewTargets(i, 3))) + } +} diff --git a/cmd/otel-allocator/allocation/strategy/strategy.go b/cmd/otel-allocator/allocation/strategy/strategy.go new file mode 100644 index 0000000000..f3f5382f95 --- /dev/null +++ b/cmd/otel-allocator/allocation/strategy/strategy.go @@ -0,0 +1,122 @@ +package strategy + +import ( + "errors" + "fmt" + "net/url" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" +) + +type AllocatorProvider func() Allocator + +var ( + registry = map[string]AllocatorProvider{} + + // TargetsPerCollector records how many targets have been assigned to each collector + // It is currently the responsibility of the strategy to track this information. + TargetsPerCollector = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "opentelemetry_allocator_targets_per_collector", + Help: "The number of targets for each collector.", + }, []string{"collector_name"}) +) + +func New(name string) (Allocator, error) { + if p, ok := registry[name]; ok { + return p(), nil + } + return nil, errors.New(fmt.Sprintf("unregistered strategy: %s", name)) +} + +func Register(name string, provider AllocatorProvider) error { + if _, ok := registry[name]; ok { + return errors.New("already registered") + } + registry[name] = provider + return nil +} + +type Allocator interface { + Allocate(currentState, newState State) State +} + +type LinkJSON struct { + Link string `json:"_link"` +} + +type CollectorJSON struct { + Link string `json:"_link"` + Jobs []TargetGroupJSON `json:"targets"` +} + +type TargetGroupJSON struct { + Targets []string `json:"targets"` + Labels model.LabelSet `json:"labels"` +} + +type TargetItem struct { + JobName string + Link LinkJSON + TargetURL string + Label model.LabelSet + CollectorName string +} + +func NewTargetItem(jobName string, targetURL string, label model.LabelSet, collectorName string) TargetItem { + return TargetItem{ + JobName: jobName, + Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(jobName))}, + TargetURL: targetURL, + Label: label, + CollectorName: collectorName, + } +} + +func (t TargetItem) Hash() string { + return t.JobName + t.TargetURL + t.Label.Fingerprint().String() +} + +// Collector Creates a struct that holds Collector information +// This struct will be parsed into endpoint with Collector and jobs info +// This struct can be extended with information like annotations and labels in the future +type Collector struct { + Name string + NumTargets int +} + +type State struct { + // collectors is a map from a Collector's name to a Collector instance + collectors map[string]Collector + // targetItems is a map from a target item's hash to the target items allocated state + targetItems map[string]TargetItem +} + +func (s State) Collectors() map[string]Collector { + return s.collectors +} + +func (s State) TargetItems() map[string]TargetItem { + return s.targetItems +} + +func (s State) SetTargetItem(key string, value TargetItem) { + s.targetItems[key] = value +} + +func (s State) SetCollector(key string, value Collector) { + s.collectors[key] = value +} + +func (s State) RemoveCollector(key string) { + delete(s.collectors, key) +} + +func (s State) RemoveTargetItem(key string) { + delete(s.targetItems, key) +} + +func NewState(collectors map[string]Collector, targetItems map[string]TargetItem) State { + return State{collectors: collectors, targetItems: targetItems} +} diff --git a/cmd/otel-allocator/config/config.go b/cmd/otel-allocator/config/config.go index 4be3bdf3c2..97d729ac09 100644 --- a/cmd/otel-allocator/config/config.go +++ b/cmd/otel-allocator/config/config.go @@ -31,8 +31,9 @@ const DefaultResyncTime = 5 * time.Minute const DefaultConfigFilePath string = "/conf/targetallocator.yaml" type Config struct { - LabelSelector map[string]string `yaml:"label_selector,omitempty"` - Config *promconfig.Config `yaml:"config"` + LabelSelector map[string]string `yaml:"label_selector,omitempty"` + Config *promconfig.Config `yaml:"config"` + AllocationStrategy *string `yaml:"allocation_strategy,omitempty"` } type PrometheusCRWatcherConfig struct { diff --git a/cmd/otel-allocator/discovery/discovery.go b/cmd/otel-allocator/discovery/discovery.go index 1714f92657..f258978a81 100644 --- a/cmd/otel-allocator/discovery/discovery.go +++ b/cmd/otel-allocator/discovery/discovery.go @@ -5,13 +5,14 @@ import ( "github.com/go-kit/log" "github.com/go-logr/logr" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" - allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" + allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" ) var ( @@ -59,7 +60,7 @@ func (m *Manager) ApplyConfig(source allocatorWatcher.EventSource, cfg *config.C return m.manager.ApplyConfig(discoveryCfg) } -func (m *Manager) Watch(fn func(targets []allocation.TargetItem)) { +func (m *Manager) Watch(fn func(targets []strategy.TargetItem)) { log := m.log.WithValues("component", "opentelemetry-targetallocator") go func() { @@ -69,14 +70,14 @@ func (m *Manager) Watch(fn func(targets []allocation.TargetItem)) { log.Info("Service Discovery watch event stopped: discovery manager closed") return case tsets := <-m.manager.SyncCh(): - targets := []allocation.TargetItem{} + targets := []strategy.TargetItem{} for jobName, tgs := range tsets { var count float64 = 0 for _, tg := range tgs { for _, t := range tg.Targets { count++ - targets = append(targets, allocation.TargetItem{ + targets = append(targets, strategy.TargetItem{ JobName: jobName, TargetURL: string(t[model.AddressLabel]), Label: t.Merge(tg.Labels), diff --git a/cmd/otel-allocator/discovery/discovery_test.go b/cmd/otel-allocator/discovery/discovery_test.go index 11e601f364..b0419a103c 100644 --- a/cmd/otel-allocator/discovery/discovery_test.go +++ b/cmd/otel-allocator/discovery/discovery_test.go @@ -8,14 +8,15 @@ import ( "testing" gokitlog "github.com/go-kit/log" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" - allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" "github.com/prometheus/common/model" promconfig "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" "github.com/stretchr/testify/assert" ctrl "sigs.k8s.io/controller-runtime" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" + allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" ) var cfg config.Config @@ -32,7 +33,7 @@ func TestMain(m *testing.M) { manager = NewManager(ctrl.Log.WithName("test"), context.Background(), gokitlog.NewNopLogger()) results = make(chan []string) - manager.Watch(func(targets []allocation.TargetItem) { + manager.Watch(func(targets []strategy.TargetItem) { var result []string for _, t := range targets { result = append(result, t.TargetURL) diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index bd2bd0a469..39dd321450 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -12,15 +12,18 @@ import ( gokitlog "github.com/go-kit/log" "github.com/go-logr/logr" "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" + ctrl "sigs.k8s.io/controller-runtime" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" + _ "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/least_weighted" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/collector" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" lbdiscovery "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/discovery" allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/client_golang/prometheus/promhttp" - ctrl "sigs.k8s.io/controller-runtime" ) var ( @@ -41,13 +44,23 @@ func main() { setupLog.Error(err, "Failed to parse parameters") os.Exit(1) } + cfg, err := config.Load(*cliConf.ConfigFilePath) + if err != nil { + setupLog.Error(err, "Unable to load configuration") + } cliConf.RootLogger.Info("Starting the Target Allocator") ctx := context.Background() log := ctrl.Log.WithName("allocator") - allocator := allocation.NewAllocator(log) + + allocatorStrategy, err := strategy.New(*cfg.AllocationStrategy) + if err != nil { + setupLog.Error(err, "Unable to initialize allocation strategy") + os.Exit(1) + } + allocator := allocation.NewAllocator(log, allocatorStrategy) watcher, err := allocatorWatcher.NewWatcher(setupLog, cliConf, allocator) if err != nil { setupLog.Error(err, "Can't start the watchers") @@ -60,7 +73,13 @@ func main() { defer discoveryManager.Close() discoveryManager.Watch(allocator.SetTargets) - srv, err := newServer(log, allocator, discoveryManager, cliConf) + k8sclient, err := configureFileDiscovery(log, allocator, discoveryManager, context.Background(), cliConf) + if err != nil { + setupLog.Error(err, "Can't start the k8s client") + os.Exit(1) + } + + srv, err := newServer(log, allocator, discoveryManager, k8sclient, cliConf.ListenAddr) if err != nil { setupLog.Error(err, "Can't start the server") } @@ -91,7 +110,7 @@ func main() { if err := srv.Shutdown(ctx); err != nil { setupLog.Error(err, "Cannot shutdown the server") } - srv, err = newServer(log, allocator, discoveryManager, cliConf) + srv, err = newServer(log, allocator, discoveryManager, k8sclient, cliConf.ListenAddr) if err != nil { setupLog.Error(err, "Error restarting the server with new config") } @@ -126,11 +145,7 @@ type server struct { server *http.Server } -func newServer(log logr.Logger, allocator *allocation.Allocator, discoveryManager *lbdiscovery.Manager, cliConf config.CLIConfig) (*server, error) { - k8sclient, err := configureFileDiscovery(log, allocator, discoveryManager, context.Background(), cliConf) - if err != nil { - return nil, err - } +func newServer(log logr.Logger, allocator *allocation.Allocator, discoveryManager *lbdiscovery.Manager, k8sclient *collector.Client, listenAddr *string) (*server, error) { s := &server{ logger: log, allocator: allocator, @@ -142,7 +157,7 @@ func newServer(log logr.Logger, allocator *allocation.Allocator, discoveryManage router.HandleFunc("/jobs", s.JobHandler).Methods("GET") router.HandleFunc("/jobs/{job_id}/targets", s.TargetsHandler).Methods("GET") router.Path("/metrics").Handler(promhttp.Handler()) - s.server = &http.Server{Addr: *cliConf.ListenAddr, Handler: router} + s.server = &http.Server{Addr: *listenAddr, Handler: router} return s, nil } @@ -178,9 +193,9 @@ func (s *server) Shutdown(ctx context.Context) error { } func (s *server) JobHandler(w http.ResponseWriter, r *http.Request) { - displayData := make(map[string]allocation.LinkJSON) + displayData := make(map[string]strategy.LinkJSON) for _, v := range s.allocator.TargetItems() { - displayData[v.JobName] = allocation.LinkJSON{v.Link.Link} + displayData[v.JobName] = strategy.LinkJSON{Link: v.Link.Link} } jsonHandler(w, r, displayData) } @@ -199,9 +214,9 @@ func (s *server) PrometheusMiddleware(next http.Handler) http.Handler { func (s *server) TargetsHandler(w http.ResponseWriter, r *http.Request) { q := r.URL.Query()["collector_id"] - var compareMap = make(map[string][]allocation.TargetItem) // CollectorName+jobName -> TargetItem + var compareMap = make(map[string][]strategy.TargetItem) // CollectorName+jobName -> TargetItem for _, v := range s.allocator.TargetItems() { - compareMap[v.Collector.Name+v.JobName] = append(compareMap[v.Collector.Name+v.JobName], *v) + compareMap[v.CollectorName+v.JobName] = append(compareMap[v.CollectorName+v.JobName], v) } params := mux.Vars(r) jobId, err := url.QueryUnescape(params["job_id"]) diff --git a/cmd/otel-allocator/utility/utility.go b/cmd/otel-allocator/utility/utility.go new file mode 100644 index 0000000000..30d26d755c --- /dev/null +++ b/cmd/otel-allocator/utility/utility.go @@ -0,0 +1,36 @@ +package utility + +type Changes[T any] struct { + additions map[string]T + removals map[string]T +} + +func (c Changes[T]) Additions() map[string]T { + return c.additions +} + +func (c Changes[T]) Removals() map[string]T { + return c.removals +} + +func DiffMaps[T any](current, new map[string]T) Changes[T] { + additions := map[string]T{} + removals := map[string]T{} + // Used as a set to check for removed items + newMembership := map[string]bool{} + for key, value := range new { + if _, found := current[key]; !found { + additions[key] = value + } + newMembership[key] = true + } + for key, value := range current { + if _, found := newMembership[key]; !found { + removals[key] = value + } + } + return Changes[T]{ + additions: additions, + removals: removals, + } +} diff --git a/cmd/otel-allocator/utility/utility_test.go b/cmd/otel-allocator/utility/utility_test.go new file mode 100644 index 0000000000..47943b97eb --- /dev/null +++ b/cmd/otel-allocator/utility/utility_test.go @@ -0,0 +1,63 @@ +package utility + +import ( + "reflect" + "testing" +) + +func TestDiffMaps(t *testing.T) { + type args struct { + current map[string]string + new map[string]string + } + tests := []struct { + name string + args args + want Changes[string] + }{ + { + name: "basic replacement", + args: args{ + current: map[string]string{ + "current": "one", + }, + new: map[string]string{ + "new": "another", + }, + }, + want: Changes[string]{ + additions: map[string]string{ + "new": "another", + }, + removals: map[string]string{ + "current": "one", + }, + }, + }, + { + name: "single addition", + args: args{ + current: map[string]string{ + "current": "one", + }, + new: map[string]string{ + "current": "one", + "new": "another", + }, + }, + want: Changes[string]{ + additions: map[string]string{ + "new": "another", + }, + removals: map[string]string{}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := DiffMaps(tt.args.current, tt.args.new); !reflect.DeepEqual(got, tt.want) { + t.Errorf("DiffMaps() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/cmd/otel-allocator/watcher/file.go b/cmd/otel-allocator/watcher/file.go index 262bb71bfb..6adc3c33e9 100644 --- a/cmd/otel-allocator/watcher/file.go +++ b/cmd/otel-allocator/watcher/file.go @@ -5,6 +5,7 @@ import ( "github.com/fsnotify/fsnotify" "github.com/go-logr/logr" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" ) diff --git a/cmd/otel-allocator/watcher/main.go b/cmd/otel-allocator/watcher/main.go index dd09ca1b09..8496382e79 100644 --- a/cmd/otel-allocator/watcher/main.go +++ b/cmd/otel-allocator/watcher/main.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/go-logr/logr" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" ) diff --git a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml index e5a0084416..b2f0bf3d0c 100644 --- a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml +++ b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml @@ -681,6 +681,10 @@ spec: description: TargetAllocator indicates a value which determines whether to spawn a target allocation resource or not. properties: + allocationStrategy: + description: AllocationStrategy determines which strategy the + target allocator should use for allocation + type: string enabled: description: Enabled indicates whether to use a target allocation mechanism for Prometheus targets or not. diff --git a/docs/api.md b/docs/api.md index 57b8d78a9a..6dcb8b56ec 100644 --- a/docs/api.md +++ b/docs/api.md @@ -2869,6 +2869,13 @@ TargetAllocator indicates a value which determines whether to spawn a target all