Skip to content

Commit

Permalink
Poller should operate on filtered WPA.
Browse files Browse the repository at this point in the history
SQS poller should operate on only sqs WPA resources and same goes for beanstalk.
Also change the default to start both.
#69 (comment)
  • Loading branch information
alok87 committed Jun 19, 2020
1 parent ce08a91 commit 2f6a0c9
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 112 deletions.
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,15 @@ Flags:
--k8s-api-qps float qps indicates the maximum QPS to the k8s api from the clients(wpa). (default 5)
--kube-config string path of the kube config file, if not specified in cluster config is used
--metrics-port string specify where to serve the /metrics and /status endpoint. /metrics serve the prometheus metrics for WPA (default ":8787")
--queue-services string comma separated queue services, the WPA will start with (default "sqs")
--queue-services string comma separated queue services, the WPA will start with (default "sqs,beanstalkd")
--resync-period int sync period for the worker pod autoscaler (default 20)
--sqs-long-poll-interval int the duration (in seconds) for which the sqs receive message call waits for a message to arrive (default 20)
--sqs-short-poll-interval int the duration (in seconds) after which the next sqs api call is made to fetch the queue length (default 20)
--wpa-default-max-disruption string it is the default value for the maxDisruption in the WPA spec. This specifies how much percentage of pods can be disrupted in a single scale down acitivity. Can be expressed as integers or as a percentage. (default "100%")
--wpa-threads int wpa threadiness, number of threads to process wpa resources (default 10)

