Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow worker to specify which arguments to include in uniquing hash #12

Merged
merged 3 commits into from
Mar 9, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,45 @@ sidekiq_options unique: true, unique_job_expiration: 120 * 60 # 2 hours

Requiring the gem in your gemfile should be sufficient to enable unique jobs.

### Finer Control over Uniqueness

Sometimes it is desired to have a finer control over which arguments are used in determining uniqueness of the job, and others may be _transient_. For this use-case, you need to
set `SidekiqUniqueJobs::Config.unique_args_enabled` to true in an initializer, and then defined either `unique_args` method, or a ruby proc.

```ruby
SidekiqUniqueJobs::Config.unique_args_enabled = true
```

The method or the proc can return a modified version of args without the transient arguments included, as shown below:

```ruby
class UniqueJobWithFilterMethod
include Sidekiq::Worker
sidekiq_options unique: true,
unique_args: :unique_args

def self.unique_args(name, id, options)
[ name, options[:type] ]
end

...

end

class UniqueJobWithFilterProc
include Sidekiq::Worker
sidekiq_options unique: true,
unique_args: ->(args) { [ args.first ] }

...

end
```

Note that objects passed into workers are converted to JSON *after* running through client middleware. In server
middleware, the JSON is passed directly to the worker `#perform` method. So, you may run into issues where the
arguments are different when enqueuing than they are when performing. Your `unique_args` method may need to
account for this.

## Contributing

Expand Down
8 changes: 8 additions & 0 deletions lib/sidekiq-unique-jobs/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,13 @@ def self.unique_prefix=(prefix)
def self.unique_prefix
@unique_prefix || "sidekiq_unique"
end

def self.unique_args_enabled=(enabled)
@unique_args_enabled = enabled
end

def self.unique_args_enabled?
@unique_args_enabled || false
end
end
end
17 changes: 17 additions & 0 deletions lib/sidekiq-unique-jobs/payload_helper.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
module SidekiqUniqueJobs
class PayloadHelper
def self.get_payload(klass, queue, *args)
args = yield_unique_args(klass, *args) if SidekiqUniqueJobs::Config.unique_args_enabled?
md5_arguments = {:class => klass, :queue => queue, :args => args}
"#{SidekiqUniqueJobs::Config.unique_prefix}:#{Digest::MD5.hexdigest(Sidekiq.dump_json(md5_arguments))}"
end

def self.yield_unique_args(klass, args)
worker_class = klass.constantize
unique_args = worker_class.get_sidekiq_options['unique_args']
filtered_args = if unique_args
case unique_args
when Proc
unique_args.call(args)
when Symbol
worker_class.send(unique_args, *args) if worker_class.respond_to?(unique_args)
end
end
filtered_args || args
rescue NameError # if we can't instantiate the class, we just fallback to not filtering args
args
end
end
end
37 changes: 37 additions & 0 deletions test/lib/sidekiq/test_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,43 @@ def perform(x)
assert_equal 10, Sidekiq.redis {|c| c.llen("queue:customqueue") }
end

describe 'when unique_args is defined' do
before { SidekiqUniqueJobs::Config.unique_args_enabled = true }
after { SidekiqUniqueJobs::Config.unique_args_enabled = false }

class QueueWorkerWithFilterMethod < QueueWorker
sidekiq_options :unique => true, :unique_args => :args_filter

def self.args_filter(*args)
args.first
end
end

class QueueWorkerWithFilterProc < QueueWorker
# slightly contrived example of munging args to the worker and removing a random bit.
sidekiq_options :unique => true, :unique_args => lambda { |args| a = args.last.dup; a.delete(:random); [ args.first, a ] }
end

it 'does not push duplicate messages based on args filter method' do
assert TestClient::QueueWorkerWithFilterMethod.respond_to?(:args_filter)
assert_equal :args_filter, TestClient::QueueWorkerWithFilterMethod.get_sidekiq_options['unique_args']

for i in (0..10).to_a
Sidekiq::Client.push('class' => TestClient::QueueWorkerWithFilterMethod, 'queue' => 'customqueue', 'args' => [1, i])
end
assert_equal 1, Sidekiq.redis {|c| c.llen("queue:customqueue") }
end

it 'does not push duplicate messages based on args filter proc' do
assert_kind_of Proc, TestClient::QueueWorkerWithFilterProc.get_sidekiq_options['unique_args']

10.times do
Sidekiq::Client.push('class' => TestClient::QueueWorkerWithFilterProc, 'queue' => 'customqueue', 'args' => [ 1, {:random => rand(), :name => "foobar"} ])
end
assert_equal 1, Sidekiq.redis {|c| c.llen("queue:customqueue") }
end
end

# TODO: If anyone know of a better way to check that the expiration for scheduled
# jobs are set around the same time as the scheduled job itself feel free to improve.
it 'expires the payload_hash when a scheduled job is scheduled at' do
Expand Down