-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Persist individually deleted messages #276
Persist individually deleted messages #276
Conversation
Ping @saandrews @rdhabalia |
yes, I will review it soon. |
No rush, just that I have the other changes for binary format in ML written on top of this :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, just minor comments.
.collect(Collectors.toList()); | ||
} finally { | ||
lock.readLock().unlock(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we recycle below builders?
nestedPositionBuilder.recycle();
messageRangeBuilder.recycle();
and also should we document on the method that return List<MLDataFormats.MessageRange>
should be recycled by client once client is done with it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ML protobuf classes are not generated with the custom protobuf, so they're not recyclable. I'd leave that for later, once we have completely phased out the text format.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, that's correct.
PositionInfo pi = PositionInfo.newBuilder() | ||
.setLedgerId(position.getLedgerId()) | ||
.setEntryId(position.getEntryId()) | ||
.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges()).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we recycle returned MLDataFormats.MessageRange
?
case Code.NoSuchLedgerExistsException: | ||
case Code.ReadException: | ||
case Code.LedgerRecoveryException: | ||
case BKException.Code.NoSuchLedgerExistsException: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any reason changing it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No reason in this PR, will fix it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, it's good to disambiguate between ZK & BK Code
classes
import org.apache.bookkeeper.mledger.Entry; | ||
import org.apache.bookkeeper.mledger.ManagedLedgerException; | ||
import org.apache.bookkeeper.mledger.Position; | ||
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound; | ||
import com.google.common.base.Predicate; | ||
|
||
import java.util.function.Predicate; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any specific reason to change this one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in general we should switch to JDK's own things when available, though in this case this is unrelated so I'll take it out
@@ -2278,5 +2280,93 @@ public void testReopenMultipleTimes() throws Exception { | |||
c1 = ledger.openCursor("c1"); | |||
} | |||
|
|||
@Test(timeOut = 20000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we add a test-case which stores individualDeletedPositions
into ledger and reads back as this PR does this functionality?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are 2 tests, one that closes the ledger/factory (simulating graceful shutdown) and the other that directly opens a new ML factory, simulating a recovery after a crash.
conf/broker.conf
Outdated
@@ -198,6 +198,8 @@ managedLedgerCursorMaxEntriesPerLedger=50000 | |||
# Max time before triggering a rollover on a cursor ledger | |||
managedLedgerCursorRolloverTimeInSeconds=14400 | |||
|
|||
managedLedgerMaxUnackedRangesToPersist=1000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we add the documentation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we should
@@ -179,6 +179,11 @@ | |||
private int managedLedgerCursorMaxEntriesPerLedger = 50000; | |||
// Max time before triggering a rollover on a cursor ledger | |||
private int managedLedgerCursorRolloverTimeInSeconds = 14400; | |||
// Max number of entries to append to a ledger before triggering a rollover | |||
// A ledger rollover is triggered on these conditions Either the max | |||
// rollover time has been reached or max entries have been written to the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't persistPosition only on ledger rollover
but persist markDeletePosition
based on managedLedgerDefaultMarkDeleteRateLimit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, this was copy-pasted from above config line. I'll fix it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
@rdhabalia Updated |
8c3269b
to
5490770
Compare
- Allow to have multiple connections per broker
Motivation
Carrying over pr #192 from @sschepens.
Rebased on current master + commit 00fcab1
Added: