Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add ipni checks #66

Merged
merged 11 commits into from
Sep 19, 2024
167 changes: 117 additions & 50 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

vole "github.com/ipfs-shipyard/vole/lib"
"github.com/ipfs/boxo/ipns"
"github.com/ipfs/boxo/routing/http/client"
"github.com/ipfs/boxo/routing/http/contentrouter"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
Expand Down Expand Up @@ -38,9 +40,14 @@
promRegistry *prometheus.Registry
}

// number of providers at which to stop looking for providers in the DHT
// When doing a check only with a CID
var MaxProvidersCount = 10
const (
// number of providers at which to stop looking for providers in the DHT
// When doing a check only with a CID
maxProvidersCount = 10

ipniSource = "IPNI"
dhtSource = "Amino DHT"
)

func newDaemon(ctx context.Context, acceleratedDHT bool) (*daemon, error) {
rm, err := NewResourceManager()
Expand Down Expand Up @@ -135,27 +142,64 @@
Addrs []string
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
Source string
}

// runCidCheck looks up the DHT for providers of a given CID and then checks their connectivity and Bitswap availability
func (d *daemon) runCidCheck(ctx context.Context, cidStr string) (cidCheckOutput, error) {
cid, err := cid.Decode(cidStr)
// runCidCheck finds providers of a given CID, using the DHT and IPNI
// concurrently. A check of connectivity and Bitswap availability is performed
// for each provider found.
func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string) (cidCheckOutput, error) {
crClient, err := client.New(ipniURL, client.WithStreamResultsRequired())
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create content router client: %w", err)

Check warning on line 154 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L154

Added line #L154 was not covered by tests
}
routerClient := contentrouter.NewContentRoutingClient(crClient)

out := make([]providerOutput, 0, MaxProvidersCount)
queryCtx, cancelQuery := context.WithCancel(ctx)
defer cancelQuery()

queryCtx, cancel := context.WithCancel(ctx)
defer cancel()
provsCh := d.dht.FindProvidersAsync(queryCtx, cid, MaxProvidersCount)
// Find providers with DHT and IPNI concurrently.
provsCh := d.dht.FindProvidersAsync(queryCtx, cidKey, maxProvidersCount)
ipniProvsCh := routerClient.FindProvidersAsync(queryCtx, cidKey, maxProvidersCount)

out := make([]providerOutput, 0, maxProvidersCount)
var wg sync.WaitGroup
var mu sync.Mutex
var providersCount int
var done bool

for !done {
var provider peer.AddrInfo
var open bool
var source string

select {
case provider, open = <-provsCh:
if !open {
provsCh = nil
if ipniProvsCh == nil {
done = true
}
continue
}
source = dhtSource
case provider, open = <-ipniProvsCh:
if !open {
ipniProvsCh = nil
if provsCh == nil {
done = true

Check warning on line 190 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L190

Added line #L190 was not covered by tests
}
continue
}
source = ipniSource

Check warning on line 194 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L194

Added line #L194 was not covered by tests
}
providersCount++
if providersCount == maxProvidersCount {
done = true

Check warning on line 198 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L198

Added line #L198 was not covered by tests
}

for provider := range provsCh {
wg.Add(1)
go func(provider peer.AddrInfo) {
go func(provider peer.AddrInfo, src string) {
defer wg.Done()

outputAddrs := []string{}
Expand Down Expand Up @@ -183,6 +227,7 @@
ID: provider.ID.String(),
Addrs: outputAddrs,
DataAvailableOverBitswap: BitswapCheckOutput{},
Source: src,
}

testHost, err := d.createTestHost()
Expand All @@ -196,7 +241,7 @@
dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*15)
defer dialCancel()

testHost.Connect(dialCtx, provider)
_ = testHost.Connect(dialCtx, provider)
// Call NewStream to force NAT hole punching. see https://github.com/libp2p/go-libp2p/issues/2714
_, connErr := testHost.NewStream(dialCtx, provider.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap")

Expand All @@ -205,7 +250,7 @@
} else {
// since we pass a libp2p host that's already connected to the peer the actual connection maddr we pass in doesn't matter
p2pAddr, _ := multiaddr.NewMultiaddr("/p2p/" + provider.ID.String())
provOutput.DataAvailableOverBitswap = checkBitswapCID(ctx, testHost, cid, p2pAddr)
provOutput.DataAvailableOverBitswap = checkBitswapCID(ctx, testHost, cidKey, p2pAddr)

for _, c := range testHost.Network().ConnsToPeer(provider.ID) {
provOutput.ConnectionMaddrs = append(provOutput.ConnectionMaddrs, c.RemoteMultiaddr().String())
Expand All @@ -215,8 +260,9 @@
mu.Lock()
out = append(out, provOutput)
mu.Unlock()
}(provider)
}(provider, source)
}
cancelQuery()

// Wait for all goroutines to finish
wg.Wait()
Expand All @@ -225,44 +271,41 @@
}

