diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 99096f2f757..17b9a57a11c 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -10,8 +10,6 @@ import ( u "github.com/jbenet/go-ipfs/util" ds "github.com/jbenet/datastore.go" - - "time" ) // PartnerWantListMax is the bound for the number of keys we'll store per @@ -75,63 +73,6 @@ func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsR return bs } -// GetBlock attempts to retrieve a particular block from peers, within timeout. -func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( - *blocks.Block, error) { - u.DOut("Bitswap GetBlock: '%s'\n", k.Pretty()) - begin := time.Now() - tleft := timeout - time.Now().Sub(begin) - provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout) - - valchan := make(chan []byte) - after := time.After(tleft) - - // TODO: when the data is received, shut down this for loop ASAP - go func() { - for p := range provs_ch { - go func(pr *peer.Peer) { - blk, err := bs.getBlock(k, pr, tleft) - if err != nil { - u.PErr("getBlock returned: %v\n", err) - return - } - select { - case valchan <- blk: - default: - } - }(p) - } - }() - - select { - case blkdata := <-valchan: - close(valchan) - return blocks.NewBlock(blkdata) - case <-after: - return nil, u.ErrTimeout - } -} - -func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byte, error) { - u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty()) - - pmes := new(PBMessage) - pmes.Wantlist = []string{string(k)} - - after := time.After(timeout) - resp := bs.listener.Listen(string(k), 1, timeout) - smes := swarm.NewMessage(p, pmes) - bs.meschan.Outgoing <- smes - - select { - case resp_mes := <-resp: - return resp_mes.Data, nil - case <-after: - u.PErr("getBlock for '%s' timed out.\n", k.Pretty()) - return nil, u.ErrTimeout - } -} - // HaveBlock announces the existance of a block to BitSwap, potentially sending // it to peers (Partners) whose WantLists include it. func (bs *BitSwap) HaveBlock(blk *blocks.Block) error { diff --git a/bitswap/get_block.go b/bitswap/get_block.go new file mode 100644 index 00000000000..2879168ab14 --- /dev/null +++ b/bitswap/get_block.go @@ -0,0 +1,134 @@ +package bitswap + +import ( + context "code.google.com/p/go.net/context" + + blocks "github.com/jbenet/go-ipfs/blocks" + peer "github.com/jbenet/go-ipfs/peer" + swarm "github.com/jbenet/go-ipfs/swarm" + u "github.com/jbenet/go-ipfs/util" + + "errors" + "time" +) + +const ( + MaxProvidersForGetBlock = 20 +) + +/* GetBlock attempts to retrieve the block given by |k| within the timeout + * period enforced by |ctx|. + * + * Once a result is obtained, sends cancellation signal to remaining async + * workers. + */ +func (bs *BitSwap) GetBlock(ctx context.Context, k u.Key) ( + *blocks.Block, error) { + u.DOut("Bitswap GetBlock: '%s'\n", k.Pretty()) + + var block *blocks.Block + var err error + err = bs.emitBlockData(ctx, k, func(blockData []byte, err error) error { + if err != nil { + // TODO(brian): optionally log err + } + block, err = blocks.NewBlock(blockData) + if err != nil { + return err + } + return nil + }) + + if err != nil { + return nil, err + } + return block, nil +} + +/* Asynchronously retrieves blockData providers. For each provider, retrieves + * block data. Results collected in blockDataChan and errChan are emitted to + * |f|. + * + * If |f| returns nil, the fan-out is aborted and this function returns. If |f| + * returns an error, this function emits blocks until the channels are closed + * or it encounters the deadline enforced by |ctx|. + * + * Return values: + * - If |ctx| signals Done, returns ctx.Err() + * - Otherwise, returns the return value of the last invocation of |f|. + */ +// TODO(brian): refactor this function so it depends on a function that +// returns a channel of objects |o| which expose functions g such that o.g() +// returns ([]byte, error) +func (bs *BitSwap) emitBlockData(ctx context.Context, k u.Key, f func([]byte, error) error) error { + + _, cancelFunc := context.WithCancel(ctx) + + blockDataChan := make(chan []byte) + errChan := make(chan error) + + go func() { + for p := range bs.routing.FindProvidersAsync(ctx, k, MaxProvidersForGetBlock) { + go func(provider *peer.Peer) { + block, err := bs.getBlock(ctx, k, provider) + if err != nil { + errChan <- err + } else { + blockDataChan <- block + } + }(p) + } + }() + + var err error + for { + select { + case blkdata := <-blockDataChan: + err = f(blkdata, nil) + if err == nil { + cancelFunc() + return nil + } + case err := <-errChan: + err = f(nil, err) + if err == nil { + return nil + } + case <-ctx.Done(): + return ctx.Err() + } + } + return err + // TODO(brian): need to return the last return value of |f| +} + +/* Retrieves data for key |k| from peer |p| within timeout enforced by |ctx|. + */ +func (bs *BitSwap) getBlock(ctx context.Context, k u.Key, p *peer.Peer) ([]byte, error) { + u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty()) + + deadline, ok := ctx.Deadline() + if !ok { + return nil, errors.New("Expected caller to provide a deadline") + } + timeout := deadline.Sub(time.Now()) + + pmes := new(PBMessage) + pmes.Wantlist = []string{string(k)} + + resp := bs.listener.Listen(string(k), 1, timeout) + smes := swarm.NewMessage(p, pmes) + bs.meschan.Outgoing <- smes + + select { + case resp_mes := <-resp: + return resp_mes.Data, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +type blockDataProvider interface { + ProvidersAsync(ctx context.Context, k u.Key, max int) chan *peer.Peer + BlockData(ctx context.Context, k u.Key, p *peer.Peer) ([]byte, error) +} diff --git a/bitswap/get_block_test.go b/bitswap/get_block_test.go new file mode 100644 index 00000000000..67a03afb70d --- /dev/null +++ b/bitswap/get_block_test.go @@ -0,0 +1 @@ +package bitswap diff --git a/bitswap/interface.go b/bitswap/interface.go new file mode 100644 index 00000000000..54fc41a37e8 --- /dev/null +++ b/bitswap/interface.go @@ -0,0 +1,5 @@ +package bitswap + +// TODO(brian): use a Bitswap interface. Let the struct be a private +// implementation. Let the factory method return the struct but +// expose the interface as its return value diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index ceb806f9b3c..f9758d1dd73 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -4,6 +4,8 @@ import ( "fmt" "time" + context "code.google.com/p/go.net/context" + ds "github.com/jbenet/datastore.go" bitswap "github.com/jbenet/go-ipfs/bitswap" blocks "github.com/jbenet/go-ipfs/blocks" @@ -64,7 +66,8 @@ func (s *BlockService) GetBlock(k u.Key) (*blocks.Block, error) { }, nil } else if err == ds.ErrNotFound && s.Remote != nil { u.DOut("Blockservice: Searching bitswap.\n") - blk, err := s.Remote.GetBlock(k, time.Second*5) + ctx, _ := context.WithTimeout(context.Background(), time.Second*5) + blk, err := s.Remote.GetBlock(ctx, k) if err != nil { return nil, err } diff --git a/routing/dht/dht.go b/routing/dht/dht.go index b1a6e59c9eb..236774dd852 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -15,6 +15,8 @@ import ( ds "github.com/jbenet/datastore.go" + context "code.google.com/p/go.net/context" + "code.google.com/p/goprotobuf/proto" ) @@ -575,7 +577,7 @@ func (dht *IpfsDHT) printTables() { } } -func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, timeout time.Duration) (*PBDHTMessage, error) { +func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p *peer.Peer, key u.Key, level int) (*PBDHTMessage, error) { pmes := Message{ Type: PBDHTMessage_GET_PROVIDERS, Key: string(key), @@ -587,11 +589,10 @@ func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, time listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute) dht.netChan.Outgoing <- mes - after := time.After(timeout) select { - case <-after: + case <-ctx.Done(): dht.listener.Unlisten(pmes.ID) - return nil, u.ErrTimeout + return nil, ctx.Err() case resp := <-listenChan: u.DOut("FindProviders: got response.\n") pmesOut := new(PBDHTMessage) diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 817ecec37d4..fca6c0a86f6 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -3,6 +3,8 @@ package dht import ( "testing" + context "code.google.com/p/go.net/context" + ds "github.com/jbenet/datastore.go" peer "github.com/jbenet/go-ipfs/peer" swarm "github.com/jbenet/go-ipfs/swarm" @@ -193,7 +195,7 @@ func TestProvides(t *testing.T) { time.Sleep(time.Millisecond * 60) - provs, err := dhts[0].FindProviders(u.Key("hello"), time.Second) + provs, err := dhts[0].FindProviders(context.TODO(), u.Key("hello"), time.Second) if err != nil { t.Fatal(err) } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 082f737f5ed..61a243ab225 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -6,6 +6,8 @@ import ( "errors" "time" + context "code.google.com/p/go.net/context" + proto "code.google.com/p/goprotobuf/proto" ma "github.com/jbenet/go-multiaddr" @@ -184,7 +186,8 @@ func (dht *IpfsDHT) Provide(key u.Key) error { return nil } -func (dht *IpfsDHT) FindProvidersAsync(key u.Key, count int, timeout time.Duration) chan *peer.Peer { +// TODO(brian): signal errors to caller +func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) chan *peer.Peer { peerOut := make(chan *peer.Peer, count) go func() { ps := newPeerSet() @@ -202,7 +205,8 @@ func (dht *IpfsDHT) FindProvidersAsync(key u.Key, count int, timeout time.Durati peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue) for _, pp := range peers { go func() { - pmes, err := dht.findProvidersSingle(pp, key, 0, timeout) + pmes, err := dht.findProvidersSingle(ctx, pp, key, 0) + // TODO(brian): propagate error back up to caller if err != nil { u.PErr("%v\n", err) return @@ -241,7 +245,7 @@ func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*PBDHTMessage_PBPeer, ps * } // FindProviders searches for peers who can provide the value for given key. -func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) { +func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key, timeout time.Duration) ([]*peer.Peer, error) { ll := startNewRPC("FindProviders") defer func() { ll.EndLog() @@ -254,7 +258,7 @@ func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Pee } for level := 0; level < len(dht.routingTables); { - pmes, err := dht.findProvidersSingle(p, key, level, timeout) + pmes, err := dht.findProvidersSingle(ctx, p, key, level) if err != nil { return nil, err } diff --git a/routing/routing.go b/routing/routing.go index fdf3507491b..5706a1a9b3f 100644 --- a/routing/routing.go +++ b/routing/routing.go @@ -3,6 +3,8 @@ package routing import ( "time" + context "code.google.com/p/go.net/context" + peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" ) @@ -26,7 +28,7 @@ type IpfsRouting interface { Provide(key u.Key) error // FindProviders searches for peers who can provide the value for given key. - FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) + FindProviders(ctx context.Context, key u.Key, timeout time.Duration) ([]*peer.Peer, error) // Find specific Peer