From 859e648fe1b440f061fda884f87e98b29b23135f Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Sat, 2 Apr 2022 01:56:19 +0200 Subject: [PATCH] feat(cfg): opt-in Swarm.ResourceMgr This ensures we can safely test the resource manager without impacting default behavior. - Resource manager is disabled by default - Default for Swarm.ResourceMgr.Enabled is false for now - Swarm.ResourceMgr.Limits allows user to tweak limits per specific scope in a way that is persisted across restarts - 'ipfs swarm limit system' outputs human-readable json - 'ipfs swarm limit system new-limits.json' sets new runtime limits (but does not change Swarm.ResourceMgr.Limits in the config) Conventions to make libp2p devs life easier: - 'IPFS_RCMGR=1 ipfs daemon' overrides the config and enables resource manager - 'limit.json' overrides implicit defaults from libp2p (if present) --- config/swarm.go | 36 ++++ core/commands/config.go | 20 +- core/commands/swarm.go | 387 ++++++-------------------------------- core/node/groups.go | 2 +- core/node/libp2p/rcmgr.go | 356 ++++++++++++++++++++++++++++++++--- docs/config.md | 69 +++++++ 6 files changed, 505 insertions(+), 365 deletions(-) diff --git a/config/swarm.go b/config/swarm.go index ba7b2255fd5..a18a2fd5dae 100644 --- a/config/swarm.go +++ b/config/swarm.go @@ -49,6 +49,9 @@ type SwarmConfig struct { // ConnMgr configures the connection manager. ConnMgr ConnMgr + + // ResourceMgr configures the libp2p Network Resource Manager + ResourceMgr ResourceMgr } type RelayClient struct { @@ -129,3 +132,36 @@ type ConnMgr struct { HighWater int GracePeriod string } + +// ResourceMgr defines configuration options for the libp2p Network Resource Manager +// +type ResourceMgr struct { + // Enables the Network Resource Manager feature + Enabled Flag `json:",omitempty"` + + // Limits is a map of Resource Scope. 
+ Limits map[string]ResourceMgrScopeConfig `json:",omitempty"` +} + +const ( + ResourceMgrSystemScope = "system" + ResourceMgrTransientScope = "transient" + ResourceMgrServiceScopePrefix = "svc:" + ResourceMgrProtocolScopePrefix = "proto:" + ResourceMgrPeerScopePrefix = "peer:" +) + +// libp2p Network Resource Manager config for a scope (ipfs swarm stats|limit) +type ResourceMgrScopeConfig struct { + Dynamic bool `json:",omitempty"` + // set if Dynamic is false + Memory int64 `json:",omitempty"` + // set if Dynamic is true + MemoryFraction float64 `json:",omitempty"` + MinMemory int64 `json:",omitempty"` + MaxMemory int64 `json:",omitempty"` + + Streams, StreamsInbound, StreamsOutbound int + Conns, ConnsInbound, ConnsOutbound int + FD int +} diff --git a/core/commands/config.go b/core/commands/config.go index 7a6e5abaf07..7c4ae78f12a 100644 --- a/core/commands/config.go +++ b/core/commands/config.go @@ -215,18 +215,20 @@ NOTE: For security reasons, this command will omit your private key and remote s return cmds.EmitOnce(res, &cfg) }, Encoders: cmds.EncoderMap{ - cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *map[string]interface{}) error { - buf, err := config.HumanOutput(out) - if err != nil { - return err - } - buf = append(buf, byte('\n')) - _, err = w.Write(buf) - return err - }), + cmds.Text: HumanJsonEncoder, }, } +var HumanJsonEncoder = cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *map[string]interface{}) error { + buf, err := config.HumanOutput(out) + if err != nil { + return err + } + buf = append(buf, byte('\n')) + _, err = w.Write(buf) + return err +}) + // Scrubs value and returns error if missing func scrubValue(m map[string]interface{}, key []string) (map[string]interface{}, error) { return scrubMapInternal(m, key, false) diff --git a/core/commands/swarm.go b/core/commands/swarm.go index 2d2a8b47b82..b82ca519a03 100644 --- a/core/commands/swarm.go +++ b/core/commands/swarm.go @@ -7,25 +7,22 @@ import ( 
"errors" "fmt" "io" - "os" "path" "sort" - "strings" "sync" "time" + files "github.com/ipfs/go-ipfs-files" "github.com/ipfs/go-ipfs/commands" "github.com/ipfs/go-ipfs/config" "github.com/ipfs/go-ipfs/core/commands/cmdenv" + "github.com/ipfs/go-ipfs/core/node/libp2p" "github.com/ipfs/go-ipfs/repo" "github.com/ipfs/go-ipfs/repo/fsrepo" cmds "github.com/ipfs/go-ipfs-cmds" - "github.com/libp2p/go-libp2p-core/network" inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/protocol" - rcmgr "github.com/libp2p/go-libp2p-resource-manager" ma "github.com/multiformats/go-multiaddr" madns "github.com/multiformats/go-multiaddr-dns" mamask "github.com/whyrusleeping/multiaddr-filter" @@ -59,8 +56,8 @@ ipfs peers in the internet. "filters": swarmFiltersCmd, "peers": swarmPeersCmd, "peering": swarmPeeringCmd, - "stats": swarmStatsCmd, - "limit": swarmLimitCmd, + "stats": swarmStatsCmd, // libp2p Network Resource Manager + "limit": swarmLimitCmd, // libp2p Network Resource Manager }, } @@ -318,13 +315,15 @@ var swarmStatsCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Report resource usage for a scope.", LongDescription: `Report resource usage for a scope. - The scope can be one of the following: - - system -- reports the system aggregate resource usage. - - transient -- reports the transient resource usage. - - svc: -- reports the resource usage of a specific service. - - proto: -- reports the resource usage of a specific protocol. - - peer: -- reports the resource usage of a specific peer. - - all -- reports the resource usage for all currently active scopes. +The scope can be one of the following: +- system -- reports the system aggregate resource usage. +- transient -- reports the transient resource usage. +- svc: -- reports the resource usage of a specific service. +- proto: -- reports the resource usage of a specific protocol. +- peer: -- reports the resource usage of a specific peer. 
+- all -- reports the resource usage for all currently active scopes. + +The output of this command is JSON. `}, Arguments: []cmds.Argument{ cmds.StringArg("scope", true, false, "scope of the stat report"), @@ -343,7 +342,7 @@ var swarmStatsCmd = &cmds.Command{ return fmt.Errorf("must specify exactly one scope") } scope := req.Arguments[0] - result, err := NetStat(node.ResourceManager, req.Context, scope) + result, err := libp2p.NetStat(node.ResourceManager, scope) if err != nil { return err } @@ -356,6 +355,9 @@ var swarmStatsCmd = &cmds.Command{ } return cmds.EmitOnce(res, b) }, + Encoders: cmds.EncoderMap{ + cmds.Text: HumanJsonEncoder, + }, } var swarmLimitCmd = &cmds.Command{ @@ -363,20 +365,27 @@ var swarmLimitCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Get or set resource limits for a scope.", LongDescription: `Get or set resource limits for a scope. - The scope can be one of the following: - - system -- reports the system aggregate resource usage. - - transient -- reports the transient resource usage. - - svc: -- reports the resource usage of a specific service. - - proto: -- reports the resource usage of a specific protocol. - - peer: -- reports the resource usage of a specific peer. - The limit is json-formatted, with the same structure as the limits file. +The scope can be one of the following: +- system -- limits for the system aggregate resource usage. +- transient -- limits for the transient resource usage. +- svc: -- limits for the resource usage of a specific service. +- proto: -- limits for the resource usage of a specific protocol. +- peer: -- limits for the resource usage of a specific peer. + +The output of this command is JSON. + +It is possible to use this command to inspect and tweak limits at runtime: + + $ ipfs swarm limit system > limit.json + $ vi limit.json + $ ipfs swarm limit system limit.json + +Changes made via command line are discarded on node shutdown. 
+For permanent limits set Swarm.ResourceMgr.Limits in the $IPFS_PATH/config file. `}, Arguments: []cmds.Argument{ cmds.StringArg("scope", true, false, "scope of the limit"), - cmds.StringArg("limit", false, false, "path of the limit configuration file"), - }, - Options: []cmds.Option{ - cmds.BoolOption("set", "s", "Set the limit for a scope (instead of viewing it).").WithDefault(false), + cmds.FileArg("limit.json", false, false, "limits to be set").EnableStdin(), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { node, err := cmdenv.GetNode(env) @@ -388,33 +397,29 @@ var swarmLimitCmd = &cmds.Command{ return fmt.Errorf("no resource manager available, make sure the daemon is running") } - setLimit, _ := req.Options["set"].(bool) - if setLimit { - if len(req.Arguments) != 2 { - return fmt.Errorf("must specify exactly a scope and a limit") - } + scope := req.Arguments[0] - scope := req.Arguments[0] - limitPath := req.Arguments[1] - limitStr, err := os.ReadFile(limitPath) - if err != nil { - return fmt.Errorf("error opening limit JSON file: %w", err) + // set scope limit to new values (when limit.json is passed as a second arg) + if req.Files != nil { + var newLimit config.ResourceMgrScopeConfig + it := req.Files.Entries() + if it.Next() { + file := files.FileFromEntry(it) + if file == nil { + return errors.New("expected a JSON file") + } + if err := json.NewDecoder(file).Decode(&newLimit); err != nil { + return errors.New("failed to decode JSON as ResourceMgrScopeConfig") + } + return libp2p.NetSetLimit(node.ResourceManager, scope, newLimit) } - - var limit NetLimitConfig - err = json.Unmarshal([]byte(limitStr), &limit) - if err != nil { - return fmt.Errorf("error decoding limit: %w", err) + if err := it.Err(); err != nil { + return fmt.Errorf("error opening limit JSON file: %w", err) } - - return NetSetLimit(node.ResourceManager, req.Context, scope, limit) } - if len(req.Arguments) != 1 { - return fmt.Errorf("must specify exactly 
one scope") - } - scope := req.Arguments[0] - result, err := NetLimit(node.ResourceManager, req.Context, scope) + // get scope limit + result, err := libp2p.NetLimit(node.ResourceManager, scope) if err != nil { return err } @@ -427,289 +432,9 @@ var swarmLimitCmd = &cmds.Command{ } return cmds.EmitOnce(res, b) }, -} - -// FIXME(BLOCKING): Decide where to move the net stat/limit logic and types. - -type NetStatOut struct { - System *network.ScopeStat `json:",omitempty"` - Transient *network.ScopeStat `json:",omitempty"` - Services map[string]network.ScopeStat `json:",omitempty"` - Protocols map[string]network.ScopeStat `json:",omitempty"` - Peers map[string]network.ScopeStat `json:",omitempty"` -} - -type NetLimitConfig struct { - Dynamic bool `json:",omitempty"` - // set if Dynamic is false - Memory int64 `json:",omitempty"` - // set if Dynamic is true - MemoryFraction float64 `json:",omitempty"` - MinMemory int64 `json:",omitempty"` - MaxMemory int64 `json:",omitempty"` - - Streams, StreamsInbound, StreamsOutbound int - Conns, ConnsInbound, ConnsOutbound int - FD int -} - -func NetStat(mgr network.ResourceManager, ctx context.Context, scope string) (NetStatOut, error) { - var err error - var result NetStatOut - switch { - case scope == "all": - rapi, ok := mgr.(rcmgr.ResourceManagerState) - if !ok { - return result, fmt.Errorf("resource manager does not support ResourceManagerState API") - } - - stat := rapi.Stat() - result.System = &stat.System - result.Transient = &stat.Transient - if len(stat.Services) > 0 { - result.Services = stat.Services - } - if len(stat.Protocols) > 0 { - result.Protocols = make(map[string]network.ScopeStat, len(stat.Protocols)) - for proto, stat := range stat.Protocols { - result.Protocols[string(proto)] = stat - } - } - if len(stat.Peers) > 0 { - result.Peers = make(map[string]network.ScopeStat, len(stat.Peers)) - for p, stat := range stat.Peers { - result.Peers[p.Pretty()] = stat - } - } - - return result, nil - - case scope == 
"system": - err = mgr.ViewSystem(func(s network.ResourceScope) error { - stat := s.Stat() - result.System = &stat - return nil - }) - return result, err - - case scope == "transient": - err = mgr.ViewTransient(func(s network.ResourceScope) error { - stat := s.Stat() - result.Transient = &stat - return nil - }) - return result, err - - case strings.HasPrefix(scope, "svc:"): - svc := scope[4:] - err = mgr.ViewService(svc, func(s network.ServiceScope) error { - stat := s.Stat() - result.Services = map[string]network.ScopeStat{ - svc: stat, - } - return nil - }) - return result, err - - case strings.HasPrefix(scope, "proto:"): - proto := scope[6:] - err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { - stat := s.Stat() - result.Protocols = map[string]network.ScopeStat{ - proto: stat, - } - return nil - }) - return result, err - - case strings.HasPrefix(scope, "peer:"): - p := scope[5:] - pid, err := peer.Decode(p) - if err != nil { - return result, fmt.Errorf("invalid peer ID: %s: %w", p, err) - } - err = mgr.ViewPeer(pid, func(s network.PeerScope) error { - stat := s.Stat() - result.Peers = map[string]network.ScopeStat{ - p: stat, - } - return nil - }) - return result, err - - default: - return result, fmt.Errorf("invalid scope %s", scope) - } -} - -func NetLimit(mgr network.ResourceManager, ctx context.Context, scope string) (NetLimitConfig, error) { - var result NetLimitConfig - getLimit := func(s network.ResourceScope) error { - limiter, ok := s.(rcmgr.ResourceScopeLimiter) - if !ok { - return fmt.Errorf("resource scope doesn't implement ResourceScopeLimiter interface") - } - - limit := limiter.Limit() - switch l := limit.(type) { - case *rcmgr.StaticLimit: - result.Memory = l.Memory - result.Streams = l.BaseLimit.Streams - result.StreamsInbound = l.BaseLimit.StreamsInbound - result.StreamsOutbound = l.BaseLimit.StreamsOutbound - result.Conns = l.BaseLimit.Conns - result.ConnsInbound = l.BaseLimit.ConnsInbound - result.ConnsOutbound = 
l.BaseLimit.ConnsOutbound - result.FD = l.BaseLimit.FD - - case *rcmgr.DynamicLimit: - result.Dynamic = true - result.MemoryFraction = l.MemoryLimit.MemoryFraction - result.MinMemory = l.MemoryLimit.MinMemory - result.MaxMemory = l.MemoryLimit.MaxMemory - result.Streams = l.BaseLimit.Streams - result.StreamsInbound = l.BaseLimit.StreamsInbound - result.StreamsOutbound = l.BaseLimit.StreamsOutbound - result.Conns = l.BaseLimit.Conns - result.ConnsInbound = l.BaseLimit.ConnsInbound - result.ConnsOutbound = l.BaseLimit.ConnsOutbound - result.FD = l.BaseLimit.FD - - default: - return fmt.Errorf("unknown limit type %T", limit) - } - - return nil - } - - switch { - case scope == "system": - err := mgr.ViewSystem(func(s network.ResourceScope) error { - return getLimit(s) - }) - return result, err - - case scope == "transient": - err := mgr.ViewTransient(func(s network.ResourceScope) error { - return getLimit(s) - }) - return result, err - - case strings.HasPrefix(scope, "svc:"): - svc := scope[4:] - err := mgr.ViewService(svc, func(s network.ServiceScope) error { - return getLimit(s) - }) - return result, err - - case strings.HasPrefix(scope, "proto:"): - proto := scope[6:] - err := mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { - return getLimit(s) - }) - return result, err - - case strings.HasPrefix(scope, "peer:"): - p := scope[5:] - pid, err := peer.Decode(p) - if err != nil { - return result, fmt.Errorf("invalid peer ID: %s: %w", p, err) - } - err = mgr.ViewPeer(pid, func(s network.PeerScope) error { - return getLimit(s) - }) - return result, err - - default: - return result, fmt.Errorf("invalid scope %s", scope) - } -} - -func NetSetLimit(mgr network.ResourceManager, ctx context.Context, scope string, limit NetLimitConfig) error { - setLimit := func(s network.ResourceScope) error { - limiter, ok := s.(rcmgr.ResourceScopeLimiter) - if !ok { - return fmt.Errorf("resource scope doesn't implement ResourceScopeLimiter interface") - } - - var 
newLimit rcmgr.Limit - if limit.Dynamic { - newLimit = &rcmgr.DynamicLimit{ - MemoryLimit: rcmgr.MemoryLimit{ - MemoryFraction: limit.MemoryFraction, - MinMemory: limit.MinMemory, - MaxMemory: limit.MaxMemory, - }, - BaseLimit: rcmgr.BaseLimit{ - Streams: limit.Streams, - StreamsInbound: limit.StreamsInbound, - StreamsOutbound: limit.StreamsOutbound, - Conns: limit.Conns, - ConnsInbound: limit.ConnsInbound, - ConnsOutbound: limit.ConnsOutbound, - FD: limit.FD, - }, - } - } else { - newLimit = &rcmgr.StaticLimit{ - Memory: limit.Memory, - BaseLimit: rcmgr.BaseLimit{ - Streams: limit.Streams, - StreamsInbound: limit.StreamsInbound, - StreamsOutbound: limit.StreamsOutbound, - Conns: limit.Conns, - ConnsInbound: limit.ConnsInbound, - ConnsOutbound: limit.ConnsOutbound, - FD: limit.FD, - }, - } - } - - limiter.SetLimit(newLimit) - return nil - } - - switch { - case scope == "system": - err := mgr.ViewSystem(func(s network.ResourceScope) error { - return setLimit(s) - }) - return err - - case scope == "transient": - err := mgr.ViewTransient(func(s network.ResourceScope) error { - return setLimit(s) - }) - return err - - case strings.HasPrefix(scope, "svc:"): - svc := scope[4:] - err := mgr.ViewService(svc, func(s network.ServiceScope) error { - return setLimit(s) - }) - return err - - case strings.HasPrefix(scope, "proto:"): - proto := scope[6:] - err := mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { - return setLimit(s) - }) - return err - - case strings.HasPrefix(scope, "peer:"): - p := scope[5:] - pid, err := peer.Decode(p) - if err != nil { - return fmt.Errorf("invalid peer ID: %s: %w", p, err) - } - err = mgr.ViewPeer(pid, func(s network.PeerScope) error { - return setLimit(s) - }) - return err - - default: - return fmt.Errorf("invalid scope %s", scope) - } + Encoders: cmds.EncoderMap{ + cmds.Text: HumanJsonEncoder, + }, } type streamInfo struct { diff --git a/core/node/groups.go b/core/node/groups.go index 4d0f339195f..48ee175a33c 100644 
--- a/core/node/groups.go +++ b/core/node/groups.go @@ -145,7 +145,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { BaseLibP2P, // Services (resource management) - fx.Provide(libp2p.ResourceManager()), + fx.Provide(libp2p.ResourceManager(cfg.Swarm.ResourceMgr)), fx.Provide(libp2p.AddrFilters(cfg.Swarm.AddrFilters)), fx.Provide(libp2p.AddrsFactory(cfg.Addresses.Announce, cfg.Addresses.AppendAnnounce, cfg.Addresses.NoAnnounce)), diff --git a/core/node/libp2p/rcmgr.go b/core/node/libp2p/rcmgr.go index 9d38dd958f3..c4af75ae142 100644 --- a/core/node/libp2p/rcmgr.go +++ b/core/node/libp2p/rcmgr.go @@ -5,56 +5,364 @@ import ( "errors" "fmt" "os" + "strings" + cfg "github.com/ipfs/go-ipfs/config" + config "github.com/ipfs/go-ipfs/config" "github.com/ipfs/go-ipfs/repo" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" rcmgr "github.com/libp2p/go-libp2p-resource-manager" "go.uber.org/fx" ) const NetLimitDefaultFilename = "limit.json" -func ResourceManager() func(fx.Lifecycle, repo.Repo) (network.ResourceManager, Libp2pOpts, error) { +func ResourceManager(cfg cfg.ResourceMgr) func(fx.Lifecycle, repo.Repo) (network.ResourceManager, Libp2pOpts, error) { return func(lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) { var limiter *rcmgr.BasicLimiter + var manager network.ResourceManager var opts Libp2pOpts - limitFile, err := os.Open(NetLimitDefaultFilename) - if errors.Is(err, os.ErrNotExist) { - log.Debug("limit file %s not found, creating a default resource manager", NetLimitDefaultFilename) - limiter = rcmgr.NewDefaultLimiter() - } else { - if err != nil { - return nil, opts, fmt.Errorf("error opening limit JSON file %s: %w", - NetLimitDefaultFilename, err) + // Config Swarm.ResourceMgr.Enabled decides if we run a real manager + enabled := cfg.Enabled.WithDefault(false) + + /// ENV overrides Config (if present) + // TODO: 
document IPFS_RCMGR and IPFS_DEBUG_RCMGR in docs/environment-variables.md + switch os.Getenv("IPFS_RCMGR") { + case "0", "false": + enabled = false + case "1", "true": + enabled = true + } + + if enabled { + log.Debug("libp2p resource manager is enabled") + + // Try defaults from limit.json if provided + // (a convention to make libp2p team life easier) + // TODO: look in current dir and in IPFS_PATH + _, err := os.Stat(NetLimitDefaultFilename) + if !errors.Is(err, os.ErrNotExist) { + limitFile, err := os.Open(NetLimitDefaultFilename) + if err != nil { + return nil, opts, fmt.Errorf("error opening limit JSON file %s: %w", + NetLimitDefaultFilename, err) + } + defer limitFile.Close() //nolint:errcheck + limiter, err = rcmgr.NewDefaultLimiterFromJSON(limitFile) + if err != nil { + return nil, opts, fmt.Errorf("error parsing limit file: %w", err) + } + + } else { + // Use defaults from go-libp2p + log.Debug("limit file %s not found, creating a default resource manager", NetLimitDefaultFilename) + limiter = rcmgr.NewDefaultLimiter() + } + + libp2p.SetDefaultServiceLimits(limiter) + + var ropts []rcmgr.Option + if os.Getenv("IPFS_DEBUG_RCMGR") != "" { + ropts = append(ropts, rcmgr.WithTrace("rcmgr.json.gz")) } - defer limitFile.Close() //nolint:errcheck - limiter, err = rcmgr.NewDefaultLimiterFromJSON(limitFile) + manager, err = rcmgr.NewResourceManager(limiter, ropts...) 
if err != nil { - return nil, opts, fmt.Errorf("error parsing limit file: %w", err) + return nil, opts, fmt.Errorf("error creating resource manager: %w", err) } - } - libp2p.SetDefaultServiceLimits(limiter) + // Apply user-defined Swarm.ResourceMgr.Limits + for scope, userLimit := range cfg.Limits { + err := NetSetLimit(manager, scope, userLimit) + if err != nil { + return nil, opts, fmt.Errorf("error while applying Swarm.ResourceMgr.Limits for scope %q: %w", scope, err) + } + } - var ropts []rcmgr.Option - if os.Getenv("IPFS_DEBUG_RCMGR") != "" { - ropts = append(ropts, rcmgr.WithTrace("rcmgr.json.gz")) + } else { + log.Debug("libp2p resource manager is disabled") + manager = network.NullResourceManager } - rcmgr, err := rcmgr.NewResourceManager(limiter, ropts...) - if err != nil { - return nil, opts, fmt.Errorf("error creating resource manager: %w", err) - } - opts.Opts = append(opts.Opts, libp2p.ResourceManager(rcmgr)) + opts.Opts = append(opts.Opts, libp2p.ResourceManager(manager)) lc.Append(fx.Hook{ OnStop: func(_ context.Context) error { - return rcmgr.Close() + return manager.Close() }}) - return rcmgr, opts, nil + return manager, opts, nil + } +} + +type NetStatOut struct { + System *network.ScopeStat `json:",omitempty"` + Transient *network.ScopeStat `json:",omitempty"` + Services map[string]network.ScopeStat `json:",omitempty"` + Protocols map[string]network.ScopeStat `json:",omitempty"` + Peers map[string]network.ScopeStat `json:",omitempty"` +} + +func NetStat(mgr network.ResourceManager, scope string) (NetStatOut, error) { + var err error + var result NetStatOut + switch { + case scope == "all": + rapi, ok := mgr.(rcmgr.ResourceManagerState) + if !ok { + return result, fmt.Errorf("resource manager does not support ResourceManagerState API") + } + + stat := rapi.Stat() + result.System = &stat.System + result.Transient = &stat.Transient + if len(stat.Services) > 0 { + result.Services = stat.Services + } + if len(stat.Protocols) > 0 { + result.Protocols = 
make(map[string]network.ScopeStat, len(stat.Protocols)) + for proto, stat := range stat.Protocols { + result.Protocols[string(proto)] = stat + } + } + if len(stat.Peers) > 0 { + result.Peers = make(map[string]network.ScopeStat, len(stat.Peers)) + for p, stat := range stat.Peers { + result.Peers[p.Pretty()] = stat + } + } + + return result, nil + + case scope == config.ResourceMgrSystemScope: + err = mgr.ViewSystem(func(s network.ResourceScope) error { + stat := s.Stat() + result.System = &stat + return nil + }) + return result, err + + case scope == config.ResourceMgrTransientScope: + err = mgr.ViewTransient(func(s network.ResourceScope) error { + stat := s.Stat() + result.Transient = &stat + return nil + }) + return result, err + + case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix): + svc := scope[4:] + err = mgr.ViewService(svc, func(s network.ServiceScope) error { + stat := s.Stat() + result.Services = map[string]network.ScopeStat{ + svc: stat, + } + return nil + }) + return result, err + + case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix): + proto := scope[6:] + err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { + stat := s.Stat() + result.Protocols = map[string]network.ScopeStat{ + proto: stat, + } + return nil + }) + return result, err + + case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix): + p := scope[5:] + pid, err := peer.Decode(p) + if err != nil { + return result, fmt.Errorf("invalid peer ID: %q: %w", p, err) + } + err = mgr.ViewPeer(pid, func(s network.PeerScope) error { + stat := s.Stat() + result.Peers = map[string]network.ScopeStat{ + p: stat, + } + return nil + }) + return result, err + + default: + return result, fmt.Errorf("invalid scope %q", scope) + } +} + +func NetLimit(mgr network.ResourceManager, scope string) (config.ResourceMgrScopeConfig, error) { + var result config.ResourceMgrScopeConfig + getLimit := func(s network.ResourceScope) error { + limiter, ok := 
s.(rcmgr.ResourceScopeLimiter) + if !ok { + return fmt.Errorf("resource scope doesn't implement ResourceScopeLimiter interface") + } + + limit := limiter.Limit() + switch l := limit.(type) { + case *rcmgr.StaticLimit: + result.Dynamic = false + result.Memory = l.Memory + result.Streams = l.BaseLimit.Streams + result.StreamsInbound = l.BaseLimit.StreamsInbound + result.StreamsOutbound = l.BaseLimit.StreamsOutbound + result.Conns = l.BaseLimit.Conns + result.ConnsInbound = l.BaseLimit.ConnsInbound + result.ConnsOutbound = l.BaseLimit.ConnsOutbound + result.FD = l.BaseLimit.FD + + case *rcmgr.DynamicLimit: + result.Dynamic = true + result.MemoryFraction = l.MemoryLimit.MemoryFraction + result.MinMemory = l.MemoryLimit.MinMemory + result.MaxMemory = l.MemoryLimit.MaxMemory + result.Streams = l.BaseLimit.Streams + result.StreamsInbound = l.BaseLimit.StreamsInbound + result.StreamsOutbound = l.BaseLimit.StreamsOutbound + result.Conns = l.BaseLimit.Conns + result.ConnsInbound = l.BaseLimit.ConnsInbound + result.ConnsOutbound = l.BaseLimit.ConnsOutbound + result.FD = l.BaseLimit.FD + + default: + return fmt.Errorf("unknown limit type %T", limit) + } + + return nil + } + + switch { + case scope == config.ResourceMgrSystemScope: + err := mgr.ViewSystem(func(s network.ResourceScope) error { + return getLimit(s) + }) + return result, err + + case scope == config.ResourceMgrTransientScope: + err := mgr.ViewTransient(func(s network.ResourceScope) error { + return getLimit(s) + }) + return result, err + + case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix): + svc := scope[4:] + err := mgr.ViewService(svc, func(s network.ServiceScope) error { + return getLimit(s) + }) + return result, err + + case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix): + proto := scope[6:] + err := mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { + return getLimit(s) + }) + return result, err + + case strings.HasPrefix(scope, 
config.ResourceMgrPeerScopePrefix): + p := scope[5:] + pid, err := peer.Decode(p) + if err != nil { + return result, fmt.Errorf("invalid peer ID: %q: %w", p, err) + } + err = mgr.ViewPeer(pid, func(s network.PeerScope) error { + return getLimit(s) + }) + return result, err + + default: + return result, fmt.Errorf("invalid scope %q", scope) + } +} + +func NetSetLimit(mgr network.ResourceManager, scope string, limit config.ResourceMgrScopeConfig) error { + setLimit := func(s network.ResourceScope) error { + limiter, ok := s.(rcmgr.ResourceScopeLimiter) + if !ok { + return fmt.Errorf("resource scope doesn't implement ResourceScopeLimiter interface") + } + + var newLimit rcmgr.Limit + if limit.Dynamic { + newLimit = &rcmgr.DynamicLimit{ + MemoryLimit: rcmgr.MemoryLimit{ + MemoryFraction: limit.MemoryFraction, + MinMemory: limit.MinMemory, + MaxMemory: limit.MaxMemory, + }, + BaseLimit: rcmgr.BaseLimit{ + Streams: limit.Streams, + StreamsInbound: limit.StreamsInbound, + StreamsOutbound: limit.StreamsOutbound, + Conns: limit.Conns, + ConnsInbound: limit.ConnsInbound, + ConnsOutbound: limit.ConnsOutbound, + FD: limit.FD, + }, + } + } else { + newLimit = &rcmgr.StaticLimit{ + Memory: limit.Memory, + BaseLimit: rcmgr.BaseLimit{ + Streams: limit.Streams, + StreamsInbound: limit.StreamsInbound, + StreamsOutbound: limit.StreamsOutbound, + Conns: limit.Conns, + ConnsInbound: limit.ConnsInbound, + ConnsOutbound: limit.ConnsOutbound, + FD: limit.FD, + }, + } + } + + limiter.SetLimit(newLimit) + return nil + } + + switch { + case scope == config.ResourceMgrSystemScope: + err := mgr.ViewSystem(func(s network.ResourceScope) error { + return setLimit(s) + }) + return err + + case scope == config.ResourceMgrTransientScope: + err := mgr.ViewTransient(func(s network.ResourceScope) error { + return setLimit(s) + }) + return err + + case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix): + svc := scope[4:] + err := mgr.ViewService(svc, func(s network.ServiceScope) error { + 
return setLimit(s) + }) + return err + + case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix): + proto := scope[6:] + err := mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { + return setLimit(s) + }) + return err + + case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix): + p := scope[5:] + pid, err := peer.Decode(p) + if err != nil { + return fmt.Errorf("invalid peer ID: %q: %w", p, err) + } + err = mgr.ViewPeer(pid, func(s network.PeerScope) error { + return setLimit(s) + }) + return err + + default: + return fmt.Errorf("invalid scope %q", scope) } } diff --git a/docs/config.md b/docs/config.md index 2c935190923..34845379707 100644 --- a/docs/config.md +++ b/docs/config.md @@ -130,6 +130,7 @@ config file at runtime. - [`Swarm.ConnMgr.LowWater`](#swarmconnmgrlowwater) - [`Swarm.ConnMgr.HighWater`](#swarmconnmgrhighwater) - [`Swarm.ConnMgr.GracePeriod`](#swarmconnmgrgraceperiod) + - [`Swarm.ResourceMgr`](#swarmresourcemgr) - [`Swarm.Transports`](#swarmtransports) - [`Swarm.Transports.Network`](#swarmtransportsnetwork) - [`Swarm.Transports.Network.TCP`](#swarmtransportsnetworktcp) @@ -1628,6 +1629,74 @@ Default: `"20s"` Type: `duration` +### `Swarm.ResourceMgr` + +The [libp2p Network Resource Manager](https://github.com/libp2p/go-libp2p-resource-manager#readme) allows setting limits per scope, +and tracking resource usage over time. + +#### `Swarm.ResourceMgr.Enabled` + +**EXPERIMENTAL**: this feature is disabled by default, use with caution. + +Enables the libp2p Network Resource Manager and augments the default limits +using user-defined ones in `Swarm.ResourceMgr.Limits` (if present). + +Default: `false` + +Type: `flag` + +#### `Swarm.ResourceMgr.Limits` + +Map of resource limits [per scope](https://github.com/libp2p/go-libp2p-resource-manager#resource-scopes). + +The key is a string with the scope name. It can be one of the following: +- `system` -- limits the system aggregate resource usage.
+- `transient` -- limits the transient resource usage +- `svc:<name>` -- limits the resource usage of a specific service +- `proto:<proto>` -- limits the resource usage of a specific protocol +- `peer:<peer>` -- limits the resource usage of a specific peer + +Example: + +```json +{ + "Swarm": { + "ResourceMgr": { + "Enabled": true, + "Limits": { + "system": { + "Conns": 1024, + "ConnsInbound": 256, + "ConnsOutbound": 1024, + "FD": 512, + "Memory": 1073741824, + "Streams": 16384, + "StreamsInbound": 4096, + "StreamsOutbound": 16384 + }, + "transient": { + "Conns": 128, + "ConnsInbound": 32, + "ConnsOutbound": 128, + "FD": 128, + "Memory": 67108864, + "Streams": 512, + "StreamsInbound": 128, + "StreamsOutbound": 512 + } + } + } + } +} +``` + +Current resource usage and a list of services, protocols, and peers can be obtained via +`ipfs swarm stats --help` + +Default: `{}` (empty == implicit defaults from go-libp2p) + +Type: `object[string->object]` + ### `Swarm.Transports` Configuration section for libp2p transports. An empty configuration will apply