Skip to content

Commit

Permalink
Development (#29)
Browse files Browse the repository at this point in the history
* Implement Event Debouncer
  • Loading branch information
fjogeleit authored Apr 24, 2021
1 parent be30efd commit 8ffd35a
Show file tree
Hide file tree
Showing 13 changed files with 362 additions and 86 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Changelog

## 1.2.3
## 1.3.1

* Debounce reconcile modification events for 10s to prevent resending violations

## 1.3.0

* New Helm Configuration
* `crdVersion` changes the version of the PolicyReporter CRD - v1alpha1 is the current default
Expand Down
4 changes: 2 additions & 2 deletions charts/policy-reporter/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ description: |
It creates Prometheus Metrics and can send rule validation events to different targets like Loki, Elasticsearch, Slack or Discord
type: application
version: 1.3.0
appVersion: 1.3.0
version: 1.3.1
appVersion: 1.3.1

dependencies:
- name: monitoring
Expand Down
2 changes: 1 addition & 1 deletion charts/policy-reporter/values.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
image:
repository: fjogeleit/policy-reporter
pullPolicy: IfNotPresent
tag: 1.3.0
tag: 1.3.1

imagePullSecrets: []

Expand Down
18 changes: 9 additions & 9 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/fjogeleit/policy-reporter/pkg/target"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
Expand Down Expand Up @@ -74,21 +73,22 @@ func newRunCMD() *cobra.Command {
rClient.RegisterPolicyResultWatcher(resolver.SkipExistingOnStartup())
}

g := new(errgroup.Group)
errorChan := make(chan error)

if c.API.Enabled {
g.Go(resolver.APIServer().Start)
go func() { errorChan <- resolver.APIServer().Start() }()
}

g.Go(cpClient.StartWatching)
g.Go(pClient.StartWatching)
g.Go(func() error {
go func() { errorChan <- cpClient.StartWatching() }()
go func() { errorChan <- pClient.StartWatching() }()

go func() {
http.Handle("/metrics", promhttp.Handler())

return http.ListenAndServe(":2112", nil)
})
errorChan <- http.ListenAndServe(":2112", nil)
}()

return g.Wait()
return <-errorChan
},
}

Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ module github.com/fjogeleit/policy-reporter
go 1.15

require (
github.com/davecgh/go-spew v1.1.1
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/magiconair/properties v1.8.4 // indirect
github.com/mitchellh/hashstructure/v2 v2.0.1 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/pelletier/go-toml v1.8.1 // indirect
github.com/prometheus/client_golang v1.9.0
Expand Down
41 changes: 18 additions & 23 deletions go.sum

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions pkg/config/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ var testConfig = &config.Config{
SkipExisting: true,
MinimumPriority: "debug",
},
UI: config.UI{
Host: "http://localhost:8080",
SkipExisting: true,
MinimumPriority: "debug",
},
}

