diff --git a/cmd/kured/main.go b/cmd/kured/main.go
index 1ab4b8424..96e229f7d 100644
--- a/cmd/kured/main.go
+++ b/cmd/kured/main.go
@@ -224,18 +224,6 @@ func main() {
 		log.Warnf(err.Error())
 	}
 
-	log.Infof("Lock Annotation: %s/%s:%s", dsNamespace, dsName, lockAnnotation)
-	if lockTTL > 0 {
-		log.Infof("Lock TTL set, lock will expire after: %v", lockTTL)
-	} else {
-		log.Info("Lock TTL not set, lock will remain until being released")
-	}
-	if lockReleaseDelay > 0 {
-		log.Infof("Lock release delay set, lock release will be delayed by: %v", lockReleaseDelay)
-	} else {
-		log.Info("Lock release delay not set, lock will be released immediately after rebooting")
-	}
-
 	log.Infof("PreferNoSchedule taint: %s", preferNoScheduleTaintName)
 
 	// This should be printed from blocker list instead of only blocking pod selectors
@@ -278,7 +266,30 @@ func main() {
 		checker = checkers.NewFileRebootChecker(rebootSentinelFile)
 	}
 
-	go rebootAsRequired(nodeID, rebooter, checker, window, lockTTL, lockReleaseDelay)
+	config, err := rest.InClusterConfig()
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	client, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	log.Infof("Lock Annotation: %s/%s:%s", dsNamespace, dsName, lockAnnotation)
+	if lockTTL > 0 {
+		log.Infof("Lock TTL set, lock will expire after: %v", lockTTL)
+	} else {
+		log.Info("Lock TTL not set, lock will remain until being released")
+	}
+	if lockReleaseDelay > 0 {
+		log.Infof("Lock release delay set, lock release will be delayed by: %v", lockReleaseDelay)
+	} else {
+		log.Info("Lock release delay not set, lock will be released immediately after rebooting")
+	}
+	lock := daemonsetlock.New(client, nodeID, dsNamespace, dsName, lockAnnotation, lockTTL, concurrency, lockReleaseDelay)
+
+	go rebootAsRequired(nodeID, rebooter, checker, window, lock, client)
 	go maintainRebootRequiredMetric(nodeID, checker)
 
 	http.Handle("/metrics", promhttp.Handler())
@@ -400,68 +411,6 @@ func stripQuotes(str string) string {
 	return str
 }
 
-func holding(lock *daemonsetlock.DaemonSetLock, metadata interface{}, isMultiLock bool) bool {
-	var holding bool
-	var err error
-	if isMultiLock {
-		holding, err = lock.TestMultiple()
-	} else {
-		holding, err = lock.Test(metadata)
-	}
-	if err != nil {
-		log.Fatalf("Error testing lock: %v", err)
-	}
-	if holding {
-		log.Infof("Holding lock")
-	}
-	return holding
-}
-
-func acquire(lock *daemonsetlock.DaemonSetLock, metadata interface{}, TTL time.Duration, maxOwners int) bool {
-	var holding bool
-	var holder string
-	var err error
-	if maxOwners > 1 {
-		var holders []string
-		holding, holders, err = lock.AcquireMultiple(metadata, TTL, maxOwners)
-		holder = strings.Join(holders, ",")
-	} else {
-		holding, holder, err = lock.Acquire(metadata, TTL)
-	}
-	switch {
-	case err != nil:
-		log.Fatalf("Error acquiring lock: %v", err)
-		return false
-	case !holding:
-		log.Warnf("Lock already held: %v", holder)
-		return false
-	default:
-		log.Infof("Acquired reboot lock")
-		return true
-	}
-}
-
-func throttle(releaseDelay time.Duration) {
-	if releaseDelay > 0 {
-		log.Infof("Delaying lock release by %v", releaseDelay)
-		time.Sleep(releaseDelay)
-	}
-}
-
-func release(lock *daemonsetlock.DaemonSetLock, isMultiLock bool) {
-	log.Infof("Releasing lock")
-
-	var err error
-	if isMultiLock {
-		err = lock.ReleaseMultiple()
-	} else {
-		err = lock.Release()
-	}
-	if err != nil {
-		log.Fatalf("Error releasing lock: %v", err)
-	}
-}
-
 func drain(client *kubernetes.Clientset, node *v1.Node) error {
 	nodename := node.GetName()
 
@@ -537,11 +486,6 @@ func maintainRebootRequiredMetric(nodeID string, checker checkers.Checker) {
 	}
 }
 
