
We're seeing thousands of duplicated messages when we stop a Broker #150

Closed · estebangarcia opened this issue Dec 20, 2016 · 8 comments

@estebangarcia

We set up a Redis instance to store the IDs of consumed messages; each time a new message is consumed we check whether its ID is already in Redis. If it is, that means the message is a duplicate.

Expected behavior

We shouldn't see any duplicated messages.

Actual behavior

We're seeing thousands of duplicated messages after a broker goes down

Steps to reproduce

  1. Start Redis and store each consumed message ID
  2. Each time a message is consumed, check whether the ID is already in Redis
  3. Stop a Broker
  4. Duplicated messages appear

We don't know if it helps, but in the logs we see that all the duplicated messages have the same ledgerId and partitionIndex, for example:

Message id duplicated messageId -> MessageIdImpl{ledgerId=9, entryId=510587, partitionIndex=2}
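Roughly, the check looks like this (a simplified sketch, not our exact code; it uses the Jedis client and the current Java client builder API, and the topic/subscription names are placeholders):

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import redis.clients.jedis.Jedis;

public class DuplicateCheck {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://test-property/cl1/ns1/tp1")
                .subscriptionName("my-subscription")
                .subscribe();

        try (Jedis redis = new Jedis("localhost", 6379)) {
            while (true) {
                Message<byte[]> msg = consumer.receive();
                String id = msg.getMessageId().toString();

                // SADD returns 0 when the member is already in the set,
                // i.e. we have already consumed this message id -> duplicate.
                if (redis.sadd("consumed-message-ids", id) == 0) {
                    System.out.println("Message id duplicated messageId -> " + id);
                }

                consumer.acknowledge(msg);
            }
        }
    }
}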

System configuration

Pulsar version: built from master

If you need any further information, please let us know.

@rdhabalia (Contributor)

This happens when the consumer doesn't acknowledge the consumed message back to the broker using the acknowledge(message) API.

At a high level: the broker persists the offset/markDeletePosition (the position up to which it has received acks for all messages) when the consumer acks a message. Therefore, on a broker restart, the broker recovers this persisted position and starts dispatching messages from it.

So, in your case: if the consumer doesn't ack a message, the broker will not persist the position, and a broker restart will send all those messages to the consumer again. Hence, the consumer needs to confirm consumption by sending an acknowledgement to the broker.
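In code, the expected consume/ack pattern is roughly the following (just a sketch with placeholder topic and subscription names, shown with the current Java client API):

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

public class ConsumeAndAck {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://test-property/cl1/ns1/tp1")
                .subscriptionName("my-subscription")
                .subscribe();

        while (true) {
            Message<byte[]> msg = consumer.receive();
            try {
                handle(msg); // placeholder for application processing

                // Only this ack allows the broker to advance (and eventually
                // persist) the markDeletePosition for this subscription.
                consumer.acknowledge(msg);
            } catch (Exception e) {
                // If the message is not acked (e.g. processing failed), it stays
                // unacked and will be redelivered, for example after a broker
                // restart recovers the last persisted markDeletePosition.
            }
        }
    }

    private static void handle(Message<byte[]> msg) {
        // application-specific logic
    }
}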

@lovelle (Contributor) commented Dec 21, 2016

Hi @rdhabalia, thanks for your response. I don't think this is the case we are facing: we have dummy consumers which are receiving and acknowledging messages all the time (like a stress test with the consume perf tool), but with an SQS-like API in front of Pulsar.
So, when a broker restart happens, we randomly see a huge number of repeated messages (far more than could be considered unacked during the restart), always with the same ledgerId and partitionIndex. Also, we are consuming with partitioned consumers.

We know messages are being repeated because the consumed message IDs are stored in a separate data store.
Have you ever faced a problem like this with Pulsar?

Thanks.

@rdhabalia (Contributor)

  • The broker advances the offset/markDeletePosition only if it has received acks for all the messages up to that position. If the client fails to ack one of the messages, it creates an ack-hole, which prevents the markDeletePosition from moving forward past that position.

“Always in the same ledgerId and partitionIndex”

It seems a client is failing to process that exact message due to an unexpected exception. That creates an ack-hole which keeps the markDeletePosition stuck at that message-id, so every broker restart starts dispatching messages from that position.

You can verify this behavior before restarting the broker.

  1. How to verify the markDeletePosition and the missing acked messages?
    You can verify it using the pulsar-admin internal stats. For example, for a partitioned topic, the stats for partition-1:
    pulsar-admin persistent stats-internal persistent://test-property/cl1/ns1/tp1-partition-1
    Under your subscription name you can see two specific fields:
"cursors": {
        "my-subscription": {
            "markDeletePosition": "324711539:3133",
            "individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",    
             :
        }

markDeletePosition: the MessageId position that gets persisted; after a broker restart, the broker will start dispatching messages from this position.
individuallyDeletedMessages: the list of ranges of acked messages. In the above example, the consumer has not acked 324711539:3134 and 324711539:3137, so this field can help you debug why the client has not acked those specific messages.

You can also use the REST API to get the same stats:

GET /admin/persistent/{property}/{cluster}/{namespace}/{destination}/internalStats

example:

http://localhost:8080/admin/persistent/test-property/cl1/ns1/tp1-partition-1/internalStats
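If you want to check it programmatically, the Java admin client exposes the same fields. Something like this should work (sketch only; the admin URL, topic and subscription names come from the example above, and it uses the current admin API, where older releases used admin.persistentTopics() instead of admin.topics()):

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;

public class CheckCursor {
    public static void main(String[] args) throws Exception {
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build();

        String topic = "persistent://test-property/cl1/ns1/tp1-partition-1";
        PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
        PersistentTopicInternalStats.CursorStats cursor = stats.cursors.get("my-subscription");

        // The position the broker resumes dispatching from after a restart
        System.out.println("markDeletePosition:          " + cursor.markDeletePosition);

        // Ranges acked beyond markDeletePosition; the gaps between them are ack-holes
        System.out.println("individuallyDeletedMessages: " + cursor.individuallyDeletedMessages);

        admin.close();
    }
}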

@merlimat (Contributor)

@estebangarcia The other thing is that the frequency at which the markDelete position is persisted is configurable in the broker.conf file:

# Rate limit the amount of writes generated by consumer acking the messages
managedLedgerDefaultMarkDeleteRateLimit=0.1

That means the default (per broker) is to save it at most once every 10 seconds. This frequency can also be configured individually in the namespace policies (example below).

Set this configuration to something higher (e.g. 10 will mean saving the markDelete position at most 10 times a second). Setting it to 0 will disable the rate limiter and persist the position after every single ack that moves the markDelete position.
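For the namespace-level override, the mark-delete rate is part of the persistence policy. Something along these lines should work (sketch; the bookkeeper quorum values are only examples, and I believe the relevant flag is --ml-mark-delete-max-rate):

pulsar-admin namespaces set-persistence test-property/cl1/ns1 \
    --bookkeeper-ensemble 2 \
    --bookkeeper-write-quorum 2 \
    --bookkeeper-ack-quorum 2 \
    --ml-mark-delete-max-rate 10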

I'll update the docs on that setting because it's not very clear at the moment. Also, the default should probably be something like 1.0.

@estebangarcia (Author)

@merlimat @rdhabalia thanks for your responses. We will make sure to check these things and get back to you.

@estebangarcia (Author)

We tried setting the managedLedgerDefaultMarkDeleteRateLimit to 1 but we had the same issue, so we set it to 0 and now we have just a few duplicates.

I'm wondering about the implications of disabling the rate limiter and whether it can have any negative impact in the long term.

@merlimat (Contributor)

Setting that to 0 means there will be 1 write on the bookies for each message you acknowledge. The write is very small, but it effectively multiplies the write rate by the number of subscriptions.

Maybe you can try to limit that to 100/s or 1000/s to get a better tradeoff between additional writes and number of duplicates.
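To make the tradeoff concrete (illustrative numbers, not measurements from this thread): if each of 4 subscriptions acknowledges 5,000 msg/s, then with the limiter disabled (0) that is roughly 20,000 extra small writes per second on the bookies, while something like

# broker.conf - compromise value, per the suggestion above
managedLedgerDefaultMarkDeleteRateLimit=100

would save the markDelete position at most 100 times a second, so a broker restart should only re-deliver the handful of messages acked since the last save.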

@estebangarcia (Author)

I think we'll keep "playing" with this parameter until we find a tradeoff that suits us, just like you suggested.

Thanks for your help!

sijie pushed a commit to sijie/pulsar that referenced this issue Mar 4, 2018
* adding unittest for windowing

* cleaning up

* removing print line
hangc0276 pushed a commit to hangc0276/pulsar that referenced this issue May 26, 2021
optimize for apache#134 including the following aspects:

recordToEntry paralleled
publish message to bookie asynchronous
In our environment, produce delay is about one-fifth of the previous