diff --git a/blocks/blocksutil/block_generator.go b/blocks/blocksutil/block_generator.go index 324da54a76d..a8520a7a2a6 100644 --- a/blocks/blocksutil/block_generator.go +++ b/blocks/blocksutil/block_generator.go @@ -25,8 +25,8 @@ func (bg *BlockGenerator) Next() *blocks.BasicBlock { } // Blocks generates as many BasicBlocks as specified by n. -func (bg *BlockGenerator) Blocks(n int) []*blocks.BasicBlock { - blocks := make([]*blocks.BasicBlock, 0) +func (bg *BlockGenerator) Blocks(n int) []blocks.Block { + blocks := make([]blocks.Block, 0, n) for i := 0; i < n; i++ { b := bg.Next() blocks = append(blocks, b) diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 98afac84269..1f260363744 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -10,9 +10,10 @@ import ( "github.com/ipfs/go-ipfs/blocks/blockstore" exchange "github.com/ipfs/go-ipfs/exchange" - blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" + bitswap "github.com/ipfs/go-ipfs/exchange/bitswap" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" + blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" ) @@ -77,6 +78,23 @@ func (bs *blockService) Exchange() exchange.Interface { return bs.exchange } +// NewSession creates a bitswap session that allows for controlled exchange of +// wantlists to decrease the bandwidth overhead. +func NewSession(ctx context.Context, bs BlockService) *Session { + exchange := bs.Exchange() + if bswap, ok := exchange.(*bitswap.Bitswap); ok { + ses := bswap.NewSession(ctx) + return &Session{ + ses: ses, + bs: bs.Blockstore(), + } + } + return &Session{ + ses: exchange, + bs: bs.Blockstore(), + } +} + // AddBlock adds a particular block to the service, Putting it into the datastore. // TODO pass a context into this if the remote.HasBlock is going to remain here. func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) { @@ -141,16 +159,25 @@ func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) { log.Debugf("BlockService GetBlock: '%s'", c) - block, err := s.blockstore.Get(c) + var f exchange.Fetcher + if s.exchange != nil { + f = s.exchange + } + + return getBlock(ctx, c, s.blockstore, f) +} + +func getBlock(ctx context.Context, c *cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) (blocks.Block, error) { + block, err := bs.Get(c) if err == nil { return block, nil } - if err == blockstore.ErrNotFound && s.exchange != nil { + if err == blockstore.ErrNotFound && f != nil { // TODO be careful checking ErrNotFound. If the underlying // implementation changes, this will break. log.Debug("Blockservice: Searching bitswap") - blk, err := s.exchange.GetBlock(ctx, c) + blk, err := f.GetBlock(ctx, c) if err != nil { if err == blockstore.ErrNotFound { return nil, ErrNotFound @@ -172,12 +199,16 @@ func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, // the returned channel. // NB: No guarantees are made about order. 
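// Illustrative usage (not part of the patch): a minimal sketch of how a caller
// might drive the NewSession helper added above. The function name and the
// `cids` argument are hypothetical; names are unqualified as if written inside
// this package, and the package-level `log` is reused.
func exampleSessionFetch(ctx context.Context, bsvc BlockService, cids []*cid.Cid) error {
	if len(cids) == 0 {
		return nil
	}
	ses := NewSession(ctx, bsvc)
	// Single fetch: the local blockstore is checked first, then the session's fetcher.
	if _, err := ses.GetBlock(ctx, cids[0]); err != nil {
		return err
	}
	// Batch fetch: blocks arrive on the channel in no particular order, and the
	// channel is closed once every block has arrived or the context ends.
	for blk := range ses.GetBlocks(ctx, cids[1:]) {
		log.Debugf("fetched %s", blk.Cid())
	}
	return nil
}
// When the service's exchange is not a *bitswap.Bitswap, NewSession silently
// falls back to the plain exchange.Fetcher, so the code above works either way.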
func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block { + return getBlocks(ctx, ks, s.blockstore, s.exchange) +} + +func getBlocks(ctx context.Context, ks []*cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) <-chan blocks.Block { out := make(chan blocks.Block) go func() { defer close(out) var misses []*cid.Cid for _, c := range ks { - hit, err := s.blockstore.Get(c) + hit, err := bs.Get(c) if err != nil { misses = append(misses, c) continue @@ -194,7 +225,7 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc return } - rblocks, err := s.exchange.GetBlocks(ctx, misses) + rblocks, err := f.GetBlocks(ctx, misses) if err != nil { log.Debugf("Error with GetBlocks: %s", err) return @@ -220,3 +251,19 @@ func (s *blockService) Close() error { log.Debug("blockservice is shutting down...") return s.exchange.Close() } + +// Session is a helper type to provide higher level access to bitswap sessions +type Session struct { + bs blockstore.Blockstore + ses exchange.Fetcher +} + +// GetBlock gets a block in the context of a request session +func (s *Session) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) { + return getBlock(ctx, c, s.bs, s.ses) +} + +// GetBlocks gets blocks in the context of a request session +func (s *Session) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block { + return getBlocks(ctx, ks, s.bs, s.ses) +} diff --git a/core/commands/bitswap.go b/core/commands/bitswap.go index e7d94e9125d..98dd54f59cf 100644 --- a/core/commands/bitswap.go +++ b/core/commands/bitswap.go @@ -64,7 +64,11 @@ var unwantCmd = &cmds.Command{ ks = append(ks, c) } - bs.CancelWants(ks) + // TODO: This should maybe find *all* sessions for this request and cancel them? + // (why): in reality, i think this command should be removed. Its + // messing with the internal state of bitswap. You should cancel wants + // by killing the command that caused the want. 
+ bs.CancelWants(ks, 0) }, } @@ -107,6 +111,11 @@ Print out all blocks currently on the bitswap wantlist for the local peer.`, res.SetError(err, cmds.ErrNormal) return } + if pid == nd.Identity { + res.SetOutput(&KeyList{bs.GetWantlist()}) + return + } + res.SetOutput(&KeyList{bs.WantlistForPeer(pid)}) } else { res.SetOutput(&KeyList{bs.GetWantlist()}) diff --git a/core/corehttp/gateway_handler.go b/core/corehttp/gateway_handler.go index 6ec2c38937e..a8ebc390a07 100644 --- a/core/corehttp/gateway_handler.go +++ b/core/corehttp/gateway_handler.go @@ -27,7 +27,7 @@ import ( node "gx/ipfs/QmPAKbSsgEX5B6fpmxa61jXYnoWzZr5sNafd3qgPiSH8Uv/go-ipld-format" humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize" cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" - multibase "gx/ipfs/QmcxkxTVuURV2Ptse8TvkqH5BQDwV62X1x19JqqvbBzwUM/go-multibase" + multibase "gx/ipfs/Qme4T6BE4sQxg7ZouamF5M7Tx1ZFTqzcns7BkyQPXpoT99/go-multibase" ) const ( diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index ce7bd6b2606..d9f4fea9a9c 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -7,6 +7,7 @@ import ( "errors" "math" "sync" + "sync/atomic" "time" blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" @@ -17,13 +18,12 @@ import ( notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications" flags "github.com/ipfs/go-ipfs/flags" "github.com/ipfs/go-ipfs/thirdparty/delay" - blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface" process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" - loggables "gx/ipfs/QmVesPmqbPp7xRGyY96tnBwzDtVV1nqv4SCVxo5zCqKyH8/go-libp2p-loggables" + blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer" ) @@ -99,6 +99,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, newBlocks: make(chan *cid.Cid, HasBlockBufferSize), provideKeys: make(chan *cid.Cid, provideKeysBufferSize), wm: NewWantManager(ctx, network), + counters: new(counters), dupMetric: dupHist, allMetric: allHist, @@ -152,17 +153,29 @@ type Bitswap struct { process process.Process // Counters for various statistics - counterLk sync.Mutex - blocksRecvd int - dupBlocksRecvd int - dupDataRecvd uint64 - blocksSent int - dataSent uint64 - dataRecvd uint64 + counterLk sync.Mutex + counters *counters // Metrics interface metrics dupMetric metrics.Histogram allMetric metrics.Histogram + + // Sessions + sessions []*Session + sessLk sync.Mutex + + sessID uint64 + sessIDLk sync.Mutex +} + +type counters struct { + blocksRecvd uint64 + dupBlocksRecvd uint64 + dupDataRecvd uint64 + blocksSent uint64 + dataSent uint64 + dataRecvd uint64 + messagesRecvd uint64 } type blockRequest struct { @@ -173,45 +186,7 @@ type blockRequest struct { // GetBlock attempts to retrieve a particular block from peers within the // deadline enforced by the context. 
func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) { - if k == nil { - log.Error("nil cid in GetBlock") - return nil, blockstore.ErrNotFound - } - - // Any async work initiated by this function must end when this function - // returns. To ensure this, derive a new context. Note that it is okay to - // listen on parent in this scope, but NOT okay to pass |parent| to - // functions called by this one. Otherwise those functions won't return - // when this context's cancel func is executed. This is difficult to - // enforce. May this comment keep you safe. - ctx, cancelFunc := context.WithCancel(parent) - - // TODO: this request ID should come in from a higher layer so we can track - // across multiple 'GetBlock' invocations - ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest")) - log.Event(ctx, "Bitswap.GetBlockRequest.Start", k) - defer log.Event(ctx, "Bitswap.GetBlockRequest.End", k) - defer cancelFunc() - - promise, err := bs.GetBlocks(ctx, []*cid.Cid{k}) - if err != nil { - return nil, err - } - - select { - case block, ok := <-promise: - if !ok { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - return nil, errors.New("promise channel was closed") - } - } - return block, nil - case <-parent.Done(): - return nil, parent.Err() - } + return getBlock(parent, k, bs.GetBlocks) } func (bs *Bitswap) WantlistForPeer(p peer.ID) []*cid.Cid { @@ -251,7 +226,9 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block log.Event(ctx, "Bitswap.GetBlockRequest.Start", k) } - bs.wm.WantBlocks(ctx, keys) + mses := bs.getNextSessionID() + + bs.wm.WantBlocks(ctx, keys, nil, mses) // NB: Optimization. Assumes that providers of key[0] are likely to // be able to provide for all keys. This currently holds true in most @@ -273,7 +250,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block defer close(out) defer func() { // can't just defer this call on its own, arguments are resolved *when* the defer is created - bs.CancelWants(remaining.Keys()) + bs.CancelWants(remaining.Keys(), mses) }() for { select { @@ -282,6 +259,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block return } + bs.CancelWants([]*cid.Cid{blk.Cid()}, mses) remaining.Remove(blk.Cid()) select { case out <- blk: @@ -302,9 +280,19 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block } } +func (bs *Bitswap) getNextSessionID() uint64 { + bs.sessIDLk.Lock() + defer bs.sessIDLk.Unlock() + bs.sessID++ + return bs.sessID +} + // CancelWant removes a given key from the wantlist -func (bs *Bitswap) CancelWants(cids []*cid.Cid) { - bs.wm.CancelWants(cids) +func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) { + if len(cids) == 0 { + return + } + bs.wm.CancelWants(context.Background(), cids, nil, ses) } // HasBlock announces the existance of a block to this bitswap service. The @@ -329,6 +317,10 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error { // it now as it requires more thought and isnt causing immediate problems. 
bs.notifications.Publish(blk) + for _, s := range bs.SessionsForBlock(blk.Cid()) { + s.receiveBlockFrom("", blk) + } + bs.engine.AddBlock(blk) select { @@ -340,7 +332,23 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error { return nil } +// SessionsForBlock returns a slice of all sessions that may be interested in the given cid +func (bs *Bitswap) SessionsForBlock(c *cid.Cid) []*Session { + bs.sessLk.Lock() + defer bs.sessLk.Unlock() + + var out []*Session + for _, s := range bs.sessions { + if s.interestedIn(c) { + out = append(out, s) + } + } + return out +} + func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) { + atomic.AddUint64(&bs.counters.messagesRecvd, 1) + // This call records changes to wantlists, blocks received, // and number of bytes transfered. bs.engine.MessageReceived(p, incoming) @@ -362,12 +370,11 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg } keys = append(keys, block.Cid()) } - bs.wm.CancelWants(keys) wg := sync.WaitGroup{} for _, block := range iblocks { wg.Add(1) - go func(b blocks.Block) { + go func(b blocks.Block) { // TODO: this probably doesnt need to be a goroutine... defer wg.Done() bs.updateReceiveCounters(b) @@ -375,7 +382,15 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg k := b.Cid() log.Event(ctx, "Bitswap.GetBlockRequest.End", k) + for _, ses := range bs.SessionsForBlock(k) { + ses.receiveBlockFrom(p, b) + bs.CancelWants([]*cid.Cid{k}, ses.id) + } + log.Debugf("got block %s from %s", b, p) + // TODO: rework this to not call 'HasBlock'. 'HasBlock' is really + // designed to be called when blocks are coming in from non-bitswap + // places (like the user manually adding data) if err := bs.HasBlock(b); err != nil { log.Warningf("ReceiveMessage HasBlock error: %s", err) } @@ -401,12 +416,13 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) { bs.counterLk.Lock() defer bs.counterLk.Unlock() + c := bs.counters - bs.blocksRecvd++ - bs.dataRecvd += uint64(len(b.RawData())) + c.blocksRecvd++ + c.dataRecvd += uint64(len(b.RawData())) if has { - bs.dupBlocksRecvd++ - bs.dupDataRecvd += uint64(blkLen) + c.dupBlocksRecvd++ + c.dupDataRecvd += uint64(blkLen) } } diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 770041c9fc9..506b8d0c157 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -291,7 +291,7 @@ func TestEmptyKey(t *testing.T) { } } -func assertStat(st *Stat, sblks, rblks int, sdata, rdata uint64) error { +func assertStat(st *Stat, sblks, rblks, sdata, rdata uint64) error { if sblks != st.BlocksSent { return fmt.Errorf("mismatch in blocks sent: %d vs %d", sblks, st.BlocksSent) } @@ -318,7 +318,7 @@ func TestBasicBitswap(t *testing.T) { t.Log("Test a one node trying to get one block from another") - instances := sg.Instances(2) + instances := sg.Instances(3) blocks := bg.Blocks(1) err := instances[0].Exchange.HasBlock(blocks[0]) if err != nil { @@ -332,6 +332,15 @@ func TestBasicBitswap(t *testing.T) { t.Fatal(err) } + time.Sleep(time.Millisecond * 25) + wl := instances[2].Exchange.WantlistForPeer(instances[1].Peer) + if len(wl) != 0 { + t.Fatal("should have no items in other peers wantlist") + } + if len(instances[1].Exchange.GetWantlist()) != 0 { + t.Fatal("shouldnt have anything in wantlist") + } + st0, err := instances[0].Exchange.Stat() if err != nil { t.Fatal(err) @@ -370,6 +379,9 @@ func TestDoubleGet(t *testing.T) { instances := sg.Instances(2) blocks := 
bg.Blocks(1) + // NOTE: A race condition can happen here where these GetBlocks requests go + // through before the peers even get connected. This is okay, bitswap + // *should* be able to handle this. ctx1, cancel1 := context.WithCancel(context.Background()) blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []*cid.Cid{blocks[0].Cid()}) if err != nil { @@ -385,7 +397,7 @@ func TestDoubleGet(t *testing.T) { } // ensure both requests make it into the wantlist at the same time - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 20) cancel1() _, ok := <-blkch1 @@ -405,6 +417,14 @@ func TestDoubleGet(t *testing.T) { } t.Log(blk) case <-time.After(time.Second * 5): + p1wl := instances[0].Exchange.WantlistForPeer(instances[1].Peer) + if len(p1wl) != 1 { + t.Logf("wantlist view didnt have 1 item (had %d)", len(p1wl)) + } else if !p1wl[0].Equals(blocks[0].Cid()) { + t.Logf("had 1 item, it was wrong: %s %s", blocks[0].Cid(), p1wl[0]) + } else { + t.Log("had correct wantlist, somehow") + } t.Fatal("timed out waiting on block") } diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index a51610e60ee..600df11f216 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -2,10 +2,10 @@ package decision import ( + "context" "sync" "time" - context "context" bstore "github.com/ipfs/go-ipfs/blocks/blockstore" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" @@ -105,13 +105,10 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { } func (e *Engine) WantlistForPeer(p peer.ID) (out []*wl.Entry) { - e.lock.Lock() - partner, ok := e.ledgerMap[p] - if ok { - out = partner.wantList.SortedEntries() - } - e.lock.Unlock() - return out + partner := e.findOrCreate(p) + partner.lk.Lock() + defer partner.lk.Unlock() + return partner.wantList.SortedEntries() } func (e *Engine) LedgerForPeer(p peer.ID) *Receipt { diff --git a/exchange/bitswap/get.go b/exchange/bitswap/get.go new file mode 100644 index 00000000000..a72ead83ad0 --- /dev/null +++ b/exchange/bitswap/get.go @@ -0,0 +1,100 @@ +package bitswap + +import ( + "context" + "errors" + + blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" + notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications" + blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" + + cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" +) + +type getBlocksFunc func(context.Context, []*cid.Cid) (<-chan blocks.Block, error) + +func getBlock(p context.Context, k *cid.Cid, gb getBlocksFunc) (blocks.Block, error) { + if k == nil { + log.Error("nil cid in GetBlock") + return nil, blockstore.ErrNotFound + } + + // Any async work initiated by this function must end when this function + // returns. To ensure this, derive a new context. Note that it is okay to + // listen on parent in this scope, but NOT okay to pass |parent| to + // functions called by this one. Otherwise those functions won't return + // when this context's cancel func is executed. This is difficult to + // enforce. May this comment keep you safe. 
+ ctx, cancel := context.WithCancel(p) + defer cancel() + + promise, err := gb(ctx, []*cid.Cid{k}) + if err != nil { + return nil, err + } + + select { + case block, ok := <-promise: + if !ok { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + return nil, errors.New("promise channel was closed") + } + } + return block, nil + case <-p.Done(): + return nil, p.Err() + } +} + +type wantFunc func(context.Context, []*cid.Cid) + +func getBlocksImpl(ctx context.Context, keys []*cid.Cid, notif notifications.PubSub, want wantFunc, cwants func([]*cid.Cid)) (<-chan blocks.Block, error) { + if len(keys) == 0 { + out := make(chan blocks.Block) + close(out) + return out, nil + } + + remaining := cid.NewSet() + promise := notif.Subscribe(ctx, keys...) + for _, k := range keys { + log.Event(ctx, "Bitswap.GetBlockRequest.Start", k) + remaining.Add(k) + } + + want(ctx, keys) + + out := make(chan blocks.Block) + go handleIncoming(ctx, remaining, promise, out, cwants) + return out, nil +} + +func handleIncoming(ctx context.Context, remaining *cid.Set, in <-chan blocks.Block, out chan blocks.Block, cfun func([]*cid.Cid)) { + ctx, cancel := context.WithCancel(ctx) + defer func() { + cancel() + close(out) + // can't just defer this call on its own, arguments are resolved *when* the defer is created + cfun(remaining.Keys()) + }() + for { + select { + case blk, ok := <-in: + if !ok { + return + } + + remaining.Remove(blk.Cid()) + select { + case out <- blk: + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } +} diff --git a/exchange/bitswap/session.go b/exchange/bitswap/session.go new file mode 100644 index 00000000000..53db1a28a43 --- /dev/null +++ b/exchange/bitswap/session.go @@ -0,0 +1,309 @@ +package bitswap + +import ( + "context" + "time" + + notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications" + + logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" + lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru" + loggables "gx/ipfs/QmVesPmqbPp7xRGyY96tnBwzDtVV1nqv4SCVxo5zCqKyH8/go-libp2p-loggables" + blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" + cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" + peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer" +) + +const activeWantsLimit = 16 + +// Session holds state for an individual bitswap transfer operation. 
+// This allows bitswap to make smarter decisions about who to send wantlist +// info to, and who to request blocks from +type Session struct { + ctx context.Context + tofetch *cidQueue + activePeers map[peer.ID]struct{} + activePeersArr []peer.ID + + bs *Bitswap + incoming chan blkRecv + newReqs chan []*cid.Cid + cancelKeys chan []*cid.Cid + interestReqs chan interestReq + + interest *lru.Cache + liveWants map[string]time.Time + + tick *time.Timer + baseTickDelay time.Duration + + latTotal time.Duration + fetchcnt int + + notif notifications.PubSub + + uuid logging.Loggable + + id uint64 +} + +// NewSession creates a new bitswap session whose lifetime is bounded by the +// given context +func (bs *Bitswap) NewSession(ctx context.Context) *Session { + s := &Session{ + activePeers: make(map[peer.ID]struct{}), + liveWants: make(map[string]time.Time), + newReqs: make(chan []*cid.Cid), + cancelKeys: make(chan []*cid.Cid), + tofetch: newCidQueue(), + interestReqs: make(chan interestReq), + ctx: ctx, + bs: bs, + incoming: make(chan blkRecv), + notif: notifications.New(), + uuid: loggables.Uuid("GetBlockRequest"), + baseTickDelay: time.Millisecond * 500, + id: bs.getNextSessionID(), + } + + cache, _ := lru.New(2048) + s.interest = cache + + bs.sessLk.Lock() + bs.sessions = append(bs.sessions, s) + bs.sessLk.Unlock() + + go s.run(ctx) + + return s +} + +type blkRecv struct { + from peer.ID + blk blocks.Block +} + +func (s *Session) receiveBlockFrom(from peer.ID, blk blocks.Block) { + s.incoming <- blkRecv{from: from, blk: blk} +} + +type interestReq struct { + c *cid.Cid + resp chan bool +} + +// TODO: PERF: this is using a channel to guard a map access against race +// conditions. This is definitely much slower than a mutex, though its unclear +// if it will actually induce any noticeable slowness. This is implemented this +// way to avoid adding a more complex set of mutexes around the liveWants map. +// note that in the average case (where this session *is* interested in the +// block we received) this function will not be called, as the cid will likely +// still be in the interest cache. 
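// Sketch (not part of the patch) of the mutex-based alternative the TODO above
// alludes to. It assumes a hypothetical `liveWantsLk sync.Mutex` field on
// Session guarding liveWants; the channel-based isLiveWant below was chosen
// instead so that all access to liveWants stays inside the session's run loop.
func (s *Session) isLiveWantLocked(c *cid.Cid) bool {
	s.liveWantsLk.Lock() // hypothetical mutex, not a real field in this patch
	defer s.liveWantsLk.Unlock()
	_, ok := s.liveWants[c.KeyString()]
	return ok
}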
+func (s *Session) isLiveWant(c *cid.Cid) bool { + resp := make(chan bool) + s.interestReqs <- interestReq{ + c: c, + resp: resp, + } + return <-resp +} + +func (s *Session) interestedIn(c *cid.Cid) bool { + return s.interest.Contains(c.KeyString()) || s.isLiveWant(c) +} + +const provSearchDelay = time.Second * 10 + +func (s *Session) addActivePeer(p peer.ID) { + if _, ok := s.activePeers[p]; !ok { + s.activePeers[p] = struct{}{} + s.activePeersArr = append(s.activePeersArr, p) + } +} + +func (s *Session) resetTick() { + if s.latTotal == 0 { + s.tick.Reset(provSearchDelay) + } else { + avLat := s.latTotal / time.Duration(s.fetchcnt) + s.tick.Reset(s.baseTickDelay + (3 * avLat)) + } +} + +func (s *Session) run(ctx context.Context) { + s.tick = time.NewTimer(provSearchDelay) + newpeers := make(chan peer.ID, 16) + for { + select { + case blk := <-s.incoming: + s.tick.Stop() + + s.addActivePeer(blk.from) + + s.receiveBlock(ctx, blk.blk) + + s.resetTick() + case keys := <-s.newReqs: + for _, k := range keys { + s.interest.Add(k.KeyString(), nil) + } + if len(s.liveWants) < activeWantsLimit { + toadd := activeWantsLimit - len(s.liveWants) + if toadd > len(keys) { + toadd = len(keys) + } + + now := keys[:toadd] + keys = keys[toadd:] + + s.wantBlocks(ctx, now) + } + for _, k := range keys { + s.tofetch.Push(k) + } + case keys := <-s.cancelKeys: + s.cancel(keys) + + case <-s.tick.C: + var live []*cid.Cid + for c := range s.liveWants { + cs, _ := cid.Cast([]byte(c)) + live = append(live, cs) + s.liveWants[c] = time.Now() + } + + // Broadcast these keys to everyone we're connected to + s.bs.wm.WantBlocks(ctx, live, nil, s.id) + + if len(live) > 0 { + go func(k *cid.Cid) { + // TODO: have a task queue setup for this to: + // - rate limit + // - manage timeouts + // - ensure two 'findprovs' calls for the same block don't run concurrently + // - share peers between sessions based on interest set + for p := range s.bs.network.FindProvidersAsync(ctx, k, 10) { + newpeers <- p + } + }(live[0]) + } + s.resetTick() + case p := <-newpeers: + s.addActivePeer(p) + case lwchk := <-s.interestReqs: + lwchk.resp <- s.cidIsWanted(lwchk.c) + case <-ctx.Done(): + s.tick.Stop() + return + } + } +} + +func (s *Session) cidIsWanted(c *cid.Cid) bool { + _, ok := s.liveWants[c.KeyString()] + if !ok { + ok = s.tofetch.Has(c) + } + + return ok +} + +func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) { + c := blk.Cid() + if s.cidIsWanted(c) { + ks := c.KeyString() + tval, ok := s.liveWants[ks] + if ok { + s.latTotal += time.Since(tval) + delete(s.liveWants, ks) + } else { + s.tofetch.Remove(c) + } + s.fetchcnt++ + s.notif.Publish(blk) + + if next := s.tofetch.Pop(); next != nil { + s.wantBlocks(ctx, []*cid.Cid{next}) + } + } +} + +func (s *Session) wantBlocks(ctx context.Context, ks []*cid.Cid) { + for _, c := range ks { + s.liveWants[c.KeyString()] = time.Now() + } + s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id) +} + +func (s *Session) cancel(keys []*cid.Cid) { + for _, c := range keys { + s.tofetch.Remove(c) + } +} + +func (s *Session) cancelWants(keys []*cid.Cid) { + s.cancelKeys <- keys +} + +func (s *Session) fetch(ctx context.Context, keys []*cid.Cid) { + select { + case s.newReqs <- keys: + case <-ctx.Done(): + } +} + +// GetBlocks fetches a set of blocks within the context of this session and +// returns a channel that found blocks will be returned on. No order is +// guaranteed on the returned blocks. 
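// Illustrative usage (not part of the patch): fetching a batch of related
// blocks through a single session so that peers discovered for the first
// blocks are preferred for the rest. The function name and arguments are
// hypothetical; only the Session API defined in this file is used.
func exampleFetchRelated(ctx context.Context, bswap *Bitswap, keys []*cid.Cid) ([]blocks.Block, error) {
	ses := bswap.NewSession(ctx)
	ch, err := ses.GetBlocks(ctx, keys)
	if err != nil {
		return nil, err
	}
	var out []blocks.Block
	for blk := range ch {
		out = append(out, blk)
	}
	// If ctx is cancelled before all keys arrive, the channel is closed early
	// and out holds only the blocks received so far.
	return out, nil
}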
+func (s *Session) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) { + ctx = logging.ContextWithLoggable(ctx, s.uuid) + return getBlocksImpl(ctx, keys, s.notif, s.fetch, s.cancelWants) +} + +// GetBlock fetches a single block +func (s *Session) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) { + return getBlock(parent, k, s.GetBlocks) +} + +type cidQueue struct { + elems []*cid.Cid + eset *cid.Set +} + +func newCidQueue() *cidQueue { + return &cidQueue{eset: cid.NewSet()} +} + +func (cq *cidQueue) Pop() *cid.Cid { + for { + if len(cq.elems) == 0 { + return nil + } + + out := cq.elems[0] + cq.elems = cq.elems[1:] + + if cq.eset.Has(out) { + cq.eset.Remove(out) + return out + } + } +} + +func (cq *cidQueue) Push(c *cid.Cid) { + if cq.eset.Visit(c) { + cq.elems = append(cq.elems, c) + } +} + +func (cq *cidQueue) Remove(c *cid.Cid) { + cq.eset.Remove(c) +} + +func (cq *cidQueue) Has(c *cid.Cid) bool { + return cq.eset.Has(c) +} + +func (cq *cidQueue) Len() int { + return cq.eset.Len() +} diff --git a/exchange/bitswap/session_test.go b/exchange/bitswap/session_test.go new file mode 100644 index 00000000000..dfdae79cb5b --- /dev/null +++ b/exchange/bitswap/session_test.go @@ -0,0 +1,244 @@ +package bitswap + +import ( + "context" + "fmt" + "testing" + "time" + + blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil" + + blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" + cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" +) + +func TestBasicSessions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vnet := getVirtualNetwork() + sesgen := NewTestSessionGenerator(vnet) + defer sesgen.Close() + bgen := blocksutil.NewBlockGenerator() + + block := bgen.Next() + inst := sesgen.Instances(2) + + a := inst[0] + b := inst[1] + + if err := b.Blockstore().Put(block); err != nil { + t.Fatal(err) + } + + sesa := a.Exchange.NewSession(ctx) + + blkout, err := sesa.GetBlock(ctx, block.Cid()) + if err != nil { + t.Fatal(err) + } + + if !blkout.Cid().Equals(block.Cid()) { + t.Fatal("got wrong block") + } +} + +func assertBlockLists(got, exp []blocks.Block) error { + if len(got) != len(exp) { + return fmt.Errorf("got wrong number of blocks, %d != %d", len(got), len(exp)) + } + + h := cid.NewSet() + for _, b := range got { + h.Add(b.Cid()) + } + for _, b := range exp { + if !h.Has(b.Cid()) { + return fmt.Errorf("didnt have: %s", b.Cid()) + } + } + return nil +} + +func TestSessionBetweenPeers(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vnet := getVirtualNetwork() + sesgen := NewTestSessionGenerator(vnet) + defer sesgen.Close() + bgen := blocksutil.NewBlockGenerator() + + inst := sesgen.Instances(10) + + blks := bgen.Blocks(101) + if err := inst[0].Blockstore().PutMany(blks); err != nil { + t.Fatal(err) + } + + var cids []*cid.Cid + for _, blk := range blks { + cids = append(cids, blk.Cid()) + } + + ses := inst[1].Exchange.NewSession(ctx) + if _, err := ses.GetBlock(ctx, cids[0]); err != nil { + t.Fatal(err) + } + blks = blks[1:] + cids = cids[1:] + + for i := 0; i < 10; i++ { + ch, err := ses.GetBlocks(ctx, cids[i*10:(i+1)*10]) + if err != nil { + t.Fatal(err) + } + + var got []blocks.Block + for b := range ch { + got = append(got, b) + } + if err := assertBlockLists(got, blks[i*10:(i+1)*10]); err != nil { + t.Fatal(err) + } + } + for _, is := range inst[2:] { + if is.Exchange.counters.messagesRecvd > 2 { + t.Fatal("uninvolved 
nodes should only receive two messages", is.Exchange.counters.messagesRecvd) + } + } +} + +func TestSessionSplitFetch(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vnet := getVirtualNetwork() + sesgen := NewTestSessionGenerator(vnet) + defer sesgen.Close() + bgen := blocksutil.NewBlockGenerator() + + inst := sesgen.Instances(11) + + blks := bgen.Blocks(100) + for i := 0; i < 10; i++ { + if err := inst[i].Blockstore().PutMany(blks[i*10 : (i+1)*10]); err != nil { + t.Fatal(err) + } + } + + var cids []*cid.Cid + for _, blk := range blks { + cids = append(cids, blk.Cid()) + } + + ses := inst[10].Exchange.NewSession(ctx) + ses.baseTickDelay = time.Millisecond * 10 + + for i := 0; i < 10; i++ { + ch, err := ses.GetBlocks(ctx, cids[i*10:(i+1)*10]) + if err != nil { + t.Fatal(err) + } + + var got []blocks.Block + for b := range ch { + got = append(got, b) + } + if err := assertBlockLists(got, blks[i*10:(i+1)*10]); err != nil { + t.Fatal(err) + } + } +} + +func TestInterestCacheOverflow(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vnet := getVirtualNetwork() + sesgen := NewTestSessionGenerator(vnet) + defer sesgen.Close() + bgen := blocksutil.NewBlockGenerator() + + blks := bgen.Blocks(2049) + inst := sesgen.Instances(2) + + a := inst[0] + b := inst[1] + + ses := a.Exchange.NewSession(ctx) + zeroch, err := ses.GetBlocks(ctx, []*cid.Cid{blks[0].Cid()}) + if err != nil { + t.Fatal(err) + } + + var restcids []*cid.Cid + for _, blk := range blks[1:] { + restcids = append(restcids, blk.Cid()) + } + + restch, err := ses.GetBlocks(ctx, restcids) + if err != nil { + t.Fatal(err) + } + + // wait to ensure that all the above cids were added to the sessions cache + time.Sleep(time.Millisecond * 50) + + if err := b.Exchange.HasBlock(blks[0]); err != nil { + t.Fatal(err) + } + + select { + case blk, ok := <-zeroch: + if ok && blk.Cid().Equals(blks[0].Cid()) { + // success! 
+ } else { + t.Fatal("failed to get the block") + } + case <-restch: + t.Fatal("should not get anything on restch") + case <-time.After(time.Second * 5): + t.Fatal("timed out waiting for block") + } +} + +func TestPutAfterSessionCacheEvict(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vnet := getVirtualNetwork() + sesgen := NewTestSessionGenerator(vnet) + defer sesgen.Close() + bgen := blocksutil.NewBlockGenerator() + + blks := bgen.Blocks(2500) + inst := sesgen.Instances(1) + + a := inst[0] + + ses := a.Exchange.NewSession(ctx) + + var allcids []*cid.Cid + for _, blk := range blks[1:] { + allcids = append(allcids, blk.Cid()) + } + + blkch, err := ses.GetBlocks(ctx, allcids) + if err != nil { + t.Fatal(err) + } + + // wait to ensure that all the above cids were added to the sessions cache + time.Sleep(time.Millisecond * 50) + + if err := a.Exchange.HasBlock(blks[17]); err != nil { + t.Fatal(err) + } + + select { + case <-blkch: + case <-time.After(time.Millisecond * 50): + t.Fatal("timed out waiting for block") + } +} diff --git a/exchange/bitswap/stat.go b/exchange/bitswap/stat.go index 2f95d9e8b7d..fb5eb5011ba 100644 --- a/exchange/bitswap/stat.go +++ b/exchange/bitswap/stat.go @@ -10,11 +10,11 @@ type Stat struct { ProvideBufLen int Wantlist []*cid.Cid Peers []string - BlocksReceived int + BlocksReceived uint64 DataReceived uint64 - BlocksSent int + BlocksSent uint64 DataSent uint64 - DupBlksReceived int + DupBlksReceived uint64 DupDataReceived uint64 } @@ -23,12 +23,13 @@ func (bs *Bitswap) Stat() (*Stat, error) { st.ProvideBufLen = len(bs.newBlocks) st.Wantlist = bs.GetWantlist() bs.counterLk.Lock() - st.BlocksReceived = bs.blocksRecvd - st.DupBlksReceived = bs.dupBlocksRecvd - st.DupDataReceived = bs.dupDataRecvd - st.BlocksSent = bs.blocksSent - st.DataSent = bs.dataSent - st.DataReceived = bs.dataRecvd + c := bs.counters + st.BlocksReceived = c.blocksRecvd + st.DupBlksReceived = c.dupBlocksRecvd + st.DupDataReceived = c.dupDataRecvd + st.BlocksSent = c.blocksSent + st.DataSent = c.dataSent + st.DataReceived = c.dataRecvd bs.counterLk.Unlock() for _, p := range bs.engine.Peers() { diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go index 4bae29ce3d5..d3bb98b0e06 100644 --- a/exchange/bitswap/testutils.go +++ b/exchange/bitswap/testutils.go @@ -47,7 +47,7 @@ func (g *SessionGenerator) Next() Instance { if err != nil { panic("FIXME") // TODO change signature } - return Session(g.ctx, g.net, p) + return MkSession(g.ctx, g.net, p) } func (g *SessionGenerator) Instances(n int) []Instance { @@ -86,7 +86,7 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration { // NB: It's easy make mistakes by providing the same peer ID to two different // sessions. To safeguard, use the SessionGenerator to generate sessions. It's // just a much better idea. 
-func Session(ctx context.Context, net tn.Network, p testutil.Identity) Instance { +func MkSession(ctx context.Context, net tn.Network, p testutil.Identity) Instance { bsdelay := delay.Fixed(0) adapter := net.Adapter(p) diff --git a/exchange/bitswap/wantlist/wantlist.go b/exchange/bitswap/wantlist/wantlist.go index 7c77998b3ba..de340ea6add 100644 --- a/exchange/bitswap/wantlist/wantlist.go +++ b/exchange/bitswap/wantlist/wantlist.go @@ -10,8 +10,8 @@ import ( ) type ThreadSafe struct { - lk sync.RWMutex - Wantlist Wantlist + lk sync.RWMutex + set map[string]*Entry } // not threadsafe @@ -23,7 +23,16 @@ type Entry struct { Cid *cid.Cid Priority int - RefCnt int + SesTrk map[uint64]struct{} +} + +// NewRefEntry creates a new reference tracked wantlist entry +func NewRefEntry(c *cid.Cid, p int) *Entry { + return &Entry{ + Cid: c, + Priority: p, + SesTrk: make(map[uint64]struct{}), + } } type entrySlice []*Entry @@ -34,7 +43,7 @@ func (es entrySlice) Less(i, j int) bool { return es[i].Priority > es[j].Priorit func NewThreadSafe() *ThreadSafe { return &ThreadSafe{ - Wantlist: *New(), + set: make(map[string]*Entry), } } @@ -44,46 +53,101 @@ func New() *Wantlist { } } -func (w *ThreadSafe) Add(k *cid.Cid, priority int) bool { +// Add adds the given cid to the wantlist with the specified priority, governed +// by the session ID 'ses'. if a cid is added under multiple session IDs, then +// it must be removed by each of those sessions before it is no longer 'in the +// wantlist'. Calls to Add are idempotent given the same arguments. Subsequent +// calls with different values for priority will not update the priority +// TODO: think through priority changes here +// Add returns true if the cid did not exist in the wantlist before this call +// (even if it was under a different session) +func (w *ThreadSafe) Add(c *cid.Cid, priority int, ses uint64) bool { w.lk.Lock() defer w.lk.Unlock() - return w.Wantlist.Add(k, priority) + k := c.KeyString() + if e, ok := w.set[k]; ok { + e.SesTrk[ses] = struct{}{} + return false + } + + w.set[k] = &Entry{ + Cid: c, + Priority: priority, + SesTrk: map[uint64]struct{}{ses: struct{}{}}, + } + + return true } -func (w *ThreadSafe) AddEntry(e *Entry) bool { +// AddEntry adds given Entry to the wantlist. For more information see Add method. +func (w *ThreadSafe) AddEntry(e *Entry, ses uint64) bool { w.lk.Lock() defer w.lk.Unlock() - return w.Wantlist.AddEntry(e) + k := e.Cid.KeyString() + if ex, ok := w.set[k]; ok { + ex.SesTrk[ses] = struct{}{} + return false + } + w.set[k] = e + e.SesTrk[ses] = struct{}{} + return true } -func (w *ThreadSafe) Remove(k *cid.Cid) bool { +// Remove removes the given cid from being tracked by the given session. +// 'true' is returned if this call to Remove removed the final session ID +// tracking the cid. 
(meaning true will be returned iff this call caused the +// value of 'Contains(c)' to change from true to false) +func (w *ThreadSafe) Remove(c *cid.Cid, ses uint64) bool { w.lk.Lock() defer w.lk.Unlock() - return w.Wantlist.Remove(k) + k := c.KeyString() + e, ok := w.set[k] + if !ok { + return false + } + + delete(e.SesTrk, ses) + if len(e.SesTrk) == 0 { + delete(w.set, k) + return true + } + return false } +// Contains returns true if the given cid is in the wantlist tracked by one or +// more sessions func (w *ThreadSafe) Contains(k *cid.Cid) (*Entry, bool) { w.lk.RLock() defer w.lk.RUnlock() - return w.Wantlist.Contains(k) + e, ok := w.set[k.KeyString()] + return e, ok } func (w *ThreadSafe) Entries() []*Entry { w.lk.RLock() defer w.lk.RUnlock() - return w.Wantlist.Entries() + var es entrySlice + for _, e := range w.set { + es = append(es, e) + } + return es } func (w *ThreadSafe) SortedEntries() []*Entry { w.lk.RLock() defer w.lk.RUnlock() - return w.Wantlist.SortedEntries() + var es entrySlice + for _, e := range w.set { + es = append(es, e) + } + sort.Sort(es) + return es } func (w *ThreadSafe) Len() int { w.lk.RLock() defer w.lk.RUnlock() - return w.Wantlist.Len() + return len(w.set) } func (w *Wantlist) Len() int { @@ -92,15 +156,13 @@ func (w *Wantlist) Len() int { func (w *Wantlist) Add(c *cid.Cid, priority int) bool { k := c.KeyString() - if e, ok := w.set[k]; ok { - e.RefCnt++ + if _, ok := w.set[k]; ok { return false } w.set[k] = &Entry{ Cid: c, Priority: priority, - RefCnt: 1, } return true @@ -108,8 +170,7 @@ func (w *Wantlist) Add(c *cid.Cid, priority int) bool { func (w *Wantlist) AddEntry(e *Entry) bool { k := e.Cid.KeyString() - if ex, ok := w.set[k]; ok { - ex.RefCnt++ + if _, ok := w.set[k]; ok { return false } w.set[k] = e @@ -118,17 +179,13 @@ func (w *Wantlist) AddEntry(e *Entry) bool { func (w *Wantlist) Remove(c *cid.Cid) bool { k := c.KeyString() - e, ok := w.set[k] + _, ok := w.set[k] if !ok { return false } - e.RefCnt-- - if e.RefCnt <= 0 { - delete(w.set, k) - return true - } - return false + delete(w.set, k) + return true } func (w *Wantlist) Contains(k *cid.Cid) (*Entry, bool) { diff --git a/exchange/bitswap/wantlist/wantlist_test.go b/exchange/bitswap/wantlist/wantlist_test.go new file mode 100644 index 00000000000..d6027a718c8 --- /dev/null +++ b/exchange/bitswap/wantlist/wantlist_test.go @@ -0,0 +1,104 @@ +package wantlist + +import ( + "testing" + + cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" +) + +var testcids []*cid.Cid + +func init() { + strs := []string{ + "QmQL8LqkEgYXaDHdNYCG2mmpow7Sp8Z8Kt3QS688vyBeC7", + "QmcBDsdjgSXU7BP4A4V8LJCXENE5xVwnhrhRGVTJr9YCVj", + "QmQakgd2wDxc3uUF4orGdEm28zUT9Mmimp5pyPG2SFS9Gj", + } + for _, s := range strs { + c, err := cid.Decode(s) + if err != nil { + panic(err) + } + testcids = append(testcids, c) + } + +} + +type wli interface { + Contains(*cid.Cid) (*Entry, bool) +} + +func assertHasCid(t *testing.T, w wli, c *cid.Cid) { + e, ok := w.Contains(c) + if !ok { + t.Fatal("expected to have ", c) + } + if !e.Cid.Equals(c) { + t.Fatal("returned entry had wrong cid value") + } +} + +func assertNotHasCid(t *testing.T, w wli, c *cid.Cid) { + _, ok := w.Contains(c) + if ok { + t.Fatal("expected not to have ", c) + } +} + +func TestBasicWantlist(t *testing.T) { + wl := New() + + if !wl.Add(testcids[0], 5) { + t.Fatal("expected true") + } + assertHasCid(t, wl, testcids[0]) + if !wl.Add(testcids[1], 4) { + t.Fatal("expected true") + } + assertHasCid(t, wl, testcids[0]) + assertHasCid(t, wl, testcids[1]) + 
+ if wl.Len() != 2 { + t.Fatal("should have had two items") + } + + if wl.Add(testcids[1], 4) { + t.Fatal("add shouldnt report success on second add") + } + assertHasCid(t, wl, testcids[0]) + assertHasCid(t, wl, testcids[1]) + + if wl.Len() != 2 { + t.Fatal("should have had two items") + } + + if !wl.Remove(testcids[0]) { + t.Fatal("should have gotten true") + } + + assertHasCid(t, wl, testcids[1]) + if _, has := wl.Contains(testcids[0]); has { + t.Fatal("shouldnt have this cid") + } +} + +func TestSesRefWantlist(t *testing.T) { + wl := NewThreadSafe() + + if !wl.Add(testcids[0], 5, 1) { + t.Fatal("should have added") + } + assertHasCid(t, wl, testcids[0]) + if wl.Remove(testcids[0], 2) { + t.Fatal("shouldnt have removed") + } + assertHasCid(t, wl, testcids[0]) + if wl.Add(testcids[0], 5, 1) { + t.Fatal("shouldnt have added") + } + assertHasCid(t, wl, testcids[0]) + if !wl.Remove(testcids[0], 1) { + t.Fatal("should have removed") + } + assertNotHasCid(t, wl, testcids[0]) +} diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index c6cce7ff7ee..800fa1c4008 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -17,7 +17,7 @@ import ( type WantManager struct { // sync channels for Run loop - incoming chan []*bsmsg.Entry + incoming chan *wantSet connect chan peer.ID // notification channel for new peers connecting disconnect chan peer.ID // notification channel for peers disconnecting peerReqs chan chan []peer.ID // channel to request connected peers on @@ -25,6 +25,7 @@ type WantManager struct { // synchronized by Run loop, only touch inside there peers map[peer.ID]*msgQueue wl *wantlist.ThreadSafe + bcwl *wantlist.ThreadSafe network bsnet.BitSwapNetwork ctx context.Context @@ -41,12 +42,13 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+ " this bitswap").Histogram(metricsBuckets) return &WantManager{ - incoming: make(chan []*bsmsg.Entry, 10), + incoming: make(chan *wantSet, 10), connect: make(chan peer.ID, 10), disconnect: make(chan peer.ID, 10), peerReqs: make(chan chan []peer.ID), peers: make(map[peer.ID]*msgQueue), wl: wantlist.NewThreadSafe(), + bcwl: wantlist.NewThreadSafe(), network: network, ctx: ctx, cancel: cancel, @@ -61,6 +63,7 @@ type msgQueue struct { outlk sync.Mutex out bsmsg.BitSwapMessage network bsnet.BitSwapNetwork + wl *wantlist.ThreadSafe sender bsnet.MessageSender @@ -70,30 +73,33 @@ type msgQueue struct { done chan struct{} } -func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid) { +// WantBlocks adds the given cids to the wantlist, tracked by the given session +func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) { log.Infof("want blocks: %s", ks) - pm.addEntries(ctx, ks, false) + pm.addEntries(ctx, ks, peers, false, ses) } -func (pm *WantManager) CancelWants(ks []*cid.Cid) { - log.Infof("cancel wants: %s", ks) - pm.addEntries(context.TODO(), ks, true) +// CancelWants removes the given cids from the wantlist, tracked by the given session +func (pm *WantManager) CancelWants(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) { + pm.addEntries(context.Background(), ks, peers, true, ses) } -func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, cancel bool) { +type wantSet struct { + entries []*bsmsg.Entry + targets []peer.ID + from uint64 +} + +func (pm *WantManager) addEntries(ctx context.Context, 
ks []*cid.Cid, targets []peer.ID, cancel bool, ses uint64) { var entries []*bsmsg.Entry for i, k := range ks { entries = append(entries, &bsmsg.Entry{ Cancel: cancel, - Entry: &wantlist.Entry{ - Cid: k, - Priority: kMaxPriority - i, - RefCnt: 1, - }, + Entry: wantlist.NewRefEntry(k, kMaxPriority-i), }) } select { - case pm.incoming <- entries: + case pm.incoming <- &wantSet{entries: entries, targets: targets, from: ses}: case <-pm.ctx.Done(): case <-ctx.Done(): } @@ -132,7 +138,10 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { // new peer, we will want to give them our full wantlist fullwantlist := bsmsg.New(true) - for _, e := range pm.wl.Entries() { + for _, e := range pm.bcwl.Entries() { + for k := range e.SesTrk { + mq.wl.AddEntry(e, k) + } fullwantlist.AddEntry(e.Cid, e.Priority) } mq.out = fullwantlist @@ -278,43 +287,47 @@ func (pm *WantManager) Run() { defer tock.Stop() for { select { - case entries := <-pm.incoming: + case ws := <-pm.incoming: + + // is this a broadcast or not? + brdc := len(ws.targets) == 0 // add changes to our wantlist - var filtered []*bsmsg.Entry - for _, e := range entries { + for _, e := range ws.entries { if e.Cancel { - if pm.wl.Remove(e.Cid) { + if brdc { + pm.bcwl.Remove(e.Cid, ws.from) + } + + if pm.wl.Remove(e.Cid, ws.from) { pm.wantlistGauge.Dec() - filtered = append(filtered, e) } } else { - if pm.wl.AddEntry(e.Entry) { + if brdc { + pm.bcwl.AddEntry(e.Entry, ws.from) + } + if pm.wl.AddEntry(e.Entry, ws.from) { pm.wantlistGauge.Inc() - filtered = append(filtered, e) } } } // broadcast those wantlist changes - for _, p := range pm.peers { - p.addMessage(filtered) - } - - case <-tock.C: - // resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY) - var es []*bsmsg.Entry - for _, e := range pm.wl.Entries() { - es = append(es, &bsmsg.Entry{Entry: e}) + if len(ws.targets) == 0 { + for _, p := range pm.peers { + p.addMessage(ws.entries, ws.from) + } + } else { + for _, t := range ws.targets { + p, ok := pm.peers[t] + if !ok { + log.Warning("tried sending wantlist change to non-partner peer") + continue + } + p.addMessage(ws.entries, ws.from) + } } - for _, p := range pm.peers { - p.outlk.Lock() - p.out = bsmsg.New(true) - p.outlk.Unlock() - - p.addMessage(es) - } case p := <-pm.connect: pm.startPeerHandler(p) case p := <-pm.disconnect: @@ -335,16 +348,21 @@ func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue { return &msgQueue{ done: make(chan struct{}), work: make(chan struct{}, 1), + wl: wantlist.NewThreadSafe(), network: wm.network, p: p, refcnt: 1, } } -func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) { +func (mq *msgQueue) addMessage(entries []*bsmsg.Entry, ses uint64) { + var work bool mq.outlk.Lock() defer func() { mq.outlk.Unlock() + if !work { + return + } select { case mq.work <- struct{}{}: default: @@ -361,9 +379,15 @@ func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) { // one passed in for _, e := range entries { if e.Cancel { - mq.out.Cancel(e.Cid) + if mq.wl.Remove(e.Cid, ses) { + work = true + mq.out.Cancel(e.Cid) + } } else { - mq.out.AddEntry(e.Cid, e.Priority) + if mq.wl.Add(e.Cid, e.Priority, ses) { + work = true + mq.out.AddEntry(e.Cid, e.Priority) + } } } } diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 648bfa40378..a899f06bb2c 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -49,7 +49,7 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { func (bs *Bitswap) taskWorker(ctx context.Context, id int) { idmap 
:= logging.LoggableMap{"ID": id} - defer log.Info("bitswap task worker shutting down...") + defer log.Debug("bitswap task worker shutting down...") for { log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap) select { @@ -73,8 +73,8 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { bs.wm.SendBlock(ctx, envelope) bs.counterLk.Lock() - bs.blocksSent++ - bs.dataSent += uint64(len(envelope.Block.RawData())) + bs.counters.blocksSent++ + bs.counters.dataSent += uint64(len(envelope.Block.RawData())) bs.counterLk.Unlock() case <-ctx.Done(): return diff --git a/exchange/interface.go b/exchange/interface.go index fb590d25a7f..ac494ff9934 100644 --- a/exchange/interface.go +++ b/exchange/interface.go @@ -13,10 +13,7 @@ import ( // Any type that implements exchange.Interface may be used as an IPFS block // exchange protocol. type Interface interface { // type Exchanger interface - // GetBlock returns the block associated with a given key. - GetBlock(context.Context, *cid.Cid) (blocks.Block, error) - - GetBlocks(context.Context, []*cid.Cid) (<-chan blocks.Block, error) + Fetcher // TODO Should callers be concerned with whether the block was made // available on the network? @@ -26,3 +23,10 @@ type Interface interface { // type Exchanger interface io.Closer } + +// Fetcher is an object that can be used to retrieve blocks +type Fetcher interface { + // GetBlock returns the block associated with a given key. + GetBlock(context.Context, *cid.Cid) (blocks.Block, error) + GetBlocks(context.Context, []*cid.Cid) (<-chan blocks.Block, error) +} diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index d23a42d078f..587c481cbec 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -155,17 +155,39 @@ func GetLinksDirect(serv node.NodeGetter) GetLinks { return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) { node, err := serv.Get(ctx, c) if err != nil { + if err == bserv.ErrNotFound { + err = ErrNotFound + } return nil, err } return node.Links(), nil } } +type sesGetter struct { + bs *bserv.Session +} + +func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) { + blk, err := sg.bs.GetBlock(ctx, c) + if err != nil { + return nil, err + } + + return decodeBlock(blk) +} + // FetchGraph fetches all nodes that are children of the given node func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error { + var ng node.NodeGetter = serv + ds, ok := serv.(*dagService) + if ok { + ng = &sesGetter{bserv.NewSession(ctx, ds.Blocks)} + } + v, _ := ctx.Value("progress").(*ProgressTracker) if v == nil { - return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, cid.NewSet().Visit) + return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit) } set := cid.NewSet() visit := func(c *cid.Cid) bool { @@ -176,7 +198,7 @@ func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error { return false } } - return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, visit) + return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit) } // FindLinks searches this nodes links for the given key,
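// Illustrative sketch (not part of the patch): the new exchange.Fetcher
// interface lets read-only callers accept either a full exchange or a bitswap
// session interchangeably. `fetchAll` is a hypothetical helper; it assumes the
// usual context, exchange, blocks and cid imports.
func fetchAll(ctx context.Context, f exchange.Fetcher, ks []*cid.Cid) ([]blocks.Block, error) {
	ch, err := f.GetBlocks(ctx, ks)
	if err != nil {
		return nil, err
	}
	var out []blocks.Block
	for b := range ch {
		out = append(out, b)
	}
	return out, nil
}
// Both *bitswap.Bitswap and the *bitswap.Session returned by NewSession satisfy
// Fetcher, which is exactly what blockservice.NewSession relies on above.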