From 24c557b5451905de7d3b618411f0ee072d97b561 Mon Sep 17 00:00:00 2001
From: hannahhoward
Date: Wed, 21 Oct 2020 04:43:53 -0700
Subject: [PATCH] feat(allocator): add allocator for memory backpressure

add an allocator that manages global memory allocations on the responder
and blocks peerresponsesenders as needed
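
Limits default to 4 GiB total and 1 GiB per peer, and are tunable through
the new MaxMemoryResponder and MaxMemoryPerPeerResponder options. A rough
usage sketch (constructor arguments other than the new options are elided;
the byte values are illustrative, not the defaults):

    gs := graphsync.New(ctx, network, /* loader, storer, ... */
        graphsync.MaxMemoryResponder(1<<30),        // 1 GiB across all peers
        graphsync.MaxMemoryPerPeerResponder(1<<28), // 256 MiB per peer
    )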
---
 go.mod                                        |   2 +
 impl/graphsync.go                             |  38 ++-
 responsemanager/allocator/allocator.go        | 232 ++++++++++++++++++
 responsemanager/allocator/allocator_test.go   | 219 +++++++++++++++++
 .../peerresponsemanager/peerresponsesender.go |  60 ++++-
 .../peerresponsesender_test.go                |  84 ++++++-
 6 files changed, 611 insertions(+), 24 deletions(-)
 create mode 100644 responsemanager/allocator/allocator.go
 create mode 100644 responsemanager/allocator/allocator_test.go

diff --git a/go.mod b/go.mod
index 05fbe725..5e785807 100644
--- a/go.mod
+++ b/go.mod
@@ -17,6 +17,7 @@ require (
 	github.com/ipfs/go-ipfs-delay v0.0.1
 	github.com/ipfs/go-ipfs-exchange-offline v0.0.1
 	github.com/ipfs/go-ipfs-files v0.0.8
+	github.com/ipfs/go-ipfs-pq v0.0.2
 	github.com/ipfs/go-ipfs-routing v0.1.0
 	github.com/ipfs/go-ipfs-util v0.0.1
 	github.com/ipfs/go-ipld-cbor v0.0.4 // indirect
@@ -32,6 +33,7 @@ require (
 	github.com/libp2p/go-libp2p v0.6.0
 	github.com/libp2p/go-libp2p-core v0.5.0
 	github.com/libp2p/go-libp2p-netutil v0.1.0
+	github.com/libp2p/go-libp2p-peer v0.2.0
 	github.com/libp2p/go-libp2p-record v0.1.1 // indirect
 	github.com/libp2p/go-libp2p-testing v0.1.1
 	github.com/libp2p/go-msgio v0.0.6
diff --git a/impl/graphsync.go b/impl/graphsync.go
index 85398f3d..2d06698d 100644
--- a/impl/graphsync.go
+++ b/impl/graphsync.go
@@ -17,6 +17,7 @@ import (
 	"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
 	requestorhooks "github.com/ipfs/go-graphsync/requestmanager/hooks"
 	"github.com/ipfs/go-graphsync/responsemanager"
+	"github.com/ipfs/go-graphsync/responsemanager/allocator"
 	responderhooks "github.com/ipfs/go-graphsync/responsemanager/hooks"
 	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
 	"github.com/ipfs/go-graphsync/responsemanager/persistenceoptions"
@@ -26,6 +27,8 @@ import (
 var log = logging.Logger("graphsync")

 const maxRecursionDepth = 100
+const defaultTotalMaxMemory = uint64(4 * 1 << 30)
+const defaultMaxMemoryPerPeer = uint64(1 << 30)

 // GraphSync is an instance of a GraphSync exchange that implements
 // the graphsync protocol.
@@ -53,6 +56,9 @@ type GraphSync struct {
 	ctx                        context.Context
 	cancel                     context.CancelFunc
 	unregisterDefaultValidator graphsync.UnregisterHookFunc
+	allocator                  *allocator.Allocator
+	totalMaxMemory             uint64
+	maxMemoryPerPeer           uint64
 }

 // Option defines the functional option type that can be used to configure
@@ -67,6 +73,18 @@ func RejectAllRequestsByDefault() Option {
 	}
 }

+// MaxMemoryResponder defines the maximum amount of memory the responder
+// may hold in queued response blocks across all peers before blocking
+func MaxMemoryResponder(totalMaxMemory uint64) Option {
+	return func(gs *GraphSync) {
+		gs.totalMaxMemory = totalMaxMemory
+	}
+}
+
+// MaxMemoryPerPeerResponder defines the maximum amount of memory the
+// responder may hold in queued response blocks for a single peer before
+// blocking
+func MaxMemoryPerPeerResponder(maxMemoryPerPeer uint64) Option {
+	return func(gs *GraphSync) {
+		gs.maxMemoryPerPeer = maxMemoryPerPeer
+	}
+}
+
 // New creates a new GraphSync Exchange on the given network,
 // and the given link loader+storer.
 func New(parent context.Context, network gsnet.GraphSyncNetwork,
@@ -83,10 +101,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
 	incomingBlockHooks := requestorhooks.NewBlockHooks()
 	requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks)
 	peerTaskQueue := peertaskqueue.New()
-	createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
-		return peerresponsemanager.NewResponseSender(ctx, p, peerManager)
-	}
-	peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)
+
 	persistenceOptions := persistenceoptions.New()
 	incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
 	outgoingBlockHooks := responderhooks.NewBlockHooks()
@@ -95,7 +110,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
 	requestorCancelledListeners := responderhooks.NewRequestorCancelledListeners()
 	blockSentListeners := responderhooks.NewBlockSentListeners()
 	networkErrorListeners := responderhooks.NewNetworkErrorListeners()
-	responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners)
 	unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
 	graphSync := &GraphSync{
 		network: network,
@@ -116,8 +130,8 @@
 		outgoingRequestHooks:       outgoingRequestHooks,
 		incomingBlockHooks:         incomingBlockHooks,
 		peerTaskQueue:              peerTaskQueue,
-		peerResponseManager:        peerResponseManager,
-		responseManager:            responseManager,
+		totalMaxMemory:             defaultTotalMaxMemory,
+		maxMemoryPerPeer:           defaultMaxMemoryPerPeer,
 		ctx:                        ctx,
 		cancel:                     cancel,
 		unregisterDefaultValidator: unregisterDefaultValidator,
@@ -126,7 +140,17 @@
 	for _, option := range options {
 		option(graphSync)
 	}
+	allocator := allocator.NewAllocator(ctx, graphSync.totalMaxMemory, graphSync.maxMemoryPerPeer)
+	graphSync.allocator = allocator
+	createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
+		return peerresponsemanager.NewResponseSender(ctx, p, peerManager, allocator)
+	}
+	peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)
+	graphSync.peerResponseManager = peerResponseManager
+	responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners)
+	graphSync.responseManager = responseManager
+	allocator.Start()
 	asyncLoader.Startup()
 	requestManager.SetDelegate(peerManager)
 	requestManager.Startup()
diff --git a/responsemanager/allocator/allocator.go b/responsemanager/allocator/allocator.go
new file mode 100644
index 00000000..9ecb6d76
--- /dev/null
+++ b/responsemanager/allocator/allocator.go
@@ -0,0 +1,232 @@
+package allocator
+
+import (
+	"context"
+	"errors"
+
+	pq "github.com/ipfs/go-ipfs-pq"
+	peer "github.com/libp2p/go-libp2p-peer"
+)
+
+// Allocator tracks memory allocated for responses across all peers and
+// blocks new allocations once the configured limits are reached.
+type Allocator struct {
+	ctx             context.Context
+	totalMemoryMax  uint64
+	perPeerMax      uint64
+	total           uint64
+	nextAllocIndex  uint64
+	messages        chan allocationRequest
+	peerStatuses    map[peer.ID]*peerStatus
+	peerStatusQueue pq.PQ
+}
+
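+// NewAllocator creates an allocator that caps the memory held for pending
+// responses at totalMemoryMax across all peers and at perPeerMax for any
+// single peer. All state is owned by the run() goroutine; the exported
+// methods communicate with it over the messages channel, so no locks are
+// needed.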
+func NewAllocator(ctx context.Context, totalMemoryMax uint64, perPeerMax uint64) *Allocator {
+	return &Allocator{
+		ctx:             ctx,
+		totalMemoryMax:  totalMemoryMax,
+		perPeerMax:      perPeerMax,
+		total:           0,
+		peerStatuses:    make(map[peer.ID]*peerStatus),
+		peerStatusQueue: pq.New(makePeerStatusCompare(perPeerMax)),
+		messages:        make(chan allocationRequest, 16),
+	}
+}
+
+func (a *Allocator) AllocateBlockMemory(p peer.ID, amount uint64) <-chan error {
+	responseChan := make(chan error, 1)
+	done := make(chan struct{}, 1)
+	select {
+	case <-a.ctx.Done():
+		responseChan <- errors.New("context closed")
+	case a.messages <- allocationRequest{p, amount, false, responseChan, done}:
+	}
+	select {
+	case <-a.ctx.Done():
+	case <-done:
+	}
+	return responseChan
+}
+
+func (a *Allocator) ReleaseBlockMemory(p peer.ID, amount uint64) error {
+	responseChan := make(chan error, 1)
+	select {
+	case <-a.ctx.Done():
+		responseChan <- errors.New("context closed")
+	case a.messages <- allocationRequest{p, amount, true, responseChan, nil}:
+	}
+	select {
+	case <-a.ctx.Done():
+		return errors.New("context closed")
+	case err := <-responseChan:
+		return err
+	}
+}
+
+func (a *Allocator) Start() {
+	go func() {
+		a.run()
+		a.cleanup()
+	}()
+}
+
+func (a *Allocator) run() {
+	for {
+		select {
+		case <-a.ctx.Done():
+			return
+		case request := <-a.messages:
+			status, ok := a.peerStatuses[request.p]
+			if request.isDealloc {
+				if !ok {
+					request.response <- errors.New("cannot deallocate from peer with no allocations")
+					continue
+				}
+				a.handleDeallocRequest(request, status)
+			} else {
+				if !ok {
+					status = &peerStatus{
+						p:              request.p,
+						totalAllocated: 0,
+					}
+					a.peerStatusQueue.Push(status)
+					a.peerStatuses[request.p] = status
+				}
+				a.handleAllocRequest(request, status)
+			}
+		}
+	}
+}
+
+func (a *Allocator) cleanup() {
+	for {
+		if a.peerStatusQueue.Len() == 0 {
+			return
+		}
+		nextPeer := a.peerStatusQueue.Peek().(*peerStatus)
+		if len(nextPeer.pendingAllocations) == 0 {
+			return
+		}
+		pendingAllocation := nextPeer.pendingAllocations[0]
+		nextPeer.pendingAllocations = nextPeer.pendingAllocations[1:]
+		pendingAllocation.response <- errors.New("never allocated")
+		a.peerStatusQueue.Update(nextPeer.Index())
+	}
+}
+
+func (a *Allocator) handleAllocRequest(request allocationRequest, status *peerStatus) {
+	if (a.total+request.amount <= a.totalMemoryMax) && (status.totalAllocated+request.amount <= a.perPeerMax) && len(status.pendingAllocations) == 0 {
+		a.total += request.amount
+		status.totalAllocated += request.amount
+		request.response <- nil
+	} else {
+		pendingAllocation := pendingAllocation{
+			allocationRequest: request,
+			allocIndex:        a.nextAllocIndex,
+		}
+		a.nextAllocIndex++
+		status.pendingAllocations = append(status.pendingAllocations, pendingAllocation)
+	}
+	a.peerStatusQueue.Update(status.Index())
+	request.done <- struct{}{}
+}
+
+func (a *Allocator) handleDeallocRequest(request allocationRequest, status *peerStatus) {
+	status.totalAllocated -= request.amount
+	a.total -= request.amount
+	a.peerStatusQueue.Update(status.Index())
+	for a.processNextPendingAllocation() {
+	}
+	request.response <- nil
+}
+
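+// processNextPendingAllocation looks at the peer at the front of the
+// priority queue and tries to grant its oldest pending allocation (or
+// evicts the peer entirely if it has nothing allocated and nothing
+// pending). It reports whether it made progress, so callers loop until it
+// returns false.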
+func (a *Allocator) processNextPendingAllocation() bool {
+	if a.peerStatusQueue.Len() == 0 {
+		return false
+	}
+	nextPeer := a.peerStatusQueue.Peek().(*peerStatus)
+
+	if len(nextPeer.pendingAllocations) > 0 {
+		if !a.processNextPendingAllocationForPeer(nextPeer) {
+			return false
+		}
+		a.peerStatusQueue.Update(nextPeer.Index())
+	} else {
+		if nextPeer.totalAllocated > 0 {
+			return false
+		}
+		a.peerStatusQueue.Pop()
+		target := nextPeer.p
+		delete(a.peerStatuses, target)
+	}
+	return true
+}
+
+func (a *Allocator) processNextPendingAllocationForPeer(nextPeer *peerStatus) bool {
+	pendingAllocation := nextPeer.pendingAllocations[0]
+	if a.total+pendingAllocation.amount > a.totalMemoryMax {
+		return false
+	}
+	if nextPeer.totalAllocated+pendingAllocation.amount > a.perPeerMax {
+		return false
+	}
+	a.total += pendingAllocation.amount
+	nextPeer.totalAllocated += pendingAllocation.amount
+	nextPeer.pendingAllocations = nextPeer.pendingAllocations[1:]
+	pendingAllocation.response <- nil
+	return true
+}
+
+type allocationRequest struct {
+	p         peer.ID
+	amount    uint64
+	isDealloc bool
+	response  chan error
+	done      chan struct{}
+}
+
+type peerStatus struct {
+	p                  peer.ID
+	totalAllocated     uint64
+	index              int
+	pendingAllocations []pendingAllocation
+}
+
+type pendingAllocation struct {
+	allocationRequest
+	allocIndex uint64
+}
+
+// SetIndex stores the int index.
+func (ps *peerStatus) SetIndex(index int) {
+	ps.index = index
+}
+
+// Index returns the last index given by SetIndex(int).
+func (ps *peerStatus) Index() int {
+	return ps.index
+}
+
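+// makePeerStatusCompare builds the comparator that orders the peer status
+// queue: peers with pending allocations sort ahead of peers with none; a
+// peer whose next pending allocation cannot fit under the per-peer limit is
+// pushed back; otherwise the oldest pending allocation (lowest allocIndex)
+// wins, so waiting peers are served roughly first come, first served. Among
+// peers with no pending allocations, the smallest total allocation sorts
+// first.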
+func makePeerStatusCompare(maxPerPeer uint64) pq.ElemComparator {
+	return func(a, b pq.Elem) bool {
+		pa := a.(*peerStatus)
+		pb := b.(*peerStatus)
+		if len(pa.pendingAllocations) == 0 {
+			if len(pb.pendingAllocations) == 0 {
+				return pa.totalAllocated < pb.totalAllocated
+			}
+			return false
+		}
+		if len(pb.pendingAllocations) == 0 {
+			return true
+		}
+		if pa.totalAllocated+pa.pendingAllocations[0].amount > maxPerPeer {
+			return false
+		}
+		if pb.totalAllocated+pb.pendingAllocations[0].amount > maxPerPeer {
+			return true
+		}
+		if pa.pendingAllocations[0].allocIndex < pb.pendingAllocations[0].allocIndex {
+			return true
+		}
+		return false
+	}
+}
diff --git a/responsemanager/allocator/allocator_test.go b/responsemanager/allocator/allocator_test.go
new file mode 100644
index 00000000..4ea6ac05
--- /dev/null
+++ b/responsemanager/allocator/allocator_test.go
@@ -0,0 +1,219 @@
+package allocator_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/ipfs/go-graphsync/responsemanager/allocator"
+	"github.com/ipfs/go-graphsync/testutil"
+	"github.com/libp2p/go-libp2p-core/peer"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestAllocator(t *testing.T) {
+	peers := testutil.GeneratePeers(3)
+	ctx := context.Background()
+	testCases := map[string]struct {
+		total      uint64
+		maxPerPeer uint64
+		allocs     []alloc
+		totals     []map[peer.ID]uint64
+	}{
+		"single peer against total": {
+			total:      1000,
+			maxPerPeer: 1000,
+			allocs: []alloc{
+				{peers[0], 300, false},
+				{peers[0], 300, false},
+				{peers[0], 300, false},
+				{peers[0], 300, false},
+				{peers[0], 400, true},
+			},
+			totals: []map[peer.ID]uint64{
+				{peers[0]: 300},
+				{peers[0]: 600},
+				{peers[0]: 900},
+				{peers[0]: 500},
+				{peers[0]: 800},
+			},
+		},
+		"single peer against self limit": {
+			total:      2000,
+			maxPerPeer: 1000,
+			allocs: []alloc{
+				{peers[0], 300, false},
+				{peers[0], 300, false},
+				{peers[0], 300, false},
+				{peers[0], 300, false},
+				{peers[0], 400, true},
+			},
+			totals: []map[peer.ID]uint64{
+				{peers[0]: 300},
+				{peers[0]: 600},
+				{peers[0]: 900},
+				{peers[0]: 500},
+				{peers[0]: 800},
+			},
+		},
+		"multiple peers against total": {
+			total:      2000,
+			maxPerPeer: 2000,
+			allocs: []alloc{
+				{peers[0], 1000, false},
+				{peers[1], 900, false},
+				{peers[1], 400, false},
+				{peers[0], 300, false},
+				{peers[0], 500, true},
+				{peers[1], 500, true},
+			},
+			totals: []map[peer.ID]uint64{
+				{peers[0]: 1000},
+				{peers[0]: 1000, peers[1]: 900},
+				{peers[0]: 500, peers[1]: 900},
+				{peers[0]: 500, peers[1]: 1300},
+				{peers[0]: 500, peers[1]: 800},
+				{peers[0]: 800, peers[1]: 800},
+			},
+		},
+		"multiple peers against self limit": {
+			total:      5000,
+			maxPerPeer: 1000,
+			allocs: []alloc{
+				{peers[0], 1000, false},
+				{peers[1], 900, false},
+				{peers[1], 400, false},
+				{peers[0], 300, false},
+				{peers[0], 500, true},
+				{peers[1], 500, true},
+			},
+			totals: []map[peer.ID]uint64{
+				{peers[0]: 1000},
+				{peers[0]: 1000, peers[1]: 900},
+				{peers[0]: 500, peers[1]: 900},
+				{peers[0]: 800, peers[1]: 900},
+				{peers[0]: 800, peers[1]: 400},
+				{peers[0]: 800, peers[1]: 800},
+			},
+		},
+		"multiple peers against mix of limits": {
+			total:      2700,
+			maxPerPeer: 1000,
+			allocs: []alloc{
+				{peers[0], 800, false},
+				{peers[1], 900, false},
+				{peers[1], 400, false},
+				{peers[0], 300, false},
+				{peers[2], 1000, false},
+				{peers[2], 300, false},
+				{peers[0], 200, true},
+				{peers[2], 200, true},
+				{peers[2], 100, false},
+				{peers[1], 200, true},
+				{peers[2], 100, true},
+				{peers[1], 100, true},
+				{peers[2], 200, true},
+				{peers[0], 200, true},
+			},
+			totals: []map[peer.ID]uint64{
+				{peers[0]: 800},
+				{peers[0]: 800, peers[1]: 900},
+				{peers[0]: 800, peers[1]: 900, peers[2]: 1000},
+				{peers[0]: 600, peers[1]: 900, peers[2]: 1000},
+				{peers[0]: 600, peers[1]: 900, peers[2]: 800},
+				{peers[0]: 900, peers[1]: 900, peers[2]: 800},
+				{peers[0]: 900, peers[1]: 700, peers[2]: 800},
+				{peers[0]: 900, peers[1]: 700, peers[2]: 700},
+				{peers[0]: 900, peers[1]: 700, peers[2]: 1000},
+				{peers[0]: 900, peers[1]: 600, peers[2]: 1000},
+				{peers[0]: 900, peers[1]: 600, peers[2]: 800},
+				{peers[0]: 900, peers[1]: 1000, peers[2]: 800},
+				{peers[0]: 700, peers[1]: 1000, peers[2]: 800},
+				{peers[0]: 700, peers[1]: 1000, peers[2]: 900},
+			},
+		},
+	}
+	for testCase, data := range testCases {
+		t.Run(testCase, func(t *testing.T) {
+			//ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+			//defer cancel()
+			allocator := allocator.NewAllocator(ctx, data.total, data.maxPerPeer)
+			allocator.Start()
+			totals := map[peer.ID]uint64{}
+			currentTotal := 0
+			var pending []pendingResult
+			for _, alloc := range data.allocs {
+				var changedTotals bool
+				pending, changedTotals = readPending(t, pending, totals)
+				if changedTotals {
+					require.Less(t, currentTotal, len(data.totals))
+					require.Equal(t, data.totals[currentTotal], totals)
+					currentTotal++
+				}
+				if alloc.isDealloc {
+					err := allocator.ReleaseBlockMemory(alloc.p, alloc.amount)
+					assert.NoError(t, err)
+					totals[alloc.p] = totals[alloc.p] - alloc.amount
+					require.Less(t, currentTotal, len(data.totals))
+					require.Equal(t, data.totals[currentTotal], totals)
+					currentTotal++
+				} else {
+					allocated := allocator.AllocateBlockMemory(alloc.p, alloc.amount)
+					select {
+					case <-allocated:
+						totals[alloc.p] = totals[alloc.p] + alloc.amount
+						require.Less(t, currentTotal, len(data.totals))
+						require.Equal(t, data.totals[currentTotal], totals)
+						currentTotal++
+					default:
+						pending = append(pending, pendingResult{alloc.p, alloc.amount, allocated})
+					}
+				}
+			}
+			var changedTotals bool
+			_, changedTotals = readPending(t, pending, totals)
+			if changedTotals {
+				require.Less(t, currentTotal, len(data.totals))
+				require.Equal(t, data.totals[currentTotal], totals)
+				currentTotal++
+			}
+			require.Equal(t, len(data.totals), currentTotal)
+		})
+	}
+}
+
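+// readPending drains any allocation channels that have already resolved,
+// without blocking: each completed allocation is removed from the pending
+// slice and credited to the per-peer totals. It sweeps repeatedly until a
+// pass completes no allocations, and reports whether the totals changed.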
+func readPending(t *testing.T, pending []pendingResult, totals map[peer.ID]uint64) ([]pendingResult, bool) {
+	morePending := true
+	changedTotals := false
+	for morePending && len(pending) > 0 {
+		morePending = false
+	doneIter:
+		for i, next := range pending {
+			select {
+			case err := <-next.response:
+				require.NoError(t, err)
+				copy(pending[i:], pending[i+1:])
+				pending[len(pending)-1] = pendingResult{}
+				pending = pending[:len(pending)-1]
+				totals[next.p] = totals[next.p] + next.amount
+				changedTotals = true
+				morePending = true
+				break doneIter
+			default:
+			}
+		}
+	}
+	return pending, changedTotals
+}
+
+type alloc struct {
+	p         peer.ID
+	amount    uint64
+	isDealloc bool
+}
+
+type pendingResult struct {
+	p        peer.ID
+	amount   uint64
+	response <-chan error
+}
diff --git a/responsemanager/peerresponsemanager/peerresponsesender.go b/responsemanager/peerresponsemanager/peerresponsesender.go
index e42e39d3..baab375e 100644
--- a/responsemanager/peerresponsemanager/peerresponsesender.go
+++ b/responsemanager/peerresponsemanager/peerresponsesender.go
@@ -49,6 +49,11 @@ type PeerMessageHandler interface {
 	SendResponse(peer.ID, []gsmsg.GraphSyncResponse, []blocks.Block, ...notifications.Notifee)
 }

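+// Allocator is the interface the sender uses to reserve memory for blocks
+// before they are queued (AllocateBlockMemory returns a channel that
+// resolves once the memory is granted) and to return that memory after the
+// message the blocks were part of has gone out (ReleaseBlockMemory).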
+type Allocator interface {
+	AllocateBlockMemory(p peer.ID, amount uint64) <-chan error
+	ReleaseBlockMemory(p peer.ID, amount uint64) error
+}
+
 // Transaction is a series of operations that should be send together in a single response
 type Transaction func(PeerResponseTransactionSender) error

@@ -57,18 +62,20 @@ type peerResponseSender struct {
 	ctx          context.Context
 	cancel       context.CancelFunc
 	peerHandler  PeerMessageHandler
+	allocator    Allocator
 	outgoingWork chan struct{}

-	linkTrackerLk      sync.RWMutex
-	linkTracker        *linktracker.LinkTracker
-	altTrackers        map[string]*linktracker.LinkTracker
-	dedupKeys          map[graphsync.RequestID]string
-	responseBuildersLk sync.RWMutex
-	responseBuilders   []*responsebuilder.ResponseBuilder
-	nextBuilderTopic   responsebuilder.Topic
-	queuedMessages     chan responsebuilder.Topic
-	subscriber         notifications.MappableSubscriber
-	publisher          notifications.Publisher
+	linkTrackerLk       sync.RWMutex
+	linkTracker         *linktracker.LinkTracker
+	altTrackers         map[string]*linktracker.LinkTracker
+	dedupKeys           map[graphsync.RequestID]string
+	responseBuildersLk  sync.RWMutex
+	responseBuilders    []*responsebuilder.ResponseBuilder
+	nextBuilderTopic    responsebuilder.Topic
+	queuedMessages      chan responsebuilder.Topic
+	subscriber          notifications.MappableSubscriber
+	allocatorSubscriber notifications.MappableSubscriber
+	publisher           notifications.Publisher
 }

 // PeerResponseSender handles batching, deduping, and sending responses for
@@ -109,7 +116,7 @@ type PeerResponseTransactionSender interface {

 // NewResponseSender generates a new PeerResponseSender for the given context, peer ID,
 // using the given peer message handler.
-func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHandler) PeerResponseSender {
+func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHandler, allocator Allocator) PeerResponseSender {
 	ctx, cancel := context.WithCancel(ctx)
 	prs := &peerResponseSender{
 		p:              p,
@@ -122,8 +129,10 @@ func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHa
 		altTrackers:    make(map[string]*linktracker.LinkTracker),
 		queuedMessages: make(chan responsebuilder.Topic, 1),
 		publisher:      notifications.NewPublisher(),
+		allocator:      allocator,
 	}
 	prs.subscriber = notifications.NewMappableSubscriber(&subscriber{prs}, notifications.IdentityTransform)
+	prs.allocatorSubscriber = notifications.NewMappableSubscriber(&allocatorSubscriber{prs}, notifications.IdentityTransform)
 	return prs
 }
@@ -383,6 +392,12 @@ func (prs *peerResponseSender) FinishWithCancel(requestID graphsync.RequestID) {
 }

 func (prs *peerResponseSender) buildResponse(blkSize uint64, buildResponseFn func(*responsebuilder.ResponseBuilder), notifees []notifications.Notifee) bool {
+	// block until the allocator grants blkSize bytes for this peer (or the
+	// context is cancelled) -- this is where memory backpressure is applied
+	allocResponse := prs.allocator.AllocateBlockMemory(prs.p, blkSize)
+	select {
+	case <-prs.ctx.Done():
+		return false
+	case <-allocResponse:
+	}
 	prs.responseBuildersLk.Lock()
 	defer prs.responseBuildersLk.Unlock()
 	if shouldBeginNewResponse(prs.responseBuilders, blkSize) {
@@ -436,6 +451,10 @@ func (prs *peerResponseSender) sendResponseMessages() {
 		if builder.Empty() {
 			continue
 		}
+		// subscribe the allocator subscriber to this builder's topic so the
+		// memory reserved for its blocks is released once the message is
+		// sent (or fails)
+		notifications.SubscribeOn(prs.publisher, builder.Topic(), notifications.Notifee{
+			Topic:      builder.BlockSize(),
+			Subscriber: prs.allocatorSubscriber,
+		})
 		responses, blks, err := builder.Build()
 		if err != nil {
 			log.Errorf("Unable to assemble GraphSync response: %s", err.Error())
@@ -493,3 +512,22 @@ func (s *subscriber) OnNext(topic notifications.Topic, event notifications.Event

 func (s *subscriber) OnClose(topic notifications.Topic) {
 	s.prs.publisher.Close(topic)
 }
+
+type allocatorSubscriber struct {
+	prs *peerResponseSender
+}
+
+func (as *allocatorSubscriber) OnNext(topic notifications.Topic, event notifications.Event) {
+	blkSize, ok := topic.(uint64)
+	if !ok {
+		return
+	}
+	_, ok = event.(Event)
+	if !ok {
+		return
+	}
+	_ = as.prs.allocator.ReleaseBlockMemory(as.prs.p, blkSize)
+}
+
+func (as *allocatorSubscriber) OnClose(topic notifications.Topic) {
+}
diff --git a/responsemanager/peerresponsemanager/peerresponsesender_test.go b/responsemanager/peerresponsemanager/peerresponsesender_test.go
index ecdf91e8..70ef3ce0 100644
--- a/responsemanager/peerresponsemanager/peerresponsesender_test.go
+++ b/responsemanager/peerresponsemanager/peerresponsesender_test.go
@@ -18,6 +18,7 @@ import (
 	gsmsg "github.com/ipfs/go-graphsync/message"
 	"github.com/ipfs/go-graphsync/messagequeue"
 	"github.com/ipfs/go-graphsync/notifications"
+	"github.com/ipfs/go-graphsync/responsemanager/allocator"
 	"github.com/ipfs/go-graphsync/testutil"
 )

@@ -41,7 +42,9 @@ func TestPeerResponseSenderSendsResponses(t *testing.T) {
 		links = append(links, cidlink.Link{Cid: block.Cid()})
 	}
 	fph := newFakePeerHandler(ctx, t)
-	peerResponseSender := NewResponseSender(ctx, p, fph)
+	allocator := allocator.NewAllocator(ctx, 1<<30, 1<<30)
+	allocator.Start()
+	peerResponseSender := NewResponseSender(ctx, p, fph, allocator)
 	peerResponseSender.Startup()

 	bd := peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData(), sendResponseNotifee1)
@@ -125,7 +128,9 @@ func TestPeerResponseSenderSendsVeryLargeBlocksResponses(t *testing.T) {
 		links = append(links, cidlink.Link{Cid: block.Cid()})
 	}
 	fph := newFakePeerHandler(ctx, t)
-	peerResponseSender := NewResponseSender(ctx, p, fph)
+	allocator := allocator.NewAllocator(ctx, 1<<30, 1<<30)
+	allocator.Start()
+	peerResponseSender := NewResponseSender(ctx, p, fph, allocator)
 	peerResponseSender.Startup()

 	peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData())
@@ -185,7 +190,9 @@ func TestPeerResponseSenderSendsExtensionData(t *testing.T) {
 		links = append(links, cidlink.Link{Cid: block.Cid()})
 	}
 	fph := newFakePeerHandler(ctx, t)
-	peerResponseSender := NewResponseSender(ctx, p, fph)
+	allocator := allocator.NewAllocator(ctx, 1<<30, 1<<30)
+	allocator.Start()
+	peerResponseSender := NewResponseSender(ctx, p, fph, allocator)
 	peerResponseSender.Startup()

 	peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData())
@@ -228,7 +235,9 @@ func TestPeerResponseSenderSendsResponsesInTransaction(t *testing.T) {
 		links = append(links, cidlink.Link{Cid: block.Cid()})
 	}
 	fph := newFakePeerHandler(ctx, t)
-	peerResponseSender := NewResponseSender(ctx, p, fph)
+	allocator := allocator.NewAllocator(ctx, 1<<30, 1<<30)
+	allocator.Start()
+	peerResponseSender := NewResponseSender(ctx, p, fph, allocator)
 	peerResponseSender.Startup()
 	notifee, notifeeVerifier := testutil.NewTestNotifee("transaction", 10)
 	err := peerResponseSender.Transaction(requestID1, func(peerResponseSender PeerResponseTransactionSender) error {
@@ -270,7 +279,9 @@ func TestPeerResponseSenderIgnoreBlocks(t *testing.T) {
 		links = append(links, cidlink.Link{Cid: block.Cid()})
 	}
 	fph := newFakePeerHandler(ctx, t)
-	peerResponseSender := NewResponseSender(ctx, p, fph)
+	allocator := allocator.NewAllocator(ctx, 1<<30, 1<<30)
+	allocator.Start()
+	peerResponseSender := NewResponseSender(ctx, p, fph, allocator)
 	peerResponseSender.Startup()

 	peerResponseSender.IgnoreBlocks(requestID1, links)
@@ -326,7 +337,9 @@ func TestPeerResponseSenderDupKeys(t *testing.T) {
 		links = append(links, cidlink.Link{Cid: block.Cid()})
 	}
 	fph := newFakePeerHandler(ctx, t)
-	peerResponseSender := NewResponseSender(ctx, p, fph)
+	allocator := allocator.NewAllocator(ctx, 1<<30, 1<<30)
+	allocator.Start()
+	peerResponseSender := NewResponseSender(ctx, p, fph, allocator)
 	peerResponseSender.Startup()

 	peerResponseSender.DedupKey(requestID1, "applesauce")
@@ -382,6 +395,65 @@

 }

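+// TestPeerResponseSenderSendsResponsesMemoryPressure verifies backpressure:
+// with the allocator capped at 300 bytes and 100-byte blocks, the first
+// block sends immediately, but a transaction queueing three more must block
+// until notifySuccess frees the memory for the first message -- hence
+// "freed memory" must be observed before "sent message".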
+func TestPeerResponseSenderSendsResponsesMemoryPressure(t *testing.T) {
+	ctx := context.Background()
+	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+	p := testutil.GeneratePeers(1)[0]
+	requestID1 := graphsync.RequestID(rand.Int31())
+	blks := testutil.GenerateBlocksOfSize(5, 100)
+	links := make([]ipld.Link, 0, len(blks))
+	for _, block := range blks {
+		links = append(links, cidlink.Link{Cid: block.Cid()})
+	}
+	fph := newFakePeerHandler(ctx, t)
+	allocator := allocator.NewAllocator(ctx, 300, 300)
+	allocator.Start()
+	peerResponseSender := NewResponseSender(ctx, p, fph, allocator)
+	peerResponseSender.Startup()
+
+	bd := peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData())
+	assertSentOnWire(t, bd, blks[0])
+	fph.AssertHasMessage("did not send first message")
+
+	fph.AssertBlocks(blks[0])
+	fph.AssertResponses(expectedResponses{requestID1: graphsync.PartialResponse})
+
+	finishes := make(chan string, 2)
+	go func() {
+		_ = peerResponseSender.Transaction(requestID1, func(peerResponseSender PeerResponseTransactionSender) error {
+			bd = peerResponseSender.SendResponse(links[1], blks[1].RawData())
+			assertSentOnWire(t, bd, blks[1])
+			bd = peerResponseSender.SendResponse(links[2], blks[2].RawData())
+			assertSentOnWire(t, bd, blks[2])
+			bd = peerResponseSender.SendResponse(links[3], blks[3].RawData())
+			assertSentOnWire(t, bd, blks[3])
+			peerResponseSender.FinishRequest()
+			return nil
+		})
+		finishes <- "sent message"
+	}()
+	go func() {
+		time.Sleep(100 * time.Millisecond)
+		// let the peer response manager know the last message was sent so
+		// message sending can continue
+		finishes <- "freed memory"
+		fph.notifySuccess()
+	}()
+
+	var finishMessages []string
+	for i := 0; i < 2; i++ {
+		var finishMessage string
+		testutil.AssertReceive(ctx, t, finishes, &finishMessage, "should have completed")
+		finishMessages = append(finishMessages, finishMessage)
+	}
+	require.Equal(t, []string{"freed memory", "sent message"}, finishMessages)
+	fph.AssertHasMessage("did not send second message")
+	fph.AssertBlocks(blks[1], blks[2], blks[3])
+	fph.AssertResponses(expectedResponses{
+		requestID1: graphsync.RequestCompletedFull,
+	})
+}
+
 func findResponseForRequestID(responses []gsmsg.GraphSyncResponse, requestID graphsync.RequestID) (gsmsg.GraphSyncResponse, error) {
 	for _, response := range responses {
 		if response.RequestID() == requestID {