-// nodeMeta is used to remember information across reboots
-type nodeMeta struct {
-	Unschedulable bool `json:"unschedulable"`
-}
-
 func addNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations map[string]string) error {
 	node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
 	if err != nil {
@@ -616,30 +560,23 @@ func updateNodeLabels(client *kubernetes.Clientset, node *v1.Node, labels []stri
 	}
 }
 
-func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.Checker, window *timewindow.TimeWindow, TTL time.Duration, releaseDelay time.Duration) {
-	config, err := rest.InClusterConfig()
-	if err != nil {
-		log.Fatal(err)
-	}
-
-	client, err := kubernetes.NewForConfig(config)
-	if err != nil {
-		log.Fatal(err)
-	}
-
-	lock := daemonsetlock.New(client, nodeID, dsNamespace, dsName, lockAnnotation)
+func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.Checker, window *timewindow.TimeWindow, lock daemonsetlock.Lock, client *kubernetes.Clientset) {
 
-	nodeMeta := nodeMeta{}
 	source := rand.NewSource(time.Now().UnixNano())
 	tick := delaytick.New(source, 1*time.Minute)
 	for range tick {
-		if holding(lock, &nodeMeta, concurrency > 1) {
+		holding, lockData, err := lock.Holding()
+		if err != nil {
+			log.Errorf("Error testing lock: %v", err)
+		}
+		if holding {
 			node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
 			if err != nil {
				log.Errorf("Error retrieving node object via k8s API: %v", err)
 				continue
 			}
-			if !nodeMeta.Unschedulable {
+
+			if !lockData.Metadata.Unschedulable {
 				err = uncordon(client, node)
 				if err != nil {
 					log.Errorf("Unable to uncordon %s: %v, will continue to hold lock and retry uncordon", node.GetName(), err)
@@ -665,8 +602,12 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
 					}
 				}
 			}
-			throttle(releaseDelay)
-			release(lock, concurrency > 1)
+
+			err = lock.Release()
+			if err != nil {
+				log.Errorf("Error releasing lock, will retry: %v", err)
+				continue
+			}
 			break
 		} else {
 			break
@@ -705,7 +646,8 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
 		if err != nil {
 			log.Fatalf("Error retrieving node object via k8s API: %v", err)
 		}
-		nodeMeta.Unschedulable = node.Spec.Unschedulable
+
+		nodeMeta := daemonsetlock.NodeMeta{Unschedulable: node.Spec.Unschedulable}
 
 		var timeNowString string
 		if annotateNodes {
@@ -738,17 +680,32 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
 		}
 		log.Infof("Reboot required%s", rebootRequiredBlockCondition)
 
-		if !holding(lock, &nodeMeta, concurrency > 1) && !acquire(lock, &nodeMeta, TTL, concurrency) {
-			// Prefer to not schedule pods onto this node to avoid draing the same pod multiple times.
-			preferNoScheduleTaint.Enable()
-			continue
+		holding, _, err := lock.Holding()
+		if err != nil {
+			log.Errorf("Error testing lock: %v", err)
+		}
+
+		if !holding {
+			acquired, holder, err := lock.Acquire(nodeMeta)
+			if err != nil {
+				log.Errorf("Error acquiring lock: %v", err)
+			}
+			if !acquired {
+				log.Warnf("Lock already held: %v", holder)
+				// Prefer to not schedule pods onto this node to avoid draining the same pod multiple times.
+				preferNoScheduleTaint.Enable()
+				continue
+			}
+		}
 
 		err = drain(client, node)
 		if err != nil {
 			if !forceReboot {
 				log.Errorf("Unable to cordon or drain %s: %v, will release lock and retry cordon and drain before rebooting when lock is next acquired", node.GetName(), err)
-				release(lock, concurrency > 1)
+				err = lock.Release()
+				if err != nil {
+					log.Errorf("Error releasing lock: %v", err)
+				}
 				log.Infof("Performing a best-effort uncordon after failed cordon and drain")
 				uncordon(client, node)
 				continue
diff --git a/pkg/daemonsetlock/daemonsetlock.go b/pkg/daemonsetlock/daemonsetlock.go
index 2d8949b5a..f9386bb06 100644
--- a/pkg/daemonsetlock/daemonsetlock.go
+++ b/pkg/daemonsetlock/daemonsetlock.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	log "github.com/sirupsen/logrus"
+	"strings"
 	"time"
 
 	v1 "k8s.io/api/apps/v1"
@@ -18,6 +20,21 @@ const (
 	k8sAPICallRetryTimeout = 5 * time.Minute // How long to wait until we determine that the k8s API is definitively unavailable
 )
 
+type Lock interface {
+	Acquire(NodeMeta) (bool, string, error)
+	Release() error
+	Holding() (bool, LockAnnotationValue, error)
+}
+
+type GenericLock struct {
+	TTL          time.Duration
+	releaseDelay time.Duration
+}
+
+type NodeMeta struct {
+	Unschedulable bool `json:"unschedulable"`
+}
+
 // DaemonSetLock holds all necessary information to do actions
 // on the kured ds which holds lock info through annotations.
 type DaemonSetLock struct {
@@ -28,25 +45,92 @@ type DaemonSetLock struct {
 	annotation string
 }
 
-type lockAnnotationValue struct {
+// DaemonSetSingleLock holds all necessary information to do actions
+// on the kured ds which holds lock info through annotations.
+type DaemonSetSingleLock struct {
+	GenericLock
+	DaemonSetLock
+}
+
+// DaemonSetMultiLock holds all necessary information to do actions
+// on the kured ds which holds lock info through annotations, valid
+// for multiple nodes
+type DaemonSetMultiLock struct {
+	GenericLock
+	DaemonSetLock
+	maxOwners int
+}
+
+// LockAnnotationValue contains the lock data,
+// which allows persistence across reboots, particularly recording if the
+// node was already unschedulable before kured reboot.
+// To be modified when using another type of lock storage.
+type LockAnnotationValue struct {
 	NodeID   string    `json:"nodeID"`
-	Metadata interface{} `json:"metadata,omitempty"`
+	Metadata NodeMeta  `json:"metadata,omitempty"`
 	Created  time.Time `json:"created"`
 	TTL      time.Duration `json:"TTL"`
 }
 
 type multiLockAnnotationValue struct {
 	MaxOwners       int                   `json:"maxOwners"`
-	LockAnnotations []lockAnnotationValue `json:"locks"`
+	LockAnnotations []LockAnnotationValue `json:"locks"`
 }
 
 // New creates a daemonsetLock object containing the necessary data for follow up k8s requests
-func New(client *kubernetes.Clientset, nodeID, namespace, name, annotation string) *DaemonSetLock {
-	return &DaemonSetLock{client, nodeID, namespace, name, annotation}
+func New(client *kubernetes.Clientset, nodeID, namespace, name, annotation string, TTL time.Duration, concurrency int, lockReleaseDelay time.Duration) Lock {
+	if concurrency > 1 {
+		return &DaemonSetMultiLock{
+			GenericLock: GenericLock{
+				TTL:          TTL,
+				releaseDelay: lockReleaseDelay,
+			},
+			DaemonSetLock: DaemonSetLock{
+				client:     client,
+				nodeID:     nodeID,
+				namespace:  namespace,
+				name:       name,
+				annotation: annotation,
+			},
+			maxOwners: concurrency,
+		}
+	} else {
+		return &DaemonSetSingleLock{
+			GenericLock: GenericLock{
+				TTL:          TTL,
+				releaseDelay: lockReleaseDelay,
+			},
+			DaemonSetLock: DaemonSetLock{
+				client:     client,
+				nodeID:     nodeID,
+				namespace:  namespace,
+				name:       name,
+				annotation: annotation,
+			},
+		}
+	}
+}
+
+// GetDaemonSet returns the named DaemonSet resource from the DaemonSetLock's configured client
+func (dsl *DaemonSetLock) GetDaemonSet(sleep, timeout time.Duration) (*v1.DaemonSet, error) {
+	var ds *v1.DaemonSet
+	var lastError error
+	err := wait.PollImmediate(sleep, timeout, func() (bool, error) {
+		ctx, cancel := context.WithTimeout(context.Background(), timeout)
+		defer cancel()
+		if ds, lastError = dsl.client.AppsV1().DaemonSets(dsl.namespace).Get(ctx, dsl.name, metav1.GetOptions{}); lastError != nil {
+			return false, nil
+		}
+		return true, nil
+	})
+	if err != nil {
+		return nil, fmt.Errorf("Timed out trying to get daemonset %s in namespace %s: %v", dsl.name, dsl.namespace, lastError)
+	}
+	return ds, nil
 }
 
 // Acquire attempts to annotate the kured daemonset with lock info from instantiated DaemonSetLock using client-go
-func (dsl *DaemonSetLock) Acquire(metadata interface{}, TTL time.Duration) (bool, string, error) {
+func (dsl *DaemonSetSingleLock) Acquire(nodeMetadata NodeMeta) (bool, string, error) {
 	for {
 		ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
 		if err != nil {
@@ -55,7 +139,7 @@ func (dsl *DaemonSetLock) Acquire(metadata interface{}, TTL time.Duration) (bool
 
 		valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation]
 		if exists {
-			value := lockAnnotationValue{}
+			value := LockAnnotationValue{}
 			if err := json.Unmarshal([]byte(valueString), &value); err != nil {
 				return false, "", err
 			}
@@ -68,7 +152,7 @@ func (dsl *DaemonSetLock) Acquire(metadata interface{}, TTL time.Duration) (bool
 		if ds.ObjectMeta.Annotations == nil {
 			ds.ObjectMeta.Annotations = make(map[string]string)
 		}
-		value := lockAnnotationValue{NodeID: dsl.nodeID, Metadata: metadata, Created: time.Now().UTC(), TTL: TTL}
+		value := LockAnnotationValue{NodeID: dsl.nodeID, Metadata: nodeMetadata, Created: time.Now().UTC(), TTL: dsl.TTL}
 		valueBytes, err := json.Marshal(&value)
 		if err != nil {
 			return false, "", err
@@ -89,49 +173,78 @@ func (dsl *DaemonSetLock) Acquire(metadata interface{}, TTL time.Duration) (bool
 	}
 }
 
-// AcquireMultiple creates and annotates the daemonset with a multiple owner lock
-func (dsl *DaemonSetLock) AcquireMultiple(metadata interface{}, TTL time.Duration, maxOwners int) (bool, []string, error) {
+// Holding attempts to check the kured daemonset lock status (existence, expiry) from instantiated DaemonSetLock using client-go
+func (dsl *DaemonSetSingleLock) Holding() (bool, LockAnnotationValue, error) {
+	var lockData LockAnnotationValue
+	ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
+	if err != nil {
+		return false, lockData, fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err)
+	}
+
+	valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation]
+	if exists {
+		value := LockAnnotationValue{}
+		if err := json.Unmarshal([]byte(valueString), &value); err != nil {
+			return false, lockData, err
+		}
+
+		if !ttlExpired(value.Created, value.TTL) {
+			return value.NodeID == dsl.nodeID, value, nil
+		}
+	}
+
+	return false, lockData, nil
+}
+
+// Release attempts to remove the lock data from the kured ds annotations using client-go
+func (dsl *DaemonSetSingleLock) Release() error {
+	if dsl.releaseDelay > 0 {
+		log.Infof("Waiting %v before releasing lock", dsl.releaseDelay)
+		time.Sleep(dsl.releaseDelay)
+	}
 	for {
 		ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
 		if err != nil {
-			return false, []string{}, fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err)
+			return fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err)
 		}
 
-		annotation := multiLockAnnotationValue{}
 		valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation]
 		if exists {
-			if err := json.Unmarshal([]byte(valueString), &annotation); err != nil {
-				return false, []string{}, fmt.Errorf("error getting multi lock: %w", err)
+			value := LockAnnotationValue{}
+			if err := json.Unmarshal([]byte(valueString), &value); err != nil {
+				return err
 			}
-		}
 
-		lockPossible, newAnnotation := dsl.canAcquireMultiple(annotation, metadata, TTL, maxOwners)
-		if !lockPossible {
-			return false, nodeIDsFromMultiLock(newAnnotation), nil
+			if value.NodeID != dsl.nodeID {
+				return fmt.Errorf("Not lock holder: %v", value.NodeID)
+			}
+		} else {
+			return fmt.Errorf("Lock not held")
 		}
 
-		if ds.ObjectMeta.Annotations == nil {
-			ds.ObjectMeta.Annotations = make(map[string]string)
-		}
-		newAnnotationBytes, err := json.Marshal(&newAnnotation)
-		if err != nil {
-			return false, []string{}, fmt.Errorf("error marshalling new annotation lock: %w", err)
-		}
-		ds.ObjectMeta.Annotations[dsl.annotation] = string(newAnnotationBytes)
+		delete(ds.ObjectMeta.Annotations, dsl.annotation)
 
-		_, err = dsl.client.AppsV1().DaemonSets(dsl.namespace).Update(context.Background(), ds, metav1.UpdateOptions{})
+		_, err = dsl.client.AppsV1().DaemonSets(dsl.namespace).Update(context.TODO(), ds, metav1.UpdateOptions{})
 		if err != nil {
 			if se, ok := err.(*errors.StatusError); ok && se.ErrStatus.Reason == metav1.StatusReasonConflict {
+				// Something else updated the resource between us reading and writing - try again soon
 				time.Sleep(time.Second)
 				continue
 			} else {
-				return false, []string{}, fmt.Errorf("error updating daemonset with multi lock: %w", err)
+				return err
 			}
 		}
-		return true, nodeIDsFromMultiLock(newAnnotation), nil
+		return nil
 	}
 }
 
+func ttlExpired(created time.Time, ttl time.Duration) bool {
+	if ttl > 0 && time.Since(created) >= ttl {
+		return true
+	}
+	return false
+}
+
 func nodeIDsFromMultiLock(annotation multiLockAnnotationValue) []string {
 	nodeIDs := make([]string, 0, len(annotation.LockAnnotations))
 	for _, nodeLock := range annotation.LockAnnotations {
@@ -140,7 +253,7 @@ func nodeIDsFromMultiLock(annotation multiLockAnnotationValue) []string {
 	return nodeIDs
 }
 
-func (dsl *DaemonSetLock) canAcquireMultiple(annotation multiLockAnnotationValue, metadata interface{}, TTL time.Duration, maxOwners int) (bool, multiLockAnnotationValue) {
+func (dsl *DaemonSetLock) canAcquireMultiple(annotation multiLockAnnotationValue, metadata NodeMeta, TTL time.Duration, maxOwners int) (bool, multiLockAnnotationValue) {
 	newAnnotation := multiLockAnnotationValue{MaxOwners: maxOwners}
 	freeSpace := false
 	if annotation.LockAnnotations == nil || len(annotation.LockAnnotations) < maxOwners {
@@ -162,7 +275,7 @@ func (dsl *DaemonSetLock) canAcquireMultiple(annotation multiLockAnnotationValue
 	if freeSpace {
 		newAnnotation.LockAnnotations = append(
 			newAnnotation.LockAnnotations,
-			lockAnnotationValue{
+			LockAnnotationValue{
 				NodeID:   dsl.nodeID,
 				Metadata: metadata,
 				Created:  time.Now().UTC(),
@@ -175,92 +288,80 @@ func (dsl *DaemonSetLock) canAcquireMultiple(annotation multiLockAnnotationValue
 	return false, multiLockAnnotationValue{}
 }
 
-// Test attempts to check the kured daemonset lock status (existence, expiry) from instantiated DaemonSetLock using client-go
-func (dsl *DaemonSetLock) Test(metadata interface{}) (bool, error) {
-	ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
-	if err != nil {
-		return false, fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err)
-	}
+// Acquire creates and annotates the daemonset with a multiple owner lock
+func (dsl *DaemonSetMultiLock) Acquire(nodeMetaData NodeMeta) (bool, string, error) {
+	for {
+		ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
+		if err != nil {
+			return false, "", fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err)
+		}
 
-	valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation]
-	if exists {
-		value := lockAnnotationValue{Metadata: metadata}
-		if err := json.Unmarshal([]byte(valueString), &value); err != nil {
-			return false, err
+		annotation := multiLockAnnotationValue{}
+		valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation]
+		if exists {
+			if err := json.Unmarshal([]byte(valueString), &annotation); err != nil {
+				return false, "", fmt.Errorf("error getting multi lock: %w", err)
+			}
 		}
 
-		if !ttlExpired(value.Created, value.TTL) {
-			return value.NodeID == dsl.nodeID, nil
+		lockPossible, newAnnotation := dsl.canAcquireMultiple(annotation, nodeMetaData, dsl.TTL, dsl.maxOwners)
+		if !lockPossible {
+			return false, strings.Join(nodeIDsFromMultiLock(newAnnotation), ","), nil
 		}
-	}
 
-	return false, nil
+		if ds.ObjectMeta.Annotations == nil {
+			ds.ObjectMeta.Annotations = make(map[string]string)
+		}
+		newAnnotationBytes, err := json.Marshal(&newAnnotation)
+		if err != nil {
+			return false, "", fmt.Errorf("error marshalling new annotation lock: %w", err)
+		}
+		ds.ObjectMeta.Annotations[dsl.annotation] = string(newAnnotationBytes)
+
+		_, err = dsl.client.AppsV1().DaemonSets(dsl.namespace).Update(context.Background(), ds, metav1.UpdateOptions{})
+		if err != nil {
+			if se, ok := err.(*errors.StatusError); ok && se.ErrStatus.Reason == metav1.StatusReasonConflict {
+				time.Sleep(time.Second)
+				continue
+			} else {
+				return false, "", fmt.Errorf("error updating daemonset with multi lock: %w", err)
+			}
+		}
+		return true, strings.Join(nodeIDsFromMultiLock(newAnnotation), ","), nil
+	}
 }
 
 // TestMultiple attempts to check the kured daemonset lock status for multi locks
-func (dsl *DaemonSetLock) TestMultiple() (bool, error) {
+func (dsl *DaemonSetMultiLock) Holding() (bool, LockAnnotationValue, error) {
+	var lockdata LockAnnotationValue
 	ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
 	if err != nil {
-		return false, fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err)
+		return false, lockdata, fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err)
 	}
 
 	valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation]
 	if exists {
 		value := multiLockAnnotationValue{}
 		if err := json.Unmarshal([]byte(valueString), &value); err != nil {
-			return false, err
+			return false, lockdata, err
 		}
 
 		for _, nodeLock := range value.LockAnnotations {
 			if nodeLock.NodeID == dsl.nodeID && !ttlExpired(nodeLock.Created, nodeLock.TTL) {
-				return true, nil
+				return true, nodeLock, nil
 			}
 		}
 	}
 
-	return false, nil
+	return false, lockdata, nil
 }
 
-// Release attempts to remove the lock data from the kured ds annotations using client-go
-func (dsl *DaemonSetLock) Release() error {
-	for {
-		ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
-		if err != nil {
-			return fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err)
-		}
-
-		valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation]
-		if exists {
-			value := lockAnnotationValue{}
-			if err := json.Unmarshal([]byte(valueString), &value); err != nil {
-				return err
-			}
-
-			if value.NodeID != dsl.nodeID {
-				return fmt.Errorf("Not lock holder: %v", value.NodeID)
-			}
-		} else {
-			return fmt.Errorf("Lock not held")
-		}
-
-		delete(ds.ObjectMeta.Annotations, dsl.annotation)
-
-		_, err = dsl.client.AppsV1().DaemonSets(dsl.namespace).Update(context.TODO(), ds, metav1.UpdateOptions{})
-		if err != nil {
-			if se, ok := err.(*errors.StatusError); ok && se.ErrStatus.Reason == metav1.StatusReasonConflict {
-				// Something else updated the resource between us reading and writing - try again soon
-				time.Sleep(time.Second)
-				continue
-			} else {
-				return err
-			}
-		}
-		return nil
+// Release attempts to remove the lock data for a single node from the multi node annotation
+func (dsl *DaemonSetMultiLock) Release() error {
+	if dsl.releaseDelay > 0 {
+		log.Infof("Waiting %v before releasing lock", dsl.releaseDelay)
+		time.Sleep(dsl.releaseDelay)
 	}
-}
-
-// ReleaseMultiple attempts to remove the lock data from the kured ds annotations using client-go
-func (dsl *DaemonSetLock) ReleaseMultiple() error {
 	for {
 		ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
 		if err != nil {
@@ -307,28 +408,3 @@ func (dsl *DaemonSetLock) ReleaseMultiple() error {
 		return nil
 	}
 }
-
-// GetDaemonSet returns the named DaemonSet resource from the DaemonSetLock's configured client
-func (dsl *DaemonSetLock) GetDaemonSet(sleep, timeout time.Duration) (*v1.DaemonSet, error) {
-	var ds *v1.DaemonSet
-	var lastError error
-	err := wait.PollImmediate(sleep, timeout, func() (bool, error) {
-		ctx, cancel := context.WithTimeout(context.Background(), timeout)
-		defer cancel()
-		if ds, lastError = dsl.client.AppsV1().DaemonSets(dsl.namespace).Get(ctx, dsl.name, metav1.GetOptions{}); lastError != nil {
-			return false, nil
-		}
-		return true, nil
-	})
-	if err != nil {
-		return nil, fmt.Errorf("Timed out trying to get daemonset %s in namespace %s: %v", dsl.name, dsl.namespace, lastError)
-	}
-	return ds, nil
-}
-
-func ttlExpired(created time.Time, ttl time.Duration) bool {
-	if ttl > 0 && time.Since(created) >= ttl {
-		return true
-	}
-	return false
-}
diff --git a/pkg/daemonsetlock/daemonsetlock_test.go b/pkg/daemonsetlock/daemonsetlock_test.go
index b21bcc23c..f68a81e28 100644
--- a/pkg/daemonsetlock/daemonsetlock_test.go
+++ b/pkg/daemonsetlock/daemonsetlock_test.go
@@ -66,7 +66,7 @@ func TestCanAcquireMultiple(t *testing.T) {
 			current:       multiLockAnnotationValue{},
 			desired: multiLockAnnotationValue{
 				MaxOwners: 2,
-				LockAnnotations: []lockAnnotationValue{
+				LockAnnotations: []LockAnnotationValue{
 					{NodeID: node1Name},
 				},
 			},
@@ -80,13 +80,13 @@ func TestCanAcquireMultiple(t *testing.T) {
 			maxOwners: 2,
 			current: multiLockAnnotationValue{
 				MaxOwners: 2,
-				LockAnnotations: []lockAnnotationValue{
+				LockAnnotations: []LockAnnotationValue{
 					{NodeID: node2Name},
 				},
 			},
 			desired: multiLockAnnotationValue{
 				MaxOwners: 2,
-				LockAnnotations: []lockAnnotationValue{
+				LockAnnotations: []LockAnnotationValue{
 					{NodeID: node1Name},
 					{NodeID: node2Name},
 				},
@@ -101,7 +101,7 @@ func TestCanAcquireMultiple(t *testing.T) {
 			maxOwners: 2,
 			current: multiLockAnnotationValue{
 				MaxOwners: 2,
-				LockAnnotations: []lockAnnotationValue{
+				LockAnnotations: []LockAnnotationValue{
 					{
 						NodeID:  node2Name,
 						Created: time.Now().UTC().Add(-1 * time.Minute),
@@ -116,7 +116,7 @@ func TestCanAcquireMultiple(t *testing.T) {
 			},
 			desired: multiLockAnnotationValue{
 				MaxOwners: 2,
-				LockAnnotations: []lockAnnotationValue{
+				LockAnnotations: []LockAnnotationValue{
 					{NodeID: node2Name},
 					{NodeID: node3Name},
 				},
@@ -131,7 +131,7 @@ func TestCanAcquireMultiple(t *testing.T) {
 			maxOwners: 2,
 			current: multiLockAnnotationValue{
 				MaxOwners: 2,
-				LockAnnotations: []lockAnnotationValue{
+				LockAnnotations: []LockAnnotationValue{
 					{
 						NodeID:  node2Name,
 						Created: time.Now().UTC().Add(-1 * time.Hour),
@@ -146,7 +146,7 @@ func TestCanAcquireMultiple(t *testing.T) {
 			},
 			desired: multiLockAnnotationValue{
 				MaxOwners: 2,
-				LockAnnotations: []lockAnnotationValue{
+				LockAnnotations: []LockAnnotationValue{
 					{NodeID: node1Name},
 					{NodeID: node3Name},
 				},
@@ -161,7 +161,7 @@ func TestCanAcquireMultiple(t *testing.T) {
 			maxOwners: 2,
 			current: multiLockAnnotationValue{
 				MaxOwners: 2,
-				LockAnnotations: []lockAnnotationValue{
+				LockAnnotations: []LockAnnotationValue{
 					{
 						NodeID:  node2Name,
 						Created: time.Now().UTC().Add(-1 * time.Hour),
@@ -176,17 +176,17 @@ func TestCanAcquireMultiple(t *testing.T) {
 			},
 			desired: multiLockAnnotationValue{
 				MaxOwners: 2,
-				LockAnnotations: []lockAnnotationValue{
+				LockAnnotations: []LockAnnotationValue{
 					{NodeID: node1Name},
 				},
 			},
 			lockPossible: true,
 		},
 	}
-
+	nm := NodeMeta{Unschedulable: false}
 	for _, testCase := range testCases {
 		t.Run(testCase.name, func(t *testing.T) {
-			lockPossible, actual := testCase.daemonSetLock.canAcquireMultiple(testCase.current, struct{}{}, time.Minute, testCase.maxOwners)
+			lockPossible, actual := testCase.daemonSetLock.canAcquireMultiple(testCase.current, nm, time.Minute, testCase.maxOwners)
 			if lockPossible != testCase.lockPossible {
 				t.Fatalf(
 					"unexpected result for lock possible (got %t expected %t new annotation %v",