Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logging improvements #941

Merged
merged 4 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 44 additions & 13 deletions v2/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord"
"github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/routing"
"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
"github.com/libp2p/go-libp2p-kad-dht/v2/tele"
)

// DHT is an implementation of Kademlia with S/Kademlia modifications.
Expand Down Expand Up @@ -95,7 +96,7 @@
} else if cfg.ProtocolID == ProtocolIPFS {
d.backends, err = d.initAminoBackends()
if err != nil {
return nil, fmt.Errorf("init amino backends: %w", err)

Check warning on line 99 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L99

Added line #L99 was not covered by tests
}
}

Expand All @@ -106,13 +107,23 @@

// instantiate a new Kademlia DHT coordinator.
coordCfg := coord.DefaultCoordinatorConfig()
coordCfg.Clock = cfg.Clock
coordCfg.Logger = cfg.Logger
coordCfg.MeterProvider = cfg.MeterProvider
coordCfg.TracerProvider = cfg.TracerProvider

coordCfg.Query.Clock = cfg.Clock
coordCfg.Query.Logger = cfg.Logger.With("behaviour", "pooledquery")
coordCfg.Query.Tracer = cfg.TracerProvider.Tracer(tele.TracerName)
coordCfg.Query.Concurrency = cfg.Query.Concurrency
coordCfg.Query.Timeout = cfg.Query.Timeout
coordCfg.Query.RequestConcurrency = cfg.Query.RequestConcurrency
coordCfg.Query.RequestTimeout = cfg.Query.RequestTimeout
coordCfg.Clock = cfg.Clock
coordCfg.MeterProvider = cfg.MeterProvider
coordCfg.TracerProvider = cfg.TracerProvider

coordCfg.Routing.Clock = cfg.Clock
coordCfg.Routing.Logger = cfg.Logger.With("behaviour", "routing")
coordCfg.Routing.Tracer = cfg.TracerProvider.Tracer(tele.TracerName)
coordCfg.Routing.Meter = cfg.MeterProvider.Meter(tele.MeterName)

d.kad, err = coord.NewCoordinator(kadt.PeerID(d.host.ID()), &router{host: h, ProtocolID: cfg.ProtocolID}, d.rt, coordCfg)
if err != nil {
Expand Down Expand Up @@ -152,18 +163,18 @@
)

if d.cfg.Datastore != nil {
dstore = d.cfg.Datastore

Check warning on line 166 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L166

Added line #L166 was not covered by tests
} else if dstore, err = InMemoryDatastore(); err != nil {
return nil, fmt.Errorf("new default datastore: %w", err)
}

Check warning on line 169 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L168-L169

Added lines #L168 - L169 were not covered by tests

// wrap datastore in open telemetry tracing
dstore = trace.New(dstore, d.tele.Tracer)

pbeCfg, err := DefaultProviderBackendConfig()
if err != nil {
return nil, fmt.Errorf("default provider config: %w", err)
}

Check warning on line 177 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L176-L177

Added lines #L176 - L177 were not covered by tests
pbeCfg.Logger = d.cfg.Logger
pbeCfg.AddressFilter = d.cfg.AddressFilter
pbeCfg.Tele = d.tele
Expand All @@ -171,26 +182,26 @@

pbe, err := NewBackendProvider(d.host.Peerstore(), dstore, pbeCfg)
if err != nil {
return nil, fmt.Errorf("new provider backend: %w", err)
}

Check warning on line 186 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L185-L186

Added lines #L185 - L186 were not covered by tests

rbeCfg, err := DefaultRecordBackendConfig()
if err != nil {
return nil, fmt.Errorf("default provider config: %w", err)
}

Check warning on line 191 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L190-L191

Added lines #L190 - L191 were not covered by tests
rbeCfg.Logger = d.cfg.Logger
rbeCfg.Tele = d.tele
rbeCfg.clk = d.cfg.Clock

ipnsBe, err := NewBackendIPNS(dstore, d.host.Peerstore(), rbeCfg)
if err != nil {
return nil, fmt.Errorf("new ipns backend: %w", err)
}

Check warning on line 199 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L198-L199

Added lines #L198 - L199 were not covered by tests

pkBe, err := NewBackendPublicKey(dstore, rbeCfg)
if err != nil {
return nil, fmt.Errorf("new public key backend: %w", err)
}

Check warning on line 204 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L203-L204

Added lines #L203 - L204 were not covered by tests

return map[string]Backend{
namespaceIPNS: ipnsBe,
Expand All @@ -202,11 +213,11 @@
// Close cleans up all resources associated with this DHT.
func (d *DHT) Close() error {
if err := d.sub.Close(); err != nil {
d.log.With("err", err).Debug("failed closing event bus subscription")
d.debugErr(err, "failed closing event bus subscription")

Check warning on line 216 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L216

Added line #L216 was not covered by tests
}

if err := d.kad.Close(); err != nil {
d.log.With("err", err).Debug("failed closing coordinator")
d.debugErr(err, "failed closing coordinator")

Check warning on line 220 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L220

Added line #L220 was not covered by tests
}

for ns, b := range d.backends {
Expand All @@ -216,7 +227,7 @@
}

if err := closer.Close(); err != nil {
d.log.Warn("failed closing backend", "namespace", ns, "err", err.Error())
d.warnErr(err, "failed closing backend", "namespace", ns)

Check warning on line 230 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L230

Added line #L230 was not covered by tests
}
}

Expand All @@ -230,7 +241,7 @@
if d.cfg.ProtocolID == ProtocolIPFS && d.cfg.Datastore == nil {
if pbe, err := typedBackend[*ProvidersBackend](d, namespaceProviders); err == nil {
if err := pbe.datastore.Close(); err != nil {
d.log.Warn("failed closing in memory datastore", "err", err.Error())
d.warnErr(err, "failed closing in memory datastore")

Check warning on line 244 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L244

Added line #L244 was not covered by tests
}
}
}
Expand All @@ -244,7 +255,7 @@
}

if err := s.Reset(); err != nil {
d.log.With("err", err).Debug("failed closing stream")
d.debugErr(err, "failed closing stream")

Check warning on line 258 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L258

Added line #L258 was not covered by tests
}
}
}
Expand Down Expand Up @@ -302,21 +313,41 @@
}

if err := s.Reset(); err != nil {
d.log.With("err", err).Debug("failed closing stream")
d.debugErr(err, "failed closing stream")

Check warning on line 316 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L316

Added line #L316 was not covered by tests
}
}
}
}

