Skip to content

Commit

Permalink
Add option for jitter in retry delays
Browse files Browse the repository at this point in the history
Jitter, or randomness in signals, is a common technique for avoiding
calls that fail at the same time to retry at the same time. Therefore
jitter can help avoid the thundering herd problem, where many instances
of the same application all retry at the same time, causing a spike in
load on the system. For example, if a service was down for some reason
and comes back up, all queued events will hit at once and could cause
the service to fail for most events. However, without jitter, the events
will all retry at the same time, causing the same problem. Adding some
jitter (or randomness) to the retry delays result in the events retrying
at different times.

In our implementation, the configuration says how much jitter is used
based on the retry delay. For example, if `retry_jitter_ratio` is set to
0, no jitter will be applied and all events will retry after the usual
retry delay. If it is set to 100, the retry delay will be a random
number between 0 and the retry delay without jitter. If it is set to 20,
the retry delay will be a random number between 80 and 100% of the retry
delay without jitter.
  • Loading branch information
florianpilz committed Dec 18, 2023
1 parent f901452 commit d3b80f7
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 10 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ A subscription queue should be defined to receive any events raised for the subs
- **require_signature** [Bool] [Optional] [Default=false] This is used to specify if messages within this queue must be signed.
- **retry_delay** [Int] [Optional] [Default=30000] This is used to specify the time delay in milliseconds before a failed message is re-added to the subscription queue.
- **retry_back_off_weight** [Int] [Optional] [Default=1] Additional multiplier for the timeout backoff. Normally used when `retry_delay` is too small (eg: 30ms) in order to get meaningful backoff values.
- **retry_jitter_ratio** [Int] [Optional] [Default=0] Amount of randomness for retry delays in percent to avoid a bulk of retries hitting again at the same time. 0% means no randomness, while 100% means full randomness. With full randomness, a random number between 0 and the calculated retry delay will be chosen for the delay.

**Example**

Expand Down
10 changes: 10 additions & 0 deletions lib/eventq/eventq_aws/aws_calculate_visibility_timeout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def initialize(max_timeout:, logger: EventQ.logger)
# @option max_retry_delay [Integer] Maximum amount of time a retry will take in ms
# @option retry_back_off_grace [Integer] Amount of retries to wait before starting to backoff
# @option retry_back_off_weight [Integer] Multiplier for the backoff retry
# @option retry_jitter_ratio [Integer] Ratio of how much jitter to apply to the backoff retry
# @option retry_delay [Integer] Amount of time to wait until retry in ms
# @return [Integer] the calculated visibility timeout in seconds
def call(retry_attempts:, queue_settings:)
Expand All @@ -28,6 +29,7 @@ def call(retry_attempts:, queue_settings:)
@max_retry_delay = queue_settings.fetch(:max_retry_delay)
@retry_back_off_grace = queue_settings.fetch(:retry_back_off_grace)
@retry_back_off_weight = queue_settings.fetch(:retry_back_off_weight)
@retry_jitter_ratio = queue_settings.fetch(:retry_jitter_ratio)
@retry_delay = queue_settings.fetch(:retry_delay)

visibility_timeout = if @allow_retry_back_off && retry_past_grace_period?
Expand All @@ -36,6 +38,8 @@ def call(retry_attempts:, queue_settings:)
timeout_without_back_off
end

visibility_timeout = apply_jitter(visibility_timeout) if @retry_jitter_ratio > 0

ms_to_seconds(visibility_timeout)
end

Expand Down Expand Up @@ -65,6 +69,12 @@ def timeout_with_back_off
check_for_max_timeout(visibility_timeout)
end

def apply_jitter(visibility_timeout)
ratio = @retry_jitter_ratio / 100.0
min_visibility_timeout = (visibility_timeout * (1 - ratio)).to_i
rand(min_visibility_timeout..visibility_timeout)
end

