dag import: run the import inline and pin roots through the Pin API.

Review note: this revision of the patch keeps the error checks that follow
the per-file import closure and the file iterator (rewritten as plain
`return err`), so a failing car file or an iterator error aborts the command
instead of being silently dropped. Leading whitespace was reconstructed
assuming gofmt layout; the post-image blob hash on the import.go index line
predates this revision.

diff --git a/core/commands/dag/dag.go b/core/commands/dag/dag.go
index df7762c0062..e58ddc82d0e 100644
--- a/core/commands/dag/dag.go
+++ b/core/commands/dag/dag.go
@@ -168,13 +168,6 @@ var DagResolveCmd = &cmds.Command{
 	Type: ResolveOutput{},
 }
 
-type importResult struct {
-	blockCount      uint64
-	blockBytesCount uint64
-	roots           map[cid.Cid]struct{}
-	err             error
-}
-
 // DagImportCmd is a command for importing a car to ipfs
 var DagImportCmd = &cmds.Command{
 	Helptext: cmds.HelpText{
diff --git a/core/commands/dag/import.go b/core/commands/dag/import.go
index c9f0ebbdeae..ef39e38f493 100644
--- a/core/commands/dag/import.go
+++ b/core/commands/dag/import.go
@@ -2,24 +2,22 @@ package dagcmd
 
 import (
 	"errors"
-	"fmt"
 	"io"
 
 	cid "github.com/ipfs/go-cid"
+	cmds "github.com/ipfs/go-ipfs-cmds"
 	ipld "github.com/ipfs/go-ipld-format"
 	ipldlegacy "github.com/ipfs/go-ipld-legacy"
 	"github.com/ipfs/go-libipfs/files"
-	iface "github.com/ipfs/interface-go-ipfs-core"
 	"github.com/ipfs/interface-go-ipfs-core/options"
+	"github.com/ipfs/interface-go-ipfs-core/path"
+	gocarv2 "github.com/ipld/go-car/v2"
+
 	"github.com/ipfs/kubo/core/commands/cmdenv"
 	"github.com/ipfs/kubo/core/commands/cmdutils"
-
-	cmds "github.com/ipfs/go-ipfs-cmds"
-	gocarv2 "github.com/ipld/go-car/v2"
 )
 
 func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
-
 	node, err := cmdenv.GetNode(env)
 	if err != nil {
 		return err
@@ -38,127 +36,42 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
 		return err
 	}
 
+	doPinRoots, _ := req.Options[pinRootsOptionName].(bool)
+
 	// grab a pinlock ( which doubles as a GC lock ) so that regardless of the
 	// size of the streamed-in cars nothing will disappear on us before we had
 	// a chance to roots that may show up at the very end
 	// This is especially important for use cases like dagger:
 	//    ipfs dag import $( ... | ipfs-dagger --stdout=carfifos )
 	//
-	unlocker := node.Blockstore.PinLock(req.Context)
-	defer unlocker.Unlock(req.Context)
-
-	doPinRoots, _ := req.Options[pinRootsOptionName].(bool)
-
-	retCh := make(chan importResult, 1)
-	go importWorker(req, res, api, retCh)
-
-	done := <-retCh
-	if done.err != nil {
-		return done.err
-	}
-
-	// It is not guaranteed that a root in a header is actually present in the same ( or any )
-	// .car file. This is the case in version 1, and ideally in further versions too
-	// Accumulate any root CID seen in a header, and supplement its actual node if/when encountered
-	// We will attempt a pin *only* at the end in case all car files were well formed
-	//
-	// The boolean value indicates whether we have encountered the root within the car file's
-	roots := done.roots
-
-	// opportunistic pinning: try whatever sticks
 	if doPinRoots {
-
-		var failedPins int
-		for c := range roots {
-
-			// We need to re-retrieve a block, convert it to ipld, and feed it
-			// to the Pinning interface, sigh...
-			//
-			// If we didn't have the problem of inability to take multiple pinlocks,
-			// we could use the api directly like so (though internally it does the same):
-			//
-			// // not ideal, but the pinning api takes only paths :(
-			// rp := path.NewResolvedPath(
-			// 	ipfspath.FromCid(c),
-			// 	c,
-			// 	c,
-			// 	"",
-			// )
-			//
-			// if err := api.Pin().Add(req.Context, rp, options.Pin.Recursive(true)); err != nil {
-
-			ret := RootMeta{Cid: c}
-
-			if block, err := node.Blockstore.Get(req.Context, c); err != nil {
-				ret.PinErrorMsg = err.Error()
-			} else if nd, err := ipldlegacy.DecodeNode(req.Context, block); err != nil {
-				ret.PinErrorMsg = err.Error()
-			} else if err := node.Pinning.Pin(req.Context, nd, true); err != nil {
-				ret.PinErrorMsg = err.Error()
-			} else if err := node.Pinning.Flush(req.Context); err != nil {
-				ret.PinErrorMsg = err.Error()
-			}
-
-			if ret.PinErrorMsg != "" {
-				failedPins++
-			}
-
-			if err := res.Emit(&CarImportOutput{Root: &ret}); err != nil {
-				return err
-			}
-		}
-
-		if failedPins > 0 {
-			return fmt.Errorf(
-				"unable to pin all roots: %d out of %d failed",
-				failedPins,
-				len(roots),
-			)
-		}
-	}
-
-	stats, _ := req.Options[statsOptionName].(bool)
-	if stats {
-		err = res.Emit(&CarImportOutput{
-			Stats: &CarImportStats{
-				BlockCount:      done.blockCount,
-				BlockBytesCount: done.blockBytesCount,
-			},
-		})
-		if err != nil {
-			return err
-		}
+		unlocker := node.Blockstore.PinLock(req.Context)
+		defer unlocker.Unlock(req.Context)
 	}
 
-	return nil
-}
-
-func importWorker(req *cmds.Request, re cmds.ResponseEmitter, api iface.CoreAPI, ret chan importResult) {
-
 	// this is *not* a transaction
 	// it is simply a way to relieve pressure on the blockstore
 	// similar to pinner.Pin/pinner.Flush
 	batch := ipld.NewBatch(req.Context, api.Dag())
 
-	roots := make(map[cid.Cid]struct{})
+	roots := cid.NewSet()
 	var blockCount, blockBytesCount uint64
 
 	it := req.Files.Entries()
 	for it.Next() {
-
 		file := files.FileFromEntry(it)
 		if file == nil {
-			ret <- importResult{err: errors.New("expected a file handle")}
-			return
+			return errors.New("expected a file handle")
 		}
 
-		// wrap a defer-closer-scope
-		//
-		// every single file in it() is already open before we start
-		// just close here sooner rather than later for neatness
-		// and to surface potential errors writing on closed fifos
-		// this won't/can't help with not running out of handles
-		err := func() error {
+		// import blocks
+		err = func() error {
+			// wrap a defer-closer-scope
+			//
+			// every single file in it() is already open before we start
+			// just close here sooner rather than later for neatness
+			// and to surface potential errors writing on closed fifos
+			// this won't/can't help with not running out of handles
 			defer file.Close()
 
 			car, err := gocarv2.NewBlockReader(file)
@@ -167,7 +80,7 @@ func importWorker(req *cmds.Request, re cmds.ResponseEmitter, api iface.CoreAPI,
 			}
 
 			for _, c := range car.Roots {
-				roots[c] = struct{}{}
+				roots.Add(c)
 			}
 
 			for {
@@ -193,28 +106,59 @@ func importWorker(req *cmds.Request, re cmds.ResponseEmitter, api iface.CoreAPI,
 				blockCount++
 				blockBytesCount += uint64(len(block.RawData()))
 			}
-
 			return nil
 		}()
 
 		if err != nil {
-			ret <- importResult{err: err}
-			return
+			return err
 		}
 	}
 
 	if err := it.Err(); err != nil {
-		ret <- importResult{err: err}
-		return
+		return err
 	}
 
 	if err := batch.Commit(); err != nil {
-		ret <- importResult{err: err}
-		return
+		return err
 	}
 
+	// It is not guaranteed that a root in a header is actually present in the same ( or any )
+	// .car file. This is the case in version 1, and ideally in further versions too.
+	// Accumulate any root CID seen in a header, and supplement its actual node if/when encountered
+	// We will attempt a pin *only* at the end in case all car files were well-formed.
+
+	// opportunistic pinning: try whatever sticks
+	if doPinRoots {
+		err = roots.ForEach(func(c cid.Cid) error {
+			ret := RootMeta{Cid: c}
+
+			// This will trigger a full read of the DAG in the pinner, to make sure we have all blocks.
+			// Ideally we would have a lighter merkledag.Walk() instead of the underlying merkledag.FetchDag,
+			// then pinner.PinWithMode().
+			err = api.Pin().Add(req.Context, path.IpldPath(c), options.Pin.Recursive(true))
+			if err != nil {
+				return err
+			}
+
+			return res.Emit(&CarImportOutput{Root: &ret})
+		})
+		if err != nil {
+			return err
+		}
+	}
+
+	stats, _ := req.Options[statsOptionName].(bool)
+	if stats {
+		err = res.Emit(&CarImportOutput{
+			Stats: &CarImportStats{
+				BlockCount:      blockCount,
+				BlockBytesCount: blockBytesCount,
+			},
+		})
+		if err != nil {
+			return err
+		}
+	}
+
-	ret <- importResult{
-		blockCount:      blockCount,
-		blockBytesCount: blockBytesCount,
-		roots:           roots}
+	return nil
 }