// logErr is a helper method that uses the slogger of the DHT and writes a
// warning log line with the given message alongside the error. If the error
// warnErr is a helper method that uses the slogger of the DHT and writes a
// warning log line with the given message alongside the error. args is a list of
// key/value pairs or slog.Attrs that will be included with the log message. If the error
// is nil, this method is a no-op.
func (d *DHT) logErr(err error, msg string) {
func (d *DHT) warnErr(err error, msg string, args ...any) {
if err == nil {
return
}

d.log.Warn(msg, "err", err.Error())
if len(args) == 0 {
d.log.Warn(msg, tele.LogAttrError(err))
return
}
d.log.With(args...).Warn(msg, tele.LogAttrError(err))

Check warning on line 335 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L335

Added line #L335 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not necessary, but now that we have helper methods for logging error messages we could use the more performant LogAttrs method here. Error logs shouldn't live in the hot path anyway so feel free to ignore.

}

// debugErr is a helper method that uses the slogger of the DHT and writes a
// debug log line with the given message alongside the error. args is a list of
// key/value pairs or slog.Attrs that will be included with the log message. If the error
// is nil, this method is a no-op.
func (d *DHT) debugErr(err error, msg string, args ...any) {
if err == nil {
return
}
if len(args) == 0 {
d.log.Debug(msg, tele.LogAttrError(err))
return
}
d.log.With(args...).Debug(msg, tele.LogAttrError(err))

Check warning on line 350 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L342-L350

Added lines #L342 - L350 were not covered by tests
}

// AddAddresses suggests peers and their associated addresses to be added to the routing table.
Expand Down
15 changes: 15 additions & 0 deletions v2/internal/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,33 +154,33 @@

func NewCoordinator(self kadt.PeerID, rtr coordt.Router[kadt.Key, kadt.PeerID, *pb.Message], rt routing.RoutingTableCpl[kadt.Key, kadt.PeerID], cfg *CoordinatorConfig) (*Coordinator, error) {
if cfg == nil {
cfg = DefaultCoordinatorConfig()

Check warning on line 157 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L157

Added line #L157 was not covered by tests
} else if err := cfg.Validate(); err != nil {
return nil, err
}

Check warning on line 160 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L159-L160

Added lines #L159 - L160 were not covered by tests

// initialize a new telemetry struct
tele, err := NewTelemetry(cfg.MeterProvider, cfg.TracerProvider)
if err != nil {
return nil, fmt.Errorf("init telemetry: %w", err)
}

Check warning on line 166 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L165-L166

Added lines #L165 - L166 were not covered by tests

queryBehaviour, err := NewPooledQueryBehaviour(self, &cfg.Query)
if err != nil {
return nil, fmt.Errorf("query behaviour: %w", err)
}

Check warning on line 171 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L170-L171

Added lines #L170 - L171 were not covered by tests

routingBehaviour, err := NewRoutingBehaviour(self, rt, &cfg.Routing)
if err != nil {
return nil, fmt.Errorf("routing behaviour: %w", err)
}

Check warning on line 176 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L175-L176

Added lines #L175 - L176 were not covered by tests

networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger, tele.Tracer)

b, err := brdcst.NewPool[kadt.Key, kadt.PeerID, *pb.Message](self, nil)
if err != nil {
return nil, fmt.Errorf("broadcast: %w", err)
}

Check warning on line 183 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L182-L183

Added lines #L182 - L183 were not covered by tests

brdcstBehaviour := NewPooledBroadcastBehaviour(b, cfg.Logger, tele.Tracer)

Expand Down Expand Up @@ -215,8 +215,8 @@
return nil
}

func (c *Coordinator) ID() kadt.PeerID {
return c.self

Check warning on line 219 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L218-L219

Added lines #L218 - L219 were not covered by tests
}

func (c *Coordinator) eventLoop(ctx context.Context) {
Expand All @@ -233,14 +233,14 @@
case <-ctx.Done():
// coordinator is closing
return
case <-c.networkBehaviour.Ready():
ev, ok = c.networkBehaviour.Perform(ctx)

Check warning on line 237 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L236-L237

Added lines #L236 - L237 were not covered by tests
case <-c.routingBehaviour.Ready():
ev, ok = c.routingBehaviour.Perform(ctx)
case <-c.queryBehaviour.Ready():
ev, ok = c.queryBehaviour.Perform(ctx)
case <-c.brdcstBehaviour.Ready():
ev, ok = c.brdcstBehaviour.Perform(ctx)

Check warning on line 243 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L242-L243

Added lines #L242 - L243 were not covered by tests
}

if ok {
Expand All @@ -256,10 +256,10 @@
switch ev := ev.(type) {
case NetworkCommand:
c.networkBehaviour.Notify(ctx, ev)
case QueryCommand:
c.queryBehaviour.Notify(ctx, ev)
case BrdcstCommand:
c.brdcstBehaviour.Notify(ctx, ev)

Check warning on line 262 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L259-L262

Added lines #L259 - L262 were not covered by tests
case RoutingCommand:
c.routingBehaviour.Notify(ctx, ev)
case RoutingNotification:
Expand All @@ -267,8 +267,8 @@
rn := c.routingNotifier
c.routingNotifierMu.RUnlock()
rn.Notify(ctx, ev)
default:
panic(fmt.Sprintf("unexpected event: %T", ev))

Check warning on line 271 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L270-L271

Added lines #L270 - L271 were not covered by tests
}
}

Expand All @@ -289,8 +289,8 @@

nh, err := c.networkBehaviour.getNodeHandler(ctx, id)
if err != nil {
return nil, err
}

Check warning on line 293 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L292-L293

Added lines #L292 - L293 were not covered by tests
return nh, nil
}

Expand All @@ -303,8 +303,8 @@
for _, id := range closest {
nh, err := c.networkBehaviour.getNodeHandler(ctx, id)
if err != nil {
return nil, err
}

Check warning on line 307 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L306-L307

Added lines #L306 - L307 were not covered by tests
nodes = append(nodes, nh)
}
return nodes, nil
Expand All @@ -312,14 +312,14 @@

// GetValue requests that the node return any value associated with the supplied key.
// If the node does not have a value for the key it returns ErrValueNotFound.
func (c *Coordinator) GetValue(ctx context.Context, k kadt.Key) (coordt.Value, error) {
panic("not implemented")

Check warning on line 316 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L315-L316

Added lines #L315 - L316 were not covered by tests
}

// PutValue requests that the node stores a value to be associated with the supplied key.
// If the node cannot or chooses not to store the value for the key it returns ErrValueNotAccepted.
func (c *Coordinator) PutValue(ctx context.Context, r coordt.Value, q int) error {
panic("not implemented")

Check warning on line 322 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L321-L322

Added lines #L321 - L322 were not covered by tests
}

