Make EntryImpl recyclable to improve gc at broker #318
/**
 * Recycles entry instance
 */
void recycle();
Since there is already a ref-counting semantic on the class, can we take advantage of it and automatically recycle when ref-count goes to 0?
We follow the same semantics as ByteBuf: release() will recycle if ref-count = 0. However, we provide this method to support use cases where one wants to use an EntryImpl temporarily, just to pass the data (ByteBuf) along without disturbing the life-cycle of that data, and then recycle that EntryImpl immediately, e.g. OpAddEntry or Consumer.
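The contract described in this comment can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `SketchEntry`, its `POOL`, and the `byte[]` payload are hypothetical stand-ins for `EntryImpl`, Netty's `Recycler`, and `ByteBuf`, and the sketch assumes single-threaded use.

```java
import java.util.ArrayDeque;

/** Hypothetical stand-in for a recyclable, ref-counted entry (not the real EntryImpl). */
final class SketchEntry {
    // Assumption: a plain deque plays the role of Netty's Recycler; single-threaded only.
    private static final ArrayDeque<SketchEntry> POOL = new ArrayDeque<>();

    private byte[] data;
    private int refCnt;

    private SketchEntry() {}

    /** Takes an instance from the pool if one is available, otherwise allocates. */
    static SketchEntry create(byte[] data) {
        SketchEntry e = POOL.isEmpty() ? new SketchEntry() : POOL.pop();
        e.data = data;
        e.refCnt = 1;
        return e;
    }

    SketchEntry retain() {
        refCnt++;
        return this;
    }

    /** Drops one reference; the entry is recycled only when the count reaches 0. */
    boolean release() {
        if (refCnt <= 0) {
            throw new IllegalStateException("entry already released");
        }
        if (--refCnt == 0) {
            recycle();
            return true;
        }
        return false;
    }

    /** Explicit recycle for short-lived holders that only pass the data along. */
    void recycle() {
        data = null;
        refCnt = 0;
        POOL.push(this);
    }

    byte[] getData() {
        return data;
    }
}
```

A holder that called `retain()` sees `release()` return `false` until the last reference is dropped; only then does the instance go back to the pool and become available to the next `create()`.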
public static EntryImpl create(LedgerEntry ledgerEntry) {
    EntryImpl entry = RECYCLER.get();
    entry.position = new PositionImpl(ledgerEntry.getLedgerId(), ledgerEntry.getEntryId());
Even better, we could reuse the PositionImpl instance as well. We just need to make ledgerId and entryId non-final on that class.
Regarding the life-cycle of the PositionImpl, we could return a new PositionImpl instance in getPosition() itself. The advantage would be that the object that is retained for a longer time (while in the cache) is going to get reused.
Yes, makes sense. I will make the change.
@merlimat made the change.
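One possible reading of the getPosition() suggestion is sketched below. The class names `SketchPosition` and `PooledEntry` are hypothetical, and storing raw ids inside the entry is an assumption made for illustration; the PR may hold a reusable PositionImpl internally instead.

```java
/** Hypothetical immutable position (stand-in for PositionImpl). */
final class SketchPosition {
    final long ledgerId;
    final long entryId;

    SketchPosition(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }
}

/** Hypothetical pooled entry that keeps only the raw ids between reuses. */
final class PooledEntry {
    private long ledgerId;
    private long entryId;

    /** Called when the pooled instance is handed out again for another message. */
    void reset(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    /** Returns a fresh position, so a caller may keep it after the entry is reused. */
    SketchPosition getPosition() {
        return new SketchPosition(ledgerId, entryId);
    }
}
```

With this shape, the long-lived object (the entry held in the cache) is the one that gets reused, while positions handed to callers stay valid regardless of what happens to the entry afterwards.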
@@ -174,7 +175,8 @@ public String consumerName() {
        .build();

ByteBuf metadataAndPayload = entry.getDataBuffer();

// increment ref-count of data and release at the end of process: so, we can get chance to call entry.release
entry.retain();
The semantic here is kind of weird, it's kind of mix-matching ref-counting and recycler
Earlier, when the dispatcher gave entries to the Consumer, the consumer retrieved the data (ByteBuf) from each entry and wrote it to the Netty channel, which decrements the ref-count by 1 (and the ByteBuf is recycled if the ref-count reaches 0).
Now that EntryImpl is recyclable, calling Entry.release() releases the data and, if the data's ref-count reaches 0, also recycles the EntryImpl. So we call retain() here, which lets us call entry.release() later; if the entry's data ref-count is then 0, both the entry and the data are recycled.
I could have avoided the retain and simply recycled the entry at the end, but the caller (e.g. the dispatcher) may not want the entry recycled, and if the caller has retained the data for later use we would break that contract. So this scope retains the entry and, at the end, tries to release it (it gets recycled only if the data ref-count is 0).
That's correct, and it's exactly what I was saying: the two semantics are intertwined, which makes it difficult to reason about retain/release/recycle, because they mix the lifecycle of the ByteBuf with that of the EntryImpl, which are different.
In some cases the buffer is released explicitly (by calling entry.release()), and in other cases just by writing to the socket (outside of the EntryImpl scope).
My point is that we should simplify that logic, not make it more complicated. One option could be to remove release() from EntryImpl and require whoever gets the entry to call release on the buffer. That way EntryImpl could maintain a simple recycler-only semantic.
Yes, that's correct. If we remove release(), then we have to call entry.data.release() and entry.recycle() explicitly. So we have two options:
- Make two explicit calls every time. Even this is tricky, because in multiple places we don't know when to recycle the EntryImpl: we can only do it when the data ref-count reaches 0. Otherwise, the caller hands the entry to the next method, and if that method recycles it, the caller gets an NPE.
if (entry.getDataBuffer().release()) {
    entry.recycle();
}
- Rename release() to releaseAndRecycle(), which clarifies the purpose (release the data and recycle the entry), so the caller doesn't have to repeat the logic of calling multiple things.
Do you think the 2nd option will solve this concern, or should we go with the 1st one?
int msgSent = c.sendMessages(entries.subList(start, start + messagesForC)).getRight();

// remove positions first from replay list first : sendMessages recycles entries
This change seems to be unrelated to this PR, right?
It is related to this PR. In this change we remove entries from the replay list before we send those entries to the consumer, because it's the consumer's responsibility to release the entry's data and recycle it.
If we removed positions from the replay list after sending to the consumer, we would have to read positions from already-recycled entries, which gives an NPE.
So we remove them from the replay list first and then send to the consumer.
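The ordering constraint in this reply can be illustrated with a small sketch. Everything here is hypothetical: `ReplaySketch`, its nested `Entry`, and the `Set<Long>` replay list only mimic the roles of the dispatcher, `EntryImpl`, and the real replay list.

```java
import java.util.List;
import java.util.Set;

final class ReplaySketch {
    /** Hypothetical entry whose position becomes null once it is recycled. */
    static final class Entry {
        private Long position;

        Entry(long position) {
            this.position = position;
        }

        Long getPosition() {
            return position;
        }

        void recycle() {
            position = null;
        }
    }

    /** Stand-in for the consumer: it takes ownership and recycles every entry. */
    static void sendMessages(List<Entry> entries) {
        for (Entry e : entries) {
            e.recycle();
        }
    }

    static void dispatch(List<Entry> entries, Set<Long> replayList) {
        // Read the positions while the entries are still valid...
        for (Entry e : entries) {
            replayList.remove(e.getPosition());
        }
        // ...and only then hand the entries over; after this call they must not be touched.
        sendMessages(entries);
    }
}
```

Swapping the two steps in `dispatch` would read `getPosition()` from recycled entries, which is the failure mode described above.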
// EntryCache.insert: duplicates entry by allocating new entry and data. so, recycle entry after calling
// insert
ml.entryCache.insert(entry);
entry.recycle();
Why do we have to recycle entry here? Since recycle is setting data to null, won't it cause null pointer exception in line 152?
As we discussed, ml.entryCache.insert(entry) creates a new EntryImpl internally and allocates new data to store in the cache. So the life-cycle of this entry is finished after calling cache.insert(entry), and recycling it will not affect the cached data, because the cache already holds its own duplicate entry.
What if that gets changed so that we use 1 EntryImpl instance instead of 2?
You mean we should just insert the same entry object into the cache without copying it?
if (ml.hasActiveCursors()) {
    EntryImpl entry = EntryImpl.create(ledger.getId(), entryId, data);
    ml.entryCache.insert(entry); // It should not duplicate and insert the same entry
} else {
    data.release();
}
If yes, then I don't see any issue, but do you know the reason the existing code does it this way?
I think, as per the documentation, the reason is that buffers coming from Netty are in 64 KB chunks, so to avoid holding unnecessary memory for smaller entries (< 64 KB), we copy the data and store the copy. So I think we can keep the existing logic, as we are immediately recycling the EntryImpl anyway.
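The copy-on-insert rationale can be sketched as below. This is only an illustration under stated assumptions: `java.nio.ByteBuffer` stands in for Netty's `ByteBuf`, `CacheCopySketch.copyForCache` is a hypothetical helper, and the 64 KB figure is taken from the comment above rather than verified against the code.

```java
import java.nio.ByteBuffer;

final class CacheCopySketch {
    /**
     * Copies only the readable bytes of a (possibly much larger) source buffer into a
     * new buffer of exactly that size, so the cache never pins the large chunk.
     */
    static ByteBuffer copyForCache(ByteBuffer source) {
        ByteBuffer copy = ByteBuffer.allocate(source.remaining());
        // duplicate() shares the content but has its own position, so the caller's
        // view of the source buffer is left untouched.
        copy.put(source.duplicate());
        copy.flip();
        return copy;
    }
}
```

For a 3-byte entry sitting in a 64 KB chunk, the cached copy has a capacity of 3 bytes, and the original chunk can be released as soon as the write path is done with it.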
@merlimat Do you have any more concerns? We are trying to get this merged today if there is no blocker.
@saandrews I've been out the last few days and I'm trying to catch up with reviews. I'd need a bit more time to understand all the implications of this change, since it is an already complicated mechanism.
The approach looks good, though I'd prefer to simplify the semantics of the EntryImpl class.
@@ -17,6 +17,8 @@

import io.netty.buffer.ByteBuf;

import org.apache.bookkeeper.mledger.impl.PositionImpl;
Should not include PositionImpl here.
public void recycle() {
    this.data = null;
    // recycle position if it is recyclable object
    this.position.recycle();
Recycling on the PositionImpl is currently disabled (and it might actually be good to remove that dead code in this PR as well). In any case, since getPosition() returns a new instance of PositionImpl, we can reuse the same position all the time; we just need to reset the ledgerId and entryId on it.
> Recycling on the PositionImpl is currently disabled (and actually it might be good to remove that dead-code in this PR as well)
I have already cleaned up the dead code and enabled recycling on PositionImpl.
> we can reuse the same position all the time, we just need to reset the ledgerId and entryId on it.
Don't you think recycling would be better? It protects against mistakes: getPosition() on an already-recycled EntryImpl would reliably return null rather than a reset value.
@@ -141,7 +141,11 @@ public void safeRun() {
ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength);
if (ml.hasActiveCursors()) {
    // Avoid caching entries if no cursor has been created
    ml.entryCache.insert(new EntryImpl(ledger.getId(), entryId, data));
    EntryImpl entry = EntryImpl.create(ledger.getId(), entryId, data);
The logic that was duplicating the EntryImpl instance here was related to the fact that the data itself was copied, to avoid holding on to a ByteBuf that was used for the socket (and potentially a few MB in size) just because a single entry out of it was cached.
Given that the lifecycle of the EntryImpl is a bit different now, it might be possible to revisit it.
Yes. Should we address the change that avoids duplicating the data in a different PR, after we merge this one?
@rdhabalia @saandrews Added 1 commit on top of this PR. Take a look at merlimat@08de7c7 Few things:
@merlimat Looks good to me. By introducing refCounted
Yes, take it with a grain of salt.. it would be good to have it go through a regression as well
@@ -118,12 +118,14 @@ public boolean insert(EntryImpl entry) {
    entryBuf.readerIndex(readerIdx);
}

if (entries.put(entry.getPosition(), new EntryImpl(entry.getPosition(), cachedData))) {
PositionImpl position = entry.getPosition();
EntryImpl cacheEntry = EntryImpl.create(position, cachedData);
I forgot to release cachedData here. Since cacheEntry retains the buffer, we need to do cachedData.release() just after this line.
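The ref-count bookkeeping behind this fix can be sketched as follows. `CountedBuf` and `CacheEntrySketch` are hypothetical stand-ins for `ByteBuf` and the cache's `EntryImpl`; the only behaviour assumed from the comment is that `create` retains the buffer it is given.

```java
/** Hypothetical ref-counted buffer (stand-in for a Netty ByteBuf). */
final class CountedBuf {
    private int refCnt = 1; // the reference held by whoever allocated the buffer

    CountedBuf retain() {
        refCnt++;
        return this;
    }

    /** Returns true when the last reference is dropped. */
    boolean release() {
        if (refCnt <= 0) {
            throw new IllegalStateException("already released");
        }
        return --refCnt == 0;
    }

    int refCnt() {
        return refCnt;
    }
}

/** Hypothetical cache entry whose factory takes its own reference on the data. */
final class CacheEntrySketch {
    private final CountedBuf data;

    private CacheEntrySketch(CountedBuf data) {
        this.data = data;
    }

    static CacheEntrySketch create(CountedBuf data) {
        return new CacheEntrySketch(data.retain());
    }

    boolean release() {
        return data.release();
    }
}
```

After `create`, the count is 2 (allocator plus cache entry). If the allocating code does not call `release()` on its own reference, the buffer can never reach 0 when the cache later evicts the entry, which is the leak this comment points at.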
@@ -34,49 +34,31 @@
private long ledgerId;
Forgot to put back final for ledgerId and entryId.
👍
Motivation
Broker creates an instance of EntryImpl when it receives a published message and delivers it to a consumer. Broker also keeps it in the cache to deliver it to already caught-up consumers. However, EntryImpl is not recyclable, which impacts young GC, as the broker can create this object multiple times for each message.
Modifications
- Make EntryImpl recyclable.
- The PositionImpl object's life-cycle is independent of EntryImpl, so recycling an EntryImpl will not impact any other existing flow. It also leaves room to make PositionImpl recyclable in the future, and will be helpful when replacing the existing CacheImpl with another implementation.
Result
It reduces young GC pauses and time at the broker.