Skip to content

Commit

Permalink
Add Opencensus metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
lanzafame authored and anacrolix committed Apr 15, 2019
1 parent 12c9510 commit e4666c5
Show file tree
Hide file tree
Showing 5 changed files with 352 additions and 56 deletions.
64 changes: 54 additions & 10 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,17 @@ import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"time"

"go.opencensus.io/metric/metricdata"

"go.opencensus.io/stats"

"github.com/libp2p/go-libp2p-kad-dht/metrics"
"go.opencensus.io/tag"

"golang.org/x/xerrors"

opts "github.com/libp2p/go-libp2p-kad-dht/opts"
Expand Down Expand Up @@ -128,26 +136,62 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
// makeDHT builds an IpfsDHT for host h, backed by datastore dstore and
// configured with the given protocol IDs. It creates the routing table,
// installs routing-table callbacks that tag/untag peers in the host's
// connection manager and record metrics, and registers a routing-table size
// entry on metrics.RoutingTableNumEntries before returning the new DHT.
func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID) *IpfsDHT {
rt := kb.NewRoutingTable(KValue, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore())

cmgr := h.ConnManager()
rt.PeerAdded = func(p peer.ID) {
cmgr.TagPeer(p, "kbucket", 5)
}
rt.PeerRemoved = func(p peer.ID) {
cmgr.UntagPeer(p, "kbucket")
}

return &IpfsDHT{
dht := &IpfsDHT{
datastore: dstore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
ctx: ctx,
providers: providers.NewProviderManager(ctx, h.ID(), dstore),
birth: time.Now(),
routingTable: rt,
protocols: protocols,
}
// Swap the plain ctx stored above for one that carries this instance's
// metrics tags; this needs dht.self to be set already, hence the two steps.
dht.ctx = dht.withLocalTags(ctx)

cmgr := h.ConnManager()
// Peers entering the routing table are tagged "kbucket" (value 5) in the
// connection manager and counted via RoutingTablePeersAdded.
rt.PeerAdded = func(p peer.ID) {
cmgr.TagPeer(p, "kbucket", 5)
stats.Record(dht.ctx, metrics.RoutingTablePeersAdded.M(1))
}
// Peers leaving the routing table lose the "kbucket" tag and are counted via
// RoutingTablePeersRemoved.
rt.PeerRemoved = func(p peer.ID) {
cmgr.UntagPeer(p, "kbucket")
stats.Record(dht.ctx, metrics.RoutingTablePeersRemoved.M(1))
}
// Register a callback reporting rt.Size(), labelled with the local peer ID
// and the instance ID.
// NOTE(review): any value returned by UpsertEntry is discarded here — confirm
// whether it returns an error that should be handled.
metrics.RoutingTableNumEntries.UpsertEntry(
func() int64 {
return int64(rt.Size())
},
metricdata.LabelValue{dht.localPeerIdTagValue(), true},
metricdata.LabelValue{dht.instanceIdTagValue(), true},
)
return dht
}

// localPeerIdTagValue returns the string form of this node's own peer ID
// (dht.self), which is used as the value of the local-peer-ID metrics tag and
// label.
func (dht *IpfsDHT) localPeerIdTagValue() string {
	self := dht.self
	return self.String()
}

// instanceIdTagValue returns the address of this IpfsDHT formatted with the
// %p verb. It is used as the value of the instance-ID metrics tag and label,
// presumably so that several DHT instances in one process can be told apart.
func (dht *IpfsDHT) instanceIdTagValue() string {
	const pointerVerb = "%p"
	return fmt.Sprintf(pointerVerb, dht)
}

// tagMutators returns a newly allocated slice holding the two tag mutators
// that identify this DHT in recorded metrics: the local peer ID and the
// instance ID. A fresh slice is built on every call, so callers may append to
// the result without affecting one another.
func (dht *IpfsDHT) tagMutators() []tag.Mutator {
	localPeer := tag.Upsert(metrics.KeyLocalPeerID, dht.localPeerIdTagValue())
	instance := tag.Upsert(metrics.KeyInstanceID, dht.instanceIdTagValue())
	return []tag.Mutator{localPeer, instance}
}

// withLocalTags derives a context from ctx that carries this DHT's local
// metrics tags (see tagMutators) and returns it.
//
// It panics if tag.New reports an error. The tag values are produced by the
// DHT itself rather than supplied by callers, so a failure here is treated as
// a broken invariant instead of a recoverable condition.
func (dht *IpfsDHT) withLocalTags(ctx context.Context) context.Context {
	tagged, err := tag.New(ctx, dht.tagMutators()...)
	if err != nil {
		panic(err)
	}
	return tagged
}

// putValueToPeer stores the given key/value pair at the peer 'p'
Expand Down
68 changes: 61 additions & 7 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ import (

ggio "github.com/gogo/protobuf/io"
ctxio "github.com/jbenet/go-context/io"
"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
)

var dhtReadMessageTimeout = time.Minute
Expand Down Expand Up @@ -55,6 +58,7 @@ func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
// Returns true on orderly completion of writes (so we can Close the stream).
func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
ctx := dht.Context()

cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
Expand All @@ -76,6 +80,17 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
case nil:
}