// QueryClosest starts a query that attempts to find the closest nodes to the target key.
Expand All @@ -336,14 +336,15 @@
func (c *Coordinator) QueryClosest(ctx context.Context, target kadt.Key, fn coordt.QueryFunc, numResults int) ([]kadt.PeerID, coordt.QueryStats, error) {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.Query")
defer span.End()
c.cfg.Logger.Debug("starting query for closest nodes", tele.LogAttrKey(target))

ctx, cancel := context.WithCancel(ctx)
defer cancel()

seeds, err := c.GetClosestNodes(ctx, target, 20)
if err != nil {
return nil, coordt.QueryStats{}, err
}

Check warning on line 347 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L346-L347

Added lines #L346 - L347 were not covered by tests

seedIDs := make([]kadt.PeerID, 0, len(seeds))
for _, s := range seeds {
Expand Down Expand Up @@ -381,18 +382,22 @@
func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn coordt.QueryFunc, numResults int) (coordt.QueryStats, error) {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.QueryMessage")
defer span.End()
if msg == nil {
return coordt.QueryStats{}, fmt.Errorf("no message supplied for query")
}

Check warning on line 387 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L386-L387

Added lines #L386 - L387 were not covered by tests
c.cfg.Logger.Debug("starting query with message", tele.LogAttrKey(msg.Target()), slog.String("type", msg.Type.String()))

ctx, cancel := context.WithCancel(ctx)
defer cancel()

if numResults < 1 {
numResults = 20 // TODO: parameterize
}

Check warning on line 395 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L394-L395

Added lines #L394 - L395 were not covered by tests

seeds, err := c.GetClosestNodes(ctx, msg.Target(), numResults)
if err != nil {
return coordt.QueryStats{}, err
}

Check warning on line 400 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L399-L400

Added lines #L399 - L400 were not covered by tests

seedIDs := make([]kadt.PeerID, 0, len(seeds))
for _, s := range seeds {
Expand Down Expand Up @@ -421,19 +426,23 @@
func (c *Coordinator) BroadcastRecord(ctx context.Context, msg *pb.Message) error {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.BroadcastRecord")
defer span.End()
if msg == nil {
return fmt.Errorf("no message supplied for broadcast")
}

Check warning on line 431 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L430-L431

Added lines #L430 - L431 were not covered by tests
c.cfg.Logger.Debug("starting broadcast with message", tele.LogAttrKey(msg.Target()), slog.String("type", msg.Type.String()))

ctx, cancel := context.WithCancel(ctx)
defer cancel()

seeds, err := c.GetClosestNodes(ctx, msg.Target(), 20) // TODO: parameterize
if err != nil {
return err
}

Check warning on line 440 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L439-L440

Added lines #L439 - L440 were not covered by tests

seedIDs := make([]kadt.PeerID, 0, len(seeds))
for _, s := range seeds {
seedIDs = append(seedIDs, s.ID())
}

Check warning on line 445 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L444-L445

Added lines #L444 - L445 were not covered by tests

waiter := NewWaiter[BehaviourEvent]()
queryID := c.newOperationID()
Expand All @@ -452,8 +461,8 @@

_, _, err = c.waitForBroadcast(ctx, waiter)

return err
}

Check warning on line 465 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L464-L465

Added lines #L464 - L465 were not covered by tests

func (c *Coordinator) waitForQuery(ctx context.Context, queryID coordt.QueryID, waiter *Waiter[BehaviourEvent], fn coordt.QueryFunc) ([]kadt.PeerID, coordt.QueryStats, error) {
var lastStats coordt.QueryStats
Expand All @@ -461,28 +470,31 @@
select {
case <-ctx.Done():
return nil, lastStats, ctx.Err()
case wev := <-waiter.Chan():

Check warning on line 473 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L473

Added line #L473 was not covered by tests
ctx, ev := wev.Ctx, wev.Event
switch ev := ev.(type) {
case *EventQueryProgressed:
c.cfg.Logger.Debug("query made progress", "query_id", queryID, tele.LogAttrPeerID(ev.NodeID), slog.Duration("elapsed", c.cfg.Clock.Since(ev.Stats.Start)), slog.Int("requests", ev.Stats.Requests), slog.Int("failures", ev.Stats.Failure))
lastStats = coordt.QueryStats{
Start: ev.Stats.Start,
Requests: ev.Stats.Requests,
Success: ev.Stats.Success,

Check warning on line 481 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L480-L481

Added lines #L480 - L481 were not covered by tests
Failure: ev.Stats.Failure,
}
nh, err := c.networkBehaviour.getNodeHandler(ctx, ev.NodeID)
if err != nil {
// ignore unknown node
c.cfg.Logger.Debug("node handler not found", "query_id", queryID, tele.LogAttrError, err)
break
}

err = fn(ctx, nh.ID(), ev.Response, lastStats)
if errors.Is(err, coordt.ErrSkipRemaining) {
// done
c.cfg.Logger.Debug("query done", "query_id", queryID)
c.queryBehaviour.Notify(ctx, &EventStopQuery{QueryID: queryID})
return nil, lastStats, nil
}

Check warning on line 497 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L495-L497

Added lines #L495 - L497 were not covered by tests
if err != nil {
// user defined error that terminates the query
c.queryBehaviour.Notify(ctx, &EventStopQuery{QueryID: queryID})
Expand All @@ -492,10 +504,11 @@
case *EventQueryFinished:
// query is done
lastStats.Exhausted = true
c.cfg.Logger.Debug("query ran to exhaustion", "query_id", queryID, slog.Duration("elapsed", ev.Stats.End.Sub(ev.Stats.Start)), slog.Int("requests", ev.Stats.Requests), slog.Int("failures", ev.Stats.Failure))
return ev.ClosestNodes, lastStats, nil

default:
panic(fmt.Sprintf("unexpected event: %T", ev))

Check warning on line 511 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L508-L511

Added lines #L508 - L511 were not covered by tests
}
}
}
Expand All @@ -503,8 +516,8 @@

func (c *Coordinator) waitForBroadcast(ctx context.Context, waiter *Waiter[BehaviourEvent]) ([]kadt.PeerID, map[string]struct {
Node kadt.PeerID
Err error
}, error,

Check warning on line 520 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L519-L520

Added lines #L519 - L520 were not covered by tests
) {
for {
select {
Expand All @@ -517,16 +530,16 @@
return ev.Contacted, ev.Errors, nil

default:
panic(fmt.Sprintf("unexpected event: %T", ev))
}

Check warning on line 534 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L533-L534

Added lines #L533 - L534 were not covered by tests
}
}
}

Check warning on line 537 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L537

Added line #L537 was not covered by tests

// AddNodes suggests new DHT nodes to be added to the routing table.
// If the routing table is updated as a result of this operation an EventRoutingUpdated notification
// is emitted on the routing notification channel.
func (c *Coordinator) AddNodes(ctx context.Context, ids []kadt.PeerID) error {

Check warning on line 542 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L541-L542

Added lines #L541 - L542 were not covered by tests
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.AddNodes")
defer span.End()
for _, id := range ids {
Expand All @@ -540,8 +553,8 @@
})

}

return nil

Check warning on line 557 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L556-L557

Added lines #L556 - L557 were not covered by tests
}

// Bootstrap instructs the dht to begin bootstrapping the routing table.
Expand All @@ -562,6 +575,7 @@
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.NotifyConnectivity")
defer span.End()

c.cfg.Logger.Debug("peer has connectivity", tele.LogAttrPeerID(id), "source", "notify")
c.routingBehaviour.Notify(ctx, &EventNotifyConnectivity{
NodeID: id,
})
Expand All @@ -575,20 +589,21 @@
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.NotifyNonConnectivity")
defer span.End()

c.cfg.Logger.Debug("peer has no connectivity", tele.LogAttrPeerID(id), "source", "notify")
c.routingBehaviour.Notify(ctx, &EventNotifyNonConnectivity{
NodeID: id,
})

return nil
}

func (c *Coordinator) newOperationID() coordt.QueryID {
next := c.lastQueryID.Add(1)
return coordt.QueryID(fmt.Sprintf("%016x", next))
}

// A BufferedRoutingNotifier is a [RoutingNotifier] that buffers [RoutingNotification] events and provides methods
// to expect occurrences of specific events. It is designed for use in a test environment.

Check warning on line 606 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L597-L606

