Skip to content

Commit

Permalink
coreapi/dht: Review, cleanup
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
  • Loading branch information
magik6k committed Mar 31, 2018
1 parent 77f6604 commit e326b7f
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 145 deletions.
145 changes: 11 additions & 134 deletions core/coreapi/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ import (
dag "github.com/ipfs/go-ipfs/merkledag"

routing "gx/ipfs/QmTiWLZ6Fo5j4KcTVutZJ5KWRRJrbxzmxA4td8NfEdrPh7/go-libp2p-routing"
notif "gx/ipfs/QmTiWLZ6Fo5j4KcTVutZJ5KWRRJrbxzmxA4td8NfEdrPh7/go-libp2p-routing/notifications"
ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore"
ipdht "gx/ipfs/QmY1y2M1aCcVhy8UuTbZJBvuFbegZm47f9cDAdgxiehQfx/go-libp2p-kad-dht"
peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
Expand All @@ -23,73 +20,16 @@ var ErrNotDHT = errors.New("routing service is not a DHT")

type DhtAPI CoreAPI

func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (<-chan ma.Multiaddr, error) {
dht, ok := api.node.Routing.(*ipdht.IpfsDHT)
if !ok {
return nil, ErrNotDHT
}

outChan := make(chan ma.Multiaddr)
events := make(chan *notif.QueryEvent)
ctx = notif.RegisterForQueryEvents(ctx, events)

go func() {
defer close(outChan)

sendAddrs := func(responses []*pstore.PeerInfo) error {
for _, response := range responses {
for _, addr := range response.Addrs {
select {
case outChan <- addr:
case <-ctx.Done():
return ctx.Err()
}
}
}
return nil
}

for event := range events {
if event.Type == notif.FinalPeer {
err := sendAddrs(event.Responses)
if err != nil {
return
}
}
}
}()

go func() {
defer close(events)
pi, err := dht.FindPeer(ctx, peer.ID(p))
if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError,
Extra: err.Error(),
})
return
}

notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.FinalPeer,
Responses: []*pstore.PeerInfo{&pi},
})
}()

return outChan, nil
func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (pstore.PeerInfo, error) {
return api.node.Routing.FindPeer(ctx, p)
}

func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peer.ID, error) {
func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan pstore.PeerInfo, error) {
settings, err := caopts.DhtFindProvidersOptions(opts...)
if err != nil {
return nil, err
}

dht, ok := api.node.Routing.(*ipdht.IpfsDHT)
if !ok {
return nil, ErrNotDHT
}

p, err = api.core().ResolvePath(ctx, p)
if err != nil {
return nil, err
Expand All @@ -99,50 +39,10 @@ func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...

numProviders := settings.NumProviders
if numProviders < 1 {
return nil, fmt.Errorf("number of providers must be greater than 0")
return nil, fmt.Errorf("number of providers to find must be greater than 0")
}

outChan := make(chan peer.ID)
events := make(chan *notif.QueryEvent)
ctx = notif.RegisterForQueryEvents(ctx, events)

pchan := dht.FindProvidersAsync(ctx, c, numProviders)
go func() {
defer close(outChan)

sendProviders := func(responses []*pstore.PeerInfo) error {
for _, response := range responses {
select {
case outChan <- response.ID:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}

for event := range events {
if event.Type == notif.Provider {
err := sendProviders(event.Responses)
if err != nil {
return
}
}
}
}()

go func() {
defer close(events)
for p := range pchan {
np := p
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.Provider,
Responses: []*pstore.PeerInfo{&np},
})
}
}()

return outChan, nil
return api.node.Routing.FindProvidersAsync(ctx, c, numProviders), nil
}

func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...caopts.DhtProvideOption) error {
Expand All @@ -155,43 +55,19 @@ func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...cao
return errors.New("cannot provide in offline mode")
}

if len(api.node.PeerHost.Network().Conns()) == 0 {
return errors.New("cannot provide, no connected peers")
}

c := path.Cid()

has, err := api.node.Blockstore.Has(c)
has, err := api.node.Blockstore.Has(path.Cid())
if err != nil {
return err
}

if !has {
return fmt.Errorf("block %s not found locally, cannot provide", c)
return fmt.Errorf("block %s not found locally, cannot provide", path.Cid())
}

