Skip to content

Commit

Permalink
fix(routing/dht) _always_ close chan on exit of FindProvidersAsync
Browse files Browse the repository at this point in the history
the important change here is that within FindProvidersAsync, the channel
is closed using a `defer`. This ensures the channel is always closed,
regardless of the path taken to exit.

+ misc cleanup

cc @whyrusleeping @jbenet

License: MIT
Signed-off-by: Brian Tiger Chow <brian@perfmode.com>
  • Loading branch information
Brian Tiger Chow committed Dec 2, 2014
1 parent 9ba2ecb commit 0faf926
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 17 deletions.
9 changes: 3 additions & 6 deletions routing/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,24 +194,21 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Messa

start := time.Now()

log.Event(ctx, "sentMessage", dht.self, p, pmes)

rmes, err := dht.sender.SendRequest(ctx, mes)
rmes, err := dht.sender.SendRequest(ctx, mes) // respect?
if err != nil {
return nil, err
}
if rmes == nil {
return nil, errors.New("no response to request")
}
log.Event(ctx, "sentMessage", dht.self, p, pmes)

rtt := time.Since(start)
rmes.Peer().SetLatency(rtt)
rmes.Peer().SetLatency(time.Since(start))

rpmes := new(pb.Message)
if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
return nil, err
}

return rpmes, nil
}

Expand Down
30 changes: 19 additions & 11 deletions routing/dht/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,21 +128,27 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan peer.Peer {
peerOut := make(chan peer.Peer, count)
go func() {
defer close(peerOut)

ps := newPeerSet()
// TODO may want to make this function async to hide latency
provs := dht.providers.GetProviders(key)
for _, p := range provs {
count--
// NOTE: assuming that this list of peers is unique
ps.Add(p)
peerOut <- p
select {
case peerOut <- p:
case <-ctx.Done():
return
}
if count <= 0 {
return
}
}

wg := new(sync.WaitGroup)
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue)
for _, pp := range peers {
var wg sync.WaitGroup
for _, pp := range dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue) {
wg.Add(1)
go func(p peer.Peer) {
defer wg.Done()
Expand All @@ -155,16 +161,16 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
}(pp)
}
wg.Wait()
close(peerOut)
}()
return peerOut
}

func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) {
done := make(chan struct{})
var wg sync.WaitGroup
for _, pbp := range peers {
wg.Add(1)
go func(mp *pb.Message_Peer) {
defer func() { done <- struct{}{} }()
defer wg.Done()
// construct new peer
p, err := dht.ensureConnectedToPeer(ctx, mp)
if err != nil {
Expand All @@ -178,15 +184,17 @@ func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.M

dht.providers.AddProvider(k, p)
if ps.AddIfSmallerThan(p, count) {
out <- p
select {
case out <- p:
case <-ctx.Done():
return
}
} else if ps.Size() >= count {
return
}
}(pbp)
}
for _ = range peers {
<-done
}
wg.Wait()
}

// Find specific Peer
Expand Down

0 comments on commit 0faf926

Please sign in to comment.