Added lines #L597 - L606 were not covered by tests
type BufferedRoutingNotifier struct {
mu sync.Mutex
buffered []RoutingNotification
Expand Down Expand Up @@ -636,8 +651,8 @@

// ExpectRoutingUpdated blocks until an [EventRoutingUpdated] event is seen for the specified peer id
func (w *BufferedRoutingNotifier) ExpectRoutingUpdated(ctx context.Context, id kadt.PeerID) (*EventRoutingUpdated, error) {
for {
// look in buffered events

Check warning on line 655 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L654-L655

Added lines #L654 - L655 were not covered by tests
w.mu.Lock()
for i, ev := range w.buffered {
if tev, ok := ev.(*EventRoutingUpdated); ok {
Expand All @@ -662,8 +677,8 @@

// ExpectRoutingRemoved blocks until an [EventRoutingRemoved] event is seen for the specified peer id
func (w *BufferedRoutingNotifier) ExpectRoutingRemoved(ctx context.Context, id kadt.PeerID) (*EventRoutingRemoved, error) {
for {
// look in buffered events

Check warning on line 681 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L680-L681

Added lines #L680 - L681 were not covered by tests
w.mu.Lock()
for i, ev := range w.buffered {
if tev, ok := ev.(*EventRoutingRemoved); ok {
Expand All @@ -688,4 +703,4 @@

type nullRoutingNotifier struct{}

func (nullRoutingNotifier) Notify(context.Context, RoutingNotification) {}

Check warning on line 706 in v2/internal/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/coordinator.go#L706

Added line #L706 was not covered by tests
4 changes: 2 additions & 2 deletions v2/internal/coord/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
SeedNodes []kadt.PeerID
}

func (*EventStartBootstrap) behaviourEvent() {}
func (*EventStartBootstrap) routingCommand() {}

Check warning on line 58 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L57-L58

Added lines #L57 - L58 were not covered by tests

type EventOutboundGetCloserNodes struct {
QueryID coordt.QueryID
Expand All @@ -64,9 +64,9 @@
Notify Notify[BehaviourEvent]
}

func (*EventOutboundGetCloserNodes) behaviourEvent() {}
func (*EventOutboundGetCloserNodes) nodeHandlerRequest() {}
func (*EventOutboundGetCloserNodes) networkCommand() {}

Check warning on line 69 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L67-L69

Added lines #L67 - L69 were not covered by tests

type EventOutboundSendMessage struct {
QueryID coordt.QueryID
Expand All @@ -75,9 +75,9 @@
Notify Notify[BehaviourEvent]
}

func (*EventOutboundSendMessage) behaviourEvent() {}
func (*EventOutboundSendMessage) nodeHandlerRequest() {}
func (*EventOutboundSendMessage) networkCommand() {}

Check warning on line 80 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L78-L80

Added lines #L78 - L80 were not covered by tests

type EventStartMessageQuery struct {
QueryID coordt.QueryID
Expand All @@ -88,8 +88,8 @@
NumResults int // the minimum number of nodes to successfully contact before considering iteration complete
}

func (*EventStartMessageQuery) behaviourEvent() {}
func (*EventStartMessageQuery) queryCommand() {}

Check warning on line 92 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L91-L92

Added lines #L91 - L92 were not covered by tests

type EventStartFindCloserQuery struct {
QueryID coordt.QueryID
Expand All @@ -99,23 +99,23 @@
NumResults int // the minimum number of nodes to successfully contact before considering iteration complete
}

func (*EventStartFindCloserQuery) behaviourEvent() {}
func (*EventStartFindCloserQuery) queryCommand() {}

Check warning on line 103 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L102-L103

Added lines #L102 - L103 were not covered by tests

type EventStopQuery struct {
QueryID coordt.QueryID
}

func (*EventStopQuery) behaviourEvent() {}
func (*EventStopQuery) queryCommand() {}

Check warning on line 110 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L109-L110

Added lines #L109 - L110 were not covered by tests

// EventAddNode notifies the routing behaviour of a potential new peer.
type EventAddNode struct {
NodeID kadt.PeerID
}

func (*EventAddNode) behaviourEvent() {}
func (*EventAddNode) routingCommand() {}

Check warning on line 118 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L117-L118

Added lines #L117 - L118 were not covered by tests

// EventGetCloserNodesSuccess notifies a behaviour that a GetCloserNodes request, initiated by an
// [EventOutboundGetCloserNodes] event has produced a successful response.
Expand All @@ -126,8 +126,8 @@
CloserNodes []kadt.PeerID
}

func (*EventGetCloserNodesSuccess) behaviourEvent() {}
func (*EventGetCloserNodesSuccess) nodeHandlerResponse() {}

Check warning on line 130 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L129-L130

Added lines #L129 - L130 were not covered by tests

// EventGetCloserNodesFailure notifies a behaviour that a GetCloserNodes request, initiated by an
// [EventOutboundGetCloserNodes] event has failed to produce a valid response.
Expand All @@ -138,8 +138,8 @@
Err error
}

func (*EventGetCloserNodesFailure) behaviourEvent() {}
func (*EventGetCloserNodesFailure) nodeHandlerResponse() {}

Check warning on line 142 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L141-L142

Added lines #L141 - L142 were not covered by tests

// EventSendMessageSuccess notifies a behaviour that a SendMessage request, initiated by an
// [EventOutboundSendMessage] event has produced a successful response.
Expand All @@ -151,8 +151,8 @@
CloserNodes []kadt.PeerID
}

func (*EventSendMessageSuccess) behaviourEvent() {}
func (*EventSendMessageSuccess) nodeHandlerResponse() {}

Check warning on line 155 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L154-L155

Added lines #L154 - L155 were not covered by tests

// EventSendMessageFailure notifies a behaviour that a SendMessage request, initiated by an
// [EventOutboundSendMessage] event has failed to produce a valid response.
Expand All @@ -164,8 +164,8 @@
Err error
}

func (*EventSendMessageFailure) behaviourEvent() {}
func (*EventSendMessageFailure) nodeHandlerResponse() {}

Check warning on line 168 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L167-L168

Added lines #L167 - L168 were not covered by tests

// EventQueryProgressed is emitted by the coordinator when a query has received a
// response from a node.
Expand All @@ -176,7 +176,7 @@
Stats query.QueryStats
}

func (*EventQueryProgressed) behaviourEvent() {}

Check warning on line 179 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L179

Added line #L179 was not covered by tests

// EventQueryFinished is emitted by the coordinator when a query has finished, either through
// running to completion or by being canceled.
Expand All @@ -186,23 +186,23 @@
ClosestNodes []kadt.PeerID
}

func (*EventQueryFinished) behaviourEvent() {}

Check warning on line 189 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L189

Added line #L189 was not covered by tests

// EventRoutingUpdated is emitted by the coordinator when a new node has been verified and added to the routing table.
type EventRoutingUpdated struct {
NodeID kadt.PeerID
}

func (*EventRoutingUpdated) behaviourEvent() {}
func (*EventRoutingUpdated) routingNotification() {}

Check warning on line 197 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L196-L197

Added lines #L196 - L197 were not covered by tests

// EventRoutingRemoved is emitted by the coordinator when new node has been removed from the routing table.
type EventRoutingRemoved struct {
NodeID kadt.PeerID
}

func (*EventRoutingRemoved) behaviourEvent() {}
func (*EventRoutingRemoved) routingNotification() {}

Check warning on line 205 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L204-L205

Added lines #L204 - L205 were not covered by tests

// EventBootstrapFinished is emitted by the coordinator when a bootstrap has finished, either through
// running to completion or by being canceled.
Expand All @@ -210,8 +210,8 @@
Stats query.QueryStats
}

func (*EventBootstrapFinished) behaviourEvent() {}
func (*EventBootstrapFinished) routingNotification() {}

Check warning on line 214 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L213-L214

Added lines #L213 - L214 were not covered by tests

// EventNotifyConnectivity notifies a behaviour that a peer's connectivity and support for finding closer nodes
// has been confirmed such as from a successful query response or an inbound query. This should not be used for
Expand All @@ -221,8 +221,8 @@
NodeID kadt.PeerID
}

