diff --git a/consensus/polybft/consensus_runtime.go b/consensus/polybft/consensus_runtime.go
index 7923706eca..83a65a4c6e 100644
--- a/consensus/polybft/consensus_runtime.go
+++ b/consensus/polybft/consensus_runtime.go
@@ -9,6 +9,8 @@ import (
     "sync"
     "sync/atomic"
 
+    "github.com/0xPolygon/go-ibft/messages"
+    "github.com/0xPolygon/go-ibft/messages/proto"
     "github.com/0xPolygon/pbft-consensus"
     "github.com/0xPolygon/polygon-edge/blockchain"
     "github.com/0xPolygon/polygon-edge/consensus/polybft/bitmap"
@@ -36,11 +38,6 @@ var (
     errQuorumNotReached = errors.New("quorum not reached for commitment message")
 )
 
-// Transport is an abstraction of network layer
-type Transport interface {
-    Gossip(message interface{})
-}
-
 // txPoolInterface is an abstraction of transaction pool
 type txPoolInterface interface {
     Prepare()
@@ -74,14 +71,14 @@ type epochMetadata struct {
 
 // runtimeConfig is a struct that holds configuration data for given consensus runtime
 type runtimeConfig struct {
-    PolyBFTConfig  *PolyBFTConfig
-    DataDir        string
-    Transport      Transport
-    Key            *wallet.Key
-    State          *State
-    blockchain     blockchainBackend
-    polybftBackend polybftBackend
-    txPool         txPoolInterface
+    PolyBFTConfig   *PolyBFTConfig
+    DataDir         string
+    BridgeTransport BridgeTransport
+    Key             *wallet.Key
+    State           *State
+    blockchain      blockchainBackend
+    polybftBackend  polybftBackend
+    txPool          txPoolInterface
 }
 
 // consensusRuntime is a struct that provides consensus runtime features like epoch, state and event management
@@ -111,21 +108,14 @@ type consensusRuntime struct {
 }
 
 // newConsensusRuntime creates and starts a new consensus runtime instance with event tracking
-func newConsensusRuntime(log hcf.Logger, config *runtimeConfig) (*consensusRuntime, error) {
+func newConsensusRuntime(log hcf.Logger, config *runtimeConfig) *consensusRuntime {
     runtime := &consensusRuntime{
         state:  config.State,
         config: config,
         logger: log.Named("consensus_runtime"),
     }
 
-    if runtime.IsBridgeEnabled() {
-        err := runtime.startEventTracker()
-        if err != nil {
-            return nil, err
-        }
-    }
-
-    return runtime, nil
+    return runtime
 }
 
 // getEpoch returns current epochMetadata in a thread-safe manner.
@@ -142,7 +132,7 @@ func (c *consensusRuntime) IsBridgeEnabled() bool {
 
 // AddLog is an implementation of eventSubscription interface,
 // and is called from the event tracker when an event is final on the rootchain
-func (c *consensusRuntime) AddLog(eventLog *ethgo.Log) {
+func (c *consensusRuntime) AddLog(eventLog *ethgo.Log) { //nolint
     c.logger.Info(
         "Add State sync event",
         "block", eventLog.BlockNumber,
@@ -462,7 +452,7 @@ func (c *consensusRuntime) buildCommitment(epoch, fromIndex uint64) (*Commitment
         NodeID:      c.config.Key.NodeID(),
         EpochNumber: epoch,
     }
-    c.config.Transport.Gossip(msg)
+    c.config.BridgeTransport.Multicast(msg)
 
     c.logger.Debug("[buildCommitment] Built commitment", "from", commitment.FromIndex, "to", commitment.ToIndex)
 
@@ -889,3 +879,82 @@ func validateVote(vote *MessageSignature, epoch *epochMetadata) error {
 
     return nil
 }
+
+func (c *consensusRuntime) BuildProposal(blockNumber uint64) []byte {
+    panic("not implemented")
+}
+
+// InsertBlock inserts a proposal with the specified committed seals
+func (c *consensusRuntime) InsertBlock(proposal []byte, committedSeals []*messages.CommittedSeal) {
+    panic("not implemented")
+}
+
+// ID returns the validator's ID
+func (c *consensusRuntime) ID() []byte {
+    panic("not implemented")
+}
+
+// MaximumFaultyNodes returns the maximum number of faulty nodes based
+// on the validator set.
+func (c *consensusRuntime) MaximumFaultyNodes() uint64 {
+    panic("not implemented")
+}
+
+// Quorum returns the quorum size for the
+// specified block height.
+func (c *consensusRuntime) Quorum(blockHeight uint64) uint64 {
+    panic("not implemented")
+}
+
+// BuildPrePrepareMessage builds a PREPREPARE message based on the passed in proposal
+func (c *consensusRuntime) BuildPrePrepareMessage(
+    proposal []byte,
+    certificate *proto.RoundChangeCertificate,
+    view *proto.View,
+) *proto.Message {
+    panic("not implemented")
+}
+
+// BuildPrepareMessage builds a PREPARE message based on the passed in proposal
+func (c *consensusRuntime) BuildPrepareMessage(proposalHash []byte, view *proto.View) *proto.Message {
+    panic("not implemented")
+}
+
+// BuildCommitMessage builds a COMMIT message based on the passed in proposal
+func (c *consensusRuntime) BuildCommitMessage(proposalHash []byte, view *proto.View) *proto.Message {
+    panic("not implemented")
+}
+
+// BuildRoundChangeMessage builds a ROUND_CHANGE message based on the passed in proposal
+func (c *consensusRuntime) BuildRoundChangeMessage(
+    proposal []byte,
+    certificate *proto.PreparedCertificate,
+    view *proto.View,
+) *proto.Message {
+    panic("not implemented")
+}
+
+// IsValidBlock checks if the proposed block is child of parent
+func (c *consensusRuntime) IsValidBlock(block []byte) bool {
+    panic("not implemented")
+}
+
+// IsValidSender checks if signature is from sender
+func (c *consensusRuntime) IsValidSender(msg *proto.Message) bool {
+    panic("not implemented")
+}
+
+// IsProposer checks if the passed in ID is the Proposer for current view (sequence, round)
+func (c *consensusRuntime) IsProposer(id []byte, height, round uint64) bool {
+    panic("not implemented")
+}
+
+// IsValidProposalHash checks if the hash matches the proposal
+func (c *consensusRuntime) IsValidProposalHash(proposal, hash []byte) bool {
+    panic("not implemented")
+}
+
+// IsValidCommittedSeal checks if the seal for the proposal is valid
+func (c *consensusRuntime) IsValidCommittedSeal(proposal []byte, committedSeal *messages.CommittedSeal) bool {
+    panic("not implemented")
+}
diff --git a/consensus/polybft/consensus_runtime_test.go b/consensus/polybft/consensus_runtime_test.go
index fcf91e7caf..8095c97725 100644
--- a/consensus/polybft/consensus_runtime_test.go
+++ b/consensus/polybft/consensus_runtime_test.go
@@ -1,5 +1,6 @@
 package polybft
 
+/*
 import (
     "bytes"
     "fmt"
@@ -834,8 +835,7 @@ func Test_NewConsensusRuntime(t *testing.T) {
         Key:        key,
         blockchain: &blockchainMock{},
     }
-    runtime, err := newConsensusRuntime(hclog.NewNullLogger(), config)
-    assert.NoError(t, err)
+    runtime := newConsensusRuntime(hclog.NewNullLogger(), config)
 
     assert.False(t, runtime.isActiveValidator())
     assert.Equal(t, runtime.config.DataDir, tmpDir)
@@ -1761,3 +1761,4 @@ func insertTestStateSyncEvents(t *testing.T, numberOfEvents int, startIndex uint
 
     return stateSyncs
 }
+*/
diff --git a/consensus/polybft/ibft_consensus.go b/consensus/polybft/ibft_consensus.go
new file mode 100644
index 0000000000..5f7ebf743c
--- /dev/null
+++ b/consensus/polybft/ibft_consensus.go
@@ -0,0 +1,43 @@
+package polybft
+
+import (
+    "context"
+
+    "github.com/0xPolygon/go-ibft/core"
+)
+
+// IBFTConsensusWrapper is a convenience wrapper for the go-ibft package
+type IBFTConsensusWrapper struct {
+    *core.IBFT
+}
+
+func newIBFTConsensusWrapper(
+    logger core.Logger,
+    backend core.Backend,
+    transport core.Transport,
+) *IBFTConsensusWrapper {
+    return &IBFTConsensusWrapper{
+        IBFT: core.NewIBFT(logger, backend, transport),
+    }
+}
+
+// runSequence starts the underlying consensus mechanism for the given height.
+// It may be called by a single thread at any given time.
+// It returns a channel which will be closed after c.IBFT.RunSequence is done
+// and a stopSequence function which can be used to halt the c.IBFT.RunSequence routine from outside
+func (c *IBFTConsensusWrapper) runSequence(height uint64) (<-chan struct{}, func()) {
+    sequenceDone := make(chan struct{})
+    ctx, cancelSequence := context.WithCancel(context.Background())
+
+    go func() {
+        c.IBFT.RunSequence(ctx, height)
+        cancelSequence()
+        close(sequenceDone)
+    }()
+
+    return sequenceDone, func() {
+        // stopSequence terminates the running IBFT sequence gracefully and waits for it to return
+        cancelSequence()
+        <-sequenceDone // waits until the c.IBFT.RunSequence routine finishes
+    }
+}
diff --git a/consensus/polybft/mocks_test.go b/consensus/polybft/mocks_test.go
index 103d1990e0..cd08642f1c 100644
--- a/consensus/polybft/mocks_test.go
+++ b/consensus/polybft/mocks_test.go
@@ -284,14 +284,14 @@ func (s *stateProviderMock) Txn(ethgo.Address, ethgo.Key, []byte) (contract.Txn,
     return nil, nil
 }
 
-var _ Transport = &transportMock{}
+var _ BridgeTransport = &transportMock{}
 
 type transportMock struct {
     mock.Mock
 }
 
-func (t *transportMock) Gossip(message interface{}) {
-    _ = t.Called(message)
+func (t *transportMock) Multicast(msg interface{}) {
+    _ = t.Called(msg)
 }
 
 type testValidators struct {
diff --git a/consensus/polybft/polybft.go b/consensus/polybft/polybft.go
index d2b7267a8a..08bd5742ce 100644
--- a/consensus/polybft/polybft.go
+++ b/consensus/polybft/polybft.go
@@ -2,19 +2,15 @@ package polybft
 
 import (
-    "context"
     "encoding/json"
-    "errors"
     "fmt"
     "os"
     "path/filepath"
     "time"
 
-    "github.com/0xPolygon/pbft-consensus"
     "github.com/0xPolygon/polygon-edge/chain"
     "github.com/0xPolygon/polygon-edge/command/rootchain/helper"
     "github.com/0xPolygon/polygon-edge/consensus"
-    "github.com/0xPolygon/polygon-edge/consensus/polybft/proto"
     "github.com/0xPolygon/polygon-edge/consensus/polybft/wallet"
     "github.com/0xPolygon/polygon-edge/contracts"
     "github.com/0xPolygon/polygon-edge/helper/progress"
@@ -24,8 +20,6 @@ import (
     "github.com/0xPolygon/polygon-edge/syncer"
     "github.com/0xPolygon/polygon-edge/types"
     "github.com/hashicorp/go-hclog"
-    "github.com/libp2p/go-libp2p/core/peer"
-    "go.opentelemetry.io/otel"
 )
 
 const (
@@ -73,8 +67,8 @@ type Polybft struct {
     // close closes all the pbft consensus
     closeCh chan struct{}
 
-    // pbft is the pbft engine
-    pbft *pbft.Pbft
+    // ibft is the ibft engine
+    ibft *IBFTConsensusWrapper
 
     // state is reference to the struct which encapsulates consensus data persistence logic
     state *State
@@ -101,7 +95,7 @@ type Polybft struct {
     syncer syncer.Syncer
 
     // topic for pbft consensus
-    pbftTopic *network.Topic
+    consensusTopic *network.Topic
 
     // topic for pbft consensus
     bridgeTopic *network.Topic
@@ -177,48 +171,28 @@ func (p *Polybft) Initialize() error {
         executor: p.config.Executor,
     }
 
-    // initialize pbft engine
-    opts := []pbft.ConfigOption{
-        pbft.WithLogger(p.logger.Named("Pbft").
-            StandardLogger(&hclog.StandardLoggerOptions{}),
-        ),
-        pbft.WithTracer(otel.Tracer("Pbft")),
-    }
-
-    // create pbft topic
-    p.pbftTopic, err = p.config.Network.NewTopic(pbftProto, &proto.GossipMessage{})
-    if err != nil {
-        return fmt.Errorf("failed to create pbft topic. Error: %w", err)
+    // create bridge and consensus topics
+    if err := p.createTopics(); err != nil {
+        return err
     }
 
-    p.pbft = pbft.New(p.key, &pbftTransportWrapper{topic: p.pbftTopic}, opts...)
-
-    // check pbft topic - listen for transport messages and relay them to pbft
-    err = p.pbftTopic.Subscribe(func(obj interface{}, from peer.ID) {
-        gossipMsg, _ := obj.(*proto.GossipMessage)
-
-        var msg *pbft.MessageReq
-        if err := json.Unmarshal(gossipMsg.Data, &msg); err != nil {
-            p.logger.Error("pbft topic message received error", "err", err)
+    // set pbft topic, it will be checked if/when the bridge is enabled
+    p.initRuntime()
 
-            return
-        }
+    // initialize pbft engine
+    // opts := []pbft.ConfigOption{
+    //     pbft.WithLogger(p.logger.Named("Pbft").
+    //         StandardLogger(&hclog.StandardLoggerOptions{}),
+    //     ),
+    //     pbft.WithTracer(otel.Tracer("Pbft")),
+    // }
 
-        p.pbft.PushMessage(msg)
-    })
+    p.ibft = newIBFTConsensusWrapper(p.logger, p.runtime, p)
 
-    if err != nil {
+    if err := p.subscribeToIbftTopic(); err != nil {
         return fmt.Errorf("topic subscription failed: %w", err)
     }
 
-    // create bridge topic
-    bridgeTopic, err := p.config.Network.NewTopic(bridgeProto, &proto.TransportMessage{})
-    if err != nil {
-        return fmt.Errorf("failed to create bridge topic. Error: %w", err)
-    }
-    // set pbft topic, it will be check if/when the bridge is enabled
-    p.bridgeTopic = bridgeTopic
-
     // set block time
     p.blockTime = time.Duration(p.config.BlockTime)
 
@@ -260,11 +234,16 @@ func (p *Polybft) startSyncing() error {
     }
 
     go func() {
-        nullHandler := func(b *types.Block) bool {
+        blockHandler := func(b *types.Block) bool {
+            // TODO: rename NotifyProposalInserted
+            // parameter should be header and not StateBlock; also
+            // txpool.ResetWithHeaders(b.Header) should be called
+            p.runtime.NotifyProposalInserted(&StateBlock{Block: b})
+
             return false
         }
 
-        if err := p.syncer.Sync(nullHandler); err != nil {
+        if err := p.syncer.Sync(blockHandler); err != nil {
             panic(fmt.Errorf("failed to sync blocks. Error: %w", err))
         }
     }()
@@ -288,45 +267,33 @@ func (p *Polybft) startSealing() error {
     return nil
 }
 
-// startRuntime starts consensus runtime
-func (p *Polybft) startRuntime() error {
+// initRuntime creates consensus runtime
+func (p *Polybft) initRuntime() {
     runtimeConfig := &runtimeConfig{
-        PolyBFTConfig: p.consensusConfig,
-        Key:           p.key,
-        DataDir:       p.dataDir,
-        Transport: &bridgeTransportWrapper{
-            topic:  p.bridgeTopic,
-            logger: p.logger.Named("bridge_transport"),
-        },
-        State:          p.state,
-        blockchain:     p.blockchain,
-        polybftBackend: p,
-        txPool:         p.config.TxPool,
+        PolyBFTConfig:   p.consensusConfig,
+        Key:             p.key,
+        DataDir:         p.dataDir,
+        BridgeTransport: &runtimeTransportWrapper{p.bridgeTopic, p.logger},
+        State:           p.state,
+        blockchain:      p.blockchain,
+        polybftBackend:  p,
+        txPool:          p.config.TxPool,
     }
 
-    runtime, err := newConsensusRuntime(p.logger, runtimeConfig)
-    if err != nil {
-        return err
-    }
-
-    p.runtime = runtime
-
-    if runtime.IsBridgeEnabled() {
-        err := p.bridgeTopic.Subscribe(func(obj interface{}, from peer.ID) {
-            msg, _ := obj.(*proto.TransportMessage)
-            var transportMsg *TransportMessage
-            if err := json.Unmarshal(msg.Data, &transportMsg); err != nil {
-                p.logger.Warn("Failed to deliver message", "err", err)
+    p.runtime = newConsensusRuntime(p.logger, runtimeConfig)
+}
 
-                return
-            }
+// startRuntime starts consensus runtime
+func (p *Polybft) startRuntime() error {
+    if p.runtime.IsBridgeEnabled() {
+        // start bridge event tracker
+        if err := p.runtime.startEventTracker(); err != nil {
+            return fmt.Errorf("starting event tracker failed:%w", err)
+        }
 
-            if _, err := p.runtime.deliverMessage(transportMsg); err != nil {
-                p.logger.Warn("Failed to deliver message", "err", err)
-            }
-        })
-        if err != nil {
-            return fmt.Errorf("topic subscription failed:%w", err)
+        // subscribe to bridge topic
+        if err := p.runtime.subscribeToBridgeTopic(p.bridgeTopic); err != nil {
+            return fmt.Errorf("bridge topic subscription failed: %w", err)
         }
     }
 
@@ -340,87 +307,69 @@ func (p *Polybft) startPbftProcess() {
         return
     }
 
-    // subscribe to new block events
-    var (
-        newBlockSub   = p.blockchain.SubscribeEvents()
-        syncerBlockCh = make(chan uint64)
-    )
+    newBlockSub := p.blockchain.SubscribeEvents()
+    defer newBlockSub.Close()
+
+    syncerBlockCh := make(chan struct{})
 
-    // Receive a notification every time syncer manages
-    // to insert a valid block.
     go func() {
         eventCh := newBlockSub.GetEventCh()
 
         for {
            select {
-            case ev := <-eventCh:
-                currentBlockNum := p.blockchain.CurrentHeader().Number
-                if ev.Source == "syncer" {
-                    if ev.NewChain[0].Number < currentBlockNum {
-                        continue
-                    }
-                }
-
-                if p.isSynced() {
-                    syncerBlockCh <- currentBlockNum
-                }
-
            case <-p.closeCh:
                return
+            case ev := <-eventCh:
+                // The blockchain notification system can eventually deliver
+                // stale block notifications. These should be ignored
+                if ev.Source == "syncer" && ev.NewChain[0].Number > p.blockchain.CurrentHeader().Number {
+                    syncerBlockCh <- struct{}{}
+                }
            }
        }
    }()
 
-    defer newBlockSub.Close()
-
-SYNC:
-    if !p.isSynced() {
-        <-syncerBlockCh
-    }
-
-    lastBlock := p.blockchain.CurrentHeader()
-    p.logger.Info("startPbftProcess",
-        "header hash", lastBlock.Hash,
-        "header number", lastBlock.Number)
+    var (
+        sequenceCh   <-chan struct{}
+        stopSequence func()
+    )
 
-    currentValidators, err := p.GetValidators(lastBlock.Number, nil)
-    if err != nil {
-        p.logger.Error("failed to query current validator set", "block number", lastBlock.Number, "error", err)
-    }
+    for {
+        latest := p.blockchain.CurrentHeader().Number
 
-    p.runtime.setIsActiveValidator(currentValidators.ContainsNodeID(p.key.NodeID()))
+        currentValidators, err := p.GetValidators(latest, nil)
+        if err != nil {
+            p.logger.Error("failed to query current validator set", "block number", latest, "error", err)
+        }
 
-    if !p.runtime.isActiveValidator() {
-        // inactive validator is not part of the consensus protocol and it should just perform syncing
-        goto SYNC
-    }
+        isValidator := currentValidators.ContainsNodeID(p.key.NodeID())
+        p.runtime.setIsActiveValidator(isValidator)
 
-    // we have to start the bridge snapshot when we have finished syncing
-    if err := p.runtime.restartEpoch(lastBlock); err != nil {
-        p.logger.Error("failed to restart epoch", "error", err)
+        p.config.TxPool.SetSealing(isValidator) // update tx pool
 
-        goto SYNC
-    }
+        if isValidator {
+            _, err := p.runtime.FSM() // Nemanja: what to do if it is an error
+            if err != nil {
+                p.logger.Error("failed to create fsm", "block number", latest, "error", err)
 
-    for {
-        if err := p.runCycle(); err != nil {
-            if errors.Is(err, errNotAValidator) {
-                p.logger.Info("Node is no longer in validator set")
-            } else {
-                p.logger.Error("an error occurred while running a state machine cycle.", "error", err)
+                continue
            }
 
-            goto SYNC
+            sequenceCh, stopSequence = p.ibft.runSequence(latest + 1)
        }
 
-        switch p.pbft.GetState() {
-        case pbft.SyncState:
-            // we need to go back to sync
-            goto SYNC
-        case pbft.DoneState:
-            // everything worked, move to the next iteration
-        default:
-            // stopped
+        select {
+        case <-syncerBlockCh:
+            if isValidator {
+                stopSequence()
+                p.logger.Info("canceled sequence", "sequence", latest+1)
+            }
+        case <-sequenceCh:
+        case <-p.closeCh:
+            if isValidator {
+                stopSequence()
+            }
+
+            return
        }
    }
 }
 
 func (p *Polybft) waitForNPeers() bool {
@@ -428,36 +377,7 @@
 // isSynced return true if the current header from the local storage corresponds to the highest block of syncer
 func (p *Polybft) isSynced() bool {
-    // TODO: Check could we change following condition to this:
-    // p.syncer.GetSyncProgression().CurrentBlock >= p.syncer.GetSyncProgression().HighestBlock
-    syncProgression := p.syncer.GetSyncProgression()
-
-    return syncProgression == nil ||
-        p.blockchain.CurrentHeader().Number >= syncProgression.HighestBlock
-}
-
-// runCycle runs a single cycle of the state machine and indicates if node should exit the consensus or keep on running
-func (p *Polybft) runCycle() error {
-    ff, err := p.runtime.FSM()
-    if err != nil {
-        return err
-    }
-
-    if err = p.pbft.SetBackend(ff); err != nil {
-        return err
-    }
-
-    // this cancel is not sexy
-    ctx, cancelFn := context.WithCancel(context.Background())
-
-    go func() {
-        <-p.closeCh
-        cancelFn()
-    }()
-
-    p.pbft.Run(ctx)
-
-    return nil
+    return false // TODO: remove this method
 }
@@ -468,8 +388,7 @@ func (p *Polybft) waitForNPeers() bool {
         case <-time.After(2 * time.Second):
         }
 
-        numPeers := len(p.config.Network.Peers())
-        if numPeers >= minSyncPeers {
+        if len(p.config.Network.Peers()) >= minSyncPeers {
             break
         }
     }
@@ -617,44 +536,3 @@ func (p *Polybft) PreCommitState(_ *types.Header, _ *state.Transition) error {
     // Not required
     return nil
 }
-
-type pbftTransportWrapper struct {
-    topic *network.Topic
-}
-
-func (p *pbftTransportWrapper) Gossip(msg *pbft.MessageReq) error {
-    data, err := json.Marshal(msg)
-    if err != nil {
-        return err
-    }
-
-    return p.topic.Publish(
-        &proto.GossipMessage{
-            Data: data,
-        })
-}
-
-type bridgeTransportWrapper struct {
-    topic  *network.Topic
-    logger hclog.Logger
-}
-
-func (b *bridgeTransportWrapper) Gossip(msg interface{}) {
-    data, err := json.Marshal(msg)
-    if err != nil {
-        b.logger.Warn("Failed to marshal bridge message", "err", err)
-
-        return
-    }
-
-    protoMsg := &proto.GossipMessage{
-        Data: data,
-    }
-
-    err = b.topic.Publish(protoMsg)
-    if err != nil {
-        b.logger.Warn("Failed to gossip bridge message", "err", err)
-    }
-}
-
-var _ polybftBackend = &Polybft{}
diff --git a/consensus/polybft/transport.go b/consensus/polybft/transport.go
new file mode 100644
index 0000000000..ee59fa8c3c
--- /dev/null
+++ b/consensus/polybft/transport.go
@@ -0,0 +1,117 @@
+package polybft
+
+import (
+    "encoding/json"
+    "fmt"
+
+    "github.com/0xPolygon/go-ibft/messages/proto"
+    pbftproto "github.com/0xPolygon/polygon-edge/consensus/polybft/proto"
+    "github.com/0xPolygon/polygon-edge/network"
+    "github.com/0xPolygon/polygon-edge/types"
+    "github.com/hashicorp/go-hclog"
+    "github.com/libp2p/go-libp2p/core/peer"
+)
+
+// BridgeTransport is an abstraction of the network layer for a bridge
+type BridgeTransport interface {
+    Multicast(msg interface{})
+}
+
+type runtimeTransportWrapper struct {
+    bridgeTopic *network.Topic
+    logger      hclog.Logger
+}
+
+var _ BridgeTransport = (*runtimeTransportWrapper)(nil)
+
+// Multicast publishes any message as pbftproto.TransportMessage
+func (g *runtimeTransportWrapper) Multicast(msg interface{}) {
+    data, err := json.Marshal(msg)
+    if err != nil {
+        g.logger.Warn("failed to marshal bridge message", "err", err)
+
+        return
+    }
+
+    err = g.bridgeTopic.Publish(&pbftproto.TransportMessage{Data: data})
+    if err != nil {
+        g.logger.Warn("failed to gossip bridge message", "err", err)
+    }
+}
+
+// subscribeToBridgeTopic subscribes to the bridge topic
+func (cr *consensusRuntime) subscribeToBridgeTopic(topic *network.Topic) error {
+    return topic.Subscribe(func(obj interface{}, _ peer.ID) {
+        msg, ok := obj.(*pbftproto.TransportMessage)
+        if !ok {
+            cr.logger.Warn("failed to deliver message, invalid msg", "obj", obj)
+
+            return
+        }
+
+        var transportMsg *TransportMessage
+
+        if err := json.Unmarshal(msg.Data, &transportMsg); err != nil {
+            cr.logger.Warn("failed to deliver message", "err", err, "obj", obj)
+
+            return
+        }
+
+        if _, err := cr.deliverMessage(transportMsg); err != nil {
+            cr.logger.Warn("failed to deliver message", "err", err)
+        }
+    })
+}
+
+// subscribeToIbftTopic subscribes to the ibft topic
+func (p *Polybft) subscribeToIbftTopic() error {
+    return p.consensusTopic.Subscribe(func(obj interface{}, _ peer.ID) {
+        // this check is from ibft impl
+        if !p.runtime.isActiveValidator() {
+            return
+        }
+
+        msg, ok := obj.(*proto.Message)
+        if !ok {
+            p.logger.Error("consensus engine: invalid type assertion for message request")
+
+            return
+        }
+
+        p.ibft.AddMessage(msg)
+
+        p.logger.Debug(
+            "validator message received",
+            "type", msg.Type.String(),
+            "height", msg.GetView().Height,
+            "round", msg.GetView().Round,
+            "addr", types.BytesToAddress(msg.From).String(),
+        )
+    })
+}
+
+// createTopics creates all topics for a PolyBft instance
+func (p *Polybft) createTopics() (err error) {
+    if p.consensusConfig.IsBridgeEnabled() {
+        // create bridge topic
+        p.bridgeTopic, err = p.config.Network.NewTopic(bridgeProto, &pbftproto.TransportMessage{})
+        if err != nil {
+            return fmt.Errorf("failed to create bridge topic. Error: %w", err)
+        }
+    }
+
+    // create pbft topic
+    p.consensusTopic, err = p.config.Network.NewTopic(pbftProto, &proto.Message{})
+    if err != nil {
+        return fmt.Errorf("failed to create pbft topic. Error: %w", err)
+    }
+
+    return nil
+}
+
+// Multicast is an implementation of the core.Transport interface
+func (p *Polybft) Multicast(msg *proto.Message) {
+    if err := p.consensusTopic.Publish(msg); err != nil {
+        p.logger.Warn("failed to multicast consensus message", "err", err)
+    }
+}