Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics via Prometheus #313

Closed
wants to merge 10 commits into from
Closed
22 changes: 12 additions & 10 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,7 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT

func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID) *IpfsDHT {
rt := kb.NewRoutingTable(KValue, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore())

cmgr := h.ConnManager()
rt.PeerAdded = func(p peer.ID) {
cmgr.TagPeer(p, "kbucket", 5)
}
rt.PeerRemoved = func(p peer.ID) {
cmgr.UntagPeer(p, "kbucket")
}

return &IpfsDHT{
dht := &IpfsDHT{
datastore: dstore,
self: h.ID(),
peerstore: h.Peerstore(),
Expand All @@ -148,6 +139,17 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
routingTable: rt,
protocols: protocols,
}
cmgr := h.ConnManager()
rt.PeerAdded = func(p peer.ID) {
cmgr.TagPeer(p, "kbucket", 5)
routingTablePeersAdded.WithLabelValues(dht.instanceLabelValues()...).Inc()
}
rt.PeerRemoved = func(p peer.ID) {
cmgr.UntagPeer(p, "kbucket")
routingTablePeersRemoved.WithLabelValues(dht.instanceLabelValues()...).Inc()
}
dht.initRoutingTableNumEntriesGaugeFunc()
return dht
}

// putValueToPeer stores the given key/value pair at the peer 'p'
Expand Down
80 changes: 67 additions & 13 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
case nil:
}

startedHandling := time.Now()

receivedMessages.WithLabelValues(dht.messageLabelValues(&req)...).Inc()
receivedMessageSizeBytes.WithLabelValues(dht.messageLabelValues(&req)...).Observe(float64(req.Size()))

handler := dht.handlerForMsgType(req.GetType())
if handler == nil {
logger.Warningf("can't handle received message of type %v", req.GetType())
Expand Down Expand Up @@ -103,48 +108,95 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
logger.Debugf("error writing response: %v", err)
return false
}
inboundRequestHandlingTimeSeconds.WithLabelValues(dht.messageLabelValues(&req)...).Observe(time.Since(startedHandling).Seconds())
}
}

// beginMessageWriteLatency records the current time and returns a closure
// that, when invoked, observes the seconds elapsed since that moment on
// messageWriteLatencySeconds, labelled with dht.messageLabelValues(m).
// The closure is meant to be called immediately before the message is
// written. ctx is accepted but not used by this function.
func (dht *IpfsDHT) beginMessageWriteLatency(ctx context.Context, m *pb.Message) func() {
	started := time.Now()
	observe := func() {
		// Resolve the labelled observer first, then measure, so the
		// elapsed time is taken as late as possible.
		labels := dht.messageLabelValues(m)
		observer := messageWriteLatencySeconds.WithLabelValues(labels...)
		observer.Observe(time.Since(started).Seconds())
	}
	return observe
}

// sendRequest sends out a request, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
// newStream opens a stream to peer p via dht.host.NewStream using
// dht.protocols, and records how long the attempt took. Successful attempts
// are observed on newStreamTimeSeconds and failed attempts on
// newStreamTimeErrorSeconds, both labelled with dht.instanceLabelValues().
// The stream and error from the host are returned unchanged.
func (dht *IpfsDHT) newStream(ctx context.Context, p peer.ID) (inet.Stream, error) {
	began := time.Now()
	stream, err := dht.host.NewStream(ctx, p, dht.protocols...)
	labels := dht.instanceLabelValues()
	if err != nil {
		// Failures get their own histogram so they do not skew the
		// success-path timings.
		newStreamTimeErrorSeconds.WithLabelValues(labels...).Observe(time.Since(began).Seconds())
		return stream, err
	}
	newStreamTimeSeconds.WithLabelValues(labels...).Observe(time.Since(began).Seconds())
	return stream, err
}

