Skip to content

Commit

Permalink
Add fixed-period repo GC
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: rht <rhtbot@gmail.com>
  • Loading branch information
rht committed Jul 22, 2015
1 parent d37ecbb commit a4704b0
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 6 deletions.
74 changes: 74 additions & 0 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"errors"
_ "expvar"
"fmt"
"net/http"
Expand All @@ -9,18 +10,22 @@ import (
"sort"
"strings"
"sync"
"time"

_ "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/codahale/metrics/runtime"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"

cmds "github.com/ipfs/go-ipfs/commands"
"github.com/ipfs/go-ipfs/core"
commands "github.com/ipfs/go-ipfs/core/commands"
corehttp "github.com/ipfs/go-ipfs/core/corehttp"
corerepo "github.com/ipfs/go-ipfs/core/corerepo"
"github.com/ipfs/go-ipfs/core/corerouting"
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
peer "github.com/ipfs/go-ipfs/p2p/peer"
repo "github.com/ipfs/go-ipfs/repo"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
util "github.com/ipfs/go-ipfs/util"
)
Expand All @@ -35,10 +40,13 @@ const (
ipnsMountKwd = "mount-ipns"
unrestrictedApiAccessKwd = "unrestricted-api"
unencryptTransportKwd = "disable-transport-encryption"
disableGCKwd = "disable-gc"
// apiAddrKwd = "address-api"
// swarmAddrKwd = "address-swarm"
)

var ErrMaxStorageExceeded = errors.New("Maximum storage limit exceeded. Maybe unpin some files?")

var daemonCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Run a network-connected IPFS node",
Expand Down Expand Up @@ -86,6 +94,7 @@ run this then restart the daemon:
cmds.StringOption(ipnsMountKwd, "Path to the mountpoint for IPNS (if using --mount)"),
cmds.BoolOption(unrestrictedApiAccessKwd, "Allow API access to unlisted hashes"),
cmds.BoolOption(unencryptTransportKwd, "Disable transport encryption (for debugging protocols)"),
cmds.BoolOption(disableGCKwd, "Disable automatic periodic repo garbage collection"),

// TODO: add way to override addresses. tricky part: updating the config if also --init.
// cmds.StringOption(apiAddrKwd, "Address for the daemon rpc API (overrides config)"),
Expand Down Expand Up @@ -246,6 +255,14 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
}
}

// repo blockstore GC
go func() {
if err := runGC(req, repo, node); err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
}()

// collect long-running errors and block for shutdown
// TODO(cryptix): our fuse currently doesnt follow this pattern for graceful shutdown
for err := range merge(apiErrc, gwErrc) {
Expand Down Expand Up @@ -461,3 +478,60 @@ func merge(cs ...<-chan error) <-chan error {
}()
return out
}

func runGC(req cmds.Request, repo repo.Repo, node *core.IpfsNode) error {
disableGC, _, err := req.Option(disableGCKwd).Bool()
if err != nil {
return err
}
if disableGC {
return nil
}

cfg, err := req.InvocContext().GetConfig()
if err != nil {
return fmt.Errorf("runGC: GetConfig() failed: %s", err)
}
ctx := req.Context()

// calculate swap size, the slack space between StorageMax and StorageGCWatermark
// used to limit GC duration
swapGiB := (cfg.Datastore.StorageMax - cfg.Datastore.StorageGCWatermark) / 10e9
if swapGiB < 1 {
swapGiB = 1
}

for {
select {
case <-ctx.Done():
return nil
case <-time.After(time.Duration(cfg.Datastore.GCTimeoutMinute) * time.Minute):
// Check fs repo disk usage
diskUsage, err := repo.GetDiskUsage()
if err != nil {
return err
}

if diskUsage > cfg.Datastore.StorageGCWatermark {
// Do GC here
fmt.Println("Starting repo GC...")
// 1 minute is sufficient for ~1GB unlink() blocks each of 100kb in SSD
_ctx, cancel := context.WithTimeout(ctx, time.Duration(swapGiB)*time.Minute)
defer cancel()
if err := corerepo.GarbageCollect(node, _ctx); err != nil {
return err
}
newDiskUsage, err := repo.GetDiskUsage()
if err != nil {
return err
}
// TODO: Add unit to "Released x KB/MB/GB/TB/PB"
fmt.Println("Repo GC done. Released %d bytes", newDiskUsage-diskUsage)
}

if diskUsage > cfg.Datastore.StorageMax {
log.Error(ErrMaxStorageExceeded)
}
}
}
}
3 changes: 1 addition & 2 deletions core/corerepo/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
}
for k := range keychan { // rely on AllKeysChan to close chan
if !n.Pinning.IsPinned(k) {
err := n.Blockstore.DeleteBlock(k)
if err != nil {
if err := n.Blockstore.DeleteBlock(k); err != nil {
return err
}
}
Expand Down
7 changes: 5 additions & 2 deletions repo/config/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ const DefaultDataStoreDirectory = "datastore"

// Datastore tracks the configuration of the datastore.
type Datastore struct {
Type string
Path string
Type string
Path string
StorageMax int64 // in MB
StorageGCWatermark int64 // in MB
GCTimeoutMinute int64
}

// DataStorePath returns the default data store path given a configuration root
Expand Down
7 changes: 5 additions & 2 deletions repo/config/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,11 @@ func datastoreConfig() (*Datastore, error) {
return nil, err
}
return &Datastore{
Path: dspath,
Type: "leveldb",
Path: dspath,
Type: "leveldb",
StorageMax: 10 * 1024, // 10 GB
StorageGCWatermark: 9 * 1024, // 9 GB
GCTimeoutMinute: 60,
}, nil
}

Expand Down
14 changes: 14 additions & 0 deletions repo/fsrepo/fsrepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"
"strings"
"sync"
"syscall"

ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/flatfs"
Expand Down Expand Up @@ -562,6 +563,19 @@ func (r *FSRepo) Datastore() ds.ThreadSafeDatastore {
return d
}

// GetDiskUsage computes the disk space taken by the repo in MB
func (r *FSRepo) GetDiskUsage() (int64, error) {
// TODO: This should be cross-platform
// See http://wendal.net/2012/1224.html
pth, err := config.PathRoot()
if err != nil {
return 0, err
}
var stat syscall.Statfs_t
syscall.Statfs(pth, &stat)
return int64(stat.Bsize) * int64(stat.Bavail) / (1024 * 1024), nil
}

var _ io.Closer = &FSRepo{}
var _ repo.Repo = &FSRepo{}

Expand Down
2 changes: 2 additions & 0 deletions repo/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ func (m *Mock) GetConfigKey(key string) (interface{}, error) {

func (m *Mock) Datastore() ds.ThreadSafeDatastore { return m.D }

func (m *Mock) GetDiskUsage() (int64, error) { return 0, nil }

func (m *Mock) Close() error { return errTODO }
1 change: 1 addition & 0 deletions repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Repo interface {
GetConfigKey(key string) (interface{}, error)

Datastore() datastore.ThreadSafeDatastore
GetDiskUsage() (int64, error)

io.Closer
}

0 comments on commit a4704b0

Please sign in to comment.