Idempotence
In messaging, idempotence (Wikipedia has a formal definition) means that a redelivered message can be handled without leaving the system in an unintended state.
Since we always run the risk of processing the same message twice, it is a good idea to keep idempotence in mind when writing message handlers.
Please read the page about delivery guarantees if it is not clear at this point why we might process the same message twice.
So how do we deal with the risk of receiving the same message twice?
Generally, the best way to deal with message redeliveries is to make the processing of each message naturally idempotent.
Natural idempotence arises when the processing of a message consists of calling an idempotent method on a domain object, like
obj.MarkAsDeleted();
or
obj.UpdatePeriod(message.NewPeriod);
or similar. It also arises when the processing of the message is of the upsert type, e.g.
var itemRows = message.Items.Select(i => new ItemRow(i.Id, i.Name));
await debaser.Upsert(itemRows);
(in this case using the nifty Debaser library to do that, but some manually written SQL with appropriate unique constraints can do the trick too...)
You can get a long way simply by making much of your message handler code idempotent like this.
Another way of making message processing idempotent is to track the IDs of processed messages explicitly and make your code handle a redelivery.
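To make the distinction concrete, here is a minimal sketch contrasting an idempotent method with a non-idempotent one. The `Subscription` class and its members are hypothetical and only serve as illustration:

```csharp
using System;

// Hypothetical domain object, for illustration only
public class Subscription
{
    public bool IsDeleted { get; private set; }
    public int RenewalCount { get; private set; }

    // Idempotent: sets an absolute state, so calling it again changes nothing
    public void MarkAsDeleted() => IsDeleted = true;

    // NOT idempotent: every call changes the state relative to the previous state,
    // so a redelivered message would be counted twice
    public void IncrementRenewalCount() => RenewalCount++;
}
```

A handler that only calls methods like `MarkAsDeleted()` can safely process the same message more than once, whereas one that calls `IncrementRenewalCount()` needs some other form of protection against redelivery.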
Assuming that you are keeping track of message IDs by using an IMessageTracker that uses the same transactional data store as the rest of your work, your code might look somewhat like this:
public class SomeMessageHandler : IHandleMessages<SomeMessage>
{
    readonly IMessageContext _messageContext;
    readonly IMessageTracker _messageTracker;

    public SomeMessageHandler(IMessageContext messageContext, IMessageTracker messageTracker)
    {
        _messageContext = messageContext;
        _messageTracker = messageTracker;
    }

    public async Task Handle(SomeMessage message)
    {
        var messageId = _messageContext.Headers[Headers.MessageId];

        if (await _messageTracker.HasProcessed(messageId))
        {
            // REMEMBER TO SEND/PUBLISH ANY OUTGOING MESSAGES AGAIN
            // IN HERE!
            return;
        }

        // do the work here
        // ...

        // remember that this message has been processed
        await _messageTracker.MarkAsProcessed(messageId);
    }
}
It is of course crucial that the IMessageTracker's underlying data store supports unique constraints on the message IDs passed to it, and it must use the same transaction as the rest of your work.
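IMessageTracker is not a type that ships with Rebus; it is something you write yourself. As a rough sketch of its shape, an in-memory version could look like the following. The interface members are inferred from the handler above, and `InMemoryMessageTracker` is a hypothetical name. Since it does not take part in any transaction, it is only suitable for tests; a real implementation would write to the same database, in the same transaction, as the rest of the handler's work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Assumed shape of the tracker, inferred from how the handler uses it
public interface IMessageTracker
{
    Task<bool> HasProcessed(string messageId);
    Task MarkAsProcessed(string messageId);
}

// Hypothetical in-memory implementation, for tests only
public class InMemoryMessageTracker : IMessageTracker
{
    readonly ConcurrentDictionary<string, byte> _processedIds =
        new ConcurrentDictionary<string, byte>();

    public Task<bool> HasProcessed(string messageId) =>
        Task.FromResult(_processedIds.ContainsKey(messageId));

    public Task MarkAsProcessed(string messageId)
    {
        // TryAdd plays the role of the unique constraint: a second insert of the same ID fails
        if (!_processedIds.TryAdd(messageId, 0))
        {
            throw new InvalidOperationException(
                $"Message {messageId} has already been marked as processed");
        }

        return Task.CompletedTask;
    }
}
```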
When you are using process managers, a.k.a. "sagas", you often want to make them idempotent. This can be done quite easily with Rebus, because it has the concept of "idempotent sagas".
An idempotent saga simply stores IDs of all handled messages, and it contains an outbox of all sent/published messages, meaning that the saga itself is capable of handling all the tricky bits associated with a redelivered message.
Please note that if you mark the saga as complete by calling MarkAsComplete(), then that outbox will be DELETED ALONG WITH THE SAGA DATA. So you should NOT rely on the outbox when completing the saga! In other words, you should never do
// ❗❗❗
await bus.Send(aMessage);
// and
MarkAsComplete();
// ❗❗❗
in the same handler. Instead, mark the saga as complete with a separate message:
await bus.Send(aMessage);
await bus.SendLocal(new MarkSagaAsComplete(Data.Id));
(of course handling the MarkSagaAsComplete message simply by calling MarkAsComplete()).
You create an idempotent saga by first enabling the feature:
.Options(o =>
{
o.EnableIdempotentSagas();
})
and then your saga handler must be derived from IdempotentSaga<> like this:
public class MySaga : IdempotentSaga<MySagaData>, ...
{
...
}
and your saga data must be derived from IdempotentSagaData like this:
public class MySagaData : IdempotentSagaData
{
//.. add your own stuff here
}
When using idempotent sagas, it's important that they have a limited lifetime (or, more specifically, that they send a limited number of outgoing messages), because every outgoing message is stored as part of the saga data.
When a message can be redelivered, and the order of operations in the handler leaves it uncertain whether any outgoing messages were actually sent, you must always ensure that outgoing messages are sent and published every time, even when the message can be identified as a redelivery.
Idempotent sagas handle this aspect automatically, but in all other cases you need to be careful to ensure that this is done.
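The rule about outgoing messages can be sketched with a small, self-contained example. Everything in it is hypothetical: the handler, the `HashSet` standing in for a message tracker, and the list standing in for the bus. It only illustrates the order of operations, not how to wire this up with Rebus.

```csharp
using System.Collections.Generic;

// Hypothetical handler with in-memory stand-ins for the message tracker and the bus
public class OrderPlacedHandler
{
    readonly HashSet<string> _processedMessageIds = new HashSet<string>();

    // Stand-in for messages handed to the bus
    public List<string> SentMessages { get; } = new List<string>();

    // Counts how many times the actual work was carried out
    public int WorkCount { get; private set; }

    public void Handle(string messageId, string orderId)
    {
        // Add returns false when the ID is already present, i.e. on a redelivery
        if (_processedMessageIds.Add(messageId))
        {
            WorkCount++; // the work is done only once per message ID
        }

        // Sent on EVERY delivery, because an earlier attempt may have failed
        // after doing the work but before the message actually went out
        SentMessages.Add($"OrderAccepted:{orderId}");
    }
}
```

Note that this means downstream receivers may see the outgoing message more than once, so they need to be idempotent too.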