Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

queue.messages should include both messages visible and not-visible #86

Merged
merged 2 commits into from
Jun 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 7 additions & 33 deletions pkg/queue/beanstalk.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,34 +253,14 @@ func (b *Beanstalk) getClient(
return client.(BeanstalkClientInterface), nil
}

func (b *Beanstalk) getApproxMessages(queueURI string) (int32, error) {
func (b *Beanstalk) getMessages(queueURI string) (int32, int32, error) {
client, err := b.getClient(queueURI)
if err != nil {
return 0, err
}

jobsWaiting, _, _, err := client.getStats()
if err != nil {
return jobsWaiting, err
}

return jobsWaiting, nil
}

func (b *Beanstalk) getApproxMessagesNotVisible(
queueURI string) (int32, error) {

client, err := b.getClient(queueURI)
if err != nil {
return 0, err
}

_, _, jobsReserved, err := client.getStats()
if err != nil {
return jobsReserved, err
return 0, 0, err
}

return jobsReserved, nil
jobsWaiting, _, jobsReserved, err := client.getStats()
return jobsWaiting, jobsReserved, err
}

func (b *Beanstalk) getIdleWorkers(queueURI string) (int32, error) {
Expand Down Expand Up @@ -368,14 +348,14 @@ func (b *Beanstalk) poll(key string, queueSpec QueueSpec) {
// klog.Infof("%s: messagesSentPerMinute=%v", queueSpec.name, messagesSentPerMinute)
}

approxMessages, err := b.getApproxMessages(queueSpec.uri)
approxMessages, approxMessagesNotVisible, err := b.getMessages(queueSpec.uri)
if err != nil {
klog.Errorf("Unable to get approximate messages in queue %q, %v.",
queueSpec.name, err)
return
}
klog.Infof("%s: approxMessages=%d", queueSpec.name, approxMessages)
b.queues.updateMessage(key, approxMessages)
b.queues.updateMessage(key, approxMessages+approxMessagesNotVisible)

if approxMessages != 0 {
b.queues.updateIdleWorkers(key, -1)
Expand All @@ -386,13 +366,7 @@ func (b *Beanstalk) poll(key string, queueSpec QueueSpec) {
// approxMessagesNotVisible is queried to prevent scaling down when their are
// workers which are doing the processing, so if approxMessagesNotVisible > 0 we
// do not scale down as those messages are still being processed (and we dont know which worker)
approxMessagesNotVisible, err := b.getApproxMessagesNotVisible(
queueSpec.uri)
if err != nil {
klog.Errorf("Unable to get approximate messages not visible in queue %q, %v.",
queueSpec.name, err)
return
}

// klog.Infof("approxMessagesNotVisible=%d", approxMessagesNotVisible)

if approxMessagesNotVisible > 0 {
Expand Down
22 changes: 12 additions & 10 deletions pkg/queue/beanstalk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func TestPollSyncWhenNoMessagesInQueue(t *testing.T) {
},
}
messages := int32(0)
reserved := int32(0)
name := queueSpecs[0].name
namespace := queueSpecs[0].namespace
queueURI := getQueueURI(namespace, name)
Expand Down Expand Up @@ -114,8 +115,8 @@ func TestPollSyncWhenNoMessagesInQueue(t *testing.T) {
t.Errorf("expected %s qName, got=%v\n", name, nameGot)
}

if messagesGot != messages {
t.Errorf("expected 0 messages, got=%v\n", messages)
if messagesGot != messages+reserved {
t.Errorf("expected %v messages, got=%v\n", messages+reserved, messagesGot)
}

if messagesPerMinGot != -1 {
Expand Down Expand Up @@ -146,6 +147,7 @@ func TestPollSyncWhenMessagesInQueue(t *testing.T) {
},
}
messages := int32(25)
reserved := int32(0)
name := queueSpecs[0].name
namespace := queueSpecs[0].namespace
queueURI := getQueueURI(namespace, name)
Expand Down Expand Up @@ -183,8 +185,8 @@ func TestPollSyncWhenMessagesInQueue(t *testing.T) {
t.Errorf("expected %s qName, got=%v\n", name, nameGot)
}

if messagesGot != messages {
t.Errorf("expected 0 messages, got=%v\n", messages)
if messagesGot != messages+reserved {
t.Errorf("expected %v messages, got=%v\n", messages+reserved, messagesGot)
}

if messagesPerMinGot != -1 {
Expand Down Expand Up @@ -235,7 +237,7 @@ func TestPollSyncWhenNoMessagesInQueueButMessagesAreInFlight(t *testing.T) {
EXPECT().
getStats().
Return(messages, int32(0), reserved, nil).
Times(2)
Times(1)

// TODO: due to this call not able to use poller as an interface
poller.clientPool.Store(queueURI, mockBeanstalkClient)
Expand All @@ -252,8 +254,8 @@ func TestPollSyncWhenNoMessagesInQueueButMessagesAreInFlight(t *testing.T) {
t.Errorf("expected %s qName, got=%v\n", name, nameGot)
}

if messagesGot != messages {
t.Errorf("expected 0 messages, got=%v\n", messages)
if messagesGot != messages+reserved {
t.Errorf("expected 0 messages, got=%v\n", messagesGot)
}

if messagesPerMinGot != -1 {
Expand Down Expand Up @@ -305,7 +307,7 @@ func TestPollSyncWhenNoMessagesInQueueAndNoMessagesAreInFlight(t *testing.T) {
EXPECT().
getStats().
Return(messages, currentWorkers, reserved, nil).
Times(3)
Times(2)

// TODO: due to this call not able to use poller as an interface
poller.clientPool.Store(queueURI, mockBeanstalkClient)
Expand All @@ -323,8 +325,8 @@ func TestPollSyncWhenNoMessagesInQueueAndNoMessagesAreInFlight(t *testing.T) {
t.Errorf("expected %s qName, got=%v\n", name, nameGot)
}

if messagesGot != messages {
t.Errorf("expected 0 messages, got=%v\n", messages)
if messagesGot != messages+reserved {
t.Errorf("expected %v messages, got=%v\n", messages+reserved, messagesGot)
}

if messagesPerMinGot != -1 {
Expand Down
6 changes: 3 additions & 3 deletions pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ type QueueSpec struct {
host string
protocol string
queueServiceName string
// messages is the number of messages in the queue which have not
// been picked up for processing by the worker
// SQS: ApproximateNumberOfMessagesVisible metric
// messages is the total number of messages in the queue that are either
// not picked up or is not completely processed by the worker
// SQS: ApproximateNumberOfMessagesVisible + ApproximateNumberOfMessagesNotVisible
messages int32
// messagesSent is the number of messages sent to the queue per minute
// SQS: NumberOfMessagesSent metric
Expand Down
15 changes: 8 additions & 7 deletions pkg/queue/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,13 +442,6 @@ func (s *SQS) poll(key string, queueSpec QueueSpec) {
}
}
klog.Infof("%s: approxMessages=%d", queueSpec.name, approxMessages)
s.queues.updateMessage(key, approxMessages)

if approxMessages != 0 {
s.queues.updateIdleWorkers(key, -1)
s.waitForShortPollInterval()
return
}

// approxMessagesNotVisible is queried to prevent scaling down when their are
// workers which are doing the processing, so if approxMessagesNotVisible > 0 we
Expand All @@ -471,6 +464,14 @@ func (s *SQS) poll(key string, queueSpec QueueSpec) {
}
// klog.Infof("approxMessagesNotVisible=%d", approxMessagesNotVisible)

s.queues.updateMessage(key, approxMessages+approxMessagesNotVisible)

if approxMessages != 0 {
s.queues.updateIdleWorkers(key, -1)
s.waitForShortPollInterval()
return
}

if approxMessagesNotVisible > 0 {
klog.Infof("%s: approxMessagesNotVisible > 0, not scaling down", queueSpec.name)
s.waitForShortPollInterval()
Expand Down