Skip to content

Commit

Permalink
feat: add basic tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
guseggert committed Apr 28, 2022
1 parent f056971 commit 23090b7
Show file tree
Hide file tree
Showing 12 changed files with 207 additions and 67 deletions.
17 changes: 15 additions & 2 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/libp2p/go-libp2p-kad-dht/internal"
dhtcfg "github.com/libp2p/go-libp2p-kad-dht/internal/config"
Expand Down Expand Up @@ -636,6 +638,9 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
// Do Nothing.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.PeerFound", trace.WithAttributes(attribute.Stringer("PeerID", p)))
defer span.End()

if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
c.Write(zap.String("peer", p.String()))
}
Expand All @@ -645,14 +650,17 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
} else if b {
select {
case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}:
case <-dht.ctx.Done():
case <-ctx.Done():
return
}
}
}

// peerStoppedDHT signals the routing table that a peer is unable to respond to DHT queries anymore.
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
_, span := internal.StartSpan(ctx, "IpfsDHT.PeerStoppedDHT", trace.WithAttributes(attribute.Stringer("PeerID", p)))
defer span.End()

logger.Debugw("peer stopped dht", "peer", p)
// A peer that does not support the DHT protocol is dead for us.
// There's no point in talking to it anymore till it starts supporting the DHT protocol again.
Expand All @@ -667,7 +675,10 @@ func (dht *IpfsDHT) fixRTIfNeeded() {
}

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
func (dht *IpfsDHT) FindLocal(ctx context.Context, id peer.ID) peer.AddrInfo {
_, span := internal.StartSpan(ctx, "IpfsDHT.FindLocal", trace.WithAttributes(attribute.Stringer("PeerID", id)))
defer span.End()

switch dht.host.Network().Connectedness(id) {
case network.Connected, network.CanConnect:
return dht.peerstore.PeerInfo(id)
Expand Down Expand Up @@ -816,6 +827,8 @@ func (dht *IpfsDHT) Host() host.Host {

// Ping sends a ping message to the passed peer and waits for a response.
//
// The call runs inside an "IpfsDHT.Ping" span tagged with the target PeerID,
// matching the other peer-scoped spans in this file. The span's context is
// the one handed to the messenger. The returned error is whatever
// protoMessenger.Ping reports.
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
	ctx, span := internal.StartSpan(ctx, "IpfsDHT.Ping", trace.WithAttributes(attribute.Stringer("PeerID", p)))
	defer span.End()

	return dht.protoMessenger.Ping(ctx, p)
}

Expand Down
40 changes: 28 additions & 12 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,10 @@ import (
"github.com/libp2p/go-libp2p-xor/kademlia"
kadkey "github.com/libp2p/go-libp2p-xor/key"
"github.com/libp2p/go-libp2p-xor/trie"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

var Tracer = otel.Tracer("")

var logger = logging.Logger("fullrtdht")

// FullRT is an experimental DHT client that is under development. Expect breaking changes to occur in this client
Expand Down Expand Up @@ -338,6 +335,9 @@ func (dht *FullRT) Bootstrap(ctx context.Context) error {

// CheckPeers returns (success, total)
func (dht *FullRT) CheckPeers(ctx context.Context, peers ...peer.ID) (int, int) {
ctx, span := internal.StartSpan(ctx, "FullRT.CheckPeers", trace.WithAttributes(attribute.Int("NumPeers", len(peers))))
defer span.End()

var peerAddrs chan interface{}
var total int
if len(peers) == 0 {
Expand Down Expand Up @@ -395,8 +395,7 @@ func workers(numWorkers int, fn func(interface{}), inputs <-chan interface{}) {
}

func (dht *FullRT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
ctx, span := Tracer.Start(ctx, "GetClosestPeers")
_ = ctx // not used, but we want to assign it _just_ in case we use it.
_, span := internal.StartSpan(ctx, "FullRT.GetClosestPeers", trace.WithAttributes(attribute.String("Key", key)))
defer span.End()

kbID := kb.ConvertKey(key)
Expand Down Expand Up @@ -525,6 +524,9 @@ func (dht *FullRT) GetValue(ctx context.Context, key string, opts ...routing.Opt

// SearchValue searches for the value corresponding to given Key and streams the results.
func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue", trace.WithAttributes(attribute.String("Key", key)))
defer span.End()

if !dht.enableValues {
return nil, routing.ErrNotSupported
}
Expand All @@ -545,6 +547,9 @@ func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.
out := make(chan []byte)
go func() {
defer close(out)
ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue.Worker")
defer span.End()

best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
if best == nil || aborted {
return
Expand All @@ -566,7 +571,7 @@ func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.
return
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
dht.updatePeerValues(ctx, key, best, updatePeers)
cancel()
}()
Expand Down Expand Up @@ -750,7 +755,7 @@ func (dht *FullRT) getValues(ctx context.Context, key string, stopQuery chan str

// Provide makes this node announce that it can provide a value for the given key
func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
ctx, span := Tracer.Start(ctx, "Provide")
ctx, span := internal.StartSpan(ctx, "FullRT.Provide", trace.WithAttributes(attribute.Stringer("Key", key), attribute.Bool("Broadcast", brdcst)))
defer span.End()

if !dht.enableProviders {
Expand Down Expand Up @@ -828,9 +833,6 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
// execOnMany has returned (e.g. do not write to resources that might get closed or set to nil and therefore result in
// a panic instead of just returning an error).
func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer.ID) error, peers []peer.ID, sloppyExit bool) int {
ctx, span := Tracer.Start(ctx, "execOnMany")
defer span.End()

if len(peers) == 0 {
return 0
}
Expand Down Expand Up @@ -894,7 +896,7 @@ func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer
}

func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash) error {
ctx, span := Tracer.Start(ctx, "ProvideMany")
ctx, span := internal.StartSpan(ctx, "FullRT.ProvideMany", trace.WithAttributes(attribute.Int("NumKeys", len(keys))))
defer span.End()

if !dht.enableProviders {
Expand Down Expand Up @@ -930,6 +932,9 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash)
}

func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte) error {
ctx, span := internal.StartSpan(ctx, "FullRT.PutMany", trace.WithAttributes(attribute.Int("NumKeys", len(keys))))
defer span.End()

if !dht.enableValues {
return routing.ErrNotSupported
}
Expand Down Expand Up @@ -958,7 +963,7 @@ func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte)
}

func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(ctx context.Context, target, k peer.ID) error, isProvRec bool) error {
ctx, span := Tracer.Start(ctx, "bulkMessageSend", trace.WithAttributes(attribute.Int("numKeys", len(keys))))
ctx, span := internal.StartSpan(ctx, "FullRT.BulkMessageSend")
defer span.End()

if len(keys) == 0 {
Expand Down Expand Up @@ -1160,6 +1165,9 @@ func divideByChunkSize(keys []peer.ID, chunkSize int) [][]peer.ID {

// FindProviders searches until the context expires.
func (dht *FullRT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
ctx, span := internal.StartSpan(ctx, "FullRT.FindProviders", trace.WithAttributes(attribute.Stringer("Key", c)))
defer span.End()

if !dht.enableProviders {
return nil, routing.ErrNotSupported
} else if !c.Defined() {
Expand All @@ -1179,6 +1187,9 @@ func (dht *FullRT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInf
// completes. Note: not reading from the returned channel may block the query
// from progressing.
func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsync", trace.WithAttributes(attribute.Stringer("Key", key), attribute.Int("Count", count)))
defer span.End()

if !dht.enableProviders || !key.Defined() {
peerOut := make(chan peer.AddrInfo)
close(peerOut)
Expand All @@ -1200,6 +1211,8 @@ func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count in

func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
defer close(peerOut)
ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsyncRouting")
defer span.End()

findAll := count == 0
var ps *peer.Set
Expand Down Expand Up @@ -1288,6 +1301,9 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.

// FindPeer searches for a peer with given ID.
func (dht *FullRT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
ctx, span := internal.StartSpan(ctx, "FullRT.FindPeer", trace.WithAttributes(attribute.Stringer("PeerID", id)))
defer span.End()

if err := id.Validate(); err != nil {
return peer.AddrInfo{}, err
}
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ require (
github.com/multiformats/go-multibase v0.0.3
github.com/multiformats/go-multihash v0.0.15
github.com/multiformats/go-multistream v0.2.2
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.1
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1
go.opencensus.io v0.23.0
go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/trace v0.20.0
go.opentelemetry.io/otel v1.6.3
go.opentelemetry.io/otel/trace v1.6.3
go.uber.org/zap v1.19.0
)
23 changes: 13 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I=
Expand Down Expand Up @@ -234,8 +239,9 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
Expand Down Expand Up @@ -848,8 +854,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down Expand Up @@ -888,14 +895,10 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/otel v0.20.0 h1:eaP0Fqu7SXHwvjiqDq83zImeehOHX8doTvU9AwXON8g=
go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo=
go.opentelemetry.io/otel/metric v0.20.0 h1:4kzhXFP+btKm4jwxpjIqjs41A7MakRFUS86bqLHTIw8=
go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU=
go.opentelemetry.io/otel/oteltest v0.20.0 h1:HiITxCawalo5vQzdHfKeZurV8x7ljcqAgiWzF6Vaeaw=
go.opentelemetry.io/otel/oteltest v0.20.0/go.mod h1:L7bgKf9ZB7qCwT9Up7i9/pn0PWIa9FqQ2IQ8LoxiGnw=
go.opentelemetry.io/otel/trace v0.20.0 h1:1DL6EXUdcg95gukhuRRvLDO/4X5THh/5dIV52lqtnbw=
go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw=
go.opentelemetry.io/otel v1.6.3 h1:FLOfo8f9JzFVFVyU+MSRJc2HdEAXQgm7pIv2uFKRSZE=
go.opentelemetry.io/otel v1.6.3/go.mod h1:7BgNga5fNlF/iZjG06hM3yofffp0ofKCDwSXx1GC4dI=
go.opentelemetry.io/otel/trace v1.6.3 h1:IqN4L+5b0mPNjdXIiZ90Ni4Bl5BRkDQywePLWemd9bc=
go.opentelemetry.io/otel/trace v1.6.3/go.mod h1:GNJQusJlUgZl9/TQBPKU/Y/ty+0iVB5fjhKeJGZPGFs=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
Expand Down
13 changes: 13 additions & 0 deletions internal/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package internal

import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

// StartSpan starts a span on the "go-libp2p-kad-dht" tracer obtained via
// otel.Tracer. The span name is the given name prefixed with "KademliaDHT.",
// and opts are forwarded unchanged. It returns the context and span produced
// by the tracer's Start call.
func StartSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
	tracer := otel.Tracer("go-libp2p-kad-dht")
	spanName := fmt.Sprintf("KademliaDHT.%s", name)
	return tracer.Start(ctx, spanName, opts...)
}
6 changes: 6 additions & 0 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/libp2p/go-libp2p-kad-dht/internal"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

kb "github.com/libp2p/go-libp2p-kbucket"
)
Expand All @@ -17,6 +20,9 @@ import (
// If the context is canceled, this function will return the context error along
// with the closest K peers it has found so far.
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetClosestPeers", trace.WithAttributes(attribute.String("Key", key)))
defer span.End()

if key == "" {
return nil, fmt.Errorf("can't lookup empty key")
}
Expand Down
7 changes: 7 additions & 0 deletions providers/providers_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/libp2p/go-libp2p-core/peer"
peerstore "github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-kad-dht/internal"
peerstoreImpl "github.com/libp2p/go-libp2p-peerstore"

lru "github.com/hashicorp/golang-lru/simplelru"
Expand Down Expand Up @@ -231,6 +232,9 @@ func (pm *ProviderManager) run(ctx context.Context, proc goprocess.Process) {

// AddProvider adds a provider
func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, provInfo peer.AddrInfo) error {
ctx, span := internal.StartSpan(ctx, "ProviderManager.AddProvider")
defer span.End()

if provInfo.ID != pm.self { // don't add own addrs.
pm.pstore.AddAddrs(provInfo.ID, provInfo.Addrs, peerstore.ProviderAddrTTL)
}
Expand Down Expand Up @@ -278,6 +282,9 @@ func mkProvKey(k []byte) string {
// GetProviders returns the set of providers for the given key.
// This method _does not_ copy the set. Do not modify it.
func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) ([]peer.AddrInfo, error) {
ctx, span := internal.StartSpan(ctx, "ProviderManager.GetProviders")
defer span.End()

gp := &getProv{
ctx: ctx,
key: k,
Expand Down
Loading

0 comments on commit 23090b7

Please sign in to comment.