
pubsub/all: Using long polling when consuming from a FIFO SQS queue ends up blocking requests #3456

Closed
mitsos1os opened this issue Aug 4, 2024 · 0 comments · Fixed by #3457


mitsos1os commented Aug 4, 2024

Describe the bug

The pubsub package has trouble consuming from a FIFO-enabled SQS queue when long polling is used and messages share the same MessageGroupID: requests end up blocked for the amount of wait time configured for long polling.

This happens because of the way the pubsub library is structured, combined with the basic operation mode of FIFO SQS queues.

Details about SQS FIFO queue operation

When an SQS queue has FIFO enabled then, as the name suggests, it guarantees that messages belonging to the same group (using the same MessageGroupID) are delivered in order and with an exactly-once delivery guarantee. This means that until a message is acknowledged, the next one in line will not be made available when reading from the queue. Sending a poll request while the last message has not been acknowledged will return 0 messages from the queue. More info here
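
A minimal sketch of this behavior at the raw AWS SDK level (aws-sdk-go-v2, outside the pubsub package; the queue URL, group ID, and message bodies below are placeholders): two messages published with the same MessageGroupId are serialized by the queue, so the second one is not handed to consumers until the first has been deleted (acknowledged).

// Sketch: publish two messages to a FIFO queue with the same MessageGroupId.
// The queue will not deliver the second one until the first is acknowledged.
package main

import (
  "context"
  "log"

  "github.com/aws/aws-sdk-go-v2/aws"
  "github.com/aws/aws-sdk-go-v2/config"
  "github.com/aws/aws-sdk-go-v2/service/sqs"
)

func main() {
  ctx := context.Background()
  cfg, err := config.LoadDefaultConfig(ctx)
  if err != nil {
    log.Fatal(err)
  }
  client := sqs.NewFromConfig(cfg)
  queueURL := "<YOUR_FIFO_QUEUE_URL>" // placeholder

  for _, body := range []string{"first", "second"} {
    _, err := client.SendMessage(ctx, &sqs.SendMessageInput{
      QueueUrl:               aws.String(queueURL),
      MessageBody:            aws.String(body),
      MessageGroupId:         aws.String("group-1"),      // same group => strict in-order delivery
      MessageDeduplicationId: aws.String("dedup-" + body), // required unless content-based dedup is enabled
    })
    if err != nil {
      log.Fatal(err)
    }
  }
}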

Long Polling is a mode in which you configure a Wait Time: a poll request waits until it receives a message or until the configured wait time expires before returning. This is used to reduce the number of repeated requests that would return 0 messages. For example, if you configure a wait time of 10 seconds and then send a poll request, it will wait up to 10 seconds, returning as soon as a message is available or once the 10 seconds have passed (returning 0 messages again). With short polling, a poll request sent when 0 messages are available returns immediately, another request is sent right after, and this repeats until a message is available. It is generally advised to use long polling. More info can be found here
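
For reference, this is roughly what long polling looks like at the SDK level (a sketch reusing the client from the snippet above; receiveOnce and waitSeconds are hypothetical names): a positive WaitTimeSeconds makes the call block until a message arrives or the wait expires, while 0 means short polling and the call returns immediately.

// receiveOnce polls the queue once. A positive waitSeconds enables long
// polling, so the call blocks until a message arrives or the wait expires;
// 0 means short polling and the call returns at once, possibly with 0 messages.
func receiveOnce(ctx context.Context, client *sqs.Client, queueURL string, waitSeconds int32) (int, error) {
  out, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
    QueueUrl:            aws.String(queueURL),
    MaxNumberOfMessages: 10,          // SQS hard limit per request
    WaitTimeSeconds:     waitSeconds, // > 0 => long polling
  })
  if err != nil {
    return 0, err
  }
  return len(out.Messages), nil
}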

pubsub operation

The package has an internal auto-scaling mechanism that tries to increase message consumption throughput based on the current speed of message processing (the time from when a message is received until it is acknowledged).

When the message rate increases, the so-called batch size of requested messages is increased, and requests are split into batches according to the limits allowed by each driver/cloud provider.

For example, SQS supports up to 10 messages returned in a single request. So, if the driver determines that we can process more than 10 messages, it will increase the batch size to, e.g., 12. This has to be split into two batches, one requesting 10 (the SQS maximum) and one requesting the remaining 2. These requests are sent at the same time from the function getNextBatch, and once both resolve, the results are passed on to the execution flow.
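
To make the pattern concrete, here is a simplified sketch of the split-and-join behavior described above (not the actual getNextBatch code; Message and fetch are hypothetical stand-ins for the real types and for a single SQS receive call, and the snippet needs the context and sync packages):

// Simplified illustration of the split-and-join pattern: a request for
// `total` messages is split into sub-requests of at most maxPerRequest,
// the sub-requests run concurrently, and nothing is returned until the
// slowest one finishes.
type Message struct{ Body []byte } // hypothetical stand-in type

func getBatch(ctx context.Context, fetch func(ctx context.Context, n int) []*Message, total, maxPerRequest int) []*Message {
  var (
    mu  sync.Mutex
    all []*Message
    wg  sync.WaitGroup
  )
  for total > 0 {
    n := total
    if n > maxPerRequest {
      n = maxPerRequest // e.g. SQS allows at most 10 messages per request
    }
    total -= n
    wg.Add(1)
    go func(n int) {
      defer wg.Done()
      msgs := fetch(ctx, n) // against an unacked FIFO group, this returns 0 messages only after WaitTime
      mu.Lock()
      all = append(all, msgs...)
      mu.Unlock()
    }(n)
  }
  wg.Wait() // the whole batch waits for the slowest sub-request
  return all
}

With the 12-message example, this joins a 10-message request that resolves quickly with a 2-message request that, against an unacknowledged FIFO group, only resolves after the full WaitTime.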

This is the problematic part:

  • The execution flow won't continue until both requests have returned their results
  • Messages are processed/acked only after the above flow is completed
  • The SQS queue will not provide the next item of a group unless the previous one is acknowledged

This leads to the following results:

  1. The 1st request for 10 messages is served immediately, returning up to 10 messages from the queue
  2. The 2nd request for the remaining 2 is sent at the same time
  3. However, since the 10 messages from the 1st request have not been acknowledged (acknowledgment happens after we exit this flow), the second request will not receive any messages
  4. With long polling enabled, the second request does not return immediately, but waits up to the configured WaitTime until it eventually returns 0 messages (because the first batch was not acked), blocking the whole flow
  5. After that, the flow continues normally (only with the messages from the 1st request) and the cycle repeats continuously

To Reproduce

We need a setup with the following:

  • SQS Queue has FIFO enabled
  • We use Long Polling in our connection (awssnssqs.SubscriptionOptions.WaitTime option)
  • There are a lot of messages in the same group (using the same MessageGroupID)
  • We consume fast enough so that auto-scaling will increase the batch size enough to split requests into more than one batch. (We can also emulate this by setting a lower MaxBatchSize limit on our connection).

Steps to reproduce the behavior

  1. Publish quite a few messages to the FIFO SQS queue (so there is enough room for consuming) using the same MessageGroupId
  2. Open a connection to the queue using long polling:
import (
  "context"
  "time"

  "gocloud.dev/pubsub"
  "gocloud.dev/pubsub/awssnssqs"
  "gocloud.dev/pubsub/batcher"
)

ctx := context.Background()
subURL := "<OUR_QUEUE's URL>" // placeholder for the FIFO queue URL

urlMux := &pubsub.URLMux{}

u := &awssnssqs.URLOpener{
  UseV2: true,
  SubscriptionOptions: awssnssqs.SubscriptionOptions{
    WaitTime: 10 * time.Second, // enable long-polling
    ReceiveBatcherOptions: batcher.Options{
      MaxBatchSize: 2, // we reduce batch size to speed up bug appearance
    },
  },
}
urlMux.RegisterSubscription(awssnssqs.SQSScheme, u)
// Open a *pubsub.Subscription using the URL.
sub, err := urlMux.OpenSubscription(ctx, subURL)
  3. Start consuming the messages, with some logging to see the added delay:
for {
  now := time.Now()
  fmt.Println("Start message reception", now)
  m, err := sub.Receive(ctx)
  fmt.Println("End of message reception", time.Since(now))
  if err != nil {
    log.Print(err)
    return err
  }
  fmt.Printf("%s\n", m.Body)
  m.Ack()
}

Almost immediately, you will start noticing a 10-second delay (the configured WaitTime) between the start and the end of message reception (due to the 2nd batch request being blocked).

Expected behavior

I would expect the batches to be processed independently and the flow not to be blocked. Since Subscription.Receive returns a single message, it could work without being blocked by independent requests.
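
One possible shape for that, sketched below with the same hypothetical Message/fetch stand-ins as the earlier illustration (this is only a sketch of the expected behavior, not necessarily what the fix in #3457 implements): hand each sub-batch to the receiver as soon as it resolves instead of joining on all of them.

// Sketch of delivering sub-batches independently: each sub-request pushes its
// messages on a channel as soon as it returns, so a slow long-poll for the
// second batch no longer delays the messages that are already available.
func streamBatches(ctx context.Context, fetch func(ctx context.Context, n int) []*Message, total, maxPerRequest int) <-chan []*Message {
  out := make(chan []*Message)
  var wg sync.WaitGroup
  for total > 0 {
    n := total
    if n > maxPerRequest {
      n = maxPerRequest
    }
    total -= n
    wg.Add(1)
    go func(n int) {
      defer wg.Done()
      if msgs := fetch(ctx, n); len(msgs) > 0 {
        out <- msgs // deliver as soon as this sub-request resolves
      }
    }(n)
  }
  go func() {
    wg.Wait()
    close(out)
  }()
  return out
}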

Version

v0.38.0

Additional context

The problematic code lies in the basic pubsub package (that's why I used all in the issue name) and not in the awssnssqs one, but because the bug is rooted in AWS behavior, I am not sure whether it is entirely transferable to other cloud providers.
