From be1bb3c108d32b5521eeddabbe8bcddd60bc719a Mon Sep 17 00:00:00 2001 From: Christoph Wagner Date: Mon, 30 Oct 2017 12:17:17 +0100 Subject: [PATCH 1/3] refactor publisher connection logic * separate connection stuff from publish method to better reuse Sneakers::Publisher for own derived classes * fix warning: instance variable @bunny not initialized when connected? is called --- lib/sneakers/publisher.rb | 16 ++++++++++------ spec/sneakers/publisher_spec.rb | 2 +- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/lib/sneakers/publisher.rb b/lib/sneakers/publisher.rb index 294d7597..340daf71 100644 --- a/lib/sneakers/publisher.rb +++ b/lib/sneakers/publisher.rb @@ -3,12 +3,13 @@ class Publisher def initialize(opts = {}) @mutex = Mutex.new @opts = Sneakers::CONFIG.merge(opts) + # If we've already got a bunny object, use it. This allows people to + # specify all kinds of options we don't need to know about (e.g. for ssl). + @bunny = @opts[:connection] end def publish(msg, options = {}) - @mutex.synchronize do - ensure_connection! unless connected? - end + ensure_connection! to_queue = options.delete(:to_queue) options[:routing_key] ||= to_queue Sneakers.logger.info {"publishing <#{msg}> to [#{options[:routing_key]}]"} @@ -20,9 +21,12 @@ def publish(msg, options = {}) private def ensure_connection! - # If we've already got a bunny object, use it. This allows people to - # specify all kinds of options we don't need to know about (e.g. for ssl). - @bunny = @opts[:connection] + @mutex.synchronize do + connect! unless connected? + end + end + + def connect! @bunny ||= create_bunny_connection @bunny.start @channel = @bunny.create_channel diff --git a/spec/sneakers/publisher_spec.rb b/spec/sneakers/publisher_spec.rb index 12a31f47..31c12261 100644 --- a/spec/sneakers/publisher_spec.rb +++ b/spec/sneakers/publisher_spec.rb @@ -63,7 +63,7 @@ p.instance_variable_set(:@exchange, xchg) mock(p).connected? { true } - mock(p).ensure_connection!.times(0) + mock(p).connect!.times(0) p.publish('test msg', to_queue: 'downloads') end From aa9ccbc5c4f1306580f57e00cd7949f36d2858c5 Mon Sep 17 00:00:00 2001 From: Christoph Wagner Date: Mon, 30 Oct 2017 12:54:15 +0100 Subject: [PATCH 2/3] ensure_connection! should be public this way we can do something with the connection before publishing --- lib/sneakers/publisher.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/sneakers/publisher.rb b/lib/sneakers/publisher.rb index 340daf71..0e50f35d 100644 --- a/lib/sneakers/publisher.rb +++ b/lib/sneakers/publisher.rb @@ -1,5 +1,8 @@ module Sneakers class Publisher + + attr_reader :exchange, :channel + def initialize(opts = {}) @mutex = Mutex.new @opts = Sneakers::CONFIG.merge(opts) @@ -16,16 +19,13 @@ def publish(msg, options = {}) @exchange.publish(ContentType.serialize(msg, options[:content_type]), options) end - - attr_reader :exchange, :channel - - private def ensure_connection! @mutex.synchronize do connect! unless connected? end end + private def connect! @bunny ||= create_bunny_connection @bunny.start From ac2d572e73f6e938746d75ca688cc03f8e5709ee Mon Sep 17 00:00:00 2001 From: Christoph Wagner Date: Mon, 30 Oct 2017 15:36:24 +0100 Subject: [PATCH 3/3] fix timing issues on integration spec * use timeout and retry if expected consumers are not there * refactor rabbitmq stuff a little in integration spec --- spec/sneakers/integration_spec.rb | 74 ++++++++++++++++++------------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/spec/sneakers/integration_spec.rb b/spec/sneakers/integration_spec.rb index 472d32c6..42abf77e 100644 --- a/spec/sneakers/integration_spec.rb +++ b/spec/sneakers/integration_spec.rb @@ -4,6 +4,7 @@ require 'fixtures/integration_worker' require "rabbitmq/http/client" +require 'timeout' describe "integration" do @@ -17,23 +18,30 @@ def integration_log(msg) puts msg if ENV['INTEGRATION_LOG'] end + def rmq_addr + @rmq_addr ||= compose_or_localhost("rabbitmq") + end + + def admin + @admin ||= + begin + puts "RABBITMQ is at #{rmq_addr}" + RabbitMQ::HTTP::Client.new("http://#{rmq_addr}:15672/", username: "guest", password: "guest") + rescue + fail "Rabbitmq admin seems to not exist? you better be running this on Travis or Docker. proceeding.\n#{$!}" + end + end + def prepare # clean up all integration queues; admin interface must be installed # in integration env - rmq_addr = compose_or_localhost("rabbitmq") - puts "RABBITMQ is at #{rmq_addr}" - begin - admin = RabbitMQ::HTTP::Client.new("http://#{rmq_addr}:15672/", username: "guest", password: "guest") - qs = admin.list_queues - qs.each do |q| - name = q.name - if name.start_with? 'integration_' - admin.delete_queue('/', name) - integration_log "cleaning up #{name}." - end + qs = admin.list_queues + qs.each do |q| + name = q.name + if name.start_with? 'integration_' + admin.delete_queue('/', name) + integration_log "cleaning up #{name}." end - rescue - puts "Rabbitmq admin seems to not exist? you better be running this on Travis or Docker. proceeding.\n#{$!}" end Sneakers.clear! @@ -85,22 +93,27 @@ def start_worker(w) pid end - def any_consumers - rmq_addr = compose_or_localhost("rabbitmq") - result = false - begin - admin = RabbitMQ::HTTP::Client.new("http://#{rmq_addr}:15672/", username: "guest", password: "guest") - qs = admin.list_queues - qs.each do |q| - if q.name.start_with? 'integration_' - puts "We see #{q.consumers} consumers on #{q.name}" - return true if q.consumers > 0 - end + def consumers_count + qs = admin.list_queues + qs.each do |q| + if q.name.start_with? 'integration_' + return [q.consumers, q.name] end - return false - rescue - puts "Rabbitmq admin seems to not exist? you better be running this on Travis or Docker. proceeding.\n#{$!}" end + return [0, nil] + end + + def assert_any_consumers(consumers_should_be_there, maximum_wait_time = 15) + Timeout::timeout(maximum_wait_time) do + loop do + consumers, queue = consumers_count + fail 'no queues so no consumers' if consumers_should_be_there && !queue + puts "We see #{consumers} consumers on #{queue}" + (consumers_should_be_there == consumers.zero?) ? sleep(1) : return + end + end + rescue Timeout::Error + fail "Consumers should #{'not' unless consumers_should_be_there} be here but #{consumers} consumers were after #{maximum_wait_time}s waiting." end it 'should be possible to terminate when queue is full' do @@ -116,11 +129,10 @@ def any_consumers end pid = start_worker(IntegrationWorker) - any_consumers.must_equal true + assert_any_consumers true integration_log "Killing #{pid} now!" Process.kill("TERM", pid) - sleep(2) - any_consumers.must_equal false + assert_any_consumers false end it 'should pull down 100 jobs from a real queue' do @@ -144,5 +156,3 @@ def any_consumers end end - -