Skip to content

Commit

Permalink
feat: simplify message broadcasts (#7)
Browse files Browse the repository at this point in the history
* Simplify message passing

* Add unit tests for message marshalling, payload manipulation
  • Loading branch information
zivkovicmilos authored Feb 28, 2024
1 parent 8b73fc0 commit 43c8d4e
Show file tree
Hide file tree
Showing 14 changed files with 774 additions and 720 deletions.
67 changes: 19 additions & 48 deletions core/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,20 @@ func (t *Tendermint) buildProposalMessage(proposal []byte) *types.ProposalMessag
)

// Build the proposal message (assumes the node will sign it)
return &types.ProposalMessage{
message := &types.ProposalMessage{
View: &types.View{
Height: height,
Round: round,
},
From: t.node.ID(),
Sender: t.node.ID(),
Proposal: proposal,
ProposalRound: validRound,
}

// Sign the message
message.Signature = t.signer.Sign(message.GetSignaturePayload())

return message
}

// buildPrevoteMessage builds a prevote message using the given proposal identifier
Expand All @@ -35,14 +40,19 @@ func (t *Tendermint) buildPrevoteMessage(id []byte) *types.PrevoteMessage {
processID = t.node.ID()
)

return &types.PrevoteMessage{
message := &types.PrevoteMessage{
View: &types.View{
Height: height,
Round: round,
},
From: processID,
Sender: processID,
Identifier: id,
}

// Sign the message
message.Signature = t.signer.Sign(message.GetSignaturePayload())

return message
}

// buildPrecommitMessage builds a precommit message using the given precommit identifier
Expand All @@ -57,56 +67,17 @@ func (t *Tendermint) buildPrecommitMessage(id []byte) *types.PrecommitMessage {
processID = t.node.ID()
)

return &types.PrecommitMessage{
message := &types.PrecommitMessage{
View: &types.View{
Height: height,
Round: round,
},
From: processID,
Sender: processID,
Identifier: id,
}
}

// broadcastProposal signs and broadcasts the given proposal message
func (t *Tendermint) broadcastProposal(proposal *types.ProposalMessage) {
message := &types.Message{
Type: types.MessageType_PROPOSAL,
Signature: t.signer.Sign(proposal.Marshal()),
Payload: &types.Message_ProposalMessage{
ProposalMessage: proposal,
},
}

// Broadcast the proposal message
t.broadcast.Broadcast(message)
}

// broadcastPrevote signs and broadcasts the given prevote message
func (t *Tendermint) broadcastPrevote(prevote *types.PrevoteMessage) {
message := &types.Message{
Type: types.MessageType_PREVOTE,
Signature: t.signer.Sign(prevote.Marshal()),
Payload: &types.Message_PrevoteMessage{
PrevoteMessage: prevote,
},
}

// Broadcast the prevote message
t.broadcast.Broadcast(message)
}

// broadcastPrecommit signs and broadcasts the given precommit message
//
//nolint:unused // Temporarily unused
func (t *Tendermint) broadcastPrecommit(precommit *types.PrecommitMessage) {
message := &types.Message{
Type: types.MessageType_PRECOMMIT,
Signature: t.signer.Sign(precommit.Marshal()),
Payload: &types.Message_PrecommitMessage{
PrecommitMessage: precommit,
},
}
// Sign the message
message.Signature = t.signer.Sign(message.GetSignaturePayload())

// Broadcast the precommit message
t.broadcast.Broadcast(message)
return message
}
104 changes: 47 additions & 57 deletions core/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,93 +8,83 @@ import (
)

var (
ErrMessageNotSet = errors.New("message not set")
ErrMessagePayloadNotSet = errors.New("message payload not set")
ErrInvalidMessageSignature = errors.New("invalid message signature")
ErrMessageFromNonValidator = errors.New("message is from a non-validator")
ErrEarlierHeightMessage = errors.New("message is for an earlier height")
ErrEarlierRoundMessage = errors.New("message is for an earlier round")
)

