Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pin: follow async pinner changes #9859

Merged
merged 3 commits into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 32 additions & 18 deletions client/rpc/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rpc
import (
"context"
"encoding/json"
"io"
"strings"

iface "github.com/ipfs/boxo/coreiface"
Expand Down Expand Up @@ -129,26 +130,31 @@ func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opt
type pinVerifyRes struct {
ok bool
badNodes []iface.BadPinNode
err error
}

func (r *pinVerifyRes) Ok() bool {
func (r pinVerifyRes) Ok() bool {
return r.ok
}

func (r *pinVerifyRes) BadNodes() []iface.BadPinNode {
func (r pinVerifyRes) BadNodes() []iface.BadPinNode {
return r.badNodes
}

func (r pinVerifyRes) Err() error {
return r.err
}

type badNode struct {
err error
cid cid.Cid
}

func (n *badNode) Path() path.Resolved {
func (n badNode) Path() path.Resolved {
return path.IpldPath(n.cid)
}

func (n *badNode) Err() error {
func (n badNode) Err() error {
return n.err
}

Expand All @@ -169,6 +175,7 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan iface.PinStatus, error) {
for {
var out struct {
Cid string
Err string
Ok bool

BadNodes []struct {
Expand All @@ -177,35 +184,42 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan iface.PinStatus, error) {
}
}
if err := dec.Decode(&out); err != nil {
return // todo: handle non io.EOF somehow
if err == io.EOF {
return
}
select {
case res <- pinVerifyRes{err: err}:
return
case <-ctx.Done():
return
}
}

if out.Err != "" {
select {
case res <- pinVerifyRes{err: errors.New(out.Err)}:
return
case <-ctx.Done():
return
}
}

badNodes := make([]iface.BadPinNode, len(out.BadNodes))
for i, n := range out.BadNodes {
c, err := cid.Decode(n.Cid)
if err != nil {
badNodes[i] = &badNode{
cid: c,
err: err,
}
badNodes[i] = badNode{cid: c, err: err}
continue
}

if n.Err != "" {
err = errors.New(n.Err)
}
badNodes[i] = &badNode{
cid: c,
err: err,
}
badNodes[i] = badNode{cid: c, err: err}
}

select {
case res <- &pinVerifyRes{
ok: out.Ok,

badNodes: badNodes,
}:
case res <- pinVerifyRes{ok: out.Ok, badNodes: badNodes}:
case <-ctx.Done():
return
}
Expand Down
63 changes: 37 additions & 26 deletions core/commands/pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,17 @@ Example:
}

// For backward compatibility, we accumulate the pins in the same output type as before.
emit := res.Emit
var emit func(PinLsOutputWrapper) error
lgcList := map[string]PinLsType{}
if !stream {
emit = func(v interface{}) error {
obj := v.(*PinLsOutputWrapper)
lgcList[obj.PinLsObject.Cid] = PinLsType{Type: obj.PinLsObject.Type}
emit = func(v PinLsOutputWrapper) error {
lgcList[v.PinLsObject.Cid] = PinLsType{Type: v.PinLsObject.Type}
return nil
}
} else {
emit = func(v PinLsOutputWrapper) error {
return res.Emit(v)
}
}

if len(req.Arguments) > 0 {
Expand All @@ -371,7 +374,7 @@ Example:
},
Type: &PinLsOutputWrapper{},
Encoders: cmds.EncoderMap{
cmds.JSON: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinLsOutputWrapper) error {
cmds.JSON: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out PinLsOutputWrapper) error {
stream, _ := req.Options[pinStreamOptionName].(bool)

enc := json.NewEncoder(w)
Expand All @@ -382,7 +385,7 @@ Example:

return enc.Encode(out.PinLsList)
}),
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinLsOutputWrapper) error {
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out PinLsOutputWrapper) error {
quiet, _ := req.Options[pinQuietOptionName].(bool)
stream, _ := req.Options[pinStreamOptionName].(bool)

Expand Down Expand Up @@ -432,7 +435,7 @@ type PinLsObject struct {
Type string `json:",omitempty"`
}

func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value interface{}) error) error {
func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value PinLsOutputWrapper) error) error {
enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
Expand Down Expand Up @@ -470,7 +473,7 @@ func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit fu
pinType = "indirect through " + pinType
}

err = emit(&PinLsOutputWrapper{
err = emit(PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: pinType,
Cid: enc.Encode(rp.Cid()),
Expand All @@ -484,7 +487,7 @@ func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit fu
return nil
}

func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value interface{}) error) error {
func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value PinLsOutputWrapper) error) error {
enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
Expand All @@ -511,7 +514,7 @@ func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit fun
if err := p.Err(); err != nil {
return err
}
err = emit(&PinLsOutputWrapper{
err = emit(PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: p.Type(),
Cid: enc.Encode(p.Path().Cid()),
Expand Down Expand Up @@ -648,13 +651,14 @@ var verifyPinCmd = &cmds.Command{

// PinVerifyRes is the result returned for each pin checked in "pin verify"
type PinVerifyRes struct {
Cid string
Cid string `json:",omitempty"`
Err string `json:",omitempty"`
PinStatus
}

// PinStatus is part of PinVerifyRes, do not use directly
type PinStatus struct {
Ok bool
Ok bool `json:",omitempty"`
BadNodes []BadNode `json:",omitempty"`
}

Expand All @@ -669,16 +673,13 @@ type pinVerifyOpts struct {
includeOk bool
}

func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc cidenc.Encoder) (<-chan interface{}, error) {
// FIXME: this implementation is duplicated sith core/coreapi.PinAPI.Verify, remove this one and exclusively rely on CoreAPI.
func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc cidenc.Encoder) (<-chan any, error) {
visited := make(map[cid.Cid]PinStatus)

bs := n.Blocks.Blockstore()
DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := dag.GetLinksWithDAG(DAG)
recPins, err := n.Pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}

var checkPin func(root cid.Cid) PinStatus
checkPin = func(root cid.Cid) PinStatus {
Expand Down Expand Up @@ -719,14 +720,18 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci
return status
}

out := make(chan interface{})
out := make(chan any)
go func() {
defer close(out)
for _, cid := range recPins {
pinStatus := checkPin(cid)
for p := range n.Pinning.RecursiveKeys(ctx) {
if p.Err != nil {
out <- PinVerifyRes{Err: p.Err.Error()}
return
}
pinStatus := checkPin(p.C)
if !pinStatus.Ok || opts.includeOk {
select {
case out <- &PinVerifyRes{enc.Encode(cid), pinStatus}:
case out <- PinVerifyRes{Cid: enc.Encode(p.C), PinStatus: pinStatus}:
case <-ctx.Done():
return
}
Expand All @@ -739,12 +744,18 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci

// Format formats PinVerifyRes
func (r PinVerifyRes) Format(out io.Writer) {
if r.Err != "" {
fmt.Fprintf(out, "error: %s\n", r.Err)
return
}

if r.Ok {
fmt.Fprintf(out, "%s ok\n", r.Cid)
} else {
fmt.Fprintf(out, "%s broken\n", r.Cid)
for _, e := range r.BadNodes {
fmt.Fprintf(out, " %s: %s\n", e.Cid, e.Err)
}
return
}

fmt.Fprintf(out, "%s broken\n", r.Cid)
for _, e := range r.BadNodes {
fmt.Fprintf(out, " %s: %s\n", e.Cid, e.Err)
}
}
Loading