[target-allocator] Populate store assets (authorization information) for Prometheus CR watcher #1710

Merged: 15 commits, Jun 28, 2023
16 changes: 16 additions & 0 deletions .chloggen/1710-prometheus-cr-scrape-config-credentials.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
component: target allocator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Populate credentials for Prometheus CR (service and pod monitor) scrape configs.

# One or more tracking issues related to the change
issues: [1669]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
6 changes: 6 additions & 0 deletions cmd/otel-allocator/README.md
@@ -138,6 +138,12 @@ the `targetAllocator:` part of the OpenTelemetryCollector CR.
**Note**: The Collector part of this same CR *also* has a serviceAccount key which only affects the collector and *not*
the TargetAllocator.

### Service / Pod monitor endpoint credentials

If your service or pod monitor endpoints require credentials or another supported form of authentication (bearer token, basic auth, OAuth2, etc.), you need to ensure that the collector has access to this information. Due to some limitations in how the endpoint configuration is handled, the target allocator currently does **not** support credentials provided via secrets. It is only possible to provide credentials in a file (for more details, see issue https://github.com/open-telemetry/opentelemetry-operator/issues/1669).

To ensure your endpoints can be scraped, your collector instance needs to have the relevant secret mounted as a file at the path the monitor configuration refers to, as sketched below.
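
For illustration only, a minimal sketch of such a setup could look like the following. It assumes a hypothetical secret named `scrape-bearer-token` with a `token` key; the monitor references the token by file path and the collector mounts the same secret at that path. All names, paths, and selectors here are placeholders, not part of this change.

```yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: my-app                # hypothetical monitor
spec:
  selector:
    matchLabels:
      app: my-app
  endpoints:
    - port: metrics
      # File-based credentials instead of a secret key selector.
      bearerTokenFile: /etc/prometheus/secrets/scrape-bearer-token/token
---
apiVersion: opentelemetry.io/v1alpha1
kind: OpenTelemetryCollector
metadata:
  name: my-collector          # hypothetical collector
spec:
  mode: statefulset
  targetAllocator:
    enabled: true
    prometheusCR:
      enabled: true
  # Mount the same secret so the file path referenced above exists in the collector pods.
  volumes:
    - name: scrape-bearer-token
      secret:
        secretName: scrape-bearer-token
  volumeMounts:
    - name: scrape-bearer-token
      mountPath: /etc/prometheus/secrets/scrape-bearer-token
      readOnly: true
  config: |
    # collector pipeline configuration omitted for brevity
```

Any mount path works, as long as the path referenced in the monitor matches the path at which the secret is mounted into the collector pods.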


# Design

4 changes: 2 additions & 2 deletions cmd/otel-allocator/main.go
@@ -107,7 +107,7 @@ func main() {
defer close(interrupts)

if *cliConf.PromCRWatcherConf.Enabled {
promWatcher, err = allocatorWatcher.NewPrometheusCRWatcher(cfg, cliConf)
promWatcher, err = allocatorWatcher.NewPrometheusCRWatcher(setupLog.WithName("prometheus-cr-watcher"), cfg, cliConf)
if err != nil {
setupLog.Error(err, "Can't start the prometheus watcher")
os.Exit(1)
@@ -193,7 +193,7 @@ func main() {
select {
case event := <-eventChan:
eventsMetric.WithLabelValues(event.Source.String()).Inc()
loadConfig, err := event.Watcher.LoadConfig()
loadConfig, err := event.Watcher.LoadConfig(ctx)
if err != nil {
setupLog.Error(err, "Unable to load configuration")
continue
3 changes: 2 additions & 1 deletion cmd/otel-allocator/watcher/file.go
@@ -15,6 +15,7 @@
package watcher

import (
"context"
"path/filepath"

"github.com/fsnotify/fsnotify"
@@ -48,7 +49,7 @@ func NewFileWatcher(logger logr.Logger, config config.CLIConfig) (*FileWatcher,
}, nil
}

func (f *FileWatcher) LoadConfig() (*promconfig.Config, error) {
func (f *FileWatcher) LoadConfig(_ context.Context) (*promconfig.Config, error) {
cfg, err := config.Load(f.configFilePath)
if err != nil {
f.logger.Error(err, "Unable to load configuration")
146 changes: 123 additions & 23 deletions cmd/otel-allocator/watcher/promOperator.go
@@ -15,9 +15,11 @@
package watcher

import (
"context"
"fmt"

"github.com/go-kit/log"
"github.com/go-logr/logr"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
promv1alpha1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1alpha1"
"github.com/prometheus-operator/prometheus-operator/pkg/assets"
@@ -29,34 +31,30 @@ import (
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

allocatorconfig "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config"
)

func NewPrometheusCRWatcher(cfg allocatorconfig.Config, cliConfig allocatorconfig.CLIConfig) (*PrometheusCRWatcher, error) {
func NewPrometheusCRWatcher(logger logr.Logger, cfg allocatorconfig.Config, cliConfig allocatorconfig.CLIConfig) (*PrometheusCRWatcher, error) {
mClient, err := monitoringclient.NewForConfig(cliConfig.ClusterConfig)
if err != nil {
return nil, err
}

factory := informers.NewMonitoringInformerFactories(map[string]struct{}{v1.NamespaceAll: {}}, map[string]struct{}{}, mClient, allocatorconfig.DefaultResyncTime, nil) //TODO decide what strategy to use regarding namespaces

serviceMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ServiceMonitorName))
clientset, err := kubernetes.NewForConfig(cliConfig.ClusterConfig)
if err != nil {
return nil, err
}

podMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PodMonitorName))
factory := informers.NewMonitoringInformerFactories(map[string]struct{}{v1.NamespaceAll: {}}, map[string]struct{}{}, mClient, allocatorconfig.DefaultResyncTime, nil) //TODO decide what strategy to use regarding namespaces

monitoringInformers, err := getInformers(factory)
if err != nil {
return nil, err
}

monitoringInformers := map[string]*informers.ForResource{
monitoringv1.ServiceMonitorName: serviceMonitorInformers,
monitoringv1.PodMonitorName: podMonitorInformers,
}

// TODO: We should make these durations configurable
prom := &monitoringv1.Prometheus{
Spec: monitoringv1.PrometheusSpec{
@@ -76,7 +74,9 @@ func NewPrometheusCRWatcher(cfg allocatorconfig.Config, cliConfig allocatorconfi
podMonSelector := getSelector(cfg.PodMonitorSelector)

return &PrometheusCRWatcher{
logger: logger,
kubeMonitoringClient: mClient,
k8sClient: clientset,
informers: monitoringInformers,
stopChannel: make(chan struct{}),
configGenerator: generator,
@@ -87,7 +87,9 @@ func NewPrometheusCRWatcher(cfg allocatorconfig.Config, cliConfig allocatorconfi
}

type PrometheusCRWatcher struct {
kubeMonitoringClient *monitoringclient.Clientset
logger logr.Logger
kubeMonitoringClient monitoringclient.Interface
k8sClient kubernetes.Interface
informers map[string]*informers.ForResource
stopChannel chan struct{}
configGenerator *prometheus.ConfigGenerator
@@ -98,13 +100,30 @@ type PrometheusCRWatcher struct {
}

func getSelector(s map[string]string) labels.Selector {
sel := labels.NewSelector()
if s == nil {
return sel
return labels.NewSelector()
}
return labels.SelectorFromSet(s)
}

// getInformers returns a map of informers for the given resources.
func getInformers(factory informers.FactoriesForNamespaces) (map[string]*informers.ForResource, error) {
Contributor:
thought: we should open an issue for adding in the new scrape config CRD and maybe the probe CRD too.

Contributor (author):
Agreed 👍 I'll open an issue for this.

serviceMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ServiceMonitorName))
if err != nil {
return nil, err
}

podMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PodMonitorName))
if err != nil {
return nil, err
}

return map[string]*informers.ForResource{
monitoringv1.ServiceMonitorName: serviceMonitorInformers,
monitoringv1.PodMonitorName: podMonitorInformers,
}, nil
}

// Watch wrapped informers and wait for an initial sync.
func (w *PrometheusCRWatcher) Watch(upstreamEvents chan Event, upstreamErrors chan error) error {
event := Event{
@@ -143,12 +162,13 @@ func (w *PrometheusCRWatcher) Close() error {
return nil
}

func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) {
func (w *PrometheusCRWatcher) LoadConfig(ctx context.Context) (*promconfig.Config, error) {
store := assets.NewStore(w.k8sClient.CoreV1(), w.k8sClient.CoreV1())
serviceMonitorInstances := make(map[string]*monitoringv1.ServiceMonitor)

smRetrieveErr := w.informers[monitoringv1.ServiceMonitorName].ListAll(w.serviceMonitorSelector, func(sm interface{}) {
monitor := sm.(*monitoringv1.ServiceMonitor)
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(monitor)
w.addStoreAssetsForServiceMonitor(ctx, monitor.Name, monitor.Namespace, monitor.Spec.Endpoints, store)
serviceMonitorInstances[key] = monitor
})
if smRetrieveErr != nil {
@@ -159,19 +179,13 @@ func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) {
pmRetrieveErr := w.informers[monitoringv1.PodMonitorName].ListAll(w.podMonitorSelector, func(pm interface{}) {
monitor := pm.(*monitoringv1.PodMonitor)
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(monitor)
w.addStoreAssetsForPodMonitor(ctx, monitor.Name, monitor.Namespace, monitor.Spec.PodMetricsEndpoints, store)
podMonitorInstances[key] = monitor
})
if pmRetrieveErr != nil {
return nil, pmRetrieveErr
}

store := assets.Store{
TLSAssets: nil,
TokenAssets: nil,
BasicAuthAssets: nil,
OAuth2Assets: nil,
SigV4Assets: nil,
}
generatedConfig, err := w.configGenerator.GenerateServerConfiguration(
"30s",
"",
@@ -184,7 +198,7 @@ func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) {
podMonitorInstances,
map[string]*monitoringv1.Probe{},
map[string]*promv1alpha1.ScrapeConfig{},
&store,
store,
nil,
nil,
nil,
@@ -211,3 +225,89 @@
}
return promCfg, nil
}

// addStoreAssetsForServiceMonitor adds authentication / authorization related information to the assets store,
// based on the service monitor and endpoints specs.
// This code borrows from
// https://github.com/prometheus-operator/prometheus-operator/blob/06b5c4189f3f72737766d86103d049115c3aff48/pkg/prometheus/resource_selector.go#L73.
func (w *PrometheusCRWatcher) addStoreAssetsForServiceMonitor(
Contributor:
It looks like the method you linked is a public method, are we unable to use it?

Contributor:
ah maybe because we don't want to make a whole resource selector. Interested in your perspective here :)

Contributor (author):
I was considering using those public methods, but eventually decided to borrow some of the code, because 1) as you said, we would need to build the whole resource selector; 2) the methods for monitors seem to do extra stuff (full selection and validation of monitors), whereas here we only wanted to populate the store.

However, now that you point this out, and as I noticed these methods were recently exported, it might be a good opportunity to replace this whole part of the watcher logic directly with the resource selector methods. We could get that extra validation etc. for free, but perhaps first I'd ensure those methods will result in correct selection of monitors for our purposes.

What do you think about doing this as a follow up work?

Contributor:
that's a great idea for some followup – i love anything that allows us to delete code :)

Contributor (author):
Let's do it 👍 I'll open an issue for this to ensure it won't go unaddressed.

ctx context.Context,
smName, smNamespace string,
endps []monitoringv1.Endpoint,
store *assets.Store,
) {
var err error
for i, endp := range endps {
objKey := fmt.Sprintf("serviceMonitor/%s/%s/%d", smNamespace, smName, i)

if err = store.AddBearerToken(ctx, smNamespace, endp.BearerTokenSecret, objKey); err != nil {
break
}

if err = store.AddBasicAuth(ctx, smNamespace, endp.BasicAuth, objKey); err != nil {
break
}

if endp.TLSConfig != nil {
if err = store.AddTLSConfig(ctx, smNamespace, endp.TLSConfig); err != nil {
break
}
}

if err = store.AddOAuth2(ctx, smNamespace, endp.OAuth2, objKey); err != nil {
break
}

smAuthKey := fmt.Sprintf("serviceMonitor/auth/%s/%s/%d", smNamespace, smName, i)
if err = store.AddSafeAuthorizationCredentials(ctx, smNamespace, endp.Authorization, smAuthKey); err != nil {
break
}
}

if err != nil {
w.logger.Error(err, "Failed to obtain credentials for a ServiceMonitor", "serviceMonitor", smName)
}
}

// addStoreAssetsForPodMonitor adds authentication / authorization related information to the assets store,
// based on the pod monitor and pod metrics endpoints specs.
// This code borrows from
// https://github.com/prometheus-operator/prometheus-operator/blob/06b5c4189f3f72737766d86103d049115c3aff48/pkg/prometheus/resource_selector.go#L314.
func (w *PrometheusCRWatcher) addStoreAssetsForPodMonitor(
ctx context.Context,
pmName, pmNamespace string,
podMetricsEndps []monitoringv1.PodMetricsEndpoint,
store *assets.Store,
) {
var err error
for i, endp := range podMetricsEndps {
objKey := fmt.Sprintf("podMonitor/%s/%s/%d", pmNamespace, pmName, i)

if err = store.AddBearerToken(ctx, pmNamespace, endp.BearerTokenSecret, objKey); err != nil {
break
}

if err = store.AddBasicAuth(ctx, pmNamespace, endp.BasicAuth, objKey); err != nil {
break
}

if endp.TLSConfig != nil {
if err = store.AddSafeTLSConfig(ctx, pmNamespace, &endp.TLSConfig.SafeTLSConfig); err != nil {
break
}
}

if err = store.AddOAuth2(ctx, pmNamespace, endp.OAuth2, objKey); err != nil {
break
}

pmAuthKey := fmt.Sprintf("podMonitor/auth/%s/%s/%d", pmNamespace, pmName, i)
if err = store.AddSafeAuthorizationCredentials(ctx, pmNamespace, endp.Authorization, pmAuthKey); err != nil {
break
}
}

if err != nil {
w.logger.Error(err, "Failed to obtain credentials for a PodMonitor", "podMonitor", pmName)
}
}