Skip to content

Commit

Permalink
[fix][ml] No rollover inactive ledgers when metadata service invalid (a…
Browse files Browse the repository at this point in the history
…pache#22284)

### Motivation

We should not rollover inactive ledgers when metadata service is invailable.

### Modifications

Checking metadata service is vailable when schedule `checkInactiveLedgerAndRollOver`

(cherry picked from commit b9bf0a8)
  • Loading branch information
AnonHxy authored and mukesh-ctds committed Apr 15, 2024
1 parent 4abea10 commit 88a9664
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4510,9 +4510,10 @@ private void cancelScheduledTasks() {

@Override
public boolean checkInactiveLedgerAndRollOver() {
long currentTimeMs = System.currentTimeMillis();
if (currentLedgerEntries > 0 && inactiveLedgerRollOverTimeMs > 0 && currentTimeMs > (lastAddEntryTimeMs
+ inactiveLedgerRollOverTimeMs)) {
if (factory.isMetadataServiceAvailable()
&& currentLedgerEntries > 0
&& inactiveLedgerRollOverTimeMs > 0
&& System.currentTimeMillis() > (lastAddEntryTimeMs + inactiveLedgerRollOverTimeMs)) {
log.info("[{}] Closing inactive ledger, last-add entry {}", name, lastAddEntryTimeMs);
if (STATE_UPDATER.compareAndSet(this, State.LedgerOpened, State.ClosingLedger)) {
LedgerHandle currentLedger = this.currentLedger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3922,6 +3922,30 @@ public void testDontRollOverEmptyInactiveLedgers() throws Exception {
ledger.close();
}

@Test
public void testDontRollOverInactiveLedgersWhenMetadataServiceInvalid() throws Exception {
int inactiveLedgerRollOverTimeMs = 5;
@Cleanup("shutdown")
ManagedLedgerFactoryImpl factory = spy(new ManagedLedgerFactoryImpl(metadataStore, bkc));
// mock metadata service invalid
when(factory.isMetadataServiceAvailable()).thenReturn(false);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setInactiveLedgerRollOverTime(inactiveLedgerRollOverTimeMs, TimeUnit.MILLISECONDS);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("rollover_inactive", config);

long ledgerId = ledger.currentLedger.getId();

Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
ledger.checkInactiveLedgerAndRollOver();

Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
ledger.checkInactiveLedgerAndRollOver();

assertEquals(ledger.currentLedger.getId(), ledgerId);

ledger.close();
}

@Test
public void testOffloadTaskCancelled() throws Exception {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
Expand Down

0 comments on commit 88a9664

Please sign in to comment.