startedHandling := time.Now()

stats.RecordWithTags(
ctx,
[]tag.Mutator{
tag.Upsert(metrics.KeyMessageType, req.GetType().String()),
},
metrics.ReceivedMessages.M(1),
metrics.ReceivedMessageSizeBytes.M(int64(req.Size())),
)

handler := dht.handlerForMsgType(req.GetType())
if handler == nil {
logger.Warningf("can't handle received message of type %v", req.GetType())
Expand Down Expand Up @@ -104,21 +119,39 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
return false
}

stats.RecordWithTags(
ctx,
[]tag.Mutator{metrics.UpsertMessageType(&req)},
metrics.InboundRequestHandlingTimeMs.M(time.Since(startedHandling).Seconds()*1000),
)
}
}

// sendRequest sends out a request, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
// beginMessageWriteLatency captures the current time and returns a function
// that callers should invoke immediately before writing message m.
//
// When invoked, the returned function records under
// metrics.MessageWriteLatencyMs the number of milliseconds that have passed
// since beginMessageWriteLatency was called, tagged with this DHT's local tags
// and the type of m. Each invocation records a new measurement.
func (dht *IpfsDHT) beginMessageWriteLatency(ctx context.Context, m *pb.Message) func() {
	started := time.Now()
	record := func() {
		// Build the tags first, then read the clock, so the measurement is
		// taken as late as possible.
		mutators := append(dht.tagMutators(), metrics.UpsertMessageType(m))
		elapsedMs := time.Since(started).Seconds() * 1000
		stats.RecordWithTags(ctx, mutators, metrics.MessageWriteLatencyMs.M(elapsedMs))
	}
	return record
}

// sendRequest sends out a request, but also makes sure to measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
dht.recordOutboundMessage(ctx, pmes)
beforeWrite := dht.beginMessageWriteLatency(ctx, pmes)
ms, err := dht.messageSenderForPeer(ctx, p)
if err != nil {
return nil, err
}

start := time.Now()

rpmes, err := ms.SendRequest(ctx, pmes)
rpmes, err := ms.SendRequest(ctx, pmes, beforeWrite)
if err != nil {
return nil, err
}
Expand All @@ -133,18 +166,29 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message

// sendMessage sends pmes to peer p through that peer's message sender.
//
// Outbound-message metrics are recorded and the write-latency timer is started
// before the message sender is looked up. It returns the error from
// messageSenderForPeer or from the sender's SendMessage, and nil once the
// "dhtSentMessage" event has been logged.
//
// NOTE(review): recordOutboundMessage runs before any send is attempted, so
// the sent-message metrics also count messages whose send later fails —
// confirm this is intended.
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
dht.recordOutboundMessage(ctx, pmes)
beforeWrite := dht.beginMessageWriteLatency(ctx, pmes)
ms, err := dht.messageSenderForPeer(ctx, p)
if err != nil {
return err
}

if err := ms.SendMessage(ctx, pmes); err != nil {
if err := ms.SendMessage(ctx, pmes, beforeWrite); err != nil {
return err
}
logger.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
return nil
}

