diff --git a/lib/march_hare/channel.rb b/lib/march_hare/channel.rb index 32f2f7d..04a111c 100644 --- a/lib/march_hare/channel.rb +++ b/lib/march_hare/channel.rb @@ -614,10 +614,14 @@ def basic_get(queue, auto_ack) end end - def basic_consume(queue, auto_ack, consumer) + def basic_consume(queue, auto_ack, consumer_tag=nil, consumer) consumer.auto_ack = auto_ack tag = converting_rjc_exceptions_to_ruby do - @delegate.basic_consume(queue, auto_ack, consumer) + if consumer_tag + @delegate.basic_consume(queue, auto_ack, consumer_tag, consumer) + else + @delegate.basic_consume(queue, auto_ack, consumer) + end end self.register_consumer(tag, consumer) diff --git a/lib/march_hare/consumers/base.rb b/lib/march_hare/consumers/base.rb index 35141ef..57de897 100644 --- a/lib/march_hare/consumers/base.rb +++ b/lib/march_hare/consumers/base.rb @@ -84,7 +84,7 @@ def terminated? def recover_from_network_failure @terminated.set(false) @cancelled.set(false) - @consumer_tag = @channel.basic_consume(@queue.name, @auto_ack, self) + @consumer_tag = @channel.basic_consume(@queue.name, @auto_ack, @consumer_tag, self) @consumer_tag end diff --git a/lib/march_hare/queue.rb b/lib/march_hare/queue.rb index 0688b02..5421661 100644 --- a/lib/march_hare/queue.rb +++ b/lib/march_hare/queue.rb @@ -180,7 +180,7 @@ def subscribe(opts = {}, &block) end def subscribe_with(consumer, opts = {}) - @consumer_tag = @channel.basic_consume(@name, !(opts[:ack] || opts[:manual_ack]), consumer) + @consumer_tag = @channel.basic_consume(@name, !(opts[:ack] || opts[:manual_ack]), opts[:consumer_tag], consumer) consumer.consumer_tag = @consumer_tag @default_consumer = consumer diff --git a/spec/higher_level_api/integration/basic_consume_spec.rb b/spec/higher_level_api/integration/basic_consume_spec.rb index a89269a..88a0a3f 100644 --- a/spec/higher_level_api/integration/basic_consume_spec.rb +++ b/spec/higher_level_api/integration/basic_consume_spec.rb @@ -25,6 +25,24 @@ expect(consumer).not_to be_active expect(consumer).to be_cancelled end + + it "has a consumer_tag" do + ch = connection.create_channel + q = ch.queue("", :exclusive => true) + + consumer1 = q.subscribe(:blocking => false) { |_, _| nil } + + sleep(1.0) + expect(consumer1.consumer_tag).to match(/^amq.ctag/) + consumer1.cancel + + custom_consumer_tag = "unique_consumer_tag_#{rand(1_000)}" + consumer2 = q.subscribe(:consumer_tag => custom_consumer_tag, :blocking => false) { |_, _| nil } + + expect(consumer2.consumer_tag).to eq(custom_consumer_tag) + + consumer2.cancel + end end