Skip to content

Commit

Permalink
Allow message_attributes to be passed in from ActiveJob client
Browse files Browse the repository at this point in the history
Co-authored-by: Tamara Temple <tamara.temple@drip.com>
  • Loading branch information
timbreitkreutz and Tamara Temple committed Nov 2, 2020
1 parent ba0218c commit b4327f8
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 13 deletions.
26 changes: 13 additions & 13 deletions lib/shoryuken/extensions/active_job_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,32 +53,25 @@ def calculate_delay(timestamp)
def message(queue, job, options = {})
body = job.serialize

msg = {}
attributes = options.delete(:message_attributes) || {}

msg = {
message_body: body,
message_attributes: attributes.merge(MESSAGE_ATTRIBUTES)
}

if queue.fifo?
# See https://github.com/phstc/shoryuken/issues/457
msg[:message_deduplication_id] = Digest::SHA256.hexdigest(JSON.dump(body.except('job_id')))
end

msg[:message_body] = body
msg[:message_attributes] = message_attributes

msg.merge(options)
end

def register_worker!(job)
Shoryuken.register_worker(job.queue_name, JobWrapper)
end

def message_attributes
@message_attributes ||= {
'shoryuken_class' => {
string_value: JobWrapper.to_s,
data_type: 'String'
}
}
end

class JobWrapper #:nodoc:
include Shoryuken::Worker

Expand All @@ -88,6 +81,13 @@ def perform(_sqs_msg, hash)
Base.execute hash
end
end

MESSAGE_ATTRIBUTES = {
'shoryuken_class' => {
string_value: JobWrapper.to_s,
data_type: 'String'
}
}.freeze
end
end
end
24 changes: 24 additions & 0 deletions spec/shared_examples_for_active_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
specify do
expect(queue).to receive(:send_message) do |hash|
expect(hash[:message_deduplication_id]).to_not be
expect(hash[:message_attributes]['shoryuken_class'][:string_value]).to eq(described_class::JobWrapper.to_s)
expect(hash[:message_attributes]['shoryuken_class'][:data_type]).to eq("String")
expect(hash[:message_attributes].keys).to eq(['shoryuken_class'])
end
expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper)

Expand All @@ -39,6 +42,27 @@
subject.enqueue(job)
end
end

context 'with additional message attributes' do
it 'should combine with activejob attributes' do
custom_message_attributes = {
'tracer_id' => {
string_value: SecureRandom.hex,
data_type: 'String'
}
}

expect(queue).to receive(:send_message) do |hash|
expect(hash[:message_attributes]['shoryuken_class'][:string_value]).to eq(described_class::JobWrapper.to_s)
expect(hash[:message_attributes]['shoryuken_class'][:data_type]).to eq("String")
expect(hash[:message_attributes]['tracer_id'][:string_value]).to eq(custom_message_attributes['tracer_id'][:string_value])
expect(hash[:message_attributes]['tracer_id'][:data_type]).to eq("String")
end
expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper)

subject.enqueue(job, message_attributes: custom_message_attributes)
end
end
end

describe '#enqueue_at' do
Expand Down

0 comments on commit b4327f8

Please sign in to comment.