Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide root node immediately on add and pin add #6068

Merged
merged 13 commits into from
Mar 20, 2019
8 changes: 8 additions & 0 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/rand"
"encoding/base64"
"errors"
"github.com/ipfs/go-ipfs/provider"
"os"
"syscall"
"time"
Expand Down Expand Up @@ -275,6 +276,13 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
}
n.Resolver = resolver.NewBasicResolver(n.DAG)

// Provider
queue, err := provider.NewQueue(ctx, "provider-v1", n.Repo.Datastore())
if err != nil {
return err
}
n.Provider = provider.NewProvider(ctx, queue, n.Routing)

if cfg.Online {
if err := n.startLateOnlineServices(ctx); err != nil {
return err
Expand Down
8 changes: 8 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher"
p2p "github.com/ipfs/go-ipfs/p2p"
pin "github.com/ipfs/go-ipfs/pin"
provider "github.com/ipfs/go-ipfs/provider"
repo "github.com/ipfs/go-ipfs/repo"

bitswap "github.com/ipfs/go-bitswap"
Expand Down Expand Up @@ -124,6 +125,7 @@ type IpfsNode struct {
Routing routing.IpfsRouting // the routing system. recommend ipfs-dht
Exchange exchange.Interface // the block exchange + strategy (bitswap)
Namesys namesys.NameSystem // the name system, resolves paths to hashes
Provider provider.Provider // the value provider system
Reprovider *rp.Reprovider // the value reprovider system
IpnsRepub *ipnsrp.Republisher

Expand Down Expand Up @@ -324,6 +326,12 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error {
return err
}

// Provider

n.Provider.Run()

// Reprovider

var keyProvider rp.KeyChanFunc

switch cfg.Reprovider.Strategy {
Expand Down
6 changes: 6 additions & 0 deletions core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/namesys"
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/provider"
"github.com/ipfs/go-ipfs/repo"

bserv "github.com/ipfs/go-blockservice"
Expand Down Expand Up @@ -66,6 +67,8 @@ type CoreAPI struct {
namesys namesys.NameSystem
routing routing.IpfsRouting

provider provider.Provider

pubSub *pubsub.PubSub

checkPublishAllowed func() error
Expand Down Expand Up @@ -174,6 +177,8 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e
exchange: n.Exchange,
routing: n.Routing,

provider: n.Provider,

pubSub: n.PubSub,

nd: n,
Expand Down Expand Up @@ -210,6 +215,7 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e

subApi.routing = offlineroute.NewOfflineRouter(subApi.repo.Datastore(), subApi.recordValidator)
subApi.namesys = namesys.NewNameSystem(subApi.routing, subApi.repo.Datastore(), cs)
subApi.provider = provider.NewOfflineProvider()

subApi.peerstore = nil
subApi.peerHost = nil
Expand Down
4 changes: 4 additions & 0 deletions core/coreapi/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func (api *PinAPI) Add(ctx context.Context, p coreiface.Path, opts ...caopts.Pin
return fmt.Errorf("pin: %s", err)
}

if err := api.provider.Provide(dagNode.Cid()); err != nil {
return err
}

return api.pinning.Flush()
}

Expand Down
13 changes: 13 additions & 0 deletions core/coreapi/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package coreapi

import (
cid "github.com/ipfs/go-cid"
)

// ProviderAPI brings Provider behavior to CoreAPI
type ProviderAPI CoreAPI

// Provide the given cid using the current provider
func (api *ProviderAPI) Provide(cid cid.Cid) error {
return api.provider.Provide(cid)
}
5 changes: 5 additions & 0 deletions core/coreapi/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
if err != nil {
return nil, err
}

if err := api.provider.Provide(nd.Cid()); err != nil {
return nil, err
}

return coreiface.IpfsPath(nd.Cid()), nil
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/ipfs/go-fs-lock v0.0.1
github.com/ipfs/go-ipfs-addr v0.0.1
github.com/ipfs/go-ipfs-blockstore v0.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.1
github.com/ipfs/go-ipfs-cmdkit v0.0.1
github.com/ipfs/go-ipfs-cmds v0.0.1
Expand Down
16 changes: 16 additions & 0 deletions provider/offline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package provider

import "github.com/ipfs/go-cid"

type offlineProvider struct{}

// NewOfflineProvider creates a Provider that does nothing
func NewOfflineProvider() Provider {
return &offlineProvider{}
}

func (op *offlineProvider) Run() {}

func (op *offlineProvider) Provide(cid cid.Cid) error {
return nil
}
71 changes: 71 additions & 0 deletions provider/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Package provider implements structures and methods to provide blocks,
// keep track of which blocks are provided, and to allow those blocks to
// be reprovided.
package provider

import (
"context"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-routing"
)

var log = logging.Logger("provider")

const provideOutgoingWorkerLimit = 8

// Provider announces blocks to the network
type Provider interface {
// Run is used to begin processing the provider work
Run()
// Provide takes a cid and makes an attempt to announce it to the network
Provide(cid.Cid) error
michaelavila marked this conversation as resolved.
Show resolved Hide resolved
}

type provider struct {
ctx context.Context
// the CIDs for which provide announcements should be made
queue *Queue
// used to announce providing to the network
contentRouting routing.ContentRouting
}

// NewProvider creates a provider that announces blocks to the network using a content router
func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.ContentRouting) Provider {
return &provider{
ctx: ctx,
queue: queue,
contentRouting: contentRouting,
}
}

// Start workers to handle provide requests.
func (p *provider) Run() {
p.handleAnnouncements()
}

// Provide the given cid using specified strategy.
func (p *provider) Provide(root cid.Cid) error {
p.queue.Enqueue(root)
return nil
}

// Handle all outgoing cids by providing (announcing) them
func (p *provider) handleAnnouncements() {
for workers := 0; workers < provideOutgoingWorkerLimit; workers++ {
go func() {
for p.ctx.Err() == nil {
select {
case <-p.ctx.Done():
return
case c := <-p.queue.Dequeue():
log.Info("announce - start - ", c)
if err := p.contentRouting.Provide(p.ctx, c, true); err != nil {
log.Warningf("Unable to provide entry: %s, %s", c, err)
}
log.Info("announce - end - ", c)
}
}
}()
}
}
79 changes: 79 additions & 0 deletions provider/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package provider

import (
"context"
"math/rand"
"testing"
"time"

cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
sync "github.com/ipfs/go-datastore/sync"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
pstore "github.com/libp2p/go-libp2p-peerstore"
)

var blockGenerator = blocksutil.NewBlockGenerator()

type mockRouting struct {
provided chan cid.Cid
}

func (r *mockRouting) Provide(ctx context.Context, cid cid.Cid, recursive bool) error {
r.provided <- cid
return nil
}

func (r *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, timeout int) <-chan pstore.PeerInfo {
return nil
}

func mockContentRouting() *mockRouting {
r := mockRouting{}
r.provided = make(chan cid.Cid)
return &r
}

func TestAnnouncement(t *testing.T) {
ctx := context.Background()
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}

r := mockContentRouting()

provider := NewProvider(ctx, queue, r)
provider.Run()

cids := cid.NewSet()

for i := 0; i < 100; i++ {
c := blockGenerator.Next().Cid()
cids.Add(c)
}

go func() {
for _, c := range cids.Keys() {
err = provider.Provide(c)
// A little goroutine stirring to exercise some different states
r := rand.Intn(10)
time.Sleep(time.Microsecond * time.Duration(r))
}
}()

for cids.Len() > 0 {
select {
case cp := <-r.provided:
if !cids.Has(cp) {
t.Fatal("Wrong CID provided")
}
cids.Remove(cp)
case <-time.After(time.Second * 5):
t.Fatal("Timeout waiting for cids to be provided.")
}
}
}
Loading