Skip to content

Commit

Permalink
Merge pull request #103 from Sage/add-exponential-backoff-option
Browse files Browse the repository at this point in the history
Add option for exponential back off
  • Loading branch information
florianpilz authored Dec 21, 2023
2 parents c330fde + c659cf3 commit a971bda
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 119 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ A subscription queue should be defined to receive any events raised for the subs

- **allow_retry** [Bool] [Optional] [Default=false] This determines if the queue should allow processing failures to be retried.
- **allow_retry_back_off** [Bool] [Optional] [Default=false] This is used to specify if failed messages that retry should incrementally backoff.
- **allow_exponential_back_off** [Bool] [Optional] [Default=false] This is used to specify if failed messages that retry should expontentially backoff.
- **retry_back_off_grace** [Int] [Optional] [Default=0] This is the number of times to allow retries without applying retry back off if enabled.
- **dlq** [EventQ::Queue] [Optional] [Default=nil] A queue that will receive the messages which were not successfully processed after maximum number of receives by consumers. This is created at the same time as the parent queue.
- **max_retry_attempts** [Int] [Optional] [Default=5] This is used to specify the max number of times an event should be allowed to retry before failing.
Expand Down
23 changes: 15 additions & 8 deletions lib/eventq/eventq_aws/aws_calculate_visibility_timeout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,21 @@ def initialize(max_timeout:, logger: EventQ.logger)
# @param retry_attempts [Integer] Current retry
# @param queue_settings [Hash] Queue settings
# @option allow_retry_back_off [Bool] Enables/Disables backoff strategy
# @option allow_exponential_back_off [Bool] Enables/Disables exponential backoff strategy
# @option max_retry_delay [Integer] Maximum amount of time a retry will take in ms
# @option retry_back_off_grace [Integer] Amount of retries to wait before starting to backoff
# @option retry_back_off_weight [Integer] Multiplier for the backoff retry
# @option retry_delay [Integer] Amount of time to wait until retry in ms
# @return [Integer] the calculated visibility timeout in seconds
def call(retry_attempts:, queue_settings:)
@retry_attempts = retry_attempts
@retry_attempts = retry_attempts

@allow_retry_back_off = queue_settings.fetch(:allow_retry_back_off)
@max_retry_delay = queue_settings.fetch(:max_retry_delay)
@retry_back_off_grace = queue_settings.fetch(:retry_back_off_grace)
@retry_back_off_weight= queue_settings.fetch(:retry_back_off_weight)
@retry_delay = queue_settings.fetch(:retry_delay)
@allow_retry_back_off = queue_settings.fetch(:allow_retry_back_off)
@allow_exponential_back_off = queue_settings.fetch(:allow_exponential_back_off)
@max_retry_delay = queue_settings.fetch(:max_retry_delay)
@retry_back_off_grace = queue_settings.fetch(:retry_back_off_grace)
@retry_back_off_weight = queue_settings.fetch(:retry_back_off_weight)
@retry_delay = queue_settings.fetch(:retry_delay)

if @allow_retry_back_off && retry_past_grace_period?
visibility_timeout = timeout_with_back_off
Expand All @@ -53,7 +55,12 @@ def timeout_without_back_off
def timeout_with_back_off
factor = @retry_attempts - @retry_back_off_grace

visibility_timeout = ms_to_seconds(@retry_delay * factor * @retry_back_off_weight)
visibility_timeout = if @allow_exponential_back_off
ms_to_seconds(@retry_delay * @retry_back_off_weight * 2 ** (factor - 1))
else
ms_to_seconds(@retry_delay * @retry_back_off_weight * factor)
end

max_retry_delay = ms_to_seconds(@max_retry_delay)

if visibility_timeout > max_retry_delay
Expand All @@ -77,4 +84,4 @@ def check_for_max_timeout(visibility_timeout)
end
end
end
end
end
11 changes: 6 additions & 5 deletions lib/eventq/eventq_aws/aws_queue_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,12 @@ def reject_message(queue, poller, msg, retry_attempts, message, args)
visibility_timeout = @calculate_visibility_timeout.call(
retry_attempts: retry_attempts,
queue_settings: {
allow_retry_back_off: queue.allow_retry_back_off,
max_retry_delay: queue.max_retry_delay,
retry_back_off_grace: queue.retry_back_off_grace,
retry_back_off_weight: queue.retry_back_off_weight,
retry_delay: queue.retry_delay
allow_retry_back_off: queue.allow_retry_back_off,
allow_exponential_back_off: queue.allow_exponential_back_off,
max_retry_delay: queue.max_retry_delay,
retry_back_off_grace: queue.retry_back_off_grace,
retry_back_off_weight: queue.retry_back_off_weight,
retry_delay: queue.retry_delay
}
)

