diff --git a/core/commands/bitswap.go b/core/commands/bitswap.go index b3e2644570f..ba0a086020b 100644 --- a/core/commands/bitswap.go +++ b/core/commands/bitswap.go @@ -26,7 +26,6 @@ var BitswapCmd = &cmds.Command{ "stat": bitswapStatCmd, "wantlist": showWantlistCmd, "ledger": ledgerCmd, - "reprovide": reprovideCmd, }, } @@ -204,29 +203,3 @@ prints the ledger associated with a given peer. }), }, } - -var reprovideCmd = &cmds.Command{ - Helptext: cmdkit.HelpText{ - Tagline: "Trigger reprovider.", - ShortDescription: ` -Trigger reprovider to announce our data to network. -`, - }, - Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - nd, err := cmdenv.GetNode(env) - if err != nil { - return err - } - - if !nd.IsOnline { - return ErrNotOnline - } - - err = nd.Reprovider.Trigger(req.Context) - if err != nil { - return err - } - - return nil - }, -} diff --git a/core/commands/dag/dag.go b/core/commands/dag/dag.go index e6958f8ce75..5804c3cc39a 100644 --- a/core/commands/dag/dag.go +++ b/core/commands/dag/dag.go @@ -128,7 +128,26 @@ into an object of the specified format. return err } +//<<<<<<< HEAD return nil +//======= +// if dopin { +// cids.ForEach(func(c cid.Cid) error { +// nd.Pinning.PinWithMode(c, pin.Recursive) +// return nil +// }) +// +// err := nd.Pinning.Flush() +// if err != nil { +// return err +// } +// } +// +// return cids.ForEach(func(cid cid.Cid) error { +// nd.Provider.Provide(cid) +// return nil +// }) +//>>>>>>> Add provider to ipfs and provide when adding/fetching }, Type: OutputObject{}, Encoders: cmds.EncoderMap{ @@ -175,6 +194,8 @@ format. return err } + //nd.Provider.Provide(obj.Cid()) + var out interface{} = obj if len(rp.Remainder()) > 0 { rem := strings.Split(rp.Remainder(), "/") @@ -215,6 +236,8 @@ var DagResolveCmd = &cmds.Command{ return err } + //nd.Provider.Provide(lastCid) + return cmds.EmitOnce(res, &ResolveOutput{ Cid: rp.Cid(), RemPath: rp.Remainder(), diff --git a/core/commands/provider.go b/core/commands/provider.go new file mode 100644 index 00000000000..364e2f6c621 --- /dev/null +++ b/core/commands/provider.go @@ -0,0 +1,45 @@ +package commands + +import ( + cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv" + + cmds "github.com/ipfs/go-ipfs-cmds" + cmdkit "github.com/ipfs/go-ipfs-cmdkit" +) + +var ProviderCmd = &cmds.Command{ + Helptext: cmdkit.HelpText{ + Tagline: "Interact with the provider module.", + ShortDescription: ``, + }, + + Subcommands: map[string]*cmds.Command{ + "reprovide": reprovideCmd, + }, +} + +var reprovideCmd = &cmds.Command{ + Helptext: cmdkit.HelpText{ + Tagline: "Trigger reprovider.", + ShortDescription: ` +Trigger reprovider to announce our data to network. +`, + }, + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + nd, err := cmdenv.GetNode(env) + if err != nil { + return err + } + + if !nd.IsOnline { + return ErrNotOnline + } + + err = nd.Reprovider.Trigger(req.Context) + if err != nil { + return err + } + + return nil + }, +} diff --git a/core/commands/refs.go b/core/commands/refs.go index d63786f87d0..14e1942ee4b 100644 --- a/core/commands/refs.go +++ b/core/commands/refs.go @@ -177,6 +177,9 @@ func objectsForPaths(ctx context.Context, n *core.IpfsNode, paths []string) ([]i if err != nil { return nil, err } + + n.Provider.Provide(o.Cid()) + objects[i] = o } return objects, nil diff --git a/core/commands/root.go b/core/commands/root.go index 0b115234859..d1e77e89cdc 100644 --- a/core/commands/root.go +++ b/core/commands/root.go @@ -57,6 +57,7 @@ ADVANCED COMMANDS stats Various operational stats p2p Libp2p stream mounting filestore Manage the filestore (experimental) + provider Manage the provider system NETWORK COMMANDS id Show info about IPFS peers @@ -140,6 +141,7 @@ var rootSubcommands = map[string]*cmds.Command{ "pin": PinCmd, "ping": PingCmd, "p2p": P2PCmd, + "provider": ProviderCmd, "refs": RefsCmd, "resolve": ResolveCmd, "swarm": SwarmCmd, diff --git a/core/commands/tar.go b/core/commands/tar.go index 488193f185b..308bf820a64 100644 --- a/core/commands/tar.go +++ b/core/commands/tar.go @@ -43,6 +43,11 @@ represent it. return err } + api, err := cmdenv.GetApi(env, req) + if err != nil { + return err + } + enc, err := cmdenv.GetCidEncoder(req) if err != nil { return err @@ -60,6 +65,7 @@ represent it. } c := node.Cid() + api.Provider().Provide(c) return cmds.EmitOnce(res, &AddEvent{ Name: it.Name(), diff --git a/core/core.go b/core/core.go index 358bab1c876..550d5a72931 100644 --- a/core/core.go +++ b/core/core.go @@ -21,13 +21,13 @@ import ( "time" version "github.com/ipfs/go-ipfs" - rp "github.com/ipfs/go-ipfs/exchange/reprovide" filestore "github.com/ipfs/go-ipfs/filestore" mount "github.com/ipfs/go-ipfs/fuse/mount" namesys "github.com/ipfs/go-ipfs/namesys" ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher" p2p "github.com/ipfs/go-ipfs/p2p" pin "github.com/ipfs/go-ipfs/pin" + provider "github.com/ipfs/go-ipfs/provider" repo "github.com/ipfs/go-ipfs/repo" bitswap "github.com/ipfs/go-bitswap" @@ -124,7 +124,8 @@ type IpfsNode struct { Routing routing.IpfsRouting // the routing system. recommend ipfs-dht Exchange exchange.Interface // the block exchange + strategy (bitswap) Namesys namesys.NameSystem // the name system, resolves paths to hashes - Reprovider *rp.Reprovider // the value reprovider system + Reprovider *provider.Reprovider // the value reprovider system + Provider *provider.Provider // the value provider system IpnsRepub *ipnsrp.Republisher AutoNAT *autonat.AutoNATService @@ -324,21 +325,24 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error { return err } - var keyProvider rp.KeyChanFunc + // Provider - switch cfg.Reprovider.Strategy { - case "all": - fallthrough - case "": - keyProvider = rp.NewBlockstoreProvider(n.Blockstore) - case "roots": - keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, true) - case "pinned": - keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, false) - default: - return fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy) + // TODO: Specify strategy in cfg + strategy := provider.NewProvideAllStrategy(n.DAG) + tracker := provider.NewTracker(n.Repo.Datastore()) + queue, err := provider.NewQueue("provider", ctx, n.Repo.Datastore()) + if err != nil { + return err + } + n.Provider = provider.NewProvider(ctx, strategy, tracker, queue, n.Blockstore, n.Routing) + go n.Provider.Run() + + // Reprovider - new + + rq, err := provider.NewQueue("reprovider", ctx, n.Repo.Datastore()) + if err != nil { + return err } - n.Reprovider = rp.NewReprovider(ctx, n.Routing, keyProvider) reproviderInterval := kReprovideFrequency if cfg.Reprovider.Interval != "" { @@ -350,7 +354,8 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error { reproviderInterval = dur } - go n.Reprovider.Run(reproviderInterval) + n.Reprovider = provider.NewReprovider(ctx, rq, tracker, reproviderInterval, n.Blockstore, n.Routing) + n.Reprovider.Run() return nil } diff --git a/core/coreapi/block.go b/core/coreapi/block.go index 10aa0fcb2e0..4c9eeb24d9f 100644 --- a/core/coreapi/block.go +++ b/core/coreapi/block.go @@ -57,6 +57,8 @@ func (api *BlockAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Bloc api.pinning.PinWithMode(b.Cid(), pin.Recursive) } + api.provider.Provide(b.Cid()) + return &BlockStat{path: coreiface.IpldPath(b.Cid()), size: len(data)}, nil } @@ -71,6 +73,8 @@ func (api *BlockAPI) Get(ctx context.Context, p coreiface.Path) (io.Reader, erro return nil, err } + api.provider.Provide(rp.Cid()) + return bytes.NewReader(b.RawData()), nil } @@ -123,6 +127,8 @@ func (api *BlockAPI) Stat(ctx context.Context, p coreiface.Path) (coreiface.Bloc return nil, err } + api.provider.Provide(b.Cid()) + return &BlockStat{ path: coreiface.IpldPath(b.Cid()), size: len(b.RawData()), diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index c5ba1b56672..04ef44cdc62 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "github.com/ipfs/go-ipfs/provider" "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/namesys" @@ -68,6 +69,9 @@ type CoreAPI struct { pubSub *pubsub.PubSub + provider *provider.Provider + reprovider *provider.Reprovider + checkPublishAllowed func() error checkOnline func(allowOffline bool) error @@ -139,6 +143,10 @@ func (api *CoreAPI) PubSub() coreiface.PubSubAPI { return (*PubSubAPI)(api) } +func (api *CoreAPI) Provider() coreiface.ProviderAPI { + return (*ProviderAPI)(api) +} + // WithOptions returns api with global options applied func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, error) { settings := api.parentOpts // make sure to copy @@ -176,6 +184,9 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e pubSub: n.PubSub, + provider: n.Provider, + reprovider: n.Reprovider, + nd: n, parentOpts: settings, } diff --git a/core/coreapi/object.go b/core/coreapi/object.go index e302696bceb..0fdb06f40b2 100644 --- a/core/coreapi/object.go +++ b/core/coreapi/object.go @@ -54,6 +54,9 @@ func (api *ObjectAPI) New(ctx context.Context, opts ...caopts.ObjectNewOption) ( if err != nil { return nil, err } + + api.provider.Provide(n.Cid()) + return n, nil } @@ -134,6 +137,8 @@ func (api *ObjectAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Obj } } + api.provider.Provide(dagnode.Cid()) + return coreiface.IpfsPath(dagnode.Cid()), nil } @@ -231,6 +236,8 @@ func (api *ObjectAPI) AddLink(ctx context.Context, base coreiface.Path, name str return nil, err } + api.provider.Provide(nnode.Cid()) + return coreiface.IpfsPath(nnode.Cid()), nil } @@ -257,6 +264,8 @@ func (api *ObjectAPI) RmLink(ctx context.Context, base coreiface.Path, link stri return nil, err } + api.provider.Provide(nnode.Cid()) + return coreiface.IpfsPath(nnode.Cid()), nil } @@ -294,6 +303,8 @@ func (api *ObjectAPI) patchData(ctx context.Context, path coreiface.Path, r io.R return nil, err } + api.provider.Provide(pbnd.Cid()) + return coreiface.IpfsPath(pbnd.Cid()), nil } diff --git a/core/coreapi/path.go b/core/coreapi/path.go index 6302ec67fa5..3bf2719e61c 100644 --- a/core/coreapi/path.go +++ b/core/coreapi/path.go @@ -27,6 +27,9 @@ func (api *CoreAPI) ResolveNode(ctx context.Context, p coreiface.Path) (ipld.Nod if err != nil { return nil, err } + + api.provider.Provide(node.Cid()) + return node, nil } diff --git a/core/coreapi/provider.go b/core/coreapi/provider.go new file mode 100644 index 00000000000..7be67966255 --- /dev/null +++ b/core/coreapi/provider.go @@ -0,0 +1,15 @@ +package coreapi + +import ( + cid "github.com/ipfs/go-cid" +) + +type ProviderAPI CoreAPI + +func (api *ProviderAPI) Provide(root cid.Cid) error { + return api.provider.Provide(root) +} + +func (api *ProviderAPI) Unprovide(root cid.Cid) error { + return api.provider.Unprovide(root) +} diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go index d773fe0c157..bbe8c302e35 100644 --- a/core/coreapi/unixfs.go +++ b/core/coreapi/unixfs.go @@ -129,6 +129,9 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options if err != nil { return nil, err } + + api.provider.Provide(nd.Cid()) + return coreiface.IpfsPath(nd.Cid()), nil } @@ -140,6 +143,7 @@ func (api *UnixfsAPI) Get(ctx context.Context, p coreiface.Path) (files.Node, er return nil, err } + api.provider.Provide(nd.Cid()) return unixfile.NewUnixfsFile(ctx, ses.dag, nd) } diff --git a/core/corehttp/gateway_handler.go b/core/corehttp/gateway_handler.go index a62aee4cdac..affecaa4864 100644 --- a/core/corehttp/gateway_handler.go +++ b/core/corehttp/gateway_handler.go @@ -489,6 +489,8 @@ func (i *gatewayHandler) putHandler(w http.ResponseWriter, r *http.Request) { return } + i.api.Provider().Provide(newcid) + i.addUserHeaders(w) // ok, _now_ write user's headers. w.Header().Set("IPFS-Hash", newcid.String()) http.Redirect(w, r, gopath.Join(ipfsPathPrefix, newcid.String(), newPath), http.StatusCreated) @@ -566,6 +568,8 @@ func (i *gatewayHandler) deleteHandler(w http.ResponseWriter, r *http.Request) { // Redirect to new path ncid := newnode.Cid() + i.api.Provider().Provide(ncid) + i.addUserHeaders(w) // ok, _now_ write user's headers. w.Header().Set("IPFS-Hash", ncid.String()) http.Redirect(w, r, gopath.Join(ipfsPathPrefix+ncid.String(), path.Join(components[:len(components)-1])), http.StatusCreated) diff --git a/go.mod b/go.mod index 7cea50c3cdc..8b9fe5a38d6 100644 --- a/go.mod +++ b/go.mod @@ -114,3 +114,5 @@ require ( gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect gotest.tools/gotestsum v0.3.3 ) + +replace github.com/ipfs/interface-go-ipfs-core v0.0.1 => ../interface-go-ipfs-core diff --git a/provider/provider.go b/provider/provider.go new file mode 100644 index 00000000000..a6e4da65a06 --- /dev/null +++ b/provider/provider.go @@ -0,0 +1,155 @@ +// Package provider implements structures and methods to provide blocks, +// keep track of which blocks are provided, and to allow those blocks to +// be reprovided. +package provider + +import ( + "context" + "fmt" + cid "github.com/ipfs/go-cid" + bstore "github.com/ipfs/go-ipfs-blockstore" + logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p-routing" + "time" +) + +var ( + log = logging.Logger("provider") +) + +const ( + provideOutgoingWorkerLimit = 8 + provideOutgoingTimeout = 15 * time.Second +) + +type Strategy func(context.Context, cid.Cid) <-chan cid.Cid + +// Provider announces blocks to the network, tracks which blocks are +// being provided, and untracks blocks when they're no longer in the blockstore. +type Provider struct { + ctx context.Context + + // strategy for deciding which CIDs, given a CID, should be provided + strategy Strategy + // keeps track of which CIDs have been provided already + tracker *Tracker + // the CIDs for which provide announcements should be made + queue *Queue + // where the blocks live + blockstore bstore.Blockstore + // used to announce providing to the network + contentRouting routing.ContentRouting +} + +func NewProvider(ctx context.Context, strategy Strategy, tracker *Tracker, queue *Queue, blockstore bstore.Blockstore, contentRouting routing.ContentRouting) *Provider { + return &Provider{ + ctx: ctx, + strategy: strategy, + tracker: tracker, + queue: queue, + blockstore: blockstore, + contentRouting: contentRouting, + } +} + +// Start workers to handle provide requests. +func (p *Provider) Run() { + p.handleAnnouncements() +} + +// Provide the given cid using specified strategy. +func (p *Provider) Provide(root cid.Cid) error { + cids := p.strategy(p.ctx, root) + + for cid := range cids { + isTracking, err := p.tracker.IsTracking(cid) + if err != nil { + return err + } + if isTracking { + continue + } + if err := p.queue.Enqueue(cid); err != nil { + return err + } + } + + return nil +} + +// Stop providing a block +func (p *Provider) Unprovide(cid cid.Cid) error { + return p.tracker.Untrack(cid) +} + +// Handle all outgoing cids by providing (announcing) them +func (p *Provider) handleAnnouncements() { + for workers := 0; workers < provideOutgoingWorkerLimit; workers++ { + go func() { + for { + select { + case <-p.ctx.Done(): + return + case entry := <-p.queue.Dequeue(): + // skip if already tracking + isTracking, err := p.tracker.IsTracking(entry.cid) + if err != nil { + log.Warningf("Unable to check provider tracking on outgoing: %s, %s", entry.cid, err) + continue + } + if isTracking { + if err := entry.Complete(); err != nil { + log.Warningf("Unable to complete queue entry when already tracking: %s, %s", entry.cid, err) + } + continue + } + + if err := doProvide(p.ctx, p.tracker, p.blockstore, p.contentRouting, entry.cid); err != nil { + log.Warningf("Unable to provide entry: %s, %s", entry.cid, err) + } + + if err := entry.Complete(); err != nil { + log.Warningf("Unable to complete queue entry when providing: %s, %s", entry.cid, err) + } + } + } + }() + } +} + +// TODO: better document this provide logic +func doProvide(ctx context.Context, tracker *Tracker, blockstore bstore.Blockstore, contentRouting routing.ContentRouting, key cid.Cid) error { + // if not in blockstore, skip and stop tracking + inBlockstore, err := blockstore.Has(key) + if err != nil { + log.Warningf("Unable to check for presence in blockstore: %s, %s", key, err) + return err + } + if !inBlockstore { + if err := tracker.Untrack(key); err != nil { + log.Warningf("Unable to untrack: %s, %s", key, err) + return err + } + return nil + } + + // announce + fmt.Println("announce - start - ", key) + ctx, cancel := context.WithTimeout(ctx, provideOutgoingTimeout) + if err := contentRouting.Provide(ctx, key, true); err != nil { + log.Warningf("Failed to provide cid: %s", err) + // TODO: Maybe put these failures onto a failures queue? + cancel() + return err + } + cancel() + fmt.Println("announce - end - ", key) + + // track entry + if err := tracker.Track(key); err != nil { + log.Warningf("Unable to track: %s, %s", key, err) + return err + } + + return nil +} \ No newline at end of file diff --git a/provider/queue.go b/provider/queue.go new file mode 100644 index 00000000000..8f61de294e6 --- /dev/null +++ b/provider/queue.go @@ -0,0 +1,231 @@ +package provider + +import ( + "context" + "errors" + "math" + "strconv" + "strings" + "sync" + cid "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + namespace "github.com/ipfs/go-datastore/namespace" + query "github.com/ipfs/go-datastore/query" +) + +// Entry allows for the durability in the queue. When a cid is dequeued it is +// not removed from the datastore until you call Complete() on the entry you +// receive. +type Entry struct { + cid cid.Cid + key ds.Key + queue *Queue +} + +func (e *Entry) Complete() error { + return e.queue.remove(e.key) +} + +// Queue provides a durable, FIFO interface to the datastore for storing cids +// +// Durability just means that cids in the process of being provided when a +// crash or shutdown occurs will still be in the queue when the node is +// brought back online. +type Queue struct { + // used to differentiate queues in datastore + // e.g. provider vs reprovider + name string + + ctx context.Context + + tail uint64 + head uint64 + + lock sync.Mutex + datastore ds.Datastore + + dequeue chan *Entry + notEmpty chan struct{} +} + +func NewQueue(name string, ctx context.Context, datastore ds.Datastore) (*Queue, error) { + namespaced := namespace.Wrap(datastore, ds.NewKey("/" + name + "/queue/")) + head, tail, err := getQueueHeadTail(name, ctx, namespaced) + if err != nil { + return nil, err + } + q := &Queue{ + name: name, + ctx: ctx, + head: head, + tail: tail, + lock: sync.Mutex{}, + datastore: namespaced, + dequeue: make(chan *Entry), + notEmpty: make(chan struct{}), + } + q.run() + return q, nil +} + +// Put a cid in the queue +func (q *Queue) Enqueue(cid cid.Cid) error { + q.lock.Lock() + defer q.lock.Unlock() + + wasEmpty := q.IsEmpty() + + nextKey := q.queueKey(q.tail) + + if err := q.datastore.Put(nextKey, cid.Bytes()); err != nil { + return err + } + + q.tail++ + + if wasEmpty { + select { + case q.notEmpty <- struct{}{}: + case <-q.ctx.Done(): + } + } + + return nil +} + +// Remove an entry from the queue. +func (q *Queue) Dequeue() <-chan *Entry { + return q.dequeue +} + +func (q *Queue) IsEmpty() bool { + return (q.tail - q.head) == 0 +} + +func (q *Queue) remove(key ds.Key) error { + return q.datastore.Delete(key) +} + +// dequeue items when the dequeue channel is available to +// be written to +func (q *Queue) run() { + go func() { + for { + select { + case <-q.ctx.Done(): + return + default: + } + if q.IsEmpty() { + select { + case <-q.ctx.Done(): + return + // wait for a notEmpty message + case <-q.notEmpty: + } + } + + entry, err := q.next() + if err != nil { + log.Warningf("Error Dequeue()-ing: %s, %s", entry, err) + continue + } + + select { + case <-q.ctx.Done(): + return + case q.dequeue <- entry: + } + } + }() +} + +// Find the next item in the queue, crawl forward if an entry is not +// found in the next spot. +func (q *Queue) next() (*Entry, error) { + q.lock.Lock() + defer q.lock.Unlock() + + var nextKey ds.Key + var value []byte + var err error + for { + if q.head >= q.tail { + return nil, errors.New("no more entries in queue") + } + select { + case <-q.ctx.Done(): + return nil, nil + default: + } + nextKey = q.queueKey(q.head) + value, err = q.datastore.Get(nextKey) + if err == ds.ErrNotFound { + q.head++ + continue + } else if err != nil { + return nil, err + } else { + break + } + } + + id, err := cid.Parse(value) + if err != nil { + return nil, err + } + + entry := &Entry { + cid: id, + key: nextKey, + queue: q, + } + + q.head++ + + return entry, nil +} + +func (q *Queue) queueKey(id uint64) ds.Key { + return ds.NewKey(strconv.FormatUint(id, 10)) +} + +// crawl over the queue entries to find the head and tail +func getQueueHeadTail(name string, ctx context.Context, datastore ds.Datastore) (uint64, uint64, error) { + query := query.Query{} + results, err := datastore.Query(query) + if err != nil { + return 0, 0, err + } + + var tail uint64 = 0 + var head uint64 = math.MaxUint64 + for entry := range results.Next() { + select { + case <-ctx.Done(): + return 0, 0, nil + default: + } + trimmed := strings.TrimPrefix(entry.Key, "/") + id, err := strconv.ParseUint(trimmed, 10, 64) + if err != nil { + return 0, 0, err + } + + if id < head { + head = id + } + + if (id+1) > tail { + tail = (id+1) + } + } + if err := results.Close(); err != nil { + return 0, 0, err + } + if head == math.MaxUint64 { + head = 0 + } + + return head, tail, nil +} diff --git a/provider/reprovider.go b/provider/reprovider.go new file mode 100644 index 00000000000..bcb68c8da16 --- /dev/null +++ b/provider/reprovider.go @@ -0,0 +1,118 @@ +package provider + +import ( + "context" + "github.com/ipfs/go-ipfs-blockstore" + "github.com/libp2p/go-libp2p-routing" + "time" +) + +var ( + reprovideOutgoingWorkerLimit = 8 +) + +type Reprovider struct { + ctx context.Context + queue *Queue + tracker *Tracker + tick time.Duration + blockstore blockstore.Blockstore + contentRouting routing.ContentRouting + trigger chan struct{} +} + +// Reprovider periodically re-announces the cids that have been provided. These +// reprovides can be run on an interval and/or manually. Reprovider also untracks +// cids that are no longer in the blockstore. +func NewReprovider(ctx context.Context, queue *Queue, tracker *Tracker, tick time.Duration, blockstore blockstore.Blockstore, contentRouting routing.ContentRouting) *Reprovider { + return &Reprovider{ + ctx: ctx, + queue: queue, + tracker: tracker, + tick: tick, + blockstore: blockstore, + contentRouting: contentRouting, + trigger: make(chan struct{}), + } +} + +// Begin listening for triggers and reprovide whatever is +// in the reprovider queue. +func (rp *Reprovider) Run() { + go rp.handleTriggers() + go rp.handleAnnouncements() +} + +// Add all the cids in the tracker to the reprovide queue +func (rp *Reprovider) Reprovide() error { + cids, err := rp.tracker.Tracking(rp.ctx) + if err != nil { + log.Warningf("error obtaining tracking information: %s", err) + return err + } + for c := range cids { + if err := rp.queue.Enqueue(c); err != nil { + log.Warningf("unable to enqueue cid: %s, %s", c, err) + continue + } + } + return nil +} + +// Trigger causes a reprovide +func (rp *Reprovider) Trigger(ctx context.Context) error { + select { + case <-rp.ctx.Done(): + return rp.ctx.Err() + case <-ctx.Done(): + return ctx.Err() + case rp.trigger <- struct{}{}: + } + return nil +} + +func (rp *Reprovider) handleTriggers() { + // dont reprovide immediately. + // may have just started the daemon and shutting it down immediately. + // probability( up another minute | uptime ) increases with uptime. + after := time.After(time.Minute) + for { + if rp.tick == 0 { + after = nil + } + + select { + case <-rp.ctx.Done(): + return + case <-rp.trigger: + case <-after: + } + + err := rp.Reprovide() + if err != nil { + log.Debug(err) + } + + after = time.After(rp.tick) + } +} + +func (rp *Reprovider) handleAnnouncements() { + for workers := 0; workers < reprovideOutgoingWorkerLimit; workers++ { + go func() { + for { + select { + case <-rp.ctx.Done(): + return + case entry := <-rp.queue.Dequeue(): + if err := doProvide(rp.ctx, rp.tracker, rp.blockstore, rp.contentRouting, entry.cid); err != nil { + log.Warningf("Unable to reprovide entry: %s, %s", entry.cid, err) + } + if err := entry.Complete(); err != nil { + log.Warningf("Unable to complete queue entry when reproviding: %s, %s", entry.cid, err) + } + } + } + }() + } +} diff --git a/provider/strategy.go b/provider/strategy.go new file mode 100644 index 00000000000..9c3e0b12e07 --- /dev/null +++ b/provider/strategy.go @@ -0,0 +1,34 @@ +package provider + +// TODO: The strategy module is going to change so that it just +// calls Provide on a given provider instead of returning a channel. + +import ( + "context" + "github.com/ipfs/go-merkledag" + "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" +) + +func NewProvideAllStrategy(dag ipld.DAGService) Strategy { + return func(ctx context.Context, root cid.Cid) <-chan cid.Cid { + cids := make(chan cid.Cid) + go func() { + select { + case <-ctx.Done(): + return + case cids <- root: + } + merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), root, func(cid cid.Cid) bool { + select { + case <-ctx.Done(): + return false + case cids <- root: + } + return true + }) + close(cids) + }() + return cids + } +} diff --git a/provider/tracker.go b/provider/tracker.go new file mode 100644 index 00000000000..0cae8c8a615 --- /dev/null +++ b/provider/tracker.go @@ -0,0 +1,63 @@ +package provider + +import ( + "context" + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + "github.com/ipfs/go-datastore/query" +) + +const providerTrackingPrefix = "/provider/tracking/" + +// Keeps track of which cids are being provided. +type Tracker struct { + datastore ds.Datastore +} + +func NewTracker(datastore ds.Datastore) *Tracker { + return &Tracker{ + datastore: namespace.Wrap(datastore, ds.NewKey(providerTrackingPrefix)), + } +} + +func (t *Tracker) IsTracking(cid cid.Cid) (bool, error) { + return t.datastore.Has(providerTrackingKey(cid)) +} + +func (t *Tracker) Track(cid cid.Cid) error { + return t.datastore.Put(providerTrackingKey(cid), cid.Bytes()) +} + +func (t *Tracker) Untrack(cid cid.Cid) error { + return t.datastore.Delete(providerTrackingKey(cid)) +} + +// Returns all the cids that are being tracked. +func (t *Tracker) Tracking(ctx context.Context) (<-chan cid.Cid, error) { + q := query.Query{} + results, err := t.datastore.Query(q) + if err != nil { + return nil, err + } + cids := make(chan cid.Cid) + go func() { + defer close(cids) + for result := range results.Next() { + key, err := cid.Parse(result.Value) + if err != nil { + log.Warningf("unable to parse tracked cid: %s", err) + } + select { + case <-ctx.Done(): + return + case cids <- key: + } + } + }() + return cids, nil +} + +func providerTrackingKey(cid cid.Cid) ds.Key { + return ds.NewKey(cid.String()) +}