Per Message Unacknowledged Redelivery #51
Thank you for submitting this pull request, however I do not see a valid CLA on file for you. Before we can merge this request please visit https://yahoocla.herokuapp.com/ and agree to the terms. Thanks! 😄
CLA is valid!
The change looks good. Just a few comments.
Please add a few unit tests to verify the redelivery of just the intended messages. Take a look at tests like https://github.com/yahoo/pulsar/blob/master/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java to see how to set up the mocked broker for the tests.
.map(messageIdData -> PositionImpl.get(messageIdData.getLedgerId(), messageIdData.getEntryId()))
.filter(position -> {
    if (!pendingAcks.remove(position)) {
        position.recycle();
No need to call position.recycle(). It's a no-op method that should be removed anyway.
👍
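For readers following the hunk above, here is a minimal sketch of the filter-by-removal idiom once the recycle() call is dropped. It is only an illustration: `PendingAckFilterSketch`, `takePending`, and the use of a plain `Set<String>` are assumptions made for this example and are not the broker's actual types.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical illustration; the real broker code works on PositionImpl and its own set type.
final class PendingAckFilterSketch {

    /**
     * Keeps only the requested positions that are still pending, and removes
     * them from pendingAcks at the same time.
     */
    static List<String> takePending(Set<String> pendingAcks, List<String> requested) {
        return requested.stream()
                // Set.remove returns true only if the element was present, so a
                // position that is not pending (or was already taken) is dropped.
                .filter(pendingAcks::remove)
                .collect(Collectors.toList());
    }
}
```

Because removal and filtering happen in one step, a position that appears twice in the request is only returned once.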
 * the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
 * breaks, the messages are redelivered after reconnect.
 */
void redeliverUnacknowledgedMessages(List<MessageIdImpl> messageIds);
What would be the use case for exposing this in the public API? I'm not against it, but once it's out, we cannot remove it, so we should only add it if it's really needed.
Also, the method should be called redeliverMessages(messageIds)
And we should take MessageId and not MessageIdImpl
> What would be the use case for exposing this in the public API? I'm not against it, but once it's out, we cannot remove it, so we should only add it if it's really needed.
> Also, the method should be called redeliverMessages(messageIds)

I believe there would be no need for exposing this, at least not now, but maybe we could make void redeliverUnacknowledgedMessages() have the default behavior of not redelivering all messages.
I think this should not be called redeliverMessages(messageIds), because the broker actually makes sure it is redelivering only unacknowledged messages.

> And we should take MessageId and not MessageIdImpl

I've seen a lot of code using MessageIdImpl, which is why I went that way. It's a trivial change.
Following the comment by @rdhabalia, should we switch the method signature to receive MessageId? UnAckedMessageTracker keeps MessageIdImpls.
@sboobna does it really make sense to receive MessageId? It's just an empty interface.
import java.io.Closeable;

public interface UnAckedMessageTracker extends Closeable {
This is an internal interface and should not be in the public API java package
We would probably like to provide our own implementation of UnAckedMessageTracker sometime in the future, or have Pulsar provide more precise tracking than the current one. That's why I made this public.
Is there any reason you're against this?
Let's keep that change separate from this PR for now
sure
private boolean perMessageRedeliverUnacknowledged = false;

private UnAckedMessageTracker unAckedMessageTracker = null;
This is not used
yup
@@ -47,6 +47,12 @@

private long ackTimeoutMillis = 0;

private int maxRedeliverUnacknowledgedMessagesBatch = 1000;

private boolean perMessageRedeliverUnacknowledged = false;
We shouldn't need this flag; per-message redelivery should always be enabled.
👍
}
ClientCnx cnx = cnx();
if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v2.getNumber()) {
    MessageIdData.Builder builder = MessageIdData.newBuilder();
In general, let's leave the protobuf operations to be done in the Commands class.
Right now the Commands class cannot receive MessageId, because pulsar-common doesn't and shouldn't have a dependency on anything else.
How would you create a builder for this?
I've checked whether there is a similar case in Commands but couldn't find one.
@merlimat any thought on this?
That makes sense. It's good as it is then.
MessageIdData.Builder builder = MessageIdData.newBuilder();
List<MessageIdData> messageIdDatas = messageIds.stream()
    .map(messageId -> {
        builder.setPartition(messageId.getPartitionIndex());
Shouldn't this be a new builder instance each time the mapping function is called?
Also, the builder itself should be recycled after builder.build() is called.
I did this to avoid generating garbage and spending CPU time. Since we're replacing the same fields on every iteration, there should be no problem with reusing the same builder instance, and it should save calls to the recycler.
Ok, looks good
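The builder-reuse argument above can be illustrated with a small, self-contained sketch. It does not use the real protobuf classes: `IdData`, `Builder`, and `toIdData` are hypothetical stand-ins, and the sketch assumes that build() copies the current field values into a new object and that every field is overwritten on each iteration.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins for a protobuf-style message and its builder.
final class BuilderReuseSketch {

    static final class IdData {
        final int partition;
        final long ledgerId;
        final long entryId;

        IdData(int partition, long ledgerId, long entryId) {
            this.partition = partition;
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    static final class Builder {
        private int partition;
        private long ledgerId;
        private long entryId;

        Builder setPartition(int value) { this.partition = value; return this; }
        Builder setLedgerId(long value) { this.ledgerId = value; return this; }
        Builder setEntryId(long value) { this.entryId = value; return this; }

        // Assumption: build() snapshots the fields into a new immutable object.
        IdData build() { return new IdData(partition, ledgerId, entryId); }
    }

    /** Each input is {partition, ledgerId, entryId}. */
    static List<IdData> toIdData(List<long[]> ids) {
        Builder builder = new Builder(); // one instance shared by all iterations
        // Only safe on a sequential stream; a parallel stream would race on the builder.
        return ids.stream().map(id -> {
            builder.setPartition((int) id[0]);
            builder.setLedgerId(id[1]);
            builder.setEntryId(id[2]);
            return builder.build();
        }).collect(Collectors.toList());
    }
}
```

If a field were set only conditionally, a value from a previous iteration could leak into the next object, which is the main risk of this pattern.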
@@ -262,6 +263,12 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {

}

@Override
public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
    // It doesn't seem to make sense to redeliver single messages to single consumers.
That's correct, we cannot redeliver individual messages in this case or we would break ordering.
 * <i>(default: 1000)</i>
 */
public int getMaxRedeliverUnacknowledgedMessagesBatch() {
    return maxRedeliverUnacknowledgedMessagesBatch;
Maybe I'm missing something, but what is the reason for this config?
On second thought, instead of having a user-configurable setting, we could have an internal limit on how many messages to ask for redelivery with a single protobuf command.
E.g.: each redelivery command can contain up to 1000 positions. If there are more than 1000 timed-out messages, the client will send multiple redelivery commands.
This would also solve the problem of an application setting that limit too high.
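A rough sketch of the chunking idea described above, under stated assumptions: `RedeliveryBatcher`, `partition`, and `MAX_POSITIONS_PER_COMMAND` are hypothetical names for illustration, and the sketch only splits the list. Building and sending the actual protobuf command is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Pulsar client API.
final class RedeliveryBatcher {

    // Assumed internal cap, taken from the "up to 1000 positions" suggestion.
    static final int MAX_POSITIONS_PER_COMMAND = 1000;

    /** Splits ids into consecutive chunks of at most maxPerCommand elements. */
    static <T> List<List<T>> partition(List<T> ids, int maxPerCommand) {
        if (maxPerCommand <= 0) {
            throw new IllegalArgumentException("maxPerCommand must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < ids.size(); start += maxPerCommand) {
            int end = Math.min(start + maxPerCommand, ids.size());
            // Copy the sub-range so each batch is independent of the source list.
            batches.add(new ArrayList<>(ids.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> timedOut = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            timedOut.add(i);
        }
        // Each inner list would become one redelivery command.
        for (List<Integer> batch : partition(timedOut, MAX_POSITIONS_PER_COMMAND)) {
            System.out.println("would send a command with " + batch.size() + " positions");
        }
    }
}
```

With a fixed internal cap, an application cannot misconfigure the batch size, at the cost of a few extra commands when many messages time out at once.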
@sschepens Please consider the change described above.
Done. Could you check whether it's roughly what you expected, @merlimat?
.map(messageId -> {
    builder.setPartition(messageId.getPartitionIndex());
    builder.setLedgerId(messageId.getLedgerId());
    builder.setEntryId(messageId.getEntryId());
In the batch-message use case, messageId will be of type BatchMessageIdImpl, which may create duplicate MessageIds in the final list. Should we instead use a Set, or more specifically a SortedSet, so that entries are read in sequence on the broker side?
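To make the SortedSet suggestion concrete, here is a hedged sketch of collapsing per-message batch ids into distinct, ordered entry ids. `BatchId` and `distinctEntries` are hypothetical names, and the sketch assumes an id is fully described by (ledgerId, entryId, batchIndex). It is not the client's actual data model.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical illustration of de-duplicating batch message ids.
final class BatchIdDedupSketch {

    /** (ledgerId, entryId) names the entry; batchIndex names a message inside it. */
    static final class BatchId {
        final long ledgerId;
        final long entryId;
        final int batchIndex;

        BatchId(long ledgerId, long entryId, int batchIndex) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.batchIndex = batchIndex;
        }
    }

    /** Returns the distinct {ledgerId, entryId} pairs in ascending order. */
    static List<long[]> distinctEntries(List<BatchId> ids) {
        // The comparator defines both ordering and equality for the TreeSet,
        // so two ids that differ only in batchIndex collapse into one entry.
        SortedSet<long[]> entries = new TreeSet<>(
                Comparator.<long[]>comparingLong(pair -> pair[0])
                        .thenComparingLong(pair -> pair[1]));
        for (BatchId id : ids) {
            entries.add(new long[] {id.ledgerId, id.entryId});
        }
        return new ArrayList<>(entries);
    }
}
```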
Aren't BatchMessageIdImpl only used for acks? SimpleUnAckedMessageTracker stores MessageIdImpl, so there should be no case where this method receives BatchMessageIdImpl.
That's also a reason why the method signature should probably be redeliverUnacknowledgedMessages(List<MessageIdImpl> messageIds)
That's true. In the case of batched messages, the application will ack multiple BatchMessageImpl. When all the messages within a single batch are acked, that will trigger the ack to the broker based on the whole-batch MessageIdImpl.
@rdhabalia @merlimat I'm not really sure how we should deal with BatchMessageIdImpl. I believe it doesn't really make sense to redeliver a single message in a batch, and I don't know whether it's even possible. So, what should we do when UnAckedMessageTracker considers that a BatchMessageIdImpl should be redelivered?
just a thought:
Yes
I thought of this, but I don't really know if it's that easy. Yes, this implementation has a lot of network overhead, but it is precise. Another way to provide more efficient behavior would be to move
Branch updated: 6ee3a33 to 8057ea8
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import com.yahoo.pulsar.client.impl.ConsumerStats;
import com.yahoo.pulsar.client.impl.MessageIdImpl;
Removed unused imports
done
Branch updated: 8057ea8 to 9e7c806
Change looks good to me. Once you add some tests to verify the new behavior, we should be good to go. There is one compilation failure in the Travis build:
@merlimat I don't really know how that error happened. I ran the tests on my machine and they just worked.
@sschepens About
After #55 gets merged, only the "whole batch" MessageId will be inserted into the unacked message tracker, thus this PR will be good as it is now (except for some probable minor conflicts with the other PR).
Branch updated: 9e7c806 to f147576
@merlimat I committed some tests, could you check them? They also prompted a few more tweaks to prevent using per-message redelivery on subscriptions that are not shared.
Branch updated: f147576 to cded896
Edit: Never mind, I found the issue. Could you give this a review now?
Branch updated: 9c921aa to 40c8543
Branch updated: 40c8543 to 5365256
Very nice. Thank you
I've rebased to current master and fixed some minor conflicts in #70
Motivation
Pulsar should be more robust in terms of which messages it redelivers when there are unacked messages. Right now Pulsar redelivers ALL currently unacked messages, which greatly increases the chance of getting duplicate messages.
Modifications
Protobuf:
Broker:
Client:
Result
Change should theoretically be backwards-compatible because Pulsar uses Protobuf.
Users can choose to prevent message duplicates on redelivery by increasing network traffic to the broker (sending message ids).
This is a first try at this implementation, please provide feedback!
I would also need a little guidance with tests. If I run mvn test now, it hangs on some strange BookKeeper tests in the managed-ledger project. I would also like some opinions on where to write tests.
Cheers!