diff --git a/lib/resque.rb b/lib/resque.rb index e4df4d936..25e01be89 100644 --- a/lib/resque.rb +++ b/lib/resque.rb @@ -584,9 +584,9 @@ def keys def queue_sizes queue_names = queues - sizes = redis.pipelined do + sizes = redis.pipelined do |piped| queue_names.each do |name| - redis.llen("queue:#{name}") + piped.llen("queue:#{name}") end end @@ -597,11 +597,11 @@ def queue_sizes def sample_queues(sample_size = 1000) queue_names = queues - samples = redis.pipelined do + samples = redis.pipelined do |piped| queue_names.each do |name| key = "queue:#{name}" - redis.llen(key) - redis.lrange(key, 0, sample_size - 1) + piped.llen(key) + piped.lrange(key, 0, sample_size - 1) end end diff --git a/lib/resque/data_store.rb b/lib/resque/data_store.rb index 9b317a489..bfe8143b3 100644 --- a/lib/resque/data_store.rb +++ b/lib/resque/data_store.rb @@ -100,9 +100,9 @@ def initialize(redis) @redis = redis end def push_to_queue(queue,encoded_item) - @redis.pipelined do - watch_queue(queue) - @redis.rpush redis_key_for_queue(queue), encoded_item + @redis.pipelined do |piped| + watch_queue(queue, redis: piped) + piped.rpush redis_key_for_queue(queue), encoded_item end end @@ -129,9 +129,9 @@ def queue_names end def remove_queue(queue) - @redis.pipelined do - @redis.srem(:queues, queue.to_s) - @redis.del(redis_key_for_queue(queue)) + @redis.pipelined do |piped| + piped.srem(:queues, queue.to_s) + piped.del(redis_key_for_queue(queue)) end end @@ -145,8 +145,8 @@ def remove_from_queue(queue,data) end # Private: do not call - def watch_queue(queue) - @redis.sadd(:queues, queue.to_s) + def watch_queue(queue, redis: @redis) + redis.sadd(:queues, queue.to_s) end # Private: do not call @@ -237,24 +237,24 @@ def worker_exists?(worker_id) end def register_worker(worker) - @redis.pipelined do - @redis.sadd(:workers, worker) - worker_started(worker) + @redis.pipelined do |piped| + piped.sadd(:workers, worker) + worker_started(worker, redis: piped) end end - def worker_started(worker) - @redis.set(redis_key_for_worker_start_time(worker), Time.now.to_s) + def worker_started(worker, redis: @redis) + redis.set(redis_key_for_worker_start_time(worker), Time.now.to_s) end def unregister_worker(worker, &block) - @redis.pipelined do - @redis.srem(:workers, worker) - @redis.del(redis_key_for_worker(worker)) - @redis.del(redis_key_for_worker_start_time(worker)) - @redis.hdel(HEARTBEAT_KEY, worker.to_s) + @redis.pipelined do |piped| + piped.srem(:workers, worker) + piped.del(redis_key_for_worker(worker)) + piped.del(redis_key_for_worker_start_time(worker)) + piped.hdel(HEARTBEAT_KEY, worker.to_s) - block.call + block.call redis: piped end end @@ -288,9 +288,9 @@ def worker_start_time(worker) end def worker_done_working(worker, &block) - @redis.pipelined do - @redis.del(redis_key_for_worker(worker)) - block.call + @redis.pipelined do |piped| + piped.del(redis_key_for_worker(worker)) + block.call redis: piped end end @@ -317,16 +317,16 @@ def stat(stat) @redis.get("stat:#{stat}").to_i end - def increment_stat(stat, by = 1) - @redis.incrby("stat:#{stat}", by) + def increment_stat(stat, by = 1, redis: @redis) + redis.incrby("stat:#{stat}", by) end def decremet_stat(stat, by = 1) @redis.decrby("stat:#{stat}", by) end - def clear_stat(stat) - @redis.del("stat:#{stat}") + def clear_stat(stat, redis: @redis) + redis.del("stat:#{stat}") end end end diff --git a/lib/resque/stat.rb b/lib/resque/stat.rb index 5af079af7..f18ed2606 100644 --- a/lib/resque/stat.rb +++ b/lib/resque/stat.rb @@ -35,8 +35,8 @@ def [](stat) # # Can optionally accept a second int parameter. The stat is then # incremented by that amount. - def incr(stat, by = 1) - data_store.increment_stat(stat,by) + def incr(stat, by = 1, **opts) + data_store.increment_stat(stat, by, **opts) end # Increments a stat by one. @@ -58,8 +58,8 @@ def >>(stat) end # Removes a stat from Redis, effectively setting it to 0. - def clear(stat) - data_store.clear_stat(stat) + def clear(stat, **opts) + data_store.clear_stat(stat, **opts) end end end diff --git a/lib/resque/worker.rb b/lib/resque/worker.rb index bddeae952..4f7573b14 100644 --- a/lib/resque/worker.rb +++ b/lib/resque/worker.rb @@ -697,9 +697,9 @@ def unregister_worker(exception = nil) kill_background_threads - data_store.unregister_worker(self) do - Stat.clear("processed:#{self}") - Stat.clear("failed:#{self}") + data_store.unregister_worker(self) do |**opts| + Stat.clear("processed:#{self}", **opts) + Stat.clear("failed:#{self}", **opts) end rescue Exception => exception_while_unregistering message = exception_while_unregistering.message @@ -726,8 +726,8 @@ def working_on(job) # Called when we are done working - clears our `working_on` state # and tells Redis we processed a job. def done_working - data_store.worker_done_working(self) do - processed! + data_store.worker_done_working(self) do |**opts| + processed!(**opts) end end @@ -745,9 +745,9 @@ def processed end # Tell Redis we've processed a job. - def processed! - Stat << "processed" - Stat << "processed:#{self}" + def processed!(**opts) + Stat.incr("processed", 1, **opts) + Stat.incr("processed:#{self}", 1, **opts) end # How many failed jobs has this worker seen? Returns an int. diff --git a/test/stat_test.rb b/test/stat_test.rb index b39f7c0d4..d159af085 100644 --- a/test/stat_test.rb +++ b/test/stat_test.rb @@ -10,7 +10,7 @@ def stat(stat) @stat[stat] end - def increment_stat(stat, by) + def increment_stat(stat, by = 1, redis: nil) @stat[stat] += by end @@ -18,7 +18,7 @@ def decrement_stat(stat, by) @stat[stat] -= by end - def clear_stat(stat) + def clear_stat(stat, redis: nil) @stat[stat] = 0 end end