Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow message_attributes to be set from ActiveJob clients #635

Merged
merged 3 commits into from
Jan 24, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .reek.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
detectors:

UtilityFunction:
public_methods_only: true
26 changes: 13 additions & 13 deletions lib/shoryuken/extensions/active_job_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,32 +53,25 @@ def calculate_delay(timestamp)
def message(queue, job, options = {})
body = job.serialize

msg = {}
attributes = options.delete(:message_attributes) || {}

msg = {
message_body: body,
message_attributes: attributes.merge(MESSAGE_ATTRIBUTES)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@timbreitkreutz Sorry for taking long to get back on this.

Is it being used when enqueuing a job? MyJob.perform_later(params)

I do miss the ability to send Shoryuken specific params (for example, FIFO params) when using Active Job is this change adding this support?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry was on vacation. Will try to get an answer soon :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly I don't think there is a way to inject this via perform_later--unless I'm missing something. This PR was actually aimed more at the middleware layer.

Copy link
Collaborator

@cjlarose cjlarose Jan 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@phstc I was able to test this locally. The proposed changes do not permit specification of message_attributes when using perform_later (I've documented that as a separate issue in #646).

Instead, the following can be used to enqueue a message with additional message_attributes, without overriding the shoryuken_class attribute.

class AddJob < ActiveJob::Base
  def perform(a, b)
    puts a + b
  end
end

job = AddJob.new 1, 2
AddJob.queue_adapter.enqueue(job, message_attributes: { 'attr' => { string_value: 'myval', data_type: 'String' } })

Everything looks good from a testing perspective, so I think we're good to merge it. Although it doesn't solve #646, it doesn't hinder it and might actually be an important prerequisite.

@timbreitkreutz Thanks so much for your patience on this one!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@timbreitkreutz If you could rebase on master, I'd appreciate it. Travis should run correctly after pushing up the rebased version. 🤞

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @cjlarose !


if queue.fifo?
# See https://github.com/phstc/shoryuken/issues/457
msg[:message_deduplication_id] = Digest::SHA256.hexdigest(JSON.dump(body.except('job_id')))
end

msg[:message_body] = body
msg[:message_attributes] = message_attributes

msg.merge(options)
end

def register_worker!(job)
Shoryuken.register_worker(job.queue_name, JobWrapper)
end

def message_attributes
@message_attributes ||= {
'shoryuken_class' => {
string_value: JobWrapper.to_s,
data_type: 'String'
}
}
end

class JobWrapper #:nodoc:
include Shoryuken::Worker

Expand All @@ -88,6 +81,13 @@ def perform(_sqs_msg, hash)
Base.execute hash
end
end

MESSAGE_ATTRIBUTES = {
'shoryuken_class' => {
string_value: JobWrapper.to_s,
data_type: 'String'
}
}.freeze
end
end
end
24 changes: 24 additions & 0 deletions spec/shared_examples_for_active_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
specify do
expect(queue).to receive(:send_message) do |hash|
expect(hash[:message_deduplication_id]).to_not be
expect(hash[:message_attributes]['shoryuken_class'][:string_value]).to eq(described_class::JobWrapper.to_s)
expect(hash[:message_attributes]['shoryuken_class'][:data_type]).to eq("String")
expect(hash[:message_attributes].keys).to eq(['shoryuken_class'])
end
expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper)

Expand All @@ -39,6 +42,27 @@
subject.enqueue(job)
end
end

context 'with additional message attributes' do
it 'should combine with activejob attributes' do
custom_message_attributes = {
'tracer_id' => {
string_value: SecureRandom.hex,
data_type: 'String'
}
}

expect(queue).to receive(:send_message) do |hash|
expect(hash[:message_attributes]['shoryuken_class'][:string_value]).to eq(described_class::JobWrapper.to_s)
expect(hash[:message_attributes]['shoryuken_class'][:data_type]).to eq("String")
expect(hash[:message_attributes]['tracer_id'][:string_value]).to eq(custom_message_attributes['tracer_id'][:string_value])
expect(hash[:message_attributes]['tracer_id'][:data_type]).to eq("String")
end
expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper)

subject.enqueue(job, message_attributes: custom_message_attributes)
end
end
end

describe '#enqueue_at' do
Expand Down