diff --git a/pkg/queue/beanstalk.go b/pkg/queue/beanstalk.go index 961d2cfa..fd48adb9 100644 --- a/pkg/queue/beanstalk.go +++ b/pkg/queue/beanstalk.go @@ -253,34 +253,14 @@ func (b *Beanstalk) getClient( return client.(BeanstalkClientInterface), nil } -func (b *Beanstalk) getApproxMessages(queueURI string) (int32, error) { +func (b *Beanstalk) getMessages(queueURI string) (int32, int32, error) { client, err := b.getClient(queueURI) if err != nil { - return 0, err - } - - jobsWaiting, _, _, err := client.getStats() - if err != nil { - return jobsWaiting, err - } - - return jobsWaiting, nil -} - -func (b *Beanstalk) getApproxMessagesNotVisible( - queueURI string) (int32, error) { - - client, err := b.getClient(queueURI) - if err != nil { - return 0, err - } - - _, _, jobsReserved, err := client.getStats() - if err != nil { - return jobsReserved, err + return 0, 0, err } - return jobsReserved, nil + jobsWaiting, _, jobsReserved, err := client.getStats() + return jobsWaiting, jobsReserved, err } func (b *Beanstalk) getIdleWorkers(queueURI string) (int32, error) { @@ -368,14 +348,14 @@ func (b *Beanstalk) poll(key string, queueSpec QueueSpec) { // klog.Infof("%s: messagesSentPerMinute=%v", queueSpec.name, messagesSentPerMinute) } - approxMessages, err := b.getApproxMessages(queueSpec.uri) + approxMessages, approxMessagesNotVisible, err := b.getMessages(queueSpec.uri) if err != nil { klog.Errorf("Unable to get approximate messages in queue %q, %v.", queueSpec.name, err) return } klog.Infof("%s: approxMessages=%d", queueSpec.name, approxMessages) - b.queues.updateMessage(key, approxMessages) + b.queues.updateMessage(key, approxMessages+approxMessagesNotVisible) if approxMessages != 0 { b.queues.updateIdleWorkers(key, -1) @@ -386,13 +366,7 @@ func (b *Beanstalk) poll(key string, queueSpec QueueSpec) { // approxMessagesNotVisible is queried to prevent scaling down when their are // workers which are doing the processing, so if approxMessagesNotVisible > 0 we // do not scale down as those messages are still being processed (and we dont know which worker) - approxMessagesNotVisible, err := b.getApproxMessagesNotVisible( - queueSpec.uri) - if err != nil { - klog.Errorf("Unable to get approximate messages not visible in queue %q, %v.", - queueSpec.name, err) - return - } + // klog.Infof("approxMessagesNotVisible=%d", approxMessagesNotVisible) if approxMessagesNotVisible > 0 { diff --git a/pkg/queue/beanstalk_test.go b/pkg/queue/beanstalk_test.go index 43fdb299..fb835d58 100644 --- a/pkg/queue/beanstalk_test.go +++ b/pkg/queue/beanstalk_test.go @@ -77,6 +77,7 @@ func TestPollSyncWhenNoMessagesInQueue(t *testing.T) { }, } messages := int32(0) + reserved := int32(0) name := queueSpecs[0].name namespace := queueSpecs[0].namespace queueURI := getQueueURI(namespace, name) @@ -114,8 +115,8 @@ func TestPollSyncWhenNoMessagesInQueue(t *testing.T) { t.Errorf("expected %s qName, got=%v\n", name, nameGot) } - if messagesGot != messages { - t.Errorf("expected 0 messages, got=%v\n", messages) + if messagesGot != messages+reserved { + t.Errorf("expected %v messages, got=%v\n", messages+reserved, messagesGot) } if messagesPerMinGot != -1 { @@ -146,6 +147,7 @@ func TestPollSyncWhenMessagesInQueue(t *testing.T) { }, } messages := int32(25) + reserved := int32(0) name := queueSpecs[0].name namespace := queueSpecs[0].namespace queueURI := getQueueURI(namespace, name) @@ -183,8 +185,8 @@ func TestPollSyncWhenMessagesInQueue(t *testing.T) { t.Errorf("expected %s qName, got=%v\n", name, nameGot) } - if messagesGot != messages { - t.Errorf("expected 0 messages, got=%v\n", messages) + if messagesGot != messages+reserved { + t.Errorf("expected %v messages, got=%v\n", messages+reserved, messagesGot) } if messagesPerMinGot != -1 { @@ -235,7 +237,7 @@ func TestPollSyncWhenNoMessagesInQueueButMessagesAreInFlight(t *testing.T) { EXPECT(). getStats(). Return(messages, int32(0), reserved, nil). - Times(2) + Times(1) // TODO: due to this call not able to use poller as an interface poller.clientPool.Store(queueURI, mockBeanstalkClient) @@ -252,8 +254,8 @@ func TestPollSyncWhenNoMessagesInQueueButMessagesAreInFlight(t *testing.T) { t.Errorf("expected %s qName, got=%v\n", name, nameGot) } - if messagesGot != messages { - t.Errorf("expected 0 messages, got=%v\n", messages) + if messagesGot != messages+reserved { + t.Errorf("expected 0 messages, got=%v\n", messagesGot) } if messagesPerMinGot != -1 { @@ -305,7 +307,7 @@ func TestPollSyncWhenNoMessagesInQueueAndNoMessagesAreInFlight(t *testing.T) { EXPECT(). getStats(). Return(messages, currentWorkers, reserved, nil). - Times(3) + Times(2) // TODO: due to this call not able to use poller as an interface poller.clientPool.Store(queueURI, mockBeanstalkClient) @@ -323,8 +325,8 @@ func TestPollSyncWhenNoMessagesInQueueAndNoMessagesAreInFlight(t *testing.T) { t.Errorf("expected %s qName, got=%v\n", name, nameGot) } - if messagesGot != messages { - t.Errorf("expected 0 messages, got=%v\n", messages) + if messagesGot != messages+reserved { + t.Errorf("expected %v messages, got=%v\n", messages+reserved, messagesGot) } if messagesPerMinGot != -1 { diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index e9215dd6..227e67fa 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -40,9 +40,9 @@ type QueueSpec struct { host string protocol string queueServiceName string - // messages is the number of messages in the queue which have not - // been picked up for processing by the worker - // SQS: ApproximateNumberOfMessagesVisible metric + // messages is the total number of messages in the queue that are either + // not picked up or is not completely processed by the worker + // SQS: ApproximateNumberOfMessagesVisible + ApproximateNumberOfMessagesNotVisible messages int32 // messagesSent is the number of messages sent to the queue per minute // SQS: NumberOfMessagesSent metric diff --git a/pkg/queue/sqs.go b/pkg/queue/sqs.go index 7c78e8f8..43e5cedd 100644 --- a/pkg/queue/sqs.go +++ b/pkg/queue/sqs.go @@ -442,13 +442,6 @@ func (s *SQS) poll(key string, queueSpec QueueSpec) { } } klog.Infof("%s: approxMessages=%d", queueSpec.name, approxMessages) - s.queues.updateMessage(key, approxMessages) - - if approxMessages != 0 { - s.queues.updateIdleWorkers(key, -1) - s.waitForShortPollInterval() - return - } // approxMessagesNotVisible is queried to prevent scaling down when their are // workers which are doing the processing, so if approxMessagesNotVisible > 0 we @@ -471,6 +464,14 @@ func (s *SQS) poll(key string, queueSpec QueueSpec) { } // klog.Infof("approxMessagesNotVisible=%d", approxMessagesNotVisible) + s.queues.updateMessage(key, approxMessages+approxMessagesNotVisible) + + if approxMessages != 0 { + s.queues.updateIdleWorkers(key, -1) + s.waitForShortPollInterval() + return + } + if approxMessagesNotVisible > 0 { klog.Infof("%s: approxMessagesNotVisible > 0, not scaling down", queueSpec.name) s.waitForShortPollInterval()