From 27622e87aea65b9efdbdaf021f389f8f204e47c7 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 23 Mar 2017 23:23:39 -0700 Subject: [PATCH] Fix for cursor reset after topic reload --- .../mledger/impl/ManagedCursorImpl.java | 41 +++++-------------- .../mledger/impl/ManagedLedgerBkTest.java | 32 +++++++++++++++ 2 files changed, 43 insertions(+), 30 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index f740b0dd9f5be..600d3777f5aaf 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -240,7 +240,6 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac } // Read the last entry in the ledger - cursorLedger = lh; lh.asyncReadLastEntry((rc1, lh1, seq, ctx1) -> { if (log.isDebugEnabled()) { log.debug("readComplete rc={} entryId={}", rc1, lh1.getLastAddConfirmed()); @@ -762,24 +761,17 @@ public void operationFailed(ManagedLedgerException exception) { }; - if (cursorLedger == null) { - persistPositionMetaStore(-1, newPosition, new MetaStoreCallback() { - @Override - public void operationComplete(Void result, Stat stat) { - log.info("[{}] Updated cursor {} with ledger id {} md-position={} rd-position={}", - ledger.getName(), name, -1, markDeletePosition, readPosition); - cursorLedgerStat = stat; - finalCallback.operationComplete(); - } + internalAsyncMarkDelete(newPosition, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + finalCallback.operationComplete(); + } - @Override - public void operationFailed(MetaStoreException e) { - finalCallback.operationFailed(e); - } - }); - } else { - persistPosition(cursorLedger, newPosition, finalCallback); - } + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + finalCallback.operationFailed(exception); + } + }, null); } @Override @@ -1817,21 +1809,10 @@ public void operationFailed(ManagedLedgerException exception) { private void flushPendingMarkDeletes() { if (!pendingMarkDeleteOps.isEmpty()) { - if (RESET_CURSOR_IN_PROGRESS_UPDATER.get(this) == TRUE) { - failPendingMarkDeletes(); - } else { - internalFlushPendingMarkDeletes(); - } + internalFlushPendingMarkDeletes(); } } - private void failPendingMarkDeletes() { - for (PendingMarkDeleteEntry e : pendingMarkDeleteOps) { - e.callback.markDeleteFailed(new ManagedLedgerException("reset cursor in progress"), e.ctx); - } - pendingMarkDeleteOps.clear(); - } - void internalFlushPendingMarkDeletes() { PendingMarkDeleteEntry lastEntry = pendingMarkDeleteOps.getLast(); lastEntry.callbackGroup = Lists.newArrayList(pendingMarkDeleteOps); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index e093385ced9bb..8e2bdbbafd82c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -426,4 +426,36 @@ public void testOfflineTopicBacklog() throws Exception { factory.shutdown(); assertNotNull(offlineTopicStats); } + + @Test(timeOut = 20000) + void testResetCursorAfterRecovery() throws Exception { + ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc); + ManagedLedgerConfig conf = new ManagedLedgerConfig().setMaxEntriesPerLedger(10).setEnsembleSize(1) + .setWriteQuorumSize(1).setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1) + .setMetadataAckQuorumSize(1); + ManagedLedger ledger = factory.open("my_test_move_cursor_ledger", conf); + ManagedCursor cursor = ledger.openCursor("trc1"); + Position p1 = ledger.addEntry("dummy-entry-1".getBytes()); + Position p2 = ledger.addEntry("dummy-entry-2".getBytes()); + Position p3 = ledger.addEntry("dummy-entry-3".getBytes()); + Position p4 = ledger.addEntry("dummy-entry-4".getBytes()); + + cursor.markDelete(p3); + + ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, zkc); + ledger = factory2.open("my_test_move_cursor_ledger", conf); + cursor = ledger.openCursor("trc1"); + + assertEquals(cursor.getMarkDeletedPosition(), p3); + assertEquals(cursor.getReadPosition(), p4); + assertEquals(cursor.getNumberOfEntriesInBacklog(), 1); + + cursor.resetCursor(p2); + assertEquals(cursor.getMarkDeletedPosition(), p1); + assertEquals(cursor.getReadPosition(), p2); + assertEquals(cursor.getNumberOfEntriesInBacklog(), 3); + + factory2.shutdown(); + factory.shutdown(); + } }