func Test_ResolveTarget(t *testing.T) {
Expand Down Expand Up @@ -102,8 +107,8 @@ func Test_ResolveTargets(t *testing.T) {
resolver := config.NewResolver(testConfig, nil)

clients := resolver.TargetClients()
if count := len(clients); count != 5 {
t.Errorf("Expected 5 Clients, got %d", count)
if count := len(clients); count != 6 {
t.Errorf("Expected 6 Clients, got %d", count)
}
}

Expand Down
105 changes: 91 additions & 14 deletions pkg/kubernetes/cluster_policy_report_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,60 @@ import (
"k8s.io/apimachinery/pkg/watch"
)

type clusterPolicyReportEvent struct {
report report.ClusterPolicyReport
eventType watch.EventType
}

type clusterPolicyReportEventDebouncer struct {
events map[string]clusterPolicyReportEvent
channel chan<- clusterPolicyReportEvent
mutx *sync.Mutex
}

func (d *clusterPolicyReportEventDebouncer) Add(e clusterPolicyReportEvent) {
_, ok := d.events[e.report.GetIdentifier()]
if e.eventType != watch.Modified && ok {
d.mutx.Lock()
delete(d.events, e.report.GetIdentifier())
d.mutx.Unlock()
}

if e.eventType != watch.Modified {
d.channel <- e
return
}

if len(e.report.Results) == 0 && !ok {
d.mutx.Lock()
d.events[e.report.GetIdentifier()] = e
d.mutx.Unlock()

go func() {
time.Sleep(10 * time.Second)

d.mutx.Lock()
if event, ok := d.events[e.report.GetIdentifier()]; ok {
d.channel <- event
delete(d.events, e.report.GetIdentifier())
}
d.mutx.Unlock()
}()

return
}

if ok {
d.mutx.Lock()
d.events[e.report.GetIdentifier()] = e
d.mutx.Unlock()

return
}

d.channel <- e
}

type clusterPolicyReportClient struct {
policyAPI PolicyReportAdapter
store *report.ClusterPolicyReportStore
Expand All @@ -20,7 +74,7 @@ type clusterPolicyReportClient struct {
startUp time.Time
skipExisting bool
started bool
modifyHash map[string]uint64
modifyHash map[string]string
}

func (c *clusterPolicyReportClient) RegisterCallback(cb report.ClusterPolicyReportCallback) {
Expand Down Expand Up @@ -70,28 +124,51 @@ func (c *clusterPolicyReportClient) StartWatching() error {
}

c.started = true
reportChan := make(chan clusterPolicyReportEvent)

errorChan := make(chan error)
go func() {
for {
result, err := c.policyAPI.WatchClusterPolicyReports()
if err != nil {
c.started = false
errorChan <- err
}

for {
result, err := c.policyAPI.WatchClusterPolicyReports()
if err != nil {
c.started = false
return err
}
debouncer := clusterPolicyReportEventDebouncer{
events: make(map[string]clusterPolicyReportEvent, 0),
mutx: new(sync.Mutex),
channel: reportChan,
}

for result := range result.ResultChan() {
if item, ok := result.Object.(*unstructured.Unstructured); ok {
c.executeClusterPolicyReportHandler(result.Type, c.mapper.MapClusterPolicyReport(item.Object))
for result := range result.ResultChan() {
if item, ok := result.Object.(*unstructured.Unstructured); ok {
report := c.mapper.MapClusterPolicyReport(item.Object)
debouncer.Add(clusterPolicyReportEvent{report, result.Type})
}
}

// skip existing results when the watcher restarts
c.skipExisting = true
}
}()

// skip existing results when the watcher restarts
c.skipExisting = true
}
go func() {
for event := range reportChan {
c.executeClusterPolicyReportHandler(event.eventType, event.report)
}

errorChan <- errors.New("Report Channel closed")
}()

return <-errorChan
}

func (c *clusterPolicyReportClient) executeClusterPolicyReportHandler(e watch.EventType, cpr report.ClusterPolicyReport) {
log.Printf("[INFO] New Event %s for %s", e, cpr.Name)
opr, ok := c.store.Get(cpr.GetIdentifier())
if !ok {
log.Printf("[INFO] No previous Report for %s found", cpr.Name)
opr = report.ClusterPolicyReport{}
}

Expand Down Expand Up @@ -204,6 +281,6 @@ func NewClusterPolicyReportClient(client PolicyReportAdapter, store *report.Clus
store: store,
mapper: mapper,
startUp: startUp,
modifyHash: make(map[string]uint64),
modifyHash: make(map[string]string),
}
}
61 changes: 61 additions & 0 deletions pkg/kubernetes/cluster_policy_report_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,67 @@ func Test_WatchDeleteEvent(t *testing.T) {
}
}

func Test_WatchDelayEvents(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()

client := kubernetes.NewClusterPolicyReportClient(
fakeAdapter,
report.NewClusterPolicyReportStore(),
NewMapper(k8sCMClient),
time.Now(),
)

client.RegisterPolicyResultWatcher(false)

wg := sync.WaitGroup{}
wg.Add(2)

client.RegisterCallback(func(e watch.EventType, r report.ClusterPolicyReport, o report.ClusterPolicyReport) {
wg.Done()
})

go client.StartWatching()

fakeAdapter.clusterPolicyWatcher.Add(&unstructured.Unstructured{Object: clusterPolicyMap})
fakeAdapter.clusterPolicyWatcher.Modify(&unstructured.Unstructured{Object: minClusterPolicyMap})
fakeAdapter.clusterPolicyWatcher.Modify(&unstructured.Unstructured{Object: clusterPolicyMap})
fakeAdapter.clusterPolicyWatcher.Delete(&unstructured.Unstructured{Object: clusterPolicyMap})

wg.Wait()
}

func Test_WatchDelayEventsWithoutClearEvent(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()

client := kubernetes.NewClusterPolicyReportClient(
fakeAdapter,
report.NewClusterPolicyReportStore(),
NewMapper(k8sCMClient),
time.Now(),
)

client.RegisterPolicyResultWatcher(false)

wg := sync.WaitGroup{}
wg.Add(3)

client.RegisterCallback(func(e watch.EventType, r report.ClusterPolicyReport, o report.ClusterPolicyReport) {
wg.Done()
})

go client.StartWatching()

fakeAdapter.clusterPolicyWatcher.Add(&unstructured.Unstructured{Object: clusterPolicyMap})
fakeAdapter.clusterPolicyWatcher.Modify(&unstructured.Unstructured{Object: clusterPolicyMap})
fakeAdapter.clusterPolicyWatcher.Delete(&unstructured.Unstructured{Object: clusterPolicyMap})

wg.Wait()
}

func Test_WatchModifiedEvent(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
Expand Down
Loading

0 comments on commit 8ffd35a

Please sign in to comment.