func (*EventNotifyConnectivity) behaviourEvent() {}
func (*EventNotifyConnectivity) routingNotification() {}
func (*EventNotifyConnectivity) behaviourEvent() {}
func (*EventNotifyConnectivity) routingCommand() {}

Check warning on line 225 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L224-L225

Added lines #L224 - L225 were not covered by tests

// EventNotifyNonConnectivity notifies a behaviour that a peer does not have connectivity and/or does not support
// finding closer nodes is known.
Expand All @@ -230,11 +230,11 @@
NodeID kadt.PeerID
}

func (*EventNotifyNonConnectivity) behaviourEvent() {}
func (*EventNotifyNonConnectivity) routingCommand() {}

Check warning on line 234 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L233-L234

Added lines #L233 - L234 were not covered by tests

// EventRoutingPoll notifies a routing behaviour that it may proceed with any pending work.
type EventRoutingPoll struct{}

func (*EventRoutingPoll) behaviourEvent() {}
func (*EventRoutingPoll) routingCommand() {}

Check warning on line 240 in v2/internal/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/event.go#L239-L240

Added lines #L239 - L240 were not covered by tests
2 changes: 2 additions & 0 deletions v2/internal/coord/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,10 @@

func NewPooledQueryBehaviour(self kadt.PeerID, cfg *PooledQueryConfig) (*PooledQueryBehaviour, error) {
if cfg == nil {
cfg = DefaultPooledQueryConfig()

Check warning on line 122 in v2/internal/coord/query.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/query.go#L122

Added line #L122 was not covered by tests
} else if err := cfg.Validate(); err != nil {
return nil, err
}

Check warning on line 125 in v2/internal/coord/query.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/query.go#L124-L125

Added lines #L124 - L125 were not covered by tests

qpCfg := query.DefaultPoolConfig()
qpCfg.Clock = cfg.Clock
Expand All @@ -133,8 +133,8 @@

pool, err := query.NewPool[kadt.Key, kadt.PeerID, *pb.Message](self, qpCfg)
if err != nil {
return nil, fmt.Errorf("query pool: %w", err)
}

Check warning on line 137 in v2/internal/coord/query.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/query.go#L136-L137

Added lines #L136 - L137 were not covered by tests

h := &PooledQueryBehaviour{
cfg: *cfg,
Expand Down Expand Up @@ -200,6 +200,7 @@
}
case *EventGetCloserNodesFailure:
// queue an event that will notify the routing behaviour of a failed node
p.cfg.Logger.Debug("peer has no connectivity", tele.LogAttrPeerID(ev.To), "source", "query")
p.pending = append(p.pending, &EventNotifyNonConnectivity{
ev.To,
})
Expand Down Expand Up @@ -231,6 +232,7 @@
}
case *EventSendMessageFailure:
// queue an event that will notify the routing behaviour of a failed node
p.cfg.Logger.Debug("peer has no connectivity", tele.LogAttrPeerID(ev.To), "source", "query")
p.pending = append(p.pending, &EventNotifyNonConnectivity{
ev.To,
})
Expand All @@ -240,8 +242,8 @@
QueryID: ev.QueryID,
Error: ev.Err,
}
default:
panic(fmt.Sprintf("unexpected dht event: %T", ev))

Check warning on line 246 in v2/internal/coord/query.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/query.go#L245-L246

Added lines #L245 - L246 were not covered by tests
}

// attempt to advance the query pool
Expand Down Expand Up @@ -285,14 +287,14 @@
}

// attempt to advance the query pool
ev, ok := p.advancePool(ctx, &query.EventPoolPoll{})
if ok {
return ev, true
}

Check warning on line 293 in v2/internal/coord/query.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/query.go#L290-L293

Added lines #L290 - L293 were not covered by tests

if len(p.pending) == 0 {
return nil, false
}

Check warning on line 297 in v2/internal/coord/query.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/query.go#L295-L297

Added lines #L295 - L297 were not covered by tests
}
}

Expand All @@ -319,9 +321,9 @@
Message: st.Message,
Notify: p,
}, true
case *query.StatePoolWaitingAtCapacity:

Check warning on line 324 in v2/internal/coord/query.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/query.go#L324

Added line #L324 was not covered by tests
// nothing to do except wait for message response or timeout
case *query.StatePoolWaitingWithCapacity:

Check warning on line 326 in v2/internal/coord/query.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/query.go#L326

Added line #L326 was not covered by tests
// nothing to do except wait for message response or timeout
case *query.StatePoolQueryFinished[kadt.Key, kadt.PeerID]:
waiter, ok := p.waiters[st.QueryID]
Expand All @@ -333,12 +335,12 @@
})
waiter.Close()
}
case *query.StatePoolQueryTimeout:

Check warning on line 338 in v2/internal/coord/query.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/query.go#L338

Added line #L338 was not covered by tests
// TODO
case *query.StatePoolIdle:
// nothing to do
default:
panic(fmt.Sprintf("unexpected pool state: %T", st))

Check warning on line 343 in v2/internal/coord/query.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/query.go#L342-L343

Added lines #L342 - L343 were not covered by tests
}

return nil, false
Expand Down
10 changes: 9 additions & 1 deletion v2/internal/coord/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,10 @@

