Skip to content

Commit

Permalink
refactor(remote/pinning): Ls to take results channel instead of ret…
Browse files Browse the repository at this point in the history
…urning one (#738)
  • Loading branch information
gammazero authored Dec 3, 2024
1 parent 9069a29 commit 3a3e8af
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ The following emojis are used to highlight certain changes:
- `routing/http/client`: creating delegated routing client with `New` now defaults to querying delegated routing server with `DefaultProtocolFilter` ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#689](https://github.com/ipfs/boxo/pull/689)
- `bitswap/client`: Wait at lease one broadcast interval before resending wants to a peer. Check for peers to rebroadcast to more often than one broadcast interval.
- No longer using `github.com/jbenet/goprocess` to avoid requiring in dependents. [#710](https://github.com/ipfs/boxo/pull/710)
- `pinning/remote/client`: Refactor remote pinning `Ls` to take results channel instead of returning one. The previous `Ls` behavior is implemented by the GoLs function, which creates the channels, starts the goroutine that calls Ls, and returns the channels to the caller [#738](https://github.com/ipfs/boxo/pull/738)

### Removed

Expand Down
130 changes: 72 additions & 58 deletions pinning/remote/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,77 +137,90 @@ func (pinLsOpts) LsMeta(meta map[string]string) LsOption {

type pinResults = openapi.PinResults

func (c *Client) Ls(ctx context.Context, opts ...LsOption) (chan PinStatusGetter, chan error) {
res := make(chan PinStatusGetter, 1)
errs := make(chan error, 1)

// Ls writes pin statuses to the PinStatusGetter channel. The channel is
// closed when there are no more pins. If an error occurs or ctx is canceled,
// then the channel is closed and an error is returned.
//
// Example:
//
// res := make(chan PinStatusGetter, 1)
// lsErr := make(chan error, 1)
// go func() {
// lsErr <- c.Ls(ctx, res, opts...)
// }()
// for r := range res {
// processPin(r)
// }
// return <-lsErr
func (c *Client) Ls(ctx context.Context, res chan<- PinStatusGetter, opts ...LsOption) (err error) {
settings := new(lsSettings)
for _, o := range opts {
if err := o(settings); err != nil {
if err = o(settings); err != nil {
close(res)
errs <- err
close(errs)
return res, errs
return err
}
}

go func() {
defer func() {
if r := recover(); r != nil {
var err error
switch x := r.(type) {
case string:
err = fmt.Errorf("unexpected error while listing remote pins: %s", x)
case error:
err = fmt.Errorf("unexpected error while listing remote pins: %w", x)
default:
err = errors.New("unknown panic while listing remote pins")
}
errs <- err
}
close(errs)
close(res)
}()

for {
pinRes, err := c.lsInternal(ctx, settings)
if err != nil {
errs <- err
return
defer func() {
if r := recover(); r != nil {
switch x := r.(type) {
case string:
err = fmt.Errorf("unexpected error while listing remote pins: %s", x)
case error:
err = fmt.Errorf("unexpected error while listing remote pins: %w", x)
default:
err = errors.New("unknown panic while listing remote pins")
}
}
close(res)
}()

results := pinRes.GetResults()
for _, r := range results {
select {
case res <- &pinStatusObject{r}:
case <-ctx.Done():
errs <- ctx.Err()
return
}
}
for {
pinRes, err := c.lsInternal(ctx, settings)
if err != nil {
return err
}

batchSize := len(results)
if int(pinRes.Count) == batchSize {
// no more batches
return
results := pinRes.GetResults()
for _, r := range results {
select {
case res <- &pinStatusObject{r}:
case <-ctx.Done():
return ctx.Err()
}
}

// Better DX/UX for cases like https://github.com/application-research/estuary/issues/124
if batchSize == 0 && int(pinRes.Count) != 0 {
errs <- fmt.Errorf("invalid pinning service response: PinResults.count=%d but no PinResults.results", int(pinRes.Count))
return
}
batchSize := len(results)
if int(pinRes.Count) == batchSize {
// no more batches
return nil
}

oldestResult := results[batchSize-1]
settings.before = &oldestResult.Created
// Better DX/UX for cases like https://github.com/application-research/estuary/issues/124
if batchSize == 0 && int(pinRes.Count) != 0 {
return fmt.Errorf("invalid pinning service response: PinResults.count=%d but no PinResults.results", int(pinRes.Count))
}

oldestResult := results[batchSize-1]
settings.before = &oldestResult.Created
}
}

// GoLs creates the results and error channels, starts the goroutine that calls
// Ls, and returns the channels to the caller.
func (c *Client) GoLs(ctx context.Context, opts ...LsOption) (<-chan PinStatusGetter, <-chan error) {
res := make(chan PinStatusGetter)
errs := make(chan error, 1)

go func() {
errs <- c.Ls(ctx, res, opts...)
}()

return res, errs
}

func (c *Client) LsSync(ctx context.Context, opts ...LsOption) ([]PinStatusGetter, error) {
resCh, errCh := c.Ls(ctx, opts...)
resCh, errCh := c.GoLs(ctx, opts...)

var res []PinStatusGetter
for r := range resCh {
Expand All @@ -219,8 +232,6 @@ func (c *Client) LsSync(ctx context.Context, opts ...LsOption) ([]PinStatusGette

// Manual version of Ls that returns a single batch of results and int with total count
func (c *Client) LsBatchSync(ctx context.Context, opts ...LsOption) ([]PinStatusGetter, int, error) {
var res []PinStatusGetter

settings := new(lsSettings)
for _, o := range opts {
if err := o(settings); err != nil {
Expand All @@ -233,9 +244,13 @@ func (c *Client) LsBatchSync(ctx context.Context, opts ...LsOption) ([]PinStatus
return nil, 0, err
}

var res []PinStatusGetter
results := pinRes.GetResults()
for _, r := range results {
res = append(res, &pinStatusObject{r})
if len(results) != 0 {
res = make([]PinStatusGetter, len(results))
for i, r := range results {
res[i] = &pinStatusObject{r}
}
}

return res, int(pinRes.Count), nil
Expand Down Expand Up @@ -274,8 +289,7 @@ func (c *Client) lsInternal(ctx context.Context, settings *lsSettings) (pinResul
// TODO: Ignoring HTTP Response OK?
results, httpresp, err := getter.Execute()
if err != nil {
err := httperr(httpresp, err)
return pinResults{}, err
return pinResults{}, httperr(httpresp, err)
}

return results, nil
Expand Down

0 comments on commit 3a3e8af

Please sign in to comment.