Expand Down
3 changes: 3 additions & 0 deletions lib/eventq/eventq_base/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module EventQ
class Queue
attr_accessor :allow_retry
attr_accessor :allow_retry_back_off
attr_accessor :allow_exponential_back_off
attr_accessor :dlq
attr_accessor :max_retry_attempts
attr_accessor :max_retry_delay
Expand All @@ -20,6 +21,8 @@ def initialize
@allow_retry = false
# Default retry back off settings
@allow_retry_back_off = false
# Default exponential back off settings
@allow_exponential_back_off = false
# Default max receive count is 30
@max_receive_count = 30
# Default max retry attempts is 5
Expand Down
28 changes: 18 additions & 10 deletions lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,7 @@ def reject_message(channel, message, delivery_tag, retry_exchange, queue, abort)
message.retry_attempts += 1
retry_attempts = message.retry_attempts - queue.retry_back_off_grace
retry_attempts = 1 if retry_attempts < 1

if queue.allow_retry_back_off == true
message_ttl = retry_attempts * queue.retry_delay
if (retry_attempts * queue.retry_delay) > queue.max_retry_delay
EventQ.logger.debug { "[#{self.class}] - Max message back off retry delay reached." }
message_ttl = queue.max_retry_delay
end
else
message_ttl = queue.retry_delay
end
message_ttl = retry_delay(queue, retry_attempts)

