Skip to content
This repository has been archived by the owner on Aug 19, 2022. It is now read-only.

trace the scope name as a JSON object #45

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions rcmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,15 +365,15 @@ func (r *resourceManager) gc() {

func newSystemScope(limit Limit, rcmgr *resourceManager) *systemScope {
return &systemScope{
resourceScope: newResourceScope(limit, nil, "system", rcmgr.trace, rcmgr.metrics),
resourceScope: newResourceScope(limit, nil, scopeName{IsSystem: true}, rcmgr.trace, rcmgr.metrics),
}
}

func newTransientScope(limit Limit, rcmgr *resourceManager) *transientScope {
return &transientScope{
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
"transient", rcmgr.trace, rcmgr.metrics),
scopeName{IsTransient: true}, rcmgr.trace, rcmgr.metrics),
system: rcmgr.system,
}
}
Expand All @@ -382,7 +382,7 @@ func newServiceScope(service string, limit Limit, rcmgr *resourceManager) *servi
return &serviceScope{
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
fmt.Sprintf("service:%s", service), rcmgr.trace, rcmgr.metrics),
scopeName{Service: service}, rcmgr.trace, rcmgr.metrics),
service: service,
rcmgr: rcmgr,
}
Expand All @@ -392,7 +392,7 @@ func newProtocolScope(proto protocol.ID, limit Limit, rcmgr *resourceManager) *p
return &protocolScope{
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
fmt.Sprintf("protocol:%s", proto), rcmgr.trace, rcmgr.metrics),
scopeName{Protocol: proto}, rcmgr.trace, rcmgr.metrics),
proto: proto,
rcmgr: rcmgr,
}
Expand All @@ -402,7 +402,7 @@ func newPeerScope(p peer.ID, limit Limit, rcmgr *resourceManager) *peerScope {
return &peerScope{
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
fmt.Sprintf("peer:%s", p), rcmgr.trace, rcmgr.metrics),
scopeName{Peer: p}, rcmgr.trace, rcmgr.metrics),
peer: p,
rcmgr: rcmgr,
}
Expand All @@ -412,7 +412,7 @@ func newConnectionScope(dir network.Direction, usefd bool, limit Limit, rcmgr *r
return &connectionScope{
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.transient.resourceScope, rcmgr.system.resourceScope},
fmt.Sprintf("conn-%d", rcmgr.nextConnId()), rcmgr.trace, rcmgr.metrics),
scopeName{ConnID: rcmgr.nextConnId()}, rcmgr.trace, rcmgr.metrics),
dir: dir,
usefd: usefd,
rcmgr: rcmgr,
Expand All @@ -423,7 +423,7 @@ func newStreamScope(dir network.Direction, limit Limit, peer *peerScope, rcmgr *
return &streamScope{
resourceScope: newResourceScope(limit,
[]*resourceScope{peer.resourceScope, rcmgr.transient.resourceScope, rcmgr.system.resourceScope},
fmt.Sprintf("stream-%d", rcmgr.nextStreamId()), rcmgr.trace, rcmgr.metrics),
scopeName{StreamID: rcmgr.nextStreamId()}, rcmgr.trace, rcmgr.metrics),
dir: dir,
rcmgr: peer.rcmgr,
peer: peer,
Expand All @@ -450,7 +450,9 @@ func (s *serviceScope) getPeerScope(p peer.ID) *resourceScope {
s.peers = make(map[peer.ID]*resourceScope)
}

ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace, s.rcmgr.metrics)
sn := s.name
sn.Peer = p
ps = newResourceScope(l, nil, sn, s.rcmgr.trace, s.rcmgr.metrics)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait, whats the name here?

s.peers[p] = ps

ps.IncRef()
Expand All @@ -477,7 +479,9 @@ func (s *protocolScope) getPeerScope(p peer.ID) *resourceScope {
s.peers = make(map[peer.ID]*resourceScope)
}

ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace, s.rcmgr.metrics)
sn := s.name
sn.Peer = p
ps = newResourceScope(l, nil, sn, s.rcmgr.trace, s.rcmgr.metrics)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here too.

s.peers[p] = ps

ps.IncRef()
Expand Down
117 changes: 109 additions & 8 deletions scope.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package rcmgr

import (
"encoding/json"
"fmt"
"sync"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)

// resources tracks the current state of resource consumption
Expand All @@ -18,6 +21,95 @@ type resources struct {
memory int64
}

type scopeName struct {
IsSystem, IsTransient bool
ConnID, StreamID, SpanID int64
Service string
Protocol protocol.ID
Peer peer.ID
}

func addToClass(cl, str string) string {
if cl == "" {
return str
}
return cl + "-" + str
}

