
Messages not emitted in time for processing, resulting in failed ack deadline extensions and redeliveries #1848

Open
vikmovcan opened this issue Oct 26, 2023 · 6 comments

vikmovcan commented Oct 26, 2023

Description

Hello, I've been experiencing message redeliveries when using a subscription with exactly-once delivery enabled and publishing messages in batches. It is not clear whether this is a client or product issue, so I first wanted to make sure there is no misunderstanding or misconfiguration on my side.

Environment details

  • OS: macOS 13
  • Node.js version: 20.8.1
  • npm version: 8.1.0
  • @google-cloud/pubsub version: v4.0.5

Steps to reproduce

  1. Create a pull subscription with exactly once delivery enabled and ack deadline of 60 seconds
    e.g.
   // imports used across the snippets below (Message, Duration, and PublishOptions
   // are assumed to be exported from the package root in v4)
   import { PubSub, Message, Duration, PublishOptions } from "@google-cloud/pubsub";

   const projectId = "some-project-id";
   const topicNameOrId = "test-topic";
   const pubsub = new PubSub({ projectId });
   const topic = pubsub.topic(topicNameOrId);
   const subscriptionName = "eod-with-batching";

   await topic.createSubscription(subscriptionName, {
     enableExactlyOnceDelivery: true,
     ackDeadlineSeconds: 60,
   });
  2. Set up a subscription with the following configuration (I would like to manually extend the lease)
  const maxMessages = 300; // a couple of hundred should suffice
  const subscription = topic.subscription(subscriptionName, {
    flowControl: {
      allowExcessMessages: false,
      maxMessages: maxMessages,
      maxExtensionMinutes: 0,
    },
    maxAckDeadline: Duration.from({
      seconds: 90,
    }),
    minAckDeadline: Duration.from({
      seconds: 90,
    }),
    streamingOptions: {
      maxStreams: 2, // issue more prominent with more than one stream
    },
  });
  3. Add a message handler that starts an interval extending each received message's ack deadline by 120 seconds every 60 seconds, and then awaits the completion of a long-running async action.
  const waitFor = (ms: number) =>
    new Promise((resolve) => {
      setTimeout(() => {
        resolve(0);
      }, ms);
    });

  const messageMap: Record<string, Message> = {};

  subscription.on("message", async (message: Message) => {
    if (messageMap[message.id]) {
      console.log(`message ${message.id} unexpectedly redelivered. delivery attempt: ${message.deliveryAttempt}`);
    }

    messageMap[message.id] = message;

    // manually extend the ack deadline by 120s every 60s
    const interval = setInterval(async () => {
      try {
        await message.modAckWithResponse(120);
      } catch (e) {
        console.log("error", e);
      }
    }, 1000 * 60);

    // simulate a long-running operation (the message is intentionally never acked here)
    await waitFor(1000 * 60 * 1200);
  });
  4. Create and publish messages (note: redeliveries occur with or without batching)
  const messagesToPublish: string[] = [];

  // ensure some messages are in the backlog
  const numberOfBatchedMessages = maxMessages * 3;

  for (let i = 0; i < numberOfBatchedMessages; i++) {
    messagesToPublish.push(
      `Test message ${i + 1} at ${new Date().toISOString()}`
    );
  }

  const publishOptions: PublishOptions = {
    batching: {
      maxMessages: 5,
      maxMilliseconds: 100,
    },
  };

  const batchPublisher = pubsub.topic(topicNameOrId, publishOptions);

  const promises = messagesToPublish.map((m) => {
    return batchPublisher.publishMessage({
      data: Buffer.from(m),
    });
  });

  const publishedMessages = await Promise.all(promises);
  console.log(`published ${publishedMessages.length} messages`);
  5. Wait for redeliveries, which start happening within minutes of running the example above. Note that the issue is intermittent.

As I understand the exactly-once delivery documentation (https://cloud.google.com/pubsub/docs/exactly-once-delivery), as long as

  1. the message is being processed,
  2. the message deadline keeps being extended, and
  3. the message is not negatively acked,

the message should not get redelivered (but it is).

Is there something I am missing or doing incorrectly, or is this expected behaviour?

vikmovcan added the priority: p2 and type: bug labels Oct 26, 2023
product-auto-label bot added the api: pubsub label Oct 26, 2023
vikmovcan (Author) commented Nov 6, 2023

After investigating this further, I have narrowed the issue down (I will update the title to match).
Some messages received internally by the library do not get emitted immediately for processing (i.e. the message event callback is not triggered right away). This was identified by comparing the received timestamps on the messages against the time they entered the message event callback.
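Roughly, the comparison looks like this (a minimal sketch, assuming message.received is the client-side receive timestamp in milliseconds):

  subscription.on("message", (message: Message) => {
    // message.received is the unix timestamp (ms) at which the client library
    // received the message, so the difference below is roughly how long the
    // message sat inside the library before being emitted to this callback
    const emitDelayMs = Date.now() - message.received;
    console.log(`message ${message.id} emitted ${emitDelayMs}ms after receipt`);
  });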

The issue can be reproduced given the following:

a) a topic with a few hundred messages
b) a subscription using streaming pull e.g.

  const subscription = topic.subscription(subscriptionName, {
    flowControl: {
      allowExcessMessages: false,
      maxMessages: tripleDigitMaxMessages,
      maxExtensionMinutes: 0,
    },
    maxAckDeadline: Duration.from({
      seconds: 90,
    }),
    minAckDeadline: Duration.from({
      seconds: 90,
    }),
    streamingOptions: {
      maxStreams: 2, // any value greater than 1
    },
  });

c) message processing can take minutes

While this makes sense if you consider the maxMessages configuration value, it is not clear why:
a) the maxMessages value gets exceeded (is this value per stream?): not immediately, but over a period of time the number of messages being processed reaches the maxMessages value times the number of streams
b) the "excess" messages are emitted only after a (sometimes minute-long) delay
c) the library internally obtains and holds onto messages which may exceed their ack deadline soon after being emitted

If I want to process, for example, 500 messages at a time max with the above subscription config and manually extend each message's ack deadline upon receipt without bumping into the problem described above, what options are there? One option I see is restricting the number of streams to 1, thus eliminating the possibility of "excess" messages not being emitted or their ack deadlines not extended.
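For illustration, the single-stream variant would be the same options as above with only maxStreams changed (a sketch; the 500 is the example figure from this paragraph):

  const singleStreamSubscription = topic.subscription(subscriptionName, {
    flowControl: {
      allowExcessMessages: false,
      maxMessages: 500,
      maxExtensionMinutes: 0,
    },
    minAckDeadline: Duration.from({ seconds: 90 }),
    maxAckDeadline: Duration.from({ seconds: 90 }),
    streamingOptions: {
      maxStreams: 1, // a single stream, so the flow control limit is not multiplied
    },
  });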

vikmovcan changed the title from "Unexpected redeliveries of outstanding messages for subscriptions with exactly once delivery" to "Messages not emitted in time for processing, resulting in failed ack deadline extensions and redeliveries" Nov 6, 2023
feywind self-assigned this Nov 9, 2023
hongalex (Member) commented Mar 5, 2024

Apologies for the slow response on this issue. First, are you still encountering this issue?

If so, regarding manual lease management, is there a reason you prefer to do that versus the library's lease management?

vikmovcan (Author) replied:

> Apologies for the slow response on this issue. First, are you still encountering this issue?
>
> If so, regarding manual lease management, is there a reason you prefer to do that versus the library's lease management?

@hongalex, thanks for responding. Yes, the issue persists.
The reason for this approach is the need to avoid hitting API limitations imposed by a third party, e.g. a limited number of requests per minute and a lack of concurrency support. Additionally, to avoid overwhelming each consumer, it made sense to 'buffer' a limited number of messages at a time and keep pushing back their ack deadlines until processed (each message would later get processed in a separate queue). Unfortunately, Cloud Tasks was not a good fit for our use case as an alternative to Pub/Sub.

hongalex (Member) commented:

Also apologies for missing a few of your questions last time:

> While this makes sense if you consider the maxMessages configuration value, it is not clear why:
> a) the maxMessages value gets exceeded (is this value per stream?): not immediately, but over a period of time the number of messages being processed reaches the maxMessages value times the number of streams
> b) the "excess" messages are emitted only after a (sometimes minute-long) delay
> c) the library internally obtains and holds onto messages which may exceed their ack deadline soon after being emitted

This value is indeed set per stream. This is something we're interested in fixing, since we want maxMessages / numStreams to be the server-side flow control amount we allow per stream instead. However, this is technically a breaking change, so it might need to wait until we do a major version bump.
This is also why we still have client-side flow control enabled, and it explains why there is a delay between a message being received and when it is ready for processing.
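As a rough workaround under the current per-stream behavior (a sketch, not an official recommendation), the desired overall cap can be divided by the number of streams:

  // aim for roughly 500 outstanding messages in total across 2 streams;
  // maxMessages currently applies per stream, so divide the total by maxStreams
  const desiredTotalOutstanding = 500;
  const maxStreams = 2;

  const subscription = topic.subscription(subscriptionName, {
    flowControl: {
      allowExcessMessages: false,
      maxMessages: Math.floor(desiredTotalOutstanding / maxStreams),
    },
    streamingOptions: { maxStreams },
  });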

> If I want to process, for example, 500 messages at a time max with the above subscription config and manually extend each message's ack deadline upon receipt without bumping into the problem described above, what options are there? One option I see is restricting the number of streams to 1, thus eliminating the possibility of "excess" messages not being emitted or their ack deadlines not extended.
>
> need to avoid hitting API limitations

This can be done with the minAckDeadline setting passed in via SubscriberOptions.
I see you're already doing this. To achieve the behavior of extending the message's ack deadline by 120s, I recommend setting both minAckDeadline and maxAckDeadline to 120 and not disabling the total timeout. This should actually decrease the number of API calls you make, since modack calls can then be batched.
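A minimal sketch of that suggestion (the values are illustrative; maxExtensionMinutes is left at its default so the library's lease manager keeps extending deadlines and batches the modacks):

  const subscription = topic.subscription(subscriptionName, {
    flowControl: {
      allowExcessMessages: false,
      maxMessages: 500,
      // note: maxExtensionMinutes is NOT set to 0 here, so the library
      // keeps extending leases while the message handler runs
    },
    minAckDeadline: Duration.from({ seconds: 120 }),
    maxAckDeadline: Duration.from({ seconds: 120 }),
  });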

hongalex added the type: question and status: investigating labels and removed the priority: p2 and type: bug labels May 2, 2024
hongalex (Member) commented May 2, 2024

I wanted to clarify that message.modifyAckDeadline is not a public method and should not be called; it is a private method intended to be used only inside the library. Please refer to my previous comment about how to minimize API calls with the client library.

feywind added the priority: p3 label May 2, 2024
az-nextsec commented:

@hongalex What is the expected process to allow our consuming code (message callback) to keep extending the ack deadline?

E.g. I get a message and start processing some long-running operation. At each step of the operation I want to keep telling Pub/Sub 'hey, wait for us another 10 minutes'. If all steps finish, I will issue an ack. If something fails, I will issue a nack.

If my process crashes and burns, I want that deadline to still be at least 10 minutes from my last extension, so there is a bit of a cooldown before redrive (and I do NOT want the automatic exponential backoff on redrive, because if I nacked a message I want it resent straight away, but if I abandoned it then I want there to be a delay).

At the moment we are making these message.modAckDeadline calls, but just like @vikmovcan, in SOME cases we are experiencing redrive happening even before the first modAckDeadline :(
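For what it's worth, the flow described above could be sketched with the public Message methods already used in this thread (modAckWithResponse, ackWithResponse, nackWithResponse); steps here is a hypothetical array of async processing functions:

  subscription.on("message", async (message: Message) => {
    try {
      // steps: hypothetical list of async processing functions for this message
      for (const step of steps) {
        // "wait for us another 10 minutes" before each long-running step
        await message.modAckWithResponse(600);
        await step(message);
      }
      await message.ackWithResponse(); // all steps finished
    } catch (e) {
      await message.nackWithResponse(); // failed: request redelivery straight away
    }
  });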
