Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add experimental optimistic provide #9753

Merged
merged 2 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions config/experiments.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package config

// Experiments holds the switches and tuning values for experimental features.
type Experiments struct {
FilestoreEnabled bool
UrlstoreEnabled bool
ShardingEnabled bool `json:",omitempty"` // deprecated by autosharding: https://github.com/ipfs/kubo/pull/8527
GraphsyncEnabled bool
Libp2pStreamMounting bool
P2pHttpProxy bool //nolint
StrategicProviding bool
AcceleratedDHTClient bool
FilestoreEnabled bool
UrlstoreEnabled bool
ShardingEnabled bool `json:",omitempty"` // deprecated by autosharding: https://github.com/ipfs/kubo/pull/8527
GraphsyncEnabled bool
Libp2pStreamMounting bool
P2pHttpProxy bool //nolint
StrategicProviding bool
AcceleratedDHTClient bool
// OptimisticProvide turns on the experimental optimistic provide mode of the DHT client.
OptimisticProvide bool
// OptimisticProvideJobsPoolSize caps the number of in-flight background provide requests.
// A value of zero is not forwarded to the DHT, so the DHT's own default stays in effect.
OptimisticProvideJobsPoolSize int
guseggert marked this conversation as resolved.
Show resolved Hide resolved
}
21 changes: 14 additions & 7 deletions core/node/libp2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,18 @@ func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHo
return out, err
}

routingOptArgs := RoutingOptionArgs{
Ctx: ctx,
Datastore: params.Repo.Datastore(),
Validator: params.Validator,
BootstrapPeers: bootstrappers,
OptimisticProvide: cfg.Experimental.OptimisticProvide,
OptimisticProvideJobsPoolSize: cfg.Experimental.OptimisticProvideJobsPoolSize,
}
opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
r, err := params.RoutingOption(
ctx, h,
params.Repo.Datastore(),
params.Validator,
bootstrappers...,
)
args := routingOptArgs
args.Host = h
r, err := params.RoutingOption(args)
out.Routing = r
return r, err
}))
Expand All @@ -69,10 +74,12 @@ func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHo
return P2PHostOut{}, err
}

routingOptArgs.Host = out.Host

