From c48ab8661701390f24286a2a80ebbb6355ecbad6 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 21 Nov 2022 14:21:28 +0800 Subject: [PATCH] Enhance compareTo --- .../PersistentAcknowledgmentsGroupingTracker.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 9ac27f5fb39fe..fef0bcb8906f1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -663,13 +663,18 @@ public synchronized void reset() { } public synchronized int compareTo(MessageId messageId) { - // The position of a message in the batch (BatchMessageIdImpl) must precede the batch itself (MessageIdImpl) if (this.messageId instanceof BatchMessageIdImpl && (!(messageId instanceof BatchMessageIdImpl))) { - int result = ((BatchMessageIdImpl) this.messageId).toMessageIdImpl().compareTo(messageId); - return (result == 0) ? -1 : result; + final BatchMessageIdImpl lhs = (BatchMessageIdImpl) this.messageId; + final MessageIdImpl rhs = (MessageIdImpl) messageId; + return MessageIdImpl.messageIdCompare( + lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), lhs.getBatchIndex(), + rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), Integer.MAX_VALUE); } else if (messageId instanceof BatchMessageIdImpl && (!(this.messageId instanceof BatchMessageIdImpl))){ - int result = this.messageId.compareTo(((BatchMessageIdImpl) messageId).toMessageIdImpl()); - return (result == 0) ? 1 : result; + final MessageIdImpl lhs = this.messageId; + final BatchMessageIdImpl rhs = (BatchMessageIdImpl) messageId; + return MessageIdImpl.messageIdCompare( + lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), Integer.MAX_VALUE, + rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), rhs.getBatchIndex()); } else { return this.messageId.compareTo(messageId); }