Skip to content

Commit

Permalink
Requested changes.
Browse files Browse the repository at this point in the history
Signed-off-by: Antonio Navarro Perez <antnavper@gmail.com>
  • Loading branch information
ajnavarro committed Sep 12, 2022
1 parent 0620c6e commit d27cb11
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 53 deletions.
58 changes: 28 additions & 30 deletions compparallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,35 @@ import (
"github.com/libp2p/go-libp2p-core/routing"
)

var _ routing.Routing = &Parallel{}
var _ routing.Routing = &composableParallel{}

type ComposableParallel struct {
type composableParallel struct {
routers []*ParallelRouter
}

// NewComposableParallel creates a Router that will execute methods from provided Routers in parallel.
// On all methods, If IgnoreError flag is set, that Router will not stop the entire execution.
// On all methods, If ExecuteAfter is set, that Router will be executed after the timer.
// Router specific timeout will start counting AFTER the ExecuteAfter timer.
func NewComposableParallel(routers []*ParallelRouter) *ComposableParallel {
return &ComposableParallel{
func NewComposableParallel(routers []*ParallelRouter) *composableParallel {
return &composableParallel{
routers: routers,
}
}

// Provide will call all Routers in parallel.
func (r *ComposableParallel) Provide(ctx context.Context, cid cid.Cid, provide bool) error {
func (r *composableParallel) Provide(ctx context.Context, cid cid.Cid, provide bool) error {
return executeParallel(ctx, r.routers,
func(ctx context.Context, r routing.Routing) error {
return r.Provide(ctx, cid, provide)
},
)
}

// FindProvidersAsync will execute all Routers in parallel, iterating results from them in unspecified oredr.
// FindProvidersAsync will execute all Routers in parallel, iterating results from them in unspecified order.
// If count is set, only that amount of elements will be returned without any specification about from what router is obtained.
// To gather providers from a set of Routers first, you can use the ExecuteAfter timer to delay some Router execution.
func (r *ComposableParallel) FindProvidersAsync(ctx context.Context, cid cid.Cid, count int) <-chan peer.AddrInfo {
func (r *composableParallel) FindProvidersAsync(ctx context.Context, cid cid.Cid, count int) <-chan peer.AddrInfo {
var totalCount int64
ch, _ := getChannelOrErrorParallel(
ctx,
Expand All @@ -58,19 +58,18 @@ func (r *ComposableParallel) FindProvidersAsync(ctx context.Context, cid cid.Cid
}

// FindPeer will execute all Routers in parallel, getting the first AddrInfo found and cancelling all other Router calls.
func (r *ComposableParallel) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
func (r *composableParallel) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
return getValueOrErrorParallel(ctx, r.routers,
func(ctx context.Context, r routing.Routing) (peer.AddrInfo, error) {
return r.FindPeer(ctx, id)
func(ctx context.Context, r routing.Routing) (peer.AddrInfo, bool, error) {
addr, err := r.FindPeer(ctx, id)
return addr, addr.ID == "", err
},
func(ai peer.AddrInfo) bool {
return ai.ID == ""
})
)
}

// PutValue will execute all Routers in parallel. If a Router fails and IgnoreError flag is not set, the whole execution will fail.
// Some Puts before the failure might be successful, even if we return an error.
func (r *ComposableParallel) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) error {
func (r *composableParallel) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) error {
return executeParallel(ctx, r.routers,
func(ctx context.Context, r routing.Routing) error {
return r.PutValue(ctx, key, val, opts...)
Expand All @@ -79,17 +78,15 @@ func (r *ComposableParallel) PutValue(ctx context.Context, key string, val []byt
}

// GetValue will execute all Routers in parallel. The first value found will be returned, cancelling all other executions.
func (r *ComposableParallel) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
func (r *composableParallel) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
return getValueOrErrorParallel(ctx, r.routers,
func(ctx context.Context, r routing.Routing) ([]byte, error) {
return r.GetValue(ctx, key, opts...)
},
func(ai []byte) bool {
return len(ai) == 0
func(ctx context.Context, r routing.Routing) ([]byte, bool, error) {
val, err := r.GetValue(ctx, key, opts...)
return val, len(val) == 0, err
})
}

func (r *ComposableParallel) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
func (r *composableParallel) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
return getChannelOrErrorParallel(
ctx,
r.routers,
Expand All @@ -100,7 +97,7 @@ func (r *ComposableParallel) SearchValue(ctx context.Context, key string, opts .
)
}

func (r *ComposableParallel) Bootstrap(ctx context.Context) error {
func (r *composableParallel) Bootstrap(ctx context.Context) error {
return executeParallel(ctx, r.routers,
func(ctx context.Context, r routing.Routing) error {
return r.Bootstrap(ctx)
Expand All @@ -110,14 +107,14 @@ func (r *ComposableParallel) Bootstrap(ctx context.Context) error {
func getValueOrErrorParallel[T any](
ctx context.Context,
routers []*ParallelRouter,
f func(context.Context, routing.Routing) (T, error),
isEmpty func(T) bool,
f func(context.Context, routing.Routing) (T, bool, error),
) (value T, err error) {
outCh := make(chan T)
errCh := make(chan error)

// global cancel context to stop early other router's execution.
ctx, gcancel := context.WithCancel(ctx)
ctx, cancelAll := context.WithCancel(ctx)
defer cancelAll()
var wg sync.WaitGroup
for _, r := range routers {
wg.Add(1)
Expand All @@ -133,7 +130,7 @@ func getValueOrErrorParallel[T any](
case <-tim.C:
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
defer cancel()
value, err := f(ctx, r.Router)
value, empty, err := f(ctx, r.Router)
if err != nil &&
!errors.Is(err, routing.ErrNotFound) &&
!r.IgnoreError {
Expand All @@ -143,7 +140,7 @@ func getValueOrErrorParallel[T any](
}
return
}
if isEmpty(value) {
if empty {
return
}
select {
Expand All @@ -164,19 +161,16 @@ func getValueOrErrorParallel[T any](

select {
case out, ok := <-outCh:
gcancel()
if !ok {
return value, routing.ErrNotFound
}
return out, nil
case err, ok := <-errCh:
gcancel()
if !ok {
return value, routing.ErrNotFound
}
return value, err
case <-ctx.Done():
gcancel()
return value, ctx.Err()
}
}
Expand Down Expand Up @@ -233,6 +227,7 @@ func getChannelOrErrorParallel[T any](
outCh := make(chan T)
errCh := make(chan error)
var wg sync.WaitGroup
ctx, cancelAll := context.WithCancel(ctx)
for _, r := range routers {
wg.Add(1)
go func(r *ParallelRouter) {
Expand Down Expand Up @@ -282,15 +277,18 @@ func getChannelOrErrorParallel[T any](
wg.Wait()
close(outCh)
close(errCh)
cancelAll()
}()

select {
case err, ok := <-errCh:
cancelAll()
if !ok {
return nil, routing.ErrNotFound
}
return nil, err
case <-ctx.Done():
cancelAll()
return nil, ctx.Err()
default:
return outCh, nil
Expand Down
43 changes: 20 additions & 23 deletions compsequential.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,22 @@ import (
"github.com/libp2p/go-libp2p-core/routing"
)

var _ routing.Routing = &ComposableSequential{}
var _ routing.Routing = &composableSequential{}

type ComposableSequential struct {
type composableSequential struct {
routers []*SequentialRouter
}

func NewComposableSequential(routers []*SequentialRouter) *ComposableSequential {
return &ComposableSequential{
func NewComposableSequential(routers []*SequentialRouter) *composableSequential {
return &composableSequential{
routers: routers,
}
}

// Provide calls Provide method per each router sequentially.
// If some router fails and the IgnoreError flag is true, we continue to the next router.
// Context timeout error will be also ignored if the flag is set.
func (r *ComposableSequential) Provide(ctx context.Context, cid cid.Cid, provide bool) error {
func (r *composableSequential) Provide(ctx context.Context, cid cid.Cid, provide bool) error {
return executeSequential(ctx, r.routers, func(ctx context.Context, r routing.Routing) error {
return r.Provide(ctx, cid, provide)
})
Expand All @@ -36,7 +36,7 @@ func (r *ComposableSequential) Provide(ctx context.Context, cid cid.Cid, provide
// If some router fails and the IgnoreError flag is true, we continue to the next router.
// Context timeout error will be also ignored if the flag is set.
// If count is set, the channel will return up to count results, stopping routers iteration.
func (r *ComposableSequential) FindProvidersAsync(ctx context.Context, cid cid.Cid, count int) <-chan peer.AddrInfo {
func (r *composableSequential) FindProvidersAsync(ctx context.Context, cid cid.Cid, count int) <-chan peer.AddrInfo {
var totalCount int64
ch, _ := getChannelOrErrorSequential(ctx, r.routers,
func(ctx context.Context, r routing.Routing) (<-chan peer.AddrInfo, error) {
Expand All @@ -53,20 +53,18 @@ func (r *ComposableSequential) FindProvidersAsync(ctx context.Context, cid cid.C
// FindPeer calls FindPeer per each router sequentially.
// If some router fails and the IgnoreError flag is true, we continue to the next router.
// Context timeout error will be also ignored if the flag is set.
func (r *ComposableSequential) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, error) {
func (r *composableSequential) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, error) {
return getValueOrErrorSequential(ctx, r.routers,
func(ctx context.Context, r routing.Routing) (peer.AddrInfo, error) {
return r.FindPeer(ctx, pid)
},
func(p peer.AddrInfo) bool {
return p.ID == ""
func(ctx context.Context, r routing.Routing) (peer.AddrInfo, bool, error) {
addr, err := r.FindPeer(ctx, pid)
return addr, addr.ID == "", err
},
)
}

// If some router fails and the IgnoreError flag is true, we continue to the next router.
// Context timeout error will be also ignored if the flag is set.
func (r *ComposableSequential) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) error {
func (r *composableSequential) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) error {
return executeSequential(ctx, r.routers,
func(ctx context.Context, r routing.Routing) error {
return r.PutValue(ctx, key, val, opts...)
Expand All @@ -75,18 +73,18 @@ func (r *ComposableSequential) PutValue(ctx context.Context, key string, val []b

// If some router fails and the IgnoreError flag is true, we continue to the next router.
// Context timeout error will be also ignored if the flag is set.
func (r *ComposableSequential) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
func (r *composableSequential) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
return getValueOrErrorSequential(ctx, r.routers,
func(ctx context.Context, r routing.Routing) ([]byte, error) {
return r.GetValue(ctx, key, opts...)
func(ctx context.Context, r routing.Routing) ([]byte, bool, error) {
val, err := r.GetValue(ctx, key, opts...)
return val, len(val) == 0, err
},
func(b []byte) bool { return len(b) == 0 },
)
}

// If some router fails and the IgnoreError flag is true, we continue to the next router.
// Context timeout error will be also ignored if the flag is set.
func (r *ComposableSequential) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
func (r *composableSequential) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
return getChannelOrErrorSequential(ctx, r.routers,
func(ctx context.Context, r routing.Routing) (<-chan []byte, error) {
return r.SearchValue(ctx, key, opts...)
Expand All @@ -98,7 +96,7 @@ func (r *ComposableSequential) SearchValue(ctx context.Context, key string, opts

// If some router fails and the IgnoreError flag is true, we continue to the next router.
// Context timeout error will be also ignored if the flag is set.
func (r *ComposableSequential) Bootstrap(ctx context.Context) error {
func (r *composableSequential) Bootstrap(ctx context.Context) error {
return executeSequential(ctx, r.routers,
func(ctx context.Context, r routing.Routing) error {
return r.Bootstrap(ctx)
Expand All @@ -109,20 +107,19 @@ func (r *ComposableSequential) Bootstrap(ctx context.Context) error {
func getValueOrErrorSequential[T any](
ctx context.Context,
routers []*SequentialRouter,
f func(context.Context, routing.Routing) (T, error),
isEmpty func(T) bool,
f func(context.Context, routing.Routing) (T, bool, error),
) (value T, err error) {
for _, router := range routers {
ctx, cancel := context.WithTimeout(ctx, router.Timeout)
defer cancel()
value, err := f(ctx, router.Router)
value, empty, err := f(ctx, router.Router)
if err != nil &&
!errors.Is(err, routing.ErrNotFound) &&
!router.IgnoreError {
return value, err
}

if isEmpty(value) {
if empty {
continue
}

Expand Down

0 comments on commit d27cb11

Please sign in to comment.