// AddMessage verifies and adds a new message to the consensus engine
func (t *Tendermint) AddMessage(message *types.Message) error {
// AddProposalMessage verifies and adds a new proposal message to the consensus engine
func (t *Tendermint) AddProposalMessage(message *types.ProposalMessage) error {
// Verify the incoming message
if err := t.verifyMessage(message); err != nil {
return fmt.Errorf("unable to verify message, %w", err)
return fmt.Errorf("unable to verify proposal message, %w", err)
}

// Add the message to the store
t.store.AddMessage(message)
t.store.AddProposalMessage(message)

return nil
}

// verifyMessage verifies the incoming consensus message (base verification)
func (t *Tendermint) verifyMessage(message *types.Message) error {
// Make sure the message is present
if message == nil {
return ErrMessageNotSet
// AddPrevoteMessage verifies and adds a new prevote message to the consensus engine
func (t *Tendermint) AddPrevoteMessage(message *types.PrevoteMessage) error {
// Verify the incoming message
if err := t.verifyMessage(message); err != nil {
return fmt.Errorf("unable to verify proposal message, %w", err)
}

// Make sure the message payload is present
if message.Payload == nil {
return ErrMessagePayloadNotSet
}
// Add the message to the store
t.store.AddPrevoteMessage(message)

// Get the signature payload
signPayload, err := message.GetSignaturePayload()
if err != nil {
return fmt.Errorf("unable to get message signature payload, %w", err)
}
return nil
}

// Make sure the signature is valid
if !t.signer.IsValidSignature(signPayload, message.Signature) {
return ErrInvalidMessageSignature
// AddPrecommitMessage verifies and adds a new precommit message to the consensus engine
func (t *Tendermint) AddPrecommitMessage(message *types.PrecommitMessage) error {
// Verify the incoming message
if err := t.verifyMessage(message); err != nil {
return fmt.Errorf("unable to verify proposal message, %w", err)
}

// Extract individual message data
var (
sender []byte
view *types.View
)
// Add the message to the store
t.store.AddPrecommitMessage(message)

return nil
}

switch message.Type {
case types.MessageType_PROPOSAL:
// Get the proposal message
payload := message.GetProposalMessage()
if payload == nil || !payload.IsValid() {
return types.ErrInvalidMessagePayload
}

sender = payload.GetFrom()
view = payload.GetView()
case types.MessageType_PREVOTE:
// Get the prevote message
payload := message.GetPrevoteMessage()
if payload == nil || !payload.IsValid() {
return types.ErrInvalidMessagePayload
}

sender = payload.GetFrom()
view = payload.GetView()
case types.MessageType_PRECOMMIT:
// Get the precommit message
payload := message.GetPrecommitMessage()
if payload == nil || !payload.IsValid() {
return types.ErrInvalidMessagePayload
}

sender = payload.GetFrom()
view = payload.GetView()
type message interface {
GetView() *types.View
GetSender() []byte
GetSignature() []byte
GetSignaturePayload() []byte
Verify() error
}

// verifyMessage is the common base message verification
func (t *Tendermint) verifyMessage(message message) error {
// Check if the message is valid
if err := message.Verify(); err != nil {
return fmt.Errorf("unable to verify message, %w", err)
}

// Make sure the message sender is a validator
if !t.verifier.IsValidator(sender) {
if !t.verifier.IsValidator(message.GetSender()) {
return ErrMessageFromNonValidator
}

// Get the signature payload
signPayload := message.GetSignaturePayload()

// Make sure the signature is valid
if !t.signer.IsValidSignature(signPayload, message.GetSignature()) {
return ErrInvalidMessageSignature
}

// Make sure the message view is valid
var (
view = message.GetView()

currentHeight = t.state.view.GetHeight() // TODO make thread safe
currentRound = t.state.view.GetRound() // TODO make thread safe
)
Expand Down
28 changes: 23 additions & 5 deletions core/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,33 @@ package core

import "github.com/gnolang/go-tendermint/messages/types"

type broadcastDelegate func(*types.Message)
type (
broadcastProposalDelegate func(*types.ProposalMessage)
broadcastPrevoteDelegate func(*types.PrevoteMessage)
broadcastPrecommitDelegate func(*types.PrecommitMessage)
)

type mockBroadcast struct {
broadcastFn broadcastDelegate
broadcastProposalFn broadcastProposalDelegate
broadcastPrevoteFn broadcastPrevoteDelegate
broadcastPrecommitFn broadcastPrecommitDelegate
}

func (m *mockBroadcast) BroadcastProposal(message *types.ProposalMessage) {
if m.broadcastProposalFn != nil {
m.broadcastProposalFn(message)
}
}

func (m *mockBroadcast) BroadcastPrevote(message *types.PrevoteMessage) {
if m.broadcastPrevoteFn != nil {
m.broadcastPrevoteFn(message)
}
}

func (m *mockBroadcast) Broadcast(message *types.Message) {
if m.broadcastFn != nil {
m.broadcastFn(message)
func (m *mockBroadcast) BroadcastPrecommit(message *types.PrecommitMessage) {
if m.broadcastPrecommitFn != nil {
m.broadcastPrecommitFn(message)
}
}

Expand Down
46 changes: 11 additions & 35 deletions core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,43 +21,19 @@ func newStore() *store {
}
}

// AddMessage adds a new message to the store
func (s *store) AddMessage(message *types.Message) {
switch message.Type {
case types.MessageType_PROPOSAL:
// Parse the propose message
wrappedMessage, ok := message.Payload.(*types.Message_ProposalMessage)
if !ok {
return
}

// Get the proposal
proposal := wrappedMessage.ProposalMessage

s.proposeMessages.AddMessage(proposal.View, proposal.From, proposal)
case types.MessageType_PREVOTE:
// Parse the prevote message
wrappedMessage, ok := message.Payload.(*types.Message_PrevoteMessage)
if !ok {
return
}

// Get the prevote
prevote := wrappedMessage.PrevoteMessage

s.prevoteMessages.AddMessage(prevote.View, prevote.From, prevote)
case types.MessageType_PRECOMMIT:
// Parse the precommit message
wrappedMessage, ok := message.Payload.(*types.Message_PrecommitMessage)
if !ok {
return
}
// AddProposalMessage adds a proposal message to the store
func (s *store) AddProposalMessage(proposal *types.ProposalMessage) {
s.proposeMessages.AddMessage(proposal.View, proposal.Sender, proposal)
}

// Get the precommit
precommit := wrappedMessage.PrecommitMessage
// AddPrevoteMessage adds a prevote message to the store
func (s *store) AddPrevoteMessage(prevote *types.PrevoteMessage) {
s.prevoteMessages.AddMessage(prevote.View, prevote.Sender, prevote)
}

s.precommitMessages.AddMessage(precommit.View, precommit.From, precommit)
}
// AddPrecommitMessage adds a precommit message to the store
func (s *store) AddPrecommitMessage(precommit *types.PrecommitMessage) {
s.precommitMessages.AddMessage(precommit.View, precommit.Sender, precommit)
}

// SubscribeToPropose subscribes to incoming PROPOSE messages
Expand Down
10 changes: 6 additions & 4 deletions core/tendermint.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (t *Tendermint) startRound(ctx context.Context) {
// Broadcast the proposal to other consensus nodes
//
// 19: broadcast <PROPOSAL, hp, roundP, proposal, validRoundP>
t.broadcastProposal(proposeMessage)
t.broadcast.BroadcastProposal(proposeMessage)

// TODO make thread safe
// Save the accepted proposal in the state.
Expand All @@ -227,7 +227,7 @@ func (t *Tendermint) startRound(ctx context.Context) {
// Build and broadcast the prevote message
//
// 24/30: broadcast <PREVOTE, hP, roundP, id(v)>
t.broadcastPrevote(t.buildPrevoteMessage(id))
t.broadcast.BroadcastPrevote(t.buildPrevoteMessage(id))

// Since the current process is the proposer,
// it can directly move to the prevote state
Expand Down Expand Up @@ -265,7 +265,8 @@ func (t *Tendermint) runStates(ctx context.Context) []byte {
// 27: stepP ← prevote
//
// - The proposer for view (hP, roundP) has proposed a value that was accepted in some previous round
// 28: upon <PROPOSAL, hP, roundP, v, vr> from proposer(hP, roundP) AND 2f + 1 <PREVOTE, hP, vr, id(v)> while stepP = propose ∧ (vr >= 0 ∧ vr < roundP) do
// 28: upon <PROPOSAL, hP, roundP, v, vr> from proposer(hP, roundP) AND 2f + 1 <PREVOTE, hP, vr, id(v)>
// while stepP = propose ∧ (vr >= 0 ∧ vr < roundP) do
// 29: if valid(v) ∧ (lockedRoundP ≤ vr ∨ lockedValueP = v) then
// 30: broadcast <PREVOTE, hp, roundP, id(v)>
// 31: else
Expand Down Expand Up @@ -333,7 +334,8 @@ func (t *Tendermint) runPrevote(ctx context.Context) {
// This state handles the following situations:
//
// - A validator has received 2F+1 PRECOMMIT messages with a valid ID for the previously accepted proposal
// 49: upon <PROPOSAL, hP, r, v, ∗> from proposer(hP, r) AND 2f + 1 <PRECOMMIT, hP, r, id(v)> while decisionP[hP] = nil do
// 49: upon <PROPOSAL, hP, r, v, ∗> from proposer(hP, r) AND 2f + 1 <PRECOMMIT, hP, r, id(v)>
// while decisionP[hP] = nil do
// 50: if valid(v) then
// 51: decisionP[hp] = v
// 52: hP ← hP + 1
Expand Down
Loading

0 comments on commit 43c8d4e

Please sign in to comment.