Skip to content

Commit

Permalink
Requested changes.
Browse files Browse the repository at this point in the history
Signed-off-by: Antonio Navarro Perez <antnavper@gmail.com>
  • Loading branch information
ajnavarro committed Jun 22, 2022
1 parent 35ba77c commit b0da79c
Show file tree
Hide file tree
Showing 15 changed files with 297 additions and 95 deletions.
14 changes: 6 additions & 8 deletions config/addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package config

// Addresses stores the (string) multiaddr addresses for the node.
type Addresses struct {
Swarm []string // addresses for the swarm to listen on
Announce []string // swarm addresses to announce to the network, if len > 0 replaces auto detected addresses
AppendAnnounce []string // similar to Announce but doesn't overwrite auto detected addresses, they are just appended
NoAnnounce []string // swarm addresses not to announce to the network
ExtraReframeRouters []string // allows to add other resolvers using the Reframe spec: https://github.com/ipfs/specs/blob/master/REFRAME.md
API Strings // address for the local API (RPC)
Gateway Strings // address to listen on for IPFS HTTP object gateway

Swarm []string // addresses for the swarm to listen on
Announce []string // swarm addresses to announce to the network, if len > 0 replaces auto detected addresses
AppendAnnounce []string // similar to Announce but doesn't overwrite auto detected addresses, they are just appended
NoAnnounce []string // swarm addresses not to announce to the network
API Strings // address for the local API (RPC)
Gateway Strings // address to listen on for IPFS HTTP object gateway
}
11 changes: 5 additions & 6 deletions config/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,11 @@ func addressesConfig() Addresses {
"/ip4/0.0.0.0/udp/4001/quic",
"/ip6/::/udp/4001/quic",
},
Announce: []string{},
AppendAnnounce: []string{},
NoAnnounce: []string{},
ExtraReframeRouters: []string{},
API: Strings{"/ip4/127.0.0.1/tcp/5001"},
Gateway: Strings{"/ip4/127.0.0.1/tcp/8080"},
Announce: []string{},
AppendAnnounce: []string{},
NoAnnounce: []string{},
API: Strings{"/ip4/127.0.0.1/tcp/5001"},
Gateway: Strings{"/ip4/127.0.0.1/tcp/8080"},
}
}

Expand Down
41 changes: 40 additions & 1 deletion config/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,44 @@ type Routing struct {
// Type sets default daemon routing mode.
//
// Can be one of "dht", "dhtclient", "dhtserver", "none", or unset.
Type string
Type string `json:",omitempty"`

Routers map[string]Router
}

type Router struct {

// Type can be one of "dht", "dhtclient", "dhtserver", "reframe".
// Reframe type allows to add other resolvers using the Reframe spec:
// https://github.com/ipfs/specs/blob/master/REFRAME.md
Type string
Enabled bool

// Methods that we want to use from this provider.
// Leave it empty to use all the supported and available ones.
// Actual supported methods: "FindProviders", "GetIPNS", "PutIPNS"
Methods []string

// Parameters are extra configuration that this router might need.
// A common one for reframe endpoints is "Address".
Parameters map[string]string
}

// Type is the routing type.
// Depending of the type we need to instantiate different Routing implementations.
type RouterType string

const (
RouterTypeReframe RouterType = "reframe"
RouterTypeDHT RouterType = "dht"
)

type RouterParam string

const (
// RouterParamAddress is the URL where the routing implementation will point to get the information.
// Usually used for reframe Routers.
RouterParamAddress RouterParam = "address"

RouterParamPriority RouterParam = "priority"
)
4 changes: 2 additions & 2 deletions core/node/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
config "github.com/ipfs/go-ipfs/config"
irouting "github.com/ipfs/go-ipfs/routing"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing"
"go.uber.org/fx"

