From a159e6825c2c8c82b734e290e8fc2116d3afd429 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 8 May 2015 23:55:35 -0700 Subject: [PATCH 01/15] implement peermanager to control outgoing messages Also more refactoring of bitswap in general, including some perf improvements and eventlog removal. clean up, and buffer channels move some things around correctly buffer work messages more cleanup, and improve test perf remove unneccessary test revert changes to bitswap message, they werent necessary --- exchange/bitswap/bitswap.go | 88 ++------ exchange/bitswap/bitswap_test.go | 35 +-- exchange/bitswap/decision/engine.go | 22 +- exchange/bitswap/decision/engine_test.go | 2 +- .../bitswap/decision/peer_request_queue.go | 2 +- exchange/bitswap/message/message.go | 11 +- exchange/bitswap/network/interface.go | 2 + exchange/bitswap/network/ipfs_impl.go | 4 + exchange/bitswap/peermanager.go | 203 ++++++++++++++++++ exchange/bitswap/testnet/virtual.go | 9 + exchange/bitswap/testutils.go | 11 +- exchange/bitswap/workers.go | 6 +- 12 files changed, 275 insertions(+), 120 deletions(-) create mode 100644 exchange/bitswap/peermanager.go diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 757c9067eb9..b8dcdab1e9b 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -91,7 +91,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, process: px, newBlocks: make(chan *blocks.Block, HasBlockBufferSize), provideKeys: make(chan u.Key), + pm: NewPeerManager(network), } + go bs.pm.Run(ctx) network.SetDelegate(bs) // Start up bitswaps async worker routines @@ -108,6 +110,10 @@ type Bitswap struct { // network delivers messages on behalf of the session network bsnet.BitSwapNetwork + // the peermanager manages sending messages to peers in a way that + // wont block bitswap operation + pm *PeerManager + // blockstore is the local database // NB: ensure threadsafety blockstore blockstore.Blockstore @@ -217,7 +223,6 @@ func 
(bs *Bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks. // HasBlock announces the existance of a block to this bitswap service. The // service will potentially notify its peers. func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { - log.Event(ctx, "hasBlock", blk) select { case <-bs.process.Closing(): return errors.New("bitswap is closed") @@ -227,6 +232,7 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { if err := bs.blockstore.Put(blk); err != nil { return err } + bs.wantlist.Remove(blk.Key()) bs.notifications.Publish(blk) select { @@ -239,7 +245,6 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { func (bs *Bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error { set := pset.New() - wg := sync.WaitGroup{} loop: for { @@ -253,37 +258,22 @@ loop: continue } - wg.Add(1) - go func(p peer.ID) { - defer wg.Done() - if err := bs.send(ctx, p, m); err != nil { - log.Debug(err) // TODO remove if too verbose - } - }(peerToQuery) + bs.pm.Send(peerToQuery, m) case <-ctx.Done(): return nil } } - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - - select { - case <-done: - case <-ctx.Done(): - // NB: we may be abandoning goroutines here before they complete - // this shouldnt be an issue because they will complete soon anyways - // we just don't want their being slow to impact bitswap transfer speeds - } return nil } func (bs *Bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error { + entries := bs.wantlist.Entries() + if len(entries) == 0 { + return nil + } message := bsmsg.New() message.SetFull(true) - for _, wanted := range bs.wantlist.Entries() { + for _, wanted := range entries { message.AddEntry(wanted.Key, wanted.Priority) } return bs.sendWantlistMsgToPeers(ctx, message, peers) @@ -326,7 +316,7 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries 
[]wantli // TODO(brian): handle errors func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error { - defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done() + //defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done() // This call records changes to wantlists, blocks received, // and number of bytes transfered. @@ -356,6 +346,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg // Connected/Disconnected warns bitswap about peer connections func (bs *Bitswap) PeerConnected(p peer.ID) { // TODO: add to clientWorker?? + bs.pm.Connected(p) peers := make(chan peer.ID, 1) peers <- p close(peers) @@ -367,6 +358,7 @@ func (bs *Bitswap) PeerConnected(p peer.ID) { // Connected/Disconnected warns bitswap about peer connections func (bs *Bitswap) PeerDisconnected(p peer.ID) { + bs.pm.Disconnected(p) bs.engine.PeerDisconnected(p) } @@ -381,19 +373,7 @@ func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { message.Cancel(k) } - wg := sync.WaitGroup{} - for _, p := range bs.engine.Peers() { - wg.Add(1) - go func(p peer.ID) { - defer wg.Done() - err := bs.send(ctx, p, message) - if err != nil { - log.Warningf("Error sending message: %s", err) - return - } - }(p) - } - wg.Wait() + bs.pm.Broadcast(message) return } @@ -408,29 +388,7 @@ func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) { message.AddEntry(k, kMaxPriority-i) } - wg := sync.WaitGroup{} - for _, p := range bs.engine.Peers() { - wg.Add(1) - go func(p peer.ID) { - defer wg.Done() - err := bs.send(ctx, p, message) - if err != nil { - log.Debugf("Error sending message: %s", err) - } - }(p) - } - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - select { - case <-done: - case <-ctx.Done(): - // NB: we may be abandoning goroutines here before they complete - // this shouldnt be an issue because they will complete soon anyways - // we just don't want their being slow to impact 
bitswap transfer speeds - } + bs.pm.Broadcast(message) } func (bs *Bitswap) ReceiveError(err error) { @@ -439,16 +397,6 @@ func (bs *Bitswap) ReceiveError(err error) { // TODO bubble the network error up to the parent context/error logger } -// send strives to ensure that accounting is always performed when a message is -// sent -func (bs *Bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error { - defer log.EventBegin(ctx, "sendMessage", p, m).Done() - if err := bs.network.SendMessage(ctx, p, m); err != nil { - return err - } - return bs.engine.MessageSent(p, m) -} - func (bs *Bitswap) Close() error { return bs.process.Close() } diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 354eb73e5af..c04946692ec 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -13,7 +13,6 @@ import ( blocks "github.com/ipfs/go-ipfs/blocks" blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil" tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet" - p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util" mockrouting "github.com/ipfs/go-ipfs/routing/mock" delay "github.com/ipfs/go-ipfs/thirdparty/delay" u "github.com/ipfs/go-ipfs/util" @@ -36,30 +35,6 @@ func TestClose(t *testing.T) { bitswap.Exchange.GetBlock(context.Background(), block.Key()) } -func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this - - rs := mockrouting.NewServer() - net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay)) - g := NewTestSessionGenerator(net) - defer g.Close() - - block := blocks.NewBlock([]byte("block")) - pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t) - rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network - - solo := g.Next() - defer solo.Exchange.Close() - - ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond) - _, err := solo.Exchange.GetBlock(ctx, block.Key()) - - if err != context.DeadlineExceeded { - t.Fatal("Expected DeadlineExceeded 
error") - } -} - -// TestGetBlockAfterRequesting... - func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) @@ -67,14 +42,15 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { g := NewTestSessionGenerator(net) defer g.Close() - hasBlock := g.Next() + peers := g.Instances(2) + hasBlock := peers[0] defer hasBlock.Exchange.Close() if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil { t.Fatal(err) } - wantsBlock := g.Next() + wantsBlock := peers[1] defer wantsBlock.Exchange.Close() ctx, _ := context.WithTimeout(context.Background(), time.Second) @@ -196,8 +172,9 @@ func TestSendToWantingPeer(t *testing.T) { prev := rebroadcastDelay.Set(time.Second / 2) defer func() { rebroadcastDelay.Set(prev) }() - peerA := sg.Next() - peerB := sg.Next() + peers := sg.Instances(2) + peerA := peers[0] + peerB := peers[1] t.Logf("Session %v\n", peerA.Peer) t.Logf("Session %v\n", peerB.Peer) diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 60b95e469b7..0b08a55fb44 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -5,6 +5,7 @@ import ( "sync" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + blocks "github.com/ipfs/go-ipfs/blocks" bstore "github.com/ipfs/go-ipfs/blocks/blockstore" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" @@ -53,8 +54,9 @@ const ( type Envelope struct { // Peer is the intended recipient Peer peer.ID - // Message is the payload - Message bsmsg.BitSwapMessage + + // Block is the payload + Block *blocks.Block // A callback to notify the decision queue that the task is complete Sent func() @@ -151,12 +153,10 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { continue } - m := bsmsg.New() // TODO: maybe add keys from our wantlist? 
- m.AddBlock(block) return &Envelope{ - Peer: nextTask.Target, - Message: m, - Sent: nextTask.Done, + Peer: nextTask.Target, + Block: block, + Sent: nextTask.Done, }, nil } } @@ -185,7 +185,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { defer e.lock.Unlock() if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 { - log.Debug("received empty message from", p) + log.Debugf("received empty message from %s", p) } newWorkExists := false @@ -202,11 +202,11 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { for _, entry := range m.Wantlist() { if entry.Cancel { - log.Debug("cancel", entry.Key) + log.Debugf("cancel %s", entry.Key) l.CancelWant(entry.Key) e.peerRequestQueue.Remove(entry.Key, p) } else { - log.Debug("wants", entry.Key, entry.Priority) + log.Debugf("wants %s", entry.Key, entry.Priority) l.Wants(entry.Key, entry.Priority) if exists, err := e.bs.Has(entry.Key); err == nil && exists { e.peerRequestQueue.Push(entry.Entry, p) @@ -216,7 +216,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { } for _, block := range m.Blocks() { - log.Debug("got block %s %d bytes", block.Key(), len(block.Data)) + log.Debugf("got block %s %d bytes", block.Key(), len(block.Data)) l.ReceivedBytes(len(block.Data)) for _, l := range e.ledgerMap { if entry, ok := l.WantListContains(block.Key()); ok { diff --git a/exchange/bitswap/decision/engine_test.go b/exchange/bitswap/decision/engine_test.go index afe6ba9addd..31e46c77635 100644 --- a/exchange/bitswap/decision/engine_test.go +++ b/exchange/bitswap/decision/engine_test.go @@ -185,7 +185,7 @@ func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error { for _, k := range keys { next := <-e.Outbox() envelope := <-next - received := envelope.Message.Blocks()[0] + received := envelope.Block expected := blocks.NewBlock([]byte(k)) if received.Key() != expected.Key() { return errors.New(fmt.Sprintln("received", string(received.Data), "expected", 
string(expected.Data))) diff --git a/exchange/bitswap/decision/peer_request_queue.go b/exchange/bitswap/decision/peer_request_queue.go index 42928487dcc..15f52da745b 100644 --- a/exchange/bitswap/decision/peer_request_queue.go +++ b/exchange/bitswap/decision/peer_request_queue.go @@ -156,7 +156,7 @@ func (t *peerRequestTask) SetIndex(i int) { // taskKey returns a key that uniquely identifies a task. func taskKey(p peer.ID, k u.Key) string { - return string(p.String() + k.String()) + return string(p) + string(k) } // FIFO is a basic task comparator that returns tasks in the order created. diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index 3a7d70aae0e..4e88e738c31 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -29,6 +29,8 @@ type BitSwapMessage interface { Cancel(key u.Key) + Empty() bool + // Sets whether or not the contained wantlist represents the entire wantlist // true = full wantlist // false = wantlist 'patch' @@ -51,7 +53,7 @@ type Exportable interface { type impl struct { full bool wantlist map[u.Key]Entry - blocks map[u.Key]*blocks.Block // map to detect duplicates + blocks map[u.Key]*blocks.Block } func New() BitSwapMessage { @@ -92,6 +94,10 @@ func (m *impl) Full() bool { return m.full } +func (m *impl) Empty() bool { + return len(m.blocks) == 0 && len(m.wantlist) == 0 +} + func (m *impl) Wantlist() []Entry { var out []Entry for _, e := range m.wantlist { @@ -101,7 +107,7 @@ func (m *impl) Wantlist() []Entry { } func (m *impl) Blocks() []*blocks.Block { - bs := make([]*blocks.Block, 0) + bs := make([]*blocks.Block, 0, len(m.blocks)) for _, block := range m.blocks { bs = append(bs, block) } @@ -109,6 +115,7 @@ func (m *impl) Blocks() []*blocks.Block { } func (m *impl) Cancel(k u.Key) { + delete(m.wantlist, k) m.addEntry(k, 0, true) } diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index a6ed070c03c..849a1c28e1e 100644 --- 
a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -23,6 +23,8 @@ type BitSwapNetwork interface { // network. SetDelegate(Receiver) + ConnectTo(context.Context, peer.ID) error + Routing } diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index 97745e32da2..4e5a1317f58 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -97,6 +97,10 @@ func (bsnet *impl) SetDelegate(r Receiver) { bsnet.receiver = r } +func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error { + return bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}) +} + // FindProvidersAsync returns a channel of providers for the given key func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID { diff --git a/exchange/bitswap/peermanager.go b/exchange/bitswap/peermanager.go new file mode 100644 index 00000000000..ff3d9ab31f0 --- /dev/null +++ b/exchange/bitswap/peermanager.go @@ -0,0 +1,203 @@ +package bitswap + +import ( + "sync" + + context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision" + bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" + bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" + peer "github.com/ipfs/go-ipfs/p2p/peer" + u "github.com/ipfs/go-ipfs/util" +) + +type PeerManager struct { + receiver bsnet.Receiver + + incoming chan *msgPair + connect chan peer.ID + disconnect chan peer.ID + + peers map[peer.ID]*msgQueue + + network bsnet.BitSwapNetwork +} + +func NewPeerManager(network bsnet.BitSwapNetwork) *PeerManager { + return &PeerManager{ + incoming: make(chan *msgPair, 10), + connect: make(chan peer.ID, 10), + disconnect: make(chan peer.ID, 10), + peers: make(map[peer.ID]*msgQueue), + network: network, + } +} + +type msgPair struct { + to peer.ID + msg bsmsg.BitSwapMessage +} + +type cancellation struct { + who peer.ID + blk u.Key +} 
+ +type msgQueue struct { + p peer.ID + + lk sync.Mutex + wlmsg bsmsg.BitSwapMessage + + work chan struct{} + done chan struct{} +} + +func (pm *PeerManager) SendBlock(env *engine.Envelope) { + // Blocks need to be sent synchronously to maintain proper backpressure + // throughout the network stack + defer env.Sent() + + msg := bsmsg.New() + msg.AddBlock(env.Block) + err := pm.network.SendMessage(context.TODO(), env.Peer, msg) + if err != nil { + log.Error(err) + } +} + +func (pm *PeerManager) startPeerHandler(p peer.ID) { + _, ok := pm.peers[p] + if ok { + // TODO: log an error? + return + } + + mq := new(msgQueue) + mq.done = make(chan struct{}) + mq.work = make(chan struct{}, 1) + mq.p = p + + pm.peers[p] = mq + go pm.runQueue(mq) +} + +func (pm *PeerManager) stopPeerHandler(p peer.ID) { + pq, ok := pm.peers[p] + if !ok { + // TODO: log error? + return + } + + close(pq.done) + delete(pm.peers, p) +} + +func (pm *PeerManager) runQueue(mq *msgQueue) { + for { + select { + case <-mq.work: // there is work to be done + + // TODO: this might not need to be done every time, figure out + // a good heuristic + err := pm.network.ConnectTo(context.TODO(), mq.p) + if err != nil { + log.Error(err) + // TODO: cant connect, what now? + } + + // grab messages from queue + mq.lk.Lock() + wlm := mq.wlmsg + mq.wlmsg = nil + mq.lk.Unlock() + + if wlm != nil && !wlm.Empty() { + // send wantlist updates + err = pm.network.SendMessage(context.TODO(), mq.p, wlm) + if err != nil { + log.Error("bitswap send error: ", err) + // TODO: what do we do if this fails? 
+ } + } + case <-mq.done: + return + } + } +} + +func (pm *PeerManager) Send(to peer.ID, msg bsmsg.BitSwapMessage) { + if len(msg.Blocks()) > 0 { + panic("no blocks here!") + } + pm.incoming <- &msgPair{to: to, msg: msg} +} + +func (pm *PeerManager) Broadcast(msg bsmsg.BitSwapMessage) { + pm.incoming <- &msgPair{msg: msg} +} + +func (pm *PeerManager) Connected(p peer.ID) { + pm.connect <- p +} + +func (pm *PeerManager) Disconnected(p peer.ID) { + pm.disconnect <- p +} + +// TODO: use goprocess here once i trust it +func (pm *PeerManager) Run(ctx context.Context) { + for { + select { + case msgp := <-pm.incoming: + + // Broadcast message to all if recipient not set + if msgp.to == "" { + for _, p := range pm.peers { + p.addMessage(msgp.msg) + } + continue + } + + p, ok := pm.peers[msgp.to] + if !ok { + //TODO: decide, drop message? or dial? + pm.startPeerHandler(msgp.to) + p = pm.peers[msgp.to] + } + + p.addMessage(msgp.msg) + case p := <-pm.connect: + pm.startPeerHandler(p) + case p := <-pm.disconnect: + pm.stopPeerHandler(p) + case <-ctx.Done(): + return + } + } +} + +func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { + mq.lk.Lock() + defer func() { + mq.lk.Unlock() + select { + case mq.work <- struct{}{}: + default: + } + }() + + if mq.wlmsg == nil || msg.Full() { + mq.wlmsg = msg + return + } + + // TODO: add a msg.Combine(...) 
method + for _, e := range msg.Wantlist() { + if e.Cancel { + mq.wlmsg.Cancel(e.Key) + } else { + mq.wlmsg.AddEntry(e.Key, e.Priority) + } + } +} diff --git a/exchange/bitswap/testnet/virtual.go b/exchange/bitswap/testnet/virtual.go index feb5fd722a9..f2c814f819e 100644 --- a/exchange/bitswap/testnet/virtual.go +++ b/exchange/bitswap/testnet/virtual.go @@ -119,3 +119,12 @@ func (nc *networkClient) Provide(ctx context.Context, k util.Key) error { func (nc *networkClient) SetDelegate(r bsnet.Receiver) { nc.Receiver = r } + +func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error { + if !nc.network.HasPeer(p) { + return errors.New("no such peer in network") + } + nc.network.clients[p].PeerConnected(nc.local) + nc.Receiver.PeerConnected(p) + return nil +} diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go index 2ce035c3de8..47930de694d 100644 --- a/exchange/bitswap/testutils.go +++ b/exchange/bitswap/testutils.go @@ -7,7 +7,6 @@ import ( ds_sync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" - exchange "github.com/ipfs/go-ipfs/exchange" tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet" peer "github.com/ipfs/go-ipfs/p2p/peer" p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util" @@ -56,12 +55,18 @@ func (g *SessionGenerator) Instances(n int) []Instance { inst := g.Next() instances = append(instances, inst) } + for i, inst := range instances { + for j := i + 1; j < len(instances); j++ { + oinst := instances[j] + inst.Exchange.PeerConnected(oinst.Peer) + } + } return instances } type Instance struct { Peer peer.ID - Exchange exchange.Interface + Exchange *Bitswap blockstore blockstore.Blockstore blockstoreDelay delay.D @@ -94,7 +99,7 @@ func session(ctx context.Context, net tn.Network, p testutil.Identity) Instance const alwaysSendToPeer = true - bs := New(ctx, 
p.ID(), adapter, bstore, alwaysSendToPeer) + bs := New(ctx, p.ID(), adapter, bstore, alwaysSendToPeer).(*Bitswap) return Instance{ Peer: p.ID(), diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index dff3d911c20..c6c2bbb25de 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -70,9 +70,9 @@ func (bs *Bitswap) taskWorker(ctx context.Context) { if !ok { continue } - log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer) - bs.send(ctx, envelope.Peer, envelope.Message) - envelope.Sent() + + //log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer) + bs.pm.SendBlock(envelope) case <-ctx.Done(): return } From 8443b99c1d8060f626079114f0bea9f0e2784a2f Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 12 May 2015 23:50:57 -0700 Subject: [PATCH 02/15] update comments and reintroduce test --- exchange/bitswap/bitswap_test.go | 23 +++++++++++++++++++++++ exchange/bitswap/peermanager.go | 30 +++++++++++++++++------------- 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index c04946692ec..9f9fbae2599 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -13,6 +13,7 @@ import ( blocks "github.com/ipfs/go-ipfs/blocks" blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil" tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet" + p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util" mockrouting "github.com/ipfs/go-ipfs/routing/mock" delay "github.com/ipfs/go-ipfs/thirdparty/delay" u "github.com/ipfs/go-ipfs/util" @@ -35,6 +36,28 @@ func TestClose(t *testing.T) { bitswap.Exchange.GetBlock(context.Background(), block.Key()) } +func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this + + rs := mockrouting.NewServer() + net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay)) + g := NewTestSessionGenerator(net) + defer g.Close() + + block := blocks.NewBlock([]byte("block")) + pinfo := 
p2ptestutil.RandTestBogusIdentityOrFatal(t) + rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network + + solo := g.Next() + defer solo.Exchange.Close() + + ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond) + _, err := solo.Exchange.GetBlock(ctx, block.Key()) + + if err != context.DeadlineExceeded { + t.Fatal("Expected DeadlineExceeded error") + } +} + func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) diff --git a/exchange/bitswap/peermanager.go b/exchange/bitswap/peermanager.go index ff3d9ab31f0..a91acd45bb6 100644 --- a/exchange/bitswap/peermanager.go +++ b/exchange/bitswap/peermanager.go @@ -46,8 +46,8 @@ type cancellation struct { type msgQueue struct { p peer.ID - lk sync.Mutex - wlmsg bsmsg.BitSwapMessage + outlk sync.Mutex + out bsmsg.BitSwapMessage work chan struct{} done chan struct{} @@ -106,11 +106,11 @@ func (pm *PeerManager) runQueue(mq *msgQueue) { // TODO: cant connect, what now? } - // grab messages from queue - mq.lk.Lock() - wlm := mq.wlmsg - mq.wlmsg = nil - mq.lk.Unlock() + // grab outgoin message + mq.outlk.Lock() + wlm := mq.out + mq.out = nil + mq.outlk.Unlock() if wlm != nil && !wlm.Empty() { // send wantlist updates @@ -178,26 +178,30 @@ func (pm *PeerManager) Run(ctx context.Context) { } func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { - mq.lk.Lock() + mq.outlk.Lock() defer func() { - mq.lk.Unlock() + mq.outlk.Unlock() select { case mq.work <- struct{}{}: default: } }() - if mq.wlmsg == nil || msg.Full() { - mq.wlmsg = msg + // if we have no message held, or the one we are given is full + // overwrite the one we are holding + if mq.out == nil || msg.Full() { + mq.out = msg return } // TODO: add a msg.Combine(...) 
method + // otherwise, combine the one we are holding with the + // one passed in for _, e := range msg.Wantlist() { if e.Cancel { - mq.wlmsg.Cancel(e.Key) + mq.out.Cancel(e.Key) } else { - mq.wlmsg.AddEntry(e.Key, e.Priority) + mq.out.AddEntry(e.Key, e.Priority) } } } From ef967ceeef50995877c0245ccbde1bd87a0fe9c9 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 13 May 2015 16:35:08 -0700 Subject: [PATCH 03/15] contextify peermanager --- exchange/bitswap/bitswap.go | 2 -- exchange/bitswap/decision/engine.go | 2 +- exchange/bitswap/peermanager.go | 22 +++++++++++----------- exchange/bitswap/workers.go | 4 ++-- 4 files changed, 14 insertions(+), 16 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index b8dcdab1e9b..a05ea80910e 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -316,8 +316,6 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli // TODO(brian): handle errors func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error { - //defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done() - // This call records changes to wantlists, blocks received, // and number of bytes transfered. 
bs.engine.MessageReceived(p, incoming) diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 0b08a55fb44..2644885d3b1 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -206,7 +206,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { l.CancelWant(entry.Key) e.peerRequestQueue.Remove(entry.Key, p) } else { - log.Debugf("wants %s", entry.Key, entry.Priority) + log.Debugf("wants %s - %d", entry.Key, entry.Priority) l.Wants(entry.Key, entry.Priority) if exists, err := e.bs.Has(entry.Key); err == nil && exists { e.peerRequestQueue.Push(entry.Entry, p) diff --git a/exchange/bitswap/peermanager.go b/exchange/bitswap/peermanager.go index a91acd45bb6..a1ce7c7a87c 100644 --- a/exchange/bitswap/peermanager.go +++ b/exchange/bitswap/peermanager.go @@ -53,24 +53,24 @@ type msgQueue struct { done chan struct{} } -func (pm *PeerManager) SendBlock(env *engine.Envelope) { +func (pm *PeerManager) SendBlock(ctx context.Context, env *engine.Envelope) { // Blocks need to be sent synchronously to maintain proper backpressure // throughout the network stack defer env.Sent() msg := bsmsg.New() msg.AddBlock(env.Block) - err := pm.network.SendMessage(context.TODO(), env.Peer, msg) + err := pm.network.SendMessage(ctx, env.Peer, msg) if err != nil { log.Error(err) } } -func (pm *PeerManager) startPeerHandler(p peer.ID) { +func (pm *PeerManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueue { _, ok := pm.peers[p] if ok { // TODO: log an error? 
- return + return nil } mq := new(msgQueue) @@ -79,7 +79,8 @@ func (pm *PeerManager) startPeerHandler(p peer.ID) { mq.p = p pm.peers[p] = mq - go pm.runQueue(mq) + go pm.runQueue(ctx, mq) + return mq } func (pm *PeerManager) stopPeerHandler(p peer.ID) { @@ -93,14 +94,14 @@ func (pm *PeerManager) stopPeerHandler(p peer.ID) { delete(pm.peers, p) } -func (pm *PeerManager) runQueue(mq *msgQueue) { +func (pm *PeerManager) runQueue(ctx context.Context, mq *msgQueue) { for { select { case <-mq.work: // there is work to be done // TODO: this might not need to be done every time, figure out // a good heuristic - err := pm.network.ConnectTo(context.TODO(), mq.p) + err := pm.network.ConnectTo(ctx, mq.p) if err != nil { log.Error(err) // TODO: cant connect, what now? @@ -114,7 +115,7 @@ func (pm *PeerManager) runQueue(mq *msgQueue) { if wlm != nil && !wlm.Empty() { // send wantlist updates - err = pm.network.SendMessage(context.TODO(), mq.p, wlm) + err = pm.network.SendMessage(ctx, mq.p, wlm) if err != nil { log.Error("bitswap send error: ", err) // TODO: what do we do if this fails? @@ -162,13 +163,12 @@ func (pm *PeerManager) Run(ctx context.Context) { p, ok := pm.peers[msgp.to] if !ok { //TODO: decide, drop message? or dial? 
- pm.startPeerHandler(msgp.to) - p = pm.peers[msgp.to] + p = pm.startPeerHandler(ctx, msgp.to) } p.addMessage(msgp.msg) case p := <-pm.connect: - pm.startPeerHandler(p) + pm.startPeerHandler(ctx, p) case p := <-pm.disconnect: pm.stopPeerHandler(p) case <-ctx.Done(): diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index c6c2bbb25de..ba9a77549ca 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -46,6 +46,7 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { bs.rebroadcastWorker(ctx) }) + // Start up a worker to manage sending out provides messages px.Go(func(px process.Process) { bs.provideCollector(ctx) }) @@ -71,8 +72,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) { continue } - //log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer) - bs.pm.SendBlock(envelope) + bs.pm.SendBlock(ctx, envelope) case <-ctx.Done(): return } From 6bf33ad62fc4d209932fb8b81b4f2dc28ca6123a Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 16 May 2015 12:30:13 -0700 Subject: [PATCH 04/15] WIP: super awesome bitswap cleanup fixtime --- exchange/bitswap/bitswap.go | 134 +++------------ exchange/bitswap/bitswap_test.go | 14 +- exchange/bitswap/decision/engine.go | 16 +- .../bitswap/decision/peer_request_queue.go | 18 ++- exchange/bitswap/network/interface.go | 2 +- exchange/bitswap/peermanager.go | 152 ++++++++++++------ exchange/bitswap/testnet/network_test.go | 16 +- exchange/bitswap/testnet/virtual.go | 3 +- exchange/bitswap/workers.go | 45 +++--- 9 files changed, 191 insertions(+), 209 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index a05ea80910e..881de153807 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -4,7 +4,6 @@ package bitswap import ( "errors" - "fmt" "math" "sync" "time" @@ -23,7 +22,6 @@ import ( "github.com/ipfs/go-ipfs/thirdparty/delay" eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" u 
"github.com/ipfs/go-ipfs/util" - pset "github.com/ipfs/go-ipfs/util/peerset" // TODO move this to peerstore ) var log = eventlog.Logger("bitswap") @@ -45,9 +43,7 @@ const ( provideWorkers = 4 ) -var ( - rebroadcastDelay = delay.Fixed(time.Second * 10) -) +var rebroadcastDelay = delay.Fixed(time.Second * 10) // New initializes a BitSwap instance that communicates over the provided // BitSwapNetwork. This function registers the returned instance as the network @@ -86,14 +82,13 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, notifications: notif, engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method network: network, - wantlist: wantlist.NewThreadSafe(), batchRequests: make(chan *blockRequest, sizeBatchRequestChan), process: px, newBlocks: make(chan *blocks.Block, HasBlockBufferSize), provideKeys: make(chan u.Key), - pm: NewPeerManager(network), + wm: NewWantManager(network), } - go bs.pm.Run(ctx) + go bs.wm.Run(ctx) network.SetDelegate(bs) // Start up bitswaps async worker routines @@ -112,7 +107,7 @@ type Bitswap struct { // the peermanager manages sending messages to peers in a way that // wont block bitswap operation - pm *PeerManager + wm *WantManager // blockstore is the local database // NB: ensure threadsafety @@ -127,8 +122,6 @@ type Bitswap struct { engine *decision.Engine - wantlist *wantlist.ThreadSafe - process process.Process newBlocks chan *blocks.Block @@ -233,60 +226,21 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { return err } - bs.wantlist.Remove(blk.Key()) bs.notifications.Publish(blk) select { case bs.newBlocks <- blk: + // send block off to be reprovided case <-ctx.Done(): return ctx.Err() } return nil } -func (bs *Bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error { - set := pset.New() - -loop: - for { - select { - case peerToQuery, ok := <-peers: - if !ok { - break loop - } - - if 
!set.TryAdd(peerToQuery) { //Do once per peer - continue - } - - bs.pm.Send(peerToQuery, m) - case <-ctx.Done(): - return nil - } - } - return nil -} - -func (bs *Bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error { - entries := bs.wantlist.Entries() - if len(entries) == 0 { - return nil - } - message := bsmsg.New() - message.SetFull(true) - for _, wanted := range entries { - message.AddEntry(wanted.Key, wanted.Priority) - } - return bs.sendWantlistMsgToPeers(ctx, message, peers) -} - -func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantlist.Entry) { +func (bs *Bitswap) connectToProviders(ctx context.Context, entries []wantlist.Entry) { ctx, cancel := context.WithCancel(ctx) defer cancel() - // prepare a channel to hand off to sendWantlistToPeers - sendToPeers := make(chan peer.ID) - // Get providers for all entries in wantlist (could take a while) wg := sync.WaitGroup{} for _, e := range entries { @@ -298,97 +252,61 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli defer cancel() providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest) for prov := range providers { - sendToPeers <- prov + go func(p peer.ID) { + bs.network.ConnectTo(ctx, p) + }(prov) } }(e.Key) } - go func() { - wg.Wait() // make sure all our children do finish. - close(sendToPeers) - }() - - err := bs.sendWantlistToPeers(ctx, sendToPeers) - if err != nil { - log.Debugf("sendWantlistToPeers error: %s", err) - } + wg.Wait() // make sure all our children do finish. } -// TODO(brian): handle errors -func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error { +func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) { // This call records changes to wantlists, blocks received, // and number of bytes transfered. bs.engine.MessageReceived(p, incoming) // TODO: this is bad, and could be easily abused. 
// Should only track *useful* messages in ledger + if len(incoming.Blocks()) == 0 { + return + } + + // quickly send out cancels, reduces chances of duplicate block receives var keys []u.Key + for _, block := range incoming.Blocks() { + keys = append(keys, block.Key()) + } + bs.wm.CancelWants(keys) + for _, block := range incoming.Blocks() { bs.blocksRecvd++ if has, err := bs.blockstore.Has(block.Key()); err == nil && has { bs.dupBlocksRecvd++ } log.Debugf("got block %s from %s", block, p) + hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) if err := bs.HasBlock(hasBlockCtx, block); err != nil { - return fmt.Errorf("ReceiveMessage HasBlock error: %s", err) + log.Warningf("ReceiveMessage HasBlock error: %s", err) } cancel() - keys = append(keys, block.Key()) } - - bs.cancelBlocks(ctx, keys) - return nil } // Connected/Disconnected warns bitswap about peer connections func (bs *Bitswap) PeerConnected(p peer.ID) { // TODO: add to clientWorker?? - bs.pm.Connected(p) - peers := make(chan peer.ID, 1) - peers <- p - close(peers) - err := bs.sendWantlistToPeers(context.TODO(), peers) - if err != nil { - log.Debugf("error sending wantlist: %s", err) - } + bs.wm.Connected(p) } // Connected/Disconnected warns bitswap about peer connections func (bs *Bitswap) PeerDisconnected(p peer.ID) { - bs.pm.Disconnected(p) + bs.wm.Disconnected(p) bs.engine.PeerDisconnected(p) } -func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { - if len(bkeys) < 1 { - return - } - message := bsmsg.New() - message.SetFull(false) - for _, k := range bkeys { - log.Debug("cancel block: %s", k) - message.Cancel(k) - } - - bs.pm.Broadcast(message) - return -} - -func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) { - if len(bkeys) < 1 { - return - } - - message := bsmsg.New() - message.SetFull(false) - for i, k := range bkeys { - message.AddEntry(k, kMaxPriority-i) - } - - bs.pm.Broadcast(message) -} - func (bs *Bitswap) ReceiveError(err error) { 
log.Debugf("Bitswap ReceiveError: %s", err) // TODO log the network error @@ -401,7 +319,7 @@ func (bs *Bitswap) Close() error { func (bs *Bitswap) GetWantlist() []u.Key { var out []u.Key - for _, e := range bs.wantlist.Entries() { + for _, e := range bs.wm.wl.Entries() { out = append(out, e.Key) } return out diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 9f9fbae2599..fa5b3b97d3d 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -120,6 +120,16 @@ func TestLargeFile(t *testing.T) { PerformDistributionTest(t, numInstances, numBlocks) } +func TestLargeFileTwoPeers(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + t.Parallel() + numInstances := 2 + numBlocks := 100 + PerformDistributionTest(t, numInstances, numBlocks) +} + func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { if testing.Short() { t.SkipNow() @@ -129,8 +139,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { defer sg.Close() bg := blocksutil.NewBlockGenerator() - t.Log("Test a few nodes trying to get one file with a lot of blocks") - instances := sg.Instances(numInstances) blocks := bg.Blocks(numBlocks) @@ -238,7 +246,7 @@ func TestBasicBitswap(t *testing.T) { defer sg.Close() bg := blocksutil.NewBlockGenerator() - t.Log("Test a few nodes trying to get one file with a lot of blocks") + t.Log("Test a one node trying to get one block from another") instances := sg.Instances(2) blocks := bg.Blocks(1) diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 2644885d3b1..186c7ba1a5b 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -92,7 +92,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { bs: bs, peerRequestQueue: newPRQ(), outbox: make(chan (<-chan *Envelope), outboxChanBuffer), - workSignal: make(chan struct{}), + workSignal: make(chan struct{}, 1), } go e.taskWorker(ctx) 
return e @@ -156,7 +156,15 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { return &Envelope{ Peer: nextTask.Target, Block: block, - Sent: nextTask.Done, + Sent: func() { + nextTask.Done() + select { + case e.workSignal <- struct{}{}: + // work completing may mean that our queue will provide new + // work to be done. + default: + } + }, }, nil } } @@ -202,11 +210,11 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { for _, entry := range m.Wantlist() { if entry.Cancel { - log.Debugf("cancel %s", entry.Key) + log.Errorf("cancel %s", entry.Key) l.CancelWant(entry.Key) e.peerRequestQueue.Remove(entry.Key, p) } else { - log.Debugf("wants %s - %d", entry.Key, entry.Priority) + log.Errorf("wants %s - %d", entry.Key, entry.Priority) l.Wants(entry.Key, entry.Priority) if exists, err := e.bs.Has(entry.Key); err == nil && exists { e.peerRequestQueue.Push(entry.Entry, p) diff --git a/exchange/bitswap/decision/peer_request_queue.go b/exchange/bitswap/decision/peer_request_queue.go index 15f52da745b..1d15578ede5 100644 --- a/exchange/bitswap/decision/peer_request_queue.go +++ b/exchange/bitswap/decision/peer_request_queue.go @@ -51,12 +51,6 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) { tl.partners[to] = partner } - if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok { - task.Entry.Priority = entry.Priority - partner.taskQueue.Update(task.index) - return - } - partner.activelk.Lock() defer partner.activelk.Unlock() _, ok = partner.activeBlocks[entry.Key] @@ -64,6 +58,12 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) { return } + if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok { + task.Entry.Priority = entry.Priority + partner.taskQueue.Update(task.index) + return + } + task := &peerRequestTask{ Entry: entry, Target: to, @@ -220,6 +220,12 @@ func partnerCompare(a, b pq.Elem) bool { if pb.requests == 0 { return true } + if pa.active == pb.active { + // sorting by taskQueue.Len() aids in cleaning 
out trash entries faster + // if we sorted instead by requests, one peer could potentially build up + // a huge number of cancelled entries in the queue resulting in a memory leak + return pa.taskQueue.Len() > pb.taskQueue.Len() + } return pa.active < pb.active } diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index 849a1c28e1e..83fca07937a 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -33,7 +33,7 @@ type Receiver interface { ReceiveMessage( ctx context.Context, sender peer.ID, - incoming bsmsg.BitSwapMessage) error + incoming bsmsg.BitSwapMessage) ReceiveError(error) diff --git a/exchange/bitswap/peermanager.go b/exchange/bitswap/peermanager.go index a1ce7c7a87c..2eaf36fa51f 100644 --- a/exchange/bitswap/peermanager.go +++ b/exchange/bitswap/peermanager.go @@ -7,28 +7,36 @@ import ( engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" + wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" peer "github.com/ipfs/go-ipfs/p2p/peer" u "github.com/ipfs/go-ipfs/util" ) -type PeerManager struct { +type WantManager struct { receiver bsnet.Receiver - incoming chan *msgPair - connect chan peer.ID + incoming chan []*bsmsg.Entry + + // notification channel for new peers connecting + connect chan peer.ID + + // notification channel for peers disconnecting disconnect chan peer.ID peers map[peer.ID]*msgQueue + wl *wantlist.Wantlist + network bsnet.BitSwapNetwork } -func NewPeerManager(network bsnet.BitSwapNetwork) *PeerManager { - return &PeerManager{ - incoming: make(chan *msgPair, 10), +func NewWantManager(network bsnet.BitSwapNetwork) *WantManager { + return &WantManager{ + incoming: make(chan []*bsmsg.Entry, 10), connect: make(chan peer.ID, 10), disconnect: make(chan peer.ID, 10), peers: make(map[peer.ID]*msgQueue), + wl: wantlist.New(), network: network, } 
} @@ -53,37 +61,68 @@ type msgQueue struct { done chan struct{} } -func (pm *PeerManager) SendBlock(ctx context.Context, env *engine.Envelope) { +func (pm *WantManager) WantBlocks(ks []u.Key) { + log.Error("WANT: ", ks) + pm.addEntries(ks, false) +} + +func (pm *WantManager) CancelWants(ks []u.Key) { + log.Error("CANCEL: ", ks) + pm.addEntries(ks, true) +} + +func (pm *WantManager) addEntries(ks []u.Key, cancel bool) { + var entries []*bsmsg.Entry + for i, k := range ks { + entries = append(entries, &bsmsg.Entry{ + Cancel: cancel, + Entry: wantlist.Entry{ + Key: k, + Priority: kMaxPriority - i, + }, + }) + } + pm.incoming <- entries +} + +func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { // Blocks need to be sent synchronously to maintain proper backpressure // throughout the network stack defer env.Sent() msg := bsmsg.New() msg.AddBlock(env.Block) + msg.SetFull(false) err := pm.network.SendMessage(ctx, env.Peer, msg) if err != nil { log.Error(err) } } -func (pm *PeerManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueue { +func (pm *WantManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueue { _, ok := pm.peers[p] if ok { // TODO: log an error? return nil } - mq := new(msgQueue) - mq.done = make(chan struct{}) - mq.work = make(chan struct{}, 1) - mq.p = p + mq := newMsgQueue(p) + + // new peer, we will want to give them our full wantlist + fullwantlist := bsmsg.New() + for _, e := range pm.wl.Entries() { + fullwantlist.AddEntry(e.Key, e.Priority) + } + fullwantlist.SetFull(true) + mq.out = fullwantlist + mq.work <- struct{}{} pm.peers[p] = mq go pm.runQueue(ctx, mq) return mq } -func (pm *PeerManager) stopPeerHandler(p peer.ID) { +func (pm *WantManager) stopPeerHandler(p peer.ID) { pq, ok := pm.peers[p] if !ok { // TODO: log error? 
@@ -94,32 +133,38 @@ func (pm *PeerManager) stopPeerHandler(p peer.ID) { delete(pm.peers, p) } -func (pm *PeerManager) runQueue(ctx context.Context, mq *msgQueue) { +func (pm *WantManager) runQueue(ctx context.Context, mq *msgQueue) { for { select { case <-mq.work: // there is work to be done - // TODO: this might not need to be done every time, figure out - // a good heuristic err := pm.network.ConnectTo(ctx, mq.p) if err != nil { log.Error(err) // TODO: cant connect, what now? } - // grab outgoin message + // grab outgoing message mq.outlk.Lock() wlm := mq.out mq.out = nil mq.outlk.Unlock() - if wlm != nil && !wlm.Empty() { - // send wantlist updates - err = pm.network.SendMessage(ctx, mq.p, wlm) - if err != nil { - log.Error("bitswap send error: ", err) - // TODO: what do we do if this fails? - } + // no message or empty message, continue + if wlm == nil { + log.Error("nil wantlist") + continue + } + if wlm.Empty() { + log.Error("empty wantlist") + continue + } + + // send wantlist updates + err = pm.network.SendMessage(ctx, mq.p, wlm) + if err != nil { + log.Error("bitswap send error: ", err) + // TODO: what do we do if this fails? 
} case <-mq.done: return @@ -127,46 +172,38 @@ func (pm *PeerManager) runQueue(ctx context.Context, mq *msgQueue) { } } -func (pm *PeerManager) Send(to peer.ID, msg bsmsg.BitSwapMessage) { - if len(msg.Blocks()) > 0 { - panic("no blocks here!") - } - pm.incoming <- &msgPair{to: to, msg: msg} -} - -func (pm *PeerManager) Broadcast(msg bsmsg.BitSwapMessage) { - pm.incoming <- &msgPair{msg: msg} -} - -func (pm *PeerManager) Connected(p peer.ID) { +func (pm *WantManager) Connected(p peer.ID) { pm.connect <- p } -func (pm *PeerManager) Disconnected(p peer.ID) { +func (pm *WantManager) Disconnected(p peer.ID) { pm.disconnect <- p } // TODO: use goprocess here once i trust it -func (pm *PeerManager) Run(ctx context.Context) { +func (pm *WantManager) Run(ctx context.Context) { for { select { - case msgp := <-pm.incoming: - - // Broadcast message to all if recipient not set - if msgp.to == "" { - for _, p := range pm.peers { - p.addMessage(msgp.msg) + case entries := <-pm.incoming: + + msg := bsmsg.New() + msg.SetFull(false) + // add changes to our wantlist + for _, e := range entries { + if e.Cancel { + pm.wl.Remove(e.Key) + msg.Cancel(e.Key) + } else { + pm.wl.Add(e.Key, e.Priority) + msg.AddEntry(e.Key, e.Priority) } - continue } - p, ok := pm.peers[msgp.to] - if !ok { - //TODO: decide, drop message? or dial? 
- p = pm.startPeerHandler(ctx, msgp.to) + // broadcast those wantlist changes + for _, p := range pm.peers { + p.addMessage(msg) } - p.addMessage(msgp.msg) case p := <-pm.connect: pm.startPeerHandler(ctx, p) case p := <-pm.disconnect: @@ -177,6 +214,15 @@ func (pm *PeerManager) Run(ctx context.Context) { } } +func newMsgQueue(p peer.ID) *msgQueue { + mq := new(msgQueue) + mq.done = make(chan struct{}) + mq.work = make(chan struct{}, 1) + mq.p = p + + return mq +} + func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { mq.outlk.Lock() defer func() { @@ -187,6 +233,10 @@ func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { } }() + if msg.Full() { + log.Error("GOt FULL MESSAGE") + } + // if we have no message held, or the one we are given is full // overwrite the one we are holding if mq.out == nil || msg.Full() { @@ -199,8 +249,10 @@ func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { // one passed in for _, e := range msg.Wantlist() { if e.Cancel { + log.Error("add message cancel: ", e.Key, mq.p) mq.out.Cancel(e.Key) } else { + log.Error("add message want: ", e.Key, mq.p) mq.out.AddEntry(e.Key, e.Priority) } } diff --git a/exchange/bitswap/testnet/network_test.go b/exchange/bitswap/testnet/network_test.go index 9091ff255c6..c963ae9ac5c 100644 --- a/exchange/bitswap/testnet/network_test.go +++ b/exchange/bitswap/testnet/network_test.go @@ -29,19 +29,17 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { responder.SetDelegate(lambda(func( ctx context.Context, fromWaiter peer.ID, - msgFromWaiter bsmsg.BitSwapMessage) error { + msgFromWaiter bsmsg.BitSwapMessage) { msgToWaiter := bsmsg.New() msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr))) waiter.SendMessage(ctx, fromWaiter, msgToWaiter) - - return nil })) waiter.SetDelegate(lambda(func( ctx context.Context, fromResponder peer.ID, - msgFromResponder bsmsg.BitSwapMessage) error { + msgFromResponder bsmsg.BitSwapMessage) { // TODO assert that this came from the correct peer and that 
the message contents are as expected ok := false @@ -54,9 +52,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { if !ok { t.Fatal("Message not received from the responder") - } - return nil })) messageSentAsync := bsmsg.New() @@ -71,7 +67,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { } type receiverFunc func(ctx context.Context, p peer.ID, - incoming bsmsg.BitSwapMessage) error + incoming bsmsg.BitSwapMessage) // lambda returns a Receiver instance given a receiver function func lambda(f receiverFunc) bsnet.Receiver { @@ -81,12 +77,12 @@ func lambda(f receiverFunc) bsnet.Receiver { } type lambdaImpl struct { - f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error + f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) } func (lam *lambdaImpl) ReceiveMessage(ctx context.Context, - p peer.ID, incoming bsmsg.BitSwapMessage) error { - return lam.f(ctx, p, incoming) + p peer.ID, incoming bsmsg.BitSwapMessage) { + lam.f(ctx, p, incoming) } func (lam *lambdaImpl) ReceiveError(err error) { diff --git a/exchange/bitswap/testnet/virtual.go b/exchange/bitswap/testnet/virtual.go index f2c814f819e..f8ca0cd5516 100644 --- a/exchange/bitswap/testnet/virtual.go +++ b/exchange/bitswap/testnet/virtual.go @@ -72,7 +72,8 @@ func (n *network) deliver( n.delay.Wait() - return r.ReceiveMessage(context.TODO(), from, message) + r.ReceiveMessage(context.TODO(), from, message) + return nil } type networkClient struct { diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index ba9a77549ca..82fb40de939 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -42,9 +42,11 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { } // Start up a worker to manage periodically resending our wantlist out to peers - px.Go(func(px process.Process) { - bs.rebroadcastWorker(ctx) - }) + /* + px.Go(func(px process.Process) { + bs.rebroadcastWorker(ctx) + }) + */ // Start up a 
worker to manage sending out provides messages px.Go(func(px process.Process) { @@ -72,7 +74,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) { continue } - bs.pm.SendBlock(ctx, envelope) + bs.wm.SendBlock(ctx, envelope) case <-ctx.Done(): return } @@ -146,30 +148,19 @@ func (bs *Bitswap) clientWorker(parent context.Context) { log.Warning("Received batch request for zero blocks") continue } - for i, k := range keys { - bs.wantlist.Add(k, kMaxPriority-i) - } - done := make(chan struct{}) - go func() { - bs.wantNewBlocks(req.ctx, keys) - close(done) - }() + bs.wm.WantBlocks(keys) // NB: Optimization. Assumes that providers of key[0] are likely to // be able to provide for all keys. This currently holds true in most // every situation. Later, this assumption may not hold as true. child, cancel := context.WithTimeout(req.ctx, providerRequestTimeout) providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest) - err := bs.sendWantlistToPeers(req.ctx, providers) - if err != nil { - log.Debugf("error sending wantlist: %s", err) + for p := range providers { + go bs.network.ConnectTo(req.ctx, p) } cancel() - // Wait for wantNewBlocks to finish - <-done - case <-parent.Done(): return } @@ -180,22 +171,24 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) { ctx, cancel := context.WithCancel(parent) defer cancel() - broadcastSignal := time.After(rebroadcastDelay.Get()) - tick := time.Tick(10 * time.Second) + broadcastSignal := time.NewTicker(rebroadcastDelay.Get()) + defer broadcastSignal.Stop() + + tick := time.NewTicker(10 * time.Second) + defer tick.Stop() for { select { - case <-tick: - n := bs.wantlist.Len() + case <-tick.C: + n := bs.wm.wl.Len() if n > 0 { log.Debug(n, "keys in bitswap wantlist") } - case <-broadcastSignal: // resend unfulfilled wantlist keys - entries := bs.wantlist.Entries() + case <-broadcastSignal.C: // resend unfulfilled wantlist keys + entries := bs.wm.wl.Entries() if len(entries) > 0 { - 
bs.sendWantlistToProviders(ctx, entries) + bs.connectToProviders(ctx, entries) } - broadcastSignal = time.After(rebroadcastDelay.Get()) case <-parent.Done(): return } From 32da687774427bdd44efd1686218448e8c834fd0 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 16 May 2015 14:26:29 -0700 Subject: [PATCH 05/15] fix race bugs --- exchange/bitswap/bitswap.go | 3 +++ exchange/bitswap/decision/engine.go | 4 ++-- exchange/bitswap/message/message.go | 2 +- exchange/bitswap/peermanager.go | 37 +++++++---------------------- exchange/bitswap/stat.go | 2 ++ 5 files changed, 17 insertions(+), 31 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 881de153807..6a1e58ff43a 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -128,6 +128,7 @@ type Bitswap struct { provideKeys chan u.Key + counterLk sync.Mutex blocksRecvd int dupBlocksRecvd int } @@ -281,10 +282,12 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg bs.wm.CancelWants(keys) for _, block := range incoming.Blocks() { + bs.counterLk.Lock() bs.blocksRecvd++ if has, err := bs.blockstore.Has(block.Key()); err == nil && has { bs.dupBlocksRecvd++ } + bs.counterLk.Unlock() log.Debugf("got block %s from %s", block, p) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 186c7ba1a5b..d08636d800c 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -210,11 +210,11 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { for _, entry := range m.Wantlist() { if entry.Cancel { - log.Errorf("cancel %s", entry.Key) + log.Debugf("cancel %s", entry.Key) l.CancelWant(entry.Key) e.peerRequestQueue.Remove(entry.Key, p) } else { - log.Errorf("wants %s - %d", entry.Key, entry.Priority) + log.Debugf("wants %s - %d", entry.Key, entry.Priority) l.Wants(entry.Key, entry.Priority) if 
exists, err := e.bs.Has(entry.Key); err == nil && exists { e.peerRequestQueue.Push(entry.Entry, p) diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index 4e88e738c31..63f7f28b5ff 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -162,7 +162,7 @@ func (m *impl) ToProto() *pb.Message { pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{ Block: proto.String(string(e.Key)), Priority: proto.Int32(int32(e.Priority)), - Cancel: &e.Cancel, + Cancel: proto.Bool(e.Cancel), }) } for _, b := range m.Blocks() { diff --git a/exchange/bitswap/peermanager.go b/exchange/bitswap/peermanager.go index 2eaf36fa51f..8ec89c8e3c8 100644 --- a/exchange/bitswap/peermanager.go +++ b/exchange/bitswap/peermanager.go @@ -62,12 +62,10 @@ type msgQueue struct { } func (pm *WantManager) WantBlocks(ks []u.Key) { - log.Error("WANT: ", ks) pm.addEntries(ks, false) } func (pm *WantManager) CancelWants(ks []u.Key) { - log.Error("CANCEL: ", ks) pm.addEntries(ks, true) } @@ -147,18 +145,12 @@ func (pm *WantManager) runQueue(ctx context.Context, mq *msgQueue) { // grab outgoing message mq.outlk.Lock() wlm := mq.out - mq.out = nil - mq.outlk.Unlock() - - // no message or empty message, continue - if wlm == nil { - log.Error("nil wantlist") - continue - } - if wlm.Empty() { - log.Error("empty wantlist") + if wlm == nil || wlm.Empty() { + mq.outlk.Unlock() continue } + mq.out = nil + mq.outlk.Unlock() // send wantlist updates err = pm.network.SendMessage(ctx, mq.p, wlm) @@ -186,22 +178,18 @@ func (pm *WantManager) Run(ctx context.Context) { select { case entries := <-pm.incoming: - msg := bsmsg.New() - msg.SetFull(false) // add changes to our wantlist for _, e := range entries { if e.Cancel { pm.wl.Remove(e.Key) - msg.Cancel(e.Key) } else { pm.wl.Add(e.Key, e.Priority) - msg.AddEntry(e.Key, e.Priority) } } // broadcast those wantlist changes for _, p := range pm.peers { - p.addMessage(msg) + 
p.addMessage(entries) } case p := <-pm.connect: @@ -223,7 +211,7 @@ func newMsgQueue(p peer.ID) *msgQueue { return mq } -func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { +func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) { mq.outlk.Lock() defer func() { mq.outlk.Unlock() @@ -233,26 +221,19 @@ func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { } }() - if msg.Full() { - log.Error("GOt FULL MESSAGE") - } - // if we have no message held, or the one we are given is full // overwrite the one we are holding - if mq.out == nil || msg.Full() { - mq.out = msg - return + if mq.out == nil { + mq.out = bsmsg.New() } // TODO: add a msg.Combine(...) method // otherwise, combine the one we are holding with the // one passed in - for _, e := range msg.Wantlist() { + for _, e := range entries { if e.Cancel { - log.Error("add message cancel: ", e.Key, mq.p) mq.out.Cancel(e.Key) } else { - log.Error("add message want: ", e.Key, mq.p) mq.out.AddEntry(e.Key, e.Priority) } } diff --git a/exchange/bitswap/stat.go b/exchange/bitswap/stat.go index ceab4b2ee2a..a4db4c9c578 100644 --- a/exchange/bitswap/stat.go +++ b/exchange/bitswap/stat.go @@ -17,8 +17,10 @@ func (bs *Bitswap) Stat() (*Stat, error) { st := new(Stat) st.ProvideBufLen = len(bs.newBlocks) st.Wantlist = bs.GetWantlist() + bs.counterLk.Lock() st.BlocksReceived = bs.blocksRecvd st.DupBlksReceived = bs.dupBlocksRecvd + bs.counterLk.Unlock() for _, p := range bs.engine.Peers() { st.Peers = append(st.Peers, p.Pretty()) From 65f815a27b20c75e2e59043176f3452f642b53b5 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 16 May 2015 17:16:09 -0700 Subject: [PATCH 06/15] move taskdone inside lock boundaries --- exchange/bitswap/decision/peer_request_queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exchange/bitswap/decision/peer_request_queue.go b/exchange/bitswap/decision/peer_request_queue.go index 1d15578ede5..397a1622332 100644 --- a/exchange/bitswap/decision/peer_request_queue.go +++ 
b/exchange/bitswap/decision/peer_request_queue.go @@ -69,8 +69,8 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) { Target: to, created: time.Now(), Done: func() { - partner.TaskDone(entry.Key) tl.lock.Lock() + partner.TaskDone(entry.Key) tl.pQueue.Update(partner.Index()) tl.lock.Unlock() }, From 6ab4bfea95464a3e995425a716213ed342dbeac5 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 16 May 2015 17:46:26 -0700 Subject: [PATCH 07/15] turn tests down a bit and better context passing --- exchange/bitswap/bitswap.go | 4 +-- exchange/bitswap/bitswap_test.go | 4 +-- .../{peermanager.go => wantmanager.go} | 26 ++++++++++++------- 3 files changed, 20 insertions(+), 14 deletions(-) rename exchange/bitswap/{peermanager.go => wantmanager.go} (89%) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 6a1e58ff43a..c6f3c74a9fe 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -86,9 +86,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, process: px, newBlocks: make(chan *blocks.Block, HasBlockBufferSize), provideKeys: make(chan u.Key), - wm: NewWantManager(network), + wm: NewWantManager(ctx, network), } - go bs.wm.Run(ctx) + go bs.wm.Run() network.SetDelegate(bs) // Start up bitswaps async worker routines diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index fa5b3b97d3d..86eb2d764cc 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -92,7 +92,7 @@ func TestLargeSwarm(t *testing.T) { if testing.Short() { t.SkipNow() } - numInstances := 500 + numInstances := 100 numBlocks := 2 if detectrace.WithRace() { // when running with the race detector, 500 instances launches @@ -124,7 +124,6 @@ func TestLargeFileTwoPeers(t *testing.T) { if testing.Short() { t.SkipNow() } - t.Parallel() numInstances := 2 numBlocks := 100 PerformDistributionTest(t, numInstances, numBlocks) @@ -164,6 +163,7 @@ func PerformDistributionTest(t *testing.T, 
numInstances, numBlocks int) { } for _ = range outch { } + log.Error("DONE") }(inst) } wg.Wait() diff --git a/exchange/bitswap/peermanager.go b/exchange/bitswap/wantmanager.go similarity index 89% rename from exchange/bitswap/peermanager.go rename to exchange/bitswap/wantmanager.go index 8ec89c8e3c8..3b2067914fb 100644 --- a/exchange/bitswap/peermanager.go +++ b/exchange/bitswap/wantmanager.go @@ -28,9 +28,11 @@ type WantManager struct { wl *wantlist.Wantlist network bsnet.BitSwapNetwork + + ctx context.Context } -func NewWantManager(network bsnet.BitSwapNetwork) *WantManager { +func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager { return &WantManager{ incoming: make(chan []*bsmsg.Entry, 10), connect: make(chan peer.ID, 10), @@ -38,6 +40,7 @@ func NewWantManager(network bsnet.BitSwapNetwork) *WantManager { peers: make(map[peer.ID]*msgQueue), wl: wantlist.New(), network: network, + ctx: ctx, } } @@ -80,7 +83,10 @@ func (pm *WantManager) addEntries(ks []u.Key, cancel bool) { }, }) } - pm.incoming <- entries + select { + case pm.incoming <- entries: + case <-pm.ctx.Done(): + } } func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { @@ -97,7 +103,7 @@ func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { } } -func (pm *WantManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueue { +func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { _, ok := pm.peers[p] if ok { // TODO: log an error? 
@@ -116,7 +122,7 @@ func (pm *WantManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueu mq.work <- struct{}{} pm.peers[p] = mq - go pm.runQueue(ctx, mq) + go pm.runQueue(mq) return mq } @@ -131,12 +137,12 @@ func (pm *WantManager) stopPeerHandler(p peer.ID) { delete(pm.peers, p) } -func (pm *WantManager) runQueue(ctx context.Context, mq *msgQueue) { +func (pm *WantManager) runQueue(mq *msgQueue) { for { select { case <-mq.work: // there is work to be done - err := pm.network.ConnectTo(ctx, mq.p) + err := pm.network.ConnectTo(pm.ctx, mq.p) if err != nil { log.Error(err) // TODO: cant connect, what now? @@ -153,7 +159,7 @@ func (pm *WantManager) runQueue(ctx context.Context, mq *msgQueue) { mq.outlk.Unlock() // send wantlist updates - err = pm.network.SendMessage(ctx, mq.p, wlm) + err = pm.network.SendMessage(pm.ctx, mq.p, wlm) if err != nil { log.Error("bitswap send error: ", err) // TODO: what do we do if this fails? @@ -173,7 +179,7 @@ func (pm *WantManager) Disconnected(p peer.ID) { } // TODO: use goprocess here once i trust it -func (pm *WantManager) Run(ctx context.Context) { +func (pm *WantManager) Run() { for { select { case entries := <-pm.incoming: @@ -193,10 +199,10 @@ func (pm *WantManager) Run(ctx context.Context) { } case p := <-pm.connect: - pm.startPeerHandler(ctx, p) + pm.startPeerHandler(p) case p := <-pm.disconnect: pm.stopPeerHandler(p) - case <-ctx.Done(): + case <-pm.ctx.Done(): return } } From 594c7786c3349d7b83c11d677329e7ebd79451ae Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 16 May 2015 20:38:42 -0700 Subject: [PATCH 08/15] turn rebroadcast back on --- exchange/bitswap/workers.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 82fb40de939..1083566a1fb 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -42,11 +42,9 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { } // Start up a 
worker to manage periodically resending our wantlist out to peers - /* - px.Go(func(px process.Process) { - bs.rebroadcastWorker(ctx) - }) - */ + px.Go(func(px process.Process) { + bs.rebroadcastWorker(ctx) + }) // Start up a worker to manage sending out provides messages px.Go(func(px process.Process) { From 829b88420e0a50c9d03f6ef5689123baec8487fb Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 16 May 2015 22:08:18 -0700 Subject: [PATCH 09/15] explicitly set bitswap message fullness --- exchange/bitswap/bitswap.go | 2 +- exchange/bitswap/bitswap_test.go | 1 - exchange/bitswap/decision/engine_test.go | 8 ++++---- exchange/bitswap/message/message.go | 22 ++++++---------------- exchange/bitswap/message/message_test.go | 14 +++++++------- exchange/bitswap/testnet/network_test.go | 4 ++-- exchange/bitswap/wantmanager.go | 23 ++++++++++++++++++----- 7 files changed, 38 insertions(+), 36 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index c6f3c74a9fe..57359c0ecf2 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -288,7 +288,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg bs.dupBlocksRecvd++ } bs.counterLk.Unlock() - log.Debugf("got block %s from %s", block, p) + log.Debugf("got block %s from %s (%d,%d)", block, p, bs.blocksRecvd, bs.dupBlocksRecvd) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) if err := bs.HasBlock(hasBlockCtx, block); err != nil { diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 86eb2d764cc..6548472c9dd 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -163,7 +163,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { } for _ = range outch { } - log.Error("DONE") }(inst) } wg.Wait() diff --git a/exchange/bitswap/decision/engine_test.go b/exchange/bitswap/decision/engine_test.go index 31e46c77635..8337c480032 100644 --- 
a/exchange/bitswap/decision/engine_test.go +++ b/exchange/bitswap/decision/engine_test.go @@ -41,7 +41,7 @@ func TestConsistentAccounting(t *testing.T) { // Send messages from Ernie to Bert for i := 0; i < 1000; i++ { - m := message.New() + m := message.New(false) content := []string{"this", "is", "message", "i"} m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " ")))) @@ -73,7 +73,7 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) { sanfrancisco := newEngine(ctx, "sf") seattle := newEngine(ctx, "sea") - m := message.New() + m := message.New(true) sanfrancisco.Engine.MessageSent(seattle.Peer, m) seattle.Engine.MessageReceived(sanfrancisco.Peer, m) @@ -164,7 +164,7 @@ func TestPartnerWantsThenCancels(t *testing.T) { } func partnerWants(e *Engine, keys []string, partner peer.ID) { - add := message.New() + add := message.New(false) for i, letter := range keys { block := blocks.NewBlock([]byte(letter)) add.AddEntry(block.Key(), math.MaxInt32-i) @@ -173,7 +173,7 @@ func partnerWants(e *Engine, keys []string, partner peer.ID) { } func partnerCancels(e *Engine, keys []string, partner peer.ID) { - cancels := message.New() + cancels := message.New(false) for _, k := range keys { block := blocks.NewBlock([]byte(k)) cancels.Cancel(block.Key()) diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index 63f7f28b5ff..d885bb373a8 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -31,12 +31,7 @@ type BitSwapMessage interface { Empty() bool - // Sets whether or not the contained wantlist represents the entire wantlist - // true = full wantlist - // false = wantlist 'patch' - // default: true - SetFull(isFull bool) - + // A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set Full() bool AddBlock(*blocks.Block) @@ -56,15 +51,15 @@ type impl struct { blocks map[u.Key]*blocks.Block } -func New() BitSwapMessage { - return newMsg() +func New(full bool) 
BitSwapMessage { + return newMsg(full) } -func newMsg() *impl { +func newMsg(full bool) *impl { return &impl{ blocks: make(map[u.Key]*blocks.Block), wantlist: make(map[u.Key]Entry), - full: true, + full: full, } } @@ -74,8 +69,7 @@ type Entry struct { } func newMessageFromProto(pbm pb.Message) BitSwapMessage { - m := newMsg() - m.SetFull(pbm.GetWantlist().GetFull()) + m := newMsg(pbm.GetWantlist().GetFull()) for _, e := range pbm.GetWantlist().GetEntries() { m.addEntry(u.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel()) } @@ -86,10 +80,6 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage { return m } -func (m *impl) SetFull(full bool) { - m.full = full -} - func (m *impl) Full() bool { return m.full } diff --git a/exchange/bitswap/message/message_test.go b/exchange/bitswap/message/message_test.go index cbeed88927d..7a6a28a0430 100644 --- a/exchange/bitswap/message/message_test.go +++ b/exchange/bitswap/message/message_test.go @@ -13,7 +13,7 @@ import ( func TestAppendWanted(t *testing.T) { const str = "foo" - m := New() + m := New(true) m.AddEntry(u.Key(str), 1) if !wantlistContains(m.ToProto().GetWantlist(), str) { @@ -44,7 +44,7 @@ func TestAppendBlock(t *testing.T) { strs = append(strs, "Celeritas") strs = append(strs, "Incendia") - m := New() + m := New(true) for _, str := range strs { block := blocks.NewBlock([]byte(str)) m.AddBlock(block) @@ -61,7 +61,7 @@ func TestAppendBlock(t *testing.T) { func TestWantlist(t *testing.T) { keystrs := []string{"foo", "bar", "baz", "bat"} - m := New() + m := New(true) for _, s := range keystrs { m.AddEntry(u.Key(s), 1) } @@ -84,7 +84,7 @@ func TestWantlist(t *testing.T) { func TestCopyProtoByValue(t *testing.T) { const str = "foo" - m := New() + m := New(true) protoBeforeAppend := m.ToProto() m.AddEntry(u.Key(str), 1) if wantlistContains(protoBeforeAppend.GetWantlist(), str) { @@ -93,7 +93,7 @@ func TestCopyProtoByValue(t *testing.T) { } func TestToNetFromNetPreservesWantList(t *testing.T) { - original := New() 
+ original := New(true) original.AddEntry(u.Key("M"), 1) original.AddEntry(u.Key("B"), 1) original.AddEntry(u.Key("D"), 1) @@ -124,7 +124,7 @@ func TestToNetFromNetPreservesWantList(t *testing.T) { func TestToAndFromNetMessage(t *testing.T) { - original := New() + original := New(true) original.AddBlock(blocks.NewBlock([]byte("W"))) original.AddBlock(blocks.NewBlock([]byte("E"))) original.AddBlock(blocks.NewBlock([]byte("F"))) @@ -172,7 +172,7 @@ func contains(strs []string, x string) bool { func TestDuplicates(t *testing.T) { b := blocks.NewBlock([]byte("foo")) - msg := New() + msg := New(true) msg.AddEntry(b.Key(), 1) msg.AddEntry(b.Key(), 1) diff --git a/exchange/bitswap/testnet/network_test.go b/exchange/bitswap/testnet/network_test.go index c963ae9ac5c..9624df5f8e8 100644 --- a/exchange/bitswap/testnet/network_test.go +++ b/exchange/bitswap/testnet/network_test.go @@ -31,7 +31,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { fromWaiter peer.ID, msgFromWaiter bsmsg.BitSwapMessage) { - msgToWaiter := bsmsg.New() + msgToWaiter := bsmsg.New(true) msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr))) waiter.SendMessage(ctx, fromWaiter, msgToWaiter) })) @@ -55,7 +55,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { } })) - messageSentAsync := bsmsg.New() + messageSentAsync := bsmsg.New(true) messageSentAsync.AddBlock(blocks.NewBlock([]byte("data"))) errSending := waiter.SendMessage( context.Background(), responderPeer.ID(), messageSentAsync) diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 3b2067914fb..eb49201a6dd 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -2,6 +2,7 @@ package bitswap import ( "sync" + "time" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision" @@ -94,9 +95,8 @@ func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { // 
throughout the network stack defer env.Sent() - msg := bsmsg.New() + msg := bsmsg.New(false) msg.AddBlock(env.Block) - msg.SetFull(false) err := pm.network.SendMessage(ctx, env.Peer, msg) if err != nil { log.Error(err) @@ -113,11 +113,10 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { mq := newMsgQueue(p) // new peer, we will want to give them our full wantlist - fullwantlist := bsmsg.New() + fullwantlist := bsmsg.New(true) for _, e := range pm.wl.Entries() { fullwantlist.AddEntry(e.Key, e.Priority) } - fullwantlist.SetFull(true) mq.out = fullwantlist mq.work <- struct{}{} @@ -180,6 +179,7 @@ func (pm *WantManager) Disconnected(p peer.ID) { // TODO: use goprocess here once i trust it func (pm *WantManager) Run() { + tock := time.NewTicker(rebroadcastDelay.Get()) for { select { case entries := <-pm.incoming: @@ -198,6 +198,19 @@ func (pm *WantManager) Run() { p.addMessage(entries) } + case <-tock.C: + // resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY) + var es []*bsmsg.Entry + for _, e := range pm.wl.Entries() { + es = append(es, &bsmsg.Entry{Entry: e}) + } + for _, p := range pm.peers { + p.outlk.Lock() + p.out = bsmsg.New(true) + p.outlk.Unlock() + + p.addMessage(es) + } case p := <-pm.connect: pm.startPeerHandler(p) case p := <-pm.disconnect: @@ -230,7 +243,7 @@ func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) { // if we have no message held, or the one we are given is full // overwrite the one we are holding if mq.out == nil { - mq.out = bsmsg.New() + mq.out = bsmsg.New(false) } // TODO: add a msg.Combine(...) 
method From 2eac921e1d27403539589a8abe684a6e43fb2f35 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Thu, 21 May 2015 01:11:57 -0400 Subject: [PATCH 10/15] fixup the bitswap readme --- exchange/bitswap/README.md | 82 +++++++++++++++++--------------------- 1 file changed, 36 insertions(+), 46 deletions(-) diff --git a/exchange/bitswap/README.md b/exchange/bitswap/README.md index bfa0aaa868b..cfdbd27e08e 100644 --- a/exchange/bitswap/README.md +++ b/exchange/bitswap/README.md @@ -1,47 +1,37 @@ -#Welcome to Bitswap -###(The data trading engine) +# Bitswap + +## Protocol +Bitswap is the data trading module for ipfs, it manages requesting and sending +blocks to and from other peers in the network. Bitswap has two main jobs, the +first is to acquire blocks requested by the client from the network. The second +is to judiciously send blocks in its possession to other peers who want them. + +Bitswap is a message based protocol, as opposed to response-reply. All messages +contain wantlists, or blocks. Upon receiving a wantlist, a node should consider +sending out wanted blocks if they have them. Upon receiving blocks, the node +should send out a notification called a 'Cancel' signifying that they no longer +want the block. At a protocol level, bitswap is very simple. + +## go-ipfs Implementation +Internally, when a message with a wantlist is received, it is sent to the +decision engine to be considered, and blocks that we have that are wanted are +placed into the peer request queue. Any block we possess that is wanted by +another peer has a task in the peer request queue created for it. The peer +request queue is a priority queue that sorts available tasks by some metric, +currently, that metric is very simple and aims to fairly address the tasks +of each other peer. More advanced decision logic will be implemented in the +future. Task workers pull tasks to be done off of the queue, retrieve the block +to be sent, and send it off. 
The number of task workers is limited by a constant +factor. + +Client requests for new blocks are handled by the want manager, for every new +block (or set of blocks) wanted, the 'WantBlocks' method is invoked. The want +manager then ensures that connected peers are notified of the new block that we +want by sending the new entries to a message queue for each peer. The message +queue will loop while there is work available and do the following: 1) Ensure it +has a connection to its peer, 2) grab the message to be sent, and 3) send it. +If new messages are added while the loop is in steps 1 or 3, the messages are +combined into one to avoid having to keep an actual queue and send multiple +messages. The same process occurs when the client receives a block and sends a +cancel message for it. -Bitswap is the module that is responsible for requesting and providing data -blocks over the network to and from other ipfs peers. The role of bitswap is -to be a merchant in the large global marketplace of data. - -##Main Operations -Bitswap has three high level operations: - -- **GetBlocks** - - `GetBlocks` is a bitswap method used to request multiple blocks that are likely -to all be provided by the same set of peers (part of a single file, for example). - -- **GetBlock** - - `GetBlock` is a special case of `GetBlocks` that just requests a single block. - -- **HasBlock** - - `HasBlock` registers a local block with bitswap. Bitswap will then send that -block to any connected peers who want it (with the strategies approval), record -that transaction in the ledger and announce to the DHT that the block is being -provided. - -##Internal Details -All `GetBlock` requests are relayed into a single for-select loop via channels. -Calls to `GetBlocks` will have `FindProviders` called for only the first key in -the set initially, This is an optimization attempting to cut down on the number -of RPCs required. 
After a timeout (specified by the strategies -`GetRebroadcastDelay`) Bitswap will iterate through all keys still in the local -wantlist, perform a find providers call for each, and sent the wantlist out to -those providers. This is the fallback behaviour for cases where our initial -assumption about one peer potentially having multiple blocks in a set does not -hold true. - -When receiving messages, Bitswaps `ReceiveMessage` method is called. A bitswap -message may contain the wantlist of the peer who sent the message, and an array -of blocks that were on our local wantlist. Any blocks we receive in a bitswap -message will be passed to `HasBlock`, and the other peers wantlist gets updated -in the strategy by `bs.strategy.MessageReceived`. -If another peers wantlist is received, Bitswap will call its strategies -`ShouldSendBlockToPeer` method to determine whether or not the other peer will -be sent the block they are requesting (if we even have it). - -##Outstanding TODOs: -- [ ] Ensure only one request active per key -- [ ] More involved strategies -- [ ] Ensure only wanted blocks are counted in ledgers From 2882c793e23457bedf3b2955c1a57fc9fcdb086d Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 17 May 2015 14:08:05 -0700 Subject: [PATCH 11/15] add a distribution test with the rebroadcast delay disabled --- exchange/bitswap/bitswap_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 6548472c9dd..803bcd223b8 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -120,6 +120,18 @@ func TestLargeFile(t *testing.T) { PerformDistributionTest(t, numInstances, numBlocks) } +func TestLargeFileNoRebroadcast(t *testing.T) { + rbd := rebroadcastDelay.Get() + rebroadcastDelay.Set(time.Hour * 24 * 365 * 10) // ten years should be long enough + if testing.Short() { + t.SkipNow() + } + numInstances := 10 + numBlocks := 100 + PerformDistributionTest(t, 
numInstances, numBlocks) + rebroadcastDelay.Set(rbd) +} + func TestLargeFileTwoPeers(t *testing.T) { if testing.Short() { t.SkipNow() From c273a3bd4f616cb95cebb1cf0b4b665bac8a6d39 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 17 May 2015 17:09:53 -0700 Subject: [PATCH 12/15] better bitswap logging --- exchange/bitswap/bitswap.go | 2 +- exchange/bitswap/wantmanager.go | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 57359c0ecf2..db7bc033f5a 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -288,7 +288,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg bs.dupBlocksRecvd++ } bs.counterLk.Unlock() - log.Debugf("got block %s from %s (%d,%d)", block, p, bs.blocksRecvd, bs.dupBlocksRecvd) + log.Infof("got block %s from %s (%d,%d)", block, p, bs.blocksRecvd, bs.dupBlocksRecvd) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) if err := bs.HasBlock(hasBlockCtx, block); err != nil { diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index eb49201a6dd..74372f7f066 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -66,6 +66,7 @@ type msgQueue struct { } func (pm *WantManager) WantBlocks(ks []u.Key) { + log.Infof("want blocks: %s", ks) pm.addEntries(ks, false) } @@ -97,6 +98,7 @@ func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { msg := bsmsg.New(false) msg.AddBlock(env.Block) + log.Infof("Sending block %s to %s", env.Peer, env.Block) err := pm.network.SendMessage(ctx, env.Peer, msg) if err != nil { log.Error(err) @@ -143,8 +145,9 @@ func (pm *WantManager) runQueue(mq *msgQueue) { err := pm.network.ConnectTo(pm.ctx, mq.p) if err != nil { - log.Error(err) + log.Errorf("cant connect to peer %s: %s", mq.p, err) // TODO: cant connect, what now? 
+ continue } // grab outgoing message From b71a0aced018909ca2d56797394e4ae3b02516b8 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 19 May 2015 11:26:50 -0700 Subject: [PATCH 13/15] clarify synhronization constructs --- exchange/bitswap/wantmanager.go | 38 +++++++++++++++------------------ 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 74372f7f066..4efd120ef6a 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -14,23 +14,17 @@ import ( ) type WantManager struct { - receiver bsnet.Receiver - - incoming chan []*bsmsg.Entry - - // notification channel for new peers connecting - connect chan peer.ID - - // notification channel for peers disconnecting - disconnect chan peer.ID + // sync channels for Run loop + incoming chan []*bsmsg.Entry + connect chan peer.ID // notification channel for new peers connecting + disconnect chan peer.ID // notification channel for peers disconnecting + // synchronized by Run loop, only touch inside there peers map[peer.ID]*msgQueue - - wl *wantlist.Wantlist + wl *wantlist.Wantlist network bsnet.BitSwapNetwork - - ctx context.Context + ctx context.Context } func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager { @@ -58,8 +52,9 @@ type cancellation struct { type msgQueue struct { p peer.ID - outlk sync.Mutex - out bsmsg.BitSwapMessage + outlk sync.Mutex + out bsmsg.BitSwapMessage + network bsnet.BitSwapNetwork work chan struct{} done chan struct{} @@ -112,7 +107,7 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { return nil } - mq := newMsgQueue(p) + mq := pm.newMsgQueue(p) // new peer, we will want to give them our full wantlist fullwantlist := bsmsg.New(true) @@ -123,7 +118,7 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { mq.work <- struct{}{} pm.peers[p] = mq - go pm.runQueue(mq) + go mq.runQueue(pm.ctx) return mq } @@ -138,12 +133,12 @@ func (pm 
*WantManager) stopPeerHandler(p peer.ID) { delete(pm.peers, p) } -func (pm *WantManager) runQueue(mq *msgQueue) { +func (mq *msgQueue) runQueue(ctx context.Context) { for { select { case <-mq.work: // there is work to be done - err := pm.network.ConnectTo(pm.ctx, mq.p) + err := mq.network.ConnectTo(ctx, mq.p) if err != nil { log.Errorf("cant connect to peer %s: %s", mq.p, err) // TODO: cant connect, what now? @@ -161,7 +156,7 @@ func (pm *WantManager) runQueue(mq *msgQueue) { mq.outlk.Unlock() // send wantlist updates - err = pm.network.SendMessage(pm.ctx, mq.p, wlm) + err = mq.network.SendMessage(ctx, mq.p, wlm) if err != nil { log.Error("bitswap send error: ", err) // TODO: what do we do if this fails? @@ -224,10 +219,11 @@ func (pm *WantManager) Run() { } } -func newMsgQueue(p peer.ID) *msgQueue { +func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue { mq := new(msgQueue) mq.done = make(chan struct{}) mq.work = make(chan struct{}, 1) + mq.network = wm.network mq.p = p return mq From 2f934e8c58697d7b6fbc374d4903f85725cc1fc5 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 19 May 2015 13:13:38 -0700 Subject: [PATCH 14/15] warning -> notice --- exchange/bitswap/wantmanager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 4efd120ef6a..a1ab8a022ce 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -96,7 +96,7 @@ func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { log.Infof("Sending block %s to %s", env.Peer, env.Block) err := pm.network.SendMessage(ctx, env.Peer, msg) if err != nil { - log.Error(err) + log.Noticef("sendblock error: %s", err) } } @@ -158,7 +158,7 @@ func (mq *msgQueue) runQueue(ctx context.Context) { // send wantlist updates err = mq.network.SendMessage(ctx, mq.p, wlm) if err != nil { - log.Error("bitswap send error: ", err) + log.Noticef("bitswap send error: %s", err) // TODO: what do we do if 
this fails? } case <-mq.done: From ce0d2f46d6ff2c9f03c5cac8358812dd532afacf Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 19 May 2015 15:48:12 -0700 Subject: [PATCH 15/15] defer tock.Stop() --- exchange/bitswap/wantmanager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index a1ab8a022ce..29706710f99 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -178,6 +178,7 @@ func (pm *WantManager) Disconnected(p peer.ID) { // TODO: use goprocess here once i trust it func (pm *WantManager) Run() { tock := time.NewTicker(rebroadcastDelay.Get()) + defer tock.Stop() for { select { case entries := <-pm.incoming: