Skip to content

Commit

Permalink
add timeout to context instead
Browse files Browse the repository at this point in the history
  • Loading branch information
moh-osman3 committed Dec 7, 2022
1 parent d26b2b4 commit 286ae82
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 15 deletions.
13 changes: 4 additions & 9 deletions cmd/otel-allocator/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type Client struct {
log logr.Logger
k8sClient kubernetes.Interface
close chan struct{}
timeoutSeconds int64
}

func NewClient(logger logr.Logger, kubeConfig *rest.Config) (*Client, error) {
Expand All @@ -61,19 +60,15 @@ func NewClient(logger logr.Logger, kubeConfig *rest.Config) (*Client, error) {
log: logger,
k8sClient: clientset,
close: make(chan struct{}),
timeoutSeconds: int64(watcherTimeout / time.Second),
}, nil
}

func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func(collectors map[string]*allocation.Collector)) {
collectorMap := map[string]*allocation.Collector{}
log := k.log.WithValues("component", "opentelemetry-targetallocator")

// convert watcherTimeout to an integer in seconds.
interval := int64(watcherTimeout / time.Second)
opts := metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labelMap).String(),
TimeoutSeconds: &interval,
}
pods, err := k.k8sClient.CoreV1().Pods(ns).List(ctx, opts)
if err != nil {
Expand All @@ -91,6 +86,9 @@ func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func(

go func() {
for {
// add timeout to the context before calling Watch
ctx, cancel := context.WithTimeout(ctx, watcherTimeout)
defer cancel()
watcher, err := k.k8sClient.CoreV1().Pods(ns).Watch(ctx, opts)
if err != nil {
log.Error(err, "unable to create collector pod watcher")
Expand All @@ -113,7 +111,7 @@ func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap
case <-k.close:
return "kubernetes client closed"
case <-ctx.Done():
return "context done"
return ""
case event, ok := <-c:
if !ok {
log.Info("No event found. Restarting watch routine")
Expand All @@ -133,9 +131,6 @@ func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap
delete(collectorMap, pod.Name)
}
fn(collectorMap)
case <-time.After(time.Duration(k.timeoutSeconds) * time.Second):
log.Info("Restarting watch routine")
return ""
}
}
}
Expand Down
10 changes: 4 additions & 6 deletions cmd/otel-allocator/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"sync"
"testing"
"time"

"k8s.io/apimachinery/pkg/watch"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -36,12 +37,10 @@ import (
var logger = logf.Log.WithName("collector-unit-tests")

func getTestClient() (Client, watch.Interface) {
interval := int64(10)
kubeClient := Client{
k8sClient: fake.NewSimpleClientset(),
close: make(chan struct{}),
log: logger,
timeoutSeconds: interval,
}

labelMap := map[string]string{
Expand All @@ -51,9 +50,6 @@ func getTestClient() (Client, watch.Interface) {

opts := metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labelMap).String(),
// this timeout doesn't seem to be having an effect.
// i.e. no event is triggered after this duration.
TimeoutSeconds: &kubeClient.timeoutSeconds,
}
watcher, err := kubeClient.k8sClient.CoreV1().Pods("test-ns").Watch(context.Background(), opts)
if err != nil {
Expand Down Expand Up @@ -202,7 +198,9 @@ func Test_closeChannel(t *testing.T) {

go func(watcher watch.Interface) {
defer wg.Done()
if msg := runWatch(context.Background(), &kubeClient, watcher.ResultChan(), map[string]*allocation.Collector{}, func(colMap map[string]*allocation.Collector) {}); msg != "" {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(10 * time.Second))
defer cancel()
if msg := runWatch(ctx, &kubeClient, watcher.ResultChan(), map[string]*allocation.Collector{}, func(colMap map[string]*allocation.Collector) {}); msg != "" {
terminated = true
return
}
Expand Down

0 comments on commit 286ae82

Please sign in to comment.