EventQ.logger.debug { "[#{self.class}] - Sending message for retry. Message TTL: #{message_ttl}" }
retry_exchange.publish(serialize_message(message), :expiration => message_ttl)
Expand Down Expand Up @@ -128,6 +119,23 @@ def process_message(payload, queue, channel, retry_exchange, delivery_tag, block
raise "Unrecognized status: #{status}"
end
end

def retry_delay(queue, retry_attempts)
return queue.retry_delay unless queue.allow_retry_back_off

message_ttl = if queue.allow_exponential_back_off
queue.retry_delay * 2 ** (retry_attempts - 1)
else
queue.retry_delay * retry_attempts
end

if message_ttl > queue.max_retry_delay
EventQ.logger.debug { "[#{self.class}] - Max message back off retry delay reached." }
message_ttl = queue.max_retry_delay
end

message_ttl
end
end
end
end
Expand Down
123 changes: 63 additions & 60 deletions spec/eventq_aws/aws_calculate_visibility_timeout_spec.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
require 'spec_helper'

RSpec.describe EventQ::Amazon::CalculateVisibilityTimeout do
let(:allow_retry_back_off) { false }
let(:allow_exponential_back_off) { false }

let(:max_timeout) { 43_200 } # 43_200s (12h)
let(:retry_delay) { 30_000 } # 30s
let(:max_retry_delay) { 100_000 } # 100s
let(:retry_delay) { 30_000 } # 30s
let(:retry_back_off_grace) { 1000 } # iterations before the backoff grace quicks in
let(:retry_back_off_weight) { 1 } # backoff multiplier
let(:retry_back_off_weight) { 1 } # backoff multiplier

let(:queue_settings) do
{
allow_retry_back_off: allow_retry_back_off,
allow_exponential_back_off: allow_exponential_back_off,
max_retry_delay: max_retry_delay,
retry_delay: retry_delay,
retry_back_off_grace: retry_back_off_grace,
retry_back_off_weight: retry_back_off_weight
}
end

subject { described_class.new(max_timeout: max_timeout) }

Expand All @@ -14,28 +28,16 @@

it 'does not introduces backoff' do
result = subject.call(
retry_attempts: 1,
queue_settings: {
allow_retry_back_off: allow_retry_back_off,
max_retry_delay: max_retry_delay,
retry_back_off_grace: retry_back_off_grace,
retry_delay: retry_delay,
retry_back_off_weight: retry_back_off_weight
}
retry_attempts: 1,
queue_settings: queue_settings
)

expect(result).to eq(ms_to_seconds(retry_delay))


result = subject.call(
retry_attempts: retry_back_off_grace + 100,
queue_settings: {
allow_retry_back_off: allow_retry_back_off,
max_retry_delay: max_retry_delay,
retry_back_off_grace: retry_back_off_grace,
retry_delay: retry_delay,
retry_back_off_weight: retry_back_off_weight
}
retry_attempts: retry_back_off_grace + 100,
queue_settings: queue_settings
)

expect(result).to eq(ms_to_seconds(retry_delay))
Expand All @@ -48,14 +50,8 @@
context 'when the retry_attempts is lower than the retry_back_off_grace' do
it 'does not introduce backoff' do
result = subject.call(
retry_attempts: retry_back_off_grace - 1,
queue_settings: {
allow_retry_back_off: allow_retry_back_off,
max_retry_delay: max_retry_delay,
retry_back_off_grace: retry_back_off_grace,
retry_delay: retry_delay,
retry_back_off_weight: retry_back_off_weight
}
retry_attempts: retry_back_off_grace - 1,
queue_settings: queue_settings
)

expect(result).to eq(ms_to_seconds(retry_delay))
Expand All @@ -67,14 +63,8 @@
retries_past_grace_period = 2

result = subject.call(
retry_attempts: retry_back_off_grace + retries_past_grace_period,
queue_settings: {
allow_retry_back_off: allow_retry_back_off,
max_retry_delay: max_retry_delay,
retry_back_off_grace: retry_back_off_grace,
retry_delay: retry_delay,
retry_back_off_weight: retry_back_off_weight
}
retry_attempts: retry_back_off_grace + retries_past_grace_period,
queue_settings: queue_settings
)

expect(result).to eq(ms_to_seconds(retry_delay) * retries_past_grace_period)
Expand All @@ -84,59 +74,72 @@
context 'when the visible_timeout exceeds the max_retry_delay' do
it 'returns the max_retry_delay' do
result = subject.call(
retry_attempts: retry_back_off_grace + 100_000,
queue_settings: {
allow_retry_back_off: allow_retry_back_off,
max_retry_delay: max_retry_delay,
retry_back_off_grace: retry_back_off_grace,
retry_delay: retry_delay,
retry_back_off_weight: retry_back_off_weight
}
retry_attempts: retry_back_off_grace + 100_000,
queue_settings: queue_settings
)

expect(result).to eq(ms_to_seconds(max_retry_delay))
end
end

context 'when the visible_timeout is bigger than max_timeout' do
let(:max_retry_delay) { 50_000_000 }

it 'the visible_timeout is set to max_timeout' do
result = subject.call(
retry_attempts: retry_back_off_grace + 100_000,
queue_settings: {
allow_retry_back_off: allow_retry_back_off,
max_retry_delay: 50_000_000,
retry_back_off_grace: retry_back_off_grace,
retry_delay: retry_delay,
retry_back_off_weight: retry_back_off_weight
}
retry_attempts: retry_back_off_grace + 100_000,
queue_settings: queue_settings
)

expect(result).to eq(max_timeout)
end
end

context 'when retry_back_off_weight is added' do
let(:retry_back_off_weight) { 2 }
let(:max_retry_delay) { 1_000_000 }

it 'the backoff is multiplied' do
retries_past_grace_period = 2
retry_back_off_weight = 2

result = subject.call(
retry_attempts: retry_back_off_grace + retries_past_grace_period,
queue_settings: {
allow_retry_back_off: allow_retry_back_off,
max_retry_delay: 1_000_000,
retry_back_off_grace: retry_back_off_grace,
retry_delay: retry_delay,
retry_back_off_weight: retry_back_off_weight
}
retry_attempts: retry_back_off_grace + retries_past_grace_period,
queue_settings: queue_settings
)

expect(result).to eq(ms_to_seconds(retry_delay) * retries_past_grace_period * retry_back_off_weight)
end
end

context 'when exponential backoff is enabled' do
let(:max_retry_delay) { 40_000_000 }
let(:allow_exponential_back_off) { true }

it 'grow the delay by 2 to the power of retries past grace period' do
retries_past_grace_period = 10

result = subject.call(
retry_attempts: retry_back_off_grace + retries_past_grace_period,
queue_settings: queue_settings
)

expect(result).to eq(ms_to_seconds(retry_delay) * 2 ** (retries_past_grace_period - 1))
end

it 'still caps at max delay' do
retries_past_grace_period = 20

result = subject.call(
retry_attempts: retry_back_off_grace + retries_past_grace_period,
queue_settings: queue_settings
)

expect(result).to eq(ms_to_seconds(max_retry_delay))
end
end
end

def ms_to_seconds(value)
value / 1000
end
end
end
Loading

0 comments on commit a971bda

Please sign in to comment.