// recordOutboundMessage records two measurements for message m: a count of
// one under metrics.SentMessages and m.Size() under
// metrics.SentMessageSizeBytes. Both are tagged with this DHT's local tags and
// the type of m.
func (dht *IpfsDHT) recordOutboundMessage(ctx context.Context, m *pb.Message) {
	mutators := append(dht.tagMutators(), metrics.UpsertMessageType(m))
	sentCount := metrics.SentMessages.M(1)
	sentBytes := metrics.SentMessageSizeBytes.M(int64(m.Size()))
	stats.RecordWithTags(ctx, mutators, sentCount, sentBytes)
}

func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
// Make sure that this node is actually a DHT server, not just a client.
protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
Expand Down Expand Up @@ -244,7 +288,7 @@ func (ms *messageSender) prep(ctx context.Context) error {
// behaviour.
const streamReuseTries = 3

func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error {
func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message, beforeWrite func()) error {
ms.lk.Lock()
defer ms.lk.Unlock()
retry := false
Expand All @@ -253,6 +297,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro
return err
}

beforeWrite()
if err := ms.writeMsg(pmes); err != nil {
ms.s.Reset()
ms.s = nil
Expand Down Expand Up @@ -280,7 +325,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro
}
}

func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message, beforeWrite func()) (*pb.Message, error) {
ms.lk.Lock()
defer ms.lk.Unlock()
retry := false
Expand All @@ -289,6 +334,7 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb
return nil, err
}

beforeWrite()
if err := ms.writeMsg(pmes); err != nil {
ms.s.Reset()
ms.s = nil
Expand All @@ -303,6 +349,8 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb
}
}

startedWaitingForResponse := time.Now()

mes := new(pb.Message)
if err := ms.ctxReadMsg(ctx, mes); err != nil {
ms.s.Reset()
Expand All @@ -318,6 +366,12 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb
}
}

stats.RecordWithTags(ctx,
[]tag.Mutator{
tag.Upsert(metrics.KeyMessageType, pmes.Type.String()),
},
metrics.OutboundRequestResponseLatencyMs.M(time.Since(startedWaitingForResponse).Seconds()*1000))

logger.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

if ms.singleMes > streamReuseTries {
Expand Down
14 changes: 8 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,31 @@ require (
github.com/gogo/protobuf v1.2.1
github.com/hashicorp/golang-lru v0.5.1
github.com/ipfs/go-cid v0.0.1
github.com/ipfs/go-datastore v0.0.4
github.com/ipfs/go-datastore v0.0.1
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-log v0.0.1
github.com/ipfs/go-todocounter v0.0.1
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8
github.com/libp2p/go-libp2p v0.0.12
github.com/libp2p/go-libp2p v0.0.2
github.com/libp2p/go-libp2p-crypto v0.0.1
github.com/libp2p/go-libp2p-host v0.0.1
github.com/libp2p/go-libp2p-kbucket v0.0.1
github.com/libp2p/go-libp2p-net v0.0.2
github.com/libp2p/go-libp2p-peer v0.0.1
github.com/libp2p/go-libp2p-peerstore v0.0.2
github.com/libp2p/go-libp2p-net v0.0.1
github.com/libp2p/go-libp2p-peer v0.1.0
github.com/libp2p/go-libp2p-peerstore v0.0.1
github.com/libp2p/go-libp2p-protocol v0.0.1
github.com/libp2p/go-libp2p-record v0.0.1
github.com/libp2p/go-libp2p-routing v0.0.1
github.com/libp2p/go-libp2p-swarm v0.0.2
github.com/libp2p/go-libp2p-swarm v0.0.1
github.com/libp2p/go-testutil v0.0.1
github.com/mr-tron/base58 v1.1.0
github.com/multiformats/go-multiaddr v0.0.1
github.com/multiformats/go-multiaddr-dns v0.0.2
github.com/multiformats/go-multistream v0.0.1
github.com/stretchr/testify v1.3.0
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc
go.opencensus.io v0.19.2
golang.org/x/xerrors v0.0.0-20190212162355-a5947ffaace3
gopkg.in/yaml.v2 v2.2.2 // indirect
)
Loading

0 comments on commit e4666c5

Please sign in to comment.