def ms_to_seconds(value)
value / 1000
end
Expand Down
1 change: 1 addition & 0 deletions lib/eventq/eventq_aws/aws_queue_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def reject_message(queue, poller, msg, retry_attempts, message, args)
max_retry_delay: queue.max_retry_delay,
retry_back_off_grace: queue.retry_back_off_grace,
retry_back_off_weight: queue.retry_back_off_weight,
retry_jitter_ratio: queue.retry_jitter_ratio,
retry_delay: queue.retry_delay
}
)
Expand Down
3 changes: 3 additions & 0 deletions lib/eventq/eventq_base/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class Queue
attr_accessor :retry_delay
attr_accessor :retry_back_off_grace
attr_accessor :retry_back_off_weight
attr_accessor :retry_jitter_ratio
# Character delimiter between namespace and queue name. Default = '-'
attr_accessor :namespace_delimiter
# Flag to control that the queue runs in isolation of auto creating the topic it belongs to
Expand All @@ -37,6 +38,8 @@ def initialize
@retry_back_off_grace = 0
# Multiplier for the backoff retry in case retry_delay is too small
@retry_back_off_weight = 1
# Ratio of how much jitter to apply to the retry delay
@retry_jitter_ratio = 0
@isolated = false
end
end
Expand Down
10 changes: 9 additions & 1 deletion lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,15 @@ def retry_delay(queue, retry_attempts)
message_ttl = queue.max_retry_delay
end

message_ttl
apply_jitter(queue.retry_jitter_ratio, message_ttl)
end

def apply_jitter(retry_jitter_ratio, message_ttl)
return message_ttl if retry_jitter_ratio == 0

ratio = retry_jitter_ratio / 100.0
min_message_ttl = (message_ttl * (1 - ratio)).to_i
rand(min_message_ttl..message_ttl)
end
end
end
Expand Down
46 changes: 45 additions & 1 deletion spec/eventq_aws/aws_calculate_visibility_timeout_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
let(:retry_delay) { 30_000 } # 30s
let(:retry_back_off_grace) { 1000 } # iterations before the backoff grace quicks in
let(:retry_back_off_weight) { 1 } # backoff multiplier
let(:retry_jitter_ratio) { 0 } # ratio for randomness on retry delay

let(:queue_settings) do
{
Expand All @@ -17,7 +18,8 @@
max_retry_delay: max_retry_delay,
retry_delay: retry_delay,
retry_back_off_grace: retry_back_off_grace,
retry_back_off_weight: retry_back_off_weight
retry_back_off_weight: retry_back_off_weight,
retry_jitter_ratio: retry_jitter_ratio
}
end

Expand All @@ -42,6 +44,27 @@

expect(result).to eq(ms_to_seconds(retry_delay))
end

context 'when jitter is set to 30' do
let(:retry_jitter_ratio) { 30 }

it 'stays between 70-100% of the calculated visibility timeout' do
results = []
1000.times do |i|
result = subject.call(
retry_attempts: i,
queue_settings: queue_settings
)

expect(result).to be_between(ms_to_seconds(retry_delay * 0.7), ms_to_seconds(retry_delay))

results << result
end

average = results.sum.to_f / results.size
expect(average).to be_between(ms_to_seconds(retry_delay * 0.75), ms_to_seconds(retry_delay * 0.95))
end
end
end

context 'when retry backoff is enabled' do
Expand Down Expand Up @@ -111,6 +134,27 @@
end
end

context 'when jitter is set to 30' do
let(:retry_jitter_ratio) { 30 }

it 'stays between 70-100% of the calculated visibility timeout' do
results = []
1000.times do |i|
result = subject.call(
retry_attempts: i,
queue_settings: queue_settings
)

expect(result).to be_between(ms_to_seconds(retry_delay * 0.7), ms_to_seconds(retry_delay))

results << result
end

average = results.sum.to_f / results.size
expect(average).to be_between(ms_to_seconds(retry_delay * 0.75), ms_to_seconds(retry_delay * 0.95))
end
end

context 'when exponential backoff is enabled' do
let(:max_retry_delay) { 40_000_000 }
let(:allow_exponential_back_off) { true }
Expand Down
35 changes: 35 additions & 0 deletions spec/eventq_aws/integration/aws_queue_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@
context 'queue.allow_retry_back_off = true' do
let(:retry_delay) { 1_000 }
let(:max_retry_delay) { 5_000 }
let(:retry_jitter_ratio) { 0 }

let(:allow_retry) { true }
let(:allow_retry_back_off) { true }
Expand All @@ -322,6 +323,7 @@
before do
subscriber_queue.retry_delay = retry_delay
subscriber_queue.max_retry_delay = max_retry_delay
subscriber_queue.retry_jitter_ratio = retry_jitter_ratio
subscriber_queue.allow_retry = allow_retry
subscriber_queue.allow_retry_back_off = allow_retry_back_off
subscriber_queue.allow_exponential_back_off = allow_exponential_back_off
Expand Down Expand Up @@ -402,6 +404,39 @@
expect(queue_worker.is_running).to eq(false)
end
end

