Skip to content

Commit

Permalink
Merge pull request #325 from webit-de/master
Browse files Browse the repository at this point in the history
refactor publisher connection logic
  • Loading branch information
michaelklishin authored Nov 2, 2017
2 parents 5043c16 + ac2d572 commit f349c9f
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 42 deletions.
22 changes: 13 additions & 9 deletions lib/sneakers/publisher.rb
Original file line number Diff line number Diff line change
@@ -1,28 +1,32 @@
module Sneakers
class Publisher

attr_reader :exchange, :channel

def initialize(opts = {})
@mutex = Mutex.new
@opts = Sneakers::CONFIG.merge(opts)
# If we've already got a bunny object, use it. This allows people to
# specify all kinds of options we don't need to know about (e.g. for ssl).
@bunny = @opts[:connection]
end

def publish(msg, options = {})
@mutex.synchronize do
ensure_connection! unless connected?
end
ensure_connection!
to_queue = options.delete(:to_queue)
options[:routing_key] ||= to_queue
Sneakers.logger.info {"publishing <#{msg}> to [#{options[:routing_key]}]"}
@exchange.publish(ContentType.serialize(msg, options[:content_type]), options)
end


attr_reader :exchange, :channel
def ensure_connection!
@mutex.synchronize do
connect! unless connected?
end
end

private
def ensure_connection!
# If we've already got a bunny object, use it. This allows people to
# specify all kinds of options we don't need to know about (e.g. for ssl).
@bunny = @opts[:connection]
def connect!
@bunny ||= create_bunny_connection
@bunny.start
@channel = @bunny.create_channel
Expand Down
74 changes: 42 additions & 32 deletions spec/sneakers/integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require 'fixtures/integration_worker'

require "rabbitmq/http/client"
require 'timeout'


describe "integration" do
Expand All @@ -17,23 +18,30 @@ def integration_log(msg)
puts msg if ENV['INTEGRATION_LOG']
end

def rmq_addr
@rmq_addr ||= compose_or_localhost("rabbitmq")
end

def admin
@admin ||=
begin
puts "RABBITMQ is at #{rmq_addr}"
RabbitMQ::HTTP::Client.new("http://#{rmq_addr}:15672/", username: "guest", password: "guest")
rescue
fail "Rabbitmq admin seems to not exist? you better be running this on Travis or Docker. proceeding.\n#{$!}"
end
end

def prepare
# clean up all integration queues; admin interface must be installed
# in integration env
rmq_addr = compose_or_localhost("rabbitmq")
puts "RABBITMQ is at #{rmq_addr}"
begin
admin = RabbitMQ::HTTP::Client.new("http://#{rmq_addr}:15672/", username: "guest", password: "guest")
qs = admin.list_queues
qs.each do |q|
name = q.name
if name.start_with? 'integration_'
admin.delete_queue('/', name)
integration_log "cleaning up #{name}."
end
qs = admin.list_queues
qs.each do |q|
name = q.name
if name.start_with? 'integration_'
admin.delete_queue('/', name)
integration_log "cleaning up #{name}."
end
rescue
puts "Rabbitmq admin seems to not exist? you better be running this on Travis or Docker. proceeding.\n#{$!}"
end

Sneakers.clear!
Expand Down Expand Up @@ -85,22 +93,27 @@ def start_worker(w)
pid
end

def any_consumers
rmq_addr = compose_or_localhost("rabbitmq")
result = false
begin
admin = RabbitMQ::HTTP::Client.new("http://#{rmq_addr}:15672/", username: "guest", password: "guest")
qs = admin.list_queues
qs.each do |q|
if q.name.start_with? 'integration_'
puts "We see #{q.consumers} consumers on #{q.name}"
return true if q.consumers > 0
end
def consumers_count
qs = admin.list_queues
qs.each do |q|
if q.name.start_with? 'integration_'
return [q.consumers, q.name]
end
return false
rescue
puts "Rabbitmq admin seems to not exist? you better be running this on Travis or Docker. proceeding.\n#{$!}"
end
return [0, nil]
end

def assert_any_consumers(consumers_should_be_there, maximum_wait_time = 15)
Timeout::timeout(maximum_wait_time) do
loop do
consumers, queue = consumers_count
fail 'no queues so no consumers' if consumers_should_be_there && !queue
puts "We see #{consumers} consumers on #{queue}"
(consumers_should_be_there == consumers.zero?) ? sleep(1) : return
end
end
rescue Timeout::Error
fail "Consumers should #{'not' unless consumers_should_be_there} be here but #{consumers} consumers were after #{maximum_wait_time}s waiting."
end

it 'should be possible to terminate when queue is full' do
Expand All @@ -116,11 +129,10 @@ def any_consumers
end

pid = start_worker(IntegrationWorker)
any_consumers.must_equal true
assert_any_consumers true
integration_log "Killing #{pid} now!"
Process.kill("TERM", pid)
sleep(2)
any_consumers.must_equal false
assert_any_consumers false
end

it 'should pull down 100 jobs from a real queue' do
Expand All @@ -144,5 +156,3 @@ def any_consumers

end
end


2 changes: 1 addition & 1 deletion spec/sneakers/publisher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
p.instance_variable_set(:@exchange, xchg)

mock(p).connected? { true }
mock(p).ensure_connection!.times(0)
mock(p).connect!.times(0)

p.publish('test msg', to_queue: 'downloads')
end
Expand Down

0 comments on commit f349c9f

Please sign in to comment.