func (s scopeName) MarshalJSON() ([]byte, error) {
var class string
if s.IsSystem {
class = "system"
}
if s.IsTransient {
class = "transient"
}
if s.Service != "" {
class = "service"
}
if s.Protocol != "" {
class = "protocol"
}
if s.Peer != "" {
class = addToClass(class, "peer")
}
if s.ConnID > 0 {
class = "conn"
}
if s.StreamID > 0 {
class = "stream"
}
if s.SpanID > 0 {
class = addToClass(class, "span")
}
return json.Marshal(struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's super annoying that standard library encoding/json still doesn't allow allocation-free marshaling of arbitrary structs. At leasts it's a stack allocation though.

Class string `json:"class,omitempty"`
Service string `json:"service,omitempty"`
Protocol string `json:"protocol,omitempty"`
Peer string `json:"peer,omitempty"`
Conn int64 `json:"conn,omitempty"`
Stream int64 `json:"stream,omitempty"`
Span int64 `json:"span,omitempty"`
}{
Class: class,
Service: s.Service,
Protocol: string(s.Protocol),
Peer: s.Peer.String(),
Conn: s.ConnID,
Stream: s.StreamID,
Span: s.SpanID,
})
}

func (s scopeName) String() string {
var name string
if s.IsSystem {
name = "system"
}
if s.IsTransient {
name = "transient"
}
if s.Service != "" {
name = "service"
}
if s.Protocol != "" {
name = "protocol"
}
if s.Peer != "" {
name = addToClass(name, "peer:"+s.Peer.String())
}
if s.ConnID > 0 {
name = fmt.Sprintf("conn-%d", s.ConnID)
}
if s.StreamID > 0 {
name = fmt.Sprintf("stream-%d", s.StreamID)
}
if s.SpanID > 0 {
name = addToClass(name, fmt.Sprintf("span:%d", s.SpanID))
}
return name
}

// A resourceScope can be a DAG, where a downstream node is not allowed to outlive an upstream node
// (ie cannot call Done in the upstream node before the downstream node) and account for resources
// using a linearized parent set.
Expand All @@ -32,19 +124,21 @@ type resourceScope struct {
done bool
refCnt int

spanID int64

rc resources
owner *resourceScope // set in span scopes, which define trees
edges []*resourceScope // set in DAG scopes, it's the linearized parent set

name string // for debugging purposes
trace *trace // debug tracing
metrics *metrics // metrics collection
name scopeName // for debugging purposes
trace *trace // debug tracing
metrics *metrics // metrics collection
}

var _ network.ResourceScope = (*resourceScope)(nil)
var _ network.ResourceScopeSpan = (*resourceScope)(nil)

func newResourceScope(limit Limit, edges []*resourceScope, name string, trace *trace, metrics *metrics) *resourceScope {
func newResourceScope(limit Limit, edges []*resourceScope, name scopeName, trace *trace, metrics *metrics) *resourceScope {
for _, e := range edges {
e.IncRef()
}
Expand All @@ -59,11 +153,13 @@ func newResourceScope(limit Limit, edges []*resourceScope, name string, trace *t
return r
}

func newResourceScopeSpan(owner *resourceScope) *resourceScope {
func newResourceScopeSpan(owner *resourceScope, id int64) *resourceScope {
sn := owner.name
sn.SpanID = id
r := &resourceScope{
rc: resources{limit: owner.rc.limit},
owner: owner,
name: fmt.Sprintf("%s.span", owner.name),
name: sn,
trace: owner.trace,
metrics: owner.metrics,
}
Expand Down Expand Up @@ -227,7 +323,7 @@ func (rc *resources) stat() network.ScopeStat {

// resourceScope implementation
func (s *resourceScope) wrapError(err error) error {
return fmt.Errorf("%s: %w", s.name, err)
return fmt.Errorf("%s: %w", s.name.String(), err)
}

func (s *resourceScope) ReserveMemory(size int, prio uint8) error {
Expand Down Expand Up @@ -610,6 +706,11 @@ func (s *resourceScope) ReleaseResources(st network.ScopeStat) {
s.trace.RemoveConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
}

func (s *resourceScope) nextSpanID() int64 {
s.spanID++
return s.spanID
}

func (s *resourceScope) BeginSpan() (network.ResourceScopeSpan, error) {
s.Lock()
defer s.Unlock()
Expand All @@ -619,7 +720,7 @@ func (s *resourceScope) BeginSpan() (network.ResourceScopeSpan, error) {
}

s.refCnt++
return newResourceScopeSpan(s), nil
return newResourceScopeSpan(s, s.nextSpanID()), nil
}

func (s *resourceScope) Done() {
Expand Down
Loading