From b91e4a44e082b95f8a4d4e5797eb40c0261418e5 Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Thu, 25 Apr 2024 14:42:47 -0700 Subject: [PATCH] =?UTF-8?q?Fix=20type=20assertion=20when=20an=20event=20is?= =?UTF-8?q?=20missed=20while=20connection=20to=20apiser=E2=80=A6=20(#208)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …ver was severed ## Overview Noticed a panic while debugging an unrelated code with the following stack: ``` E0418 21:43:47.622243 1 runtime.go:79] Observed a panic: &runtime.TypeAssertionError{_interface:(*abi.Type)(0x2b033a0), concrete:(*abi.Type)(0x2d55540), asserted:(*abi.Type)(0x313f000), missingMethod:""} (interface conversion: interface {} is cache.DeletedFinalStateUnknown, not *v1.Event) goroutine 571 [running]: k8s.io/apimachinery/pkg/util/runtime.logPanic({0x2bf7960?, 0xc041e49200}) /root/.cache/gocache/k8s.io/apimachinery@v0.28.2/pkg/util/runtime/runtime.go:75 +0x85 k8s.io/apimachinery/pkg/util/runtime.HandleCrash({0x0, 0x0, 0xc041fed1d0?}) /root/.cache/gocache/k8s.io/apimachinery@v0.28.2/pkg/util/runtime/runtime.go:49 +0x6b panic({0x2bf7960?, 0xc041e49200?}) /usr/local/go/src/runtime/panic.go:914 +0x21f github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/k8s.(*eventWatcher).OnDelete(0xc041e27f80?, {0x2d55540?, 0xc041fed8c0?}) /go/src/github.com/unionai/cloud/flyte/flytepropeller/pkg/controller/nodes/task/k8s/event_watcher.go:75 +0x228 k8s.io/client-go/tools/cache.(*processorListener).run.func1() /root/.cache/gocache/k8s.io/client-go@v0.28.2/tools/cache/shared_informer.go:979 +0x9f k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1(0x30?) /root/.cache/gocache/k8s.io/apimachinery@v0.28.2/pkg/util/wait/backoff.go:226 +0x33 k8s.io/apimachinery/pkg/util/wait.BackoffUntil(0xc041e95f38?, {0x3891740, 0xc041e78210}, 0x1, 0xc041e6e6c0) /root/.cache/gocache/k8s.io/apimachinery@v0.28.2/pkg/util/wait/backoff.go:227 +0xaf k8s.io/apimachinery/pkg/util/wait.JitterUntil(0x0?, 0x3b9aca00, 0x0, 0x0?, 0x0?) /root/.cache/gocache/k8s.io/apimachinery@v0.28.2/pkg/util/wait/backoff.go:204 +0x7f k8s.io/apimachinery/pkg/util/wait.Until(...) /root/.cache/gocache/k8s.io/apimachinery@v0.28.2/pkg/util/wait/backoff.go:161 k8s.io/client-go/tools/cache.(*processorListener).run(0xc000a8a360) /root/.cache/gocache/k8s.io/client-go@v0.28.2/tools/cache/shared_informer.go:968 +0x69 k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1() /root/.cache/gocache/k8s.io/apimachinery@v0.28.2/pkg/util/wait/wait.go:72 +0x4f created by k8s.io/apimachinery/pkg/util/wait.(*Group).Start in goroutine 563 /root/.cache/gocache/k8s.io/apimachinery@v0.28.2/pkg/util/wait/wait.go:70 +0x73 panic: interface conversion: interface {} is cache.DeletedFinalStateUnknown, not *v1.Event [recovered] panic: interface conversion: interface {} is cache.DeletedFinalStateUnknown, not *v1.Event ``` ## Upstream Changes Should this change be upstreamed to OSS (flyteorg/flyte)? If so, please check this box for auditing. Note, this is the responsibility of each developer. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F). - [X] To be upstreamed ## Jira Issue https://unionai.atlassian.net/browse/ ## Checklist * [ ] Added tests * [ ] Ran a deploy dry run and shared the terraform plan * [ ] Added logging and metrics * [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list) * [ ] Updated documentation --- .../nodes/task/k8s/event_watcher.go | 14 ++++++++++++- .../nodes/task/k8s/event_watcher_test.go | 20 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/event_watcher.go b/flytepropeller/pkg/controller/nodes/task/k8s/event_watcher.go index e53de83e10..13ebbc4cc5 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/event_watcher.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/event_watcher.go @@ -72,7 +72,19 @@ func (e *eventWatcher) OnUpdate(_, newObj interface{}) { } func (e *eventWatcher) OnDelete(obj interface{}) { - event := obj.(*eventsv1.Event) + event, casted := obj.(*eventsv1.Event) + if !casted { + unknown, casted := obj.(cache.DeletedFinalStateUnknown) + if !casted { + logger.Warnf(context.Background(), "Unknown object type [%T] in OnDelete", obj) + } else { + logger.Warnf(context.Background(), "Deleted object of unknown key [%v] type [%T] in OnDelete", + unknown.Key, unknown.Obj) + } + + return + } + objectNsName := types.NamespacedName{Namespace: event.Regarding.Namespace, Name: event.Regarding.Name} eventNsName := types.NamespacedName{Namespace: event.Namespace, Name: event.Name} v, _ := e.objectCache.LoadOrStore(objectNsName, &objectEvents{}) diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/event_watcher_test.go b/flytepropeller/pkg/controller/nodes/task/k8s/event_watcher_test.go index d3ffbcc5b9..53932eef01 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/event_watcher_test.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/event_watcher_test.go @@ -1,6 +1,7 @@ package k8s import ( + "k8s.io/client-go/tools/cache" "testing" "time" @@ -143,6 +144,25 @@ func TestEventWatcher_OnDelete(t *testing.T) { v, _ := ew.objectCache.Load(types.NamespacedName{Namespace: "ns3", Name: "name3"}) assert.Nil(t, v) }) + + t.Run("bad object type", func(t *testing.T) { + ew.OnDelete(cache.DeletedFinalStateUnknown{ + Key: "key", + Obj: &eventsv1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "eventns3", + Name: "eventname3", + }, + Regarding: corev1.ObjectReference{ + Namespace: "ns3", + Name: "name3", + }, + }, + }) + + v, _ := ew.objectCache.Load(types.NamespacedName{Namespace: "ns3", Name: "name3"}) + assert.Nil(t, v) + }) } func TestEventWatcher_List(t *testing.T) {