// this code is necessary just for tests: mock network constructions
// ignore the libp2p constructor options that actually construct the routing!
if out.Routing == nil {
r, err := params.RoutingOption(ctx, out.Host, params.Repo.Datastore(), params.Validator, bootstrappers...)
r, err := params.RoutingOption(routingOptArgs)
if err != nil {
return P2PHostOut{}, err
}
Expand Down
108 changes: 39 additions & 69 deletions core/node/libp2p/routingopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ import (
routing "github.com/libp2p/go-libp2p/core/routing"
)

type RoutingOption func(
context.Context,
host.Host,
datastore.Batching,
record.Validator,
...peer.AddrInfo,
) (routing.Routing, error)
// RoutingOptionArgs bundles the inputs handed to a RoutingOption.
type RoutingOptionArgs struct {
// Ctx is the context passed on to the routing implementation being built.
Ctx context.Context
// Host is the libp2p host the routing implementation is built on.
Host host.Host
// Datastore is handed to the DHT as its datastore.
Datastore datastore.Batching
// Validator is handed to the DHT as its record validator.
Validator record.Validator
// BootstrapPeers are the peers configured as bootstrap peers for the WAN DHT.
BootstrapPeers []peer.AddrInfo
// OptimisticProvide, when true, enables the DHT's optimistic provide option.
OptimisticProvide bool
// OptimisticProvideJobsPoolSize is forwarded to the DHT only when non-zero.
OptimisticProvideJobsPoolSize int
}

// RoutingOption builds a routing.Routing from the supplied RoutingOptionArgs.
type RoutingOption func(args RoutingOptionArgs) (routing.Routing, error)

// Default HTTP routers used in parallel to DHT when Routing.Type = "auto"
var defaultHTTPRouters = []string{
Expand All @@ -40,25 +44,13 @@ func init() {
}

// ConstructDefaultRouting returns routers used when Routing.Type is unset or set to "auto"
func ConstructDefaultRouting(peerID string, addrs []string, privKey string, routingOpt RoutingOption) func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
return func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
func ConstructDefaultRouting(peerID string, addrs []string, privKey string, routingOpt RoutingOption) RoutingOption {
return func(args RoutingOptionArgs) (routing.Routing, error) {
// Defined routers will be queried in parallel (optimizing for response speed)
// Different trade-offs can be made by setting Routing.Type = "custom" with own Routing.Routers
var routers []*routinghelpers.ParallelRouter

dhtRouting, err := routingOpt(ctx, host, dstore, validator, bootstrapPeers...)
dhtRouting, err := routingOpt(args)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -97,54 +89,38 @@ func ConstructDefaultRouting(peerID string, addrs []string, privKey string, rout
}

// constructDHTRouting is used when Routing.Type = "dht"
func constructDHTRouting(mode dht.ModeOpt) func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
return func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
// constructDHTRouting returns a RoutingOption that builds a dual DHT running
// in the given mode. It is used when Routing.Type = "dht".
//
// The returned option reads Ctx, Host, Datastore, Validator, BootstrapPeers
// and the optimistic-provide settings from its RoutingOptionArgs.
func constructDHTRouting(mode dht.ModeOpt) RoutingOption {
	return func(args RoutingOptionArgs) (routing.Routing, error) {
		dhtOpts := []dht.Option{
			dht.Concurrency(10),
			dht.Mode(mode),
			dht.Datastore(args.Datastore),
			dht.Validator(args.Validator),
		}
		if args.OptimisticProvide {
			dhtOpts = append(dhtOpts, dht.EnableOptimisticProvide())
		}
		// Only forward strictly positive pool sizes. Zero means "not
		// configured" and keeps the DHT default. Negative values are
		// nonsensical and are ignored rather than passed through.
		// NOTE(review): the DHT presumably uses this value as a channel
		// capacity, which would panic for negative sizes — confirm.
		if args.OptimisticProvideJobsPoolSize > 0 {
			dhtOpts = append(dhtOpts, dht.OptimisticProvideJobsPoolSize(args.OptimisticProvideJobsPoolSize))
		}
		return dual.New(
			args.Ctx, args.Host,
			dual.DHTOption(dhtOpts...),
			// Bootstrap peers are configured on the WAN DHT only.
			dual.WanDHTOption(dht.BootstrapPeers(args.BootstrapPeers...)),
		)
	}
}

// ConstructDelegatedRouting is used when Routing.Type = "custom"
func ConstructDelegatedRouting(routers config.Routers, methods config.Methods, peerID string, addrs []string, privKey string) func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
return func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
func ConstructDelegatedRouting(routers config.Routers, methods config.Methods, peerID string, addrs []string, privKey string) RoutingOption {
return func(args RoutingOptionArgs) (routing.Routing, error) {
return irouting.Parse(routers, methods,
&irouting.ExtraDHTParams{
BootstrapPeers: bootstrapPeers,
Host: host,
Validator: validator,
Datastore: dstore,
Context: ctx,
BootstrapPeers: args.BootstrapPeers,
Host: args.Host,
Validator: args.Validator,
Datastore: args.Datastore,
Context: args.Ctx,
},
&irouting.ExtraHTTPParams{
PeerID: peerID,
Expand All @@ -154,13 +130,7 @@ func ConstructDelegatedRouting(routers config.Routers, methods config.Methods, p
}
}

func constructNilRouting(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
// constructNilRouting is a RoutingOption that disregards its arguments and
// always yields a routinghelpers.Null router together with a nil error.
func constructNilRouting(_ RoutingOptionArgs) (routing.Routing, error) {
	var nullRouter routinghelpers.Null
	return nullRouter, nil
}

Expand Down
3 changes: 2 additions & 1 deletion docs/examples/kubo-as-a-library/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ require (
github.com/libp2p/go-doh-resolver v0.4.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.22.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.23.0 // indirect
github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.9.3 // indirect
github.com/libp2p/go-libp2p-pubsub-router v0.6.0 // indirect
Expand Down Expand Up @@ -188,6 +188,7 @@ require (
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
google.golang.org/grpc v1.53.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions docs/examples/kubo-as-a-library/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,8 @@ github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLE
github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNzVVaMNmq2NACuqyB9w=
github.com/libp2p/go-libp2p-core v0.2.4/go.mod h1:STh4fdfa5vDYr0/SzYYeqnt+E6KfEV5VxfIrm0bcI0g=
github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw=
github.com/libp2p/go-libp2p-kad-dht v0.22.0 h1:cW2nGgG0hztDM42tOPyC5cVflD7EzLaHM0/Kjol6Wio=
github.com/libp2p/go-libp2p-kad-dht v0.22.0/go.mod h1:hareSo3Z/GJ7nUWPMj7XhD/56a7+rRltYCWwCuy3FQk=
github.com/libp2p/go-libp2p-kad-dht v0.23.0 h1:sxE6LxLopp79eLeV695n7+c77V/Vn4AMF28AdM/XFqM=
github.com/libp2p/go-libp2p-kad-dht v0.23.0/go.mod h1:oO5N308VT2msnQI6qi5M61wzPmJYg7Tr9e16m5n7uDU=
github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio=
github.com/libp2p/go-libp2p-kbucket v0.5.0 h1:g/7tVm8ACHDxH29BGrpsQlnNeu+6OF1A9bno/4/U1oA=
github.com/libp2p/go-libp2p-kbucket v0.5.0/go.mod h1:zGzGCpQd78b5BNTDGHNDLaTt9aDK/A02xeZp9QeFC4U=
Expand Down Expand Up @@ -1191,6 +1191,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E=
gonum.org/v1/gonum v0.11.0/go.mod h1:fSG4YDCxxUZQJ7rKsQrj0gMOg00Il0Z96/qMA4bVQhA=
google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y=
Expand Down
74 changes: 74 additions & 0 deletions docs/experimental-features.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ the above issue.
- [Graphsync](#graphsync)
- [Noise](#noise)
- [Accelerated DHT Client](#accelerated-dht-client)
- [Optimistic Provide](#optimistic-provide)

---

Expand Down Expand Up @@ -597,3 +598,76 @@ ipfs config --json Experimental.AcceleratedDHTClient true
- [ ] Needs more people to use and report on how well it works
- [ ] Should be usable for queries (even if slower/less efficient) shortly after startup
- [ ] Should be usable with non-WAN DHTs

## Optimistic Provide

### In Version

0.20.0

### State

Experimental, disabled by default.

When the DHT client tries to store a provider in the DHT, it typically searches for the 20 peers that are closest to the
target key. However, this process can be time-consuming, as the search terminates only after no closer peers are found
among the three currently (during the query) known closest ones. In cases where these closest peers are slow to respond
(which often happens if they are located at the edge of the DHT network), the query gets blocked by the slowest peer.

To address this issue, the `OptimisticProvide` feature can be enabled. This feature allows the client to estimate the
network size and determine how close a peer _likely_ needs to be to the target key to be within the 20 closest peers.
While searching for the closest peers in the DHT, the client will _optimistically_ store the provider record with peers
and abort the query completely when the set of currently known 20 closest peers is also _likely_ the actual 20 closest
ones. This heuristic approach can significantly speed up the process, resulting in a speed improvement of 2x to >10x.

When it is enabled:

- DHT provide operations should complete much faster than with it disabled
- This can be tested with commands such as `ipfs routing provide`

**Tradeoffs**

There are now three ways to provide: the classic client, the accelerated DHT client, and optimistic provide, each with
different trade-offs. The accelerated DHT client is still the fastest at providing large numbers of provider records, at
the cost of high resource requirements. Optimistic provide doesn't have the high resource requirements, but it might
not choose optimal peers and is not as fast as the accelerated client. It is still much faster than the classic
client.

**Caveats:**

1. Providing optimistically requires a current network size estimation. This estimation is calculated through routing
table refresh queries and is only available after the daemon has been running for some time. If there is no network
size estimation available, the client will transparently fall back to the classic approach.
2. The chosen peers to store the provider records might not be the actual closest ones. Measurements showed that this
is not a problem.
3. The optimistic provide process returns as soon as 15 of the 20 provider records have been stored with peers. The
reasoning here is that one of the remaining 5 peers is very likely to time out and delay the whole process. To
limit the number of in-flight async requests, there is a second setting, `OptimisticProvideJobsPoolSize`. Currently,
this is set to 60. This means that at most 60 parallel background requests are allowed to be in-flight. If this
limit is exceeded, optimistic provide will block until all 20 provider records are written. This is still 2x faster
than the classic approach but not as fast as returning early, which yields >10x speed-ups.
4. Since the in-flight background requests are likely to time out, they are not consuming many resources and the job
pool size could probably be much higher.

For more information, see:

- Project doc: https://protocollabs.notion.site/Optimistic-Provide-2c79745820fa45649d48de038516b814
- go-libp2p-kad-dht: https://github.com/libp2p/go-libp2p-kad-dht/pull/783

### Configuring
To enable:

```
ipfs config --json Experimental.OptimisticProvide true
```

If you want to change the `OptimisticProvideJobsPoolSize` setting from its default of 60:

```
ipfs config --json Experimental.OptimisticProvideJobsPoolSize 120
```

### Road to being a real feature

- [ ] Needs more people to use and report on how well it works
- [ ] Should prove at least equivalent availability of provider records as the classic approach
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ require (
github.com/libp2p/go-doh-resolver v0.4.0
github.com/libp2p/go-libp2p v0.26.4
github.com/libp2p/go-libp2p-http v0.5.0
github.com/libp2p/go-libp2p-kad-dht v0.22.0
github.com/libp2p/go-libp2p-kad-dht v0.23.0
github.com/libp2p/go-libp2p-kbucket v0.5.0
github.com/libp2p/go-libp2p-pubsub v0.9.3
github.com/libp2p/go-libp2p-pubsub-router v0.6.0
Expand Down Expand Up @@ -222,6 +222,7 @@ require (
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
google.golang.org/grpc v1.53.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,8 @@ github.com/libp2p/go-libp2p-gostream v0.6.0 h1:QfAiWeQRce6pqnYfmIVWJFXNdDyfiR/qk
github.com/libp2p/go-libp2p-gostream v0.6.0/go.mod h1:Nywu0gYZwfj7Jc91PQvbGU8dIpqbQQkjWgDuOrFaRdA=
github.com/libp2p/go-libp2p-http v0.5.0 h1:+x0AbLaUuLBArHubbbNRTsgWz0RjNTy6DJLOxQ3/QBc=
github.com/libp2p/go-libp2p-http v0.5.0/go.mod h1:glh87nZ35XCQyFsdzZps6+F4HYI6DctVFY5u1fehwSg=
github.com/libp2p/go-libp2p-kad-dht v0.22.0 h1:cW2nGgG0hztDM42tOPyC5cVflD7EzLaHM0/Kjol6Wio=
github.com/libp2p/go-libp2p-kad-dht v0.22.0/go.mod h1:hareSo3Z/GJ7nUWPMj7XhD/56a7+rRltYCWwCuy3FQk=
github.com/libp2p/go-libp2p-kad-dht v0.23.0 h1:sxE6LxLopp79eLeV695n7+c77V/Vn4AMF28AdM/XFqM=
github.com/libp2p/go-libp2p-kad-dht v0.23.0/go.mod h1:oO5N308VT2msnQI6qi5M61wzPmJYg7Tr9e16m5n7uDU=
github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio=
github.com/libp2p/go-libp2p-kbucket v0.5.0 h1:g/7tVm8ACHDxH29BGrpsQlnNeu+6OF1A9bno/4/U1oA=
github.com/libp2p/go-libp2p-kbucket v0.5.0/go.mod h1:zGzGCpQd78b5BNTDGHNDLaTt9aDK/A02xeZp9QeFC4U=
Expand Down Expand Up @@ -1282,6 +1282,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E=
gonum.org/v1/gonum v0.11.0/go.mod h1:fSG4YDCxxUZQJ7rKsQrj0gMOg00Il0Z96/qMA4bVQhA=
google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y=
Expand Down
30 changes: 30 additions & 0 deletions test/cli/dht_opt_prov_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package cli

import (
"testing"

"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/ipfs/kubo/test/cli/testutils"
"github.com/stretchr/testify/assert"
)

// TestDHTOptimisticProvide is a smoke test for the experimental optimistic
// provide feature: one node with the feature enabled provides a freshly added
// block, and a second, connected node must be able to find it as the provider.
func TestDHTOptimisticProvide(t *testing.T) {
	t.Parallel()

	t.Run("optimistic provide smoke test", func(t *testing.T) {
		nodes := harness.NewT(t).NewNodes(2).Init()

		// Only the providing node opts in to the experimental feature.
		nodes[0].UpdateConfig(func(cfg *config.Config) {
			cfg.Experimental.OptimisticProvide = true
		})

		nodes.StartDaemons().Connect()

		hash := nodes[0].IPFSAddStr(testutils.RandomStr(100))
		// Use the `routing` command namespace for both provide and lookup,
		// matching the feature documentation, instead of mixing in the
		// deprecated `dht` namespace.
		nodes[0].IPFS("routing", "provide", hash)

		res := nodes[1].IPFS("routing", "findprovs", "--num-providers=1", hash)
		assert.Equal(t, nodes[0].PeerID().String(), res.Stdout.Trimmed())
	})
}
Loading