func NewRoutingBehaviour(self kadt.PeerID, rt routing.RoutingTableCpl[kadt.Key, kadt.PeerID], cfg *RoutingConfig) (*RoutingBehaviour, error) {
if cfg == nil {
cfg = DefaultRoutingConfig()

Check warning on line 312 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L312

Added line #L312 was not covered by tests
} else if err := cfg.Validate(); err != nil {
return nil, err
}

Check warning on line 315 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L314-L315

Added lines #L314 - L315 were not covered by tests

bootstrapCfg := routing.DefaultBootstrapConfig()
bootstrapCfg.Clock = cfg.Clock
Expand All @@ -324,8 +324,8 @@

bootstrap, err := routing.NewBootstrap[kadt.Key](self, bootstrapCfg)
if err != nil {
return nil, fmt.Errorf("bootstrap: %w", err)
}

Check warning on line 328 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L327-L328

Added lines #L327 - L328 were not covered by tests

includeCfg := routing.DefaultIncludeConfig()
includeCfg.Clock = cfg.Clock
Expand All @@ -337,8 +337,8 @@

include, err := routing.NewInclude[kadt.Key, kadt.PeerID](rt, includeCfg)
if err != nil {
return nil, fmt.Errorf("include: %w", err)
}

Check warning on line 341 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L340-L341

Added lines #L340 - L341 were not covered by tests

probeCfg := routing.DefaultProbeConfig()
probeCfg.Clock = cfg.Clock
Expand All @@ -350,8 +350,8 @@

probe, err := routing.NewProbe[kadt.Key](rt, probeCfg)
if err != nil {
return nil, fmt.Errorf("probe: %w", err)
}

Check warning on line 354 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L353-L354

Added lines #L353 - L354 were not covered by tests

exploreCfg := routing.DefaultExploreConfig()
exploreCfg.Clock = cfg.Clock
Expand All @@ -363,13 +363,13 @@

schedule, err := routing.NewDynamicExploreSchedule(cfg.ExploreMaximumCpl, cfg.Clock.Now(), cfg.ExploreInterval, cfg.ExploreIntervalMultiplier, cfg.ExploreIntervalJitter)
if err != nil {
return nil, fmt.Errorf("explore schedule: %w", err)
}

Check warning on line 367 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L366-L367

Added lines #L366 - L367 were not covered by tests

explore, err := routing.NewExplore[kadt.Key](self, rt, cplutil.GenRandPeerID, schedule, exploreCfg)
if err != nil {
return nil, fmt.Errorf("explore: %w", err)
}

Check warning on line 372 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L371-L372

Added lines #L371 - L372 were not covered by tests

return ComposeRoutingBehaviour(self, bootstrap, include, probe, explore, cfg)
}
Expand All @@ -385,10 +385,10 @@
cfg *RoutingConfig,
) (*RoutingBehaviour, error) {
if cfg == nil {
cfg = DefaultRoutingConfig()

Check warning on line 388 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L388

Added line #L388 was not covered by tests
} else if err := cfg.Validate(); err != nil {
return nil, err
}

Check warning on line 391 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L390-L391

Added lines #L390 - L391 were not covered by tests

r := &RoutingBehaviour{
self: self,
Expand Down Expand Up @@ -452,8 +452,8 @@
// attempt to advance the probe state machine
next, ok := r.advanceProbe(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

Check warning on line 456 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L455-L456

Added lines #L455 - L456 were not covered by tests

case *EventGetCloserNodesSuccess:
span.SetAttributes(attribute.String("event", "EventGetCloserNodesSuccess"), attribute.String("queryid", string(ev.QueryID)), attribute.String("nodeid", ev.To.String()))
Expand Down Expand Up @@ -483,35 +483,35 @@
NodeID: ev.To,
}
} else {
cmd = &routing.EventIncludeConnectivityCheckFailure[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Error: fmt.Errorf("response did not include any closer nodes"),
}
}

Check warning on line 490 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L486-L490

Added lines #L486 - L490 were not covered by tests
// attempt to advance the include
next, ok := r.advanceInclude(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

case ProbeQueryID:
var cmd routing.ProbeEvent
// require that the node responded with at least one closer node
if len(ev.CloserNodes) > 0 {
cmd = &routing.EventProbeConnectivityCheckSuccess[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
}
} else {
cmd = &routing.EventProbeConnectivityCheckFailure[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Error: fmt.Errorf("response did not include any closer nodes"),
}
}

Check warning on line 509 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L497-L509

Added lines #L497 - L509 were not covered by tests
// attempt to advance the probe state machine
next, ok := r.advanceProbe(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

Check warning on line 514 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L511-L514

Added lines #L511 - L514 were not covered by tests

case routing.ExploreQueryID:
for _, info := range ev.CloserNodes {
Expand All @@ -525,11 +525,11 @@
}
next, ok := r.advanceExplore(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

Check warning on line 529 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L528-L529

Added lines #L528 - L529 were not covered by tests

default:
panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID))

Check warning on line 532 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L531-L532

Added lines #L531 - L532 were not covered by tests
}
case *EventGetCloserNodesFailure:
span.SetAttributes(attribute.String("event", "EventGetCloserNodesFailure"), attribute.String("queryid", string(ev.QueryID)), attribute.String("nodeid", ev.To.String()))
Expand All @@ -543,8 +543,8 @@
// attempt to advance the bootstrap
next, ok := r.advanceBootstrap(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

Check warning on line 547 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L546-L547

Added lines #L546 - L547 were not covered by tests
case IncludeQueryID:
cmd := &routing.EventIncludeConnectivityCheckFailure[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Expand All @@ -553,18 +553,18 @@
// attempt to advance the include state machine
next, ok := r.advanceInclude(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}
case ProbeQueryID:
cmd := &routing.EventProbeConnectivityCheckFailure[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Error: ev.Err,
}
// attempt to advance the probe state machine
next, ok := r.advanceProbe(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

Check warning on line 567 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L556-L567

Added lines #L556 - L567 were not covered by tests
case routing.ExploreQueryID:
cmd := &routing.EventExploreFindCloserFailure[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Expand All @@ -573,18 +573,20 @@
// attempt to advance the explore
next, ok := r.advanceExplore(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

Check warning on line 577 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L576-L577

Added lines #L576 - L577 were not covered by tests

default:
panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID))

Check warning on line 580 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L579-L580

Added lines #L579 - L580 were not covered by tests
}
case *EventNotifyConnectivity:
span.SetAttributes(attribute.String("event", "EventNotifyConnectivity"), attribute.String("nodeid", ev.NodeID.String()))
// ignore self
if r.self.Equal(ev.NodeID) {
break

Check warning on line 586 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L586

Added line #L586 was not covered by tests
}
r.cfg.Logger.Debug("peer has connectivity", tele.LogAttrPeerID(ev.NodeID))

// tell the include state machine in case this is a new peer that could be added to the routing table
cmd := &routing.EventIncludeAddCandidate[kadt.Key, kadt.PeerID]{
NodeID: ev.NodeID,
Expand All @@ -600,8 +602,8 @@
}
nextProbe, ok := r.advanceProbe(ctx, cmdProbe)
if ok {
r.pending = append(r.pending, nextProbe)
}

Check warning on line 606 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L605-L606

Added lines #L605 - L606 were not covered by tests
case *EventNotifyNonConnectivity:
span.SetAttributes(attribute.String("event", "EventNotifyConnectivity"), attribute.String("nodeid", ev.NodeID.String()))

Expand All @@ -611,13 +613,13 @@
}
nextProbe, ok := r.advanceProbe(ctx, cmdProbe)
if ok {
r.pending = append(r.pending, nextProbe)
}

Check warning on line 617 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L616-L617

Added lines #L616 - L617 were not covered by tests
case *EventRoutingPoll:
r.pollChildren(ctx)

default:
panic(fmt.Sprintf("unexpected dht event: %T", ev))

Check warning on line 622 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L621-L622

Added lines #L621 - L622 were not covered by tests
}

if len(r.pending) > 0 {
Expand Down Expand Up @@ -660,8 +662,8 @@

// finally check if any pending events were accumulated in the meantime
if len(r.pending) == 0 {
return nil, false
}

Check warning on line 666 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L665-L666

Added lines #L665 - L666 were not covered by tests
}
}

Expand All @@ -669,13 +671,13 @@
func (r *RoutingBehaviour) pollChildren(ctx context.Context) {
ev, ok := r.advanceBootstrap(ctx, &routing.EventBootstrapPoll{})
if ok {
r.pending = append(r.pending, ev)
}

Check warning on line 675 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L674-L675

Added lines #L674 - L675 were not covered by tests

ev, ok = r.advanceInclude(ctx, &routing.EventIncludePoll{})
if ok {
r.pending = append(r.pending, ev)
}

Check warning on line 680 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L679-L680

Added lines #L679 - L680 were not covered by tests

ev, ok = r.advanceProbe(ctx, &routing.EventProbePoll{})
if ok {
Expand All @@ -702,16 +704,17 @@
Notify: r,
}, true

case *routing.StateBootstrapWaiting:

Check warning on line 707 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L707

Added line #L707 was not covered by tests
// bootstrap waiting for a message response, nothing to do
case *routing.StateBootstrapFinished:
r.cfg.Logger.Debug("bootstrap finished", slog.Duration("elapsed", st.Stats.End.Sub(st.Stats.Start)), slog.Int("requests", st.Stats.Requests), slog.Int("failures", st.Stats.Failure))
return &EventBootstrapFinished{
Stats: st.Stats,
}, true
case *routing.StateBootstrapIdle:
// bootstrap not running, nothing to do
default:
panic(fmt.Sprintf("unexpected bootstrap state: %T", st))

Check warning on line 717 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L716-L717

Added lines #L716 - L717 were not covered by tests
}

