PIP-22: Dead Letter Topic #2400
Conversation
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class Quorum {
It may be better to add some comments, such as how to use it and the meaning of each public method.
Ok.
        .create();

for (int i = 0; i < sendMessages; i++) {
    producer.send("Hello Pulsar!".getBytes());
How about combining `i` with the message content? It may be a little useful for debugging.
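For example, a one-line sketch of that suggestion:

```java
producer.send(String.format("Hello Pulsar! [%d]", i).getBytes());
```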
    consumer.acknowledge(message);
    totalInDeadLetter++;
} while (totalInDeadLetter < sendMessages);
Could we assert something as a verification, such as that all messages were received and acked?
Overall LGTM. Added some nit comments.
Overall looks good to me. @merlimat can you take a look at this?
run java8 tests
this.cursor = cursor;
this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
this.topic = topic;
this.messagesToReplay = new ConcurrentLongPairSet(512, 2);
this.messagesToDeadLetter = new HashSet<>(8);
Umm.. this can cause high GC in the broker. Essentially we would like to avoid storing objects on the heap with a relatively long life-cycle; that's why, in a past release, we made an effort to clean up stored PositionImpl objects from the code base. A broker serving high throughput with low-latency requirements might not want this overhead.
+1, @codelipenghui, we may need to follow the approach of messagesToReplay, which stores ledgerId and entryId as primitive values and then composes a Position from (ledgerId + entryId) when using it.
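For example, a minimal sketch of that approach, assuming Pulsar's ConcurrentLongPairSet and PositionImpl utilities (the DeadLetterTracker class and the redeliverToDeadLetter callback are hypothetical placeholders):

```java
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;

public class DeadLetterTracker {
    // Stores (ledgerId, entryId) pairs as primitive longs instead of
    // long-lived PositionImpl objects on the heap.
    private final ConcurrentLongPairSet messagesToDeadLetter = new ConcurrentLongPairSet(128, 1);

    public void track(PositionImpl position) {
        messagesToDeadLetter.add(position.getLedgerId(), position.getEntryId());
    }

    // Materialize PositionImpl objects only at the moment they are used.
    public void drainTo(java.util.function.Consumer<PositionImpl> redeliverToDeadLetter) {
        messagesToDeadLetter.forEach((ledgerId, entryId) ->
                redeliverToDeadLetter.accept(PositionImpl.get(ledgerId, entryId)));
        messagesToDeadLetter.clear();
    }
}
```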
Ok!
if (maxRedeliveryCount > 0 && deadLetterTopicProducer == null) {
    try {
        if (maxRedeliveryCount > 0 && StringUtils.isBlank(deadLetterTopic)) {
            deadLetterTopic = String.format("%s-%s-DLQ", topic.getName(), Codec.decode(cursor.getName()));
What if a tenant already has a topic with the -DLQ postfix?
Any better suggestion?
It seems this is up to the application to decide. With this change, the application can choose its own name, and “-DLQ” is only used when the application doesn’t specify one.
In general, I would prefer to handle most of the logic for the DLQ in the client library rather than the broker, just leaving the minimum support required on the server side.

This, in my view, has several advantages:
- No need to worry about use of resources in the broker (which could be a problem when many topics want to use the DLQ with, potentially, arbitrary settings).
- Clear isolation between different consumers.

Another point that I think should be tied to the DLQ is the concept of "negative acks". Currently we only have positive acks for messages. An application might decide not to ack a message (and possibly set a timeout for redelivery), but there's no way to immediately tell the broker that a message cannot be processed. I think a negative ack in the consumer interface would make it clearer, from a broker perspective, that the "processing" of the message has indeed failed, not just the delivery to the consumer. The case of ack timeout could then be folded in as a type of negative ack.

My suggestion would be:
- Have the broker keep track of negative acks received from consumers for a particular message id. This would involve augmenting the pendingAcks map (which has a "free" long spot in the value field) to also hold the count.
- The broker will include the "deliveryCount" as part of CommandMessage when pushing messages to a consumer.
- The count of redeliveries in the broker is kept as "best-effort". If a broker crashes, some messages might end up getting redelivered a few more times than configured. I think this is a reasonable tradeoff for this functionality.
- The Consumer object in the client library will have the DLQ configuration.
- When a consumer receives a message whose "deliveryCount" exceeds the consumer's own max, the client library will re-publish it on the DLQ.

Finally, one more item to consider is that, by default, if there is no subscription on the DLQ topic, the messages will be dropped immediately. For messages to be retained, we would either:
- Make sure at least one subscription exists on the DLQ topic
- Force the use of a default subscription name (eg: DLQ)
- Allow configuring the subscription name as part of the DLQ policies
- Enforce data retention on the DLQ topic. This might be complicated since retention is generally applied at the namespace level.
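A minimal sketch of that client-side flow, assuming the broker already exposes a per-message delivery count; the DeadLetterHandler class and its method names are illustrative, not an existing Pulsar API:

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;

// Sketch: the client library re-publishes over-delivered messages to the DLQ.
class DeadLetterHandler {
    private final Consumer<byte[]> consumer;
    private final Producer<byte[]> deadLetterProducer; // producer on the DLQ topic
    private final int maxRedeliveryCount;

    DeadLetterHandler(Consumer<byte[]> consumer, Producer<byte[]> deadLetterProducer,
                      int maxRedeliveryCount) {
        this.consumer = consumer;
        this.deadLetterProducer = deadLetterProducer;
        this.maxRedeliveryCount = maxRedeliveryCount;
    }

    // Returns true if the message was diverted to the DLQ instead of being processed.
    boolean maybeDivert(Message<byte[]> msg, int deliveryCount) throws PulsarClientException {
        if (deliveryCount > maxRedeliveryCount) {
            // Re-publish on the DLQ, then ack on the original topic so the
            // broker stops redelivering it.
            deadLetterProducer.send(msg.getData());
            consumer.acknowledge(msg);
            return true;
        }
        return false;
    }
}
```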
/**
 * Return entry at the position.
 */
Entry readEntry(PositionImpl position) throws InterruptedException, ExecutionException;
This can already be done through the replayEntries() and asyncReplayEntries() methods.
When I use asyncReplayEntries(), I have to convert the ConcurrentLongPairSet to a Set before calling it, and the asyncReplayEntries() implementation calls ledger.asyncReadEntry() for each position anyway. So I think I need a readEntry() method that reads an entry without converting the ConcurrentLongPairSet to a Set.
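A sketch of how such a readEntry() might be implemented on top of the managed ledger's async read, assuming the enclosing class holds a ManagedLedgerImpl field named ledger (uses org.apache.bookkeeper.mledger.AsyncCallbacks, Entry, ManagedLedgerException, and java.util.concurrent.CompletableFuture):

```java
@Override
public Entry readEntry(PositionImpl position) throws InterruptedException, ExecutionException {
    CompletableFuture<Entry> future = new CompletableFuture<>();
    // Bridge the callback-style async read into a blocking call.
    ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
        @Override
        public void readEntryComplete(Entry entry, Object ctx) {
            future.complete(entry);
        }

        @Override
        public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
            future.completeExceptionally(exception);
        }
    }, null);
    return future.get();
}
```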
 * Set max un-acked messages per consumer.
 * This config should be less than the broker config; otherwise it will not take effect. 0 means no limit; the default is 0.
 */
ConsumerBuilder<T> maxUnackedMessagesPerConsumer(int maxUnackedMessagesPerConsumer);
From an API perspective it might be good to encapsulate all the DLQ-related settings into a single DeadLetterQueuePolicy.
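For instance, a hypothetical shape for such an encapsulated policy on the consumer builder (client is an existing PulsarClient; the builder and method names here are illustrative, not the final API):

```java
Consumer<byte[]> consumer = client.newConsumer()
        .topic("persistent://public/default/my-topic")
        .subscriptionName("my-sub")
        .deadLetterPolicy(DeadLetterQueuePolicy.builder()
                .maxRedeliveryCount(10)
                .deadLetterTopic("persistent://public/default/my-topic-my-sub-DLQ")
                .build())
        .subscribe();
```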
optional int32 maxRedeliveryCount = 14;
// Name of the Dead Letter Topic.
// If not set, the pulsar broker will generate one from the topic name and subscription name, suffixed with -DLQ.
optional string deadLetterTopic = 15;
I think it's a bit dangerous to leave the option to have a large number of possible topics for which we need to create producer objects inside the broker.
I think this PR has already made things pretty simple: it keeps the redelivery count in memory and doesn't persist it. The major difference I can see between the approach here and your approach is "who" publishes messages to the DLQ: the approach here favors the broker doing the publishes, while your approach prefers the client publishing to the DLQ. I am not very convinced about doing the DLQ logic in the client, for multiple reasons: a) doing the DLQ in the client actually makes things a bit more complicated, because the redelivery count is tracked at the broker but the publishing happens at the client. Things like configurations/policies end up separated in different places, which can result in inconsistent behaviors and makes debugging much harder when issues occur.
"negative acks" is a good thing to have. However I don't think DLQ has to depend on "negative acks". because the current "negative acks" pulsar has is ack timeout. for DLQ, it doesn't really matter it is a ack timeout or explicit ack timeout. We don't really need to tie things together. If DLQ goes first, we still can introduce "negative acks" later, and it will not affect DLQ logic. From this spective, it is making more sense to do DLQ in broker side. because when DLQ happens at broker side, when we introduce explicitly "negative acks" later, there will be very minimal code changes at both client and broker sides.
I think the simplest thing is to just let the user configure message retention on the DLQ topic. That is what cloud providers typically offer. I'm not sure we need to get into the business of managing subscriptions for the DLQ.
I'm still convinced it's much easier to approach this problem in the client, apart from the scalability concerns and the complexity of changing the broker.

The current PR is anyway configuring the DLQ in the client API.

This I agree with. It would require doing the implementation in both Java and C++.

Well, the difference here would be that the broker just needs to keep the counter. If the broker is doing the publishes, then it also needs to create and cache producer objects for the DLQ topics. Optimizations like storing the counter would be possible in either case.

Sure, we can defer the "negative acks" changes and just focus on the current scope.

Again, the problem lies with policies, which are currently at the namespace level. I believe the best compromise solution we can adopt initially is to ensure a subscription is created on the DLQ topic first (e.g., with the admin command for creating a subscription). With this command, one can make sure that every message pushed to the DLQ topic is retained.
I agree with @merlimat. I can see two major drawbacks with a server-side implementation, which may prevent us from enabling this feature on a high-traffic Pulsar system. The DLQ mainly requires counting message redeliveries and then publishing max-redelivered messages to the DLQ. The same message can be re-delivered to different consumers, so the client can't track the redelivery count of a specific message on its own; instead, the client can rely on the broker for that count, publish the message to the DLQ, and ack it on the original topic. I think a client-side implementation can make this feature simpler to use.
Frankly speaking, I am not sure about the producer concerns (producer memory, authorization, and schema): if you model the DLQ topic as replicating messages whose delivery count is larger than the max delivery count, it should be no different from cross-data-center replication. But anyway, since there are two strong opinions on doing this at the client side, I am fine with doing the publish at the client side. Just keep one thing in mind: this feature might end up never happening in some of the languages (for example, websocket) unless effort is spent on it. Let me summarize the discussion in this PR to make sure everyone is on the same page about the scope of this PR. The suggestion for the implementation will be:
- the broker keeps track of the redelivery count per message (in memory, best-effort);
- the broker includes the delivery count when dispatching messages to consumers;
- the client library re-publishes messages exceeding the configured max redelivery count to the dead letter topic and then acks them on the original topic.
Other comments from Matteo will not be included in this PR but will be addressed via future improvements: negative acks in the consumer API, and retention/subscription handling for the DLQ topic.
@merlimat @rdhabalia can you confirm the summary is correct? @codelipenghui does this approach work for your application?
@sijie Yes, this approach works for me. Should the consumer send a request to the broker to get the redelivery count per message before it sends a redeliverUnackMessages request?
Can you answer @codelipenghui's question?
👍
I'm not saying it's not possible to make it efficient and safe, just that it would require more work to cover all conditions.

The dispatcher could include the redelivery count when pushing messages to the consumer (in CommandMessage).
I have an idea for getting the delivery count from the broker. When a consumer sends a redeliveryUnackMessages request and the broker processes it, the broker can return the messageIds (with the message payload, if the consumer can hold it) that should be sent to the DLQ. For this solution, the broker must keep the max redelivery count per subscription. A message may be sent to different consumers by the dispatcher, so if the max redelivery count is kept per consumer, it's difficult to confirm how many times a message has been redelivered. I think the max redelivery count should use the config of the last consumer in a subscription. I think this solution has a smaller cost.
This PR can be closed now. I have a new PR, #2508, for PIP-22 Dead Letter Topic.
Closed this PR; use #2508 for it.
Motivation
Fixes #189
When a consumer gets messages from Pulsar, it's difficult to ensure that every message is consumed successfully. Pulsar supports message redelivery by setting an acknowledge timeout when creating a new consumer. This is a good feature to guarantee that consumers will not lose messages.
However, some messages may be redelivered many times, possibly without ever stopping.
So it's necessary for Pulsar to support a feature to control this. Users can enable and customize this feature to control the message redelivery behavior. The feature is named Dead Letter Topic.
Modifications
Consumers can set the maximum number of redeliveries via the Java client.
Consumers can set the name of the Dead Letter Topic via the Java client; this is optional.
Messages exceeding the maximum number of redeliveries are sent to the Dead Letter Topic and acknowledged automatically, as sketched below.
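A hypothetical usage example for these modifications (the builder methods maxRedeliveryCount and deadLetterTopic are assumptions based on the wire fields of the same names above, and may differ from the final API):

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

Consumer<byte[]> consumer = client.newConsumer()
        .topic("persistent://public/default/input")
        .subscriptionName("sub")
        .ackTimeout(30, TimeUnit.SECONDS)   // redelivery is driven by the ack timeout
        .maxRedeliveryCount(5)              // after 5 redeliveries, divert to the DLQ
        .deadLetterTopic("persistent://public/default/input-sub-DLQ") // optional; defaults to <topic>-<sub>-DLQ
        .subscribe();
```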
Result
If a consumer enables the dead letter topic feature, messages exceeding the maximum number of redeliveries will be sent to the Dead Letter Topic and acknowledged automatically.