-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Transaction] Fix transaction buffer client handle endTxn op when topic or sub have been deleted. #11304
[Transaction] Fix transaction buffer client handle endTxn op when topic or sub have been deleted. #11304
Changes from 4 commits
a019e62
f2688c6
605dbdc
cc41bbc
6463d33
71c370c
8f431b6
52a0895
a24f6d5
957a614
a0f5d7e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -19,6 +19,8 @@ | |||||
package org.apache.bookkeeper.mledger.impl; | ||||||
|
||||||
import java.util.List; | ||||||
import java.util.concurrent.CompletableFuture; | ||||||
|
||||||
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; | ||||||
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; | ||||||
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; | ||||||
|
@@ -129,4 +131,14 @@ void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorIn | |||||
* @throws MetaStoreException | ||||||
*/ | ||||||
Iterable<String> getManagedLedgers() throws MetaStoreException; | ||||||
|
||||||
/** | ||||||
* Check path exists. | ||||||
* | ||||||
* @param path {@link String} | ||||||
* @return a future represents the result of the operation. | ||||||
* an instance of {@link Boolean} is returned | ||||||
* if the operation succeeds. | ||||||
*/ | ||||||
CompletableFuture<Boolean> exists(String path); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1925,43 +1925,68 @@ protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) { | |
final String topic = command.getTopic(); | ||
final int txnAction = command.getTxnAction().getValue(); | ||
TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits()); | ||
final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark(); | ||
|
||
if (log.isDebugEnabled()) { | ||
log.debug("[{}] handleEndTxnOnPartition txnId: [{}], txnAction: [{}]", topic, | ||
txnID, txnAction); | ||
} | ||
CompletableFuture<Optional<Topic>> topicFuture = service.getTopics().get(TopicName.get(topic).toString()); | ||
if (topicFuture != null) { | ||
topicFuture.whenComplete((optionalTopic, t) -> { | ||
if (!optionalTopic.isPresent()) { | ||
log.error("handleEndTxnOnPartition fail ! The topic {} does not exist in broker, " | ||
+ "txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction)); | ||
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse( | ||
requestId, ServerError.ServiceNotReady, | ||
"Topic " + topic + " is not found.")); | ||
return; | ||
} | ||
optionalTopic.get().endTxn(txnID, txnAction, command.getTxnidLeastBitsOfLowWatermark()) | ||
CompletableFuture<Optional<Topic>> topicFuture = service.getTopicIfExists(TopicName.get(topic).toString()); | ||
topicFuture.thenAccept(optionalTopic -> { | ||
if (optionalTopic.isPresent()) { | ||
optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark) | ||
.whenComplete((ignored, throwable) -> { | ||
if (throwable != null) { | ||
log.error("Handle endTxnOnPartition {} failed.", topic, throwable); | ||
log.error("handleEndTxnOnPartition fail!, topic {}, txnId: [{}], " | ||
+ "txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction), throwable); | ||
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse( | ||
requestId, BrokerServiceException.getClientErrorCode(throwable), | ||
throwable.getMessage())); | ||
throwable.getMessage(), | ||
txnID.getLeastSigBits(), txnID.getMostSigBits())); | ||
return; | ||
} | ||
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, | ||
txnID.getLeastSigBits(), txnID.getMostSigBits())); | ||
}); | ||
}); | ||
} else { | ||
log.error("handleEndTxnOnPartition faile ! The topic {} does not exist in broker, " | ||
+ "txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction)); | ||
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( | ||
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), | ||
ServerError.ServiceNotReady, | ||
"The topic " + topic + " is not exist in broker.")); | ||
} | ||
|
||
} else { | ||
getBrokerService().getManagedLedgerFactory() | ||
.checkManagedLedgerInitializedBefore(TopicName.get(topic).getPersistenceNamingEncoding()) | ||
.thenAccept((b) -> { | ||
if (b) { | ||
log.error("handleEndTxnOnPartition fail ! The topic {} does not exist in broker, " | ||
+ "txnId: [{}], txnAction: [{}]", topic, | ||
txnID, TxnAction.valueOf(txnAction)); | ||
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, | ||
ServerError.ServiceNotReady, | ||
"The topic " + topic + " is not exist in broker.", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there is a typo, what about: "The topic " + topic + " does not exist in broker." |
||
txnID.getMostSigBits(), txnID.getLeastSigBits())); | ||
} else { | ||
log.warn("handleEndTxnOnPartition fail ! The topic {} have not been created, " | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. typo: "has not been created" |
||
+ "txnId: [{}], txnAction: [{}]", | ||
topic, txnID, TxnAction.valueOf(txnAction)); | ||
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, | ||
txnID.getLeastSigBits(), txnID.getMostSigBits())); | ||
} | ||
}).exceptionally(e -> { | ||
log.error("handleEndTxnOnPartition fail ! topic {} , " | ||
+ "txnId: [{}], txnAction: [{}]", topic, txnID, | ||
TxnAction.valueOf(txnAction), e.getCause()); | ||
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse( | ||
requestId, ServerError.ServiceNotReady, | ||
e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits())); | ||
return null; | ||
}); | ||
} | ||
}).exceptionally(e -> { | ||
log.error("handleEndTxnOnPartition fail ! topic {} , " | ||
+ "txnId: [{}], txnAction: [{}]", topic, txnID, | ||
TxnAction.valueOf(txnAction), e.getCause()); | ||
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse( | ||
requestId, ServerError.ServiceNotReady, | ||
e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits())); | ||
return null; | ||
}); | ||
} | ||
|
||
@Override | ||
|
@@ -1972,70 +1997,82 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) { | |
final String topic = command.getSubscription().getTopic(); | ||
final String subName = command.getSubscription().getSubscription(); | ||
final int txnAction = command.getTxnAction().getValue(); | ||
final TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits); | ||
final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark(); | ||
|
||
if (log.isDebugEnabled()) { | ||
log.debug("[{}] handleEndTxnOnSubscription txnId: [{}], txnAction: [{}]", topic, | ||
log.debug("[{}] [{}] handleEndTxnOnSubscription txnId: [{}], txnAction: [{}]", topic, subName, | ||
new TxnID(txnidMostBits, txnidLeastBits), txnAction); | ||
} | ||
|
||
CompletableFuture<Optional<Topic>> topicFuture = service.getTopics().get(TopicName.get(topic).toString()); | ||
if (topicFuture != null) { | ||
topicFuture.thenAccept(optionalTopic -> { | ||
|
||
if (!optionalTopic.isPresent()) { | ||
log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, txnId: " | ||
+ "[{}], txnAction: [{}]", topic, | ||
new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction)); | ||
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( | ||
requestId, txnidLeastBits, txnidMostBits, | ||
ServerError.ServiceNotReady, | ||
"The topic " + topic + " is not exist in broker.")); | ||
return; | ||
} | ||
|
||
CompletableFuture<Optional<Topic>> topicFuture = service.getTopicIfExists(TopicName.get(topic).toString()); | ||
topicFuture.thenAccept(optionalTopic -> { | ||
if (optionalTopic.isPresent()) { | ||
Subscription subscription = optionalTopic.get().getSubscription(subName); | ||
if (subscription == null) { | ||
log.error("Topic {} subscription {} is not exist.", optionalTopic.get().getName(), subName); | ||
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( | ||
requestId, txnidLeastBits, txnidMostBits, | ||
ServerError.ServiceNotReady, | ||
"Topic " + optionalTopic.get().getName() | ||
+ " subscription " + subName + " is not exist.")); | ||
log.warn("handleEndTxnOnSubscription fail! " | ||
+ "topic {} subscription {} is not exist. txnId: [{}], txnAction: [{}]", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "is not exist" -> "does not exist" |
||
optionalTopic.get().getName(), subName, txnID, TxnAction.valueOf(txnAction)); | ||
ctx.writeAndFlush( | ||
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits)); | ||
return; | ||
} | ||
|
||
CompletableFuture<Void> completableFuture = | ||
subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, | ||
command.getTxnidLeastBitsOfLowWatermark()); | ||
completableFuture.whenComplete((ignored, throwable) -> { | ||
if (throwable != null) { | ||
log.error("Handle end txn on subscription failed for request {}", requestId, throwable); | ||
subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, lowWaterMark); | ||
completableFuture.whenComplete((ignored, e) -> { | ||
if (e != null) { | ||
log.error("handleEndTxnOnSubscription fail ! topic: {} , subscription: {}" | ||
+ "txnId: [{}], txnAction: [{}]", topic, subName, | ||
txnID, TxnAction.valueOf(txnAction), e.getCause()); | ||
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( | ||
requestId, txnidLeastBits, txnidMostBits, | ||
BrokerServiceException.getClientErrorCode(throwable), | ||
BrokerServiceException.getClientErrorCode(e), | ||
"Handle end txn on subscription failed.")); | ||
return; | ||
} | ||
ctx.writeAndFlush( | ||
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits)); | ||
}); | ||
}).exceptionally(e -> { | ||
log.error("Handle end txn on subscription failed for request {}", requestId, e); | ||
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( | ||
requestId, txnidLeastBits, txnidMostBits, | ||
ServerError.ServiceNotReady, | ||
"Handle end txn on subscription failed.")); | ||
return null; | ||
}); | ||
} else { | ||
log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, txnId: " | ||
+ "[{}], txnAction: [{}]", topic, | ||
new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction)); | ||
} else { | ||
getBrokerService().getManagedLedgerFactory() | ||
.checkManagedLedgerInitializedBefore(TopicName.get(topic).getPersistenceNamingEncoding()) | ||
.thenAccept((b) -> { | ||
if (b) { | ||
log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, " | ||
+ "subscription: {} ,txnId: [{}], txnAction: [{}]", topic, subName, | ||
new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction)); | ||
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( | ||
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), | ||
ServerError.ServiceNotReady, | ||
"The topic " + topic + " is not exist in broker.")); | ||
} else { | ||
log.warn("handleEndTxnOnSubscription fail ! The topic {} have not been created, " | ||
+ "subscription: {} txnId: [{}], txnAction: [{}]", | ||
topic, subName, txnID, TxnAction.valueOf(txnAction)); | ||
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, | ||
txnID.getLeastSigBits(), txnID.getMostSigBits())); | ||
} | ||
}).exceptionally(e -> { | ||
log.error("handleEndTxnOnSubscription fail ! topic {} , subscription: {}" | ||
+ "txnId: [{}], txnAction: [{}]", topic, subName, | ||
txnID, TxnAction.valueOf(txnAction), e.getCause()); | ||
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( | ||
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), | ||
ServerError.ServiceNotReady, e.getMessage())); | ||
return null; | ||
}); | ||
} | ||
}).exceptionally(e -> { | ||
log.error("handleEndTxnOnSubscription fail ! topic: {} , subscription: {}" | ||
+ "txnId: [{}], txnAction: [{}]", topic, subName, | ||
txnID, TxnAction.valueOf(txnAction), e.getCause()); | ||
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( | ||
requestId, txnidLeastBits, txnidMostBits, | ||
ServerError.ServiceNotReady, | ||
"The topic " + topic + " is not exist in broker.")); | ||
} | ||
"Handle end txn on subscription failed.")); | ||
return null; | ||
}); | ||
} | ||
|
||
private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.