Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change how number of pending messages is calculated and add more error handling. #533

Merged
merged 12 commits into from
Jan 7, 2020
58 changes: 43 additions & 15 deletions pkg/scalers/stan_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,47 +108,75 @@ func parseStanMetadata(metadata map[string]string) (stanMetadata, error) {

// IsActive determines if we need to scale from zero
func (s *stanScaler) IsActive(ctx context.Context) (bool, error) {
resp, err := http.Get(s.getMonitoringEndpoint())
monitoringEndpoint := s.getMonitoringEndpoint()

resp, err := http.Get(monitoringEndpoint)
if err != nil {
stanLog.Error(err, "Unable to access the nats streaming broker monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.natsServerMonitoringEndpoint)
return false, err
}

if resp.StatusCode == 404 {
baseResp, _ := http.Get(s.getSTANChannelsEndpoint())

if baseResp.StatusCode == 404 {
stanLog.Info("Streaming broker endpoint returned 404. Please ensure it has been created", "url", monitoringEndpoint, "channelName", s.metadata.subject)

} else {
stanLog.Info("Unable to connect to STAN. Please ensure you have configured the ScaledObject with the correct endpoint.", "baseResp.StatusCode", baseResp.StatusCode, "natsServerMonitoringEndpoint", s.metadata.natsServerMonitoringEndpoint)
}

return false, err
}

defer resp.Body.Close()
json.NewDecoder(resp.Body).Decode(&s.channelInfo)

return s.hasPendingMessage() || s.getMaxMsgLag() > 0, nil
}

func (s *stanScaler) getMonitoringEndpoint() string {
return "http://" + s.metadata.natsServerMonitoringEndpoint + "/streaming/channelsz?" + "channel=" + s.metadata.subject + "&subs=1"
func (s *stanScaler) getSTANChannelsEndpoint() string {
return "http://" + s.metadata.natsServerMonitoringEndpoint + "/streaming/channelsz"
}

func (s *stanScaler) getTotalMessages() int64 {
return s.channelInfo.MsgCount
func (s *stanScaler) getMonitoringEndpoint() string {
return s.getSTANChannelsEndpoint() + "?channel=" + s.metadata.subject + "&subs=1"
}

func (s *stanScaler) getMaxMsgLag() int64 {
var maxValue int64
maxValue = 0
maxValue := int64(0)
combinedQueueName := s.metadata.durableName + ":" + s.metadata.queueGroup

for _, subs := range s.channelInfo.Subscriber {
if subs.LastSent > maxValue && subs.QueueName == (s.metadata.durableName+":"+s.metadata.queueGroup) {
if subs.LastSent > maxValue && subs.QueueName == combinedQueueName {
maxValue = subs.LastSent
}
}
cwoolum marked this conversation as resolved.
Show resolved Hide resolved

return s.channelInfo.MsgCount - maxValue
return s.channelInfo.LastSequence - maxValue
}

func (s *stanScaler) hasPendingMessage() bool {
var hasPending bool
hasPending = false
func (s *stanScaler) hasPendingMessage() bool {
subscriberFound := false
combinedQueueName := s.metadata.durableName + ":" + s.metadata.queueGroup

for _, subs := range s.channelInfo.Subscriber {
if subs.PendingCount > 0 && subs.QueueName == (s.metadata.durableName+":"+s.metadata.queueGroup) {
hasPending = true
if subs.QueueName == combinedQueueName {
subscriberFound = true

if subs.PendingCount > 0 {
return true
}

break
}
}

return hasPending
if !subscriberFound {
stanLog.Info("The STAN subscription was not found.", "combinedQueueName", combinedQueueName)
}

return false
}

func (s *stanScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec {
Expand Down