
[targetallocator] PrometheusOperator CRD MVC #653

Merged · 26 commits · Apr 22, 2022
4a72ffb
feat(target-allocator): allow custom config file path
secustor Dec 1, 2021
626d002
feat(target-allocator): move CLI config options to config package
secustor Dec 3, 2021
b71d1f2
feat(target-allocator): allow running outside of cluster for debugging
secustor Dec 4, 2021
727493b
introduce meta watcher
secustor Dec 3, 2021
f6a1996
add event source
secustor Dec 9, 2021
f12336d
fix: log panics
secustor Dec 9, 2021
d5c90ea
fix: race condition
secustor Dec 9, 2021
24275d0
fixup! fix: log panics
secustor Dec 10, 2021
0da5fa8
feat: implement promCR retrieval
secustor Dec 11, 2021
8a29f22
feat: functioning
secustor Dec 16, 2021
077c0c4
refactor: some cleanup
secustor Dec 28, 2021
78161df
feat(target-allocator): escape job names in query parameters
secustor Dec 31, 2021
28fc0c9
feat(target-allocator): make prometheusCR lookup optional and allow u…
secustor Dec 31, 2021
bb21889
refactor(target-allocator): improve memory usage and comments
secustor Dec 31, 2021
c18ac42
chore(target-allocator): update PromOperator and Kubernetes deps
secustor Jan 2, 2022
acc07c5
refactor(target-allocator): use exclude instead of replace directive
secustor Jan 2, 2022
8b7d8bf
ci: add Makefile targets for target allocator
secustor Jan 19, 2022
1b249ca
tests: add kuttl tests for PrometheusCR feature of target allocator
secustor Jan 19, 2022
2f5e0b3
docs(targetAllocator): add README.md
secustor Jan 27, 2022
f665de5
fixup CRD docs
secustor Jan 28, 2022
e23a2c5
Merge branch 'main' into implement_prometheus_crd
secustor Feb 9, 2022
0a429a7
fix(Makefile): add missing PHONY tags
secustor Feb 9, 2022
92a55d5
implement change requests
secustor Feb 22, 2022
96cea47
Merge branch 'main' into implement_prometheus_crd
secustor Apr 7, 2022
45a0e84
Merge branch 'main' into implement_prometheus_crd
secustor Apr 9, 2022
7d3f036
go mod tidy and fix linting
secustor Apr 9, 2022
10 changes: 10 additions & 0 deletions apis/v1alpha1/opentelemetrycollector_types.go
@@ -120,11 +120,21 @@ type OpenTelemetryTargetAllocator struct {
// +optional
Enabled bool `json:"enabled,omitempty"`

// PrometheusCR defines the configuration for Prometheus custom resource retrieval.
Member:
What CR is it exactly? Could you please document the fully qualified name (group.kind)? (I am playing ignorant here, not knowing anything about p8s.)

Member (author):
It refers to the ServiceMonitor and PodMonitor CRDs in the monitoring.coreos.com/v1 API group.

// +optional
PrometheusCR OpenTelemetryTargetAllocatorPrometheusCR `json:"prometheusCR,omitempty"`

// Image indicates the container image to use for the OpenTelemetry TargetAllocator.
// +optional
Image string `json:"image,omitempty"`
}

type OpenTelemetryTargetAllocatorPrometheusCR struct {
// Enabled indicates whether to use Prometheus custom resources as targets or not.
Member:
Which p8s CR does it use? Can there be multiple in a namespace?

Member (author):
As this is an MVC, it uses all ServiceMonitor and PodMonitor instances, in all namespaces, that the service account is able to retrieve from the API server.

// +optional
Enabled bool `json:"enabled,omitempty"`
}

// OpenTelemetryCollectorStatus defines the observed state of OpenTelemetryCollector.
type OpenTelemetryCollectorStatus struct {
// Replicas is currently not being set and might be removed in the next version.
16 changes: 16 additions & 0 deletions apis/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.
@@ -647,6 +647,15 @@ spec:
description: Image indicates the container image to use for the
OpenTelemetry TargetAllocator.
type: string
prometheusCR:
description: PrometheusCR defines the configuration for Prometheus
custom resource retrieval.
properties:
enabled:
description: Enabled indicates whether to use Prometheus
custom resources as targets or not.
type: boolean
type: object
type: object
tolerations:
description: Toleration to schedule OpenTelemetry Collector pods.
2 changes: 1 addition & 1 deletion cmd/otel-allocator/Dockerfile
@@ -23,4 +23,4 @@ WORKDIR /root/
# Copy the pre-built binary file from the previous stage
COPY --from=builder /app/main .

CMD ["./main"]
ENTRYPOINT ["./main"]
5 changes: 3 additions & 2 deletions cmd/otel-allocator/allocation/allocator.go
@@ -2,6 +2,7 @@ package allocation

import (
"fmt"
"net/url"
"sync"

"github.com/go-logr/logr"
@@ -80,7 +81,7 @@ func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) {
// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
// SetCollectors is called when Collectors are added or removed
func (allocator *Allocator) SetCollectors(collectors []string) {
log := allocator.log.WithValues("opentelemetry-targetallocator")
log := allocator.log.WithValues("component", "opentelemetry-targetallocator")

allocator.m.Lock()
defer allocator.m.Unlock()
@@ -132,7 +133,7 @@ func (allocator *Allocator) processWaitingTargets() {
allocator.TargetItems[k] = &v
targetItem := TargetItem{
JobName: v.JobName,
Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", v.JobName)},
Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(v.JobName))},
TargetURL: v.TargetURL,
Label: v.Label,
Collector: col,
3 changes: 2 additions & 1 deletion cmd/otel-allocator/allocation/http.go
@@ -2,6 +2,7 @@ package allocation

import (
"fmt"
"net/url"

"github.com/prometheus/common/model"
)
@@ -43,7 +44,7 @@ func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator *All
targetGroupList = append(targetGroupList, targetGroupJSON{Targets: targets, Labels: labelSetMap[targets[0]]})

}
displayData[j.Collector.Name] = collectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", j.JobName, j.Collector.Name), Jobs: targetGroupList}
displayData[j.Collector.Name] = collectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(j.JobName), j.Collector.Name), Jobs: targetGroupList}

}
}
15 changes: 5 additions & 10 deletions cmd/otel-allocator/collector/collector.go
@@ -30,13 +30,8 @@ type Client struct {
close chan struct{}
}

func NewClient(logger logr.Logger) (*Client, error) {
config, err := rest.InClusterConfig()
if err != nil {
return &Client{}, err
}

clientset, err := kubernetes.NewForConfig(config)
func NewClient(logger logr.Logger, kubeConfig *rest.Config) (*Client, error) {
clientset, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
return &Client{}, err
}
@@ -50,7 +45,7 @@ func NewClient(logger logr.Logger) (*Client, error) {

func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func(collectors []string)) {
collectorMap := map[string]bool{}
log := k.log.WithValues("opentelemetry-targetallocator")
log := k.log.WithValues("component", "opentelemetry-targetallocator")

opts := metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labelMap).String(),
@@ -83,15 +78,15 @@ func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func(
return
}
if msg := runWatch(ctx, k, watcher.ResultChan(), collectorMap, fn); msg != "" {
log.Info("Collector pod watch event stopped", msg)
log.Info("Collector pod watch event stopped " + msg)
return
}
}
}()
}

func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap map[string]bool, fn func(collectors []string)) string {
log := k.log.WithValues("opentelemetry-targetallocator")
log := k.log.WithValues("component", "opentelemetry-targetallocator")
for {
select {
case <-k.close:
66 changes: 61 additions & 5 deletions cmd/otel-allocator/config/config.go
@@ -2,31 +2,54 @@ package config

import (
"errors"
"flag"
"fmt"
"io/fs"
"io/ioutil"
"path/filepath"
"time"

"github.com/go-logr/logr"
promconfig "github.com/prometheus/prometheus/config"
_ "github.com/prometheus/prometheus/discovery/install"
"github.com/spf13/pflag"
"gopkg.in/yaml.v2"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

// ErrInvalidYAML represents an error in the format of the original YAML configuration file.
var (
ErrInvalidYAML = errors.New("couldn't parse the loadbalancer configuration")
)

const defaultConfigFile string = "/conf/targetallocator.yaml"
const DefaultResyncTime = 5 * time.Minute
const DefaultConfigFilePath string = "/conf/targetallocator.yaml"

type Config struct {
LabelSelector map[string]string `yaml:"label_selector,omitempty"`
Config *promconfig.Config `yaml:"config"`
}

func Load(file string) (Config, error) {
if file == "" {
file = defaultConfigFile
}
type PrometheusCRWatcherConfig struct {
Enabled *bool
}

type CLIConfig struct {
ListenAddr *string
ConfigFilePath *string
ClusterConfig *rest.Config
// KubeConfigFilePath empty if in cluster configuration is in use
KubeConfigFilePath string
RootLogger logr.Logger
PromCRWatcherConf PrometheusCRWatcherConfig
}

func Load(file string) (Config, error) {
var cfg Config
if err := unmarshal(&cfg, file); err != nil {
Contributor:
Question: it seems the allocator previously loaded a default config file when no file was provided, because main.go always passed an empty string. Should that functionality remain in case there are any consumers of this function?

Member (author):
Are you referring to users of the target allocator, or to developers who import this package?

In the first case the behavior has not changed: it still loads the same default file as before; this is now implemented using pflag defaults. https://github.com/open-telemetry/opentelemetry-operator/pull/653/files#diff-d292758014333553bcd82dfdf75743ef82b0b32f6414e2dc345777fc342e4480R77

return Config{}, err
@@ -45,3 +68,36 @@ func unmarshal(cfg *Config, configFile string) error {
}
return nil
}

func ParseCLI() (CLIConfig, error) {
opts := zap.Options{}
opts.BindFlags(flag.CommandLine)
cLIConf := CLIConfig{
ListenAddr: pflag.String("listen-addr", ":8080", "The address where this service serves."),
ConfigFilePath: pflag.String("config-file", DefaultConfigFilePath, "The path to the config file."),
PromCRWatcherConf: PrometheusCRWatcherConfig{
Enabled: pflag.Bool("enable-prometheus-cr-watcher", false, "Enable Prometheus CRs as target sources"),
},
}
kubeconfigPath := pflag.String("kubeconfig-path", filepath.Join(homedir.HomeDir(), ".kube", "config"), "absolute path to the KubeconfigPath file")
pflag.Parse()

cLIConf.RootLogger = zap.New(zap.UseFlagOptions(&opts))
klog.SetLogger(cLIConf.RootLogger)
ctrl.SetLogger(cLIConf.RootLogger)

clusterConfig, err := clientcmd.BuildConfigFromFlags("", *kubeconfigPath)
cLIConf.KubeConfigFilePath = *kubeconfigPath
if err != nil {
if _, ok := err.(*fs.PathError); !ok {
return CLIConfig{}, err
}
clusterConfig, err = rest.InClusterConfig()
if err != nil {
return CLIConfig{}, err
}
cLIConf.KubeConfigFilePath = "" // reset as we use in cluster configuration
}
cLIConf.ClusterConfig = clusterConfig
return cLIConf, nil
}
33 changes: 20 additions & 13 deletions cmd/otel-allocator/discovery/discovery.go
@@ -6,16 +6,18 @@ import (
"github.com/go-kit/log"
"github.com/go-logr/logr"
"github.com/otel-allocator/allocation"
"github.com/otel-allocator/config"
allocatorWatcher "github.com/otel-allocator/watcher"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
)

type Manager struct {
log logr.Logger
manager *discovery.Manager
logger log.Logger
close chan struct{}
log logr.Logger
manager *discovery.Manager
logger log.Logger
close chan struct{}
configsMap map[allocatorWatcher.EventSource]*config.Config
}

func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, options ...func(*discovery.Manager)) *Manager {
@@ -27,24 +29,29 @@ func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, options
}
}()
return &Manager{
log: log,
manager: manager,
logger: logger,
close: make(chan struct{}),
log: log,
manager: manager,
logger: logger,
close: make(chan struct{}),
configsMap: make(map[allocatorWatcher.EventSource]*config.Config),
}
}

func (m *Manager) ApplyConfig(cfg config.Config) error {
func (m *Manager) ApplyConfig(source allocatorWatcher.EventSource, cfg *config.Config) error {
m.configsMap[source] = cfg

discoveryCfg := make(map[string]discovery.Configs)

for _, scrapeConfig := range cfg.Config.ScrapeConfigs {
discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
for _, value := range m.configsMap {
for _, scrapeConfig := range value.ScrapeConfigs {
discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs
}
}
return m.manager.ApplyConfig(discoveryCfg)
}

func (m *Manager) Watch(fn func(targets []allocation.TargetItem)) {
log := m.log.WithValues("opentelemetry-targetallocator")
log := m.log.WithValues("component", "opentelemetry-targetallocator")

go func() {
for {
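The `ApplyConfig` change in the hunk above keeps one scrape config per event source and rebuilds the merged job map on every apply, so a refresh from one source (e.g. the file watcher) no longer drops jobs discovered from the other (the Prometheus CR watcher). A stripped-down sketch of that merge logic, with hypothetical stand-in types in place of the Prometheus and watcher types:

```go
package main

import "fmt"

// EventSource identifies where a configuration came from (a stand-in for
// allocatorWatcher.EventSource in the real code).
type EventSource string

// ScrapeJob is a hypothetical stand-in for a Prometheus scrape config entry.
type ScrapeJob struct {
	JobName string
	Targets []string
}

type Manager struct {
	configsMap map[EventSource][]ScrapeJob
}

// ApplyConfig stores the latest config for one source, then merges the jobs
// from all sources into a single job-name-keyed map, mirroring the PR's logic.
func (m *Manager) ApplyConfig(source EventSource, jobs []ScrapeJob) map[string][]string {
	m.configsMap[source] = jobs
	merged := make(map[string][]string)
	for _, cfg := range m.configsMap {
		for _, j := range cfg {
			merged[j.JobName] = j.Targets
		}
	}
	return merged
}

func main() {
	m := &Manager{configsMap: make(map[EventSource][]ScrapeJob)}
	m.ApplyConfig("file", []ScrapeJob{{JobName: "app", Targets: []string{"10.0.0.1:9090"}}})
	merged := m.ApplyConfig("promCR", []ScrapeJob{{JobName: "svcmon", Targets: []string{"10.0.0.2:9090"}}})
	fmt.Println(len(merged)) // jobs from both sources survive the second apply → 2
}
```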