Skip to content

Commit

Permalink
Expose replicated from filed on message struct (apache#251)
Browse files Browse the repository at this point in the history
Signed-off-by: xiaolong.ran <rxl@apache.org>
  • Loading branch information
wolfstudy authored May 17, 2020
1 parent b04a842 commit e1c3822
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
msgID: msgID,
payLoad: payload,
replicationClusters: msgMeta.GetReplicateTo(),
replicatedFrom: msgMeta.GetReplicatedFrom(),
redeliveryCount: response.GetRedeliveryCount(),
}
} else {
Expand All @@ -436,6 +437,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
msgID: msgID,
payLoad: payload,
replicationClusters: msgMeta.GetReplicateTo(),
replicatedFrom: msgMeta.GetReplicatedFrom(),
redeliveryCount: response.GetRedeliveryCount(),
}
}
Expand Down
9 changes: 9 additions & 0 deletions pulsar/impl_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ type message struct {
properties map[string]string
topic string
replicationClusters []string
replicatedFrom string
redeliveryCount uint32
}

Expand Down Expand Up @@ -180,6 +181,14 @@ func (msg *message) RedeliveryCount() uint32 {
return msg.redeliveryCount
}

func (msg *message) IsReplicated() bool {
return msg.replicatedFrom != ""
}

func (msg *message) GetReplicatedFrom() string {
return msg.replicatedFrom
}

func newAckTracker(size int) *ackTracker {
var batchIDs *big.Int
if size <= 64 {
Expand Down
6 changes: 6 additions & 0 deletions pulsar/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ type Message interface {
// Message redelivery increases monotonically in a broker, when topic switch ownership to a another broker
// redelivery count will be recalculated.
RedeliveryCount() uint32

// Check whether the message is replicated from other cluster.
IsReplicated() bool

// Get name of cluster, from which the message is replicated.
GetReplicatedFrom() string
}

// MessageID identifier for a particular message
Expand Down

0 comments on commit e1c3822

Please sign in to comment.