Skip to content

Commit

Permalink
test: use node informer instead of raw watch
Browse files Browse the repository at this point in the history
This should improve watch reliability, as it was failing on channel
being closed.

Fixes #10039

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
  • Loading branch information
smira committed Dec 25, 2024
1 parent 5dc15e8 commit 27233cf
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 64 deletions.
23 changes: 6 additions & 17 deletions internal/integration/api/node-annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"

"github.com/siderolabs/talos/internal/integration/base"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
Expand Down Expand Up @@ -68,21 +66,15 @@ func (suite *NodeAnnotationsSuite) testUpdate(node string) {

suite.T().Logf("updating annotations on node %q (%q)", node, k8sNode.Name)

watcher, err := suite.Clientset.CoreV1().Nodes().Watch(suite.ctx, metav1.ListOptions{
FieldSelector: metadataKeyName + k8sNode.Name,
Watch: true,
})
suite.Require().NoError(err)

defer watcher.Stop()
watchCh := suite.SetupNodeInformer(suite.ctx, k8sNode.Name)

// set two new annotation
suite.setNodeAnnotations(node, map[string]string{
"talos.dev/ann1": "value1",
"talos.dev/ann2": "value2",
})

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/ann1": "value1",
"talos.dev/ann2": "value2",
})
Expand All @@ -92,28 +84,25 @@ func (suite *NodeAnnotationsSuite) testUpdate(node string) {
"talos.dev/ann1": "foo",
})

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/ann1": "foo",
"talos.dev/ann2": "",
})

// remove all Talos annoations
suite.setNodeAnnotations(node, nil)

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/ann1": "",
"talos.dev/ann2": "",
})
}

func (suite *NodeAnnotationsSuite) waitUntil(watcher watch.Interface, expectedAnnotations map[string]string) {
func (suite *NodeAnnotationsSuite) waitUntil(watchCh <-chan *v1.Node, expectedAnnotations map[string]string) {
outer:
for {
select {
case ev := <-watcher.ResultChan():
k8sNode, ok := ev.Object.(*v1.Node)
suite.Require().Truef(ok, "watch event is not of type v1.Node but was %T", ev.Object)

case k8sNode := <-watchCh:
suite.T().Logf("annotations %#v", k8sNode.Annotations)

for k, v := range expectedAnnotations {
Expand Down
41 changes: 11 additions & 30 deletions internal/integration/api/node-labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (

"github.com/siderolabs/go-pointer"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"

"github.com/siderolabs/talos/internal/integration/base"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
Expand Down Expand Up @@ -63,22 +61,14 @@ func (suite *NodeLabelsSuite) TestUpdateWorker() {
suite.testUpdate(node, false)
}

const metadataKeyName = "metadata.name="

// testUpdate cycles through a set of node label updates reverting the change in the end.
func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) {
k8sNode, err := suite.GetK8sNodeByInternalIP(suite.ctx, node)
suite.Require().NoError(err)

suite.T().Logf("updating labels on node %q (%q)", node, k8sNode.Name)

watcher, err := suite.Clientset.CoreV1().Nodes().Watch(suite.ctx, metav1.ListOptions{
FieldSelector: metadataKeyName + k8sNode.Name,
Watch: true,
})
suite.Require().NoError(err)

defer watcher.Stop()
watchCh := suite.SetupNodeInformer(suite.ctx, k8sNode.Name)

const stdLabelName = "kubernetes.io/hostname"

Expand All @@ -90,7 +80,7 @@ func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) {
"talos.dev/test2": "value2",
})

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/test1": "value1",
"talos.dev/test2": "value2",
}, isControlplane)
Expand All @@ -100,7 +90,7 @@ func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) {
"talos.dev/test1": "foo",
})

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/test1": "foo",
"talos.dev/test2": "",
}, isControlplane)
Expand All @@ -112,7 +102,7 @@ func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) {
stdLabelName: "bar",
})

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/test1": "foo2",
stdLabelName: stdLabelValue,
}, isControlplane)
Expand All @@ -121,7 +111,7 @@ func (suite *NodeLabelsSuite) testUpdate(node string, isControlplane bool) {
// remove all Talos Labels
suite.setNodeLabels(node, nil)

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
"talos.dev/test1": "",
"talos.dev/test2": "",
}, isControlplane)
Expand All @@ -136,34 +126,25 @@ func (suite *NodeLabelsSuite) TestAllowScheduling() {

suite.T().Logf("updating taints on node %q (%q)", node, k8sNode.Name)

watcher, err := suite.Clientset.CoreV1().Nodes().Watch(suite.ctx, metav1.ListOptions{
FieldSelector: metadataKeyName + k8sNode.Name,
Watch: true,
})
suite.Require().NoError(err)
watchCh := suite.SetupNodeInformer(suite.ctx, k8sNode.Name)

defer watcher.Stop()

suite.waitUntil(watcher, nil, true)
suite.waitUntil(watchCh, nil, true)

suite.setAllowScheduling(node, true)

suite.waitUntil(watcher, nil, false)
suite.waitUntil(watchCh, nil, false)

suite.setAllowScheduling(node, false)

suite.waitUntil(watcher, nil, true)
suite.waitUntil(watchCh, nil, true)
}

