Skip to content

Commit

Permalink
refactor(cmds): do not return errors embedded in result type (#10527)
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero authored Dec 3, 2024
1 parent 53e793a commit 224d6a3
Show file tree
Hide file tree
Showing 19 changed files with 408 additions and 341 deletions.
64 changes: 25 additions & 39 deletions client/rpc/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,59 +62,45 @@ type pinLsObject struct {
Type string
}

func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan iface.Pin, error) {
func (api *PinAPI) Ls(ctx context.Context, pins chan<- iface.Pin, opts ...caopts.PinLsOption) error {
defer close(pins)

options, err := caopts.PinLsOptions(opts...)
if err != nil {
return nil, err
return err
}

res, err := api.core().Request("pin/ls").
Option("type", options.Type).
Option("stream", true).
Send(ctx)
if err != nil {
return nil, err
return err
}

pins := make(chan iface.Pin)
go func(ch chan<- iface.Pin) {
defer res.Output.Close()
defer close(ch)

dec := json.NewDecoder(res.Output)
var out pinLsObject
for {
switch err := dec.Decode(&out); err {
case nil:
case io.EOF:
return
default:
select {
case ch <- pin{err: err}:
return
case <-ctx.Done():
return
}
defer res.Output.Close()

dec := json.NewDecoder(res.Output)
var out pinLsObject
for {
err := dec.Decode(&out)
if err != nil {
if err != io.EOF {
return err
}
return nil
}

c, err := cid.Parse(out.Cid)
if err != nil {
select {
case ch <- pin{err: err}:
return
case <-ctx.Done():
return
}
}
c, err := cid.Parse(out.Cid)
if err != nil {
return err
}

select {
case ch <- pin{typ: out.Type, name: out.Name, path: path.FromCid(c)}:
case <-ctx.Done():
return
}
select {
case pins <- pin{typ: out.Type, name: out.Name, path: path.FromCid(c)}:
case <-ctx.Done():
return ctx.Err()
}
}(pins)
return pins, nil
}
}

// IsPinned returns whether or not the given cid is pinned
Expand Down
116 changes: 48 additions & 68 deletions client/rpc/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,12 @@ type lsOutput struct {
Objects []lsObject
}

func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.UnixfsLsOption) (<-chan iface.DirEntry, error) {
func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- iface.DirEntry, opts ...caopts.UnixfsLsOption) error {
defer close(out)

options, err := caopts.UnixfsLsOptions(opts...)
if err != nil {
return nil, err
return err
}

resp, err := api.core().Request("ls", p.String()).
Expand All @@ -156,86 +158,64 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.Unixfs
Option("stream", true).
Send(ctx)
if err != nil {
return nil, err
return err
}
if resp.Error != nil {
return nil, resp.Error
return err
}
defer resp.Close()

dec := json.NewDecoder(resp.Output)
out := make(chan iface.DirEntry)

go func() {
defer resp.Close()
defer close(out)

for {
var link lsOutput
if err := dec.Decode(&link); err != nil {
if err == io.EOF {
return
}
select {
case out <- iface.DirEntry{Err: err}:
case <-ctx.Done():
}
return
}

if len(link.Objects) != 1 {
select {
case out <- iface.DirEntry{Err: errors.New("unexpected Objects len")}:
case <-ctx.Done():
}
return
for {
var link lsOutput
if err = dec.Decode(&link); err != nil {
if err != io.EOF {
return err
}
return nil
}

if len(link.Objects[0].Links) != 1 {
select {
case out <- iface.DirEntry{Err: errors.New("unexpected Links len")}:
case <-ctx.Done():
}
return
}
if len(link.Objects) != 1 {
return errors.New("unexpected Objects len")
}

l0 := link.Objects[0].Links[0]
if len(link.Objects[0].Links) != 1 {
return errors.New("unexpected Links len")
}

c, err := cid.Decode(l0.Hash)
if err != nil {
select {
case out <- iface.DirEntry{Err: err}:
case <-ctx.Done():
}
return
}
l0 := link.Objects[0].Links[0]

var ftype iface.FileType
switch l0.Type {
case unixfs.TRaw, unixfs.TFile:
ftype = iface.TFile
case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata:
ftype = iface.TDirectory
case unixfs.TSymlink:
ftype = iface.TSymlink
}
c, err := cid.Decode(l0.Hash)
if err != nil {
return err
}

select {
case out <- iface.DirEntry{
Name: l0.Name,
Cid: c,
Size: l0.Size,
Type: ftype,
Target: l0.Target,

Mode: l0.Mode,
ModTime: l0.ModTime,
}:
case <-ctx.Done():
}
var ftype iface.FileType
switch l0.Type {
case unixfs.TRaw, unixfs.TFile:
ftype = iface.TFile
case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata:
ftype = iface.TDirectory
case unixfs.TSymlink:
ftype = iface.TSymlink
}
}()

return out, nil
select {
case out <- iface.DirEntry{
Name: l0.Name,
Cid: c,
Size: l0.Size,
Type: ftype,
Target: l0.Target,

Mode: l0.Mode,
ModTime: l0.ModTime,
}:
case <-ctx.Done():
return ctx.Err()
}
}
}

func (api *UnixfsAPI) core() *HttpApi {
Expand Down
2 changes: 1 addition & 1 deletion cmd/ipfs/kubo/pinmfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func pinMFS(ctx context.Context, node pinMFSNode, cid cid.Cid, svcName string, s

// check if MFS pin exists (across all possible states) and inspect its CID
pinStatuses := []pinclient.Status{pinclient.StatusQueued, pinclient.StatusPinning, pinclient.StatusPinned, pinclient.StatusFailed}
lsPinCh, lsErrCh := c.Ls(ctx, pinclient.PinOpts.FilterName(pinName), pinclient.PinOpts.FilterStatus(pinStatuses...))
lsPinCh, lsErrCh := c.GoLs(ctx, pinclient.PinOpts.FilterName(pinName), pinclient.PinOpts.FilterStatus(pinStatuses...))
existingRequestID := "" // is there any pre-existing MFS pin with pinName (for any CID)?
pinning := false // is CID for current MFS already being pinned?
pinTime := time.Now().UTC()
Expand Down
23 changes: 14 additions & 9 deletions core/commands/ls.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package commands

import (
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -133,23 +134,24 @@ The JSON output contains type information.
}
}

lsCtx, cancel := context.WithCancel(req.Context)
defer cancel()

for i, fpath := range paths {
pth, err := cmdutils.PathOrCidPath(fpath)
if err != nil {
return err
}

results, err := api.Unixfs().Ls(req.Context, pth,
options.Unixfs.ResolveChildren(resolveSize || resolveType))
if err != nil {
return err
}
results := make(chan iface.DirEntry)
lsErr := make(chan error, 1)
go func() {
lsErr <- api.Unixfs().Ls(lsCtx, pth, results,
options.Unixfs.ResolveChildren(resolveSize || resolveType))
}()

processLink, dirDone = processDir()
for link := range results {
if link.Err != nil {
return link.Err
}
var ftype unixfs_pb.Data_DataType
switch link.Type {
case iface.TFile:
Expand All @@ -170,10 +172,13 @@ The JSON output contains type information.
Mode: link.Mode,
ModTime: link.ModTime,
}
if err := processLink(paths[i], lsLink); err != nil {
if err = processLink(paths[i], lsLink); err != nil {
return err
}
}
if err = <-lsErr; err != nil {
return err
}
dirDone(i)
}
return done()
Expand Down
18 changes: 9 additions & 9 deletions core/commands/pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,15 +557,16 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
panic("unhandled pin type")
}

pins, err := api.Pin().Ls(req.Context, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
if err != nil {
return err
}
pins := make(chan coreiface.Pin)
lsErr := make(chan error, 1)
lsCtx, cancel := context.WithCancel(req.Context)
defer cancel()

go func() {
lsErr <- api.Pin().Ls(lsCtx, pins, opt, options.Pin.Ls.Detailed(detailed), options.Pin.Ls.Name(name))
}()

for p := range pins {
if err := p.Err(); err != nil {
return err
}
err = emit(PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: p.Type(),
Expand All @@ -577,8 +578,7 @@ func pinLsAll(req *cmds.Request, typeStr string, detailed bool, name string, api
return err
}
}

return nil
return <-lsErr
}

const (
Expand Down
Loading

0 comments on commit 224d6a3

Please sign in to comment.