diff --git a/core/builder.go b/core/builder.go index 3ddc58f3c33..13dce637d5f 100644 --- a/core/builder.go +++ b/core/builder.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "encoding/base64" "errors" + "github.com/ipfs/go-ipfs/provider" "os" "syscall" "time" @@ -275,6 +276,13 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { } n.Resolver = resolver.NewBasicResolver(n.DAG) + // Provider + queue, err := provider.NewQueue(ctx, "provider-v1", n.Repo.Datastore()) + if err != nil { + return err + } + n.Provider = provider.NewProvider(ctx, queue, n.Routing) + if cfg.Online { if err := n.startLateOnlineServices(ctx); err != nil { return err diff --git a/core/core.go b/core/core.go index 0817118b371..0d41b737874 100644 --- a/core/core.go +++ b/core/core.go @@ -28,6 +28,7 @@ import ( ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher" p2p "github.com/ipfs/go-ipfs/p2p" pin "github.com/ipfs/go-ipfs/pin" + provider "github.com/ipfs/go-ipfs/provider" repo "github.com/ipfs/go-ipfs/repo" bitswap "github.com/ipfs/go-bitswap" @@ -124,6 +125,7 @@ type IpfsNode struct { Routing routing.IpfsRouting // the routing system. 
recommend ipfs-dht Exchange exchange.Interface // the block exchange + strategy (bitswap) Namesys namesys.NameSystem // the name system, resolves paths to hashes + Provider provider.Provider // the value provider system Reprovider *rp.Reprovider // the value reprovider system IpnsRepub *ipnsrp.Republisher @@ -324,6 +326,12 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error { return err } + // Provider + + n.Provider.Run() + + // Reprovider + var keyProvider rp.KeyChanFunc switch cfg.Reprovider.Strategy { diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index c5ba1b56672..f22803f9216 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -21,6 +21,7 @@ import ( "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/namesys" "github.com/ipfs/go-ipfs/pin" + "github.com/ipfs/go-ipfs/provider" "github.com/ipfs/go-ipfs/repo" bserv "github.com/ipfs/go-blockservice" @@ -66,6 +67,8 @@ type CoreAPI struct { namesys namesys.NameSystem routing routing.IpfsRouting + provider provider.Provider + pubSub *pubsub.PubSub checkPublishAllowed func() error @@ -174,6 +177,8 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e exchange: n.Exchange, routing: n.Routing, + provider: n.Provider, + pubSub: n.PubSub, nd: n, @@ -210,6 +215,7 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e subApi.routing = offlineroute.NewOfflineRouter(subApi.repo.Datastore(), subApi.recordValidator) subApi.namesys = namesys.NewNameSystem(subApi.routing, subApi.repo.Datastore(), cs) + subApi.provider = provider.NewOfflineProvider() subApi.peerstore = nil subApi.peerHost = nil diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index 1976517dd90..df478732c16 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -32,6 +32,10 @@ func (api *PinAPI) Add(ctx context.Context, p coreiface.Path, opts ...caopts.Pin return fmt.Errorf("pin: %s", err) } + if err := 
api.provider.Provide(dagNode.Cid()); err != nil {
+		return err
+	}
+
 	return api.pinning.Flush()
 }
 
diff --git a/core/coreapi/provider.go b/core/coreapi/provider.go
new file mode 100644
index 00000000000..8148c87892e
--- /dev/null
+++ b/core/coreapi/provider.go
@@ -0,0 +1,13 @@
+package coreapi
+
+import (
+	cid "github.com/ipfs/go-cid"
+)
+
+// ProviderAPI brings Provider behavior to CoreAPI
+type ProviderAPI CoreAPI
+
+// Provide hands the given cid to the provider configured on this API
+func (api *ProviderAPI) Provide(cid cid.Cid) error {
+	return api.provider.Provide(cid)
+}
diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go
index e26c755b935..c840280bbca 100644
--- a/core/coreapi/unixfs.go
+++ b/core/coreapi/unixfs.go
@@ -129,6 +129,11 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
 	if err != nil {
 		return nil, err
 	}
+
+	if err := api.provider.Provide(nd.Cid()); err != nil {
+		return nil, err
+	}
+
 	return coreiface.IpfsPath(nd.Cid()), nil
 }
 