"github.com/ipfs/go-ipfs/core/node/helpers"
Expand All @@ -25,7 +25,7 @@ const (

// OnlineExchange creates new LibP2P backed block exchange (BitSwap)
func OnlineExchange(cfg *config.Config, provide bool) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs blockstore.GCBlockstore) exchange.Interface {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt irouting.Tiered, bs blockstore.GCBlockstore) exchange.Interface {
bitswapNetwork := network.NewFromIpfsHost(host, rt)

var internalBsCfg config.InternalBitswap
Expand Down
2 changes: 1 addition & 1 deletion core/node/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {

fx.Provide(libp2p.Routing),
fx.Provide(libp2p.BaseRouting(cfg.Experimental.AcceleratedDHTClient)),
fx.Provide(libp2p.DelegatedRouting(cfg.Addresses.ExtraReframeRouters)),
fx.Provide(libp2p.DelegatedRouting(cfg.Routing.Routers)),
maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")),

maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics),
Expand Down
11 changes: 6 additions & 5 deletions core/node/ipns.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ import (
"fmt"
"time"

"github.com/ipfs/go-ipfs-util"
util "github.com/ipfs/go-ipfs-util"
"github.com/ipfs/go-ipns"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/libp2p/go-libp2p-record"
record "github.com/libp2p/go-libp2p-record"
madns "github.com/multiformats/go-multiaddr-dns"

"github.com/ipfs/go-ipfs/repo"
irouting "github.com/ipfs/go-ipfs/routing"

"github.com/ipfs/go-namesys"
"github.com/ipfs/go-namesys/republisher"
)
Expand All @@ -28,8 +29,8 @@ func RecordValidator(ps peerstore.Peerstore) record.Validator {
}

// Namesys creates new name system
func Namesys(cacheSize int) func(rt routing.Routing, rslv *madns.Resolver, repo repo.Repo) (namesys.NameSystem, error) {
return func(rt routing.Routing, rslv *madns.Resolver, repo repo.Repo) (namesys.NameSystem, error) {
func Namesys(cacheSize int) func(rt irouting.Tiered, rslv *madns.Resolver, repo repo.Repo) (namesys.NameSystem, error) {
return func(rt irouting.Tiered, rslv *madns.Resolver, repo repo.Repo) (namesys.NameSystem, error) {
opts := []namesys.Option{
namesys.WithDatastore(repo.Datastore()),
namesys.WithDNSResolver(rslv),
Expand Down
68 changes: 21 additions & 47 deletions core/node/libp2p/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ import (
"sort"
"time"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs/core/node/helpers"
irouting "github.com/ipfs/go-ipfs/routing"

drc "github.com/ipfs/go-delegated-routing/client"
drp "github.com/ipfs/go-delegated-routing/gen/proto"
config "github.com/ipfs/go-ipfs/config"
"github.com/ipfs/go-ipfs/repo"
"github.com/libp2p/go-libp2p-core/host"
Expand All @@ -29,8 +27,6 @@ import (
"go.uber.org/fx"
)

type BaseIpfsRouting routing.Routing

type Router struct {
routing.Routing

Expand Down Expand Up @@ -60,7 +56,6 @@ type processInitialRoutingOut struct {
Router Router `group:"routers"`
DHT *ddht.DHT
DHTClient routing.Routing `name:"dhtc"`
BaseRT BaseIpfsRouting
}

type AddrInfoChan chan peer.AddrInfo
Expand Down Expand Up @@ -114,7 +109,6 @@ func BaseRouting(experimentalDHTClient bool) interface{} {
},
DHT: dr,
DHTClient: expClient,
BaseRT: expClient,
}, nil
}

Expand All @@ -125,7 +119,6 @@ func BaseRouting(experimentalDHTClient bool) interface{} {
},
DHT: dr,
DHTClient: dr,
BaseRT: in.Router,
}, nil
}
}
Expand All @@ -136,58 +129,38 @@ type delegatedRouterOut struct {
Routers []Router `group:"routers,flatten"`
}

func DelegatedRouting(urls []string) interface{} {
func DelegatedRouting(routers map[string]config.Router) interface{} {
return func() (delegatedRouterOut, error) {
out := delegatedRouterOut{}

for _, u := range urls {
var dr drp.DelegatedRouting_Client
dr, err := drp.New_DelegatedRouting_Client(u)
for _, v := range routers {
if !v.Enabled {
continue
}

r, err := irouting.RoutingFromConfig(v)
if err != nil {
return out, err
}

c := drc.NewClient(dr)
crc := drc.NewContentRoutingClient(c)
out.Routers = append(out.Routers, Router{Routing: &routingWrapper{
Client: c,
ContentRoutingClient: crc,
}})
out.Routers = append(out.Routers, Router{
Routing: r,
Priority: irouting.GetPriority(v.Parameters),
})
}

return out, nil
}
}

var _ routing.Routing = &routingWrapper{}

// routingWrapper is a wrapper needed to construct the routing.Routing interface from
// delegated-routing library.
type routingWrapper struct {
*drc.Client
*drc.ContentRoutingClient
}

func (c *routingWrapper) FindProvidersAsync(ctx context.Context, cid cid.Cid, count int) <-chan peer.AddrInfo {
return c.ContentRoutingClient.FindProvidersAsync(ctx, cid, count)
}

func (c *routingWrapper) Bootstrap(ctx context.Context) error {
return nil
}

func (c *routingWrapper) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
return peer.AddrInfo{}, routing.ErrNotSupported
}

type p2pOnlineRoutingIn struct {
fx.In

Routers []Router `group:"routers"`
Validator record.Validator
}

func Routing(in p2pOnlineRoutingIn) routing.Routing {
func Routing(in p2pOnlineRoutingIn) irouting.Tiered {
routers := in.Routers

sort.SliceStable(routers, func(i, j int) bool {
Expand All @@ -199,19 +172,20 @@ func Routing(in p2pOnlineRoutingIn) routing.Routing {
irouters[i] = v.Routing
}

return routinghelpers.Tiered{
Routers: irouters,
Validator: in.Validator,
return irouting.Tiered{
Tiered: routinghelpers.Tiered{
Routers: irouters,
Validator: in.Validator,
},
}
}

type p2pPSRoutingIn struct {
fx.In

BaseIpfsRouting BaseIpfsRouting
Validator record.Validator
Host host.Host
PubSub *pubsub.PubSub `optional:"true"`
Validator record.Validator
Host host.Host
PubSub *pubsub.PubSub `optional:"true"`
}

func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (p2pRouterOut, *namesys.PubsubValueStore, error) {
Expand Down
3 changes: 2 additions & 1 deletion core/node/libp2p/topicdiscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (
disc "github.com/libp2p/go-libp2p-discovery"

"github.com/ipfs/go-ipfs/core/node/helpers"
irouting "github.com/ipfs/go-ipfs/routing"
"go.uber.org/fx"
)

func TopicDiscovery() interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, cr BaseIpfsRouting) (service discovery.Discovery, err error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, cr irouting.Tiered) (service discovery.Discovery, err error) {
baseDisc := disc.NewRoutingDiscovery(cr)
minBackoff, maxBackoff := time.Second*60, time.Hour
rng := rand.New(rand.NewSource(rand.Int63()))
Expand Down
23 changes: 8 additions & 15 deletions core/node/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,16 @@ import (
"time"

"github.com/ipfs/go-fetcher"
"github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipfs-provider"
pin "github.com/ipfs/go-ipfs-pinner"
provider "github.com/ipfs/go-ipfs-provider"
"github.com/ipfs/go-ipfs-provider/batched"
q "github.com/ipfs/go-ipfs-provider/queue"
"github.com/ipfs/go-ipfs-provider/simple"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multihash"
"go.uber.org/fx"

"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/core/node/libp2p"
"github.com/ipfs/go-ipfs/repo"
irouting "github.com/ipfs/go-ipfs/routing"
)

const kReprovideFrequency = time.Hour * 12
Expand All @@ -30,13 +28,13 @@ func ProviderQueue(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*q
}

// SimpleProvider creates new record provider
func SimpleProvider(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *q.Queue, rt routing.Routing) provider.Provider {
func SimpleProvider(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *q.Queue, rt irouting.Tiered) provider.Provider {
return simple.NewProvider(helpers.LifecycleCtx(mctx, lc), queue, rt)
}

// SimpleReprovider creates new reprovider
func SimpleReprovider(reproviderInterval time.Duration) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, rt routing.Routing, keyProvider simple.KeyChanFunc) (provider.Reprovider, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, rt irouting.Tiered, keyProvider simple.KeyChanFunc) (provider.Reprovider, error) {
return simple.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil
}
}
Expand All @@ -62,16 +60,11 @@ func SimpleProviderSys(isOnline bool) interface{} {
}
}

type provideMany interface {
ProvideMany(ctx context.Context, keys []multihash.Multihash) error
Ready() bool
}

// BatchedProviderSys creates new provider system
func BatchedProviderSys(isOnline bool, reprovideInterval string) interface{} {
return func(lc fx.Lifecycle, cr libp2p.BaseIpfsRouting, q *q.Queue, keyProvider simple.KeyChanFunc, repo repo.Repo) (provider.System, error) {
r, ok := (cr).(provideMany)
if !ok {
return func(lc fx.Lifecycle, cr irouting.Tiered, q *q.Queue, keyProvider simple.KeyChanFunc, repo repo.Repo) (provider.System, error) {
r := cr.ProviderManyWrapper()
if r == nil {
return nil, fmt.Errorf("BatchedProviderSys requires a content router that supports provideMany")
}

Expand Down
Loading

0 comments on commit b0da79c

Please sign in to comment.