return nil, false
Expand All @@ -726,6 +729,7 @@
case *routing.StateIncludeConnectivityCheck[kadt.Key, kadt.PeerID]:
span.SetAttributes(attribute.String("out_event", "EventOutboundGetCloserNodes"))
// include wants to send a find node message to a node
r.cfg.Logger.Debug("starting connectivity check", tele.LogAttrPeerID(st.NodeID), "source", "include")
return &EventOutboundGetCloserNodes{
QueryID: IncludeQueryID,
To: st.NodeID,
Expand All @@ -743,19 +747,20 @@

// return the event to notify outwards too
span.SetAttributes(attribute.String("out_event", "EventRoutingUpdated"))
r.cfg.Logger.Debug("peer added to routing table", tele.LogAttrPeerID(st.NodeID))
return &EventRoutingUpdated{
NodeID: st.NodeID,
}, true
case *routing.StateIncludeWaitingAtCapacity:

Check warning on line 754 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L754

Added line #L754 was not covered by tests
// nothing to do except wait for message response or timeout
case *routing.StateIncludeWaitingWithCapacity:
// nothing to do except wait for message response or timeout
case *routing.StateIncludeWaitingFull:

Check warning on line 758 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L758

Added line #L758 was not covered by tests
// nothing to do except wait for message response or timeout
case *routing.StateIncludeIdle:
// nothing to do except wait for new nodes to be added to queue
default:
panic(fmt.Sprintf("unexpected include state: %T", st))

Check warning on line 763 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L762-L763

Added lines #L762 - L763 were not covered by tests
}

return nil, false
Expand All @@ -768,6 +773,7 @@
switch st := st.(type) {
case *routing.StateProbeConnectivityCheck[kadt.Key, kadt.PeerID]:
// include wants to send a find node message to a node
r.cfg.Logger.Debug("starting connectivity check", tele.LogAttrPeerID(st.NodeID), "source", "probe")
return &EventOutboundGetCloserNodes{
QueryID: ProbeQueryID,
To: st.NodeID,
Expand All @@ -778,6 +784,7 @@
// a node has failed a connectivity check and been removed from the routing table and the probe list

// emit an EventRoutingRemoved event to notify clients that the node has been removed
r.cfg.Logger.Debug("peer removed from routing table", tele.LogAttrPeerID(st.NodeID))
r.pending = append(r.pending, &EventRoutingRemoved{
NodeID: st.NodeID,
})
Expand All @@ -786,17 +793,17 @@
r.notify(ctx, &EventAddNode{
NodeID: st.NodeID,
})
case *routing.StateProbeWaitingAtCapacity:

Check warning on line 796 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L796

Added line #L796 was not covered by tests
// the probe state machine is waiting for responses for checks and the maximum number of concurrent checks has been reached.
// nothing to do except wait for message response or timeout
case *routing.StateProbeWaitingWithCapacity:

Check warning on line 799 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L799

Added line #L799 was not covered by tests
// the probe state machine is waiting for responses for checks but has capacity to perform more
// nothing to do except wait for message response or timeout
case *routing.StateProbeIdle:
// the probe state machine is not running any checks.
// nothing to do except wait for message response or timeout
default:
panic(fmt.Sprintf("unexpected include state: %T", st))

Check warning on line 806 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L805-L806

Added lines #L805 - L806 were not covered by tests
}

return nil, false
Expand All @@ -809,6 +816,7 @@
switch st := bstate.(type) {

case *routing.StateExploreFindCloser[kadt.Key, kadt.PeerID]:
r.cfg.Logger.Debug("starting explore", slog.Int("cpl", st.Cpl), tele.LogAttrPeerID(st.NodeID))
return &EventOutboundGetCloserNodes{
QueryID: routing.ExploreQueryID,
To: st.NodeID,
Expand All @@ -816,18 +824,18 @@
Notify: r,
}, true

case *routing.StateExploreWaiting:

Check warning on line 827 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L827

Added line #L827 was not covered by tests
// explore waiting for a message response, nothing to do
case *routing.StateExploreQueryFinished:

Check warning on line 829 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L829

Added line #L829 was not covered by tests
// nothing to do except notify via telemetry
case *routing.StateExploreQueryTimeout:

Check warning on line 831 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L831

Added line #L831 was not covered by tests
// nothing to do except notify via telemetry
case *routing.StateExploreFailure:
r.cfg.Logger.Warn("explore failure", "cpl", st.Cpl, "error", st.Error)
r.cfg.Logger.Warn("explore failure", slog.Int("cpl", st.Cpl), tele.LogAttrError(st.Error))

Check warning on line 834 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L833-L834

Added lines #L833 - L834 were not covered by tests
case *routing.StateExploreIdle:
// bootstrap not running, nothing to do
default:
panic(fmt.Sprintf("unexpected explore state: %T", st))

Check warning on line 838 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L837-L838

Added lines #L837 - L838 were not covered by tests
}

return nil, false
Expand Down
1 change: 1 addition & 0 deletions v2/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
var foundPeer peer.ID
fn := func(ctx context.Context, visited kadt.PeerID, msg *pb.Message, stats coordt.QueryStats) error {
if peer.ID(visited) == id {
foundPeer = peer.ID(visited)
return coordt.ErrSkipRemaining

Check warning on line 51 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L50-L51

Added lines #L50 - L51 were not covered by tests
}
return nil
}
Expand All @@ -62,7 +62,7 @@
return peer.AddrInfo{}, fmt.Errorf("peer record not found")
}

return d.host.Peerstore().PeerInfo(foundPeer), nil

Check warning on line 65 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L65

Added line #L65 was not covered by tests
}

