Logging improvements (#941)
* Logging improvements

* Add more debug logging

* Add more debug logging

* Use logging helpers
iand authored Sep 28, 2023
1 parent 4fa560f commit 6a4249c
Showing 9 changed files with 105 additions and 25 deletions.
57 changes: 44 additions & 13 deletions v2/dht.go
@@ -17,6 +17,7 @@ import (
"github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord"
"github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/routing"
"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
"github.com/libp2p/go-libp2p-kad-dht/v2/tele"
)

// DHT is an implementation of Kademlia with S/Kademlia modifications.
@@ -106,13 +107,23 @@ func New(h host.Host, cfg *Config) (*DHT, error) {

// instantiate a new Kademlia DHT coordinator.
coordCfg := coord.DefaultCoordinatorConfig()
coordCfg.Clock = cfg.Clock
coordCfg.Logger = cfg.Logger
coordCfg.MeterProvider = cfg.MeterProvider
coordCfg.TracerProvider = cfg.TracerProvider

coordCfg.Query.Clock = cfg.Clock
coordCfg.Query.Logger = cfg.Logger.With("behaviour", "pooledquery")
coordCfg.Query.Tracer = cfg.TracerProvider.Tracer(tele.TracerName)
coordCfg.Query.Concurrency = cfg.Query.Concurrency
coordCfg.Query.Timeout = cfg.Query.Timeout
coordCfg.Query.RequestConcurrency = cfg.Query.RequestConcurrency
coordCfg.Query.RequestTimeout = cfg.Query.RequestTimeout
coordCfg.Clock = cfg.Clock
coordCfg.MeterProvider = cfg.MeterProvider
coordCfg.TracerProvider = cfg.TracerProvider

coordCfg.Routing.Clock = cfg.Clock
coordCfg.Routing.Logger = cfg.Logger.With("behaviour", "routing")
coordCfg.Routing.Tracer = cfg.TracerProvider.Tracer(tele.TracerName)
coordCfg.Routing.Meter = cfg.MeterProvider.Meter(tele.MeterName)

d.kad, err = coord.NewCoordinator(kadt.PeerID(d.host.ID()), &router{host: h, ProtocolID: cfg.ProtocolID}, d.rt, coordCfg)
if err != nil {
@@ -202,11 +213,11 @@ func (d *DHT) initAminoBackends() (map[string]Backend, error) {
// Close cleans up all resources associated with this DHT.
func (d *DHT) Close() error {
if err := d.sub.Close(); err != nil {
d.log.With("err", err).Debug("failed closing event bus subscription")
d.debugErr(err, "failed closing event bus subscription")
}

if err := d.kad.Close(); err != nil {
d.log.With("err", err).Debug("failed closing coordinator")
d.debugErr(err, "failed closing coordinator")
}

for ns, b := range d.backends {
@@ -216,7 +227,7 @@ func (d *DHT) Close() error {
}

if err := closer.Close(); err != nil {
d.log.Warn("failed closing backend", "namespace", ns, "err", err.Error())
d.warnErr(err, "failed closing backend", "namespace", ns)
}
}

@@ -230,7 +241,7 @@ func (d *DHT) Close() error {
if d.cfg.ProtocolID == ProtocolIPFS && d.cfg.Datastore == nil {
if pbe, err := typedBackend[*ProvidersBackend](d, namespaceProviders); err == nil {
if err := pbe.datastore.Close(); err != nil {
d.log.Warn("failed closing in memory datastore", "err", err.Error())
d.warnErr(err, "failed closing in memory datastore")
}
}
}
@@ -244,7 +255,7 @@ func (d *DHT) Close() error {
}

if err := s.Reset(); err != nil {
d.log.With("err", err).Debug("failed closing stream")
d.debugErr(err, "failed closing stream")
}
}
}
@@ -302,21 +313,41 @@ func (d *DHT) setClientMode() {
}

if err := s.Reset(); err != nil {
d.log.With("err", err).Debug("failed closing stream")
d.debugErr(err, "failed closing stream")
}
}
}
}

// logErr is a helper method that uses the slogger of the DHT and writes a
// warning log line with the given message alongside the error. If the error
// warnErr is a helper method that uses the slogger of the DHT and writes a
// warning log line with the given message alongside the error. args is a list of
// key/value pairs or slog.Attrs that will be included with the log message. If the error
// is nil, this method is a no-op.
func (d *DHT) logErr(err error, msg string) {
func (d *DHT) warnErr(err error, msg string, args ...any) {
if err == nil {
return
}

d.log.Warn(msg, "err", err.Error())
if len(args) == 0 {
d.log.Warn(msg, tele.LogAttrError(err))
return
}
d.log.With(args...).Warn(msg, tele.LogAttrError(err))
}

// debugErr is a helper method that uses the slogger of the DHT and writes a
// debug log line with the given message alongside the error. args is a list of
// key/value pairs or slog.Attrs that will be included with the log message. If the error
// is nil, this method is a no-op.
func (d *DHT) debugErr(err error, msg string, args ...any) {
if err == nil {
return
}
if len(args) == 0 {
d.log.Debug(msg, tele.LogAttrError(err))
return
}
d.log.With(args...).Debug(msg, tele.LogAttrError(err))
}

// AddAddresses suggests peers and their associated addresses to be added to the routing table.
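Aside (not part of the commit): a minimal, self-contained sketch of how the new warnErr/debugErr helpers behave, assuming the signatures shown above. The standalone function below takes the logger explicitly instead of hanging off the DHT struct, and uses slog.Any as a stand-in for tele.LogAttrError.

package main

import (
	"errors"

	"golang.org/x/exp/slog"
)

// warnErr mirrors the helper added to dht.go: it is a no-op for a nil error,
// and any extra key/value context is attached via Logger.With before the
// error attribute.
func warnErr(log *slog.Logger, err error, msg string, args ...any) {
	if err == nil {
		return
	}
	if len(args) == 0 {
		log.Warn(msg, slog.Any("error", err)) // stand-in for tele.LogAttrError(err)
		return
	}
	log.With(args...).Warn(msg, slog.Any("error", err))
}

func main() {
	log := slog.Default()
	warnErr(log, errors.New("datastore busy"), "failed closing backend", "namespace", "providers")
	warnErr(log, nil, "never logged") // nil error: nothing is emitted
}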
15 changes: 15 additions & 0 deletions v2/internal/coord/coordinator.go
@@ -336,6 +336,7 @@ func (c *Coordinator) PutValue(ctx context.Context, r coordt.Value, q int) error
func (c *Coordinator) QueryClosest(ctx context.Context, target kadt.Key, fn coordt.QueryFunc, numResults int) ([]kadt.PeerID, coordt.QueryStats, error) {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.Query")
defer span.End()
c.cfg.Logger.Debug("starting query for closest nodes", tele.LogAttrKey(target))

ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -381,6 +382,10 @@ func (c *Coordinator) QueryClosest(ctx context.Context, target kadt.Key, fn coor
func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn coordt.QueryFunc, numResults int) (coordt.QueryStats, error) {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.QueryMessage")
defer span.End()
if msg == nil {
return coordt.QueryStats{}, fmt.Errorf("no message supplied for query")
}
c.cfg.Logger.Debug("starting query with message", tele.LogAttrKey(msg.Target()), slog.String("type", msg.Type.String()))

ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -421,6 +426,10 @@ func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn coor
func (c *Coordinator) BroadcastRecord(ctx context.Context, msg *pb.Message) error {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.BroadcastRecord")
defer span.End()
if msg == nil {
return fmt.Errorf("no message supplied for broadcast")
}
c.cfg.Logger.Debug("starting broadcast with message", tele.LogAttrKey(msg.Target()), slog.String("type", msg.Type.String()))

ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -474,6 +483,7 @@ func (c *Coordinator) waitForQuery(ctx context.Context, queryID coordt.QueryID,
ctx, ev := wev.Ctx, wev.Event
switch ev := ev.(type) {
case *EventQueryProgressed:
c.cfg.Logger.Debug("query made progress", "query_id", queryID, tele.LogAttrPeerID(ev.NodeID), slog.Duration("elapsed", c.cfg.Clock.Since(ev.Stats.Start)), slog.Int("requests", ev.Stats.Requests), slog.Int("failures", ev.Stats.Failure))
lastStats = coordt.QueryStats{
Start: ev.Stats.Start,
Requests: ev.Stats.Requests,
@@ -483,12 +493,14 @@
nh, err := c.networkBehaviour.getNodeHandler(ctx, ev.NodeID)
if err != nil {
// ignore unknown node
c.cfg.Logger.Debug("node handler not found", "query_id", queryID, tele.LogAttrError, err)
break
}

err = fn(ctx, nh.ID(), ev.Response, lastStats)
if errors.Is(err, coordt.ErrSkipRemaining) {
// done
c.cfg.Logger.Debug("query done", "query_id", queryID)
c.queryBehaviour.Notify(ctx, &EventStopQuery{QueryID: queryID})
return nil, lastStats, nil
}
@@ -501,6 +513,7 @@
case *EventQueryFinished:
// query is done
lastStats.Exhausted = true
c.cfg.Logger.Debug("query ran to exhaustion", "query_id", queryID, slog.Duration("elapsed", ev.Stats.End.Sub(ev.Stats.Start)), slog.Int("requests", ev.Stats.Requests), slog.Int("failures", ev.Stats.Failure))
return ev.ClosestNodes, lastStats, nil

default:
@@ -571,6 +584,7 @@ func (c *Coordinator) NotifyConnectivity(ctx context.Context, id kadt.PeerID) er
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.NotifyConnectivity")
defer span.End()

c.cfg.Logger.Debug("peer has connectivity", tele.LogAttrPeerID(id), "source", "notify")
c.routingBehaviour.Notify(ctx, &EventNotifyConnectivity{
NodeID: id,
})
@@ -584,6 +598,7 @@ func (c *Coordinator) NotifyNonConnectivity(ctx context.Context, id kadt.PeerID)
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.NotifyNonConnectivity")
defer span.End()

c.cfg.Logger.Debug("peer has no connectivity", tele.LogAttrPeerID(id), "source", "notify")
c.routingBehaviour.Notify(ctx, &EventNotifyNonConnectivity{
NodeID: id,
})
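Aside (not part of the commit): the coordinator's new debug lines mix bare key/value pairs (e.g. "query_id", queryID) with ready-made slog.Attr values (tele.LogAttrPeerID, slog.Duration, slog.Int) in a single call, which slog's variadic arguments allow. A small hypothetical illustration; the peer ID string and numbers are made up:

package main

import (
	"time"

	"golang.org/x/exp/slog"
)

func main() {
	// Note: the default handler drops Debug records, so Info is used here to
	// keep the example visible; the commit itself logs these at Debug level.
	log := slog.Default()
	start := time.Now()

	log.Info("query made progress",
		"query_id", 7, // bare key/value pair
		slog.String("peer_id", "12D3KooWexample"), // stand-in for tele.LogAttrPeerID(...)
		slog.Duration("elapsed", time.Since(start)),
		slog.Int("requests", 3),
		slog.Int("failures", 0),
	)
}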
4 changes: 2 additions & 2 deletions v2/internal/coord/event.go
@@ -221,8 +221,8 @@ type EventNotifyConnectivity struct {
NodeID kadt.PeerID
}

func (*EventNotifyConnectivity) behaviourEvent() {}
func (*EventNotifyConnectivity) routingNotification() {}
func (*EventNotifyConnectivity) behaviourEvent() {}
func (*EventNotifyConnectivity) routingCommand() {}

// EventNotifyNonConnectivity notifies a behaviour that a peer does not have connectivity and/or does not support
// finding closer nodes.
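Aside (not part of the commit): the change above swaps which marker interface EventNotifyConnectivity satisfies (routingNotification → routingCommand). A generic sketch of the marker-method pattern this relies on; the interface and type names below are illustrative, not the package's actual definitions:

package main

import "fmt"

// A marker interface: a type opts in simply by declaring the no-op method.
type routingCommand interface{ routingCommand() }

type eventNotifyConnectivity struct{ nodeID string }

func (*eventNotifyConnectivity) routingCommand() {}

func main() {
	var ev any = &eventNotifyConnectivity{nodeID: "peer-a"}
	if _, ok := ev.(routingCommand); ok {
		fmt.Println("event handled as a routing command")
	}
}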
2 changes: 2 additions & 0 deletions v2/internal/coord/query.go
@@ -200,6 +200,7 @@ func (p *PooledQueryBehaviour) Notify(ctx context.Context, ev BehaviourEvent) {
}
case *EventGetCloserNodesFailure:
// queue an event that will notify the routing behaviour of a failed node
p.cfg.Logger.Debug("peer has no connectivity", tele.LogAttrPeerID(ev.To), "source", "query")
p.pending = append(p.pending, &EventNotifyNonConnectivity{
ev.To,
})
@@ -231,6 +232,7 @@ func (p *PooledQueryBehaviour) Notify(ctx context.Context, ev BehaviourEvent) {
}
case *EventSendMessageFailure:
// queue an event that will notify the routing behaviour of a failed node
p.cfg.Logger.Debug("peer has no connectivity", tele.LogAttrPeerID(ev.To), "source", "query")
p.pending = append(p.pending, &EventNotifyNonConnectivity{
ev.To,
})
10 changes: 9 additions & 1 deletion v2/internal/coord/routing.go
@@ -585,6 +585,8 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) {
if r.self.Equal(ev.NodeID) {
break
}
r.cfg.Logger.Debug("peer has connectivity", tele.LogAttrPeerID(ev.NodeID))

// tell the include state machine in case this is a new peer that could be added to the routing table
cmd := &routing.EventIncludeAddCandidate[kadt.Key, kadt.PeerID]{
NodeID: ev.NodeID,
@@ -705,6 +707,7 @@ func (r *RoutingBehaviour) advanceBootstrap(ctx context.Context, ev routing.Boot
case *routing.StateBootstrapWaiting:
// bootstrap waiting for a message response, nothing to do
case *routing.StateBootstrapFinished:
r.cfg.Logger.Debug("bootstrap finished", slog.Duration("elapsed", st.Stats.End.Sub(st.Stats.Start)), slog.Int("requests", st.Stats.Requests), slog.Int("failures", st.Stats.Failure))
return &EventBootstrapFinished{
Stats: st.Stats,
}, true
@@ -726,6 +729,7 @@ func (r *RoutingBehaviour) advanceInclude(ctx context.Context, ev routing.Includ
case *routing.StateIncludeConnectivityCheck[kadt.Key, kadt.PeerID]:
span.SetAttributes(attribute.String("out_event", "EventOutboundGetCloserNodes"))
// include wants to send a find node message to a node
r.cfg.Logger.Debug("starting connectivity check", tele.LogAttrPeerID(st.NodeID), "source", "include")
return &EventOutboundGetCloserNodes{
QueryID: IncludeQueryID,
To: st.NodeID,
@@ -743,6 +747,7 @@

// return the event to notify outwards too
span.SetAttributes(attribute.String("out_event", "EventRoutingUpdated"))
r.cfg.Logger.Debug("peer added to routing table", tele.LogAttrPeerID(st.NodeID))
return &EventRoutingUpdated{
NodeID: st.NodeID,
}, true
@@ -768,6 +773,7 @@ func (r *RoutingBehaviour) advanceProbe(ctx context.Context, ev routing.ProbeEve
switch st := st.(type) {
case *routing.StateProbeConnectivityCheck[kadt.Key, kadt.PeerID]:
// include wants to send a find node message to a node
r.cfg.Logger.Debug("starting connectivity check", tele.LogAttrPeerID(st.NodeID), "source", "probe")
return &EventOutboundGetCloserNodes{
QueryID: ProbeQueryID,
To: st.NodeID,
@@ -778,6 +784,7 @@
// a node has failed a connectivity check and been removed from the routing table and the probe list

// emit an EventRoutingRemoved event to notify clients that the node has been removed
r.cfg.Logger.Debug("peer removed from routing table", tele.LogAttrPeerID(st.NodeID))
r.pending = append(r.pending, &EventRoutingRemoved{
NodeID: st.NodeID,
})
@@ -809,6 +816,7 @@ func (r *RoutingBehaviour) advanceExplore(ctx context.Context, ev routing.Explor
switch st := bstate.(type) {

case *routing.StateExploreFindCloser[kadt.Key, kadt.PeerID]:
r.cfg.Logger.Debug("starting explore", slog.Int("cpl", st.Cpl), tele.LogAttrPeerID(st.NodeID))
return &EventOutboundGetCloserNodes{
QueryID: routing.ExploreQueryID,
To: st.NodeID,
@@ -823,7 +831,7 @@
case *routing.StateExploreQueryTimeout:
// nothing to do except notify via telemetry
case *routing.StateExploreFailure:
r.cfg.Logger.Warn("explore failure", "cpl", st.Cpl, "error", st.Error)
r.cfg.Logger.Warn("explore failure", slog.Int("cpl", st.Cpl), tele.LogAttrError(st.Error))
case *routing.StateExploreIdle:
// bootstrap not running, nothing to do
default:
1 change: 1 addition & 0 deletions v2/routing.go
@@ -358,6 +358,7 @@ func (d *DHT) SearchValue(ctx context.Context, s string, option ...routing.Optio
func (d *DHT) Bootstrap(ctx context.Context) error {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.Bootstrap")
defer span.End()
d.log.Info("Starting bootstrap")

seed := make([]kadt.PeerID, len(d.cfg.BootstrapPeers))
for i, addrInfo := range d.cfg.BootstrapPeers {
6 changes: 3 additions & 3 deletions v2/stream.go
@@ -42,18 +42,18 @@ func (d *DHT) streamHandler(s network.Stream) {

if err := s.Scope().SetService(ServiceName); err != nil {
d.log.LogAttrs(ctx, slog.LevelWarn, "error attaching stream to DHT service", slog.String("err", err.Error()))
d.logErr(s.Reset(), "failed to reset stream")
d.warnErr(s.Reset(), "failed to reset stream")
span.RecordError(err)
return
}

if err := d.handleNewStream(ctx, s); err != nil {
// If we exited with an error, let the remote peer know.
d.logErr(s.Reset(), "failed to reset stream")
d.warnErr(s.Reset(), "failed to reset stream")
span.RecordError(err)
} else {
// If we exited without an error, close gracefully.
d.logErr(s.Close(), "failed to close stream")
d.warnErr(s.Close(), "failed to close stream")
}
}

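Aside (not part of the commit): the handler's control flow resets the stream on error so the remote peer sees the failure, and closes it gracefully on success. A stripped-down sketch of that pattern; the stream interface below is a simplified stand-in, not libp2p's full network.Stream:

package main

import (
	"errors"
	"fmt"
)

// stream is a simplified stand-in for libp2p's network.Stream.
type stream interface {
	Reset() error // abort the stream; signals failure to the remote peer
	Close() error // graceful shutdown after a successful exchange
}

func handle(s stream, exchange func() error) {
	if err := exchange(); err != nil {
		// Exited with an error: reset so the remote side knows.
		if rerr := s.Reset(); rerr != nil {
			fmt.Println("failed to reset stream:", rerr)
		}
		return
	}
	// Exited cleanly: close gracefully.
	if cerr := s.Close(); cerr != nil {
		fmt.Println("failed to close stream:", cerr)
	}
}

type fakeStream struct{}

func (fakeStream) Reset() error { return nil }
func (fakeStream) Close() error { return nil }

func main() {
	handle(fakeStream{}, func() error { return nil })
	handle(fakeStream{}, func() error { return errors.New("message decode failed") })
}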
27 changes: 25 additions & 2 deletions v2/tele/log.go
@@ -2,10 +2,33 @@ package tele

import (
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
"go.uber.org/zap/exp/zapslog"
"golang.org/x/exp/slog"
)

func DefaultLogger(system string) *slog.Logger {
return slog.New(zapslog.NewHandler(logging.Logger(system).Desugar().Core()))
func DefaultLogger(name string) *slog.Logger {
return slog.New(zapslog.NewHandler(logging.Logger(name).Desugar().Core()))
}

// Attributes that can be used with logging or tracing
const (
AttrKeyError = "error"
AttrKeyPeerID = "peer_id"
AttrKeyKey = "key"
AttrKeyCacheHit = "hit"
AttrKeyInEvent = "in_event"
AttrKeyOutEvent = "out_event"
)

func LogAttrError(err error) slog.Attr {
return slog.Attr{Key: AttrKeyError, Value: slog.AnyValue(err)}
}

func LogAttrPeerID(id kadt.PeerID) slog.Attr {
return slog.String(AttrKeyPeerID, id.String())
}

func LogAttrKey(kk kadt.Key) slog.Attr {
return slog.String(AttrKeyKey, kk.HexString())
}
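Aside (not part of the commit): a minimal sketch of how attribute constructors like these are used with an slog.Logger. The key constant, helper, and error value below are re-declared locally for illustration rather than imported from the tele package:

package main

import (
	"errors"

	"golang.org/x/exp/slog"
)

const attrKeyError = "error" // same idea as tele.AttrKeyError

// logAttrError mirrors tele.LogAttrError: wrap the error in a typed slog.Attr
// so every call site logs it under the same key.
func logAttrError(err error) slog.Attr {
	return slog.Attr{Key: attrKeyError, Value: slog.AnyValue(err)}
}

func main() {
	log := slog.Default()
	log.Warn("explore failure", slog.Int("cpl", 4), logAttrError(errors.New("query timed out")))
}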
8 changes: 4 additions & 4 deletions v2/tele/tele.go
@@ -79,11 +79,11 @@ func AttrInstanceID(instanceID string) attribute.KeyValue {
}

func AttrPeerID(pid string) attribute.KeyValue {
return attribute.String("peer_id", pid)
return attribute.String(AttrKeyPeerID, pid)
}

func AttrCacheHit(hit bool) attribute.KeyValue {
return attribute.Bool("hit", hit)
return attribute.Bool(AttrKeyCacheHit, hit)
}

// AttrRecordType is currently only used for the provider backend LRU cache
@@ -101,12 +101,12 @@ func AttrKey(val string) attribute.KeyValue {

// AttrInEvent creates an attribute that records the type of an event
func AttrInEvent(t any) attribute.KeyValue {
return attribute.String("in_event", fmt.Sprintf("%T", t))
return attribute.String(AttrKeyInEvent, fmt.Sprintf("%T", t))
}

// AttrOutEvent creates an attribute that records the type of an event being returned
func AttrOutEvent(t any) attribute.KeyValue {
return attribute.String("out_event", fmt.Sprintf("%T", t))
return attribute.String(AttrKeyOutEvent, fmt.Sprintf("%T", t))
}

// WithAttributes is a function that attaches the provided attributes to the
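Aside (not part of the commit): with the attribute keys lifted into shared constants, the same key name can feed both an OpenTelemetry span attribute and an slog attribute, keeping traces and logs correlated. A hypothetical sketch with locally re-declared names:

package main

import (
	"go.opentelemetry.io/otel/attribute"
	"golang.org/x/exp/slog"
)

const attrKeyPeerID = "peer_id" // single source of truth for the key name

// Tracing attribute (OpenTelemetry) and logging attribute (slog) built from
// the same constant.
func attrPeerID(pid string) attribute.KeyValue { return attribute.String(attrKeyPeerID, pid) }
func logAttrPeerID(pid string) slog.Attr       { return slog.String(attrKeyPeerID, pid) }

func main() {
	_ = attrPeerID("12D3KooWexample")
	slog.Default().Info("peer added to routing table", logAttrPeerID("12D3KooWexample"))
}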
