From c57717159a8168701ec9a56a2a11780e8f6e7231 Mon Sep 17 00:00:00 2001 From: CrazyCoder <18235787078@163.com> Date: Wed, 6 Nov 2024 13:18:31 +0000 Subject: [PATCH] Optimizing Exception Handling and Logging in `MultiTopicsConsumerImpl` --- .../client/impl/MultiTopicsConsumerImpl.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index ff293af230838..5329cc5eeba2b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -372,7 +372,7 @@ public int minReceiverQueueSize() { @Override protected Message internalReceive() throws PulsarClientException { - Message message; + Message message = null; try { if (incomingMessages.isEmpty()) { expectMoreIncomingMessages(); @@ -383,6 +383,12 @@ protected Message internalReceive() throws PulsarClientException { unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount()); resumeReceivingFromPausedConsumersIfNeeded(); return beforeConsume(message); + } catch (IllegalArgumentException e) { + log.error("Validation failed: Expected instance of TopicMessageImpl but received {}. Message ID: {}", + message != null ? message.getClass().getName() : "null", + message != null ? message.getMessageId() : "N/A", + e); + throw new PulsarClientException("Invalid message type received", e); } catch (Exception e) { ExceptionHandler.handleInterruptedException(e); throw PulsarClientException.unwrap(e); @@ -391,7 +397,7 @@ protected Message internalReceive() throws PulsarClientException { @Override protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarClientException { - Message message; + Message message = null; try { if (incomingMessages.isEmpty()) { expectMoreIncomingMessages(); @@ -405,6 +411,12 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC } resumeReceivingFromPausedConsumersIfNeeded(); return message; + } catch (IllegalArgumentException e) { + log.error("Validation failed: Expected instance of TopicMessageImpl but received {}. Message ID: {}", + message != null ? message.getClass().getName() : "null", + message != null ? message.getMessageId() : "N/A", + e); + throw new PulsarClientException("Invalid message type received", e); } catch (Exception e) { ExceptionHandler.handleInterruptedException(e); throw PulsarClientException.unwrap(e);