[target-allocator] Populate store assets (authorization information) for Prometheus CR watcher #1710
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
component: target allocator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Populate credentials for Prometheus CR (service and pod monitor) scrape configs.

# One or more tracking issues related to the change
issues: [1669]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
@@ -15,11 +15,13 @@
package watcher

import (
	"context"
	"fmt"

	allocatorconfig "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config"

	"github.com/go-kit/log"
	"github.com/go-logr/logr"
	monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
	"github.com/prometheus-operator/prometheus-operator/pkg/assets"
	monitoringclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"

@@ -30,32 +32,28 @@ import (
	"gopkg.in/yaml.v2"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

func NewPrometheusCRWatcher(cfg allocatorconfig.Config, cliConfig allocatorconfig.CLIConfig) (*PrometheusCRWatcher, error) {
func NewPrometheusCRWatcher(logger logr.Logger, cfg allocatorconfig.Config, cliConfig allocatorconfig.CLIConfig) (*PrometheusCRWatcher, error) {
	mClient, err := monitoringclient.NewForConfig(cliConfig.ClusterConfig)
	if err != nil {
		return nil, err
	}

	factory := informers.NewMonitoringInformerFactories(map[string]struct{}{v1.NamespaceAll: {}}, map[string]struct{}{}, mClient, allocatorconfig.DefaultResyncTime, nil) //TODO decide what strategy to use regarding namespaces

	serviceMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ServiceMonitorName))
	clientset, err := kubernetes.NewForConfig(cliConfig.ClusterConfig)
	if err != nil {
		return nil, err
	}

	podMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PodMonitorName))
	factory := informers.NewMonitoringInformerFactories(map[string]struct{}{v1.NamespaceAll: {}}, map[string]struct{}{}, mClient, allocatorconfig.DefaultResyncTime, nil) //TODO decide what strategy to use regarding namespaces

	monitoringInformers, err := getInformers(factory)
	if err != nil {
		return nil, err
	}

	monitoringInformers := map[string]*informers.ForResource{
		monitoringv1.ServiceMonitorName: serviceMonitorInformers,
		monitoringv1.PodMonitorName:     podMonitorInformers,
	}

	generator, err := prometheus.NewConfigGenerator(log.NewNopLogger(), &monitoringv1.Prometheus{}, true) // TODO replace Nop?
	if err != nil {
		return nil, err
@@ -66,7 +64,9 @@ func NewPrometheusCRWatcher(cfg allocatorconfig.Config, cliConfig allocatorconfi
	podMonSelector := getSelector(cfg.PodMonitorSelector)

	return &PrometheusCRWatcher{
		logger:               logger,
		kubeMonitoringClient: mClient,
		k8sClient:            clientset,
		informers:            monitoringInformers,
		stopChannel:          make(chan struct{}),
		configGenerator:      generator,
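For orientation, a minimal call-site sketch of the new constructor signature; the variable names cfg and cliCfg and the use of logr.Discard() are placeholders, not taken from this PR:

	// Hypothetical call site: NewPrometheusCRWatcher now takes a logr.Logger
	// as its first argument, in addition to the allocator and CLI configs.
	promWatcher, err := watcher.NewPrometheusCRWatcher(logr.Discard(), cfg, cliCfg)
	if err != nil {
		// handle the error
	}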
@@ -77,7 +77,9 @@ func NewPrometheusCRWatcher(cfg allocatorconfig.Config, cliConfig allocatorconfi
}

type PrometheusCRWatcher struct {
	kubeMonitoringClient *monitoringclient.Clientset
	logger               logr.Logger
	kubeMonitoringClient monitoringclient.Interface
	k8sClient            kubernetes.Interface
	informers            map[string]*informers.ForResource
	stopChannel          chan struct{}
	configGenerator      *prometheus.ConfigGenerator
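One practical effect of widening these fields from the concrete *monitoringclient.Clientset to interface types is that fake clients can be injected in tests. A rough sketch, assuming the generated fake client packages are used (the test name and package aliases are illustrative):

	package watcher

	import (
		"testing"

		"github.com/go-logr/logr"
		fakemonitoring "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/fake"
		k8sfake "k8s.io/client-go/kubernetes/fake"
	)

	// Sketch of a test that builds the watcher from fake clients instead of real
	// cluster connections, which the interface-typed fields now allow.
	func TestWatcherWithFakeClients(t *testing.T) {
		w := &PrometheusCRWatcher{
			logger:               logr.Discard(),
			kubeMonitoringClient: fakemonitoring.NewSimpleClientset(),
			k8sClient:            k8sfake.NewSimpleClientset(),
		}
		_ = w // a real test would go on to exercise w.LoadConfig and friends
	}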
@@ -88,13 +90,30 @@ type PrometheusCRWatcher struct {
}

func getSelector(s map[string]string) labels.Selector {
	sel := labels.NewSelector()
	if s == nil {
		return sel
		return labels.NewSelector()
	}
	return labels.SelectorFromSet(s)
}

// getInformers returns a map of informers for the given resources.
func getInformers(factory informers.FactoriesForNamespaces) (map[string]*informers.ForResource, error) {
	serviceMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ServiceMonitorName))
	if err != nil {
		return nil, err
	}

	podMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PodMonitorName))
	if err != nil {
		return nil, err
	}

	return map[string]*informers.ForResource{
		monitoringv1.ServiceMonitorName: serviceMonitorInformers,
		monitoringv1.PodMonitorName:     podMonitorInformers,
	}, nil
}

// Watch wrapped informers and wait for an initial sync.
func (w *PrometheusCRWatcher) Watch(upstreamEvents chan Event, upstreamErrors chan error) error {
	event := Event{
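As an aside, the behavior of getSelector can be illustrated with a short, hypothetical snippet (the label names and values are invented):

	// A nil map yields an empty selector that matches everything;
	// a non-nil map selects objects carrying all of the listed labels.
	sel := getSelector(map[string]string{"app": "example"})
	fmt.Println(sel.Matches(labels.Set{"app": "example", "extra": "x"})) // true
	fmt.Println(sel.Matches(labels.Set{"app": "other"}))                 // false
	fmt.Println(getSelector(nil).Matches(labels.Set{"anything": "goes"})) // true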
@@ -133,12 +152,13 @@ func (w *PrometheusCRWatcher) Close() error {
	return nil
}

func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) {
func (w *PrometheusCRWatcher) LoadConfig(ctx context.Context) (*promconfig.Config, error) {
	store := assets.NewStore(w.k8sClient.CoreV1(), w.k8sClient.CoreV1())
	serviceMonitorInstances := make(map[string]*monitoringv1.ServiceMonitor)

	smRetrieveErr := w.informers[monitoringv1.ServiceMonitorName].ListAll(w.serviceMonitorSelector, func(sm interface{}) {
		monitor := sm.(*monitoringv1.ServiceMonitor)
		key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(monitor)
		w.addStoreAssetsForServiceMonitor(ctx, monitor.Name, monitor.Namespace, monitor.Spec.Endpoints, store)
		serviceMonitorInstances[key] = monitor
	})
	if smRetrieveErr != nil {
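A minimal caller-side sketch of the new LoadConfig signature, assuming w is a *PrometheusCRWatcher and the caller bounds Secret lookups with a timeout (the 30-second value is illustrative, not from this PR; the snippet also assumes the time package is imported):

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// The context is threaded through to the assets store, which reads the
	// Secrets and ConfigMaps referenced by ServiceMonitors and PodMonitors.
	promCfg, err := w.LoadConfig(ctx)
	if err != nil {
		// handle the error
	}
	_ = promCfg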
@@ -149,19 +169,13 @@ func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) {
	pmRetrieveErr := w.informers[monitoringv1.PodMonitorName].ListAll(w.podMonitorSelector, func(pm interface{}) {
		monitor := pm.(*monitoringv1.PodMonitor)
		key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(monitor)
		w.addStoreAssetsForPodMonitor(ctx, monitor.Name, monitor.Namespace, monitor.Spec.PodMetricsEndpoints, store)
		podMonitorInstances[key] = monitor
	})
	if pmRetrieveErr != nil {
		return nil, pmRetrieveErr
	}

	store := assets.Store{
		TLSAssets:       nil,
		TokenAssets:     nil,
		BasicAuthAssets: nil,
		OAuth2Assets:    nil,
		SigV4Assets:     nil,
	}
	// TODO: We should make these durations configurable
	prom := &monitoringv1.Prometheus{
		Spec: monitoringv1.PrometheusSpec{
@@ -171,7 +185,18 @@ func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) {
			},
		},
	}
	generatedConfig, err := w.configGenerator.Generate(prom, serviceMonitorInstances, podMonitorInstances, map[string]*monitoringv1.Probe{}, &store, nil, nil, nil, []string{})

	generatedConfig, err := w.configGenerator.Generate(
		prom,
		serviceMonitorInstances,
		podMonitorInstances,
		map[string]*monitoringv1.Probe{},
		store,
		nil,
		nil,
		nil,
		[]string{},
	)
	if err != nil {
		return nil, err
	}
@@ -194,3 +219,89 @@ func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) {
	}
	return promCfg, nil
}

// addStoreAssetsForServiceMonitor adds authentication / authorization related information to the assets store,
// based on the service monitor and endpoints specs.
// This code borrows from
// https://github.com/prometheus-operator/prometheus-operator/blob/06b5c4189f3f72737766d86103d049115c3aff48/pkg/prometheus/resource_selector.go#L73.
func (w *PrometheusCRWatcher) addStoreAssetsForServiceMonitor(
	ctx context.Context,
	smName, smNamespace string,
	endps []monitoringv1.Endpoint,
	store *assets.Store,
) {
	var err error
	for i, endp := range endps {
		objKey := fmt.Sprintf("serviceMonitor/%s/%s/%d", smNamespace, smName, i)

		if err = store.AddBearerToken(ctx, smNamespace, endp.BearerTokenSecret, objKey); err != nil {
			break
		}

		if err = store.AddBasicAuth(ctx, smNamespace, endp.BasicAuth, objKey); err != nil {
			break
		}

		if endp.TLSConfig != nil {
			if err = store.AddTLSConfig(ctx, smNamespace, endp.TLSConfig); err != nil {
				break
			}
		}

		if err = store.AddOAuth2(ctx, smNamespace, endp.OAuth2, objKey); err != nil {
			break
		}

		smAuthKey := fmt.Sprintf("serviceMonitor/auth/%s/%s/%d", smNamespace, smName, i)
		if err = store.AddSafeAuthorizationCredentials(ctx, smNamespace, endp.Authorization, smAuthKey); err != nil {
			break
		}
	}

	if err != nil {
		w.logger.Error(err, "Failed to obtain credentials for a ServiceMonitor", "serviceMonitor", smName)
	}
}

Review thread on addStoreAssetsForServiceMonitor:

It looks like the method you linked is a public method, are we unable to use it?

Ah, maybe because we don't want to build a whole resource selector. Interested in your perspective here :)

I was considering using those public methods, but eventually decided to borrow some of the code, because 1) as you said, we would need to build the whole resource selector; 2) the methods for monitors seem to do extra work (full selection and validation of monitors), whereas here we only wanted to populate the store. However, now that you point this out, and since these methods were exported only recently, it might be a good opportunity to replace this whole part of the watcher logic directly with the resource selector methods. We would get that extra validation for free, but first I'd make sure those methods result in the correct selection of monitors for our purposes. What do you think about doing this as follow-up work?

That's a great idea for some follow-up – I love anything that allows us to delete code :)

Let's do it 👍 I'll open an issue for this to ensure it won't go unaddressed.

// addStoreAssetsForPodMonitor adds authentication / authorization related information to the assets store,
// based on the pod monitor and pod metrics endpoints specs.
// This code borrows from
// https://github.com/prometheus-operator/prometheus-operator/blob/06b5c4189f3f72737766d86103d049115c3aff48/pkg/prometheus/resource_selector.go#L314.
func (w *PrometheusCRWatcher) addStoreAssetsForPodMonitor(
	ctx context.Context,
	pmName, pmNamespace string,
	podMetricsEndps []monitoringv1.PodMetricsEndpoint,
	store *assets.Store,
) {
	var err error
	for i, endp := range podMetricsEndps {
		objKey := fmt.Sprintf("podMonitor/%s/%s/%d", pmNamespace, pmName, i)

		if err = store.AddBearerToken(ctx, pmNamespace, endp.BearerTokenSecret, objKey); err != nil {
			break
		}

		if err = store.AddBasicAuth(ctx, pmNamespace, endp.BasicAuth, objKey); err != nil {
			break
		}

		if endp.TLSConfig != nil {
			if err = store.AddSafeTLSConfig(ctx, pmNamespace, &endp.TLSConfig.SafeTLSConfig); err != nil {
				break
			}
		}

		if err = store.AddOAuth2(ctx, pmNamespace, endp.OAuth2, objKey); err != nil {
			break
		}

		smAuthKey := fmt.Sprintf("podMonitor/auth/%s/%s/%d", pmNamespace, pmName, i)
		if err = store.AddSafeAuthorizationCredentials(ctx, pmNamespace, endp.Authorization, smAuthKey); err != nil {
			break
		}
	}

	if err != nil {
		w.logger.Error(err, "Failed to obtain credentials for a PodMonitor", "podMonitor", pmName)
	}
}
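To make the effect of the new store population concrete, here is a small, hypothetical example of a ServiceMonitor endpoint whose credentials the target allocator can now resolve; the Secret name and keys are invented for illustration:

	// An endpoint referencing a basic-auth Secret. addStoreAssetsForServiceMonitor
	// loads "scrape-basic-auth" from the monitor's namespace into the assets store,
	// so the generated scrape config carries the credentials.
	endp := monitoringv1.Endpoint{
		BasicAuth: &monitoringv1.BasicAuth{
			Username: v1.SecretKeySelector{
				LocalObjectReference: v1.LocalObjectReference{Name: "scrape-basic-auth"},
				Key:                  "username",
			},
			Password: v1.SecretKeySelector{
				LocalObjectReference: v1.LocalObjectReference{Name: "scrape-basic-auth"},
				Key:                  "password",
			},
		},
	}
	_ = endp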
Review thread:

Thought: we should open an issue for adding in the new scrape config CRD and maybe the probe CRD too.

Agreed 👍 I'll open an issue for this.