From cb7db369b7b47a201625953e5eec65dcef294c6f Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Tue, 8 Jan 2019 10:30:55 -0800 Subject: [PATCH] When GC deletes a block also unprovide it License: MIT Signed-off-by: Michael Avila --- core/corerepo/gc.go | 13 +++++++++++-- pin/gc/gc.go | 15 ++++++++++++--- provider/provider.go | 17 +++++++++++++---- provider/strategy.go | 16 ++++++++++------ provider/tracker.go | 4 ++++ 5 files changed, 50 insertions(+), 15 deletions(-) diff --git a/core/corerepo/gc.go b/core/corerepo/gc.go index ba05f93d2bd6..03e4821d16f1 100644 --- a/core/corerepo/gc.go +++ b/core/corerepo/gc.go @@ -9,6 +9,7 @@ import ( "github.com/ipfs/go-ipfs/core" gc "github.com/ipfs/go-ipfs/pin/gc" repo "github.com/ipfs/go-ipfs/repo" + provider "github.com/ipfs/go-ipfs/provider" humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize" cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid" @@ -29,6 +30,14 @@ type GC struct { Storage uint64 } +type GCCleanup struct { + *provider.Provider +} + +func (c *GCCleanup) Cleanup(cid cid.Cid) error { + return c.Unprovide(cid) +} + func NewGC(n *core.IpfsNode) (*GC, error) { r := n.Repo cfg, err := r.Config() @@ -86,7 +95,7 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error { if err != nil { return err } - rmed := gc.GC(ctx, n.Blockstore, n.Repo.Datastore(), n.Pinning, roots) + rmed := gc.GC(ctx, n.Blockstore, n.Repo.Datastore(), n.Pinning, &GCCleanup{n.Provider}, roots) return CollectResult(ctx, rmed, nil) } @@ -154,7 +163,7 @@ func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) <-chan gc.Result return out } - return gc.GC(ctx, n.Blockstore, n.Repo.Datastore(), n.Pinning, roots) + return gc.GC(ctx, n.Blockstore, n.Repo.Datastore(), n.Pinning, &GCCleanup{n.Provider}, roots) } func PeriodicGC(ctx context.Context, node *core.IpfsNode) error { diff --git a/pin/gc/gc.go b/pin/gc/gc.go index 3ab3dcdc0d70..8d1edd648466 100644 --- a/pin/gc/gc.go +++ b/pin/gc/gc.go @@ -29,6 +29,13 @@ type Result struct { Error error } +// Called with a cid.Cid that has just been removed from the blockstore. +// The intention is to clean up any other information about this block +// such as provider tracking. +type Cleanup interface { + Cleanup(cid.Cid) error +} + // GC performs a mark and sweep garbage collection of the blocks in the blockstore // first, it creates a 'marked' set and adds to it the following: // - all recursively pinned blocks, plus all of their descendants (recursively) @@ -38,8 +45,7 @@ type Result struct { // // The routine then iterates over every block in the blockstore and // deletes any block that is not found in the marked set. -func GC(ctx context.Context, bs bstore.GCBlockstore, dstor dstore.Datastore, pn pin.Pinner, bestEffortRoots []cid.Cid) <-chan Result { - +func GC(ctx context.Context, bs bstore.GCBlockstore, dstor dstore.Datastore, pn pin.Pinner, cleanup Cleanup, bestEffortRoots []cid.Cid) <-chan Result { elock := log.EventBegin(ctx, "GC.lockWait") unlocker := bs.GCLock() elock.Done() @@ -95,10 +101,13 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, dstor dstore.Datastore, pn if err != nil { errors = true output <- Result{Error: &CannotDeleteBlockError{k, err}} - //log.Errorf("Error removing key from blockstore: %s", err) + // log.Errorf("Error removing key from blockstore: %s", err) // continue as error is non-fatal continue loop } + if err := cleanup.Cleanup(k); err != nil { + log.Warningf("Warning: unable to cleanup block: %s", k) + } select { case output <- Result{KeyRemoved: k}: case <-ctx.Done(): diff --git a/provider/provider.go b/provider/provider.go index acb38f3bf46c..a426a9798864 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -18,7 +18,7 @@ const ( provideOutgoingTimeout = 15 * time.Second ) -type Strategy func(context.Context, chan cid.Cid, cid.Cid) +type Strategy func(context.Context, cid.Cid) <-chan cid.Cid type Provider struct { ctx context.Context @@ -60,7 +60,16 @@ func (p *Provider) Run() { // Provider the given cid using specified strategy. func (p *Provider) Provide(root cid.Cid) { - p.strategy(p.ctx, p.incoming, root) + cids := p.strategy(p.ctx, root) + go func() { + for cid := range cids { + p.incoming <- cid + } + }() +} + +func (p *Provider) Unprovide(cid cid.Cid) error { + return p.tracker.Untrack(cid) } // Announce to the world that a block is provided. @@ -81,7 +90,7 @@ func (p *Provider) handleIncoming() { case key := <-p.incoming: isTracking, err := p.tracker.IsTracking(key) if err != nil { - log.Warning("Unable to check provider tracking: %s", err) + log.Warning("Unable to check provider tracking on incoming: %s", err) continue } @@ -133,7 +142,7 @@ func (p *Provider) handleOutgoing() { case key := <-p.outgoing: isTracking, err := p.tracker.IsTracking(key) if err != nil { - log.Warning("Unable to check provider tracking: %s, %s", key, err) + log.Warning("Unable to check provider tracking on outgoing: %s, %s", key, err) continue } if isTracking { diff --git a/provider/strategy.go b/provider/strategy.go index f35bfe623127..ba5e6179432c 100644 --- a/provider/strategy.go +++ b/provider/strategy.go @@ -8,11 +8,15 @@ import ( ) func NewProvideAllStrategy(dag ipld.DAGService) Strategy { - return func(ctx context.Context, cids chan cid.Cid, root cid.Cid) { - cids <- root - merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), root, func(cid cid.Cid) bool { - cids <- cid - return true - }) + return func(ctx context.Context, root cid.Cid) <-chan cid.Cid { + cids := make(chan cid.Cid) + go func() { + cids <- root + merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), root, func(cid cid.Cid) bool { + cids <- cid + return true + }) + }() + return cids } } diff --git a/provider/tracker.go b/provider/tracker.go index 7fba8f89a123..f3605cd4bc54 100644 --- a/provider/tracker.go +++ b/provider/tracker.go @@ -25,6 +25,10 @@ func (t *Tracker) Track(cid cid.Cid) error { return t.datastore.Put(providerTrackingKey(cid), cid.Bytes()) } +func (t *Tracker) Untrack(cid cid.Cid) error { + return t.datastore.Delete(providerTrackingKey(cid)) +} + func providerTrackingKey(cid cid.Cid) ds.Key { return ds.NewKey(providerTrackingPrefix + cid.String()) }