diff --git a/core/commands/get.go b/core/commands/get.go index 559b6db0da1b..00df3ca99fa8 100644 --- a/core/commands/get.go +++ b/core/commands/get.go @@ -1,11 +1,13 @@ package commands import ( + "bufio" "compress/gzip" "errors" "fmt" "io" "os" + "path" "path/filepath" "strings" @@ -14,8 +16,8 @@ import ( "github.com/ipfs/go-ipfs/core/coreapi/interface" "gx/ipfs/QmQine7gvHncNevKtG9QXxf3nXcwSj6aDDmMm52mHofEEp/tar-utils" - uarchive "gx/ipfs/QmSMJ4rZbCJaih3y82Ebq7BZqK6vU2FHsKcWKQiE1DPTpS/go-unixfs/archive" "gx/ipfs/QmWGm4AbZEbnmdgVTza52MSNpEmBdFVqzmAysRbjrRyGbH/go-ipfs-cmds" + "gx/ipfs/QmXWZCd8jfaHmt4UDSnjKmGcrQMw95bDGWqEeVLVJjoANX/go-ipfs-files" "gx/ipfs/QmYWB8oH6o7qftxoyqTTZhzLrhKCVT7NYahECQTwTtqbgj/pb" "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit" ) @@ -87,7 +89,7 @@ may also specify the level of compression by specifying '-l=<1-9>'. res.SetLength(uint64(size)) archive, _ := req.Options[archiveOptionName].(bool) - reader, err := uarchive.FileArchive(file, p.String(), archive, cmplvl) + reader, err := fileArchive(file, p.String(), archive, cmplvl) if err != nil { return err } @@ -247,3 +249,94 @@ func getCompressOptions(req *cmds.Request) (int, error) { } return cmplvl, nil } + +// DefaultBufSize is the buffer size for gets. for now, 1MB, which is ~4 blocks. +// TODO: does this need to be configurable? +var DefaultBufSize = 1048576 + +type identityWriteCloser struct { + w io.Writer +} + +func (i *identityWriteCloser) Write(p []byte) (int, error) { + return i.w.Write(p) +} + +func (i *identityWriteCloser) Close() error { + return nil +} + +func fileArchive(f files.Node, name string, archive bool, compression int) (io.Reader, error) { + cleaned := path.Clean(name) + _, filename := path.Split(cleaned) + + // need to connect a writer to a reader + piper, pipew := io.Pipe() + checkErrAndClosePipe := func(err error) bool { + if err != nil { + pipew.CloseWithError(err) + return true + } + return false + } + + // use a buffered writer to parallelize task + bufw := bufio.NewWriterSize(pipew, DefaultBufSize) + + // compression determines whether to use gzip compression. + maybeGzw, err := newMaybeGzWriter(bufw, compression) + if checkErrAndClosePipe(err) { + return nil, err + } + + closeGzwAndPipe := func() { + if err := maybeGzw.Close(); checkErrAndClosePipe(err) { + return + } + if err := bufw.Flush(); checkErrAndClosePipe(err) { + return + } + pipew.Close() // everything seems to be ok. + } + + if !archive && compression != gzip.NoCompression { + // the case when the node is a file + r := files.ToFile(f) + if r == nil { + return nil, errors.New("file is not regular") + } + + go func() { + if _, err := io.Copy(maybeGzw, r); checkErrAndClosePipe(err) { + return + } + closeGzwAndPipe() // everything seems to be ok + }() + } else { + // the case for 1. archive, and 2. not archived and not compressed, in which tar is used anyway as a transport format + + // construct the tar writer + w, err := files.NewTarWriter(maybeGzw) + if checkErrAndClosePipe(err) { + return nil, err + } + + go func() { + // write all the nodes recursively + if err := w.WriteFile(f, filename); checkErrAndClosePipe(err) { + return + } + w.Close() // close tar writer + closeGzwAndPipe() // everything seems to be ok + }() + } + + return piper, nil +} + +func newMaybeGzWriter(w io.Writer, compression int) (io.WriteCloser, error) { + if compression != gzip.NoCompression { + return gzip.NewWriterLevel(w, compression) + } + return &identityWriteCloser{w}, nil +}