From 2884c8434350538339bef4e9e2e4da8657548e3e Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 20 Jan 2017 10:48:23 -0800 Subject: [PATCH] Implement basic filestore 'no-copy' functionality License: MIT Signed-off-by: Jeromy --- blocks/blockstore/blockstore.go | 6 +- commands/files/file.go | 6 +- commands/files/multipartfile.go | 11 +- commands/files/readerfile.go | 17 ++- commands/files/serialfile.go | 3 +- commands/http/multifilereader.go | 3 + core/builder.go | 12 ++- core/commands/add.go | 47 +++++--- core/core.go | 4 + core/coreunix/add.go | 2 + core/coreunix/add_test.go | 15 ++- filestore/filestore.go | 169 +++++++++++++++++++++++++++++ filestore/filestore_test.go | 104 ++++++++++++++++++ filestore/fsrefstore.go | 177 +++++++++++++++++++++++++++++++ filestore/pb/Makefile | 10 ++ filestore/pb/dataobj.pb.go | 67 ++++++++++++ filestore/pb/dataobj.proto | 9 ++ importer/helpers/dagbuilder.go | 10 +- importer/helpers/helpers.go | 10 +- pin/gc/gc.go | 2 +- repo/config/config.go | 3 +- repo/config/experiments.go | 5 + repo/fsrepo/fsrepo.go | 14 +++ repo/mock.go | 3 + repo/repo.go | 3 + test/sharness/t0270-filestore.sh | 93 ++++++++++++++++ 26 files changed, 769 insertions(+), 36 deletions(-) create mode 100644 filestore/filestore.go create mode 100644 filestore/filestore_test.go create mode 100644 filestore/fsrefstore.go create mode 100644 filestore/pb/Makefile create mode 100644 filestore/pb/dataobj.pb.go create mode 100644 filestore/pb/dataobj.proto create mode 100644 repo/config/experiments.go create mode 100755 test/sharness/t0270-filestore.sh diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index 17ab24b3e8a..e34c3a8ee88 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -163,7 +163,11 @@ func (bs *blockstore) Has(k *cid.Cid) (bool, error) { } func (s *blockstore) DeleteBlock(k *cid.Cid) error { - return s.datastore.Delete(dshelp.CidToDsKey(k)) + err := s.datastore.Delete(dshelp.CidToDsKey(k)) + if err == ds.ErrNotFound { + return ErrNotFound + } + return err } // AllKeysChan runs a query for keys from the blockstore. diff --git a/commands/files/file.go b/commands/files/file.go index 22d8ac2d00a..949180716af 100644 --- a/commands/files/file.go +++ b/commands/files/file.go @@ -19,10 +19,10 @@ type File interface { // they are not directories io.ReadCloser - // FileName returns a filename path associated with this file + // FileName returns a filename associated with this file FileName() string - // FullPath returns the full path in the os associated with this file + // FullPath returns the full path used when adding this file FullPath() string // IsDirectory returns true if the File is a directory (and therefore @@ -57,6 +57,6 @@ type SizeFile interface { } type FileInfo interface { - FullPath() string + AbsPath() string Stat() os.FileInfo } diff --git a/commands/files/multipartfile.go b/commands/files/multipartfile.go index b71dd7fe600..21e0d44c143 100644 --- a/commands/files/multipartfile.go +++ b/commands/files/multipartfile.go @@ -14,6 +14,7 @@ const ( applicationDirectory = "application/x-directory" applicationSymlink = "application/symlink" + applicationFile = "application/octet-stream" contentTypeHeader = "Content-Type" ) @@ -34,7 +35,8 @@ func NewFileFromPart(part *multipart.Part) (File, error) { } contentType := part.Header.Get(contentTypeHeader) - if contentType == applicationSymlink { + switch contentType { + case applicationSymlink: out, err := ioutil.ReadAll(part) if err != nil { return nil, err @@ -44,6 +46,13 @@ func NewFileFromPart(part *multipart.Part) (File, error) { Target: string(out), name: f.FileName(), }, nil + case applicationFile: + return &ReaderFile{ + reader: part, + filename: f.FileName(), + abspath: part.Header.Get("abspath"), + fullpath: f.FullPath(), + }, nil } var err error diff --git a/commands/files/readerfile.go b/commands/files/readerfile.go index 7458e82dd22..863641479df 100644 --- a/commands/files/readerfile.go +++ b/commands/files/readerfile.go @@ -4,6 +4,7 @@ import ( "errors" "io" "os" + "path/filepath" ) // ReaderFile is a implementation of File created from an `io.Reader`. @@ -11,12 +12,22 @@ import ( type ReaderFile struct { filename string fullpath string + abspath string reader io.ReadCloser stat os.FileInfo } func NewReaderFile(filename, path string, reader io.ReadCloser, stat os.FileInfo) *ReaderFile { - return &ReaderFile{filename, path, reader, stat} + return &ReaderFile{filename, path, path, reader, stat} +} + +func NewReaderPathFile(filename, path string, reader io.ReadCloser, stat os.FileInfo) (*ReaderFile, error) { + abspath, err := filepath.Abs(path) + if err != nil { + return nil, err + } + + return &ReaderFile{filename, path, abspath, reader, stat}, nil } func (f *ReaderFile) IsDirectory() bool { @@ -35,6 +46,10 @@ func (f *ReaderFile) FullPath() string { return f.fullpath } +func (f *ReaderFile) AbsPath() string { + return f.abspath +} + func (f *ReaderFile) Read(p []byte) (int, error) { return f.reader.Read(p) } diff --git a/commands/files/serialfile.go b/commands/files/serialfile.go index 22de5c1b484..0c5fbe776a8 100644 --- a/commands/files/serialfile.go +++ b/commands/files/serialfile.go @@ -23,13 +23,14 @@ type serialFile struct { } func NewSerialFile(name, path string, hidden bool, stat os.FileInfo) (File, error) { + switch mode := stat.Mode(); { case mode.IsRegular(): file, err := os.Open(path) if err != nil { return nil, err } - return NewReaderFile(name, path, file, stat), nil + return NewReaderPathFile(name, path, file, stat) case mode.IsDir(): // for directories, stat all of the contents first, so we know what files to // open when NextFile() is called diff --git a/commands/http/multifilereader.go b/commands/http/multifilereader.go index 1df121211e0..b09fe98c529 100644 --- a/commands/http/multifilereader.go +++ b/commands/http/multifilereader.go @@ -95,6 +95,9 @@ func (mfr *MultiFileReader) Read(buf []byte) (written int, err error) { header.Set("Content-Disposition", fmt.Sprintf("file; filename=\"%s\"", filename)) header.Set("Content-Type", contentType) + if rf, ok := file.(*files.ReaderFile); ok { + header.Set("abspath", rf.AbsPath()) + } _, err := mfr.mpWriter.CreatePart(header) if err != nil { diff --git a/core/builder.go b/core/builder.go index f659720c793..0a5f501effa 100644 --- a/core/builder.go +++ b/core/builder.go @@ -12,6 +12,7 @@ import ( bstore "github.com/ipfs/go-ipfs/blocks/blockstore" bserv "github.com/ipfs/go-ipfs/blockservice" offline "github.com/ipfs/go-ipfs/exchange/offline" + filestore "github.com/ipfs/go-ipfs/filestore" dag "github.com/ipfs/go-ipfs/merkledag" path "github.com/ipfs/go-ipfs/path" pin "github.com/ipfs/go-ipfs/pin" @@ -166,8 +167,8 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { TempErrFunc: isTooManyFDError, } - var err error bs := bstore.NewBlockstore(rds) + opts := bstore.DefaultCacheOpts() conf, err := n.Repo.Config() if err != nil { @@ -184,7 +185,14 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { return err } - n.Blockstore = bstore.NewGCBlockstore(cbs, bstore.NewGCLocker()) + n.BaseBlocks = cbs + n.GCLocker = bstore.NewGCLocker() + n.Blockstore = bstore.NewGCBlockstore(cbs, n.GCLocker) + + if conf.Experimental.FilestoreEnabled { + n.Filestore = filestore.NewFilestore(bs, n.Repo.FileManager()) + n.Blockstore = bstore.NewGCBlockstore(n.Filestore, n.GCLocker) + } rcfg, err := n.Repo.Config() if err != nil { diff --git a/core/commands/add.go b/core/commands/add.go index 871a67c6c50..2b0478ebb20 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -7,6 +7,7 @@ import ( "github.com/ipfs/go-ipfs/core/coreunix" "gx/ipfs/QmeWjRodbcZFKe5tMN7poEx3izym6osrLSnTLf9UjJZBbs/pb" + bstore "github.com/ipfs/go-ipfs/blocks/blockstore" blockservice "github.com/ipfs/go-ipfs/blockservice" cmds "github.com/ipfs/go-ipfs/commands" files "github.com/ipfs/go-ipfs/commands/files" @@ -23,16 +24,18 @@ import ( var ErrDepthLimitExceeded = fmt.Errorf("depth limit exceeded") const ( - quietOptionName = "quiet" - silentOptionName = "silent" - progressOptionName = "progress" - trickleOptionName = "trickle" - wrapOptionName = "wrap-with-directory" - hiddenOptionName = "hidden" - onlyHashOptionName = "only-hash" - chunkerOptionName = "chunker" - pinOptionName = "pin" - rawLeavesOptionName = "raw-leaves" + quietOptionName = "quiet" + silentOptionName = "silent" + progressOptionName = "progress" + trickleOptionName = "trickle" + wrapOptionName = "wrap-with-directory" + hiddenOptionName = "hidden" + onlyHashOptionName = "only-hash" + chunkerOptionName = "chunker" + pinOptionName = "pin" + rawLeavesOptionName = "raw-leaves" + noCopyOptionName = "nocopy" + fstoreCacheOptionName = "fscache" ) var AddCmd = &cmds.Command{ @@ -78,6 +81,8 @@ You can now refer to the added file in a gateway, like so: cmds.StringOption(chunkerOptionName, "s", "Chunking algorithm to use."), cmds.BoolOption(pinOptionName, "Pin this object when adding.").Default(true), cmds.BoolOption(rawLeavesOptionName, "Use raw blocks for leaf nodes. (experimental)"), + cmds.BoolOption(noCopyOptionName, "Add the file using filestore. (experimental)"), + cmds.BoolOption(fstoreCacheOptionName, "Check the filestore for pre-existing blocks. (experimental)"), }, PreRun: func(req cmds.Request) error { quiet, _, _ := req.Option(quietOptionName).Bool() @@ -140,6 +145,13 @@ You can now refer to the added file in a gateway, like so: chunker, _, _ := req.Option(chunkerOptionName).String() dopin, _, _ := req.Option(pinOptionName).Bool() rawblks, _, _ := req.Option(rawLeavesOptionName).Bool() + nocopy, _, _ := req.Option(noCopyOptionName).Bool() + fscache, _, _ := req.Option(fstoreCacheOptionName).Bool() + + if nocopy && !rawblks { + res.SetError(fmt.Errorf("nocopy option requires '--raw-leaves' to be enabled as well"), cmds.ErrNormal) + return + } if hash { nilnode, err := core.NewNode(n.Context(), &core.BuildCfg{ @@ -154,14 +166,20 @@ You can now refer to the added file in a gateway, like so: n = nilnode } - dserv := n.DAG + addblockstore := n.Blockstore + if !fscache && !nocopy { + addblockstore = bstore.NewGCBlockstore(n.BaseBlocks, n.GCLocker) + } + + exch := n.Exchange local, _, _ := req.Option("local").Bool() if local { - offlineexch := offline.Exchange(n.Blockstore) - bserv := blockservice.New(n.Blockstore, offlineexch) - dserv = dag.NewDAGService(bserv) + exch = offline.Exchange(addblockstore) } + bserv := blockservice.New(addblockstore, exch) + dserv := dag.NewDAGService(bserv) + outChan := make(chan interface{}, 8) res.SetOutput((<-chan interface{})(outChan)) @@ -180,6 +198,7 @@ You can now refer to the added file in a gateway, like so: fileAdder.Pin = dopin fileAdder.Silent = silent fileAdder.RawLeaves = rawblks + fileAdder.NoCopy = nocopy if hash { md := dagtest.Mock() diff --git a/core/core.go b/core/core.go index 4df31f5fb68..039715e953d 100644 --- a/core/core.go +++ b/core/core.go @@ -28,6 +28,7 @@ import ( bitswap "github.com/ipfs/go-ipfs/exchange/bitswap" bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" rp "github.com/ipfs/go-ipfs/exchange/reprovide" + filestore "github.com/ipfs/go-ipfs/filestore" mount "github.com/ipfs/go-ipfs/fuse/mount" merkledag "github.com/ipfs/go-ipfs/merkledag" mfs "github.com/ipfs/go-ipfs/mfs" @@ -110,6 +111,9 @@ type IpfsNode struct { // Services Peerstore pstore.Peerstore // storage for other Peer instances Blockstore bstore.GCBlockstore // the block store (lower level) + Filestore *filestore.Filestore // the filestore blockstore + BaseBlocks bstore.Blockstore // the raw blockstore, no filestore wrapping + GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc Blocks bserv.BlockService // the block service, get/add blocks. DAG merkledag.DAGService // the merkle dag service, get/add objects. Resolver *path.Resolver // the path resolution system diff --git a/core/coreunix/add.go b/core/coreunix/add.go index 460efdce36b..322d1b5ca90 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -103,6 +103,7 @@ type Adder struct { RawLeaves bool Silent bool Wrap bool + NoCopy bool Chunker string root node.Node mr *mfs.Root @@ -124,6 +125,7 @@ func (adder Adder) add(reader io.Reader) (node.Node, error) { Dagserv: adder.dagService, RawLeaves: adder.RawLeaves, Maxlinks: ihelper.DefaultLinksPerBlock, + NoCopy: adder.NoCopy, } if adder.Trickle { diff --git a/core/coreunix/add_test.go b/core/coreunix/add_test.go index 90677f341ca..9b95ed7d0b2 100644 --- a/core/coreunix/add_test.go +++ b/core/coreunix/add_test.go @@ -193,6 +193,7 @@ func testAddWPosInfo(t *testing.T, rawLeaves bool) { adder.Out = make(chan interface{}) adder.Progress = true adder.RawLeaves = rawLeaves + adder.NoCopy = true data := make([]byte, 5*1024*1024) rand.New(rand.NewSource(2)).Read(data) // Rand.Read never returns an error @@ -210,12 +211,18 @@ func testAddWPosInfo(t *testing.T, rawLeaves bool) { for _ = range adder.Out { } - if bs.countAtOffsetZero != 2 { - t.Fatal("expected 2 blocks with an offset at zero (one root and one leafh), got", bs.countAtOffsetZero) + exp := 0 + nonOffZero := 0 + if rawLeaves { + exp = 1 + nonOffZero = 19 } - if bs.countAtOffsetNonZero != 19 { + if bs.countAtOffsetZero != exp { + t.Fatalf("expected %d blocks with an offset at zero (one root and one leafh), got %d", exp, bs.countAtOffsetZero) + } + if bs.countAtOffsetNonZero != nonOffZero { // note: the exact number will depend on the size and the sharding algo. used - t.Fatal("expected 19 blocks with an offset > 0, got", bs.countAtOffsetNonZero) + t.Fatalf("expected %d blocks with an offset > 0, got %d", nonOffZero, bs.countAtOffsetNonZero) } } diff --git a/filestore/filestore.go b/filestore/filestore.go new file mode 100644 index 00000000000..668b6149cde --- /dev/null +++ b/filestore/filestore.go @@ -0,0 +1,169 @@ +package filestore + +import ( + "context" + + "github.com/ipfs/go-ipfs/blocks" + "github.com/ipfs/go-ipfs/blocks/blockstore" + posinfo "github.com/ipfs/go-ipfs/thirdparty/posinfo" + + logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" + cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid" +) + +var log = logging.Logger("filestore") + +type Filestore struct { + fm *FileManager + bs blockstore.Blockstore +} + +func NewFilestore(bs blockstore.Blockstore, fm *FileManager) *Filestore { + return &Filestore{fm, bs} +} + +func (f *Filestore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) { + ctx, cancel := context.WithCancel(ctx) + + a, err := f.bs.AllKeysChan(ctx) + if err != nil { + return nil, err + } + + out := make(chan *cid.Cid) + go func() { + defer cancel() + defer close(out) + + var done bool + for !done { + select { + case c, ok := <-a: + if !ok { + done = true + continue + } + select { + case out <- c: + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } + + // Can't do these at the same time because the abstractions around + // leveldb make us query leveldb for both operations. We apparently + // cant query leveldb concurrently + b, err := f.fm.AllKeysChan(ctx) + if err != nil { + log.Error("error querying filestore: ", err) + return + } + + done = false + for !done { + select { + case c, ok := <-b: + if !ok { + done = true + continue + } + select { + case out <- c: + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } + }() + return out, nil +} + +func (f *Filestore) DeleteBlock(c *cid.Cid) error { + err1 := f.bs.DeleteBlock(c) + if err1 != nil && err1 != blockstore.ErrNotFound { + return err1 + } + + if err2 := f.fm.DeleteBlock(c); err2 != nil { + // if we successfully removed something from the blockstore, but the + // filestore didnt have it, return success + if err1 == nil && err2 != blockstore.ErrNotFound { + return nil + } + return err2 + } + + return nil +} + +func (f *Filestore) Get(c *cid.Cid) (blocks.Block, error) { + blk, err := f.bs.Get(c) + switch err { + default: + return nil, err + case nil: + return blk, nil + case blockstore.ErrNotFound: + // try filestore + } + + return f.fm.Get(c) +} + +func (f *Filestore) Has(c *cid.Cid) (bool, error) { + has, err := f.bs.Has(c) + if err != nil { + return false, err + } + + if has { + return true, nil + } + + return f.fm.Has(c) +} + +func (f *Filestore) Put(b blocks.Block) error { + switch b := b.(type) { + case *posinfo.FilestoreNode: + return f.fm.Put(b) + default: + return f.bs.Put(b) + } +} + +func (f *Filestore) PutMany(bs []blocks.Block) error { + var normals []blocks.Block + var fstores []*posinfo.FilestoreNode + + for _, b := range bs { + switch b := b.(type) { + case *posinfo.FilestoreNode: + fstores = append(fstores, b) + default: + normals = append(normals, b) + } + } + + if len(normals) > 0 { + err := f.bs.PutMany(normals) + if err != nil { + return err + } + } + + if len(fstores) > 0 { + err := f.fm.PutMany(fstores) + if err != nil { + return err + } + } + return nil +} + +var _ blockstore.Blockstore = (*Filestore)(nil) diff --git a/filestore/filestore_test.go b/filestore/filestore_test.go new file mode 100644 index 00000000000..87f180003bd --- /dev/null +++ b/filestore/filestore_test.go @@ -0,0 +1,104 @@ +package filestore + +import ( + "bytes" + "context" + "io/ioutil" + "math/rand" + "testing" + + "github.com/ipfs/go-ipfs/blocks/blockstore" + dag "github.com/ipfs/go-ipfs/merkledag" + posinfo "github.com/ipfs/go-ipfs/thirdparty/posinfo" + + ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore" + cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid" +) + +func newTestFilestore(t *testing.T) (string, *Filestore) { + mds := ds.NewMapDatastore() + + testdir, err := ioutil.TempDir("", "filestore-test") + if err != nil { + t.Fatal(err) + } + fm := NewFileManager(mds, testdir) + + bs := blockstore.NewBlockstore(mds) + fstore := NewFilestore(bs, fm) + return testdir, fstore +} + +func makeFile(dir string, data []byte) (string, error) { + f, err := ioutil.TempFile(dir, "file") + if err != nil { + return "", err + } + + _, err = f.Write(data) + if err != nil { + return "", err + } + + return f.Name(), nil +} + +func TestBasicFilestore(t *testing.T) { + dir, fs := newTestFilestore(t) + + buf := make([]byte, 1000) + rand.Read(buf) + + fname, err := makeFile(dir, buf) + if err != nil { + t.Fatal(err) + } + + var cids []*cid.Cid + for i := 0; i < 100; i++ { + n := &posinfo.FilestoreNode{ + PosInfo: &posinfo.PosInfo{ + FullPath: fname, + Offset: uint64(i * 10), + }, + Node: dag.NewRawNode(buf[i*10 : (i+1)*10]), + } + + err := fs.Put(n) + if err != nil { + t.Fatal(err) + } + cids = append(cids, n.Node.Cid()) + } + + for i, c := range cids { + blk, err := fs.Get(c) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(blk.RawData(), buf[i*10:(i+1)*10]) { + t.Fatal("data didnt match on the way out") + } + } + + kch, err := fs.AllKeysChan(context.Background()) + if err != nil { + t.Fatal(err) + } + + out := make(map[string]struct{}) + for c := range kch { + out[c.KeyString()] = struct{}{} + } + + if len(out) != len(cids) { + t.Fatal("mismatch in number of entries") + } + + for _, c := range cids { + if _, ok := out[c.KeyString()]; !ok { + t.Fatal("missing cid: ", c) + } + } +} diff --git a/filestore/fsrefstore.go b/filestore/fsrefstore.go new file mode 100644 index 00000000000..351c81124ee --- /dev/null +++ b/filestore/fsrefstore.go @@ -0,0 +1,177 @@ +package filestore + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + + "github.com/ipfs/go-ipfs/blocks" + "github.com/ipfs/go-ipfs/blocks/blockstore" + pb "github.com/ipfs/go-ipfs/filestore/pb" + dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help" + posinfo "github.com/ipfs/go-ipfs/thirdparty/posinfo" + + ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore" + dsns "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/namespace" + dsq "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/query" + proto "gx/ipfs/QmT6n4mspWYEya864BhCUJEgyxiRfmiSY9ruQwTUNpRKaM/protobuf/proto" + cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid" +) + +var FilestorePrefix = ds.NewKey("filestore") + +type FileManager struct { + ds ds.Batching + root string +} + +type CorruptReferenceError struct { + Err error +} + +func (c CorruptReferenceError) Error() string { + return c.Err.Error() +} + +func NewFileManager(ds ds.Batching, root string) *FileManager { + return &FileManager{dsns.Wrap(ds, FilestorePrefix), root} +} + +func (f *FileManager) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) { + q := dsq.Query{KeysOnly: true} + q.Prefix = FilestorePrefix.String() + + res, err := f.ds.Query(q) + if err != nil { + return nil, err + } + + out := make(chan *cid.Cid) + go func() { + defer close(out) + for { + v, ok := res.NextSync() + if !ok { + return + } + + k := ds.RawKey(v.Key) + c, err := dshelp.DsKeyToCid(k) + if err != nil { + log.Error("decoding cid from filestore: %s", err) + continue + } + + select { + case out <- c: + case <-ctx.Done(): + return + } + } + }() + + return out, nil +} + +func (f *FileManager) DeleteBlock(c *cid.Cid) error { + err := f.ds.Delete(dshelp.CidToDsKey(c)) + if err == ds.ErrNotFound { + return blockstore.ErrNotFound + } + return err +} + +func (f *FileManager) Get(c *cid.Cid) (blocks.Block, error) { + o, err := f.ds.Get(dshelp.CidToDsKey(c)) + switch err { + case ds.ErrNotFound: + return nil, blockstore.ErrNotFound + default: + return nil, err + case nil: + // + } + + data, ok := o.([]byte) + if !ok { + return nil, fmt.Errorf("stored filestore dataobj was not a []byte") + } + + var dobj pb.DataObj + if err := proto.Unmarshal(data, &dobj); err != nil { + return nil, err + } + + out, err := f.readDataObj(&dobj) + if err != nil { + return nil, err + } + + return blocks.NewBlockWithCid(out, c) +} + +func (f *FileManager) readDataObj(d *pb.DataObj) ([]byte, error) { + abspath := filepath.Join(f.root, d.GetFilePath()) + + fi, err := os.Open(abspath) + if err != nil { + return nil, &CorruptReferenceError{err} + } + defer fi.Close() + + _, err = fi.Seek(int64(d.GetOffset()), os.SEEK_SET) + if err != nil { + return nil, &CorruptReferenceError{err} + } + + outbuf := make([]byte, d.GetSize_()) + _, err = io.ReadFull(fi, outbuf) + if err != nil { + return nil, &CorruptReferenceError{err} + } + + return outbuf, nil +} + +func (f *FileManager) Has(c *cid.Cid) (bool, error) { + // NOTE: interesting thing to consider. Has doesnt validate the data. + // So the data on disk could be invalid, and we could think we have it. + dsk := dshelp.CidToDsKey(c) + return f.ds.Has(dsk) +} + +func (f *FileManager) Put(b *posinfo.FilestoreNode) error { + var dobj pb.DataObj + + if !filepath.HasPrefix(b.PosInfo.FullPath, f.root) { + return fmt.Errorf("cannot add filestore references outside ipfs root") + } + + p, err := filepath.Rel(f.root, b.PosInfo.FullPath) + if err != nil { + return err + } + + dobj.FilePath = proto.String(p) + dobj.Offset = proto.Uint64(b.PosInfo.Offset) + dobj.Size_ = proto.Uint64(uint64(len(b.RawData()))) + + data, err := proto.Marshal(&dobj) + if err != nil { + return err + } + + return f.ds.Put(dshelp.CidToDsKey(b.Cid()), data) +} + +func (f *FileManager) PutMany(bs []*posinfo.FilestoreNode) error { + // TODO: this better + for _, b := range bs { + if err := f.Put(b); err != nil { + return err + } + } + return nil +} diff --git a/filestore/pb/Makefile b/filestore/pb/Makefile new file mode 100644 index 00000000000..5101a482d20 --- /dev/null +++ b/filestore/pb/Makefile @@ -0,0 +1,10 @@ +PB = $(wildcard *.proto) +GO = $(PB:.proto=.pb.go) + +all: $(GO) + +%.pb.go: %.proto + protoc --gogo_out=. $< + +clean: + rm *.pb.go diff --git a/filestore/pb/dataobj.pb.go b/filestore/pb/dataobj.pb.go new file mode 100644 index 00000000000..6f1005add1f --- /dev/null +++ b/filestore/pb/dataobj.pb.go @@ -0,0 +1,67 @@ +// Code generated by protoc-gen-gogo. +// source: dataobj.proto +// DO NOT EDIT! + +/* +Package datastore_pb is a generated protocol buffer package. + +It is generated from these files: + dataobj.proto + +It has these top-level messages: + DataObj +*/ +package datastore_pb + +import proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +type DataObj struct { + FilePath *string `protobuf:"bytes,1,opt,name=FilePath" json:"FilePath,omitempty"` + Offset *uint64 `protobuf:"varint,2,opt,name=Offset" json:"Offset,omitempty"` + Size_ *uint64 `protobuf:"varint,3,opt,name=Size" json:"Size,omitempty"` + Modtime *float64 `protobuf:"fixed64,4,opt,name=Modtime" json:"Modtime,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *DataObj) Reset() { *m = DataObj{} } +func (m *DataObj) String() string { return proto.CompactTextString(m) } +func (*DataObj) ProtoMessage() {} + +func (m *DataObj) GetFilePath() string { + if m != nil && m.FilePath != nil { + return *m.FilePath + } + return "" +} + +func (m *DataObj) GetOffset() uint64 { + if m != nil && m.Offset != nil { + return *m.Offset + } + return 0 +} + +func (m *DataObj) GetSize_() uint64 { + if m != nil && m.Size_ != nil { + return *m.Size_ + } + return 0 +} + +func (m *DataObj) GetModtime() float64 { + if m != nil && m.Modtime != nil { + return *m.Modtime + } + return 0 +} + +func init() { + proto.RegisterType((*DataObj)(nil), "datastore.pb.DataObj") +} diff --git a/filestore/pb/dataobj.proto b/filestore/pb/dataobj.proto new file mode 100644 index 00000000000..a5364e5e068 --- /dev/null +++ b/filestore/pb/dataobj.proto @@ -0,0 +1,9 @@ +package datastore.pb; + +message DataObj { + optional string FilePath = 1; + optional uint64 Offset = 2; + optional uint64 Size = 3; + + optional double Modtime = 4; +} diff --git a/importer/helpers/dagbuilder.go b/importer/helpers/dagbuilder.go index 50331aba821..f5c3b1cb3ca 100644 --- a/importer/helpers/dagbuilder.go +++ b/importer/helpers/dagbuilder.go @@ -35,6 +35,10 @@ type DagBuilderParams struct { // DAGService to write blocks to (required) Dagserv dag.DAGService + + // NoCopy signals to the chunker that it should track fileinfo for + // filestore adds + NoCopy bool } // Generate a new DagBuilderHelper from the given params, which data source comes @@ -47,8 +51,8 @@ func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper { maxlinks: dbp.Maxlinks, batch: dbp.Dagserv.Batch(), } - if fi, ok := spl.Reader().(files.FileInfo); ok { - db.fullPath = fi.FullPath() + if fi, ok := spl.Reader().(files.FileInfo); dbp.NoCopy && ok { + db.fullPath = fi.AbsPath() db.stat = fi.Stat() } return db @@ -146,7 +150,7 @@ func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) { } func (db *DagBuilderHelper) SetPosInfo(node *UnixfsNode, offset uint64) { - if db.stat != nil { + if db.fullPath != "" { node.SetPosInfo(offset, db.fullPath, db.stat) } } diff --git a/importer/helpers/helpers.go b/importer/helpers/helpers.go index 33c0cf7ac3c..97adaa508bb 100644 --- a/importer/helpers/helpers.go +++ b/importer/helpers/helpers.go @@ -160,10 +160,12 @@ func (n *UnixfsNode) GetDagNode() (node.Node, error) { } if n.posInfo != nil { - return &pi.FilestoreNode{ - Node: nd, - PosInfo: n.posInfo, - }, nil + if rn, ok := nd.(*dag.RawNode); ok { + return &pi.FilestoreNode{ + Node: rn, + PosInfo: n.posInfo, + }, nil + } } return nd, nil diff --git a/pin/gc/gc.go b/pin/gc/gc.go index 91bdde2990a..78289d028d8 100644 --- a/pin/gc/gc.go +++ b/pin/gc/gc.go @@ -51,7 +51,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin. if !gcs.Has(k) { err := bs.DeleteBlock(k) if err != nil { - log.Debugf("Error removing key from blockstore: %s", err) + log.Errorf("Error removing key from blockstore: %s", err) return } select { diff --git a/repo/config/config.go b/repo/config/config.go index 898cf56a472..fa94d1e3f07 100644 --- a/repo/config/config.go +++ b/repo/config/config.go @@ -30,7 +30,8 @@ type Config struct { API API // local node's API settings Swarm SwarmConfig - Reprovider Reprovider + Reprovider Reprovider + Experimental Experiments } const ( diff --git a/repo/config/experiments.go b/repo/config/experiments.go new file mode 100644 index 00000000000..4757d15a362 --- /dev/null +++ b/repo/config/experiments.go @@ -0,0 +1,5 @@ +package config + +type Experiments struct { + FilestoreEnabled bool +} diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index 43f09b832a7..19e1f2111b4 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -11,6 +11,7 @@ import ( "strings" "sync" + filestore "github.com/ipfs/go-ipfs/filestore" keystore "github.com/ipfs/go-ipfs/keystore" repo "github.com/ipfs/go-ipfs/repo" "github.com/ipfs/go-ipfs/repo/common" @@ -100,6 +101,7 @@ type FSRepo struct { config *config.Config ds repo.Datastore keystore keystore.Keystore + filemgr *filestore.FileManager } var _ repo.Repo = (*FSRepo)(nil) @@ -172,6 +174,10 @@ func open(repoPath string) (repo.Repo, error) { return nil, err } + if r.config.Experimental.FilestoreEnabled { + r.filemgr = filestore.NewFileManager(r.ds, filepath.Dir(r.path)) + } + keepLocked = true return r, nil } @@ -316,6 +322,10 @@ func (r *FSRepo) Keystore() keystore.Keystore { return r.keystore } +func (r *FSRepo) Path() string { + return r.path +} + // SetAPIAddr writes the API Addr to the /api file. func (r *FSRepo) SetAPIAddr(addr ma.Multiaddr) error { f, err := os.Create(filepath.Join(r.path, apiFile)) @@ -424,6 +434,10 @@ func (r *FSRepo) Config() (*config.Config, error) { return r.config, nil } +func (r *FSRepo) FileManager() *filestore.FileManager { + return r.filemgr +} + // setConfigUnsynced is for private use. func (r *FSRepo) setConfigUnsynced(updated *config.Config) error { configFilename, err := config.Filename(r.path) diff --git a/repo/mock.go b/repo/mock.go index 07151fca983..4af98d8f5c8 100644 --- a/repo/mock.go +++ b/repo/mock.go @@ -3,6 +3,7 @@ package repo import ( "errors" + filestore "github.com/ipfs/go-ipfs/filestore" keystore "github.com/ipfs/go-ipfs/keystore" "github.com/ipfs/go-ipfs/repo/config" @@ -48,3 +49,5 @@ func (m *Mock) Keystore() keystore.Keystore { return nil } func (m *Mock) SwarmKey() ([]byte, error) { return nil, nil } + +func (m *Mock) FileManager() *filestore.FileManager { return nil } diff --git a/repo/repo.go b/repo/repo.go index 493d06e084f..1ebc364e729 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -4,6 +4,7 @@ import ( "errors" "io" + filestore "github.com/ipfs/go-ipfs/filestore" keystore "github.com/ipfs/go-ipfs/keystore" config "github.com/ipfs/go-ipfs/repo/config" @@ -27,6 +28,8 @@ type Repo interface { Keystore() keystore.Keystore + FileManager() *filestore.FileManager + // SetAPIAddr sets the API address in the repo. SetAPIAddr(addr ma.Multiaddr) error diff --git a/test/sharness/t0270-filestore.sh b/test/sharness/t0270-filestore.sh new file mode 100755 index 00000000000..7ccf7fd7e66 --- /dev/null +++ b/test/sharness/t0270-filestore.sh @@ -0,0 +1,93 @@ +#!/bin/sh +# +# Copyright (c) 2017 Jeromy Johnson +# MIT Licensed; see the LICENSE file in this repository. +# + +test_description="Test out the filestore nocopy functionality" + +. lib/test-lib.sh + + +test_expect_success "create a dataset" ' + random-files -seed=483 -depth=3 -dirs=4 -files=6 -filesize=1000000 somedir +' + +EXPHASH="QmW4JLyeTxEWGwa4mkE9mHzdtAkyhMX2ToGFEKZNjCiJud" + +get_repo_size() { + disk_usage "$IPFS_PATH" +} + +assert_repo_size_less_than() { + expval="$1" + + test_expect_success "check repo size" ' + test "$(get_repo_size)" -lt "$expval" || + (get_repo_size && false) + ' +} + +assert_repo_size_greater_than() { + expval="$1" + + test_expect_success "check repo size" ' + test "$(get_repo_size)" -gt "$expval" || + (get_repo_size && false) + ' +} + +test_filestore_adds() { + test_expect_success "nocopy add succeeds" ' + HASH=$(ipfs add --raw-leaves --nocopy -r -q somedir | tail -n1) + ' + + test_expect_success "nocopy add has right hash" ' + test "$HASH" = "$EXPHASH" + ' + + assert_repo_size_less_than 1000000 + + test_expect_success "normal add with fscache doesnt duplicate data" ' + HASH2=$(ipfs add --raw-leaves --fscache -r -q somedir | tail -n1) + ' + + assert_repo_size_less_than 1000000 + + test_expect_success "normal add without fscache duplicates data" ' + HASH2=$(ipfs add --raw-leaves -r -q somedir | tail -n1) + ' + + assert_repo_size_greater_than 1000000 +} + +init_ipfs_filestore() { + test_expect_success "clean up old node" ' + rm -rf "$IPFS_PATH" mountdir ipfs ipns + ' + + test_init_ipfs + + test_expect_success "enable filestore config setting" ' + ipfs config --json Experimental.FilestoreEnabled true + ' +} + +init_ipfs_filestore + +test_filestore_adds + +echo "WORKING DIR" +echo "IPFS PATH = " $IPFS_PATH +pwd + + +init_ipfs_filestore + +test_launch_ipfs_daemon + +test_filestore_adds + +test_kill_ipfs_daemon + +test_done