Skip to content

Commit

Permalink
Add fixed period repo GC + test
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: rht <rhtbot@gmail.com>
  • Loading branch information
rht committed Nov 10, 2015
1 parent ece43a5 commit 37f80d5
Show file tree
Hide file tree
Showing 12 changed files with 214 additions and 15 deletions.
32 changes: 30 additions & 2 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/ipfs/go-ipfs/core"
commands "github.com/ipfs/go-ipfs/core/commands"
corehttp "github.com/ipfs/go-ipfs/core/corehttp"
corerepo "github.com/ipfs/go-ipfs/core/corerepo"
"github.com/ipfs/go-ipfs/core/corerouting"
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
peer "github.com/ipfs/go-ipfs/p2p/peer"
Expand All @@ -36,6 +37,7 @@ const (
ipnsMountKwd = "mount-ipns"
unrestrictedApiAccessKwd = "unrestricted-api"
unencryptTransportKwd = "disable-transport-encryption"
enableGCKwd = "enable-gc"
// apiAddrKwd = "address-api"
// swarmAddrKwd = "address-swarm"
)
Expand Down Expand Up @@ -114,6 +116,7 @@ future version, along with this notice. Please move to setting the HTTP Headers.
cmds.StringOption(ipnsMountKwd, "Path to the mountpoint for IPNS (if using --mount)"),
cmds.BoolOption(unrestrictedApiAccessKwd, "Allow API access to unlisted hashes"),
cmds.BoolOption(unencryptTransportKwd, "Disable transport encryption (for debugging protocols)"),
cmds.BoolOption(enableGCKwd, "Enable automatic periodic repo garbage collection"),

// TODO: add way to override addresses. tricky part: updating the config if also --init.
// cmds.StringOption(apiAddrKwd, "Address for the daemon rpc API (overrides config)"),
Expand Down Expand Up @@ -277,15 +280,23 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
}
}

// repo blockstore GC - if --enable-gc flag is present
err, gcErrc := maybeRunGC(req, node)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

fmt.Printf("Daemon is ready\n")
// collect long-running errors and block for shutdown
// TODO(cryptix): our fuse currently doesnt follow this pattern for graceful shutdown
for err := range merge(apiErrc, gwErrc) {
for err := range merge(apiErrc, gwErrc, gcErrc) {
if err != nil {
log.Error(err)
res.SetError(err, cmds.ErrNormal)
return
}
}
return
}

// serveHTTPApi collects options, creates listener, prints status message and starts serving requests
Expand Down Expand Up @@ -478,6 +489,23 @@ func mountFuse(req cmds.Request) error {
return nil
}

func maybeRunGC(req cmds.Request, node *core.IpfsNode) (error, <-chan error) {
enableGC, _, err := req.Option(enableGCKwd).Bool()
if err != nil {
return err, nil
}
if !enableGC {
return nil, nil
}

errc := make(chan error)
go func() {
errc <- corerepo.PeriodicGC(req.Context(), node)
close(errc)
}()
return nil, errc
}

// merge does fan-in of multiple read-only error channels
// taken from http://blog.golang.org/pipelines
func merge(cs ...<-chan error) <-chan error {
Expand Down
2 changes: 1 addition & 1 deletion commands/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (c *Context) GetConfig() (*config.Config, error) {
}

// GetNode returns the node of the current Command exection
// context. It may construct it with the providied function.
// context. It may construct it with the provided function.
func (c *Context) GetNode() (*core.IpfsNode, error) {
var err error
if c.node == nil {
Expand Down
8 changes: 8 additions & 0 deletions core/commands/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ remains to be implemented.
// see comment above
return nil
}

log.Debugf("Total size of file being added: %v\n", size)
req.Values()["size"] = size

Expand All @@ -100,6 +101,13 @@ remains to be implemented.
res.SetError(err, cmds.ErrNormal)
return
}
// check if repo will exceed storage limit if added
// TODO: this doesn't handle the case if the hashed file is already in blocks (deduplicated)
// TODO: conditional GC is disabled due to it is somehow not possible to pass the size to the daemon
//if err := corerepo.ConditionalGC(req.Context(), n, uint64(size)); err != nil {
// res.SetError(err, cmds.ErrNormal)
// return
//}

progress, _, _ := req.Option(progressOptionName).Bool()
trickle, _, _ := req.Option(trickleOptionName).Bool()
Expand Down
5 changes: 5 additions & 0 deletions core/commands/cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/corerepo"
coreunix "github.com/ipfs/go-ipfs/core/coreunix"

context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
Expand Down Expand Up @@ -44,6 +45,10 @@ it contains.
return
}