func (d *DHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error {
Expand Down Expand Up @@ -140,10 +140,10 @@

ps, ok := stored.(*providerSet)
if !ok {
span.RecordError(err)
d.log.Warn("Stored value is not a provider set", slog.String("cid", c.String()), slog.String("type", fmt.Sprintf("%T", stored)))
return
}

Check warning on line 146 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L143-L146

Added lines #L143 - L146 were not covered by tests

// send all providers onto the out channel until the desired count
// was reached. If no count was specified, continue with network lookup.
Expand Down Expand Up @@ -183,8 +183,8 @@

// actually send the provider information to the user
select {
case <-ctx.Done():
return coordt.ErrSkipRemaining

Check warning on line 187 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L186-L187

Added lines #L186 - L187 were not covered by tests
case out <- provider:
}

Expand All @@ -201,10 +201,10 @@

_, err = d.kad.QueryMessage(ctx, msg, fn, 20) // TODO: parameterize
if err != nil {
span.RecordError(err)
d.log.Warn("Failed querying", slog.String("cid", c.String()), slog.String("err", err.Error()))
return
}

Check warning on line 207 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L204-L207

Added lines #L204 - L207 were not covered by tests
}

// PutValue satisfies the [routing.Routing] interface and will add the given
Expand All @@ -219,8 +219,8 @@
// first parse the routing options
rOpt := routing.Options{} // routing config
if err := rOpt.Apply(opts...); err != nil {
return fmt.Errorf("apply routing options: %w", err)
}

Check warning on line 223 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L222-L223

Added lines #L222 - L223 were not covered by tests

// then always store the given value locally
if err := d.putValueLocal(ctx, keyStr, value); err != nil {
Expand All @@ -234,19 +234,19 @@

// construct Kademlia-key. Yes, we hash the complete key string which
// includes the namespace prefix.
msg := &pb.Message{
Type: pb.Message_PUT_VALUE,
Key: []byte(keyStr),
Record: record.MakePutRecord(keyStr, value),
}

// finally, find the closest peers to the target key.
err := d.kad.BroadcastRecord(ctx, msg)
if err != nil {
return fmt.Errorf("query error: %w", err)
}

Check warning on line 247 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L237-L247

Added lines #L237 - L247 were not covered by tests

return nil

Check warning on line 249 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L249

Added line #L249 was not covered by tests
}

// putValueLocal stores a value in the local datastore without querying the network.
Expand Down Expand Up @@ -281,11 +281,11 @@

v, err := d.getValueLocal(ctx, key)
if err == nil {
return v, nil
}

Check warning on line 285 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L284-L285

Added lines #L284 - L285 were not covered by tests
if !errors.Is(err, ds.ErrNotFound) {
return nil, fmt.Errorf("put value locally: %w", err)
}

Check warning on line 288 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L287-L288

Added lines #L287 - L288 were not covered by tests

req := &pb.Message{
Type: pb.Message_GET_VALUE,
Expand All @@ -296,16 +296,16 @@
var value []byte
fn := func(ctx context.Context, id kadt.PeerID, resp *pb.Message, stats coordt.QueryStats) error {
if resp == nil {
return nil
}

Check warning on line 300 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L299-L300

Added lines #L299 - L300 were not covered by tests

if resp.GetType() != pb.Message_GET_VALUE {
return nil
}

Check warning on line 304 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L303-L304

Added lines #L303 - L304 were not covered by tests

if string(resp.GetKey()) != key {
return nil
}

Check warning on line 308 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L307-L308

Added lines #L307 - L308 were not covered by tests

value = resp.GetRecord().GetValue()

Expand All @@ -314,8 +314,8 @@

_, err = d.kad.QueryMessage(ctx, req, fn, d.cfg.BucketSize)
if err != nil {
return nil, fmt.Errorf("failed to run query: %w", err)
}

Check warning on line 318 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L317-L318

Added lines #L317 - L318 were not covered by tests

return value, nil
}
Expand All @@ -342,8 +342,8 @@

rec, ok := val.(*recpb.Record)
if !ok {
return nil, fmt.Errorf("expected *recpb.Record from backend, got: %T", val)
}

Check warning on line 346 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L345-L346

Added lines #L345 - L346 were not covered by tests

return rec.GetValue(), nil
}
Expand All @@ -356,17 +356,18 @@
}

func (d *DHT) Bootstrap(ctx context.Context) error {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.Bootstrap")

Check warning on line 359 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L359

Added line #L359 was not covered by tests
defer span.End()
d.log.Info("Starting bootstrap")

seed := make([]kadt.PeerID, len(d.cfg.BootstrapPeers))
for i, addrInfo := range d.cfg.BootstrapPeers {
seed[i] = kadt.PeerID(addrInfo.ID)
// TODO: how to handle TTL if BootstrapPeers become dynamic and don't
// point to stable peers or consist of ephemeral peers that we have
// observed during a previous run.
d.host.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, peerstore.PermanentAddrTTL)
}

Check warning on line 370 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L361-L370

Added lines #L361 - L370 were not covered by tests

return d.kad.Bootstrap(ctx, seed)

Check warning on line 372 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L372

Added line #L372 was not covered by tests
}
6 changes: 3 additions & 3 deletions v2/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,18 @@

if err := s.Scope().SetService(ServiceName); err != nil {
d.log.LogAttrs(ctx, slog.LevelWarn, "error attaching stream to DHT service", slog.String("err", err.Error()))
d.logErr(s.Reset(), "failed to reset stream")
d.warnErr(s.Reset(), "failed to reset stream")

Check warning on line 45 in v2/stream.go

View check run for this annotation

Codecov / codecov/patch

v2/stream.go#L45

Added line #L45 was not covered by tests
span.RecordError(err)
return
}

if err := d.handleNewStream(ctx, s); err != nil {
// If we exited with an error, let the remote peer know.
d.logErr(s.Reset(), "failed to reset stream")
d.warnErr(s.Reset(), "failed to reset stream")
span.RecordError(err)
} else {
// If we exited without an error, close gracefully.
d.logErr(s.Close(), "failed to close stream")
d.warnErr(s.Close(), "failed to close stream")
}
}

Expand Down
27 changes: 25 additions & 2 deletions v2/tele/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,33 @@ package tele

import (
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
"go.uber.org/zap/exp/zapslog"
"golang.org/x/exp/slog"
)

func DefaultLogger(system string) *slog.Logger {
return slog.New(zapslog.NewHandler(logging.Logger(system).Desugar().Core()))
func DefaultLogger(name string) *slog.Logger {
return slog.New(zapslog.NewHandler(logging.Logger(name).Desugar().Core()))
}

// Attributes that can be used with logging or tracing
const (
AttrKeyError = "error"
AttrKeyPeerID = "peer_id"
AttrKeyKey = "key"
AttrKeyCacheHit = "hit"
AttrKeyInEvent = "in_event"
AttrKeyOutEvent = "out_event"
Comment on lines +16 to +21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If they are only used in the tele package we could unexport them

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They're designed to be used wherever we need them, such as logging an attribute

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all good 👍

)

func LogAttrError(err error) slog.Attr {
return slog.Attr{Key: AttrKeyError, Value: slog.AnyValue(err)}
}

func LogAttrPeerID(id kadt.PeerID) slog.Attr {
return slog.String(AttrKeyPeerID, id.String())
}

func LogAttrKey(kk kadt.Key) slog.Attr {
return slog.String(AttrKeyKey, kk.HexString())
}
8 changes: 4 additions & 4 deletions v2/tele/tele.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ func AttrInstanceID(instanceID string) attribute.KeyValue {
}

func AttrPeerID(pid string) attribute.KeyValue {
return attribute.String("peer_id", pid)
return attribute.String(AttrKeyPeerID, pid)
}

func AttrCacheHit(hit bool) attribute.KeyValue {
return attribute.Bool("hit", hit)
return attribute.Bool(AttrKeyCacheHit, hit)
}

// AttrRecordType is currently only used for the provider backend LRU cache
Expand All @@ -101,12 +101,12 @@ func AttrKey(val string) attribute.KeyValue {

// AttrInEvent creates an attribute that records the type of an event
func AttrInEvent(t any) attribute.KeyValue {
return attribute.String("in_event", fmt.Sprintf("%T", t))
return attribute.String(AttrKeyInEvent, fmt.Sprintf("%T", t))
}

// AttrOutEvent creates an attribute that records the type of an event being returned
func AttrOutEvent(t any) attribute.KeyValue {
return attribute.String("out_event", fmt.Sprintf("%T", t))
return attribute.String(AttrKeyOutEvent, fmt.Sprintf("%T", t))
}

// WithAttributes is a function that attaches the provided attributes to the
Expand Down
Loading