func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, req *pb.Message) (resp *pb.Message, err error) {
dht.recordOutboundMessage(ctx, req)
beforeWrite := dht.beginMessageWriteLatency(ctx, req)
started := time.Now()
defer func() {
var errStr string
if err != nil {
errStr = err.Error()
}
outboundRequestResponseLatencySeconds.WithLabelValues(
append(dht.messageLabelValues(req), errStr)...,
).Observe(time.Since(started).Seconds())
}()
ms, err := dht.messageSenderForPeer(ctx, p)
if err != nil {
return nil, err
}

start := time.Now()

rpmes, err := ms.SendRequest(ctx, pmes)
resp, err = ms.SendRequest(ctx, req, beforeWrite)
if err != nil {
return nil, err
return
}

// update the peer (on valid msgs only)
dht.updateFromMessage(ctx, p, rpmes)
dht.updateFromMessage(ctx, p, resp)

dht.peerstore.RecordLatency(p, time.Since(start))
logger.Event(ctx, "dhtReceivedMessage", dht.self, p, rpmes)
return rpmes, nil
logger.Event(ctx, "dhtReceivedMessage", dht.self, p, resp)
return resp, nil
}

// sendMessage sends out a message
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) (err error) {
dht.recordOutboundMessage(ctx, pmes)
beforeWrite := dht.beginMessageWriteLatency(ctx, pmes)
started := time.Now()
defer func() {
var errStr string
if err != nil {
errStr = err.Error()
}
sendMessageLatencySeconds.WithLabelValues(
append(dht.messageLabelValues(pmes), errStr)...,
).Observe(time.Since(started).Seconds())
}()
ms, err := dht.messageSenderForPeer(ctx, p)
if err != nil {
return err
}

if err := ms.SendMessage(ctx, pmes); err != nil {
if err := ms.SendMessage(ctx, pmes, beforeWrite); err != nil {
return err
}
logger.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
return nil
}

// recordOutboundMessage updates the outbound-message metrics for m: it
// increments sentMessages and observes m.Size() on sentMessageSizeBytes,
// both labelled with dht.messageLabelValues(m). ctx is accepted but not
// used by this function.
func (dht *IpfsDHT) recordOutboundMessage(ctx context.Context, m *pb.Message) {
	labels := dht.messageLabelValues(m)

	sentCounter := sentMessages.WithLabelValues(labels...)
	sentCounter.Inc()

	sizeObserver := sentMessageSizeBytes.WithLabelValues(labels...)
	sizeObserver.Observe(float64(m.Size()))
}

func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
// Make sure that this node is actually a DHT server, not just a client.
protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
Expand Down Expand Up @@ -227,7 +279,7 @@ func (ms *messageSender) prep(ctx context.Context) error {
return nil
}

nstr, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...)
nstr, err := ms.dht.newStream(ctx, ms.p)
if err != nil {
return err
}
Expand All @@ -244,7 +296,7 @@ func (ms *messageSender) prep(ctx context.Context) error {
// behaviour.
const streamReuseTries = 3

func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error {
func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message, beforeWrite func()) error {
ms.lk.Lock()
defer ms.lk.Unlock()
retry := false
Expand All @@ -253,6 +305,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro
return err
}

beforeWrite()
if err := ms.writeMsg(pmes); err != nil {
ms.s.Reset()
ms.s = nil
Expand Down Expand Up @@ -280,7 +333,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro
}
}

func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message, beforeWrite func()) (*pb.Message, error) {
ms.lk.Lock()
defer ms.lk.Unlock()
retry := false
Expand All @@ -289,6 +342,7 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb
return nil, err
}

beforeWrite()
if err := ms.writeMsg(pmes); err != nil {
ms.s.Reset()
ms.s = nil
Expand Down
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/libp2p/go-libp2p-host v0.0.1
github.com/libp2p/go-libp2p-kbucket v0.0.1
github.com/libp2p/go-libp2p-net v0.0.1
github.com/libp2p/go-libp2p-peer v0.0.1
github.com/libp2p/go-libp2p-peer v0.1.0
github.com/libp2p/go-libp2p-peerstore v0.0.1
github.com/libp2p/go-libp2p-protocol v0.0.1
github.com/libp2p/go-libp2p-record v0.0.1
Expand All @@ -26,7 +26,11 @@ require (
github.com/multiformats/go-multiaddr v0.0.1
github.com/multiformats/go-multiaddr-dns v0.0.2
github.com/multiformats/go-multistream v0.0.1
github.com/prometheus/client_golang v0.9.3-0.20190325082328-7490f0a74525
github.com/stretchr/testify v1.3.0
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc
golang.org/x/net v0.0.0-20190311183353-d8887717615a // indirect
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 // indirect
golang.org/x/xerrors v0.0.0-20190212162355-a5947ffaace3
gopkg.in/yaml.v2 v2.2.2 // indirect
)
Loading