From e1c38224cd60bc7be197dc55b55f4b8149d035bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=89=E5=B0=8F=E9=BE=99?= Date: Sun, 17 May 2020 23:14:25 +0800 Subject: [PATCH] Expose replicated from filed on message struct (#251) Signed-off-by: xiaolong.ran --- pulsar/consumer_partition.go | 2 ++ pulsar/impl_message.go | 9 +++++++++ pulsar/message.go | 6 ++++++ 3 files changed, 17 insertions(+) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 498f2ae0063c2..4c40ae9147e84 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -424,6 +424,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header msgID: msgID, payLoad: payload, replicationClusters: msgMeta.GetReplicateTo(), + replicatedFrom: msgMeta.GetReplicatedFrom(), redeliveryCount: response.GetRedeliveryCount(), } } else { @@ -436,6 +437,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header msgID: msgID, payLoad: payload, replicationClusters: msgMeta.GetReplicateTo(), + replicatedFrom: msgMeta.GetReplicatedFrom(), redeliveryCount: response.GetRedeliveryCount(), } } diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go index 9b85c8a0555e8..3a5d4b6e5d6b1 100644 --- a/pulsar/impl_message.go +++ b/pulsar/impl_message.go @@ -145,6 +145,7 @@ type message struct { properties map[string]string topic string replicationClusters []string + replicatedFrom string redeliveryCount uint32 } @@ -180,6 +181,14 @@ func (msg *message) RedeliveryCount() uint32 { return msg.redeliveryCount } +func (msg *message) IsReplicated() bool { + return msg.replicatedFrom != "" +} + +func (msg *message) GetReplicatedFrom() string { + return msg.replicatedFrom +} + func newAckTracker(size int) *ackTracker { var batchIDs *big.Int if size <= 64 { diff --git a/pulsar/message.go b/pulsar/message.go index 8505321742ddd..6be35d2c95f03 100644 --- a/pulsar/message.go +++ b/pulsar/message.go @@ -93,6 +93,12 @@ type Message interface { // Message redelivery increases monotonically in a broker, when topic switch ownership to a another broker // redelivery count will be recalculated. RedeliveryCount() uint32 + + // Check whether the message is replicated from other cluster. + IsReplicated() bool + + // Get name of cluster, from which the message is replicated. + GetReplicatedFrom() string } // MessageID identifier for a particular message