Skip to content

Commit

Permalink
Rename Publishing Confirms related methods before it's too late
Browse files Browse the repository at this point in the history
Original names did not follow any convention I know. #confirmations to turn on
confirmations, #confirm to define a callback... Seriously?

How about this:

Channel#basic_select <=> confirm.select
Channel#on_ack       <=> basic.ack
Channel#on_nack      <=> basic.nack

At least #basic_select won't be confused with a collection reader and
  • Loading branch information
michaelklishin committed May 6, 2011
1 parent 88100b7 commit 5653b07
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
channel.open do
puts "Channel #{channel.id} is now open"

channel.confirmations
channel.use_publisher_confirmations!
channel.on_error do
puts "Oops, there is a channel-levle exceptions!"
end


channel.confirm do |basic_ack|
channel.on_ack do |basic_ack|
puts "Received basic_ack: multiple = #{basic_ack.multiple}, delivery_tag = #{basic_ack.delivery_tag}"
end

Expand Down Expand Up @@ -53,4 +53,4 @@

EM.add_timer(3, show_stopper)
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
channel.open do
puts "Channel #{channel.id} is now open"

channel.confirmations
channel.use_publisher_confirmations!
channel.on_error do
puts "Oops, there is a channel-levle exceptions!"
end


channel.confirm do |basic_ack|
channel.on_ack do |basic_ack|
puts "Received basic_ack: multiple = #{basic_ack.multiple}, delivery_tag = #{basic_ack.delivery_tag}"
end

Expand All @@ -43,4 +43,4 @@

EM.add_timer(3, show_stopper)
end
end
end
116 changes: 54 additions & 62 deletions lib/amq/client/extensions/rabbitmq/confirm.rb
Original file line number Diff line number Diff line change
@@ -1,60 +1,51 @@
# encoding: utf-8

# === Purpose === #
# In case that the broker crashes, some messages can get lost.
# Thanks to this extension, broker sends Basic.Ack when the message
# is processed by the broker. In case of persistent messages, it must
# be written to disk or ack'd on all the queues it was delivered to.
# However it doesn't have to be necessarily 1:1, because the broker
# can send Basic.Ack with multi flag to acknowledge multiple messages.
#
# So it provides clients a lightweight way of keeping track of which
# messages have been processed by the broker and which would need
# re-publishing in case of broker shutdown or network failure.
#
# Transactions are solving the same problem, but they are very slow:
# confirmations are more than 100 times faster.
#
# === Workflow === #
# * Client asks broker to confirm messages on given channel (Confirm.Select).
# * Broker sends back Confirm.Select-Ok, unless we sent Confirm.Select with nowait=true.
# * After each published message, the client receives Basic.Ack from the broker.
# * If something bad happens inside the broker, it sends Basic.Nack.
#
# === Gotchas === #
# Note that we don't keep track of messages awaiting confirmation.
# It'd add a huge overhead and it's impossible to come up with one-suits-all solution.
# If you want to create such module, you'll probably want to redefine Channel#after_publish,
# so it will put messages into a queue and then handlers for Basic.Ack and Basic.Nack.
# This is the reason why we pass every argument from Exchange#publish to Channel#after_publish.
# You should not forget though, that both of these methods can have multi flag!
#
# Transactional channel cannot be put into confirm mode and a confirm
# mode channel cannot be made transactional.
#
# If the connection between the publisher and broker drops with outstanding
# confirms, it does not necessarily mean that the messages were lost, so
# republishing may result in duplicate messages.

# === Links === #
# http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms
# http://www.rabbitmq.com/amqp-0-9-1-quickref.html#class.confirm
# http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.ack

puts "in confirm.rb"

