[target-allocator] restart pod watcher when no event is found (#1237)
* naive fix

* unit test for close channel

* update unit tests, timeout option still not working as expected

* gofmt and removed unused block

* fix more lint errors

* more lint

* add timeout to context instead

* gofmt

* move logic for starting watch to own function

* gofmt

* add timeoutSeconds to test struct

* remove repeated logger declarations

* add chloggen
moh-osman3 authored Dec 19, 2022
1 parent 49de005 commit a89d0d6
Showing 3 changed files with 98 additions and 22 deletions.
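
Before the diff, a minimal sketch of the pattern this commit lands on: each watch attempt runs under its own context deadline, so a watch that stops delivering events is torn down and re-established instead of blocking forever. This is a sketch only — the real loop and `watcherTimeout` constant live in collector.go below, and the clientset wiring here is assumed:

```go
package main

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// Illustrative value; the real constant is defined in collector.go.
const watcherTimeout = 15 * time.Minute

// watchPods sketches the restart pattern: bound each watch attempt with a
// context deadline, drain its event channel, then loop to start a new watch.
func watchPods(ctx context.Context, clientset kubernetes.Interface, ns string, opts metav1.ListOptions) error {
	for {
		watchCtx, cancel := context.WithTimeout(ctx, watcherTimeout)
		watcher, err := clientset.CoreV1().Pods(ns).Watch(watchCtx, opts)
		if err != nil {
			cancel()
			return err // a failed Watch call is fatal; a closed channel is not
		}
		for event := range watcher.ResultChan() {
			_ = event // handle ADDED/MODIFIED/DELETED here
		}
		// The channel closed: the deadline fired or the API server ended the
		// watch. Cancel this attempt's context and start a fresh watch.
		cancel()
	}
}
```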
17 changes: 17 additions & 0 deletions .chloggen/1028-restart-podwatcher.yaml
@@ -0,0 +1,17 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
component: Target Allocator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: This PR restarts the pod watcher when no event is received and adds unit tests for the watcher timeout and closed watcher channel cases

# One or more tracking issues related to the change
issues:
- 1028

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
47 changes: 26 additions & 21 deletions cmd/otel-allocator/collector/collector.go
@@ -17,7 +17,6 @@ package collector
import (
"context"
"os"
"strconv"
"time"

"github.com/go-logr/logr"
@@ -58,22 +57,21 @@ func NewClient(logger logr.Logger, kubeConfig *rest.Config) (*Client, error) {
}

return &Client{
log: logger,
log: logger.WithValues("component", "opentelemetry-targetallocator"),
k8sClient: clientset,
close: make(chan struct{}),
}, nil
}

func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func(collectors map[string]*allocation.Collector)) error {
collectorMap := map[string]*allocation.Collector{}
log := k.log.WithValues("component", "opentelemetry-targetallocator")

opts := metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labelMap).String(),
}
pods, err := k.k8sClient.CoreV1().Pods(ns).List(ctx, opts)
if err != nil {
log.Error(err, "Pod failure")
k.log.Error(err, "Pod failure")
os.Exit(1)
}
for i := range pods.Items {
@@ -86,38 +84,48 @@ func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func(
fn(collectorMap)

for {
watcher, err := k.k8sClient.CoreV1().Pods(ns).Watch(ctx, opts)
if err != nil {
log.Error(err, "unable to create collector pod watcher")
return err
}
log.Info("Successfully started a collector pod watcher")
if msg := runWatch(ctx, k, watcher.ResultChan(), collectorMap, fn); msg != "" && msg != "no event" {
log.Info("Collector pod watch event stopped " + msg)
if !k.restartWatch(ctx, opts, collectorMap, fn) {
return nil
}
}
}

func (k *Client) restartWatch(ctx context.Context, opts metav1.ListOptions, collectorMap map[string]*allocation.Collector, fn func(collectors map[string]*allocation.Collector)) bool {
// add timeout to the context before calling Watch
ctx, cancel := context.WithTimeout(ctx, watcherTimeout)
defer cancel()
watcher, err := k.k8sClient.CoreV1().Pods(ns).Watch(ctx, opts)
if err != nil {
k.log.Error(err, "unable to create collector pod watcher")
return false
}
k.log.Info("Successfully started a collector pod watcher")
if msg := runWatch(ctx, k, watcher.ResultChan(), collectorMap, fn); msg != "" {
k.log.Info("Collector pod watch event stopped " + msg)
return false
}

return true
}

func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap map[string]*allocation.Collector, fn func(collectors map[string]*allocation.Collector)) string {
log := k.log.WithValues("component", "opentelemetry-targetallocator")
for {
collectorsDiscovered.Set(float64(len(collectorMap)))
select {
case <-k.close:
return "kubernetes client closed"
case <-ctx.Done():
return "context done"
return ""
case event, ok := <-c:
if !ok {
log.Info(strconv.FormatBool(ok))
return "no event"
k.log.Info("No event found. Restarting watch routine")
return ""
}

pod, ok := event.Object.(*v1.Pod)
if !ok {
log.Info(strconv.FormatBool(ok))
return "no event"
k.log.Info("No pod found in event Object. Restarting watch routine")
return ""
}

switch event.Type { //nolint:exhaustive
Expand All @@ -127,9 +135,6 @@ func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap
delete(collectorMap, pod.Name)
}
fn(collectorMap)
case <-time.After(watcherTimeout):
log.Info("Restarting watch routine")
return ""
}
}
}
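
A note on the design choice above: the removed `case <-time.After(watcherTimeout)` allocated a fresh timer on every pass through the select, so it fired only after an idle gap between events and never bounded the total watch duration; attaching the timeout to the context bounds the whole attempt and routes expiry through the same closed-channel path that now triggers a restart. A small self-contained demo of the difference (timings invented for illustration):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	events := make(chan int)
	go func() {
		for i := 0; ; i++ {
			time.Sleep(50 * time.Millisecond) // steady trickle of events
			events <- i
		}
	}()

	overall := time.After(200 * time.Millisecond) // plays the role of the context deadline
	for {
		select {
		case ev := <-events:
			fmt.Println("event", ev)
		case <-time.After(100 * time.Millisecond):
			// Never fires: this timer is recreated on every iteration, so a
			// steady event stream resets it forever (the old behaviour).
			fmt.Println("idle timeout")
			return
		case <-overall:
			// Fires at 200ms regardless of traffic (the new behaviour).
			fmt.Println("deadline hit; restart the watch")
			return
		}
	}
}
```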
56 changes: 55 additions & 1 deletion cmd/otel-allocator/collector/collector_test.go
@@ -20,6 +20,7 @@ import (
"os"
"sync"
"testing"
"time"

"k8s.io/apimachinery/pkg/watch"
logf "sigs.k8s.io/controller-runtime/pkg/log"
@@ -50,7 +51,6 @@ func getTestClient() (Client, watch.Interface) {
opts := metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labelMap).String(),
}

watcher, err := kubeClient.k8sClient.CoreV1().Pods("test-ns").Watch(context.Background(), opts)
if err != nil {
fmt.Printf("failed to setup a Collector Pod watcher: %v", err)
@@ -165,3 +165,57 @@ func Test_runWatch(t *testing.T) {
})
}
}

// This tests runWatch for the cases where the watcher channel closes and where the watcher times out.
func Test_closeChannel(t *testing.T) {
tests := []struct {
description string
isCloseChannel bool
timeoutSeconds time.Duration
}{
{
// event is triggered by channel closing.
description: "close_channel",
isCloseChannel: true,
// channel should be closed before this timeout occurs
timeoutSeconds: 10 * time.Second,
},
{
// event triggered by timeout.
description: "watcher_timeout",
isCloseChannel: false,
timeoutSeconds: 0 * time.Second,
},
}

for _, tc := range tests {
t.Run(tc.description, func(t *testing.T) {
kubeClient, watcher := getTestClient()

defer func() {
close(kubeClient.close)
watcher.Stop()
}()
var wg sync.WaitGroup
wg.Add(1)
terminated := false

go func(watcher watch.Interface) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), tc.timeoutSeconds)
defer cancel()
if msg := runWatch(ctx, &kubeClient, watcher.ResultChan(), map[string]*allocation.Collector{}, func(colMap map[string]*allocation.Collector) {}); msg != "" {
terminated = true
return
}
}(watcher)

if tc.isCloseChannel {
// stop pod watcher to trigger event.
watcher.Stop()
}
wg.Wait()
assert.False(t, terminated)
})
}
}
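
One detail worth calling out in the `watcher_timeout` case: passing `0 * time.Second` to `context.WithTimeout` yields a context whose deadline has already passed, so `ctx.Done()` is closed before `runWatch` enters its select and the function returns the empty restart signal immediately. A tiny standalone check of that behavior:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 0*time.Second)
	defer cancel()

	select {
	case <-ctx.Done():
		fmt.Println("already expired:", ctx.Err()) // context deadline exceeded
	default:
		fmt.Println("still live") // never reached: Done() closes synchronously
	}
}
```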
