Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
dennis-tra committed Sep 20, 2023
1 parent 8317a98 commit d4da2c8
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 18 deletions.
6 changes: 4 additions & 2 deletions v2/internal/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,11 @@ func (c *Coordinator) eventLoop(ctx context.Context) {

ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.eventLoop")
defer span.End()

for {
var ev BehaviourEvent
var ok bool

select {
case <-ctx.Done():
// coordinator is closing
Expand Down Expand Up @@ -383,7 +385,7 @@ func (c *Coordinator) QueryClosest(ctx context.Context, target kadt.Key, fn Quer

seedIDs := make([]kadt.PeerID, 0, len(seeds))
for _, s := range seeds {
seedIDs = append(seedIDs, kadt.PeerID(s.ID()))
seedIDs = append(seedIDs, s.ID())
}

waiter := NewWaiter[BehaviourEvent]()
Expand Down Expand Up @@ -432,7 +434,7 @@ func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn Quer

seedIDs := make([]kadt.PeerID, 0, len(seeds))
for _, s := range seeds {
seedIDs = append(seedIDs, kadt.PeerID(s.ID()))
seedIDs = append(seedIDs, s.ID())
}

waiter := NewWaiter[BehaviourEvent]()
Expand Down
14 changes: 6 additions & 8 deletions v2/internal/coord/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,19 @@ func (b *NetworkBehaviour) Notify(ctx context.Context, ev BehaviourEvent) {
switch ev := ev.(type) {
case *EventOutboundGetCloserNodes:
b.nodeHandlersMu.Lock()
p := kadt.PeerID(ev.To)
nh, ok := b.nodeHandlers[p]
nh, ok := b.nodeHandlers[ev.To]
if !ok {
nh = NewNodeHandler(p, b.rtr, b.logger, b.tracer)
b.nodeHandlers[p] = nh
nh = NewNodeHandler(ev.To, b.rtr, b.logger, b.tracer)
b.nodeHandlers[ev.To] = nh
}
b.nodeHandlersMu.Unlock()
nh.Notify(ctx, ev)
case *EventOutboundSendMessage:
b.nodeHandlersMu.Lock()
p := kadt.PeerID(ev.To)
nh, ok := b.nodeHandlers[p]
nh, ok := b.nodeHandlers[ev.To]
if !ok {
nh = NewNodeHandler(p, b.rtr, b.logger, b.tracer)
b.nodeHandlers[p] = nh
nh = NewNodeHandler(ev.To, b.rtr, b.logger, b.tracer)
b.nodeHandlers[ev.To] = nh
}
b.nodeHandlersMu.Unlock()
nh.Notify(ctx, ev)
Expand Down
10 changes: 7 additions & 3 deletions v2/internal/coord/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
"github.com/libp2p/go-libp2p-kad-dht/v2/pb"
"github.com/libp2p/go-libp2p-kad-dht/v2/tele"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/slog"

Expand Down Expand Up @@ -189,9 +190,12 @@ func (p *PooledQueryBehaviour) Perform(ctx context.Context) (BehaviourEvent, boo
}
}

func (p *PooledQueryBehaviour) advancePool(ctx context.Context, ev query.PoolEvent) (BehaviourEvent, bool) {
ctx, span := p.tracer.Start(ctx, "PooledQueryBehaviour.advancePool")
defer span.End()
func (p *PooledQueryBehaviour) advancePool(ctx context.Context, ev query.PoolEvent) (out BehaviourEvent, term bool) {
ctx, span := p.tracer.Start(ctx, "PooledQueryBehaviour.advancePool", trace.WithAttributes(tele.AttrInEvent(ev)))
defer func() {
span.SetAttributes(tele.AttrOutEvent(out))
span.End()
}()

pstate := p.pool.Advance(ctx, ev)
switch st := pstate.(type) {
Expand Down
22 changes: 17 additions & 5 deletions v2/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ func (d *DHT) FindProvidersAsync(ctx context.Context, c cid.Cid, count int) <-ch
panic("implement me")
}

// PutValue satisfies the [routing.Routing] interface and will add the given
// value to the k-closest nodes to keyStr. The parameter keyStr should have the
// format `/$namespace/$binary_id`. Namespace examples are `pk` or `ipns`. To
// identify the closest peers to keyStr, that complete string will be SHA256
// hashed.
func (d *DHT) PutValue(ctx context.Context, keyStr string, value []byte, opts ...routing.Option) error {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.PutValue")
defer span.End()
Expand All @@ -133,18 +138,25 @@ func (d *DHT) PutValue(ctx context.Context, keyStr string, value []byte, opts ..
return nil
}

fn := func(ctx context.Context, node coord.Node, stats coord.QueryStats) error {
return nil
}

// construct Kademlia-key. Yes, we hash the complete key string which
// includes the namespace prefix.
h := sha256.Sum256([]byte(keyStr))
kadKey := key.NewKey256(h[:])

_, err := d.kad.Query(ctx, kadKey, fn)
// define the query function that will be called after each request to a
// remote peer.
fn := func(ctx context.Context, id kadt.PeerID, resp *pb.Message, stats coord.QueryStats) error {
return nil
}

Check warning on line 150 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L143-L150

Added lines #L143 - L150 were not covered by tests

// finally, find the closest peers to the target key.
closest, _, err := d.kad.QueryClosest(ctx, kadKey, fn, 20)
if err != nil {
return fmt.Errorf("query error: %w", err)
}

Check warning on line 156 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L153-L156

Added lines #L153 - L156 were not covered by tests

_ = closest

Check warning on line 159 in v2/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/routing.go#L158-L159

Added lines #L158 - L159 were not covered by tests
panic("implement me")
}

Expand Down

0 comments on commit d4da2c8

Please sign in to comment.