//TODO: either remove or use
//outChan := make(chan interface{})

//events := make(chan *notif.QueryEvent)
//ctx = notif.RegisterForQueryEvents(ctx, events)

/*go func() {
defer close(outChan)
for range events {
select {
case <-ctx.Done():
return
default:
}
}
}()*/

//defer close(events)
if settings.Recursive {
err = provideKeysRec(ctx, api.node.Routing, api.node.DAG, []*cid.Cid{c})
err = provideKeysRec(ctx, api.node.Routing, api.node.DAG, []*cid.Cid{path.Cid()})
} else {
err = provideKeys(ctx, api.node.Routing, []*cid.Cid{c})
err = provideKeys(ctx, api.node.Routing, []*cid.Cid{path.Cid()})
}
if err != nil {
return err
Expand All @@ -211,10 +87,11 @@ func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) er
}

func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv ipld.DAGService, cids []*cid.Cid) error {
provided := cid.NewSet()
provided := cid.NewSet() //TODO: Use a bloom filter
for _, c := range cids {
kset := cid.NewSet()

//TODO: After https://github.com/ipfs/go-ipfs/pull/4333 is merged, use n.Provider for this
err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, kset.Visit)
if err != nil {
return err
Expand Down
23 changes: 15 additions & 8 deletions core/coreapi/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
coreapi "github.com/ipfs/go-ipfs/core/coreapi"
options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"

ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer"
blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
)
Expand All @@ -25,7 +26,10 @@ func TestDhtFindPeer(t *testing.T) {
t.Fatal(err)
}

addr := <-out
var addr ma.Multiaddr
if len(out.Addrs) > 0 {
addr = out.Addrs[0]
}

if addr == nil || addr.String() != "/ip4/127.0.0.1/tcp/4001" {
t.Errorf("got unexpected address from FindPeer: %s", addr)
Expand All @@ -36,7 +40,10 @@ func TestDhtFindPeer(t *testing.T) {
t.Fatal(err)
}

addr = <-out
addr = nil
if len(out.Addrs) > 0 {
addr = out.Addrs[0]
}

if addr == nil || addr.String() != "/ip4/127.0.2.1/tcp/4001" {
t.Errorf("got unexpected address from FindPeer: %s", addr)
Expand All @@ -62,8 +69,8 @@ func TestDhtFindProviders(t *testing.T) {

provider := <-out

if provider.String() != nds[0].Identity.String() {
t.Errorf("got wrong provider: %s != %s", provider.String(), nds[0].Identity.String())
if provider.ID.String() != nds[0].Identity.String() {
t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String())
}
}

Expand Down Expand Up @@ -91,8 +98,8 @@ func TestDhtProvide(t *testing.T) {

provider := <-out

if provider.String() != "<peer.ID >" {
t.Errorf("got wrong provider: %s != %s", provider.String(), nds[0].Identity.String())
if provider.ID.String() != "<peer.ID >" {
t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String())
}

err = apis[0].Dht().Provide(ctx, p)
Expand All @@ -107,7 +114,7 @@ func TestDhtProvide(t *testing.T) {

provider = <-out

if provider.String() != nds[0].Identity.String() {
t.Errorf("got wrong provider: %s != %s", provider.String(), nds[0].Identity.String())
if provider.ID.String() != nds[0].Identity.String() {
t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String())
}
}
6 changes: 3 additions & 3 deletions core/coreapi/interface/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ import (

options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"

ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore"
peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer"
)

// DhtAPI specifies the interface to the DHT
type DhtAPI interface {
// FindPeer queries the DHT for all of the multiaddresses associated with a
// Peer ID
FindPeer(context.Context, peer.ID) (<-chan ma.Multiaddr, error)
FindPeer(context.Context, peer.ID) (pstore.PeerInfo, error)

// FindProviders finds peers in the DHT who can provide a specific value
// given a key.
FindProviders(context.Context, Path, ...options.DhtFindProvidersOption) (<-chan peer.ID, error) //TODO: is path the right choice here?
FindProviders(context.Context, Path, ...options.DhtFindProvidersOption) (<-chan pstore.PeerInfo, error)

// Provide announces to the network that you are providing given values
Provide(context.Context, Path, ...options.DhtProvideOption) error
Expand Down

0 comments on commit e326b7f

Please sign in to comment.