From 276ccf07b50db01ecfc84ec82b26e2076a243739 Mon Sep 17 00:00:00 2001 From: Antonio Navarro Perez Date: Wed, 22 Jun 2022 14:07:19 +0200 Subject: [PATCH] Requested changes. Signed-off-by: Antonio Navarro Perez --- config/addresses.go | 14 ++--- config/init.go | 11 ++-- config/routing.go | 41 ++++++++++++- core/node/groups.go | 2 +- core/node/libp2p/routing.go | 68 +++++++--------------- core/node/libp2p/topicdiscovery.go | 3 +- core/node/provider.go | 18 ++---- docs/config.md | 17 +++++- go.mod | 5 +- go.sum | 8 +-- routing/delegated.go | 93 ++++++++++++++++++++++++++++++ routing/error.go | 27 +++++++++ routing/wrapper.go | 66 +++++++++++++++++++++ 13 files changed, 288 insertions(+), 85 deletions(-) create mode 100644 routing/delegated.go create mode 100644 routing/error.go create mode 100644 routing/wrapper.go diff --git a/config/addresses.go b/config/addresses.go index c80c5e2b37fd..f16563d6ad66 100644 --- a/config/addresses.go +++ b/config/addresses.go @@ -2,12 +2,10 @@ package config // Addresses stores the (string) multiaddr addresses for the node. type Addresses struct { - Swarm []string // addresses for the swarm to listen on - Announce []string // swarm addresses to announce to the network, if len > 0 replaces auto detected addresses - AppendAnnounce []string // similar to Announce but doesn't overwrite auto detected addresses, they are just appended - NoAnnounce []string // swarm addresses not to announce to the network - ExtraReframeRouters []string // allows to add other resolvers using the Reframe spec: https://github.com/ipfs/specs/blob/master/REFRAME.md - API Strings // address for the local API (RPC) - Gateway Strings // address to listen on for IPFS HTTP object gateway - + Swarm []string // addresses for the swarm to listen on + Announce []string // swarm addresses to announce to the network, if len > 0 replaces auto detected addresses + AppendAnnounce []string // similar to Announce but doesn't overwrite auto detected addresses, they are just appended + NoAnnounce []string // swarm addresses not to announce to the network + API Strings // address for the local API (RPC) + Gateway Strings // address to listen on for IPFS HTTP object gateway } diff --git a/config/init.go b/config/init.go index 9e5ce4259a26..8e54eaa5866a 100644 --- a/config/init.go +++ b/config/init.go @@ -121,12 +121,11 @@ func addressesConfig() Addresses { "/ip4/0.0.0.0/udp/4001/quic", "/ip6/::/udp/4001/quic", }, - Announce: []string{}, - AppendAnnounce: []string{}, - NoAnnounce: []string{}, - ExtraReframeRouters: []string{}, - API: Strings{"/ip4/127.0.0.1/tcp/5001"}, - Gateway: Strings{"/ip4/127.0.0.1/tcp/8080"}, + Announce: []string{}, + AppendAnnounce: []string{}, + NoAnnounce: []string{}, + API: Strings{"/ip4/127.0.0.1/tcp/5001"}, + Gateway: Strings{"/ip4/127.0.0.1/tcp/8080"}, } } diff --git a/config/routing.go b/config/routing.go index c6157ec9637d..b99f27e8f7c5 100644 --- a/config/routing.go +++ b/config/routing.go @@ -5,5 +5,44 @@ type Routing struct { // Type sets default daemon routing mode. // // Can be one of "dht", "dhtclient", "dhtserver", "none", or unset. - Type string + Type string `json:",omitempty"` + + Routers map[string]Router +} + +type Router struct { + + // Type can be one of "dht", "dhtclient", "dhtserver", "reframe". + // Reframe type allows to add other resolvers using the Reframe spec: + // https://github.com/ipfs/specs/blob/master/REFRAME.md + Type string + Enabled bool + + // Methods that we want to use from this provider. + // Leave it empty to use all the supported and available ones. + // Actual supported methods: "FindProviders", "GetIPNS", "PutIPNS" + Methods []string + + // Parameters are extra configuration that this router might need. + // A common one for reframe endpoints is "Address". + Parameters map[string]string } + +// Type is the routing type. +// Depending of the type we need to instantiate different Routing implementations. +type RouterType string + +const ( + RouterTypeReframe RouterType = "reframe" + RouterTypeDHT RouterType = "dht" +) + +type RouterParam string + +const ( + // RouterParamAddress is the URL where the routing implementation will point to get the information. + // Usually used for reframe Routers. + RouterParamAddress RouterParam = "address" + + RouterParamPriority RouterParam = "priority" +) diff --git a/core/node/groups.go b/core/node/groups.go index b5d964318099..433ae1dd8678 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -167,7 +167,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { fx.Provide(libp2p.Routing), fx.Provide(libp2p.BaseRouting(cfg.Experimental.AcceleratedDHTClient)), - fx.Provide(libp2p.DelegatedRouting(cfg.Addresses.ExtraReframeRouters)), + fx.Provide(libp2p.DelegatedRouting(cfg.Routing.Routers)), maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")), maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics), diff --git a/core/node/libp2p/routing.go b/core/node/libp2p/routing.go index 195184fe6e18..6242602126bd 100644 --- a/core/node/libp2p/routing.go +++ b/core/node/libp2p/routing.go @@ -7,11 +7,9 @@ import ( "sort" "time" - "github.com/ipfs/go-cid" "github.com/ipfs/go-ipfs/core/node/helpers" + irouting "github.com/ipfs/go-ipfs/routing" - drc "github.com/ipfs/go-delegated-routing/client" - drp "github.com/ipfs/go-delegated-routing/gen/proto" config "github.com/ipfs/go-ipfs/config" "github.com/ipfs/go-ipfs/repo" "github.com/libp2p/go-libp2p-core/host" @@ -29,8 +27,6 @@ import ( "go.uber.org/fx" ) -type BaseIpfsRouting routing.Routing - type Router struct { routing.Routing @@ -60,7 +56,6 @@ type processInitialRoutingOut struct { Router Router `group:"routers"` DHT *ddht.DHT DHTClient routing.Routing `name:"dhtc"` - BaseRT BaseIpfsRouting } type AddrInfoChan chan peer.AddrInfo @@ -114,7 +109,6 @@ func BaseRouting(experimentalDHTClient bool) interface{} { }, DHT: dr, DHTClient: expClient, - BaseRT: expClient, }, nil } @@ -125,7 +119,6 @@ func BaseRouting(experimentalDHTClient bool) interface{} { }, DHT: dr, DHTClient: dr, - BaseRT: in.Router, }, nil } } @@ -136,50 +129,30 @@ type delegatedRouterOut struct { Routers []Router `group:"routers,flatten"` } -func DelegatedRouting(urls []string) interface{} { +func DelegatedRouting(routers map[string]config.Router) interface{} { return func() (delegatedRouterOut, error) { out := delegatedRouterOut{} - for _, u := range urls { - var dr drp.DelegatedRouting_Client - dr, err := drp.New_DelegatedRouting_Client(u) + for _, v := range routers { + if !v.Enabled { + continue + } + + r, err := irouting.RoutingFromConfig(v) if err != nil { return out, err } - c := drc.NewClient(dr) - crc := drc.NewContentRoutingClient(c) - out.Routers = append(out.Routers, Router{Routing: &routingWrapper{ - Client: c, - ContentRoutingClient: crc, - }}) + out.Routers = append(out.Routers, Router{ + Routing: r, + Priority: irouting.GetPriority(v.Parameters), + }) } return out, nil } } -var _ routing.Routing = &routingWrapper{} - -// routingWrapper is a wrapper needed to construct the routing.Routing interface from -// delegated-routing library. -type routingWrapper struct { - *drc.Client - *drc.ContentRoutingClient -} - -func (c *routingWrapper) FindProvidersAsync(ctx context.Context, cid cid.Cid, count int) <-chan peer.AddrInfo { - return c.ContentRoutingClient.FindProvidersAsync(ctx, cid, count) -} - -func (c *routingWrapper) Bootstrap(ctx context.Context) error { - return nil -} - -func (c *routingWrapper) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { - return peer.AddrInfo{}, routing.ErrNotSupported -} - type p2pOnlineRoutingIn struct { fx.In @@ -187,7 +160,7 @@ type p2pOnlineRoutingIn struct { Validator record.Validator } -func Routing(in p2pOnlineRoutingIn) routing.Routing { +func Routing(in p2pOnlineRoutingIn) irouting.Tiered { routers := in.Routers sort.SliceStable(routers, func(i, j int) bool { @@ -199,19 +172,20 @@ func Routing(in p2pOnlineRoutingIn) routing.Routing { irouters[i] = v.Routing } - return routinghelpers.Tiered{ - Routers: irouters, - Validator: in.Validator, + return irouting.Tiered{ + Tiered: routinghelpers.Tiered{ + Routers: irouters, + Validator: in.Validator, + }, } } type p2pPSRoutingIn struct { fx.In - BaseIpfsRouting BaseIpfsRouting - Validator record.Validator - Host host.Host - PubSub *pubsub.PubSub `optional:"true"` + Validator record.Validator + Host host.Host + PubSub *pubsub.PubSub `optional:"true"` } func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (p2pRouterOut, *namesys.PubsubValueStore, error) { diff --git a/core/node/libp2p/topicdiscovery.go b/core/node/libp2p/topicdiscovery.go index fd2cbe006371..9b95d78b5960 100644 --- a/core/node/libp2p/topicdiscovery.go +++ b/core/node/libp2p/topicdiscovery.go @@ -9,11 +9,12 @@ import ( disc "github.com/libp2p/go-libp2p-discovery" "github.com/ipfs/go-ipfs/core/node/helpers" + irouting "github.com/ipfs/go-ipfs/routing" "go.uber.org/fx" ) func TopicDiscovery() interface{} { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, cr BaseIpfsRouting) (service discovery.Discovery, err error) { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, cr irouting.Tiered) (service discovery.Discovery, err error) { baseDisc := disc.NewRoutingDiscovery(cr) minBackoff, maxBackoff := time.Second*60, time.Hour rng := rand.New(rand.NewSource(rand.Int63())) diff --git a/core/node/provider.go b/core/node/provider.go index 3dd7f9b728ab..54197ae77ec2 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -6,18 +6,17 @@ import ( "time" "github.com/ipfs/go-fetcher" - "github.com/ipfs/go-ipfs-pinner" - "github.com/ipfs/go-ipfs-provider" + pin "github.com/ipfs/go-ipfs-pinner" + provider "github.com/ipfs/go-ipfs-provider" "github.com/ipfs/go-ipfs-provider/batched" q "github.com/ipfs/go-ipfs-provider/queue" "github.com/ipfs/go-ipfs-provider/simple" "github.com/libp2p/go-libp2p-core/routing" - "github.com/multiformats/go-multihash" "go.uber.org/fx" "github.com/ipfs/go-ipfs/core/node/helpers" - "github.com/ipfs/go-ipfs/core/node/libp2p" "github.com/ipfs/go-ipfs/repo" + irouting "github.com/ipfs/go-ipfs/routing" ) const kReprovideFrequency = time.Hour * 12 @@ -62,16 +61,11 @@ func SimpleProviderSys(isOnline bool) interface{} { } } -type provideMany interface { - ProvideMany(ctx context.Context, keys []multihash.Multihash) error - Ready() bool -} - // BatchedProviderSys creates new provider system func BatchedProviderSys(isOnline bool, reprovideInterval string) interface{} { - return func(lc fx.Lifecycle, cr libp2p.BaseIpfsRouting, q *q.Queue, keyProvider simple.KeyChanFunc, repo repo.Repo) (provider.System, error) { - r, ok := (cr).(provideMany) - if !ok { + return func(lc fx.Lifecycle, cr irouting.Tiered, q *q.Queue, keyProvider simple.KeyChanFunc, repo repo.Repo) (provider.System, error) { + r := cr.ProviderManyWrapper() + if r == nil { return nil, fmt.Errorf("BatchedProviderSys requires a content router that supports provideMany") } diff --git a/docs/config.md b/docs/config.md index 79d308c926a0..7f36bf5ce8bb 100644 --- a/docs/config.md +++ b/docs/config.md @@ -102,19 +102,22 @@ config file at runtime. - [`Reprovider.Interval`](#reproviderinterval) - [`Reprovider.Strategy`](#reproviderstrategy) - [`Routing`](#routing) + - [`Routing.Routers`](#routingrouters) - [`Routing.Type`](#routingtype) - [`Swarm`](#swarm) - [`Swarm.AddrFilters`](#swarmaddrfilters) - [`Swarm.DisableBandwidthMetrics`](#swarmdisablebandwidthmetrics) - [`Swarm.DisableNatPortMap`](#swarmdisablenatportmap) - [`Swarm.EnableHolePunching`](#swarmenableholepunching) + - [`Swarm.EnableAutoRelay`](#swarmenableautorelay) - [`Swarm.RelayClient`](#swarmrelayclient) - [`Swarm.RelayClient.Enabled`](#swarmrelayclientenabled) - [`Swarm.RelayClient.StaticRelays`](#swarmrelayclientstaticrelays) - [`Swarm.RelayService`](#swarmrelayservice) - [`Swarm.RelayService.Enabled`](#swarmrelayserviceenabled) - - [`Swarm.RelayService.ConnectionDurationLimit`](#swarmrelayserviceconnectiondurationlimit) - - [`Swarm.RelayService.ConnectionDataLimit`](#swarmrelayserviceconnectiondatalimit) + - [`Swarm.RelayService.Limit`](#swarmrelayservicelimit) + - [`Swarm.RelayService.ConnectionDurationLimit`](#swarmrelayserviceconnectiondurationlimit) + - [`Swarm.RelayService.ConnectionDataLimit`](#swarmrelayserviceconnectiondatalimit) - [`Swarm.RelayService.ReservationTTL`](#swarmrelayservicereservationttl) - [`Swarm.RelayService.MaxReservations`](#swarmrelayservicemaxreservations) - [`Swarm.RelayService.MaxCircuits`](#swarmrelayservicemaxcircuits) @@ -122,6 +125,8 @@ config file at runtime. - [`Swarm.RelayService.MaxReservationsPerPeer`](#swarmrelayservicemaxreservationsperpeer) - [`Swarm.RelayService.MaxReservationsPerIP`](#swarmrelayservicemaxreservationsperip) - [`Swarm.RelayService.MaxReservationsPerASN`](#swarmrelayservicemaxreservationsperasn) + - [`Swarm.EnableRelayHop`](#swarmenablerelayhop) + - [`Swarm.DisableRelay`](#swarmdisablerelay) - [`Swarm.EnableAutoNATService`](#swarmenableautonatservice) - [`Swarm.ConnMgr`](#swarmconnmgr) - [`Swarm.ConnMgr.Type`](#swarmconnmgrtype) @@ -1289,6 +1294,14 @@ Type: `string` (or unset for the default, which is "all") Contains options for content, peer, and IPNS routing mechanisms. +### `Routing.Routers` + +It can contains several Router implementation, some of them using the [reframe specs](https://github.com/ipfs/specs/blob/master/REFRAME.md). + +// TODO add more documentation + +Type: `object[string->object]` + ### `Routing.Type` Content routing mode. Can be overridden with daemon `--routing` flag. diff --git a/go.mod b/go.mod index bef04070b630..22d094e9ee30 100644 --- a/go.mod +++ b/go.mod @@ -127,11 +127,10 @@ require ( require ( github.com/benbjohnson/clock v1.3.0 + github.com/ipfs/go-delegated-routing v0.2.2 github.com/ipfs/go-log/v2 v2.5.1 ) -require github.com/ipld/edelweiss v0.1.1-0.20220523211122-2134034b3e81 // indirect - require ( github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect github.com/Kubuxu/go-os-helper v0.0.1 // indirect @@ -175,11 +174,11 @@ require ( github.com/huin/goupnp v1.0.3 // indirect github.com/ipfs/bbloom v0.0.4 // indirect github.com/ipfs/go-bitfield v1.0.0 // indirect - github.com/ipfs/go-delegated-routing v0.2.1-0.20220524163630-968b11329985 github.com/ipfs/go-ipfs-delay v0.0.1 // indirect github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect github.com/ipfs/go-ipfs-pq v0.0.2 // indirect github.com/ipfs/go-peertaskqueue v0.7.1 // indirect + github.com/ipld/edelweiss v0.1.2 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/klauspost/compress v1.15.1 // indirect github.com/klauspost/cpuid/v2 v2.0.12 // indirect diff --git a/go.sum b/go.sum index 16286ff62a60..c9bc0d79db4f 100644 --- a/go.sum +++ b/go.sum @@ -493,8 +493,8 @@ github.com/ipfs/go-datastore v0.4.5/go.mod h1:eXTcaaiN6uOlVCLS9GjJUJtlvJfM3xk23w github.com/ipfs/go-datastore v0.5.0/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk= github.com/ipfs/go-datastore v0.5.1 h1:WkRhLuISI+XPD0uk3OskB0fYFSyqK8Ob5ZYew9Qa1nQ= github.com/ipfs/go-datastore v0.5.1/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk= -github.com/ipfs/go-delegated-routing v0.2.1-0.20220524163630-968b11329985 h1:Zd6mW1YEd5byQPw5qhJxYUSJeDhSJBG2q5jQg9N5ADg= -github.com/ipfs/go-delegated-routing v0.2.1-0.20220524163630-968b11329985/go.mod h1:eVvUPhPqkJBHDvekHy+g3ixUck11Vi5qPuOpZ5u/XE0= +github.com/ipfs/go-delegated-routing v0.2.2 h1:4fM5i/XFDY1VLXvkP/bvrV/4DgOxdymTm7U9V2Nf9aw= +github.com/ipfs/go-delegated-routing v0.2.2/go.mod h1:T8wrRhlXBHLPUR3bZQgArHPfdi7nBfOsZ1m5fr9tAp4= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8= @@ -650,8 +650,8 @@ github.com/ipfs/interface-go-ipfs-core v0.7.0 h1:7tb+2upz8oCcjIyjo1atdMk+P+u7wPm github.com/ipfs/interface-go-ipfs-core v0.7.0/go.mod h1:lF27E/nnSPbylPqKVXGZghal2hzifs3MmjyiEjnc9FY= github.com/ipfs/tar-utils v0.0.2 h1:UNgHB4x/PPzbMkmJi+7EqC9LNMPDztOVSnx1HAqSNg4= github.com/ipfs/tar-utils v0.0.2/go.mod h1:4qlnRWgTVljIMhSG2SqRYn66NT+3wrv/kZt9V+eqxDM= -github.com/ipld/edelweiss v0.1.1-0.20220523211122-2134034b3e81 h1:cu+XHovL7JOm7Fs6YL7ht3zK7QIMK617ZBptJqjz2mo= -github.com/ipld/edelweiss v0.1.1-0.20220523211122-2134034b3e81/go.mod h1:14NnBVHgrPO8cqDnKg7vc69LGI0aCAcax6mj21+99ec= +github.com/ipld/edelweiss v0.1.2 h1:dpcC0V+O4tLgIpLG5E4Lqbpp1N1ytnVnvcHYd1lHfN0= +github.com/ipld/edelweiss v0.1.2/go.mod h1:14NnBVHgrPO8cqDnKg7vc69LGI0aCAcax6mj21+99ec= github.com/ipld/go-car v0.3.2 h1:V9wt/80FNfbMRWSD98W5br6fyjUAyVgI2lDOTZX16Lg= github.com/ipld/go-car v0.3.2/go.mod h1:WEjynkVt04dr0GwJhry0KlaTeSDEiEYyMPOxDBQ17KE= github.com/ipld/go-car/v2 v2.1.1 h1:saaKz4nC0AdfCGHLYKeXLGn8ivoPC54fyS55uyOLKwA= diff --git a/routing/delegated.go b/routing/delegated.go new file mode 100644 index 000000000000..6e06cb60c4d6 --- /dev/null +++ b/routing/delegated.go @@ -0,0 +1,93 @@ +package routing + +import ( + "strconv" + + drc "github.com/ipfs/go-delegated-routing/client" + drp "github.com/ipfs/go-delegated-routing/gen/proto" + "github.com/ipfs/go-ipfs/config" + "github.com/libp2p/go-libp2p-core/routing" + routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" +) + +// Tiered is a routing Tiered implementation providing some extra methods to fill +// some special use cases when initializing the client. +type Tiered struct { + routinghelpers.Tiered +} + +// ProviderManyWrapper returns a ProviderMany implementation including all Routers that +// implements ProviderMany +func (ds Tiered) ProviderManyWrapper() ProvideMany { + var pms []ProvideMany + for _, r := range ds.Tiered.Routers { + pm, ok := r.(ProvideMany) + if !ok { + continue + } + pms = append(pms, pm) + } + + if len(pms) == 0 { + return nil + } + + return &ProvideManyWrapper{pms: pms} +} + +const defaultPriority = 100000 + +// GetPriority extract priority from config params. +// Small numbers represent more important routers. +func GetPriority(params map[string]string) int { + param := params[string(config.RouterParamPriority)] + if param == "" { + return defaultPriority + } + + p, err := strconv.Atoi(param) + if err != nil { + return defaultPriority + } + + return p +} + +// RoutingFromConfig creates a Routing instance from the specified configuration. +func RoutingFromConfig(c config.Router) (routing.Routing, error) { + + switch { + case c.Type == string(config.RouterTypeReframe): + return reframeRoutingFromConfig(c) + case c.Type == string(config.RouterTypeDHT): + return dhtRoutingFromConfig(c) + default: + return nil, &RouterTypeNotFoundError{c.Type} + } +} + +func dhtRoutingFromConfig(conf config.Router) (routing.Routing, error) { + panic("implement this") +} + +func reframeRoutingFromConfig(conf config.Router) (routing.Routing, error) { + var dr drp.DelegatedRouting_Client + + param := string(config.RouterParamAddress) + addr, ok := conf.Parameters[param] + if !ok { + return nil, NewParamNeededErr(param, conf.Type) + } + + dr, err := drp.New_DelegatedRouting_Client(addr) + if err != nil { + return nil, err + } + + c := drc.NewClient(dr) + crc := drc.NewContentRoutingClient(c) + return &reframeRoutingWrapper{ + Client: c, + ContentRoutingClient: crc, + }, nil +} diff --git a/routing/error.go b/routing/error.go new file mode 100644 index 000000000000..15016a0e4e6c --- /dev/null +++ b/routing/error.go @@ -0,0 +1,27 @@ +package routing + +import "fmt" + +type ParamNeededError struct { + ParamName string + RouterType string +} + +func NewParamNeededErr(param, routing string) error { + return &ParamNeededError{ + ParamName: param, + RouterType: routing, + } +} + +func (e *ParamNeededError) Error() string { + return fmt.Sprintf("configuration param '%v' is needed for %v delegated routing types", e.ParamName, e.RouterType) +} + +type RouterTypeNotFoundError struct { + RouterType string +} + +func (e *RouterTypeNotFoundError) Error() string { + return fmt.Sprintf("router type %v is not supported", e.RouterType) +} diff --git a/routing/wrapper.go b/routing/wrapper.go new file mode 100644 index 000000000000..b3dea9b8c5ed --- /dev/null +++ b/routing/wrapper.go @@ -0,0 +1,66 @@ +package routing + +import ( + "context" + + "github.com/ipfs/go-cid" + drc "github.com/ipfs/go-delegated-routing/client" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/routing" + "github.com/multiformats/go-multihash" + "golang.org/x/sync/errgroup" +) + +var _ routing.Routing = &reframeRoutingWrapper{} + +// reframeRoutingWrapper is a wrapper needed to construct the routing.Routing interface from +// delegated-routing library. +type reframeRoutingWrapper struct { + *drc.Client + *drc.ContentRoutingClient +} + +func (c *reframeRoutingWrapper) FindProvidersAsync(ctx context.Context, cid cid.Cid, count int) <-chan peer.AddrInfo { + return c.ContentRoutingClient.FindProvidersAsync(ctx, cid, count) +} + +func (c *reframeRoutingWrapper) Bootstrap(ctx context.Context) error { + return nil +} + +func (c *reframeRoutingWrapper) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { + return peer.AddrInfo{}, routing.ErrNotSupported +} + +type ProvideMany interface { + ProvideMany(ctx context.Context, keys []multihash.Multihash) error + Ready() bool +} + +var _ ProvideMany = &ProvideManyWrapper{} + +type ProvideManyWrapper struct { + pms []ProvideMany +} + +func (pmw *ProvideManyWrapper) ProvideMany(ctx context.Context, keys []multihash.Multihash) error { + var g errgroup.Group + for _, pm := range pmw.pms { + pm := pm + g.Go(func() error { + return pm.ProvideMany(ctx, keys) + }) + } + + return g.Wait() +} + +// Ready is ready if all providers are ready +func (pmw *ProvideManyWrapper) Ready() bool { + out := true + for _, pm := range pmw.pms { + out = out && pm.Ready() + } + + return out +}