Skip to content

Commit

Permalink
[Fix][Tiered Storage] Eagerly Delete Offloaded Segments On Topic Dele…
Browse files Browse the repository at this point in the history
…tion (apache#17915) (#152)

(cherry picked from commit 0854032)

Fixes apache#9962 

### Motivation

Offloaded ledgers can be orphaned on topic deletion. 

This is a redo of apache#15914, which conflicted with the concurrently merged apache#17736 and thus resulted in apache#17889.

apache#17736 decided not to allow managed ledger trimming for fenced mledgers, because in many cases fencing indicates a problem that should stop all operations on the mledger. At the same time, fencing is used before deletion starts, so trimming added to the deletion process cannot proceed.
After discussion with @eolivelli, I introduced a new state, FencedForDeletion, which acts like the Fenced state except for trimming/deletion purposes.

### Modifications

The topic is now truncated before deletion so that offloaded ledgers are deleted properly; the deletion fails if the truncation fails.

### Verifying this change

local fork tests: dlg99#1

- [ ] Make sure that the change passes the CI checks.

This change added integration tests

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

No options were changed, but the admin CLI will now implicitly run truncate before topic delete.

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API: (yes / no)
  - The schema: (yes / no / don't know)
  - The default values of configurations: (yes / no)
  - The wire protocol: (yes / no)
  - The rest endpoints: (yes / no)
  - The admin cli options: (yes / no)
  - Anything that affects deployment: (yes / no / don't know)

### Documentation

Check the box below or label this PR directly.

Need to update docs? 

- [ ] `doc-required` 
(Your PR needs to update docs and you will update later)
  
- [x] `doc-not-needed` 
(Please explain why)
  
- [ ] `doc` 
(Your PR contains doc changes)

- [ ] `doc-complete`
(Docs have been already added)
  • Loading branch information
dlg99 authored Oct 31, 2022
1 parent 79d958b commit 059f444
Show file tree
Hide file tree
Showing 16 changed files with 684 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M
*/
void delete(String name) throws InterruptedException, ManagedLedgerException;

/**
* Delete a managed ledger. If it's not open, its metadata will be deleted regardless.
*
* @param name
*            the name of the managed ledger to delete
* @param mlConfigFuture
*            future supplying the managed ledger configuration. NOTE(review): the factory implementation
*            in this change appears to use it only to clean up offloaded ledger data, and to skip that
*            cleanup when the future completes with {@code null} - confirm against the implementation
* @throws InterruptedException
* @throws ManagedLedgerException
*/
void delete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture)
throws InterruptedException, ManagedLedgerException;

/**
* Delete a managed ledger. If it's not open, its metadata will be deleted regardless.
*
Expand All @@ -154,6 +164,16 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M
*/
void asyncDelete(String name, DeleteLedgerCallback callback, Object ctx);

/**
* Asynchronously delete a managed ledger. If it's not open, its metadata will be deleted regardless.
*
* <p>This method declares no checked exceptions; the outcome is reported through {@code callback}.
*
* @param name
*            the name of the managed ledger to delete
* @param mlConfigFuture
*            future supplying the managed ledger configuration. NOTE(review): the factory implementation
*            in this change appears to use it only to clean up offloaded ledger data, and to skip that
*            cleanup when the future completes with {@code null} - confirm against the implementation
* @param callback
*            callback notified when the deletion completes or fails
* @param ctx
*            opaque context object handed back to the callback
*/
void asyncDelete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
DeleteLedgerCallback callback, Object ctx);

/**
* Releases all the resources maintained by the ManagedLedgerFactory.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -70,13 +71,15 @@
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
import org.apache.bookkeeper.mledger.offload.OffloadUtils;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -354,7 +357,7 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
if (existingFuture.isDone()) {
try {
ManagedLedgerImpl l = existingFuture.get();
if (l.getState() == State.Fenced || l.getState() == State.Closed) {
if (l.getState().isFenced() || l.getState() == State.Closed) {
// Managed ledger is in unusable state. Recreate it.
log.warn("[{}] Attempted to open ledger in {} state. Removing from the map to recreate it",
name, l.getState());
Expand Down Expand Up @@ -818,12 +821,18 @@ public void operationFailed(MetaStoreException e) {

/**
* Deletes the named managed ledger without supplying a managed ledger configuration.
*
* <p>Delegates to {@link #delete(String, CompletableFuture)} with an already-completed future holding
* {@code null}.
*/
@Override
public void delete(String name) throws InterruptedException, ManagedLedgerException {
// No config available here: pass a completed future carrying null.
delete(name, CompletableFuture.completedFuture(null));
}

@Override
public void delete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture)
throws InterruptedException, ManagedLedgerException {
class Result {
ManagedLedgerException e = null;
}
final Result r = new Result();
final CountDownLatch latch = new CountDownLatch(1);
asyncDelete(name, new DeleteLedgerCallback() {
asyncDelete(name, mlConfigFuture, new DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
latch.countDown();
Expand All @@ -845,10 +854,16 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {

/**
* Asynchronously deletes the named managed ledger without supplying a managed ledger configuration.
*
* <p>Delegates to {@link #asyncDelete(String, CompletableFuture, DeleteLedgerCallback, Object)} with an
* already-completed future holding {@code null}.
*/
@Override
public void asyncDelete(String name, DeleteLedgerCallback callback, Object ctx) {
// No config available here: pass a completed future carrying null.
asyncDelete(name, CompletableFuture.completedFuture(null), callback, ctx);
}

@Override
public void asyncDelete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
DeleteLedgerCallback callback, Object ctx) {
CompletableFuture<ManagedLedgerImpl> future = ledgers.get(name);
if (future == null) {
// Managed ledger does not exist and we're not currently trying to open it
deleteManagedLedger(name, callback, ctx);
deleteManagedLedger(name, mlConfigFuture, callback, ctx);
} else {
future.thenAccept(ml -> {
// If it's open, delete in the normal way
Expand All @@ -863,7 +878,8 @@ public void asyncDelete(String name, DeleteLedgerCallback callback, Object ctx)
/**
* Delete all managed ledger resources and metadata.
*/
void deleteManagedLedger(String managedLedgerName, DeleteLedgerCallback callback, Object ctx) {
void deleteManagedLedger(String managedLedgerName, CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
DeleteLedgerCallback callback, Object ctx) {
// Read the managed ledger metadata from store
asyncGetManagedLedgerInfo(managedLedgerName, new ManagedLedgerInfoCallback() {
@Override
Expand All @@ -875,7 +891,7 @@ public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
.map(e -> deleteCursor(bkc, managedLedgerName, e.getKey(), e.getValue()))
.collect(Collectors.toList());
Futures.waitForAll(futures).thenRun(() -> {
deleteManagedLedgerData(bkc, managedLedgerName, info, callback, ctx);
deleteManagedLedgerData(bkc, managedLedgerName, info, mlConfigFuture, callback, ctx);
}).exceptionally(ex -> {
callback.deleteLedgerFailed(new ManagedLedgerException(ex), ctx);
return null;
Expand All @@ -890,22 +906,80 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
}

private void deleteManagedLedgerData(BookKeeper bkc, String managedLedgerName, ManagedLedgerInfo info,
DeleteLedgerCallback callback, Object ctx) {
CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
DeleteLedgerCallback callback, Object ctx) {
final CompletableFuture<Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>>
ledgerInfosFuture = new CompletableFuture<>();
store.getManagedLedgerInfo(managedLedgerName, false, null,
new MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
@Override
public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> infos = new HashMap<>();
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : mlInfo.getLedgerInfoList()) {
infos.put(ls.getLedgerId(), ls);
}
ledgerInfosFuture.complete(infos);
}

@Override
public void operationFailed(MetaStoreException e) {
log.error("Failed to get managed ledger info for {}", managedLedgerName, e);
ledgerInfosFuture.completeExceptionally(e);
}
});

Futures.waitForAll(info.ledgers.stream()
.filter(li -> !li.isOffloaded)
.map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute()
.handle((result, ex) -> {
if (ex != null) {
int rc = BKException.getExceptionCode(ex);
if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException
|| rc == BKException.Code.NoSuchLedgerExistsException) {
log.info("Ledger {} does not exist, ignoring", li.ledgerId);
return null;
}
throw new CompletionException(ex);
.map(li -> {
final CompletableFuture<Void> res;
if (li.isOffloaded) {
res = mlConfigFuture
.thenCombine(ledgerInfosFuture, Pair::of)
.thenCompose(pair -> {
ManagedLedgerConfig mlConfig = pair.getLeft();
Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerInfos = pair.getRight();

if (mlConfig == null || ledgerInfos == null) {
return CompletableFuture.completedFuture(null);
}
return result;
}))

MLDataFormats.ManagedLedgerInfo.LedgerInfo ls = ledgerInfos.get(li.ledgerId);

if (ls.getOffloadContext().hasUidMsb()) {
MLDataFormats.ManagedLedgerInfo.LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
String driverName = OffloadUtils.getOffloadDriverName(ls,
mlConfig.getLedgerOffloader().getOffloadDriverName());
Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls,
mlConfig.getLedgerOffloader().getOffloadDriverMetadata());
OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata);

UUID uuid = new UUID(ls.getOffloadContext().getUidMsb(),
ls.getOffloadContext().getUidLsb());
return OffloadUtils.cleanupOffloaded(li.ledgerId, uuid, mlConfig,
OffloadUtils.getOffloadDriverMetadata(ls,
mlConfig.getLedgerOffloader().getOffloadDriverMetadata()),
"Deletion", managedLedgerName, scheduledExecutor);
}

return CompletableFuture.completedFuture(null);
});
} else {
res = CompletableFuture.completedFuture(null);
}
return res.thenCompose(__ -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute()
.handle((result, ex) -> {
if (ex != null) {
int rc = BKException.getExceptionCode(ex);
if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException
|| rc == BKException.Code.NoSuchLedgerExistsException) {
log.info("Ledger {} does not exist, ignoring", li.ledgerId);
return null;
}
throw new CompletionException(ex);
}
return result;
}));
})
.collect(Collectors.toList()))
.thenRun(() -> {
// Delete the metadata
Expand Down
Loading

0 comments on commit 059f444

Please sign in to comment.