-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PIP-186] Introduce two phase deletion protocol based on system topic #16590
base: master
Are you sure you want to change the base?
Conversation
# Conflicts: # managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
//No op | ||
} | ||
|
||
private static final CompletableFuture<?> COMPLETABLE_FUTURE = CompletableFuture.completedFuture(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you cannot cache CompletableFutures
the behaviour will be unpredictable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good work
I have left some feedback
ledgersStat = stat; | ||
metadataMutex.unlock(); | ||
trimmerMutex.unlock(); | ||
if (ledgerDeletionService instanceof LedgerDeletionService.LedgerDeletionServiceDisable) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using instanceof is a bad practice.
we should add a method to LedgerDeletionService and override it on the various implementations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea.
@ApiResponse(code = 404, message = "Topic does not exist"), | ||
@ApiResponse(code = 412, message = "Topic has active producers/subscriptions"), | ||
@ApiResponse(code = 500, message = "Internal server error")}) | ||
public void deleteLedger( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need a new API?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there are 2 brokers, it will exists two system topic consumers(A and B). If the topic onwership is A, when B received pending delete ledger msg, it will send delete ledger command to A. So introduce rest api in server side and PusarAdmin.
# Conflicts: # managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
doc = "Using two phase deletion when delete ledger. if true, " | ||
+ "LedgerDeletionService will take over ledger deletion. (Default false)" | ||
) | ||
private boolean topicTwoPhaseDeletionEnabled; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should set the default value to false
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes.
} | ||
|
||
@Override | ||
public void close() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to stop the statsProvider
on close
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
} | ||
this.statsProvider.start(configuration); | ||
StatsLogger statsLogger = statsProvider.getStatsLogger("pulsar_ledger_deletion"); | ||
this.deleteLedgerOpLogger = statsLogger.getOpStatsLogger("delete_ledger"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the metrics can be exported to the broker Prometheus port?
future.complete(null); | ||
}); | ||
} else if (LedgerComponent.MANAGED_CURSOR == pendingDeleteLedger.getLedgerComponent()) { | ||
future.complete(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add @TODO
flag here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright.
} | ||
future.complete(null); | ||
}); | ||
} else if (LedgerType.OFFLOAD_LEDGER == ledgerType) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we merge these two cases into one? The main differences are the offloadContext
parameter. In managedledger.asyncDeleteLedger
method, we also checked the ledgerType
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it can be.
reader.ackMessageAsync(message); | ||
future.complete(null); | ||
} else if (ex instanceof PulsarAdminException.ServerSideErrorException) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to deal with this exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It needn't, The origin logic is: in client side, to check the ServerSideErrorException if contains PendingDeleteLedgerInvalidException
info, it contains it, means that the pending delete message is invalid, we also need ack it.
But now, at server side, we catch PendingDeleteLedgerInvalidException
, make it return success response. So the client side needn't to check the error response info.
I will delete this logic branch.
CompletableFuture<?> appendDeleteLedgerFuture = | ||
appendPendingDeleteLedger(deletableLedgers, deletableOffloadedLedgers); | ||
appendDeleteLedgerFuture.thenAccept(ignore -> { | ||
believedDeleteIds.addAll(deletableLedgers); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the ledgerDeletionService
not enabled, the believedDeleteIds
set will keep increasing and doesn't have chance to delete items.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch.
# Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
The pr had no activity for 30 days, mark with Stale label. |
# Conflicts: # managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java # managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java # managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@hangc0276 @eolivelli Could you help to review it, thanks! Pending for a long time. |
The pr had no activity for 30 days, mark with Stale label. |
Confirmed w/ @codelipenghui, for 3.1, the code will be frozen next week, and this PR will not be merged into 3.1. |
Original issue: #13238
PIP Issue: #16569
Motivation
In current ledger deletion, we divided it into two separate steps. It happens in ManagedLedger and ManagedCursor.
Remove all the waiting to delete ledgers from the ledger list and update the newest ledger list into a meta store.
In the meta store update callback operation, delete the waiting to delete ledgers from storage systems, such as BookKeeper or Tiered storage.
Due to the separate step, we can’t ensure the ledger deletion transaction. If the first step succeeds and the second step fails, it will lead to ledgers that can't be deleted from the storage system forever. The second step may fail by broker restart or storage system deletion failed.
In our customer’s environment, we have found many orphan ledgers cause by the above reason.
Modifications
Introduce two phase deletion to solve the problem.
Need to update docs?
doc-required
(Your PR needs to update docs and you will update later)
doc-not-needed
(Please explain why)
doc
(Your PR contains doc changes)
doc-complete
(Docs have been already added)