Skip to content

Commit

Permalink
Add ProvideMany
Browse files Browse the repository at this point in the history
Signed-off-by: Antonio Navarro Perez <antnavper@gmail.com>
  • Loading branch information
ajnavarro committed Sep 13, 2022
1 parent d27cb11 commit dccecff
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 3 deletions.
7 changes: 7 additions & 0 deletions compconfig.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package routinghelpers

import (
"context"
"time"

"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multihash"
)

type ParallelRouter struct {
Expand All @@ -18,3 +20,8 @@ type SequentialRouter struct {
IgnoreError bool
Router routing.Routing
}

type ProvideManyRouter interface {
ProvideMany(ctx context.Context, keys []multihash.Multihash) error
Ready() bool
}
34 changes: 34 additions & 0 deletions compparallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multihash"
)

var _ routing.Routing = &composableParallel{}
var _ ProvideManyRouter = &composableParallel{}

type composableParallel struct {
routers []*ParallelRouter
Expand All @@ -38,6 +40,38 @@ func (r *composableParallel) Provide(ctx context.Context, cid cid.Cid, provide b
)
}

// ProvideMany will call all supported Routers in parallel.
func (r *composableParallel) ProvideMany(ctx context.Context, keys []multihash.Multihash) error {
return executeParallel(ctx, r.routers,
func(ctx context.Context, r routing.Routing) error {
pm, ok := r.(ProvideManyRouter)
if !ok {
return nil
}
return pm.ProvideMany(ctx, keys)
},
)
}

// Ready will call all supported ProvideMany Routers in parallel.
// If some of them are not ready, this method will return false.
func (r *composableParallel) Ready() bool {
ready, _ := getValueOrErrorParallel(context.Background(), r.routers,
func(ctx context.Context, r routing.Routing) (bool, bool, error) {
pm, ok := r.(ProvideManyRouter)
if !ok {
return false, true, nil
}

ready := pm.Ready()

return ready, !ready, nil
},
)

return ready
}

// FindProvidersAsync will execute all Routers in parallel, iterating results from them in unspecified order.
// If count is set, only that amount of elements will be returned without any specification about from what router is obtained.
// To gather providers from a set of Routers first, you can use the ExecuteAfter timer to delay some Router execution.
Expand Down
40 changes: 37 additions & 3 deletions compsequential.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multihash"
)

var _ routing.Routing = &composableSequential{}
Expand All @@ -27,9 +28,42 @@ func NewComposableSequential(routers []*SequentialRouter) *composableSequential
// If some router fails and the IgnoreError flag is true, we continue to the next router.
// Context timeout error will be also ignored if the flag is set.
func (r *composableSequential) Provide(ctx context.Context, cid cid.Cid, provide bool) error {
return executeSequential(ctx, r.routers, func(ctx context.Context, r routing.Routing) error {
return r.Provide(ctx, cid, provide)
})
return executeSequential(ctx, r.routers,
func(ctx context.Context, r routing.Routing) error {
return r.Provide(ctx, cid, provide)
})
}

// ProvideMany will call all supported Routers sequentially.
func (r *composableSequential) ProvideMany(ctx context.Context, keys []multihash.Multihash) error {
return executeSequential(ctx, r.routers,
func(ctx context.Context, r routing.Routing) error {
pm, ok := r.(ProvideManyRouter)
if !ok {
return nil
}
return pm.ProvideMany(ctx, keys)
},
)
}

// Ready will call all supported ProvideMany Routers sequentially.
// If some of them are not ready, this method will return false.
func (r *composableSequential) Ready() bool {
ready, _ := getValueOrErrorSequential(context.Background(), r.routers,
func(ctx context.Context, r routing.Routing) (bool, bool, error) {
pm, ok := r.(ProvideManyRouter)
if !ok {
return false, true, nil
}

ready := pm.Ready()

return ready, !ready, nil
},
)

return ready
}

// FindProvidersAsync calls FindProvidersAsync per each router sequentially.
Expand Down

0 comments on commit dccecff

Please sign in to comment.