Skip to content

Commit

Permalink
Add support for selectively choosing deduplication keys. (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
kyto64 authored Sep 28, 2023
1 parent 10d4961 commit 925fbb5
Show file tree
Hide file tree
Showing 11 changed files with 183 additions and 9 deletions.
10 changes: 9 additions & 1 deletion .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,12 @@ Naming/FileName:
Style/GlobalVars:
AllowedVariables:
- $VERSION
- $REPO_ROOT
- $REPO_ROOT

Metrics/BlockLength:
Exclude:
- 'test/**/*.rb'

Metrics/ModuleLength:
Exclude:
- 'test/**/*.rb'
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
Unreleased Changes
------------------

* Feature - Add support for selectively choosing deduplication keys. (#102)

* Feature - Set required Ruby version to >= 2.3 (#104)

* Issue - Run Rubocop on all files. (#104)
Expand Down
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,37 @@ When using FIFO queues, jobs will NOT be processed concurrently by the poller
to ensure the correct ordering. Additionally, all jobs on a FIFO queue will be queued
synchronously, even if you have configured the `amazon_sqs_async` adapter.

#### Message Deduplication ID

FIFO queues support [Message deduplication ID](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html), which is the token used for deduplication of sent messages.
If a message with a particular message deduplication ID is sent successfully, any messages sent with the same message deduplication ID are accepted successfully but aren't delivered during the 5-minute deduplication interval.

##### Customize Deduplication keys

If necessary, the deduplication key used to create the message deduplication ID can be customized:

```ruby
Aws::Rails::SqsActiveJob.configure do |config|
config.excluded_deduplication_keys = [:job_class, :arguments]
end
# Or to set deduplication keys to exclude for a single job:
class YourJob < ApplicationJob
include Aws::Rails::SqsActiveJob
deduplicate_without :job_class, :arguments
#...
end
```

By default, the following keys are used for deduplication keys:

```
job_class, provider_job_id, queue_name, priority, arguments, executions, exception_executions, locale, timezone, enqueued_at
```

Note that `job_id` is NOT included in deduplication keys because it is unique for each initialization of the job, and the run-once behavior must be guaranteed for ActiveJob retries.
`job_id` is always excluded from the deduplication keys, even when it is not listed in `excluded_deduplication_keys` or passed to `deduplicate_without`.

#### Message Group IDs

FIFO queues require a message group id to be provided for the job. It is determined by:
Expand Down
12 changes: 8 additions & 4 deletions lib/active_job/queue_adapters/amazon_sqs_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,8 @@ def _enqueue(job, body = nil, send_message_opts = {})
send_message_opts[:message_attributes] = message_attributes(job)

if Aws::Rails::SqsActiveJob.fifo?(queue_url)
# job_id is unique per initialization of job
# Remove it from message dup id to ensure run-once behavior
# with ActiveJob retries
send_message_opts[:message_deduplication_id] =
Digest::SHA256.hexdigest(Aws::Json.dump(body.except('job_id')))
Digest::SHA256.hexdigest(Aws::Json.dump(deduplication_body(job, body)))

message_group_id = job.message_group_id if job.respond_to?(:message_group_id)
message_group_id ||= Aws::Rails::SqsActiveJob.config.message_group_id
Expand All @@ -53,6 +50,13 @@ def message_attributes(job)
}
}
end

# Returns the part of the serialized job body that is hashed into the FIFO
# message deduplication id.
#
# The excluded keys come from the job when it responds to
# +excluded_deduplication_keys+ and returns a non-nil value; otherwise the
# value from +Aws::Rails::SqsActiveJob.config+ is used.
#
# @param job [Object] the job being enqueued
# @param body [Hash] serialized job body (string keys)
# @return [Hash] a copy of +body+ without the excluded keys
def deduplication_body(job, body)
  ex_dedup_keys = job.excluded_deduplication_keys if job.respond_to?(:excluded_deduplication_keys)
  ex_dedup_keys ||= Aws::Rails::SqsActiveJob.config.excluded_deduplication_keys

  # Normalize here as well as in the setters: a job may expose the keys
  # through a plain method returning symbols, and job_id must never be part
  # of the deduplication id or retries would not be deduplicated.
  normalized_keys = Array(ex_dedup_keys).map(&:to_s) | ['job_id']

  body.except(*normalized_keys)
end
end

# create an alias to allow `:amazon` to be used as the adapter name
Expand Down
1 change: 1 addition & 0 deletions lib/aws-sdk-rails.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require_relative 'aws/rails/railtie'
require_relative 'aws/rails/notifications'
require_relative 'aws/rails/sqs_active_job/configuration'
require_relative 'aws/rails/sqs_active_job/deduplication'
require_relative 'aws/rails/sqs_active_job/executor'
require_relative 'aws/rails/sqs_active_job/job_runner'
require_relative 'aws/rails/sqs_active_job/lambda_handler'
Expand Down
18 changes: 15 additions & 3 deletions lib/aws/rails/sqs_active_job/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@ class Configuration
shutdown_timeout: 15,
queues: {},
logger: ::Rails.logger,
message_group_id: 'SqsActiveJobGroup'
message_group_id: 'SqsActiveJobGroup',
excluded_deduplication_keys: ['job_id']
}.freeze

# @api private
attr_accessor :queues, :max_messages, :visibility_timeout,
:shutdown_timeout, :client, :logger,
:async_queue_error_handler, :message_group_id

attr_reader :excluded_deduplication_keys

# Don't use this method directly: Configuration is a singleton class, use
# +Aws::Rails::SqsActiveJob.config+ to access the singleton config.
#
Expand Down Expand Up @@ -65,7 +68,7 @@ class Configuration
# for the poller.
#
# @option options [String] :config_file
# Override file to load configuration from. If not specified will
# Override file to load configuration from. If not specified will
# attempt to load from config/aws_sqs_active_job.yml.
#
# @option options [String] :message_group_id (SqsActiveJobGroup)
Expand All @@ -79,8 +82,13 @@ class Configuration
# +active_job.queue_adapter = :amazon_sqs_async+. Called with:
# [error, job, job_options]
#
# @option options [SQS::Client] :client SQS Client to use. A default
# @option options [SQS::Client] :client SQS Client to use. A default
# client will be created if none is provided.
#
# @option options [Array] :excluded_deduplication_keys (['job_id'])
# Keys to leave out of the job body when computing the FIFO message
# deduplication ID. Entries may be Strings or Symbols. 'job_id' is
# always excluded, even if it is not listed here.

def initialize(options = {})
options[:config_file] ||= config_file if File.exist?(config_file)
options = DEFAULTS
Expand All @@ -89,6 +97,10 @@ def initialize(options = {})
set_attributes(options)
end

# Sets the keys excluded from the message deduplication id.
#
# Keys are stored as Strings and 'job_id' is always part of the stored
# list. +nil+ and a single (non-Array) key are accepted, so a scalar value
# does not raise.
#
# @param keys [Array<String, Symbol>, String, Symbol, nil]
def excluded_deduplication_keys=(keys)
  @excluded_deduplication_keys = Array(keys).map(&:to_s) | ['job_id']
end

def client
@client ||= begin
client = Aws::SQS::Client.new
Expand Down
21 changes: 21 additions & 0 deletions lib/aws/rails/sqs_active_job/deduplication.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true

module Aws
  module Rails
    # SQS ActiveJob modules.
    #
    # Including this module in a job class adds the
    # +excluded_deduplication_keys+ class attribute and the
    # +deduplicate_without+ class method.
    module SqsActiveJob
      extend ActiveSupport::Concern

      included do
        class_attribute :excluded_deduplication_keys
      end

      # class methods for SQS ActiveJob.
      module ClassMethods
        # Declares which keys of the job body are excluded for this job class.
        #
        # Keys are stored as Strings and 'job_id' is always included in the
        # stored list. Both +deduplicate_without :a, :b+ and
        # +deduplicate_without %i[a b]+ are accepted.
        #
        # @param keys [Array<String, Symbol>] keys to exclude
        # @return [Array<String>] the stored list of excluded keys
        def deduplicate_without(*keys)
          # flatten so an Array argument is not stringified as a single key
          self.excluded_deduplication_keys = keys.flatten.map(&:to_s) | ['job_id']
        end
      end
    end
  end
end
60 changes: 60 additions & 0 deletions test/active_job/queue_adapters/amazon_sqs_adapter_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,66 @@ module QueueAdapters
sleep(0.2)
end

describe 'when job has excluded deduplication keys defined' do
  let(:ex_dedup_keys) { %w[job_class queue_name] }
  # job_id is always excluded by the library. Build a new array instead of
  # appending to the memoized ex_dedup_keys so the configured input list is
  # not changed as a side effect of reading this helper.
  let(:ex_dedup_keys_with_job_id) { ex_dedup_keys + ['job_id'] }
  let(:hashed_body) { 'hashed_body' }

  describe 'through #deduplicate_without' do
    before do
      TestJobWithDedupKeys.deduplicate_without(*ex_dedup_keys)
    end

    it 'adds customized message_deduplication_id' do
      # The digest input must not mention any excluded key.
      expect(Digest::SHA256).to receive(:hexdigest) do |body|
        ex_dedup_keys_with_job_id.each do |key|
          expect(body).not_to include(%("#{key}"))
        end
      end.and_return(hashed_body)
      expect(client).to receive(:send_message).with(
        {
          queue_url: 'https://queue-url.fifo',
          message_body: instance_of(String),
          message_attributes: instance_of(Hash),
          message_group_id: Aws::Rails::SqsActiveJob.config.message_group_id,
          message_deduplication_id: hashed_body
        }
      )

      TestJobWithDedupKeys.perform_later('test')
      sleep(0.2)
    end
  end

  describe 'through Aws::Rails::SqsActiveJob config' do
    before do
      Aws::Rails::SqsActiveJob.configure do |config|
        config.excluded_deduplication_keys = ex_dedup_keys
      end
    end

    it 'adds customized message_deduplication_id' do
      # The digest input must not mention any excluded key.
      expect(Digest::SHA256).to receive(:hexdigest) do |body|
        ex_dedup_keys_with_job_id.each do |key|
          expect(body).not_to include(%("#{key}"))
        end
      end.and_return(hashed_body)
      expect(client).to receive(:send_message).with(
        {
          queue_url: 'https://queue-url.fifo',
          message_body: instance_of(String),
          message_attributes: instance_of(Hash),
          message_group_id: Aws::Rails::SqsActiveJob.config.message_group_id,
          message_deduplication_id: hashed_body
        }
      )

      TestJob.perform_later('test')
      sleep(0.2)
    end
  end
end

describe 'when job has #message_group_id defined' do
it 'adds message_deduplication_id and default message_group_id if job does not return a value' do
expect(client).to receive(:send_message).with(
Expand Down
7 changes: 6 additions & 1 deletion test/aws/rails/sqs_active_job/configuration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,13 @@ module SqsActiveJob
it 'allows configuration through a block' do
  Aws::Rails::SqsActiveJob.configure do |config|
    config.visibility_timeout = 360
    config.excluded_deduplication_keys = [:job_class]
  end

  # One combined expectation: visibility_timeout is checked here, so a
  # separate `eq 360` assertion would be redundant. The setter stringifies
  # keys and always adds 'job_id'.
  expect(Aws::Rails::SqsActiveJob.config).to have_attributes(
    visibility_timeout: 360,
    excluded_deduplication_keys: contain_exactly('job_class', 'job_id')
  )
end
end

Expand Down
26 changes: 26 additions & 0 deletions test/aws/rails/sqs_active_job/deduplication_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

require 'test_helper'
require_relative 'test_job'

module Aws
  module Rails
    module SqsActiveJob
      describe 'ClassMethods' do
        describe '.deduplicate_without' do
          let(:keys) { %w[job_id job_class queue_name] }
          # Spelled out literally so the expectation does not simply repeat
          # the implementation's own expression.
          let(:expected_keys) { %w[job_id job_class queue_name] }

          it 'excluded deduplication keys set successfully' do
            expect(TestJobWithDedupKeys.deduplicate_without(*keys)).to contain_exactly(*expected_keys)
          end

          it 'excluded deduplication keys set successfully and job_id is added' do
            # keys holds Strings, so job_id has to be removed as a String;
            # removing the Symbol :job_id would leave the list unchanged.
            keys_without_job_id = keys - ['job_id']
            expect(keys_without_job_id).not_to include('job_id')

            expect(TestJobWithDedupKeys.deduplicate_without(*keys_without_job_id))
              .to contain_exactly(*expected_keys)
          end
        end
      end
    end
  end
end
4 changes: 4 additions & 0 deletions test/aws/rails/sqs_active_job/test_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@ def perform(a1, a2); end
# Test job that defines #message_group_id with an empty body, so the method
# exists but returns nil.
class TestJobWithMessageGroupID < TestJob
def message_group_id; end
end

# Test job that mixes in Aws::Rails::SqsActiveJob; the specs call
# .deduplicate_without on this class.
class TestJobWithDedupKeys < TestJob
include Aws::Rails::SqsActiveJob
end

0 comments on commit 925fbb5

Please sign in to comment.