Skip to content

Commit

Permalink
commands/get: move FileArchive here
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
  • Loading branch information
magik6k committed Jan 30, 2019
1 parent 07d1d57 commit afa2ae5
Showing 1 changed file with 95 additions and 2 deletions.
97 changes: 95 additions & 2 deletions core/commands/get.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package commands

import (
"bufio"
"compress/gzip"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strings"

Expand All @@ -14,8 +16,8 @@ import (
"github.com/ipfs/go-ipfs/core/coreapi/interface"

"gx/ipfs/QmQine7gvHncNevKtG9QXxf3nXcwSj6aDDmMm52mHofEEp/tar-utils"
uarchive "gx/ipfs/QmSMJ4rZbCJaih3y82Ebq7BZqK6vU2FHsKcWKQiE1DPTpS/go-unixfs/archive"
"gx/ipfs/QmWGm4AbZEbnmdgVTza52MSNpEmBdFVqzmAysRbjrRyGbH/go-ipfs-cmds"
"gx/ipfs/QmXWZCd8jfaHmt4UDSnjKmGcrQMw95bDGWqEeVLVJjoANX/go-ipfs-files"
"gx/ipfs/QmYWB8oH6o7qftxoyqTTZhzLrhKCVT7NYahECQTwTtqbgj/pb"
"gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
)
Expand Down Expand Up @@ -87,7 +89,7 @@ may also specify the level of compression by specifying '-l=<1-9>'.
res.SetLength(uint64(size))

archive, _ := req.Options[archiveOptionName].(bool)
reader, err := uarchive.FileArchive(file, p.String(), archive, cmplvl)
reader, err := fileArchive(file, p.String(), archive, cmplvl)
if err != nil {
return err
}
Expand Down Expand Up @@ -247,3 +249,94 @@ func getCompressOptions(req *cmds.Request) (int, error) {
}
return cmplvl, nil
}

// DefaultBufSize is the buffer size for gets. for now, 1MB, which is ~4 blocks.
// TODO: does this need to be configurable?
var DefaultBufSize = 1048576

type identityWriteCloser struct {
w io.Writer
}

func (i *identityWriteCloser) Write(p []byte) (int, error) {
return i.w.Write(p)
}

func (i *identityWriteCloser) Close() error {
return nil
}

func fileArchive(f files.Node, name string, archive bool, compression int) (io.Reader, error) {
cleaned := path.Clean(name)
_, filename := path.Split(cleaned)

// need to connect a writer to a reader
piper, pipew := io.Pipe()
checkErrAndClosePipe := func(err error) bool {
if err != nil {
pipew.CloseWithError(err)
return true
}
return false
}

// use a buffered writer to parallelize task
bufw := bufio.NewWriterSize(pipew, DefaultBufSize)

// compression determines whether to use gzip compression.
maybeGzw, err := newMaybeGzWriter(bufw, compression)
if checkErrAndClosePipe(err) {
return nil, err
}

closeGzwAndPipe := func() {
if err := maybeGzw.Close(); checkErrAndClosePipe(err) {
return
}
if err := bufw.Flush(); checkErrAndClosePipe(err) {
return
}
pipew.Close() // everything seems to be ok.
}

if !archive && compression != gzip.NoCompression {
// the case when the node is a file
r := files.ToFile(f)
if r == nil {
return nil, errors.New("file is not regular")
}

go func() {
if _, err := io.Copy(maybeGzw, r); checkErrAndClosePipe(err) {
return
}
closeGzwAndPipe() // everything seems to be ok
}()
} else {
// the case for 1. archive, and 2. not archived and not compressed, in which tar is used anyway as a transport format

// construct the tar writer
w, err := files.NewTarWriter(maybeGzw)
if checkErrAndClosePipe(err) {
return nil, err
}

go func() {
// write all the nodes recursively
if err := w.WriteFile(f, filename); checkErrAndClosePipe(err) {
return
}
w.Close() // close tar writer
closeGzwAndPipe() // everything seems to be ok
}()
}

return piper, nil
}

func newMaybeGzWriter(w io.Writer, compression int) (io.WriteCloser, error) {
if compression != gzip.NoCompression {
return gzip.NewWriterLevel(w, compression)
}
return &identityWriteCloser{w}, nil
}

0 comments on commit afa2ae5

Please sign in to comment.