diff --git a/go.mod b/go.mod
index 06f13475bbe..28671188a74 100644
--- a/go.mod
+++ b/go.mod
@@ -30,6 +30,7 @@ require (
 	github.com/ipfs/go-fs-lock v0.0.1
 	github.com/ipfs/go-ipfs-addr v0.0.1
 	github.com/ipfs/go-ipfs-blockstore v0.0.1
+	github.com/ipfs/go-ipfs-blocksutil v0.0.1
 	github.com/ipfs/go-ipfs-chunker v0.0.1
 	github.com/ipfs/go-ipfs-cmdkit v0.0.1
 	github.com/ipfs/go-ipfs-cmds v0.0.1
diff --git a/provider/offline.go b/provider/offline.go
new file mode 100644
index 00000000000..029ddfa9889
--- /dev/null
+++ b/provider/offline.go
@@ -0,0 +1,16 @@
+package provider
+
+import "github.com/ipfs/go-cid"
+
+type offlineProvider struct{}
+
+// NewOfflineProvider creates a Provider whose Run and Provide are no-ops
+func NewOfflineProvider() Provider {
+	return &offlineProvider{}
+}
+
+func (op *offlineProvider) Run() {}
+
+func (op *offlineProvider) Provide(cid cid.Cid) error {
+	return nil
+}
diff --git a/provider/provider.go b/provider/provider.go
new file mode 100644
index 00000000000..f9aa4ed7820
--- /dev/null
+++ 
b/provider/provider.go
@@ -0,0 +1,72 @@
+// Package provider implements structures and methods to provide blocks,
+// keep track of which blocks are provided, and to allow those blocks to
+// be reprovided.
+package provider
+
+import (
+	"context"
+
+	"github.com/ipfs/go-cid"
+	logging "github.com/ipfs/go-log"
+	"github.com/libp2p/go-libp2p-routing"
+)
+
+var log = logging.Logger("provider")
+
+const provideOutgoingWorkerLimit = 8
+
+// Provider announces blocks to the network
+type Provider interface {
+	// Run is used to begin processing the provider work
+	Run()
+	// Provide takes a cid and makes an attempt to announce it to the network
+	Provide(cid.Cid) error
+}
+
+type provider struct {
+	ctx context.Context
+	// the CIDs for which provide announcements should be made
+	queue *Queue
+	// used to announce providing to the network
+	contentRouting routing.ContentRouting
+}
+
+// NewProvider creates a provider that announces blocks to the network using a content router
+func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.ContentRouting) Provider {
+	return &provider{
+		ctx:            ctx,
+		queue:          queue,
+		contentRouting: contentRouting,
+	}
+}
+
+// Run starts the workers that handle provide requests.
+func (p *provider) Run() {
+	p.handleAnnouncements()
+}
+
+// Provide queues the given cid so that a worker can announce it.
+func (p *provider) Provide(root cid.Cid) error {
+	p.queue.Enqueue(root)
+	return nil
+}
+
+// handleAnnouncements starts the workers that announce (provide) dequeued cids
+func (p *provider) handleAnnouncements() {
+	for workers := 0; workers < provideOutgoingWorkerLimit; workers++ {
+		go func() {
+			for p.ctx.Err() == nil {
+				select {
+				case <-p.ctx.Done():
+					return
+				case c := <-p.queue.Dequeue():
+					log.Info("announce - start - ", c)
+					if err := p.contentRouting.Provide(p.ctx, c, true); err != nil {
+						log.Warningf("Unable to provide entry: %s, %s", c, err)
+					}
+					log.Info("announce - end - ", c)
+				}
+			}
+		}()
+	}
+}
diff --git a/provider/provider_test.go b/provider/provider_test.go
new file mode 100644
index 00000000000..7ef007b03a7
--- /dev/null
+++ b/provider/provider_test.go
@@ -0,0 +1,88 @@
+package provider
+
+import (
+	"context"
+	"math/rand"
+	"testing"
+	"time"
+
+	cid "github.com/ipfs/go-cid"
+	datastore "github.com/ipfs/go-datastore"
+	sync "github.com/ipfs/go-datastore/sync"
+	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
+	pstore "github.com/libp2p/go-libp2p-peerstore"
+)
+
+var blockGenerator = blocksutil.NewBlockGenerator()
+
+type mockRouting struct {
+	provided chan cid.Cid
+}
+
+func (r *mockRouting) Provide(ctx context.Context, cid cid.Cid, recursive bool) error {
+	// Give up when the context ends so provider workers are not left blocked
+	// on a send after the test has stopped receiving.
+	select {
+	case r.provided <- cid:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
+func (r *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, timeout int) <-chan pstore.PeerInfo {
+	return nil
+}
+
+func mockContentRouting() *mockRouting {
+	r := mockRouting{}
+	r.provided = make(chan cid.Cid)
+	return &r
+}
+
+func TestAnnouncement(t *testing.T) {
+	// Cancelling on return stops the queue and provider goroutines.
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	ds := sync.MutexWrap(datastore.NewMapDatastore())
+	queue, err := NewQueue(ctx, "test", ds)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	r := mockContentRouting()
+
+	provider := NewProvider(ctx, queue, r)
+	provider.Run()
+
+	cids := cid.NewSet()
+
+	for i := 0; i < 100; i++ {
+		c := blockGenerator.Next().Cid()
+		cids.Add(c)
+	}
+
+	go func() {
+		for _, c := range cids.Keys() {
+			if err := provider.Provide(c); err != nil {
+				t.Error(err)
+			}
+			// A little goroutine stirring to exercise some different states
+			delay := rand.Intn(10)
+			time.Sleep(time.Microsecond * time.Duration(delay))
+		}
+	}()
+
+	for cids.Len() > 0 {
+		select {
+		case cp := <-r.provided:
+			if !cids.Has(cp) {
+				t.Fatal("Wrong CID provided")
+			}
+			cids.Remove(cp)
+		case <-time.After(time.Second * 5):
+			t.Fatal("Timeout waiting for cids to be provided.")
+		}
+	}
+}
diff --git a/provider/queue.go b/provider/queue.go
new file mode 100644
index 00000000000..a3268e10933
--- /dev/null
+++ b/provider/queue.go
@@ -0,0 +1,188 @@
+package provider
+
+import (
+	"context"
+	"math"
+	"strconv"
+	"strings"
+
+	cid "github.com/ipfs/go-cid"
+	datastore "github.com/ipfs/go-datastore"
+	namespace "github.com/ipfs/go-datastore/namespace"
+	query "github.com/ipfs/go-datastore/query"
+)
+
+// Queue provides a durable, FIFO interface to the datastore for storing cids
+//
+// Durability just means that cids in the process of being provided when a
+// crash or shutdown occurs will still be in the queue when the node is
+// brought back online.
+type Queue struct {
+	// used to differentiate queues in datastore
+	// e.g. provider vs reprovider
+	name    string
+	ctx     context.Context
+	tail    uint64
+	head    uint64
+	ds      datastore.Datastore // Must be threadsafe
+	dequeue chan cid.Cid
+	enqueue chan cid.Cid
+}
+
+// NewQueue creates a queue for cids
+func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) {
+	namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/"))
+	head, tail, err := getQueueHeadTail(ctx, name, namespaced)
+	if err != nil {
+		return nil, err
+	}
+	q := &Queue{
+		name:    name,
+		ctx:     ctx,
+		head:    head,
+		tail:    tail,
+		ds:      namespaced,
+		dequeue: make(chan cid.Cid),
+		enqueue: make(chan cid.Cid),
+	}
+	q.work()
+	return q, nil
+}
+
+// Enqueue puts a cid in the queue
+func (q *Queue) Enqueue(cid cid.Cid) {
+	select {
+	case q.enqueue <- cid:
+	case <-q.ctx.Done():
+	}
+}
+
+// Dequeue returns a channel that if listened to will remove entries from the queue
+func (q *Queue) Dequeue() <-chan cid.Cid {
+	return q.dequeue
+}
+
+type entry struct {
+	cid cid.Cid
+	key datastore.Key
+}
+
+// nextEntry looks for the next cid in the queue and returns it with its key,
+// skipping over gaps and mangled data. It returns cid.Undef when the queue is
+// empty or when the head entry could not be fetched from the datastore.
+func (q *Queue) nextEntry() (datastore.Key, cid.Cid) {
+	for {
+		if q.head >= q.tail {
+			return datastore.Key{}, cid.Undef
+		}
+
+		key := q.queueKey(q.head)
+		value, err := q.ds.Get(key)
+
+		if err == datastore.ErrNotFound {
+			log.Warningf("Error missing entry in queue: %s", key)
+			q.head++ // move on
+			continue
+		} else if err != nil {
+			// Leave head in place and return to the caller rather than
+			// retrying here: spinning would keep the worker from servicing
+			// enqueues and context cancellation.
+			log.Warningf("Error fetching from queue: %s", err)
+			return datastore.Key{}, cid.Undef
+		}
+
+		c, err := cid.Parse(value)
+		if err != nil {
+			log.Warningf("Error marshalling Cid from queue: %s", err)
+			q.head++
+			if err := q.ds.Delete(key); err != nil {
+				log.Warningf("Error deleting mangled entry %s from queue: %s", key, err)
+			}
+			continue
+		}
+
+		return key, c
+	}
+}
+
+// work dequeues and enqueues when available.
+func (q *Queue) work() {
+	go func() {
+		var k datastore.Key
+		c := cid.Undef
+
+		for {
+			if c == cid.Undef {
+				k, c = q.nextEntry()
+			}
+
+			// If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue
+			var dequeue chan cid.Cid
+			if c != cid.Undef {
+				dequeue = q.dequeue
+			}
+
+			select {
+			case toQueue := <-q.enqueue:
+				nextKey := q.queueKey(q.tail)
+
+				if err := q.ds.Put(nextKey, toQueue.Bytes()); err != nil {
+					log.Errorf("Failed to enqueue cid: %s", err)
+					continue
+				}
+
+				q.tail++
+			case dequeue <- c:
+				err := q.ds.Delete(k)
+
+				if err != nil {
+					log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err)
+					continue
+				}
+				c = cid.Undef
+				q.head++
+			case <-q.ctx.Done():
+				return
+			}
+		}
+	}()
+}
+
+func (q *Queue) queueKey(id uint64) datastore.Key {
+	return datastore.NewKey(strconv.FormatUint(id, 10))
+}
+
+// getQueueHeadTail crawls over the queue entries to find the head and tail
+func getQueueHeadTail(ctx context.Context, name string, ds datastore.Datastore) (uint64, uint64, error) {
+	results, err := ds.Query(query.Query{})
+	if err != nil {
+		return 0, 0, err
+	}
+	var tail uint64
+	var head uint64 = math.MaxUint64
+	for entry := range results.Next() {
+		if entry.Error != nil {
+			results.Close() // release the query before bailing out
+			return 0, 0, entry.Error
+		}
+		trimmed := strings.TrimPrefix(entry.Key, "/")
+		id, err := strconv.ParseUint(trimmed, 10, 64)
+		if err != nil {
+			results.Close() // release the query before bailing out
+			return 0, 0, err
+		}
+		if id < head {
+			head = id
+		}
+		if (id + 1) > tail {
+			tail = (id + 1)
+		}
+	}
+	if err := results.Close(); err != nil {
+		return 0, 0, err
+	}
+	if head == math.MaxUint64 {
+		head = 0
+	}
+	return head, tail, nil
+}
diff --git a/provider/queue_test.go b/provider/queue_test.go
new file mode 100644
index 00000000000..e1b74878ea5
--- /dev/null
+++ b/provider/queue_test.go
@@ -0,0 +1,137 @@
+package provider
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	cid "github.com/ipfs/go-cid"
+	datastore "github.com/ipfs/go-datastore"
+	sync "github.com/ipfs/go-datastore/sync"
+)
+
+// makeCids returns n cids taken from the shared block generator.
+func makeCids(n int) []cid.Cid {
+	cids := make([]cid.Cid, 0, n)
+	for i := 0; i < n; i++ {
+		c := blockGenerator.Next().Cid()
+		cids = append(cids, c)
+	}
+	return cids
+}
+
+// assertOrdered fails the test unless q yields the given cids, in order.
+func assertOrdered(cids []cid.Cid, q *Queue, t *testing.T) {
+	t.Helper()
+	for _, c := range cids {
+		select {
+		case dequeued := <-q.dequeue:
+			if c != dequeued {
+				t.Fatalf("Error in ordering of CIDs retrieved from queue. Expected: %s, got: %s", c, dequeued)
+			}
+
+		case <-time.After(time.Second * 1):
+			t.Fatal("Timeout waiting for cids to be provided.")
+		}
+	}
+}
+
+func TestBasicOperation(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	ds := sync.MutexWrap(datastore.NewMapDatastore())
+	queue, err := NewQueue(ctx, "test", ds)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	cids := makeCids(10)
+
+	for _, c := range cids {
+		queue.Enqueue(c)
+	}
+
+	assertOrdered(cids, queue, t)
+}
+
+func TestSparseDatastore(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	ds := sync.MutexWrap(datastore.NewMapDatastore())
+	queue, err := NewQueue(ctx, "test", ds)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	cids := makeCids(10)
+	for _, c := range cids {
+		queue.Enqueue(c)
+	}
+
+	// remove entries in the middle
+	err = queue.ds.Delete(queue.queueKey(5))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = queue.ds.Delete(queue.queueKey(6))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	expected := append(cids[:5:5], cids[7:]...)
+	assertOrdered(expected, queue, t)
+}
+
+func TestMangledData(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	ds := sync.MutexWrap(datastore.NewMapDatastore())
+	queue, err := NewQueue(ctx, "test", ds)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	cids := makeCids(10)
+	for _, c := range cids {
+		queue.Enqueue(c)
+	}
+
+	// corrupt an entry in the middle
+	err = queue.ds.Put(queue.queueKey(5), []byte("borked"))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	expected := append(cids[:5:5], cids[6:]...)
+	assertOrdered(expected, queue, t)
+}
+
+func TestInitialization(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	ds := sync.MutexWrap(datastore.NewMapDatastore())
+	queue, err := NewQueue(ctx, "test", ds)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	cids := makeCids(10)
+	for _, c := range cids {
+		queue.Enqueue(c)
+	}
+
+	assertOrdered(cids[:5], queue, t)
+
+	// make a new queue, same data
+	queue, err = NewQueue(ctx, "test", ds)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	assertOrdered(cids[5:], queue, t)
+}