if err := corerepo.ConditionalGC(req.Context(), node, length); err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
res.SetLength(length)

reader := io.MultiReader(readers...)
Expand Down
142 changes: 138 additions & 4 deletions core/corerepo/gc.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,85 @@
package corerepo

import (
"errors"
"time"

humanize "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/dustin/go-humanize"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key"
"github.com/ipfs/go-ipfs/core"

repo "github.com/ipfs/go-ipfs/repo"
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
)

var log = logging.Logger("corerepo")

var ErrMaxStorageExceeded = errors.New("Maximum storage limit exceeded. Maybe unpin some files?")

type KeyRemoved struct {
Key key.Key
}

type GC struct {
Node *core.IpfsNode
Repo repo.Repo
StorageMax uint64
StorageGC uint64
SlackGB uint64
Storage uint64
}

func NewGC(n *core.IpfsNode) (*GC, error) {
r := n.Repo
cfg, err := r.Config()
if err != nil {
return nil, err
}

// check if cfg has these fields initialized
// TODO: there should be a general check for all of the cfg fields
// maybe distinguish between user config file and default struct?
if cfg.Datastore.StorageMax == "" {
r.SetConfigKey("Datastore.StorageMax", "10GB")
cfg.Datastore.StorageMax = "10GB"
}
if cfg.Datastore.StorageGCWatermark == 0 {
r.SetConfigKey("Datastore.StorageGCWatermark", 90)
cfg.Datastore.StorageGCWatermark = 90
}

storageMax, err := humanize.ParseBytes(cfg.Datastore.StorageMax)
if err != nil {
return nil, err
}
storageGC := storageMax * uint64(cfg.Datastore.StorageGCWatermark) / 100

// calculate the slack space between StorageMax and StorageGCWatermark
// used to limit GC duration
slackGB := (storageMax - storageGC) / 10e9
if slackGB < 1 {
slackGB = 1
}

return &GC{
Node: n,
Repo: r,
StorageMax: storageMax,
StorageGC: storageGC,
SlackGB: slackGB,
}, nil
}

func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
defer cancel() // in case error occurs during operation
keychan, err := n.Blockstore.AllKeysChan(ctx)
if err != nil {
return err
}
for k := range keychan { // rely on AllKeysChan to close chan
if !n.Pinning.IsPinned(k) {
err := n.Blockstore.DeleteBlock(k)
if err != nil {
if err := n.Blockstore.DeleteBlock(k); err != nil {
return err
}
}
Expand Down Expand Up @@ -66,3 +121,82 @@ func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemo
}()
return output, nil
}

func PeriodicGC(ctx context.Context, node *core.IpfsNode) error {
cfg, err := node.Repo.Config()
if err != nil {
return err
}

if cfg.Datastore.GCPeriod == "" {
node.Repo.SetConfigKey("Datastore.GCPeriod", "1h")
cfg.Datastore.GCPeriod = "1h"
}

period, err := time.ParseDuration(cfg.Datastore.GCPeriod)
if err != nil {
return err
}
if int64(period) == 0 {
// if duration is 0, it means GC is disabled.
return nil
}

gc, err := NewGC(node)
if err != nil {
return err
}

for {
select {
case <-ctx.Done():
return nil
case <-time.After(period):
// the private func maybeGC doesn't compute storageMax, storageGC, slackGC so that they are not re-computed for every cycle
if err := gc.maybeGC(ctx, 0); err != nil {
return err
}
}
}
}

func ConditionalGC(ctx context.Context, node *core.IpfsNode, offset uint64) error {
gc, err := NewGC(node)
if err != nil {
return err
}
return gc.maybeGC(ctx, offset)
}

