
Allow SQS SendMessage options (e.g., message_attributes, message_deduplication_id, and message_group_id) to be specified when using ActiveJob to enqueue messages #646

Closed
cjlarose opened this issue Jan 24, 2021 · 7 comments

@cjlarose
Collaborator

cjlarose commented Jan 24, 2021

When enqueuing a message with ActiveJob's perform_later method, it is not currently possible to specify SQS-specific options like the message_group_id, message_deduplication_id, or additional message_attributes. There doesn't appear to be much that ActiveJob offers in terms of being able to specify queue-adapter-specific options, but that kinda makes sense: it's meant to provide the common-denominator interface for all queue adapters.

Some folks want to use ActiveJob so they can write workers in a way that is queue-adapter-agnostic. In testing environments especially, it's nice to be able to swap out the adapter for the :async or :inline adapters.

It's desirable, too, to be able to specify SQS-specific options when the server-side (enqueue-side) code is actually using the :shoryuken adapter. It's possible to use ShoryukenAdapter's API directly to specify message_group_id, message_deduplication_id, message_system_attributes, and message_attributes (since #635) like this:

class MyJob < ActiveJob::Base
end

job = MyJob.new 'args'
MyJob.queue_adapter.enqueue(job, message_group_id: 'GroupOne')

However, this isn't really documented and actually shouldn't be encouraged because code like this won't work when you swap out ActiveJob backends. Ideally, we'd have a solution where users could specify SQS-specific options in a way that, when executed against different backends, those options were ignored.

Related discussion:

📌 #648

@cjlarose
Collaborator Author

cjlarose commented Jan 24, 2021

One natural API might be to encode Shoryuken options in the job arguments:

class AddJob < ActiveJob::Base
  def perform(a, b)
    puts a + b
  end
end


AddJob.perform_later 123, 'abc', shoryuken_options: {
  message_group_id: 'abc123',
  message_deduplication_id: 'dedupe',
  message_attributes: {
    'key' => {
      string_value: 'myvalue',
      data_type: 'String'
    }
  }
}

But it's not clear how to get this to work. If we make it so that the ShoryukenAdapter strips the shoryuken_options from the job arguments, that would work when the job is executed using the ShoryukenAdapter, but would fail with wrong number of arguments (given 3, expected 2) when executed with any other adapter.
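The failure mode can be reproduced in plain Ruby, without ActiveJob or Shoryuken loaded. This is only a sketch: AddJobSketch and strip_shoryuken_options are hypothetical names made up for illustration, and the stripping logic is an assumption about what such an adapter might do, not Shoryuken's implementation.

```ruby
# Plain-Ruby stand-in for a job class (hypothetical; not an ActiveJob::Base subclass).
class AddJobSketch
  def perform(a, b)
    a + b
  end
end

# Hypothetical helper: what an options-aware adapter might do before calling perform.
# Returns [job_arguments, shoryuken_options].
def strip_shoryuken_options(args)
  last = args.last
  return [args, {}] unless last.is_a?(Hash) && last.key?(:shoryuken_options)

  [args[0...-1], last[:shoryuken_options]]
end

# The arguments as they would arrive if the options were encoded in the job arguments.
enqueued_args = [1, 2, { shoryuken_options: { message_group_id: 'abc123' } }]

# An adapter that knows about the trailing hash removes it first, so perform gets 2 arguments.
job_args, sqs_options = strip_shoryuken_options(enqueued_args)
stripped_result = AddJobSketch.new.perform(*job_args)

# Any other adapter passes the arguments through untouched, so perform gets 3 arguments.
unstripped_error = nil
begin
  AddJobSketch.new.perform(*enqueued_args)
rescue ArgumentError => e
  unstripped_error = e.message
end
```

The second call is the case described above: the extra hash is counted as a third positional argument, so the job only works under an adapter that strips it.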

@cjlarose
Collaborator Author

cjlarose commented Jan 24, 2021

The best option I've come across is to use .set, but it likely involves monkey-patching ActiveJob, as suggested in #4 (comment):

MyJob.set(message_group_id: 'abc123',
          message_deduplication_id: 'dedupe',
          message_attributes: {
            'key' => {
              string_value: 'myvalue',
              data_type: 'String'
            }
          })
     .perform_later 123, 'abc'
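To illustrate why .set is attractive, here is a plain-Ruby sketch in which the options travel on the job object instead of in the job arguments. Nothing here is ActiveJob or Shoryuken code: SketchJob, SqsLikeAdapter, and InlineLikeAdapter are hypothetical stand-ins, and passing the adapter to perform_later is a simplification that the real API does not have. The point is only that an adapter unaware of the options can ignore them without changing what perform receives.

```ruby
# Plain-Ruby sketch; every class here is a hypothetical stand-in.
class SketchJob
  attr_reader :arguments, :send_options

  # Holds the options given to .set until perform_later is called.
  Configured = Struct.new(:job_class, :options) do
    # Simplification: the adapter is passed in explicitly (the real API does not do this).
    def perform_later(adapter, *args)
      adapter.enqueue(job_class.new(args, options))
    end
  end

  # Mimics the shape of MyJob.set(...).perform_later(...).
  def self.set(options = {})
    Configured.new(self, options)
  end

  def initialize(arguments, send_options)
    @arguments = arguments
    @send_options = send_options
  end

  def perform(a, b)
    a + b
  end
end

# Reads the SQS-style options and builds the message it would send.
class SqsLikeAdapter
  def enqueue(job)
    { message_body: job.arguments, **job.send_options }
  end
end

# Knows nothing about the options and runs the job immediately.
class InlineLikeAdapter
  def enqueue(job)
    job.perform(*job.arguments)
  end
end

configured = SketchJob.set(message_group_id: 'abc123')
sqs_message = configured.perform_later(SqsLikeAdapter.new, 1, 2)
inline_result = configured.perform_later(InlineLikeAdapter.new, 1, 2)
```

Because the options never enter the argument list, the inline-style adapter still calls perform with exactly two arguments.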

@cjlarose cjlarose self-assigned this Jan 24, 2021
@cjlarose
Collaborator Author

cjlarose commented Jan 24, 2021

@timbreitkreutz I'm just curious: since you mentioned that your application specifies additional message_attributes with the changes from #635, what does the code look like that actually enqueues those messages? Are you using ShoryukenAdapter#enqueue directly? Something like this?

class MyJob < ActiveJob::Base
end

job = MyJob.new 'arg1', 'arg2'
MyJob.queue_adapter.enqueue job,
                            message_attributes: {
                              'key' => {
                                string_value: 'myvalue',
                                data_type: 'String'
                              }
                            }

@cjlarose
Collaborator Author

cjlarose commented Feb 1, 2021

Using .set to set SQS SendMessage parameters is now possible (#648). message_group_id, message_deduplication_id, message_attributes, and message_system_attributes are all supported.

While testing it, I noticed there's one thing I think I want to change before I close the issue and ship a new version: the sqs_send_message_parameters attribute on the ActiveJob::Base job that is returned to the caller has the SendMessage parameters that were specified by the user when set was called. Instead, it probably makes more sense to populate that attribute with the parameters that were actually sent to SQS, including any defaults (message_group_id), computed parameters (message_deduplication_id), or anything that was modified in client middleware.
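The difference between "parameters the user specified" and "parameters actually sent" can be sketched in plain Ruby. Everything below is an assumption made for illustration: effective_send_parameters is a hypothetical helper, DEFAULT_GROUP_ID is a made-up default, and hashing the body is just one plausible way a deduplication ID could be computed. None of it is taken from Shoryuken's source.

```ruby
require 'digest'

# Hypothetical default; not necessarily the value Shoryuken uses.
DEFAULT_GROUP_ID = 'DefaultGroup'

# Hypothetical helper: returns the parameters that would actually be sent,
# i.e. the user's parameters plus a default and a computed value.
def effective_send_parameters(body, user_params)
  params = { message_body: body }.merge(user_params)
  params[:message_group_id] ||= DEFAULT_GROUP_ID
  # Assumption: the deduplication ID is derived from the message body.
  params[:message_deduplication_id] ||= Digest::SHA256.hexdigest(body)
  params
end

body = '{"job_class":"AddJob"}'
user_params = {
  message_attributes: {
    'key' => { string_value: 'myvalue', data_type: 'String' }
  }
}

sent = effective_send_parameters(body, user_params)
```

Here user_params still contains only message_attributes, while sent also carries the default group ID and the computed deduplication ID, which is the version the caller would probably want to inspect.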

@timbreitkreutz
Contributor

Sorry @cjlarose, I didn't see your question earlier. Essentially, in a middleware we are doing something like:

class ShoryukenClientMetrics
  def call(options)
    options[:message_attributes].merge!(message_attributes(options))
    yield
  end
  
  def message_attributes(options)
    {
      "app_enqueued_at" => {
        data_type: "Number",
        string_value: (Time.now.to_f + options[:delay_seconds].to_f).to_s
      }
    }
  end
end

@cjlarose
Collaborator Author

cjlarose commented Feb 4, 2021

@timbreitkreutz Are you using that ShoryukenClientMetrics class in Shoryuken client middleware? If so, I'm a little surprised, since that would work even without your changes from #635:

class ShoryukenClientMetrics
  def call(options)
    options[:message_attributes].merge!(message_attributes(options))
    yield
  end
  
  def message_attributes(options)
    {
      "app_enqueued_at" => {
        data_type: "Number",
        string_value: (Time.now.to_f + options[:delay_seconds].to_f).to_s
      }
    }
  end
end

Shoryuken.configure_client do |config|
  config.client_middleware do |chain|
    chain.add ShoryukenClientMetrics
  end
end

AddJob.perform_later(123, 321)
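For readers unfamiliar with the pattern, here is a plain-Ruby sketch of how a client middleware of this shape mutates the options hash before the message is sent. TinyChain is a hypothetical, minimal stand-in for a middleware chain and is not Shoryuken's implementation; EnqueuedAtMiddleware mirrors the class above but adds a nil guard, on the assumption that message_attributes might be absent.

```ruby
# Same idea as the middleware above, with a guard for a missing :message_attributes key.
class EnqueuedAtMiddleware
  def call(options)
    options[:message_attributes] ||= {}
    options[:message_attributes]['app_enqueued_at'] = {
      data_type: 'Number',
      # nil.to_f is 0.0, so a missing :delay_seconds adds no delay.
      string_value: (Time.now.to_f + options[:delay_seconds].to_f).to_s
    }
    yield
  end
end

# Hypothetical, minimal stand-in for a middleware chain.
class TinyChain
  def initialize(*middlewares)
    @middlewares = middlewares
  end

  # Runs each middleware in order, then the final block (the "send" step).
  def invoke(options, &final)
    runner = @middlewares.reverse.reduce(final) do |next_step, middleware|
      -> { middleware.call(options, &next_step) }
    end
    runner.call
  end
end

started_at = Time.now.to_f
options = { message_body: 'payload', delay_seconds: 30 }

# The final block stands in for the actual send; it records what would be sent.
sent = nil
TinyChain.new(EnqueuedAtMiddleware.new).invoke(options) { sent = options.dup }
```

By the time the final block runs, the options already include the app_enqueued_at attribute, which is why a middleware like this does not depend on the caller passing message_attributes explicitly.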

@cjlarose
Collaborator Author

cjlarose commented Feb 7, 2021

Support for setting SQS SendMessage parameters via .set is now available in shoryuken version 5.1.0 and documented in the wiki.

@cjlarose cjlarose closed this as completed Feb 7, 2021