Skip to content

Commit

Permalink
Merge pull request #365 from anfernee/fix-queue
Browse files Browse the repository at this point in the history
Fix the issue where Shutdown doesn't shutdown taskqueue
  • Loading branch information
k8s-ci-robot authored Oct 30, 2018
2 parents 5a27ec5 + c66c189 commit fdfb8b8
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 46 deletions.
2 changes: 1 addition & 1 deletion cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func runControllers(ctx *ingctx.ControllerContext) {

go app.RunSIGTERMHandler(lbc, flags.F.DeleteAllOnQuit)

go fwc.Run(stopCh)
go fwc.Run()
glog.V(0).Infof("firewall controller started")

ctx.Start(stopCh)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ func (lbc *LoadBalancerController) Init() {
// Run starts the loadbalancer controller.
func (lbc *LoadBalancerController) Run() {
glog.Infof("Starting loadbalancer controller")
go lbc.ingQueue.Run(time.Second, lbc.stopCh)
lbc.nodes.Run(lbc.stopCh)
go lbc.ingQueue.Run()
go lbc.nodes.Run()

<-lbc.stopCh
glog.Infof("Shutting down Loadbalancer Controller")
Expand Down
14 changes: 4 additions & 10 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package controller

import (
"time"

apiv1 "k8s.io/api/core/v1"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
Expand All @@ -27,10 +25,6 @@ import (
"k8s.io/ingress-gce/pkg/utils"
)

const (
nodeUpdatePeriod = 1 * time.Second
)

// NodeController synchronizes the state of the nodes to the unmanaged instance
// groups.
type NodeController struct {
Expand Down Expand Up @@ -66,12 +60,12 @@ func NewNodeController(ctx *context.ControllerContext, instancePool instances.No
return c
}

// Run a go routine to process updates for the controller.
func (c *NodeController) Run(stopCh chan struct{}) {
go c.queue.Run(nodeUpdatePeriod, stopCh)
// Run a goroutine to process updates for the controller.
func (c *NodeController) Run() {
c.queue.Run()
}

// Run a go routine to process updates for the controller.
// Shutdown shuts down the goroutine that processes node updates.
func (c *NodeController) Shutdown() {
c.queue.Shutdown()
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/firewalls/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ func (fwc *FirewallController) ToSvcPorts(ings *extensions.IngressList) []utils.
return knownPorts
}

func (fwc *FirewallController) Run(stopCh chan struct{}) {
func (fwc *FirewallController) Run() {
defer fwc.shutdown()
fwc.queue.Run(time.Second, stopCh)
fwc.queue.Run()
}

// This should only be called when the process is being terminated.
Expand Down
45 changes: 18 additions & 27 deletions pkg/utils/taskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ limitations under the License.
package utils

import (
"time"

"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
Expand All @@ -31,7 +28,7 @@ var (

// TaskQueue is a rate limited operation queue.
type TaskQueue interface {
Run(period time.Duration, stopCh <-chan struct{})
Run()
Enqueue(objs ...interface{})
Shutdown()
}
Expand All @@ -53,9 +50,23 @@ type PeriodicTaskQueue struct {
}

// Run the task queue. This will block until the Shutdown() has been called.
// TODO: seems redundant to both have stopCh and Shutdown().
func (t *PeriodicTaskQueue) Run(period time.Duration, stopCh <-chan struct{}) {
wait.Until(t.worker, period, stopCh)
func (t *PeriodicTaskQueue) Run() {
for {
key, quit := t.queue.Get()
if quit {
close(t.workerDone)
return
}
glog.V(4).Infof("Syncing %v (%v)", key, t.resource)
if err := t.sync(key.(string)); err != nil {
glog.Errorf("Requeuing %q due to error: %v (%v)", key, err, t.resource)
t.queue.AddRateLimited(key)
} else {
glog.V(4).Infof("Finished syncing %v", key)
t.queue.Forget(key)
}
t.queue.Done(key)
}
}

// Enqueue one or more keys to the work queue.
Expand All @@ -78,26 +89,6 @@ func (t *PeriodicTaskQueue) Shutdown() {
<-t.workerDone
}

// worker processes work in the queue through sync.
func (t *PeriodicTaskQueue) worker() {
for {
key, quit := t.queue.Get()
if quit {
close(t.workerDone)
return
}
glog.V(4).Infof("Syncing %v (%v)", key, t.resource)
if err := t.sync(key.(string)); err != nil {
glog.Errorf("Requeuing %q due to error: %v (%v)", key, err, t.resource)
t.queue.AddRateLimited(key)
} else {
glog.V(4).Infof("Finished syncing %v", key)
t.queue.Forget(key)
}
t.queue.Done(key)
}
}

// NewPeriodicTaskQueue creates a new task queue with the default rate limiter.
func NewPeriodicTaskQueue(resource string, syncFn func(string) error) *PeriodicTaskQueue {
rl := workqueue.DefaultControllerRateLimiter()
Expand Down
10 changes: 6 additions & 4 deletions pkg/utils/taskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ import (
"errors"
"reflect"
"testing"
"time"

"k8s.io/client-go/tools/cache"
)

func TestPeriodicTaskQueue(t *testing.T) {
t.Parallel()
synced := map[string]bool{}
stopCh := make(chan struct{})
doneCh := make(chan struct{}, 1)

var tq TaskQueue
Expand All @@ -39,21 +37,25 @@ func TestPeriodicTaskQueue(t *testing.T) {
return errors.New("injected error")
case "stop":
doneCh <- struct{}{}
case "more":
t.Error("synced after TaskQueue.Shutdown()")
}
return nil
}
tq = NewPeriodicTaskQueue("test", sync)

go tq.Run(time.Microsecond, stopCh)
go tq.Run()
tq.Enqueue(cache.ExplicitKey("a"))
tq.Enqueue(cache.ExplicitKey("b"))
tq.Enqueue(cache.ExplicitKey("err"))
tq.Enqueue(cache.ExplicitKey("stop"))

<-doneCh
close(stopCh)
tq.Shutdown()

// Enqueue after Shutdown isn't going to be synced.
tq.Enqueue(cache.ExplicitKey("more"))

expected := map[string]bool{
"a": true,
"b": true,
Expand Down

0 comments on commit fdfb8b8

Please sign in to comment.