From c66c1892d7b3151b97e127199f8012e11748996c Mon Sep 17 00:00:00 2001 From: Yongkun Gui Date: Mon, 25 Jun 2018 16:18:02 -0700 Subject: [PATCH] Fix the issue where Shutdown doesn't shut down taskqueue Originally, PeriodicTaskQueue had 2 loops: - wait.Until managed by stopCh - for loop in worker() managed by workqueue Shutdown only exits the second loop. Failing to close stopCh before calling Shutdown causes a panic because the workerDone channel is closed twice. --- cmd/glbc/main.go | 2 +- pkg/controller/controller.go | 4 ++-- pkg/controller/node.go | 14 ++++------- pkg/firewalls/controller.go | 4 ++-- pkg/utils/taskqueue.go | 45 +++++++++++++++--------------------- pkg/utils/taskqueue_test.go | 10 ++++---- 6 files changed, 33 insertions(+), 46 deletions(-) diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go index ed51c705fc..77cd0ec0bd 100644 --- a/cmd/glbc/main.go +++ b/cmd/glbc/main.go @@ -192,7 +192,7 @@ func runControllers(ctx *ingctx.ControllerContext) { go app.RunSIGTERMHandler(lbc, flags.F.DeleteAllOnQuit) - go fwc.Run(stopCh) + go fwc.Run() glog.V(0).Infof("firewall controller started") ctx.Start(stopCh) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 379e551887..f759fe53db 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -226,8 +226,8 @@ func (lbc *LoadBalancerController) Init() { // Run starts the loadbalancer controller. func (lbc *LoadBalancerController) Run() { glog.Infof("Starting loadbalancer controller") - go lbc.ingQueue.Run(time.Second, lbc.stopCh) - lbc.nodes.Run(lbc.stopCh) + go lbc.ingQueue.Run() + go lbc.nodes.Run() <-lbc.stopCh glog.Infof("Shutting down Loadbalancer Controller") diff --git a/pkg/controller/node.go b/pkg/controller/node.go index 366169a17d..309042045e 100644 --- a/pkg/controller/node.go +++ b/pkg/controller/node.go @@ -17,8 +17,6 @@ limitations under the License. 
package controller import ( - "time" - apiv1 "k8s.io/api/core/v1" listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" @@ -27,10 +25,6 @@ import ( "k8s.io/ingress-gce/pkg/utils" ) -const ( - nodeUpdatePeriod = 1 * time.Second -) - // NodeController synchronizes the state of the nodes to the unmanaged instance // groups. type NodeController struct { @@ -66,12 +60,12 @@ func NewNodeController(ctx *context.ControllerContext, instancePool instances.No return c } -// Run a go routine to process updates for the controller. -func (c *NodeController) Run(stopCh chan struct{}) { - go c.queue.Run(nodeUpdatePeriod, stopCh) +// Run processes node updates synchronously, blocking until the queue is shut down. +func (c *NodeController) Run() { + c.queue.Run() } -// Run a go routine to process updates for the controller. +// Shutdown shuts down the goroutine that processes node updates. func (c *NodeController) Shutdown() { c.queue.Shutdown() } diff --git a/pkg/firewalls/controller.go b/pkg/firewalls/controller.go index 3e028287cd..43899dc549 100644 --- a/pkg/firewalls/controller.go +++ b/pkg/firewalls/controller.go @@ -129,9 +129,9 @@ func (fwc *FirewallController) ToSvcPorts(ings *extensions.IngressList) []utils. return knownPorts } -func (fwc *FirewallController) Run(stopCh chan struct{}) { +func (fwc *FirewallController) Run() { defer fwc.shutdown() - fwc.queue.Run(time.Second, stopCh) + fwc.queue.Run() } // This should only be called when the process is being terminated. diff --git a/pkg/utils/taskqueue.go b/pkg/utils/taskqueue.go index 87ce5f3ef8..3f570866fe 100644 --- a/pkg/utils/taskqueue.go +++ b/pkg/utils/taskqueue.go @@ -17,10 +17,7 @@ limitations under the License. package utils import ( - "time" - "github.com/golang/glog" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) @@ -31,7 +28,7 @@ var ( // TaskQueue is a rate limited operation queue. 
type TaskQueue interface { - Run(period time.Duration, stopCh <-chan struct{}) + Run() Enqueue(objs ...interface{}) Shutdown() } @@ -53,9 +50,23 @@ type PeriodicTaskQueue struct { } // Run the task queue. This will block until the Shutdown() has been called. -// TODO: seems redundant to both have stopCh and Shutdown(). -func (t *PeriodicTaskQueue) Run(period time.Duration, stopCh <-chan struct{}) { - wait.Until(t.worker, period, stopCh) +func (t *PeriodicTaskQueue) Run() { + for { + key, quit := t.queue.Get() + if quit { + close(t.workerDone) + return + } + glog.V(4).Infof("Syncing %v (%v)", key, t.resource) + if err := t.sync(key.(string)); err != nil { + glog.Errorf("Requeuing %q due to error: %v (%v)", key, err, t.resource) + t.queue.AddRateLimited(key) + } else { + glog.V(4).Infof("Finished syncing %v", key) + t.queue.Forget(key) + } + t.queue.Done(key) + } } // Enqueue one or more keys to the work queue. @@ -78,26 +89,6 @@ func (t *PeriodicTaskQueue) Shutdown() { <-t.workerDone } -// worker processes work in the queue through sync. -func (t *PeriodicTaskQueue) worker() { - for { - key, quit := t.queue.Get() - if quit { - close(t.workerDone) - return - } - glog.V(4).Infof("Syncing %v (%v)", key, t.resource) - if err := t.sync(key.(string)); err != nil { - glog.Errorf("Requeuing %q due to error: %v (%v)", key, err, t.resource) - t.queue.AddRateLimited(key) - } else { - glog.V(4).Infof("Finished syncing %v", key) - t.queue.Forget(key) - } - t.queue.Done(key) - } -} - // NewPeriodicTaskQueue creates a new task queue with the default rate limiter. 
func NewPeriodicTaskQueue(resource string, syncFn func(string) error) *PeriodicTaskQueue { rl := workqueue.DefaultControllerRateLimiter() diff --git a/pkg/utils/taskqueue_test.go b/pkg/utils/taskqueue_test.go index 7ee1ea26df..2f1cf35fcf 100644 --- a/pkg/utils/taskqueue_test.go +++ b/pkg/utils/taskqueue_test.go @@ -20,7 +20,6 @@ import ( "errors" "reflect" "testing" - "time" "k8s.io/client-go/tools/cache" ) @@ -28,7 +27,6 @@ import ( func TestPeriodicTaskQueue(t *testing.T) { t.Parallel() synced := map[string]bool{} - stopCh := make(chan struct{}) doneCh := make(chan struct{}, 1) var tq TaskQueue @@ -39,21 +37,25 @@ func TestPeriodicTaskQueue(t *testing.T) { return errors.New("injected error") case "stop": doneCh <- struct{}{} + case "more": + t.Error("synced after TaskQueue.Shutdown()") } return nil } tq = NewPeriodicTaskQueue("test", sync) - go tq.Run(time.Microsecond, stopCh) + go tq.Run() tq.Enqueue(cache.ExplicitKey("a")) tq.Enqueue(cache.ExplicitKey("b")) tq.Enqueue(cache.ExplicitKey("err")) tq.Enqueue(cache.ExplicitKey("stop")) <-doneCh - close(stopCh) tq.Shutdown() + // Enqueue after Shutdown isn't going to be synced. + tq.Enqueue(cache.ExplicitKey("more")) + expected := map[string]bool{ "a": true, "b": true,