//nolint:gocyclo
func (suite *NodeLabelsSuite) waitUntil(watcher watch.Interface, expectedLabels map[string]string, taintNoSchedule bool) {
func (suite *NodeLabelsSuite) waitUntil(watchCh <-chan *v1.Node, expectedLabels map[string]string, taintNoSchedule bool) {
outer:
for {
select {
case ev := <-watcher.ResultChan():
k8sNode, ok := ev.Object.(*v1.Node)
suite.Require().Truef(ok, "watch event is not of type v1.Node but was %T", ev.Object)

case k8sNode := <-watchCh:
suite.T().Logf("labels %#v, taints %#v", k8sNode.Labels, k8sNode.Spec.Taints)

for k, v := range expectedLabels {
Expand Down
23 changes: 6 additions & 17 deletions internal/integration/api/node-taints.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/gen/xtesting/must"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"

"github.com/siderolabs/talos/internal/integration/base"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
Expand Down Expand Up @@ -67,21 +65,15 @@ func (suite *NodeTaintsSuite) testUpdate(node string) {

suite.T().Logf("updating taints on node %q (%q)", node, k8sNode.Name)

watcher, err := suite.Clientset.CoreV1().Nodes().Watch(suite.ctx, metav1.ListOptions{
FieldSelector: metadataKeyName + k8sNode.Name,
Watch: true,
})
suite.Require().NoError(err)

defer watcher.Stop()
watchCh := suite.SetupNodeInformer(suite.ctx, k8sNode.Name)

// set two new taints
suite.setNodeTaints(node, map[string]string{
"talos.dev/test1": "value1:NoSchedule",
"talos.dev/test2": "NoSchedule",
})

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
constants.LabelNodeRoleControlPlane: "NoSchedule",
"talos.dev/test1": "value1:NoSchedule",
"talos.dev/test2": "NoSchedule",
Expand All @@ -92,27 +84,24 @@ func (suite *NodeTaintsSuite) testUpdate(node string) {
"talos.dev/test1": "value1:NoSchedule",
})

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
constants.LabelNodeRoleControlPlane: "NoSchedule",
"talos.dev/test1": "value1:NoSchedule",
})

// remove all taints
suite.setNodeTaints(node, nil)

suite.waitUntil(watcher, map[string]string{
suite.waitUntil(watchCh, map[string]string{
constants.LabelNodeRoleControlPlane: "NoSchedule",
})
}

func (suite *NodeTaintsSuite) waitUntil(watcher watch.Interface, expectedTaints map[string]string) {
func (suite *NodeTaintsSuite) waitUntil(watchCh <-chan *v1.Node, expectedTaints map[string]string) {
outer:
for {
select {
case ev := <-watcher.ResultChan():
k8sNode, ok := ev.Object.(*v1.Node)
suite.Require().Truef(ok, "watch event is not of type v1.Node but was %T", ev.Object)

case k8sNode := <-watchCh:
suite.T().Logf("labels %#v, taints %#v", k8sNode.Labels, k8sNode.Spec.Taints)

taints := xslices.ToMap(k8sNode.Spec.Taints, func(t v1.Taint) (string, string) {
Expand Down
45 changes: 45 additions & 0 deletions internal/integration/base/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"time"

"github.com/siderolabs/gen/channel"
"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/go-pointer"
"github.com/siderolabs/go-retry/retry"
Expand All @@ -37,6 +38,7 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
Expand Down Expand Up @@ -804,3 +806,46 @@ func (k8sSuite *K8sSuite) ToUnstructured(obj runtime.Object) unstructured.Unstru

return u
}

// SetupNodeInformer sets up a node informer for the given node name.
func (k8sSuite *K8sSuite) SetupNodeInformer(ctx context.Context, nodeName string) <-chan *corev1.Node {
const metadataKeyName = "metadata.name="

watchCh := make(chan *corev1.Node)

informerFactory := informers.NewSharedInformerFactoryWithOptions(k8sSuite.Clientset, 30*time.Second, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = metadataKeyName + nodeName
}))

nodeInformer := informerFactory.Core().V1().Nodes().Informer()
_, err := nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
node, ok := obj.(*corev1.Node)
if !ok {
return
}

channel.SendWithContext(ctx, watchCh, node)
},
UpdateFunc: func(_, obj any) {
node, ok := obj.(*corev1.Node)
if !ok {
return
}

channel.SendWithContext(ctx, watchCh, node)
},
})
k8sSuite.Require().NoError(err)

informerFactory.Start(ctx.Done())
k8sSuite.T().Cleanup(informerFactory.Shutdown)

result := informerFactory.WaitForCacheSync(ctx.Done())

for k, v := range result {
k8sSuite.Assert().True(v, "informer %q failed to sync", k.String())
}

return watchCh
}

0 comments on commit 27233cf

Please sign in to comment.