Skip to content

Commit

Permalink
pin cmd: stream recursive pins
Browse files Browse the repository at this point in the history
Add a --stream flag to stream the results instead of
accumulating the final result in memory.

This is a rework of ipfs#5005
  • Loading branch information
MichaelMure authored and Walter Beegle committed Jun 6, 2020
1 parent b50f684 commit 08351e0
Showing 1 changed file with 107 additions and 44 deletions.
151 changes: 107 additions & 44 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,9 @@ collected if needed. (By default, recursively. Use -r=false for direct pins.)
}

const (
pinTypeOptionName = "type"
pinQuietOptionName = "quiet"
pinTypeOptionName = "type"
pinQuietOptionName = "quiet"
pinStreamOptionName = "stream"
)

var listPinCmd = &cmds.Command{
Expand Down Expand Up @@ -313,6 +314,7 @@ Example:
Options: []cmds.Option{
cmds.StringOption(pinTypeOptionName, "t", "The type of pinned keys to list. Can be \"direct\", \"indirect\", \"recursive\", or \"all\".").WithDefault("all"),
cmds.BoolOption(pinQuietOptionName, "q", "Write just hashes of objects."),
cmds.BoolOption(pinStreamOptionName, "s", "Don't buffer pins before sending."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
Expand All @@ -326,9 +328,7 @@ Example:
}

typeStr, _ := req.Options[pinTypeOptionName].(string)
if err != nil {
return err
}
stream, _ := req.Options[pinStreamOptionName].(bool)

switch typeStr {
case "all", "direct", "indirect", "recursive":
Expand All @@ -337,34 +337,50 @@ Example:
return err
}

enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
// For backward compatibility, we accumulate the pins in the same output type as before.
emit := res.Emit
lgcList := map[string]RefObject{}
if !stream {
emit = func(v interface{}) error {
obj := v.(*PinLsOutputWrapper)
lgcList[obj.RefKeyObject.Cid] = RefObject{Type: obj.RefKeyObject.Type}
return nil
}
}

var keys map[cid.Cid]RefKeyObject
if len(req.Arguments) > 0 {
keys, err = pinLsKeys(req.Context, req.Arguments, typeStr, n, api)
err = pinLsKeys(req.Context, req.Arguments, typeStr, n, api, emit)
} else {
keys, err = pinLsAll(req.Context, typeStr, n)
err = pinLsAll(req.Context, typeStr, n, emit)
}
if err != nil {
return err
}

refKeys := make(map[string]RefKeyObject, len(keys))
for k, v := range keys {
refKeys[enc.Encode(k)] = v
if !stream {
return cmds.EmitOnce(res, &PinLsOutputWrapper{
RefKeyList: RefKeyList{Keys: lgcList},
})
}

return cmds.EmitOnce(res, &RefKeyList{Keys: refKeys})
return nil
},
Type: RefKeyList{},
Type: &PinLsOutputWrapper{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *RefKeyList) error {
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinLsOutputWrapper) error {
quiet, _ := req.Options[pinQuietOptionName].(bool)
stream, _ := req.Options[pinStreamOptionName].(bool)

if stream {
if quiet {
fmt.Fprintf(w, "%s\n", out.RefKeyObject.Cid)
} else {
fmt.Fprintf(w, "%s %s\n", out.RefKeyObject.Cid, out.RefKeyObject.Type)
}
return nil
}

for k, v := range out.Keys {
for k, v := range out.RefKeyList.Keys {
if quiet {
fmt.Fprintf(w, "%s\n", k)
} else {
Expand Down Expand Up @@ -492,80 +508,127 @@ var verifyPinCmd = &cmds.Command{
}

type RefKeyObject struct {
Cid string
Type string
}

type RefObject struct {
Type string
}

type RefKeyList struct {
Keys map[string]RefKeyObject
Keys map[string]RefObject
}

func pinLsKeys(ctx context.Context, args []string, typeStr string, n *core.IpfsNode, api coreiface.CoreAPI) (map[cid.Cid]RefKeyObject, error) {
// Pin ls needs to output two different type depending on if it's streamed or not.
// We use this to bypass the cmds lib refusing to have interface{}
type PinLsOutputWrapper struct {
RefKeyList
RefKeyObject
}

func pinLsKeys(ctx context.Context, args []string, typeStr string, n *core.IpfsNode, api coreiface.CoreAPI, emit func(value interface{}) error) error {
mode, ok := pin.StringToMode(typeStr)
if !ok {
return nil, fmt.Errorf("invalid pin mode '%s'", typeStr)
return fmt.Errorf("invalid pin mode '%s'", typeStr)
}

keys := make(map[cid.Cid]RefKeyObject)

for _, p := range args {
c, err := api.ResolvePath(ctx, path.New(p))
if err != nil {
return nil, err
return err
}

pinType, pinned, err := n.Pinning.IsPinnedWithType(c.Cid(), mode)
if err != nil {
return nil, err
return err
}

if !pinned {
return nil, fmt.Errorf("path '%s' is not pinned", p)
return fmt.Errorf("path '%s' is not pinned", p)
}

switch pinType {
case "direct", "indirect", "recursive", "internal":
default:
pinType = "indirect through " + pinType
}
keys[c.Cid()] = RefKeyObject{
Type: pinType,

err = emit(&PinLsOutputWrapper{
RefKeyObject: RefKeyObject{
Type: pinType,
Cid: c.Cid().String(),
},
})
if err != nil {
return err
}
}

return keys, nil
return nil
}

func pinLsAll(ctx context.Context, typeStr string, n *core.IpfsNode) (map[cid.Cid]RefKeyObject, error) {

keys := make(map[cid.Cid]RefKeyObject)
func pinLsAll(ctx context.Context, typeStr string, n *core.IpfsNode, emit func(value interface{}) error) error {
keys := cid.NewSet()

AddToResultKeys := func(keyList []cid.Cid, typeStr string) {
AddToResultKeys := func(keyList []cid.Cid, typeStr string) error {
for _, c := range keyList {
keys[c] = RefKeyObject{
Type: typeStr,
if keys.Visit(c) {
err := emit(&PinLsOutputWrapper{
RefKeyObject: RefKeyObject{
Type: typeStr,
Cid: c.String(),
},
})
if err != nil {
return err
}
}
}
return nil
}

if typeStr == "direct" || typeStr == "all" {
AddToResultKeys(n.Pinning.DirectKeys(), "direct")
err := AddToResultKeys(n.Pinning.DirectKeys(), "direct")
if err != nil {
return err
}
}
if typeStr == "recursive" || typeStr == "all" {
err := AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive")
if err != nil {
return err
}
}
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range n.Pinning.RecursiveKeys() {
err := dag.EnumerateChildren(ctx, dag.GetLinksWithDAG(n.DAG), k, set.Visit)
var visitErr error
err := dag.EnumerateChildren(ctx, dag.GetLinksWithDAG(n.DAG), k, func(c cid.Cid) bool {
r := keys.Visit(c)
if r {
err := emit(&PinLsOutputWrapper{
RefKeyObject: RefKeyObject{
Type: "indirect",
Cid: c.String(),
},
})
if err != nil {
visitErr = err
}
}
return r
})

if visitErr != nil {
return visitErr
}
if err != nil {
return nil, err
return err
}
}
AddToResultKeys(set.Keys(), "indirect")
}
if typeStr == "recursive" || typeStr == "all" {
AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive")
}

return keys, nil
return nil
}

// PinVerifyRes is the result returned for each pin checked in "pin verify"
Expand Down

0 comments on commit 08351e0

Please sign in to comment.