Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for dynamically increasing visibility timeout for long-running message processing #320

Closed
ambethell opened this issue Sep 7, 2020 · 4 comments
Assignees
Labels
enhancement New feature or request
Milestone

Comments

@ambethell
Copy link

Hi,

Excellent work on this library - really looks to be a great improvement on the spring cloud SQS support.

With VisibilityExtender we're given hooks to extend the visibility timeout for the message being processed by that thread - do you think support to dynamically call extend() could be added into the library?

For instance - lets say I have a queue which has a default visibility timeout of 5 minutes - but certain messages may take up to an hour to process successfully. So I'd want to configure the listener of that queue to bump the visibility timeout on an interval for long-running threads- (up to a limit of an hour) - similar to what is being asked here : spring-attic/spring-cloud-aws#92 (comment).

👍

Thanks

@JaidenAshmore
Copy link
Owner

Hey, thanks for the interest!

So let me understand the use case for this, you have code something like:

@QueueListener("myQueue")
public void processMessage(final Message message, final VisibilityExtender unusedExtender) {
     someService.process(message); // this can take 5 minutes to an hour
}

However, you don't want to manually call the VisibilityExtender in the codebase and would like the library to do it under the hood, e.g. the API could be something like:

@QueueListener(value = "myQueue", messageVisibilityTimeoutInSeconds=30, automaticallyExtendedVisibilityTimeout = true)
public void processMessage(final Message message) {
    someService.process(message); // this can take 5 minutes to an hour
}

In this case above it would make sure to extend the visibility automatically every say 20 seconds.

Is this what you are thinking?

Potential design

The easiest way that we could this it to update or create a new MessageProcessor that runs one or more backgrounds threads that will automatically extending these messages that are currently being processed. One thread per message would be the easiest to implement but wouldn't be nice having all these extra threads in the app. One thread for all the messages would be more performant but a bit more complicated to get right as their timeouts could all be at different times.

Alternative

An alternative that you could do now and not need to wait for any changes to this library is to just implement this extending logic yourself, something like the below. I dunno if this would work but theoretically it should work.

@QueueListener("myQueue")
public void processMessage(final Message message, final VisibilityExtender visibilityExtender) {
     final AutomaticVisibilityExtender automaticVisibilityExtender = new AutomaticVisibilityExtender(visibilityExtender, Duration.ofMinutes(3));
    automaticVisibilityExtender.start();
     try {
          someService.process(message); // this can take 5 minutes to an hour
     } finally {
          automaticVisibilityExtender.stop();
     }
}


public class AutomaticVisibilityExtender {
          private final VisibilityExtender visibilityExtender;
          private final Duration duration;
          private final ScheduledExecutorService executorService;

          ...

          public void start() {
                          scheduledExecutorService.schedule(() => {
                                     visibilityExtender.extend();
                          }, duration.ofSeconds(), TimeUnit.SECONDS);
          } 

          public void stop() {
                     scheduledExecutorService.shutdown();
          }
} 

@JaidenAshmore JaidenAshmore added this to the 4.3.0 milestone Sep 7, 2020
@JaidenAshmore JaidenAshmore added the enhancement New feature or request label Sep 7, 2020
@ambethell
Copy link
Author

@JaidenAshmore yup exactly that - I guess I'd want the ability to specify

  • how frequently to heartbeat / poll and extend the timeout
  • the maximum processing time allowed for a message (before throwing an error)

@JaidenAshmore JaidenAshmore modified the milestones: 4.3.0, 4.4.0 Sep 26, 2020
@JaidenAshmore
Copy link
Owner

Hey, sorry for the delay but I will be taking a look at this over the next couple of weeks.

For your use case are you using synchronous or asynchronous processing of messages. e.g. does your listener look something like:

@QueueListener(...)
public void listener() {

}

or

@QueueListener(...)
public CompletableFuture<?> listener() {

}

The reason I ask is that the asynchronous processing is more difficult to implement due to the framework not having access to the thread that the message listener is using for async processing and so I can't easily interrupt it when the timeout has been reached. If you are just doing synchronous processing I can get something out for you quicker and then look at a better long term solution for also handling async interruption.

@JaidenAshmore
Copy link
Owner

Release Notes

Allows for the processing of long running messages with the message visibility of the message being automatically extended when it is about to reach its limit. If the message takes longer than a certain amount of time without completing it will be forced stopped via interrupting the thread.

Note: this only works with synchronous processing of messages, e.g. ones that do not return a CompletableFuture. Asynchronous message listener's will be supported at a later point.

Usage

Core
  1. Add the AutoVisibilityExtenderMessageProcessingDecorator to a DecoratingMessageProcessor wrapping your main MessageProcessor like the LambdaMessageProcessor.
        final MessageProcessingDecorator autoVisibilityExtender = new AutoVisibilityExtenderMessageProcessingDecorator(
            sqsAsyncClient,
            queueProperties,
            new AutoVisibilityExtenderMessageProcessingDecoratorProperties() {
    
                @Override
                public Duration visibilityTimeout() {
                    return Duration.ofMinutes(1);
                }
    
                @Override
                public Duration maxDuration() {
                    return Duration.ofMinutes(5);
                }
    
                @Override
                public Duration bufferDuration() {
                    return Duration.ofSeconds(30);
                }
            }
        );
    
        MessageProcessor processor = new DecoratingMessageProcessor(
             "listener-identifier",
              queueProperties,
              Collections.singletonList(autoVisibilityExtender),
              new LambdaMessageProcessor(
                   sqsAsyncClient,
                   queueProperties,
                   message -> {
                       try {
                           someLongFileIOMethod();
                       } catch (InterruptionException e) {
                            // the message took to long and it was interrupted
                       }
                   }
               )
        );
    See Core - How to extend message visibility during processing for more details.
Spring
  1. Add the @AutoVisibilityExtender annotation to your message listener.
    @QueueListener("${insert.queue.url.here}")
    @AutoVisibilityExtender(visibilityTimeoutInSeconds = 60, maximumDurationInSeconds = 300, bufferTimeInSeconds = 10)
    public void processMessage(@Payload final String payload) {
        // process the message payload here
    }
    See Spring - How to extend message visibility during processing for more details.

@JaidenAshmore JaidenAshmore self-assigned this Oct 10, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

2 participants