Skip to content

Commit

Permalink
Update for new p2p.Run API
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Jul 1, 2024
1 parent 97c30da commit 4ca598b
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 9 deletions.
1 change: 1 addition & 0 deletions node/pkg/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId, bootstrapPeers,
nodeName,
g.gk,
g.obsvC,
g.batchObsvC.writeC,
g.signedInC.writeC,
g.obsvReqC.writeC,
g.gossipSendC,
Expand Down
7 changes: 3 additions & 4 deletions node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ func Run(params *RunParams) func(ctx context.Context) error {
pubsub.WithEventTracer(ourTracer),
// TODO: Investigate making this change. May need to use LaxSign until everyone has upgraded to that.
// pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
// pubsub.WithPeerOutboundQueueSize(1000000),
)
if err != nil {
panic(err)
Expand Down Expand Up @@ -688,11 +687,11 @@ func Run(params *RunParams) func(ctx context.Context) error {
}
}
case *gossipv1.GossipMessage_SignedObservationBatch:
if batchObsvC != nil {
if err := common.PostMsgWithTimestamp[gossipv1.SignedObservationBatch](m.SignedObservationBatch, batchObsvC); err == nil {
if params.batchObsvC != nil {
if err := common.PostMsgWithTimestamp[gossipv1.SignedObservationBatch](m.SignedObservationBatch, params.batchObsvC); err == nil {
p2pMessagesReceived.WithLabelValues("batch_observation").Inc()
} else {
if components.WarnChannelOverflow {
if params.components.WarnChannelOverflow {
logger.Warn("Ignoring SignedObservationBatch because batchObsvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservationBatch.Addr)))
}
p2pReceiveChannelOverflow.WithLabelValues("batch_observation").Inc()
Expand Down
15 changes: 14 additions & 1 deletion node/pkg/p2p/run_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type (
// obsvC is optional and can be set with `WithSignedObservationListener`.
obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]

// batchObsvC is optional and can be set with `WithSignedObservationBatchListener`.
batchObsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]

// obsvReqC is optional and can be set with `WithObservationRequestListener`.
obsvReqC chan<- *gossipv1.ObservationRequest

Expand Down Expand Up @@ -95,14 +98,22 @@ func NewRunParams(
return p, nil
}

// WithSignedObservationListener is used to set the channel to receive `SignedObservation messages.
// WithSignedObservationListener is used to set the channel to receive `SignedObservation` messages.
func WithSignedObservationListener(obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt {
return func(p *RunParams) error {
p.obsvC = obsvC
return nil
}
}

// WithSignedObservationBatchListener is used to set the channel to receive `SignedObservationBatch` messages.
func WithSignedObservationBatchListener(batchObsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]) RunOpt {
return func(p *RunParams) error {
p.batchObsvC = batchObsvC
return nil
}
}

// WithSignedVAAListener is used to set the channel to receive `SignedVAAWithQuorum messages.
func WithSignedVAAListener(signedInC chan<- *gossipv1.SignedVAAWithQuorum) RunOpt {
return func(p *RunParams) error {
Expand Down Expand Up @@ -148,6 +159,7 @@ func WithGuardianOptions(
nodeName string,
gk *ecdsa.PrivateKey,
obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation],
batchObsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch],
signedInC chan<- *gossipv1.SignedVAAWithQuorum,
obsvReqC chan<- *gossipv1.ObservationRequest,
gossipSendC chan []byte,
Expand All @@ -169,6 +181,7 @@ func WithGuardianOptions(
p.nodeName = nodeName
p.gk = gk
p.obsvC = obsvC
p.batchObsvC = batchObsvC
p.signedInC = signedInC
p.obsvReqC = obsvReqC
p.gossipSendC = gossipSendC
Expand Down
2 changes: 2 additions & 0 deletions node/pkg/p2p/run_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
require.NotNil(t, gk)

obsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], 42)
batchObsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 42)
signedInC := make(chan<- *gossipv1.SignedVAAWithQuorum, 42)
obsvReqC := make(chan<- *gossipv1.ObservationRequest, 42)
gossipSendC := make(chan []byte, 42)
Expand Down Expand Up @@ -170,6 +171,7 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
nodeName,
gk,
obsvC,
batchObsvC,
signedInC,
obsvReqC,
gossipSendC,
Expand Down
1 change: 1 addition & 0 deletions node/pkg/p2p/watermark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) {
g.nodeName,
g.gk,
g.obsvC,
g.batchObsvC,
g.signedInC,
g.obsvReqC,
g.sendC,
Expand Down
8 changes: 4 additions & 4 deletions node/pkg/processor/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ func (p *Processor) broadcastSignature(
MessageId: msgId,
}

p.postObservationToBatch(ourObs)
// p.postObservationToBatch(ourObs) // TODO: This will be enabled as part of the gossip split PR.

// Post the observation in its own gossip message. TODO: Remove this once everyone has migrated to batches.
// Post the observation in its own gossip message.
obsv := gossipv1.SignedObservation{
Addr: addr,
Hash: digest.Bytes(),
Expand All @@ -70,7 +70,7 @@ func (p *Processor) broadcastSignature(
}

// Broadcast the observation.
p.gossipSendC <- msg // TODO: Get rid of this
p.gossipSendC <- msg
observationsBroadcast.Inc()

hash := hex.EncodeToString(digest.Bytes())
Expand All @@ -85,7 +85,7 @@ func (p *Processor) broadcastSignature(
}

p.state.signatures[hash].ourObservation = o
p.state.signatures[hash].ourObs = ourObs
// p.state.signatures[hash].ourObs = ourObs // TODO: This will be enabled as part of the gossip split PR.
p.state.signatures[hash].ourMsg = msg
p.state.signatures[hash].txHash = txhash
p.state.signatures[hash].source = o.GetEmitterChain().String()
Expand Down

0 comments on commit 4ca598b

Please sign in to comment.