module AMQ
module Client
module Extensions
module RabbitMQ
# h2. Purpose
# In case that the broker crashes, some messages can get lost.
# Thanks to this extension, broker sends Basic.Ack when the message
# is processed by the broker. In case of persistent messages, it must
# be written to disk or ack'd on all the queues it was delivered to.
# However it doesn't have to be necessarily 1:1, because the broker
# can send Basic.Ack with multi flag to acknowledge multiple messages.
#
# So it provides clients a lightweight way of keeping track of which
# messages have been processed by the broker and which would need
# re-publishing in case of broker shutdown or network failure.
#
# Transactions are solving the same problem, but they are very slow:
# confirmations are more than 100 times faster.
#
# h2. Workflow
# * Client asks broker to confirm messages on given channel (Confirm.Select).
# * Broker sends back Confirm.Select-Ok, unless we sent Confirm.Select with nowait=true.
# * After each published message, the client receives Basic.Ack from the broker.
# * If something bad happens inside the broker, it sends Basic.Nack.
#
# h2. Gotchas
# Note that we don't keep track of messages awaiting confirmation.
# It'd add a huge overhead and it's impossible to come up with one-suits-all solution.
# If you want to create such module, you'll probably want to redefine Channel#after_publish,
# so it will put messages into a queue and then handlers for Basic.Ack and Basic.Nack.
# This is the reason why we pass every argument from Exchange#publish to Channel#after_publish.
# You should not forget though, that both of these methods can have multi flag!
#
# Transactional channel cannot be put into confirm mode and a confirm
# mode channel cannot be made transactional.
#
# If the connection between the publisher and broker drops with outstanding
# confirms, it does not necessarily mean that the messages were lost, so
# republishing may result in duplicate messages.

# h2. Learn more
# @see http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms
# @see http://www.rabbitmq.com/amqp-0-9-1-quickref.html#class.confirm
# @see http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.ack
module Confirm
module ChannelMixin
# Boolean value expressing whether confirmations are
# on or off, aka whether Confirm.Select was sent or not.
#
# @api public
# @return [Boolean] Whether confirmations are on or off.
attr_reader :confirmations

# Change publisher index. Publisher index is incremented
# by 1 after each Basic.Publish starting at 1. This is done
Expand Down Expand Up @@ -106,22 +97,23 @@ def after_publish(*args)
# @return [self] self.
#
# @see #confirm
def confirmations(nowait = false, &block)
if @confirmations
raise "Confirmations are already activated!"
end

def use_publisher_confirmations!(nowait = false, &block)
if nowait && block
raise "You can't use Confirm.Select with nowait=true and a callback at the same time."
end

@confirmations = true
self.redefine_callback(:confirmations, &block)
@uses_publisher_confirmations = true
self.redefine_callback(:confirm_select, &block)
@client.send(Protocol::Confirm::Select.encode(@id, nowait))

self
end

# @return [Boolean]
def uses_publisher_confirmations?
@uses_publisher_confirmations
end # uses_publisher_confirmations?


# Turn on confirmations for this channel and, if given,
# register callback for basic.ack from the broker.
Expand All @@ -134,8 +126,8 @@ def confirmations(nowait = false, &block)
# @yieldparam [AMQ::Protocol::Basic::Ack] basick_ack Protocol method class instance.
#
# @return [self] self.
def confirm(nowait = false, &block)
self.confirmations unless @confirmations
def on_ack(nowait = false, &block)
self.use_publisher_confirmations! unless self.uses_publisher_confirmations?

self.define_callback(:ack, &block) if block

Expand All @@ -144,10 +136,10 @@ def confirm(nowait = false, &block)


# Register error callback for Basic.Nack. It's called
# when the broker reject given message(s).
# when message(s) is rejected.
#
# @return [self] self
def confirm_failed(&block)
def on_nack(&block)
self.define_callback(:nack, &block) if block

self
Expand All @@ -165,7 +157,7 @@ def confirm_failed(&block)
#
# @api plugin
def handle_select_ok(method)
self.exec_callback_once(:confirmations, method)
self.exec_callback_once(:confirm_select, method)
end

# Handler for Basic.Ack. By default, it just
Expand Down Expand Up @@ -193,7 +185,7 @@ def handle_basic_nack(method)
def reset_state!
super

@confirmations = false
@uses_publisher_confirmations = false
end


Expand Down

0 comments on commit 5653b07

Please sign in to comment.