Skip to content

Commit

Permalink
Fix type assertion when an event is missed while connection to apiser… (
Browse files Browse the repository at this point in the history
#208)

…ver was severed

## Overview
Noticed a panic while debugging an unrelated code with the following stack:
```
E0418 21:43:47.622243       1 runtime.go:79] Observed a panic: &runtime.TypeAssertionError{_interface:(*abi.Type)(0x2b033a0), concrete:(*abi.Type)(0x2d55540), asserted:(*abi.Type)(0x313f000), missingMethod:""} (interface conversion: interface {} is cache.DeletedFinalStateUnknown, not *v1.Event)
goroutine 571 [running]:
k8s.io/apimachinery/pkg/util/runtime.logPanic({0x2bf7960?, 0xc041e49200})
        /root/.cache/gocache/k8s.io/apimachinery@v0.28.2/pkg/util/runtime/runtime.go:75 +0x85
k8s.io/apimachinery/pkg/util/runtime.HandleCrash({0x0, 0x0, 0xc041fed1d0?})
        /root/.cache/gocache/k8s.io/apimachinery@v0.28.2/pkg/util/runtime/runtime.go:49 +0x6b
panic({0x2bf7960?, 0xc041e49200?})
        /usr/local/go/src/runtime/panic.go:914 +0x21f
github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/k8s.(*eventWatcher).OnDelete(0xc041e27f80?, {0x2d55540?, 0xc041fed8c0?})
        /go/src/github.com/unionai/cloud/flyte/flytepropeller/pkg/controller/nodes/task/k8s/event_watcher.go:75 +0x228
k8s.io/client-go/tools/cache.(*processorListener).run.func1()
        /root/.cache/gocache/k8s.io/client-go@v0.28.2/tools/cache/shared_informer.go:979 +0x9f
k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1(0x30?)
        /root/.cache/gocache/k8s.io/apimachinery@v0.28.2/pkg/util/wait/backoff.go:226 +0x33
k8s.io/apimachinery/pkg/util/wait.BackoffUntil(0xc041e95f38?, {0x3891740, 0xc041e78210}, 0x1, 0xc041e6e6c0)
        /root/.cache/gocache/k8s.io/apimachinery@v0.28.2/pkg/util/wait/backoff.go:227 +0xaf
k8s.io/apimachinery/pkg/util/wait.JitterUntil(0x0?, 0x3b9aca00, 0x0, 0x0?, 0x0?)
        /root/.cache/gocache/k8s.io/apimachinery@v0.28.2/pkg/util/wait/backoff.go:204 +0x7f
k8s.io/apimachinery/pkg/util/wait.Until(...)
        /root/.cache/gocache/k8s.io/apimachinery@v0.28.2/pkg/util/wait/backoff.go:161
k8s.io/client-go/tools/cache.(*processorListener).run(0xc000a8a360)
        /root/.cache/gocache/k8s.io/client-go@v0.28.2/tools/cache/shared_informer.go:968 +0x69
k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1()
        /root/.cache/gocache/k8s.io/apimachinery@v0.28.2/pkg/util/wait/wait.go:72 +0x4f
created by k8s.io/apimachinery/pkg/util/wait.(*Group).Start in goroutine 563
        /root/.cache/gocache/k8s.io/apimachinery@v0.28.2/pkg/util/wait/wait.go:70 +0x73
panic: interface conversion: interface {} is cache.DeletedFinalStateUnknown, not *v1.Event [recovered]
        panic: interface conversion: interface {} is cache.DeletedFinalStateUnknown, not *v1.Event
```

## Upstream Changes
Should this change be upstreamed to OSS (flyteorg/flyte)? If so, please check this box for auditing. Note, this is the responsibility of each developer. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [X] To be upstreamed

## Jira Issue
https://unionai.atlassian.net/browse/<project-number>

## Checklist
* [ ] Added tests
* [ ] Ran a deploy dry run and shared the terraform plan
* [ ] Added logging and metrics
* [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list)
* [ ] Updated documentation
  • Loading branch information
EngHabu committed Apr 25, 2024
1 parent 60d2dd4 commit b91e4a4
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
14 changes: 13 additions & 1 deletion flytepropeller/pkg/controller/nodes/task/k8s/event_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,19 @@ func (e *eventWatcher) OnUpdate(_, newObj interface{}) {
}

func (e *eventWatcher) OnDelete(obj interface{}) {
event := obj.(*eventsv1.Event)
event, casted := obj.(*eventsv1.Event)
if !casted {
unknown, casted := obj.(cache.DeletedFinalStateUnknown)
if !casted {
logger.Warnf(context.Background(), "Unknown object type [%T] in OnDelete", obj)
} else {
logger.Warnf(context.Background(), "Deleted object of unknown key [%v] type [%T] in OnDelete",
unknown.Key, unknown.Obj)
}

return
}

objectNsName := types.NamespacedName{Namespace: event.Regarding.Namespace, Name: event.Regarding.Name}
eventNsName := types.NamespacedName{Namespace: event.Namespace, Name: event.Name}
v, _ := e.objectCache.LoadOrStore(objectNsName, &objectEvents{})
Expand Down
20 changes: 20 additions & 0 deletions flytepropeller/pkg/controller/nodes/task/k8s/event_watcher_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package k8s

import (
"k8s.io/client-go/tools/cache"
"testing"
"time"

Expand Down Expand Up @@ -143,6 +144,25 @@ func TestEventWatcher_OnDelete(t *testing.T) {
v, _ := ew.objectCache.Load(types.NamespacedName{Namespace: "ns3", Name: "name3"})
assert.Nil(t, v)
})

t.Run("bad object type", func(t *testing.T) {
ew.OnDelete(cache.DeletedFinalStateUnknown{
Key: "key",
Obj: &eventsv1.Event{
ObjectMeta: metav1.ObjectMeta{
Namespace: "eventns3",
Name: "eventname3",
},
Regarding: corev1.ObjectReference{
Namespace: "ns3",
Name: "name3",
},
},
})

v, _ := ew.objectCache.Load(types.NamespacedName{Namespace: "ns3", Name: "name3"})
assert.Nil(t, v)
})
}

func TestEventWatcher_List(t *testing.T) {
Expand Down

0 comments on commit b91e4a4

Please sign in to comment.