diff --git a/lib/shoryuken/extensions/active_job_adapter.rb b/lib/shoryuken/extensions/active_job_adapter.rb index 740210e8..92ad4807 100644 --- a/lib/shoryuken/extensions/active_job_adapter.rb +++ b/lib/shoryuken/extensions/active_job_adapter.rb @@ -53,16 +53,18 @@ def calculate_delay(timestamp) def message(queue, job, options = {}) body = job.serialize - msg = {} + attributes = options.delete(:message_attributes) || {} + + msg = { + message_body: body, + message_attributes: attributes.merge(MESSAGE_ATTRIBUTES) + } if queue.fifo? # See https://github.com/phstc/shoryuken/issues/457 msg[:message_deduplication_id] = Digest::SHA256.hexdigest(JSON.dump(body.except('job_id'))) end - msg[:message_body] = body - msg[:message_attributes] = message_attributes - msg.merge(options) end @@ -70,15 +72,6 @@ def register_worker!(job) Shoryuken.register_worker(job.queue_name, JobWrapper) end - def message_attributes - @message_attributes ||= { - 'shoryuken_class' => { - string_value: JobWrapper.to_s, - data_type: 'String' - } - } - end - class JobWrapper #:nodoc: include Shoryuken::Worker @@ -88,6 +81,13 @@ def perform(_sqs_msg, hash) Base.execute hash end end + + MESSAGE_ATTRIBUTES = { + 'shoryuken_class' => { + string_value: JobWrapper.to_s, + data_type: 'String' + } + }.freeze end end end diff --git a/spec/shared_examples_for_active_job.rb b/spec/shared_examples_for_active_job.rb index ae4ed956..06a60d1a 100644 --- a/spec/shared_examples_for_active_job.rb +++ b/spec/shared_examples_for_active_job.rb @@ -19,6 +19,9 @@ specify do expect(queue).to receive(:send_message) do |hash| expect(hash[:message_deduplication_id]).to_not be + expect(hash[:message_attributes]['shoryuken_class'][:string_value]).to eq(described_class::JobWrapper.to_s) + expect(hash[:message_attributes]['shoryuken_class'][:data_type]).to eq("String") + expect(hash[:message_attributes].keys).to eq(['shoryuken_class']) end expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper) @@ -39,6 +42,27 @@ subject.enqueue(job) end end + + context 'with additional message attributes' do + it 'should combine with activejob attributes' do + custom_message_attributes = { + 'tracer_id' => { + string_value: SecureRandom.hex, + data_type: 'String' + } + } + + expect(queue).to receive(:send_message) do |hash| + expect(hash[:message_attributes]['shoryuken_class'][:string_value]).to eq(described_class::JobWrapper.to_s) + expect(hash[:message_attributes]['shoryuken_class'][:data_type]).to eq("String") + expect(hash[:message_attributes]['tracer_id'][:string_value]).to eq(custom_message_attributes['tracer_id'][:string_value]) + expect(hash[:message_attributes]['tracer_id'][:data_type]).to eq("String") + end + expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper) + + subject.enqueue(job, message_attributes: custom_message_attributes) + end + end end describe '#enqueue_at' do