Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Signed Peer Records #630

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ type IpfsDHT struct {
successfulOutboundQueryGracePeriod time.Duration

fixLowPeersChan chan struct{}

// peerstore interface that supports signed peer records for peer addresses
ca peerstore.CertifiedAddrBook
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand Down Expand Up @@ -286,6 +289,12 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
}
dht.ProviderManager = pm

ca, ok := peerstore.GetCertifiedAddrBook(h.Peerstore())
if !ok {
return nil, errors.New("peerstore is not a certified addr book")
}
dht.ca = ca

return dht, nil
}

Expand Down Expand Up @@ -392,14 +401,16 @@ var errInvalidRecord = errors.New("received invalid record")
// key. It returns either the value or a list of closer peers.
// NOTE: It will update the dht's peerstore with any new addresses
// it finds for the given peer.
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, *queryResponse, error) {
pmes, err := dht.getValueSingle(ctx, p, key)
if err != nil {
return nil, nil, err
}

// Perhaps we were given closer peers
peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
unsigned := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
signed := pb.PBSignedPeerRecordsToPeerRecords(pmes.GetSignedCloserPeers())
q := &queryResponse{signedPeerRecords: signed, unsignedPeers: unsigned}

if record := pmes.GetRecord(); record != nil {
// Success! We were given the value
Expand All @@ -413,11 +424,11 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string)
err = errInvalidRecord
record = new(recpb.Record)
}
return record, peers, err
return record, q, err
}

if len(peers) > 0 {
return nil, peers, nil
if len(q.signedPeerRecords) > 0 || len(q.unsignedPeers) > 0 {
return nil, q, nil
}

return nil, nil, routing.ErrNotFound
Expand Down
23 changes: 23 additions & 0 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/record"

pstore "github.com/libp2p/go-libp2p-peerstore"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -84,6 +86,16 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
}

resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), closerinfos)

Comment on lines 88 to +89
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason to send all unsigned addresses from our peerstore instead of just taking the signed addresses and extracting them?

// add signed peer records too as we support them
var signedRecords []*record.Envelope
for i := range closer {
rec := dht.ca.GetPeerRecord(closer[i])
if rec != nil {
signedRecords = append(signedRecords, rec)
}
}
resp.SignedCloserPeers = pb.PeerSignedRecordsToPBSignedPeerRecords(dht.host.Network(), signedRecords)
}

return resp, nil
Expand Down Expand Up @@ -303,6 +315,17 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, from peer.ID, pmes *pb.M
}

resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), withAddresses)

Comment on lines 317 to +318
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

// add signed records too as we support them.
var signedRecords []*record.Envelope
for i := range closest {
rec := dht.ca.GetPeerRecord(closest[i])
if rec != nil {
signedRecords = append(signedRecords, rec)
}
}
resp.SignedCloserPeers = pb.PeerSignedRecordsToPBSignedPeerRecords(dht.host.Network(), signedRecords)

return resp, nil
}

Expand Down
32 changes: 27 additions & 5 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"

"github.com/ipfs/go-cid"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
kb "github.com/libp2p/go-libp2p-kbucket"

"github.com/ipfs/go-cid"
"github.com/multiformats/go-base32"
)

Expand Down Expand Up @@ -66,6 +67,25 @@ func (lk loggableKeyBytes) String() string {
return k
}

func getAllSeenPeers(unsigned []*peer.AddrInfo, signed map[peer.ID]pb.RawSignedPeerRecordConn) []*peer.AddrInfo {
seenpeers := make(map[peer.ID]struct{}, len(signed)+len(unsigned))
allPeers := make([]*peer.AddrInfo, 0, len(signed)+len(unsigned))

for i := range unsigned {
seenpeers[unsigned[i].ID] = struct{}{}
allPeers = append(allPeers, unsigned[i])
}

for pid := range signed {
if _, ok := seenpeers[pid]; ok {
continue
}
seenpeers[pid] = struct{}{}
allPeers = append(allPeers, &peer.AddrInfo{ID: pid, Addrs: signed[pid].Addrs})
}
return allPeers
}

// GetClosestPeers is a Kademlia 'node lookup' operation. Returns a channel of
// the K closest peers to the given key.
//
Expand All @@ -77,7 +97,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
}
//TODO: I can break the interface! return []peer.ID
lookupRes, err := dht.runLookupWithFollowup(ctx, key,
func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
func(ctx context.Context, p peer.ID) (*queryResponse, error) {
// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.SendingQuery,
Expand All @@ -89,16 +109,18 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
logger.Debugf("error getting closer peers: %s", err)
return nil, err
}
peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())

signedPeers := pb.PBSignedPeerRecordsToPeerRecords(pmes.GetSignedCloserPeers())
unsignedPeers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())

// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.PeerResponse,
ID: p,
Responses: peers,
Responses: getAllSeenPeers(unsignedPeers, signedPeers),
})

return peers, err
return &queryResponse{unsignedPeers: unsignedPeers, signedPeerRecords: signedPeers}, err
},
func() bool { return false },
)
Expand Down
Loading