Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace timeouts. Instead, use Contexts with Deadlines/Timeouts #33

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 0 additions & 59 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
u "github.com/jbenet/go-ipfs/util"

ds "github.com/jbenet/datastore.go"

"time"
)

// PartnerWantListMax is the bound for the number of keys we'll store per
Expand Down Expand Up @@ -75,63 +73,6 @@ func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsR
return bs
}

// GetBlock attempts to retrieve a particular block from peers, within timeout.
func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
*blocks.Block, error) {
u.DOut("Bitswap GetBlock: '%s'\n", k.Pretty())
begin := time.Now()
tleft := timeout - time.Now().Sub(begin)
provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout)

valchan := make(chan []byte)
after := time.After(tleft)

// TODO: when the data is received, shut down this for loop ASAP
go func() {
for p := range provs_ch {
go func(pr *peer.Peer) {
blk, err := bs.getBlock(k, pr, tleft)
if err != nil {
u.PErr("getBlock returned: %v\n", err)
return
}
select {
case valchan <- blk:
default:
}
}(p)
}
}()

select {
case blkdata := <-valchan:
close(valchan)
return blocks.NewBlock(blkdata)
case <-after:
return nil, u.ErrTimeout
}
}

func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byte, error) {
u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty())

pmes := new(PBMessage)
pmes.Wantlist = []string{string(k)}

after := time.After(timeout)
resp := bs.listener.Listen(string(k), 1, timeout)
smes := swarm.NewMessage(p, pmes)
bs.meschan.Outgoing <- smes

select {
case resp_mes := <-resp:
return resp_mes.Data, nil
case <-after:
u.PErr("getBlock for '%s' timed out.\n", k.Pretty())
return nil, u.ErrTimeout
}
}

