[fix][broker] Duplicate ByteBuffer when Caching Backlogged Consumers #17105

Conversation

michaeljmarshall (Member)

Fixes #16979

Motivation

#12258 introduced caching for backlogged consumers. When caching an entry, it is important to duplicate the `ByteBuffer` so that the reader index is not shared. The current code has a race condition: the `ByteBuffer` reference in the cache is shared with the dispatcher. When another consumer reads from the cache, the cache calls `duplicate()` on the shared `ByteBuffer`, which copies the current reader index; that index might not be 0 if the original dispatcher has already read data from the `ByteBuffer`.
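The race can be illustrated with plain `java.nio.ByteBuffer`, whose `duplicate()` likewise copies the current position (the analog of the reader index). This is a minimal sketch of the sharing problem, not Pulsar's actual `ByteBuf` code:

```java
import java.nio.ByteBuffer;

public class SharedReaderIndexBug {
    public static void main(String[] args) {
        ByteBuffer entryData = ByteBuffer.wrap(new byte[]{10, 20, 30, 40});

        // Buggy insert: the cache stores the same reference the dispatcher
        // keeps reading from.
        ByteBuffer cached = entryData;

        // The dispatcher reads two bytes, advancing the shared position.
        entryData.get();
        entryData.get();

        // A second consumer hits the cache; duplicate() copies the CURRENT
        // position, so the copy starts at 2, not 0, and the consumer would
        // silently miss the first two bytes.
        ByteBuffer copy = cached.duplicate();
        System.out.println(copy.position()); // prints 2, not the expected 0
    }
}
```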

Note: the caching `insert` method seems to create (or recycle) more `EntryImpl` instances than is really necessary. Changing that is outside this PR's scope, so I am leaving it as is.

Modifications

  • Create a new `Entry` before inserting it into the cache.
  • Add a new test to `EntryCacheTest`. The test fails before this change and passes after it.
  • Update the `EntryCacheTest` mocking so that it returns unique entries when mocking reads from BookKeeper. Before, all returned `LedgerEntry` objects had `ledgerId` 0 and `entryId` 0, which interfered with caching in the new test.
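The first modification (duplicating at insert time) can be sketched the same way, again with `java.nio.ByteBuffer` rather than Pulsar's real classes: caching an independent view at insert time keeps the cached reader index at 0 regardless of what the dispatcher does afterwards.

```java
import java.nio.ByteBuffer;

public class DuplicateOnInsertFix {
    public static void main(String[] args) {
        ByteBuffer entryData = ByteBuffer.wrap(new byte[]{10, 20, 30, 40});

        // Fixed insert: cache an independent view (analogous to creating a
        // new Entry / duplicating the buffer before caching it).
        ByteBuffer cached = entryData.duplicate();

        // The dispatcher reads from its own buffer, advancing only its
        // own position; the cached view is unaffected.
        entryData.get();
        entryData.get();

        // A later consumer now gets a copy whose position is still 0.
        ByteBuffer copy = cached.duplicate();
        System.out.println(copy.position()); // prints 0
    }
}
```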

Verifying this change

This change includes a test that failed before the PR and passes after it.

Documentation

  • doc-not-needed

@michaeljmarshall michaeljmarshall added area/broker release/blocker Indicate the PR or issue that should block the release until it gets resolved doc-not-needed Your PR changes do not impact docs labels Aug 15, 2022
@michaeljmarshall michaeljmarshall added this to the 2.11.0 milestone Aug 15, 2022
@michaeljmarshall michaeljmarshall self-assigned this Aug 15, 2022
@mattisonchao (Member) left a comment

Nice catch!

mattisonchao (Member) commented Aug 16, 2022

Small suggestion:

Could we make the `insert` method ensure this? That would avoid the same problem whenever anyone inserts an entry in the future.
Maybe we can change `cachedData = entry.getDataBuffer().retain();` to use `retainedDuplicate()`.
Or change the method to `insert(EntryImpl entry, boolean duplicateDataBuffer)`, with `duplicateDataBuffer` defaulting to true; you can set it to false when you are sure you will never change the original `ByteBuffer`. (Just a quick thought; it still needs deeper thinking.)

The advantage is that we make this method more fault-tolerant and don't need to create another `EntryImpl`.
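The `insert(EntryImpl entry, boolean duplicateDataBuffer)` idea could be sketched as follows. This is a hypothetical minimal cache using `java.nio.ByteBuffer`; the class name, `read` method, and map-based storage are illustrative assumptions, not Pulsar's actual `EntryCache` API:

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the suggested API: insert() takes a flag saying
// whether to duplicate the data buffer defensively before caching it.
class SketchEntryCache {
    private final Map<Long, ByteBuffer> cache = new HashMap<>();

    void insert(long entryId, ByteBuffer data, boolean duplicateDataBuffer) {
        // Defaulting to a defensive duplicate makes the cache tolerant of
        // callers that keep reading from the original buffer afterwards.
        cache.put(entryId, duplicateDataBuffer ? data.duplicate() : data);
    }

    ByteBuffer read(long entryId) {
        // Hand out an independent view so each reader has its own position.
        return cache.get(entryId).duplicate();
    }
}

public class InsertFlagDemo {
    public static void main(String[] args) {
        SketchEntryCache cache = new SketchEntryCache();
        ByteBuffer data = ByteBuffer.wrap(new byte[]{1, 2, 3, 4});
        cache.insert(1L, data, true);  // safe default: duplicate on insert
        data.get();                    // dispatcher keeps reading
        System.out.println(cache.read(1L).position()); // prints 0
    }
}
```

The false path would be reserved for callers that can guarantee they never touch the original buffer again, which is exactly the invariant that is easy to break and motivates the defensive default.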

michaeljmarshall (Member, Author)

Small suggestion:

Could we make the `insert` method ensure this? That would avoid the same problem whenever anyone inserts an entry in the future. Maybe we can change `cachedData = entry.getDataBuffer().retain();` to use `retainedDuplicate()`. Or change the method to `insert(EntryImpl entry, boolean duplicateDataBuffer)`, with `duplicateDataBuffer` defaulting to true; you can set it to false when you are sure you will never change the original `ByteBuffer`. (Just a quick thought; it still needs deeper thinking.)

The advantage is that we make this method more fault-tolerant and don't need to create another `EntryImpl`.

@mattisonchao - I didn't do it this way because the write path for tail-read caching does not need the duplicated `ByteBuffer`. That path technically doesn't even need an `Entry`, but it creates one because `insert` only takes an `Entry`. It seems like we should consider expanding or improving the `EntryCache` interface to meet these two different use cases. I agree that we should think about simplifying the entry reference management. I don't have benchmarks showing how much it costs to create an `Entry` unnecessarily, but this part of the code base is highly optimized, so I would prefer not to introduce unnecessary duplication if we can avoid it.

I am going to merge this as is so that it does not hold up the 2.11 release at all. We can definitely continue this discussion, especially because we have the PIP on improved caching getting implemented right now.

@michaeljmarshall michaeljmarshall merged commit 76f4195 into apache:master Aug 16, 2022
@michaeljmarshall michaeljmarshall deleted the duplicate-bytebuff-when-caching branch August 16, 2022 04:19
michaeljmarshall added a commit that referenced this pull request Aug 16, 2022
(cherry picked from commit 76f4195)
@michaeljmarshall michaeljmarshall added cherry-picked/branch-2.11 and removed release/blocker Indicate the PR or issue that should block the release until it gets resolved labels Aug 16, 2022
michaeljmarshall added a commit to datastax/pulsar that referenced this pull request Aug 16, 2022
(cherry picked from commit 76f4195)
Technoboy- pushed a commit to merlimat/pulsar that referenced this pull request Aug 16, 2022
nodece pushed a commit to nodece/pulsar that referenced this pull request Sep 10, 2024
(cherry picked from commit 76f4195)
Development

Successfully merging this pull request may close these issues.

Flaky-test: MessageDispatchThrottlingTest.testBacklogConsumerCacheReads
3 participants