context 'queue.retry_jitter_ratio = 50' do
let(:retry_delay) { 4_000 }
let(:retry_jitter_ratio) { 50 }

before do
allow(
subject.instance_variable_get('@calculate_visibility_timeout')
).to receive(:rand).and_return(2_000)
end

it 'retries after half the retry delay has passed' do
retry_attempt_count = 0

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 0.5, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
retry_attempt_count = args.retry_attempts + 1
raise 'Fail on purpose to send event to retry queue.'
end

sleep(3)

expect(retry_attempt_count).to eq(2)

queue_worker.stop

expect(queue_worker.is_running).to eq(false)
end
end
end

def add_to_received_list(received_messages)
Expand Down
36 changes: 30 additions & 6 deletions spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -453,17 +453,19 @@
let(:allow_retry_back_off) { true }
let(:allow_exponential_back_off) { false }

let(:retry_delay) { 500 }
let(:max_retry_delay) { 5_000 }
let(:retry_jitter_ratio) { 0 }

before do
subscriber_queue.name = SecureRandom.uuid
subscriber_queue.allow_retry = allow_retry
subscriber_queue.allow_retry_back_off = allow_retry_back_off
subscriber_queue.allow_exponential_back_off = allow_exponential_back_off

# set queue retry delay to 0.5 seconds
subscriber_queue.retry_delay = 500

# set to max retry delay to 5 seconds
subscriber_queue.max_retry_delay = 5000
subscriber_queue.retry_delay = retry_delay
subscriber_queue.max_retry_delay = max_retry_delay
subscriber_queue.retry_jitter_ratio = retry_jitter_ratio

event_type = SecureRandom.uuid

Expand Down Expand Up @@ -540,6 +542,29 @@

expect(queue_worker.running?).to eq(false)
end

context 'queue.retry_jitter_ratio = 50' do
let(:retry_delay) { 4_000 }
let(:max_retry_delay) { 10_000 }
let(:retry_jitter_ratio) { 50 }

before do
allow(subject).to receive(:rand).and_return(2_000)
end

it 'retries after half the retry delay has passed' do
retry_attempt_count = 0

queue_worker.start(subscriber_queue, { worker_adapter: subject, wait: false, block_process: false, :thread_count => 1, :sleep => 0.5, client: client}) do |event, args|
retry_attempt_count = args.retry_attempts
raise 'Fail on purpose to send event to retry queue.'
end

sleep(3)

expect(retry_attempt_count).to eq(1)
end
end
end

end
Expand Down Expand Up @@ -666,4 +691,3 @@ def add_to_received_list(received_messages)

end
end

9 changes: 7 additions & 2 deletions utilities/plot_visibility_timeout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def plot(settings)
@queue_allow_retry_back_off = settings.fetch(:queue_allow_retry_back_off)
@queue_allow_exponential_back_off = settings.fetch(:queue_allow_exponential_back_off)
@queue_retry_back_off_weight = settings.fetch(:queue_retry_back_off_weight)
@queue_retry_jitter_ratio = settings.fetch(:queue_retry_jitter_ratio)
@queue_max_retry_delay = settings.fetch(:queue_max_retry_delay)
@queue_max_timeout = settings.fetch(:queue_max_timeout)
@queue_retry_back_off_grace = settings.fetch(:queue_retry_back_off_grace)
Expand Down Expand Up @@ -76,7 +77,8 @@ def calculate(retry_counter)
max_retry_delay: @queue_max_retry_delay,
retry_back_off_grace: @queue_retry_back_off_grace,
retry_back_off_weight: @queue_retry_back_off_weight,
retry_delay: @queue_retry_delay,
retry_jitter_ratio: @queue_retry_jitter_ratio,
retry_delay: @queue_retry_delay
}
)
end
Expand Down Expand Up @@ -130,7 +132,10 @@ def print_output(retry_counter, max_visibility_timeout,total_elapsed_time)
# Delay and retry for each queue iterations. The multiplier is necessary in case the calculated values
# are insignificant between iterations.
queue_retry_back_off_weight: 100, # Backoff multiplier
queue_retry_delay: 30 # 30ms
queue_retry_delay: 30, # 30ms

# Ratio of randomness allowed on retry delay to avoid a bulk of retries hitting again at the same time
queue_retry_jitter_ratio: 0 # ratio for randomness on retry delay
}

PlotVisibilityTimeout.new.plot(settings)

0 comments on commit d3b80f7

Please sign in to comment.