```
For adding multiple queue provider support, you can add comma supported providers.
If you need to enable multiple queue support, you can add queues comma separated in `--queue-services`. For example, if beanstalkd is started and there is no WPA beanstalk resource present, then nothing happens, until a beanstalk WPA resource is created. Queue poller service only operates on the filtered WPA objects.
```
--queue-services=sqs,beanstalkd
```
Expand Down
2 changes: 1 addition & 1 deletion artifacts/deployment-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ spec:
- --k8s-api-qps=5.0
- --k8s-api-burst=10
- --wpa-default-max-disruption=100%
- --queue-services=sqs
- --queue-services=sqs,beanstalkd
resources:
limits:
cpu: 100m
Expand Down
34 changes: 23 additions & 11 deletions cmd/workerpodautoscaler/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (v *runCmd) new() *cobra.Command {
flags.Int("sqs-long-poll-interval", 20, "the duration (in seconds) for which the sqs receive message call waits for a message to arrive")
flags.Int("beanstalk-short-poll-interval", 20, "the duration (in seconds) after which the next beanstalk api call is made to fetch the queue length")
flags.Int("beanstalk-long-poll-interval", 20, "the duration (in seconds) for which the beanstalk receive message call waits for a message to arrive")
flags.String("queue-services", "sqs", "comma separated queue services, the WPA will start with")
flags.String("queue-services", "sqs,beanstalkd", "comma separated queue services, the WPA will start with")

flags.String("metrics-port", ":8787", "specify where to serve the /metrics and /status endpoint. /metrics serve the prometheus metrics for WPA")
flags.Float64("k8s-api-qps", 5.0, "qps indicates the maximum QPS to the k8s api from the clients(wpa).")
Expand All @@ -96,14 +96,16 @@ func parseRegions(regionNames string) []string {
}

func (v *runCmd) run(cmd *cobra.Command, args []string) {
resyncPeriod := time.Second * time.Duration(v.Viper.GetInt("resync-period"))
resyncPeriod := time.Second * time.Duration(
v.Viper.GetInt("resync-period"))
wpaThraeds := v.Viper.GetInt("wpa-threads")
wpaDefaultMaxDisruption := v.Viper.GetString("wpa-default-max-disruption")
awsRegions := parseRegions(v.Viper.GetString("aws-regions"))
kubeConfigPath := v.Viper.GetString("kube-config")
sqsShortPollInterval := v.Viper.GetInt("sqs-short-poll-interval")
sqsLongPollInterval := v.Viper.GetInt("sqs-long-poll-interval")
beanstalkShortPollInterval := v.Viper.GetInt("beanstalk-short-poll-interval")
beanstalkShortPollInterval := v.Viper.GetInt(
"beanstalk-short-poll-interval")
beanstalkLongPollInterval := v.Viper.GetInt("beanstalk-long-poll-interval")
queueServicesToStartWith := v.Viper.GetString("queue-services")
metricsPort := v.Viper.GetString("metrics-port")
Expand Down Expand Up @@ -151,14 +153,18 @@ func (v *runCmd) run(cmd *cobra.Command, args []string) {
q = strings.TrimSpace(q)
switch q {
case queue.SqsQueueService:
sqs, err := queue.NewSQS(awsRegions, queues, sqsShortPollInterval, sqsLongPollInterval)
sqs, err := queue.NewSQS(
queue.SqsQueueService,
awsRegions, queues, sqsShortPollInterval, sqsLongPollInterval)
if err != nil {
klog.Fatalf("Error creating sqs Poller: %v", err)
}

queuingServices = append(queuingServices, sqs)
case queue.BeanstalkQueueService:
bs, err := queue.NewBeanstalk(queues, beanstalkShortPollInterval, beanstalkLongPollInterval)
bs, err := queue.NewBeanstalk(
queue.BeanstalkQueueService,
queues, beanstalkShortPollInterval, beanstalkLongPollInterval)
if err != nil {
klog.Fatalf("Error creating bs Poller: %v", err)
}
Expand All @@ -176,24 +182,30 @@ func (v *runCmd) run(cmd *cobra.Command, args []string) {
go poller.Run(stopCh)
}

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, resyncPeriod)
customInformerFactory := informers.NewSharedInformerFactory(customClient, resyncPeriod)
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(
kubeClient, resyncPeriod)
customInformerFactory := informers.NewSharedInformerFactory(
customClient, resyncPeriod)

controller := workerpodautoscalercontroller.NewController(kubeClient, customClient,
controller := workerpodautoscalercontroller.NewController(
kubeClient, customClient,
kubeInformerFactory.Apps().V1().Deployments(),
customInformerFactory.K8s().V1alpha1().WorkerPodAutoScalers(),
wpaDefaultMaxDisruption,
queues,
)

// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
// notice that there is no need to run Start methods in a
// separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
// Start method is non-blocking and runs all registered
// informers in a dedicated goroutine.
kubeInformerFactory.Start(stopCh)
customInformerFactory.Start(stopCh)

go serveMetrics(metricsPort)

// TODO: autoscale the worker threads based on number of queues registred in WPA
// TODO: autoscale the worker threads based on number of
// queues registred in WPA
if err = controller.Run(wpaThraeds, stopCh); err != nil {
klog.Fatalf("Error running controller: %s", err.Error())
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/queue/beanstalk.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
// Beanstalk is used to by the Poller to get the queue
// information from Beanstalk, it implements the QueuingService interface
type Beanstalk struct {
name string
queues *Queues
clientPool *sync.Map

Expand All @@ -24,11 +25,13 @@ type Beanstalk struct {
}

func NewBeanstalk(
name string,
queues *Queues,
shortPollInterval int,
longPollInterval int) (QueuingService, error) {

return &Beanstalk{
name: name,
queues: queues,
clientPool: new(sync.Map),

Expand Down Expand Up @@ -316,17 +319,21 @@ func (b *Beanstalk) longPollReceiveMessage(
return messages, idleWorkers, err
}

func (b *Beanstalk) waitForShortPollInterval() {
time.Sleep(b.shortPollInterval)
}

func (b *Beanstalk) GetName() string {
return b.name
}

func (b *Beanstalk) Sync(stopCh <-chan struct{}) {
// Sync is only required when cache is implemented
// keeping the noop function to keep the impl same as
// other queue providers
return
}

func (b *Beanstalk) waitForShortPollInterval() {
time.Sleep(b.shortPollInterval)
}

func (b *Beanstalk) poll(key string, queueSpec QueueSpec) {
if queueSpec.workers == 0 && queueSpec.messages == 0 {
// If there are no workers running we do a long poll to find a job(s)
Expand Down
2 changes: 1 addition & 1 deletion pkg/queue/beanstalk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func buildQueues(
<-doneChan
}

poller, err := NewBeanstalk(queues, 1, 1)
poller, err := NewBeanstalk(BeanstalkQueueService, queues, 1, 1)
if err != nil {
return nil, nil, err
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/queue/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ import (
// the configured message queuing service provider
type Poller struct {
queues *Queues
service QueuingService
queueService QueuingService
threads map[string]bool
listThreadCh chan chan map[string]bool
updateThreadCh chan map[string]bool
}

func NewPoller(queues *Queues, service QueuingService) *Poller {
func NewPoller(queues *Queues, queueService QueuingService) *Poller {
return &Poller{
queues: queues,
service: service,
queueService: queueService,
threads: make(map[string]bool),
listThreadCh: make(chan chan map[string]bool),
updateThreadCh: make(chan map[string]bool),
Expand All @@ -43,7 +43,7 @@ func (p *Poller) runPollThread(key string) {
if queueSpec.name == "" {
return
}
p.service.poll(key, queueSpec)
p.queueService.poll(key, queueSpec)
}
}

Expand Down Expand Up @@ -80,10 +80,11 @@ func (p *Poller) Sync(stopCh <-chan struct{}) {

func (p *Poller) Run(stopCh <-chan struct{}) {
ticker := time.NewTicker(time.Second * 1)
queueServiceName := p.queueService.GetName()
for {
select {
case <-ticker.C:
queues := p.queues.List()
queues := p.queues.List(queueServiceName)
// Create a new thread
for key, _ := range queues {
threads := p.listThreads()
Expand Down
74 changes: 35 additions & 39 deletions pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package queue

import (
"net/url"
"regexp"
"strings"

"k8s.io/klog"
Expand All @@ -15,8 +14,6 @@ var (
)

const (
QueueProviderSQS = "sqs"
QueueProviderBeanstalk = "beanstalk"
BenanstalkProtocol = "beanstalk"
UnsyncedQueueMessageCount = -1
UnsyncedMessagesSentPerMinute = -1
Expand All @@ -37,12 +34,12 @@ type Queues struct {

// QueueSpec is the specification for a single queue
type QueueSpec struct {
name string
namespace string
uri string
host string
protocol string
provider string
name string
namespace string
uri string
host string
protocol string
queueServiceName string
// messages is the number of messages in the queue which have not
// been picked up for processing by the worker
// SQS: ApproximateNumberOfMessagesVisible metric
Expand All @@ -58,7 +55,8 @@ type QueueSpec struct {
idleWorkers int32
workers int32

// secondsToProcessOneJob tells the time to process one job by one worker process
// secondsToProcessOneJob tells the time to process
// one job by one worker process
secondsToProcessOneJob float64
}

Expand Down Expand Up @@ -145,9 +143,12 @@ func (q *Queues) Sync(stopCh <-chan struct{}) {
}
}

func (q *Queues) Add(namespace string, name string, uri string, workers int32, secondsToProcessOneJob float64) error {
func (q *Queues) Add(namespace string, name string, uri string,
workers int32, secondsToProcessOneJob float64) error {

if uri == "" {
klog.Warningf("Queue is empty(or not synced) ignoring the wpa for uri: %s", uri)
klog.Warningf(
"Queue is empty(or not synced) ignoring the wpa for uri: %s", uri)
return nil
}

Expand All @@ -158,9 +159,10 @@ func (q *Queues) Add(namespace string, name string, uri string, workers int32, s
return err
}

found, provider, err := getProvider(host, protocol)
if !found {
klog.Warningf("Unsupported queue provider: %s, ignoring wpa: %s", provider, name)
supported, queueServiceName, err := getQueueServiceName(host, protocol)
if !supported {
klog.Warningf(
"Unsupported: %s, skipping wpa: %s", queueServiceName, name)
return nil
}

Expand All @@ -180,7 +182,7 @@ func (q *Queues) Add(namespace string, name string, uri string, workers int32, s
uri: uri,
protocol: protocol,
host: host,
provider: provider,
queueServiceName: queueServiceName,
messages: messages,
messagesSentPerMinute: messagesSent,
workers: workers,
Expand All @@ -197,14 +199,24 @@ func (q *Queues) Delete(namespace string, name string) error {
return nil
}

func (q *Queues) List() map[string]QueueSpec {
func (q *Queues) ListAll() map[string]QueueSpec {
listResultCh := make(chan map[string]QueueSpec)
q.listCh <- listResultCh
return <-listResultCh
}

func (q *Queues) List(queueServiceName string) map[string]QueueSpec {
filteredQueues := make(map[string]QueueSpec)
for key, spec := range q.ListAll() {
if spec.queueServiceName == queueServiceName {
filteredQueues[key] = spec
}
}
return filteredQueues
}

func (q *Queues) ListQueue(key string) QueueSpec {
item := q.List()
item := q.ListAll()
if _, ok := item[key]; !ok {
return QueueSpec{}
}
Expand All @@ -216,13 +228,16 @@ func (q *Queues) listQueueByNamespace(namespace string, name string) QueueSpec {
return q.ListQueue(getKey(namespace, name))
}

func (q *Queues) GetQueueInfo(namespace string, name string) (string, int32, float64, int32) {
func (q *Queues) GetQueueInfo(
namespace string, name string) (string, int32, float64, int32) {

spec := q.listQueueByNamespace(namespace, name)
if spec.name == "" {
return "", 0, 0.0, 0
}

return spec.name, spec.messages, spec.messagesSentPerMinute, spec.idleWorkers
return spec.name, spec.messages,
spec.messagesSentPerMinute, spec.idleWorkers
}

func parseQueueURI(uri string) (string, string, error) {
Expand All @@ -234,25 +249,6 @@ func parseQueueURI(uri string) (string, string, error) {
return parsedURI.Scheme, parsedURI.Host, nil
}

// getProvider returns the provider name
// TODO: add validation for the queue provider in the wpa custom resource
func getProvider(host string, protocol string) (bool, string, error) {
matched, err := regexp.MatchString("^sqs.[a-z][a-z]-[a-z]*-[0-9]{1}.amazonaws.com", host)
if err != nil {
return false, "", nil
}

if matched {
return true, QueueProviderSQS, nil
}

if protocol == BenanstalkProtocol {
return true, QueueProviderBeanstalk, nil
}

return false, "", nil
}

func getQueueName(name string) string {
splitted := strings.Split(name, "/")
return splitted[len(splitted)-1]
Expand Down
Loading

0 comments on commit 2f6a0c9

Please sign in to comment.