Skip to content

Commit

Permalink
Optimizing Exception Handling and Logging in MultiTopicsConsumerImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhaoGuorui666 committed Nov 6, 2024
1 parent 570cb44 commit c577171
Showing 1 changed file with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ public int minReceiverQueueSize() {

@Override
protected Message<T> internalReceive() throws PulsarClientException {
Message<T> message;
Message<T> message = null;
try {
if (incomingMessages.isEmpty()) {
expectMoreIncomingMessages();
Expand All @@ -383,6 +383,12 @@ protected Message<T> internalReceive() throws PulsarClientException {
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
resumeReceivingFromPausedConsumersIfNeeded();
return beforeConsume(message);
} catch (IllegalArgumentException e) {
log.error("Validation failed: Expected instance of TopicMessageImpl but received {}. Message ID: {}",
message != null ? message.getClass().getName() : "null",
message != null ? message.getMessageId() : "N/A",
e);
throw new PulsarClientException("Invalid message type received", e);
} catch (Exception e) {
ExceptionHandler.handleInterruptedException(e);
throw PulsarClientException.unwrap(e);
Expand All @@ -391,7 +397,7 @@ protected Message<T> internalReceive() throws PulsarClientException {

@Override
protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarClientException {
Message<T> message;
Message<T> message = null;
try {
if (incomingMessages.isEmpty()) {
expectMoreIncomingMessages();
Expand All @@ -405,6 +411,12 @@ protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarC
}
resumeReceivingFromPausedConsumersIfNeeded();
return message;
} catch (IllegalArgumentException e) {
log.error("Validation failed: Expected instance of TopicMessageImpl but received {}. Message ID: {}",
message != null ? message.getClass().getName() : "null",
message != null ? message.getMessageId() : "N/A",
e);
throw new PulsarClientException("Invalid message type received", e);
} catch (Exception e) {
ExceptionHandler.handleInterruptedException(e);
throw PulsarClientException.unwrap(e);
Expand Down

0 comments on commit c577171

Please sign in to comment.