// HaveBlock announces the existance of a block to BitSwap, potentially sending
// it to peers (Partners) whose WantLists include it.
func (bs *BitSwap) HaveBlock(blk *blocks.Block) error {
Expand Down
134 changes: 134 additions & 0 deletions bitswap/get_block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package bitswap

import (
context "code.google.com/p/go.net/context"

blocks "github.com/jbenet/go-ipfs/blocks"
peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"

"errors"
"time"
)

const (
MaxProvidersForGetBlock = 20
)

/* GetBlock attempts to retrieve the block given by |k| within the timeout
* period enforced by |ctx|.
*
* Once a result is obtained, sends cancellation signal to remaining async
* workers.
*/
func (bs *BitSwap) GetBlock(ctx context.Context, k u.Key) (
*blocks.Block, error) {
u.DOut("Bitswap GetBlock: '%s'\n", k.Pretty())

var block *blocks.Block
var err error
err = bs.emitBlockData(ctx, k, func(blockData []byte, err error) error {
if err != nil {
// TODO(brian): optionally log err
}
block, err = blocks.NewBlock(blockData)
if err != nil {
return err
}
return nil
})

if err != nil {
return nil, err
}
return block, nil
}

/* Asynchronously retrieves blockData providers. For each provider, retrieves
* block data. Results collected in blockDataChan and errChan are emitted to
* |f|.
*
* If |f| returns nil, the fan-out is aborted and this function returns. If |f|
* returns an error, this function emits blocks until the channels are closed
* or it encounters the deadline enforced by |ctx|.
*
* Return values:
* - If |ctx| signals Done, returns ctx.Err()
* - Otherwise, returns the return value of the last invocation of |f|.
*/
// TODO(brian): refactor this function so it depends on a function that
// returns a channel of objects |o| which expose functions g such that o.g()
// returns ([]byte, error)
func (bs *BitSwap) emitBlockData(ctx context.Context, k u.Key, f func([]byte, error) error) error {

_, cancelFunc := context.WithCancel(ctx)

blockDataChan := make(chan []byte)
errChan := make(chan error)

go func() {
for p := range bs.routing.FindProvidersAsync(ctx, k, MaxProvidersForGetBlock) {
go func(provider *peer.Peer) {
block, err := bs.getBlock(ctx, k, provider)
if err != nil {
errChan <- err
} else {
blockDataChan <- block
}
}(p)
}
}()

var err error
for {
select {
case blkdata := <-blockDataChan:
err = f(blkdata, nil)
if err == nil {
cancelFunc()
return nil
}
case err := <-errChan:
err = f(nil, err)
if err == nil {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
return err
// TODO(brian): need to return the last return value of |f|
}

/* Retrieves data for key |k| from peer |p| within timeout enforced by |ctx|.
*/
func (bs *BitSwap) getBlock(ctx context.Context, k u.Key, p *peer.Peer) ([]byte, error) {
u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty())

deadline, ok := ctx.Deadline()
if !ok {
return nil, errors.New("Expected caller to provide a deadline")
}
timeout := deadline.Sub(time.Now())

pmes := new(PBMessage)
pmes.Wantlist = []string{string(k)}

resp := bs.listener.Listen(string(k), 1, timeout)
smes := swarm.NewMessage(p, pmes)
bs.meschan.Outgoing <- smes

select {
case resp_mes := <-resp:
return resp_mes.Data, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

type blockDataProvider interface {
ProvidersAsync(ctx context.Context, k u.Key, max int) chan *peer.Peer
BlockData(ctx context.Context, k u.Key, p *peer.Peer) ([]byte, error)
}
1 change: 1 addition & 0 deletions bitswap/get_block_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package bitswap
5 changes: 5 additions & 0 deletions bitswap/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package bitswap

// TODO(brian): use a Bitswap interface. Let the struct be a private
// implementation. Let the factory method return the struct but
// expose the interface as its return value
5 changes: 4 additions & 1 deletion blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"time"

context "code.google.com/p/go.net/context"

ds "github.com/jbenet/datastore.go"
bitswap "github.com/jbenet/go-ipfs/bitswap"
blocks "github.com/jbenet/go-ipfs/blocks"
Expand Down Expand Up @@ -64,7 +66,8 @@ func (s *BlockService) GetBlock(k u.Key) (*blocks.Block, error) {
}, nil
} else if err == ds.ErrNotFound && s.Remote != nil {
u.DOut("Blockservice: Searching bitswap.\n")
blk, err := s.Remote.GetBlock(k, time.Second*5)
ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
blk, err := s.Remote.GetBlock(ctx, k)
if err != nil {
return nil, err
}
Expand Down
9 changes: 5 additions & 4 deletions routing/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (

ds "github.com/jbenet/datastore.go"

context "code.google.com/p/go.net/context"

"code.google.com/p/goprotobuf/proto"
)

Expand Down Expand Up @@ -575,7 +577,7 @@ func (dht *IpfsDHT) printTables() {
}
}

func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, timeout time.Duration) (*PBDHTMessage, error) {
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p *peer.Peer, key u.Key, level int) (*PBDHTMessage, error) {
pmes := Message{
Type: PBDHTMessage_GET_PROVIDERS,
Key: string(key),
Expand All @@ -587,11 +589,10 @@ func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, time

listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
dht.netChan.Outgoing <- mes
after := time.After(timeout)
select {
case <-after:
case <-ctx.Done():
dht.listener.Unlisten(pmes.ID)
return nil, u.ErrTimeout
return nil, ctx.Err()
case resp := <-listenChan:
u.DOut("FindProviders: got response.\n")
pmesOut := new(PBDHTMessage)
Expand Down
4 changes: 3 additions & 1 deletion routing/dht/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package dht
import (
"testing"

context "code.google.com/p/go.net/context"

ds "github.com/jbenet/datastore.go"
peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm"
Expand Down Expand Up @@ -193,7 +195,7 @@ func TestProvides(t *testing.T) {

time.Sleep(time.Millisecond * 60)

provs, err := dhts[0].FindProviders(u.Key("hello"), time.Second)
provs, err := dhts[0].FindProviders(context.TODO(), u.Key("hello"), time.Second)
if err != nil {
t.Fatal(err)
}
Expand Down
12 changes: 8 additions & 4 deletions routing/dht/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"errors"
"time"

context "code.google.com/p/go.net/context"

proto "code.google.com/p/goprotobuf/proto"

ma "github.com/jbenet/go-multiaddr"
Expand Down Expand Up @@ -184,7 +186,8 @@ func (dht *IpfsDHT) Provide(key u.Key) error {
return nil
}

func (dht *IpfsDHT) FindProvidersAsync(key u.Key, count int, timeout time.Duration) chan *peer.Peer {
// TODO(brian): signal errors to caller
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) chan *peer.Peer {
peerOut := make(chan *peer.Peer, count)
go func() {
ps := newPeerSet()
Expand All @@ -202,7 +205,8 @@ func (dht *IpfsDHT) FindProvidersAsync(key u.Key, count int, timeout time.Durati
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue)
for _, pp := range peers {
go func() {
pmes, err := dht.findProvidersSingle(pp, key, 0, timeout)
pmes, err := dht.findProvidersSingle(ctx, pp, key, 0)
// TODO(brian): propagate error back up to caller
if err != nil {
u.PErr("%v\n", err)
return
Expand Down Expand Up @@ -241,7 +245,7 @@ func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*PBDHTMessage_PBPeer, ps *
}

// FindProviders searches for peers who can provide the value for given key.
func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
ll := startNewRPC("FindProviders")
defer func() {
ll.EndLog()
Expand All @@ -254,7 +258,7 @@ func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Pee
}

for level := 0; level < len(dht.routingTables); {
pmes, err := dht.findProvidersSingle(p, key, level, timeout)
pmes, err := dht.findProvidersSingle(ctx, p, key, level)
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package routing
import (
"time"

context "code.google.com/p/go.net/context"

peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
Expand All @@ -26,7 +28,7 @@ type IpfsRouting interface {
Provide(key u.Key) error

// FindProviders searches for peers who can provide the value for given key.
FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error)
FindProviders(ctx context.Context, key u.Key, timeout time.Duration) ([]*peer.Peer, error)

// Find specific Peer

Expand Down