func (gc *GC) maybeGC(ctx context.Context, offset uint64) error {
storage, err := gc.Repo.GetStorageUsage()
if err != nil {
return err
}

if storage+offset > gc.StorageMax {
err := ErrMaxStorageExceeded
log.Error(err)
return err
}

if storage+offset > gc.StorageGC {
// Do GC here
log.Info("Starting repo GC...")
defer log.EventBegin(ctx, "repoGC").Done()
// 1 minute is sufficient for ~1GB unlink() blocks each of 100kb in SSD
_ctx, cancel := context.WithTimeout(ctx, time.Duration(gc.SlackGB)*time.Minute)
defer cancel()

if err := GarbageCollect(gc.Node, _ctx); err != nil {
return err
}
newStorage, err := gc.Repo.GetStorageUsage()
if err != nil {
return err
}
log.Infof("Repo GC done. Released %s\n", humanize.Bytes(uint64(storage-newStorage)))
return nil
}
return nil
}
7 changes: 5 additions & 2 deletions repo/config/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ const DefaultDataStoreDirectory = "datastore"

// Datastore tracks the configuration of the datastore.
type Datastore struct {
Type string
Path string
Type string
Path string
StorageMax string // in B, kB, kiB, MB, ...
StorageGCWatermark int64 // in percentage to multiply on StorageMax
GCPeriod string // in ns, us, ms, s, m, h
}

// DataStorePath returns the default data store path given a configuration root
Expand Down
7 changes: 5 additions & 2 deletions repo/config/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,11 @@ func datastoreConfig() (*Datastore, error) {
return nil, err
}
return &Datastore{
Path: dspath,
Type: "leveldb",
Path: dspath,
Type: "leveldb",
StorageMax: "10GB",
StorageGCWatermark: 90, // 90%
GCPeriod: "1h",
}, nil
}

Expand Down
18 changes: 16 additions & 2 deletions repo/fsrepo/fsrepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
mfsr "github.com/ipfs/go-ipfs/repo/fsrepo/migrations"
serialize "github.com/ipfs/go-ipfs/repo/fsrepo/serialize"
dir "github.com/ipfs/go-ipfs/thirdparty/dir"
u "github.com/ipfs/go-ipfs/util"
util "github.com/ipfs/go-ipfs/util"
ds2 "github.com/ipfs/go-ipfs/util/datastore2"
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
Expand Down Expand Up @@ -166,7 +165,7 @@ func open(repoPath string) (repo.Repo, error) {
}

func newFSRepo(rpath string) (*FSRepo, error) {
expPath, err := u.TildeExpansion(filepath.Clean(rpath))
expPath, err := util.TildeExpansion(filepath.Clean(rpath))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -587,6 +586,21 @@ func (r *FSRepo) Datastore() ds.ThreadSafeDatastore {
return d
}

// GetStorageUsage computes the storage space taken by the repo in bytes
func (r *FSRepo) GetStorageUsage() (uint64, error) {
pth, err := config.PathRoot()
if err != nil {
return 0, err
}

var du uint64
err = filepath.Walk(pth, func(p string, f os.FileInfo, err error) error {
du += uint64(f.Size())
return nil
})
return du, err
}

var _ io.Closer = &FSRepo{}
var _ repo.Repo = &FSRepo{}

Expand Down
2 changes: 2 additions & 0 deletions repo/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func (m *Mock) GetConfigKey(key string) (interface{}, error) {

func (m *Mock) Datastore() ds.ThreadSafeDatastore { return m.D }

func (m *Mock) GetStorageUsage() (uint64, error) { return 0, nil }

func (m *Mock) Close() error { return errTODO }

func (m *Mock) SetAPIAddr(addr string) error { return errTODO }
1 change: 1 addition & 0 deletions repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Repo interface {
GetConfigKey(key string) (interface{}, error)

Datastore() datastore.ThreadSafeDatastore
GetStorageUsage() (uint64, error)

// SetAPIAddr sets the API address in the repo.
SetAPIAddr(addr string) error
Expand Down
4 changes: 2 additions & 2 deletions test/sharness/lib/test-lib.sh
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ disk_usage() {
FreeBSD)
DU="du -s -A -B 1"
;;
Darwin | DragonFly)
DU="du"
Darwin | DragonFly | *)
DU="du -s"
;;
esac
$DU "$1" | awk "{print \$1}"
Expand Down
1 change: 1 addition & 0 deletions test/sharness/t0080-repo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ test_expect_success "'ipfs pin rm' output looks good" '
'

test_expect_failure "ipfs repo gc fully reverse ipfs add" '
ipfs repo gc &&
random 100000 41 >gcfile &&
disk_usage "$IPFS_PATH/blocks" >expected &&
hash=`ipfs add -q gcfile` &&
Expand Down

0 comments on commit 37f80d5

Please sign in to comment.