type peerCheckOutput struct {
ConnectionError string
PeerFoundInDHT map[string]int
ProviderRecordFromPeerInDHT bool
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
ConnectionError string
PeerFoundInDHT map[string]int
ProviderRecordFromPeerInDHT bool
ProviderRecordFromPeerInIPNI bool
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
}

// runPeerCheck checks the connectivity and Bitswap availability of a CID from a given peer (either with just peer ID or specific multiaddr)
func (d *daemon) runPeerCheck(ctx context.Context, maStr, cidStr string) (*peerCheckOutput, error) {
ma, err := multiaddr.NewMultiaddr(maStr)
if err != nil {
return nil, err
}

ai, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil {
return nil, err
}
func (d *daemon) runPeerCheck(ctx context.Context, ma multiaddr.Multiaddr, ai *peer.AddrInfo, c cid.Cid, ipniURL string) (*peerCheckOutput, error) {
addrMap, peerAddrDHTErr := peerAddrsInDHT(ctx, d.dht, d.dhtMessenger, ai.ID)

// User has only passed a PeerID without any maddrs
onlyPeerID := len(ai.Addrs) == 0
var inDHT, inIPNI bool
var wg sync.WaitGroup
wg.Add(2)
go func() {
inDHT = providerRecordFromPeerInDHT(ctx, d.dht, c, ai.ID)
wg.Done()
}()
go func() {
inIPNI = providerRecordFromPeerInIPNI(ctx, ipniURL, c, ai.ID)
wg.Done()
}()
wg.Wait()

c, err := cid.Decode(cidStr)
if err != nil {
return nil, err
out := &peerCheckOutput{
ProviderRecordFromPeerInDHT: inDHT,
ProviderRecordFromPeerInIPNI: inIPNI,
PeerFoundInDHT: addrMap,
}

out := &peerCheckOutput{}

connectionFailed := false

out.ProviderRecordFromPeerInDHT = ProviderRecordFromPeerInDHT(ctx, d.dht, c, ai.ID)

addrMap, peerAddrDHTErr := peerAddrsInDHT(ctx, d.dht, d.dhtMessenger, ai.ID)
out.PeerFoundInDHT = addrMap
var connectionFailed bool

// If peerID given,but no addresses check the DHT
if onlyPeerID {
if len(ai.Addrs) == 0 {
if peerAddrDHTErr != nil {
// PeerID is not resolvable via the DHT
connectionFailed = true
Expand All @@ -288,7 +331,7 @@
// Test Is the target connectable
dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*120)

testHost.Connect(dialCtx, *ai)
_ = testHost.Connect(dialCtx, *ai)
// Call NewStream to force NAT hole punching. see https://github.com/libp2p/go-libp2p/issues/2714
_, connErr := testHost.NewStream(dialCtx, ai.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap")
dialCancel()
Expand Down Expand Up @@ -343,9 +386,6 @@
return nil, err
}

wg := sync.WaitGroup{}
wg.Add(len(closestPeers))

resCh := make(chan *peer.AddrInfo, len(closestPeers))

numSuccessfulResponses := execOnMany(ctx, 0.3, time.Second*3, func(ctx context.Context, peerToQuery peer.ID) error {
Expand Down Expand Up @@ -380,7 +420,7 @@
return addrMap, nil
}

func ProviderRecordFromPeerInDHT(ctx context.Context, d kademlia, c cid.Cid, p peer.ID) bool {
func providerRecordFromPeerInDHT(ctx context.Context, d kademlia, c cid.Cid, p peer.ID) bool {
queryCtx, cancel := context.WithCancel(ctx)
defer cancel()
provsCh := d.FindProvidersAsync(queryCtx, c, 0)
Expand All @@ -399,6 +439,33 @@
}
}

func providerRecordFromPeerInIPNI(ctx context.Context, ipniURL string, c cid.Cid, p peer.ID) bool {
crClient, err := client.New(ipniURL, client.WithStreamResultsRequired())
if err != nil {
log.Printf("failed to creat content router client: %s\n", err)
return false

Check warning on line 446 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L445-L446

Added lines #L445 - L446 were not covered by tests
}
routerClient := contentrouter.NewContentRoutingClient(crClient)

queryCtx, cancel := context.WithCancel(ctx)
defer cancel()

provsCh := routerClient.FindProvidersAsync(queryCtx, c, 0)
for {
select {
case prov, ok := <-provsCh:
if !ok {
return false
}
if prov.ID == p {
return true

Check warning on line 461 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L460-L461

Added lines #L460 - L461 were not covered by tests
}
case <-ctx.Done():
return false

Check warning on line 464 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L463-L464

Added lines #L463 - L464 were not covered by tests
}
}
}

// Taken from the FullRT DHT client implementation
//
// execOnMany executes the given function on each of the peers, although it may only wait for a certain chunk of peers
Expand Down
37 changes: 20 additions & 17 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ require (
github.com/ipfs/go-block-format v0.2.0
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-datastore v0.6.0
github.com/libp2p/go-libp2p v0.36.2
github.com/libp2p/go-libp2p v0.36.3
github.com/libp2p/go-libp2p-kad-dht v0.26.1
github.com/libp2p/go-libp2p-mplex v0.9.0
github.com/libp2p/go-libp2p-record v0.2.0
github.com/libp2p/go-libp2p-routing-helpers v0.7.4
github.com/libp2p/go-msgio v0.3.0
github.com/multiformats/go-multiaddr v0.13.0
github.com/multiformats/go-multihash v0.2.3
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_golang v1.20.0
github.com/stretchr/testify v1.9.0
github.com/urfave/cli/v2 v2.27.3
)
Expand Down Expand Up @@ -55,6 +55,7 @@ require (
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
Expand Down Expand Up @@ -95,7 +96,7 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/miekg/dns v1.1.61 // indirect
github.com/miekg/dns v1.1.62 // indirect
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
Expand All @@ -110,20 +111,20 @@ require (
github.com/multiformats/go-multistream v0.5.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/onsi/ginkgo/v2 v2.19.1 // indirect
github.com/onsi/ginkgo/v2 v2.20.0 // indirect
github.com/opencontainers/runtime-spec v1.2.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pion/datachannel v1.5.8 // indirect
github.com/pion/dtls/v2 v2.2.12 // indirect
github.com/pion/ice/v2 v2.3.34 // indirect
github.com/pion/interceptor v0.1.29 // indirect
github.com/pion/interceptor v0.1.30 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/mdns v0.0.12 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.14 // indirect
github.com/pion/rtp v1.8.8 // indirect
github.com/pion/sctp v1.8.20 // indirect
github.com/pion/rtp v1.8.9 // indirect
github.com/pion/sctp v1.8.33 // indirect
github.com/pion/sdp/v3 v3.0.9 // indirect
github.com/pion/srtp/v2 v2.0.20 // indirect
github.com/pion/stun v0.6.1 // indirect
Expand All @@ -137,18 +138,20 @@ require (
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/quic-go v0.45.2 // indirect
github.com/quic-go/quic-go v0.46.0 // indirect
github.com/quic-go/webtransport-go v0.8.0 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/samber/lo v1.39.0 // indirect
github.com/sanity-io/litter v1.5.5 // indirect
github.com/sergi/go-diff v1.3.1 // indirect
github.com/smartystreets/assertions v1.13.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.55.0 // indirect
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/wlynxg/anet v0.0.3 // indirect
github.com/wlynxg/anet v0.0.4 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
Expand All @@ -160,19 +163,19 @@ require (
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.uber.org/dig v1.17.1 // indirect
go.uber.org/fx v1.22.1 // indirect
go.uber.org/dig v1.18.0 // indirect
go.uber.org/fx v1.22.2 // indirect
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.23.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/tools v0.23.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/tools v0.24.0 // indirect
gonum.org/v1/gonum v0.15.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading
Loading