Skip to content

Commit

Permalink
chore: use go-libp2p-routing-helpers for tracing needs
Browse files Browse the repository at this point in the history
go-libp2p-routing-helpers has an optimized implementation that does nothing if we are not tracing, it also properly log all IO of the request.
  • Loading branch information
Jorropo committed Sep 4, 2023
1 parent 134f7e4 commit f901041
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 55 deletions.
4 changes: 4 additions & 0 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/libp2p/go-libp2p-routing-helpers/tracing"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -40,6 +41,9 @@ import (
"go.uber.org/zap"
)

const tracer = tracing.Tracer("go-libp2p-kad-dht")
const dhtName = "IpfsDHT"

var (
logger = logging.Logger("dht")
baseLogger = logger.Desugar()
Expand Down
5 changes: 4 additions & 1 deletion dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ func GetDefaultBootstrapPeerAddrInfos() []peer.AddrInfo {

// Bootstrap tells the DHT to get into a bootstrapped state satisfying the
// IpfsRouter interface.
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
func (dht *IpfsDHT) Bootstrap(ctx context.Context) (err error) {
_, end := tracer.Bootstrap(dhtName, ctx)
defer func() { end(err) }()

dht.fixRTIfNeeded()
dht.rtRefreshManager.RefreshNoWait()
return nil
Expand Down
45 changes: 38 additions & 7 deletions dual/dual.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"

dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/internal"
"github.com/libp2p/go-libp2p-routing-helpers/tracing"

"github.com/ipfs/go-cid"
kb "github.com/libp2p/go-libp2p-kbucket"
Expand All @@ -24,6 +26,9 @@ import (
"github.com/hashicorp/go-multierror"
)

const tracer = tracing.Tracer("go-libp2p-kad-dht/dual")
const dualName = "Dual"

// DHT implements the routing interface to provide two concrete DHT implementationts for use
// in IPFS that are used to support both global network users and disjoint LAN usecases.
type DHT struct {
Expand Down Expand Up @@ -158,7 +163,10 @@ func (dht *DHT) WANActive() bool {
}

// Provide adds the given cid to the content routing system.
func (dht *DHT) Provide(ctx context.Context, key cid.Cid, announce bool) error {
func (dht *DHT) Provide(ctx context.Context, key cid.Cid, announce bool) (err error) {
ctx, end := tracer.Provide(dualName, ctx, key, announce)
defer func() { end(err) }()

if dht.WANActive() {
return dht.WAN.Provide(ctx, key, announce)
}
Expand All @@ -174,7 +182,10 @@ func (dht *DHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStat
}

// FindProvidersAsync searches for peers who are able to provide a given key
func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) (ch <-chan peer.AddrInfo) {
ctx, end := tracer.FindProvidersAsync(dualName, ctx, key, count)
defer func() { ch = end(ch, nil) }()

reqCtx, cancel := context.WithCancel(ctx)
outCh := make(chan peer.AddrInfo)

Expand All @@ -185,10 +196,13 @@ func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int)
subCtx, evtCh = routing.RegisterForQueryEvents(reqCtx)
}

subCtx, span := internal.StartSpan(subCtx, "Dual.worker")
wanCh := dht.WAN.FindProvidersAsync(subCtx, key, count)
lanCh := dht.LAN.FindProvidersAsync(subCtx, key, count)
zeroCount := (count == 0)
go func() {
defer span.End()

defer cancel()
defer close(outCh)

Expand All @@ -207,11 +221,13 @@ func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int)
continue
case pi, ok = <-wanCh:
if !ok {
span.AddEvent("wan finished")
wanCh = nil
continue
}
case pi, ok = <-lanCh:
if !ok {
span.AddEvent("lan finished")
lanCh = nil
continue
}
Expand All @@ -238,7 +254,10 @@ func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int)

// FindPeer searches for a peer with given ID
// Note: with signed peer records, we can change this to short circuit once either DHT returns.
func (dht *DHT) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, error) {
func (dht *DHT) FindPeer(ctx context.Context, pid peer.ID) (pi peer.AddrInfo, err error) {
ctx, end := tracer.FindPeer(dualName, ctx, pid)
defer func() { end(pi, err) }()

var wg sync.WaitGroup
wg.Add(2)
var wanInfo, lanInfo peer.AddrInfo
Expand Down Expand Up @@ -304,22 +323,31 @@ func combineErrors(erra, errb error) error {

// Bootstrap allows callers to hint to the routing system to get into a
// Boostrapped state and remain there.
func (dht *DHT) Bootstrap(ctx context.Context) error {
func (dht *DHT) Bootstrap(ctx context.Context) (err error) {
ctx, end := tracer.Bootstrap(dualName, ctx)
defer func() { end(err) }()

erra := dht.WAN.Bootstrap(ctx)
errb := dht.LAN.Bootstrap(ctx)
return combineErrors(erra, errb)
}

// PutValue adds value corresponding to given Key.
func (dht *DHT) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) error {
func (dht *DHT) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) (err error) {
ctx, end := tracer.PutValue(dualName, ctx, key, val, opts...)
defer func() { end(err) }()

if dht.WANActive() {
return dht.WAN.PutValue(ctx, key, val, opts...)
}
return dht.LAN.PutValue(ctx, key, val, opts...)
}

// GetValue searches for the value corresponding to given Key.
func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (result []byte, err error) {
ctx, end := tracer.GetValue(dualName, ctx, key, opts...)
defer func() { end(result, err) }()

lanCtx, cancelLan := context.WithCancel(ctx)
defer cancelLan()

Expand Down Expand Up @@ -349,7 +377,10 @@ func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option)
}

// SearchValue searches for better values from this value
func (dht *DHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
func (dht *DHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (ch <-chan []byte, err error) {
ctx, end := tracer.SearchValue(dualName, ctx, key, opts...)
defer func() { ch, err = end(ch, err) }()

p := helper.Parallel{Routers: []routing.Routing{dht.WAN, dht.LAN}, Validator: dht.WAN.Validator}
return p.SearchValue(ctx, key, opts...)
}
Expand Down
56 changes: 34 additions & 22 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"

"github.com/libp2p/go-libp2p-routing-helpers/tracing"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -50,6 +51,9 @@ import (

var logger = logging.Logger("fullrtdht")

const tracer = tracing.Tracer("go-libp2p-kad-dht/fullrt")
const dhtName = "FullRT"

const rtRefreshLimitsMsg = `Accelerated DHT client was unable to fully refresh its routing table due to Resource Manager limits, which may degrade content routing. Consider increasing resource limits. See debug logs for the "dht-crawler" subsystem for details.`

// FullRT is an experimental DHT client that is under development. Expect breaking changes to occur in this client
Expand Down Expand Up @@ -358,7 +362,12 @@ func (dht *FullRT) Close() error {
return dht.ProviderManager.Close()
}

func (dht *FullRT) Bootstrap(ctx context.Context) error {
func (dht *FullRT) Bootstrap(ctx context.Context) (err error) {
_, end := tracer.Bootstrap(dhtName, ctx)
defer func() { end(err) }()

// TODO: This should block until the first crawl finish.

return nil
}

Expand Down Expand Up @@ -454,6 +463,9 @@ func (dht *FullRT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID,
// PutValue adds value corresponding to given Key.
// This is the top level "Store" operation of the DHT
func (dht *FullRT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
ctx, end := tracer.PutValue(dhtName, ctx, key, value, opts...)
defer func() { end(err) }()

if !dht.enableValues {
return routing.ErrNotSupported
}
Expand Down Expand Up @@ -518,7 +530,10 @@ type RecvdVal struct {
}

// GetValue searches for the value corresponding to given Key.
func (dht *FullRT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) {
func (dht *FullRT) GetValue(ctx context.Context, key string, opts ...routing.Option) (result []byte, err error) {
ctx, end := tracer.GetValue(dhtName, ctx, key, opts...)
defer func() { end(result, err) }()

if !dht.enableValues {
return nil, routing.ErrNotSupported
}
Expand Down Expand Up @@ -552,14 +567,9 @@ func (dht *FullRT) GetValue(ctx context.Context, key string, opts ...routing.Opt
}

// SearchValue searches for the value corresponding to given Key and streams the results.
func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key)))
var good bool
defer func() {
if !good {
span.End()
}
}()
func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (ch <-chan []byte, err error) {
ctx, end := tracer.SearchValue(dhtName, ctx, key, opts...)
defer func() { ch, err = end(ch, err) }()

if !dht.enableValues {
return nil, routing.ErrNotSupported
Expand All @@ -579,9 +589,7 @@ func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.
valCh, lookupRes := dht.getValues(ctx, key, stopCh)

out := make(chan []byte)
good = true
go func() {
defer span.End()
defer close(out)

best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
Expand Down Expand Up @@ -789,8 +797,8 @@ func (dht *FullRT) getValues(ctx context.Context, key string, stopQuery chan str

// Provide makes this node announce that it can provide a value for the given key
func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
ctx, span := internal.StartSpan(ctx, "FullRT.Provide", trace.WithAttributes(attribute.Stringer("Key", key), attribute.Bool("Broadcast", brdcst)))
defer span.End()
ctx, end := tracer.Provide(dhtName, ctx, key, brdcst)
defer func() { end(err) }()

if !dht.enableProviders {
return routing.ErrNotSupported
Expand Down Expand Up @@ -932,9 +940,9 @@ func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer
return numSuccess
}

func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash) error {
ctx, span := internal.StartSpan(ctx, "FullRT.ProvideMany", trace.WithAttributes(attribute.Int("NumKeys", len(keys))))
defer span.End()
func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash) (err error) {
ctx, end := tracer.ProvideMany(dhtName, ctx, keys)
defer func() { end(err) }()

if !dht.enableProviders {
return routing.ErrNotSupported
Expand Down Expand Up @@ -1220,7 +1228,10 @@ func (dht *FullRT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInf
// the search query completes. If count is zero then the query will run until it
// completes. Note: not reading from the returned channel may block the query
// from progressing.
func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) (ch <-chan peer.AddrInfo) {
ctx, end := tracer.FindProvidersAsync(dhtName, ctx, key, count)
defer func() { ch = end(ch, nil) }()

if !dht.enableProviders || !key.Defined() {
peerOut := make(chan peer.AddrInfo)
close(peerOut)
Expand All @@ -1241,7 +1252,8 @@ func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count in
}

func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsyncRoutine", trace.WithAttributes(attribute.Stringer("Key", key)))
// use a span here because unlike tracer.FindProvidersAsync we know who told us about it and that intresting to log.
ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsyncRoutine")
defer span.End()

defer close(peerOut)
Expand Down Expand Up @@ -1351,9 +1363,9 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
}

// FindPeer searches for a peer with given ID.
func (dht *FullRT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
ctx, span := internal.StartSpan(ctx, "FullRT.FindPeer", trace.WithAttributes(attribute.Stringer("PeerID", id)))
defer span.End()
func (dht *FullRT) FindPeer(ctx context.Context, id peer.ID) (pi peer.AddrInfo, err error) {
ctx, end := tracer.FindPeer(dhtName, ctx, id)
defer func() { end(pi, err) }()

if err := id.Validate(); err != nil {
return peer.AddrInfo{}, err
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/libp2p/go-libp2p v0.30.0
github.com/libp2p/go-libp2p-kbucket v0.6.3
github.com/libp2p/go-libp2p-record v0.2.0
github.com/libp2p/go-libp2p-routing-helpers v0.7.0
github.com/libp2p/go-libp2p-routing-helpers v0.7.2
github.com/libp2p/go-libp2p-testing v0.12.0
github.com/libp2p/go-libp2p-xor v0.1.0
github.com/libp2p/go-msgio v0.3.0
Expand All @@ -39,6 +39,7 @@ require (
)

require (
github.com/Jorropo/jsync v1.0.1 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Jorropo/jsync v1.0.1 h1:6HgRolFZnsdfzRUj+ImB9og1JYOxQoReSywkHOGSaUU=
github.com/Jorropo/jsync v1.0.1/go.mod h1:jCOZj3vrBCri3bSU3ErUYvevKlnbssrXeCivybS5ABQ=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
Expand Down Expand Up @@ -253,8 +255,8 @@ github.com/libp2p/go-libp2p-kbucket v0.6.3/go.mod h1:RCseT7AH6eJWxxk2ol03xtP9pEH
github.com/libp2p/go-libp2p-peerstore v0.1.4/go.mod h1:+4BDbDiiKf4PzpANZDAT+knVdLxvqh7hXOujessqdzs=
github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0=
github.com/libp2p/go-libp2p-record v0.2.0/go.mod h1:I+3zMkvvg5m2OcSdoL0KPljyJyvNDFGKX7QdlpYUcwk=
github.com/libp2p/go-libp2p-routing-helpers v0.7.0 h1:sirOYVD0wGWjkDwHZvinunIpaqPLBXkcnXApVHwZFGA=
github.com/libp2p/go-libp2p-routing-helpers v0.7.0/go.mod h1:R289GUxUMzRXIbWGSuUUTPrlVJZ3Y/pPz495+qgXJX8=
github.com/libp2p/go-libp2p-routing-helpers v0.7.2 h1:xJMFyhQ3Iuqnk9Q2dYE1eUTzsah7NLw3Qs2zjUV78T0=
github.com/libp2p/go-libp2p-routing-helpers v0.7.2/go.mod h1:cN4mJAD/7zfPKXBcs9ze31JGYAZgzdABEm+q/hkswb8=
github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA=
github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg=
github.com/libp2p/go-libp2p-xor v0.1.0 h1:hhQwT4uGrBcuAkUGXADuPltalOdpf9aag9kaYNT2tLA=
Expand Down
Loading

0 comments on commit f901041

Please sign in to comment.