diff --git a/v2/dht.go b/v2/dht.go index b9ec9993..b5229c9e 100644 --- a/v2/dht.go +++ b/v2/dht.go @@ -17,6 +17,7 @@ import ( "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord" "github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/routing" "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" + "github.com/libp2p/go-libp2p-kad-dht/v2/tele" ) // DHT is an implementation of Kademlia with S/Kademlia modifications. @@ -106,13 +107,23 @@ func New(h host.Host, cfg *Config) (*DHT, error) { // instantiate a new Kademlia DHT coordinator. coordCfg := coord.DefaultCoordinatorConfig() + coordCfg.Clock = cfg.Clock + coordCfg.Logger = cfg.Logger + coordCfg.MeterProvider = cfg.MeterProvider + coordCfg.TracerProvider = cfg.TracerProvider + + coordCfg.Query.Clock = cfg.Clock + coordCfg.Query.Logger = cfg.Logger.With("behaviour", "pooledquery") + coordCfg.Query.Tracer = cfg.TracerProvider.Tracer(tele.TracerName) coordCfg.Query.Concurrency = cfg.Query.Concurrency coordCfg.Query.Timeout = cfg.Query.Timeout coordCfg.Query.RequestConcurrency = cfg.Query.RequestConcurrency coordCfg.Query.RequestTimeout = cfg.Query.RequestTimeout - coordCfg.Clock = cfg.Clock - coordCfg.MeterProvider = cfg.MeterProvider - coordCfg.TracerProvider = cfg.TracerProvider + + coordCfg.Routing.Clock = cfg.Clock + coordCfg.Routing.Logger = cfg.Logger.With("behaviour", "routing") + coordCfg.Routing.Tracer = cfg.TracerProvider.Tracer(tele.TracerName) + coordCfg.Routing.Meter = cfg.MeterProvider.Meter(tele.MeterName) d.kad, err = coord.NewCoordinator(kadt.PeerID(d.host.ID()), &router{host: h, ProtocolID: cfg.ProtocolID}, d.rt, coordCfg) if err != nil { @@ -202,11 +213,11 @@ func (d *DHT) initAminoBackends() (map[string]Backend, error) { // Close cleans up all resources associated with this DHT. func (d *DHT) Close() error { if err := d.sub.Close(); err != nil { - d.log.With("err", err).Debug("failed closing event bus subscription") + d.debugErr(err, "failed closing event bus subscription") } if err := d.kad.Close(); err != nil { - d.log.With("err", err).Debug("failed closing coordinator") + d.debugErr(err, "failed closing coordinator") } for ns, b := range d.backends { @@ -216,7 +227,7 @@ func (d *DHT) Close() error { } if err := closer.Close(); err != nil { - d.log.Warn("failed closing backend", "namespace", ns, "err", err.Error()) + d.warnErr(err, "failed closing backend", "namespace", ns) } } @@ -230,7 +241,7 @@ func (d *DHT) Close() error { if d.cfg.ProtocolID == ProtocolIPFS && d.cfg.Datastore == nil { if pbe, err := typedBackend[*ProvidersBackend](d, namespaceProviders); err == nil { if err := pbe.datastore.Close(); err != nil { - d.log.Warn("failed closing in memory datastore", "err", err.Error()) + d.warnErr(err, "failed closing in memory datastore") } } } @@ -244,7 +255,7 @@ func (d *DHT) Close() error { } if err := s.Reset(); err != nil { - d.log.With("err", err).Debug("failed closing stream") + d.debugErr(err, "failed closing stream") } } } @@ -302,21 +313,41 @@ func (d *DHT) setClientMode() { } if err := s.Reset(); err != nil { - d.log.With("err", err).Debug("failed closing stream") + d.debugErr(err, "failed closing stream") } } } } -// logErr is a helper method that uses the slogger of the DHT and writes a -// warning log line with the given message alongside the error. If the error +// warnErr is a helper method that uses the slogger of the DHT and writes a +// warning log line with the given message alongside the error. args is a list of +// key/value pairs or slog.Attrs that will be included with the log message. If the error // is nil, this method is a no-op. -func (d *DHT) logErr(err error, msg string) { +func (d *DHT) warnErr(err error, msg string, args ...any) { if err == nil { return } - d.log.Warn(msg, "err", err.Error()) + if len(args) == 0 { + d.log.Warn(msg, tele.LogAttrError(err)) + return + } + d.log.With(args...).Warn(msg, tele.LogAttrError(err)) +} + +// debugErr is a helper method that uses the slogger of the DHT and writes a +// debug log line with the given message alongside the error. args is a list of +// key/value pairs or slog.Attrs that will be included with the log message. If the error +// is nil, this method is a no-op. +func (d *DHT) debugErr(err error, msg string, args ...any) { + if err == nil { + return + } + if len(args) == 0 { + d.log.Debug(msg, tele.LogAttrError(err)) + return + } + d.log.With(args...).Debug(msg, tele.LogAttrError(err)) } // AddAddresses suggests peers and their associated addresses to be added to the routing table. diff --git a/v2/internal/coord/coordinator.go b/v2/internal/coord/coordinator.go index e8e1428b..d4dda7fa 100644 --- a/v2/internal/coord/coordinator.go +++ b/v2/internal/coord/coordinator.go @@ -336,6 +336,7 @@ func (c *Coordinator) PutValue(ctx context.Context, r coordt.Value, q int) error func (c *Coordinator) QueryClosest(ctx context.Context, target kadt.Key, fn coordt.QueryFunc, numResults int) ([]kadt.PeerID, coordt.QueryStats, error) { ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.Query") defer span.End() + c.cfg.Logger.Debug("starting query for closest nodes", tele.LogAttrKey(target)) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -381,6 +382,10 @@ func (c *Coordinator) QueryClosest(ctx context.Context, target kadt.Key, fn coor func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn coordt.QueryFunc, numResults int) (coordt.QueryStats, error) { ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.QueryMessage") defer span.End() + if msg == nil { + return coordt.QueryStats{}, fmt.Errorf("no message supplied for query") + } + c.cfg.Logger.Debug("starting query with message", tele.LogAttrKey(msg.Target()), slog.String("type", msg.Type.String())) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -421,6 +426,10 @@ func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn coor func (c *Coordinator) BroadcastRecord(ctx context.Context, msg *pb.Message) error { ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.BroadcastRecord") defer span.End() + if msg == nil { + return fmt.Errorf("no message supplied for broadcast") + } + c.cfg.Logger.Debug("starting broadcast with message", tele.LogAttrKey(msg.Target()), slog.String("type", msg.Type.String())) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -474,6 +483,7 @@ func (c *Coordinator) waitForQuery(ctx context.Context, queryID coordt.QueryID, ctx, ev := wev.Ctx, wev.Event switch ev := ev.(type) { case *EventQueryProgressed: + c.cfg.Logger.Debug("query made progress", "query_id", queryID, tele.LogAttrPeerID(ev.NodeID), slog.Duration("elapsed", c.cfg.Clock.Since(ev.Stats.Start)), slog.Int("requests", ev.Stats.Requests), slog.Int("failures", ev.Stats.Failure)) lastStats = coordt.QueryStats{ Start: ev.Stats.Start, Requests: ev.Stats.Requests, @@ -483,12 +493,14 @@ func (c *Coordinator) waitForQuery(ctx context.Context, queryID coordt.QueryID, nh, err := c.networkBehaviour.getNodeHandler(ctx, ev.NodeID) if err != nil { // ignore unknown node + c.cfg.Logger.Debug("node handler not found", "query_id", queryID, tele.LogAttrError, err) break } err = fn(ctx, nh.ID(), ev.Response, lastStats) if errors.Is(err, coordt.ErrSkipRemaining) { // done + c.cfg.Logger.Debug("query done", "query_id", queryID) c.queryBehaviour.Notify(ctx, &EventStopQuery{QueryID: queryID}) return nil, lastStats, nil } @@ -501,6 +513,7 @@ func (c *Coordinator) waitForQuery(ctx context.Context, queryID coordt.QueryID, case *EventQueryFinished: // query is done lastStats.Exhausted = true + c.cfg.Logger.Debug("query ran to exhaustion", "query_id", queryID, slog.Duration("elapsed", ev.Stats.End.Sub(ev.Stats.Start)), slog.Int("requests", ev.Stats.Requests), slog.Int("failures", ev.Stats.Failure)) return ev.ClosestNodes, lastStats, nil default: @@ -571,6 +584,7 @@ func (c *Coordinator) NotifyConnectivity(ctx context.Context, id kadt.PeerID) er ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.NotifyConnectivity") defer span.End() + c.cfg.Logger.Debug("peer has connectivity", tele.LogAttrPeerID(id), "source", "notify") c.routingBehaviour.Notify(ctx, &EventNotifyConnectivity{ NodeID: id, }) @@ -584,6 +598,7 @@ func (c *Coordinator) NotifyNonConnectivity(ctx context.Context, id kadt.PeerID) ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.NotifyNonConnectivity") defer span.End() + c.cfg.Logger.Debug("peer has no connectivity", tele.LogAttrPeerID(id), "source", "notify") c.routingBehaviour.Notify(ctx, &EventNotifyNonConnectivity{ NodeID: id, }) diff --git a/v2/internal/coord/event.go b/v2/internal/coord/event.go index 69f86a33..20810ce0 100644 --- a/v2/internal/coord/event.go +++ b/v2/internal/coord/event.go @@ -221,8 +221,8 @@ type EventNotifyConnectivity struct { NodeID kadt.PeerID } -func (*EventNotifyConnectivity) behaviourEvent() {} -func (*EventNotifyConnectivity) routingNotification() {} +func (*EventNotifyConnectivity) behaviourEvent() {} +func (*EventNotifyConnectivity) routingCommand() {} // EventNotifyNonConnectivity notifies a behaviour that a peer does not have connectivity and/or does not support // finding closer nodes is known. diff --git a/v2/internal/coord/query.go b/v2/internal/coord/query.go index 68ca59d5..7bae8ac0 100644 --- a/v2/internal/coord/query.go +++ b/v2/internal/coord/query.go @@ -200,6 +200,7 @@ func (p *PooledQueryBehaviour) Notify(ctx context.Context, ev BehaviourEvent) { } case *EventGetCloserNodesFailure: // queue an event that will notify the routing behaviour of a failed node + p.cfg.Logger.Debug("peer has no connectivity", tele.LogAttrPeerID(ev.To), "source", "query") p.pending = append(p.pending, &EventNotifyNonConnectivity{ ev.To, }) @@ -231,6 +232,7 @@ func (p *PooledQueryBehaviour) Notify(ctx context.Context, ev BehaviourEvent) { } case *EventSendMessageFailure: // queue an event that will notify the routing behaviour of a failed node + p.cfg.Logger.Debug("peer has no connectivity", tele.LogAttrPeerID(ev.To), "source", "query") p.pending = append(p.pending, &EventNotifyNonConnectivity{ ev.To, }) diff --git a/v2/internal/coord/routing.go b/v2/internal/coord/routing.go index ab6918d5..bf019752 100644 --- a/v2/internal/coord/routing.go +++ b/v2/internal/coord/routing.go @@ -585,6 +585,8 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) { if r.self.Equal(ev.NodeID) { break } + r.cfg.Logger.Debug("peer has connectivity", tele.LogAttrPeerID(ev.NodeID)) + // tell the include state machine in case this is a new peer that could be added to the routing table cmd := &routing.EventIncludeAddCandidate[kadt.Key, kadt.PeerID]{ NodeID: ev.NodeID, @@ -705,6 +707,7 @@ func (r *RoutingBehaviour) advanceBootstrap(ctx context.Context, ev routing.Boot case *routing.StateBootstrapWaiting: // bootstrap waiting for a message response, nothing to do case *routing.StateBootstrapFinished: + r.cfg.Logger.Debug("bootstrap finished", slog.Duration("elapsed", st.Stats.End.Sub(st.Stats.Start)), slog.Int("requests", st.Stats.Requests), slog.Int("failures", st.Stats.Failure)) return &EventBootstrapFinished{ Stats: st.Stats, }, true @@ -726,6 +729,7 @@ func (r *RoutingBehaviour) advanceInclude(ctx context.Context, ev routing.Includ case *routing.StateIncludeConnectivityCheck[kadt.Key, kadt.PeerID]: span.SetAttributes(attribute.String("out_event", "EventOutboundGetCloserNodes")) // include wants to send a find node message to a node + r.cfg.Logger.Debug("starting connectivity check", tele.LogAttrPeerID(st.NodeID), "source", "include") return &EventOutboundGetCloserNodes{ QueryID: IncludeQueryID, To: st.NodeID, @@ -743,6 +747,7 @@ func (r *RoutingBehaviour) advanceInclude(ctx context.Context, ev routing.Includ // return the event to notify outwards too span.SetAttributes(attribute.String("out_event", "EventRoutingUpdated")) + r.cfg.Logger.Debug("peer added to routing table", tele.LogAttrPeerID(st.NodeID)) return &EventRoutingUpdated{ NodeID: st.NodeID, }, true @@ -768,6 +773,7 @@ func (r *RoutingBehaviour) advanceProbe(ctx context.Context, ev routing.ProbeEve switch st := st.(type) { case *routing.StateProbeConnectivityCheck[kadt.Key, kadt.PeerID]: // include wants to send a find node message to a node + r.cfg.Logger.Debug("starting connectivity check", tele.LogAttrPeerID(st.NodeID), "source", "probe") return &EventOutboundGetCloserNodes{ QueryID: ProbeQueryID, To: st.NodeID, @@ -778,6 +784,7 @@ func (r *RoutingBehaviour) advanceProbe(ctx context.Context, ev routing.ProbeEve // a node has failed a connectivity check and been removed from the routing table and the probe list // emit an EventRoutingRemoved event to notify clients that the node has been removed + r.cfg.Logger.Debug("peer removed from routing table", tele.LogAttrPeerID(st.NodeID)) r.pending = append(r.pending, &EventRoutingRemoved{ NodeID: st.NodeID, }) @@ -809,6 +816,7 @@ func (r *RoutingBehaviour) advanceExplore(ctx context.Context, ev routing.Explor switch st := bstate.(type) { case *routing.StateExploreFindCloser[kadt.Key, kadt.PeerID]: + r.cfg.Logger.Debug("starting explore", slog.Int("cpl", st.Cpl), tele.LogAttrPeerID(st.NodeID)) return &EventOutboundGetCloserNodes{ QueryID: routing.ExploreQueryID, To: st.NodeID, @@ -823,7 +831,7 @@ func (r *RoutingBehaviour) advanceExplore(ctx context.Context, ev routing.Explor case *routing.StateExploreQueryTimeout: // nothing to do except notify via telemetry case *routing.StateExploreFailure: - r.cfg.Logger.Warn("explore failure", "cpl", st.Cpl, "error", st.Error) + r.cfg.Logger.Warn("explore failure", slog.Int("cpl", st.Cpl), tele.LogAttrError(st.Error)) case *routing.StateExploreIdle: // bootstrap not running, nothing to do default: diff --git a/v2/routing.go b/v2/routing.go index 756a3d48..ed66a559 100644 --- a/v2/routing.go +++ b/v2/routing.go @@ -358,6 +358,7 @@ func (d *DHT) SearchValue(ctx context.Context, s string, option ...routing.Optio func (d *DHT) Bootstrap(ctx context.Context) error { ctx, span := d.tele.Tracer.Start(ctx, "DHT.Bootstrap") defer span.End() + d.log.Info("Starting bootstrap") seed := make([]kadt.PeerID, len(d.cfg.BootstrapPeers)) for i, addrInfo := range d.cfg.BootstrapPeers { diff --git a/v2/stream.go b/v2/stream.go index e9e747a3..35aee6e2 100644 --- a/v2/stream.go +++ b/v2/stream.go @@ -42,18 +42,18 @@ func (d *DHT) streamHandler(s network.Stream) { if err := s.Scope().SetService(ServiceName); err != nil { d.log.LogAttrs(ctx, slog.LevelWarn, "error attaching stream to DHT service", slog.String("err", err.Error())) - d.logErr(s.Reset(), "failed to reset stream") + d.warnErr(s.Reset(), "failed to reset stream") span.RecordError(err) return } if err := d.handleNewStream(ctx, s); err != nil { // If we exited with an error, let the remote peer know. - d.logErr(s.Reset(), "failed to reset stream") + d.warnErr(s.Reset(), "failed to reset stream") span.RecordError(err) } else { // If we exited without an error, close gracefully. - d.logErr(s.Close(), "failed to close stream") + d.warnErr(s.Close(), "failed to close stream") } } diff --git a/v2/tele/log.go b/v2/tele/log.go index cc65f7e4..b91426a9 100644 --- a/v2/tele/log.go +++ b/v2/tele/log.go @@ -2,10 +2,33 @@ package tele import ( logging "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" "go.uber.org/zap/exp/zapslog" "golang.org/x/exp/slog" ) -func DefaultLogger(system string) *slog.Logger { - return slog.New(zapslog.NewHandler(logging.Logger(system).Desugar().Core())) +func DefaultLogger(name string) *slog.Logger { + return slog.New(zapslog.NewHandler(logging.Logger(name).Desugar().Core())) +} + +// Attributes that can be used with logging or tracing +const ( + AttrKeyError = "error" + AttrKeyPeerID = "peer_id" + AttrKeyKey = "key" + AttrKeyCacheHit = "hit" + AttrKeyInEvent = "in_event" + AttrKeyOutEvent = "out_event" +) + +func LogAttrError(err error) slog.Attr { + return slog.Attr{Key: AttrKeyError, Value: slog.AnyValue(err)} +} + +func LogAttrPeerID(id kadt.PeerID) slog.Attr { + return slog.String(AttrKeyPeerID, id.String()) +} + +func LogAttrKey(kk kadt.Key) slog.Attr { + return slog.String(AttrKeyKey, kk.HexString()) } diff --git a/v2/tele/tele.go b/v2/tele/tele.go index 0cd4fac7..78aecc98 100644 --- a/v2/tele/tele.go +++ b/v2/tele/tele.go @@ -79,11 +79,11 @@ func AttrInstanceID(instanceID string) attribute.KeyValue { } func AttrPeerID(pid string) attribute.KeyValue { - return attribute.String("peer_id", pid) + return attribute.String(AttrKeyPeerID, pid) } func AttrCacheHit(hit bool) attribute.KeyValue { - return attribute.Bool("hit", hit) + return attribute.Bool(AttrKeyCacheHit, hit) } // AttrRecordType is currently only used for the provider backend LRU cache @@ -101,12 +101,12 @@ func AttrKey(val string) attribute.KeyValue { // AttrInEvent creates an attribute that records the type of an event func AttrInEvent(t any) attribute.KeyValue { - return attribute.String("in_event", fmt.Sprintf("%T", t)) + return attribute.String(AttrKeyInEvent, fmt.Sprintf("%T", t)) } // AttrOutEvent creates an attribute that records the type of an event being returned func AttrOutEvent(t any) attribute.KeyValue { - return attribute.String("out_event", fmt.Sprintf("%T", t)) + return attribute.String(AttrKeyOutEvent, fmt.Sprintf("%T", t)) } // WithAttributes is a function that attaches the provided attributes to the