Skip to content

Commit

Permalink
feat(telem)_: track total waku message bandwidth
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Dec 3, 2024
1 parent 8a7f24b commit cd3fe37
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 9 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90
github.com/waku-org/go-waku v0.8.1-0.20241129034315-afe7ed16466d
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2152,8 +2152,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27
github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90 h1:p7tehUW7f+D6pvMJYop2yJV03SJU2fFUusmSnKL3uow=
github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74=
github.com/waku-org/go-waku v0.8.1-0.20241129034315-afe7ed16466d h1:BKAmjTFYHrKbPH/ej4v9rfHEeJQibWFpC0T0owXQZyQ=
github.com/waku-org/go-waku v0.8.1-0.20241129034315-afe7ed16466d/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
Expand Down
22 changes: 22 additions & 0 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ const (
MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessages"
// MVDS ack received for a sent message
MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed"
// Total number and size of Waku messages sent by this node
SentMessageTotalMetric TelemetryType = "SentMessageTotal"
)

const MaxRetryCache = 5000
Expand Down Expand Up @@ -145,6 +147,10 @@ func (c *Client) PushMessageDeliveryConfirmed(ctx context.Context, messageHash s
c.processAndPushTelemetry(ctx, MessageDeliveryConfirmed{MessageHash: messageHash})
}

func (c *Client) PushSentMessageTotal(ctx context.Context, messageSize uint32) {
c.processAndPushTelemetry(ctx, SentMessageTotal{Size: messageSize})
}

type ReceivedMessages struct {
Filter transport.Filter
SSHMessage *types.Message
Expand Down Expand Up @@ -196,6 +202,10 @@ type MessageDeliveryConfirmed struct {
MessageHash string
}

type SentMessageTotal struct {
Size uint32
}

type Client struct {
serverURL string
httpClient *http.Client
Expand Down Expand Up @@ -392,6 +402,12 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: MessageDeliveryConfirmedMetric,
TelemetryData: c.ProcessMessageDeliveryConfirmed(v),
}
case SentMessageTotal:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: SentMessageTotalMetric,
TelemetryData: c.ProcessSentMessageTotal(v),
}
default:
c.logger.Error("Unknown telemetry data type")
return
Expand Down Expand Up @@ -567,6 +583,12 @@ func (c *Client) ProcessMessageDeliveryConfirmed(messageDeliveryConfirmed Messag
return c.marshalPostBody(postBody)
}

func (c *Client) ProcessSentMessageTotal(sentMessageTotal SentMessageTotal) *json.RawMessage {
postBody := c.commonPostBody()
postBody["size"] = sentMessageTotal.Size
return c.marshalPostBody(postBody)
}

// Helper function to marshal post body and handle errors
func (c *Client) marshalPostBody(postBody map[string]interface{}) *json.RawMessage {
body, err := json.Marshal(postBody)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90
# github.com/waku-org/go-waku v0.8.1-0.20241129034315-afe7ed16466d
## explicit; go 1.21
github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/tests
Expand Down
22 changes: 17 additions & 5 deletions wakuv2/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/metrics"

commonapi "github.com/waku-org/go-waku/waku/v2/api/common"
filterapi "github.com/waku-org/go-waku/waku/v2/api/filter"
"github.com/waku-org/go-waku/waku/v2/api/history"
"github.com/waku-org/go-waku/waku/v2/api/missing"
Expand All @@ -72,8 +73,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/utils"

commonapi "github.com/waku-org/go-waku/waku/v2/api/common"

gocommon "github.com/status-im/status-go/common"
"github.com/status-im/status-go/connection"
"github.com/status-im/status-go/eth-node/types"
Expand Down Expand Up @@ -119,6 +118,7 @@ type ITelemetryClient interface {
PushMissedMessage(ctx context.Context, envelope *protocol.Envelope)
PushMissedRelevantMessage(ctx context.Context, message *common.ReceivedMessage)
PushMessageDeliveryConfirmed(ctx context.Context, messageHash string)
PushSentMessageTotal(ctx context.Context, messageSize uint32)
}

// Waku represents a dark communication interface through the Ethereum
Expand Down Expand Up @@ -1114,24 +1114,32 @@ func (w *Waku) Start() error {
peerTelemetryTicker := time.NewTicker(peerTelemetryTickerInterval)
defer peerTelemetryTicker.Stop()

sub, err := w.node.Host().EventBus().Subscribe(new(utils.DialError))
dialErrSub, err := w.node.Host().EventBus().Subscribe(new(utils.DialError))
if err != nil {
w.logger.Error("failed to subscribe to dial errors", zap.Error(err))
return
}
defer sub.Close()
defer dialErrSub.Close()

messageSentSub, err := w.node.Host().EventBus().Subscribe(new(publish.MessageSent))
if err != nil {
w.logger.Error("failed to subscribe to message sent events", zap.Error(err))
return
}

for {
select {
case <-w.ctx.Done():
return
case <-peerTelemetryTicker.C:
w.reportPeerMetrics()
case dialErr := <-sub.Out():
case dialErr := <-dialErrSub.Out():
errors := common.ParseDialErrors(dialErr.(utils.DialError).Err.Error())
for _, dialError := range errors {
w.statusTelemetryClient.PushDialFailure(w.ctx, common.DialError{ErrType: dialError.ErrType, ErrMsg: dialError.ErrMsg, Protocols: dialError.Protocols})
}
case messageSent := <-messageSentSub.Out():
w.statusTelemetryClient.PushSentMessageTotal(w.ctx, messageSent.(publish.MessageSent).Size)
}
}
}()
Expand Down Expand Up @@ -1301,6 +1309,10 @@ func (w *Waku) startMessageSender() error {
return err
}

if w.cfg.TelemetryServerURL != "" {
sender.WithMessageSentEmitter(w.node.Host())
}

if w.cfg.EnableStoreConfirmationForMessagesSent {
msgStoredChan := make(chan gethcommon.Hash, 1000)
msgExpiredChan := make(chan gethcommon.Hash, 1000)
Expand Down

0 comments on commit cd3fe37

Please sign in to comment.