Skip to content

Commit

Permalink
feat(allocator): add allocator for memory backpressure
Browse files Browse the repository at this point in the history
add an allocator that manages global memory allocations on responder and blocks peerresponsesenders
as needed
  • Loading branch information
hannahhoward committed Oct 21, 2020
1 parent 536970d commit 24c557b
Show file tree
Hide file tree
Showing 6 changed files with 611 additions and 24 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/ipfs/go-ipfs-delay v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.8
github.com/ipfs/go-ipfs-pq v0.0.2
github.com/ipfs/go-ipfs-routing v0.1.0
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-ipld-cbor v0.0.4 // indirect
Expand All @@ -32,6 +33,7 @@ require (
github.com/libp2p/go-libp2p v0.6.0
github.com/libp2p/go-libp2p-core v0.5.0
github.com/libp2p/go-libp2p-netutil v0.1.0
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/libp2p/go-libp2p-record v0.1.1 // indirect
github.com/libp2p/go-libp2p-testing v0.1.1
github.com/libp2p/go-msgio v0.0.6
Expand Down
38 changes: 31 additions & 7 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
requestorhooks "github.com/ipfs/go-graphsync/requestmanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager"
"github.com/ipfs/go-graphsync/responsemanager/allocator"
responderhooks "github.com/ipfs/go-graphsync/responsemanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
"github.com/ipfs/go-graphsync/responsemanager/persistenceoptions"
Expand All @@ -26,6 +27,8 @@ import (
var log = logging.Logger("graphsync")

const maxRecursionDepth = 100
const defaultTotalMaxMemory = uint64(4 * 1 << 30)
const defaultMaxMemoryPerPeer = uint64(1 << 30)

// GraphSync is an instance of a GraphSync exchange that implements
// the graphsync protocol.
Expand Down Expand Up @@ -53,6 +56,9 @@ type GraphSync struct {
ctx context.Context
cancel context.CancelFunc
unregisterDefaultValidator graphsync.UnregisterHookFunc
allocator *allocator.Allocator
totalMaxMemory uint64
maxMemoryPerPeer uint64
}

// Option defines the functional option type that can be used to configure
Expand All @@ -67,6 +73,18 @@ func RejectAllRequestsByDefault() Option {
}
}

func MaxMemoryResponder(totalMaxMemory uint64) Option {
return func(gs *GraphSync) {
gs.totalMaxMemory = totalMaxMemory
}
}

func MaxMemoryPerPeerResponder(maxMemoryPerPeer uint64) Option {
return func(gs *GraphSync) {
gs.maxMemoryPerPeer = maxMemoryPerPeer
}
}

// New creates a new GraphSync Exchange on the given network,
// and the given link loader+storer.
func New(parent context.Context, network gsnet.GraphSyncNetwork,
Expand All @@ -83,10 +101,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
incomingBlockHooks := requestorhooks.NewBlockHooks()
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks)
peerTaskQueue := peertaskqueue.New()
createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
return peerresponsemanager.NewResponseSender(ctx, p, peerManager)
}
peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)

persistenceOptions := persistenceoptions.New()
incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
outgoingBlockHooks := responderhooks.NewBlockHooks()
Expand All @@ -95,7 +110,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
requestorCancelledListeners := responderhooks.NewRequestorCancelledListeners()
blockSentListeners := responderhooks.NewBlockSentListeners()
networkErrorListeners := responderhooks.NewNetworkErrorListeners()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners)
unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
graphSync := &GraphSync{
network: network,
Expand All @@ -116,8 +130,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
outgoingRequestHooks: outgoingRequestHooks,
incomingBlockHooks: incomingBlockHooks,
peerTaskQueue: peerTaskQueue,
peerResponseManager: peerResponseManager,
responseManager: responseManager,
totalMaxMemory: defaultTotalMaxMemory,
maxMemoryPerPeer: defaultMaxMemoryPerPeer,
ctx: ctx,
cancel: cancel,
unregisterDefaultValidator: unregisterDefaultValidator,
Expand All @@ -126,7 +140,17 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
for _, option := range options {
option(graphSync)
}
allocator := allocator.NewAllocator(ctx, graphSync.totalMaxMemory, graphSync.maxMemoryPerPeer)
graphSync.allocator = allocator
createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
return peerresponsemanager.NewResponseSender(ctx, p, peerManager, allocator)
}
peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)
graphSync.peerResponseManager = peerResponseManager
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners)
graphSync.responseManager = responseManager

allocator.Start()
asyncLoader.Startup()
requestManager.SetDelegate(peerManager)
requestManager.Startup()
Expand Down
232 changes: 232 additions & 0 deletions responsemanager/allocator/allocator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
package allocator

import (
"context"
"errors"

pq "github.com/ipfs/go-ipfs-pq"
peer "github.com/libp2p/go-libp2p-peer"
)

type Allocator struct {
ctx context.Context
totalMemoryMax uint64
perPeerMax uint64
total uint64
nextAllocIndex uint64
messages chan allocationRequest
peerStatuses map[peer.ID]*peerStatus
peerStatusQueue pq.PQ
}

func NewAllocator(ctx context.Context, totalMemoryMax uint64, perPeerMax uint64) *Allocator {
return &Allocator{
ctx: ctx,
totalMemoryMax: totalMemoryMax,
perPeerMax: perPeerMax,
total: 0,
peerStatuses: make(map[peer.ID]*peerStatus),
peerStatusQueue: pq.New(makePeerStatusCompare(perPeerMax)),
messages: make(chan allocationRequest, 16),
}
}

func (a *Allocator) AllocateBlockMemory(p peer.ID, amount uint64) <-chan error {
responseChan := make(chan error, 1)
done := make(chan struct{}, 1)
select {
case <-a.ctx.Done():
responseChan <- errors.New("context closed")
case a.messages <- allocationRequest{p, amount, false, responseChan, done}:
}
select {
case <-a.ctx.Done():
case <-done:
}
return responseChan
}

func (a *Allocator) ReleaseBlockMemory(p peer.ID, amount uint64) error {
responseChan := make(chan error, 1)
select {
case <-a.ctx.Done():
responseChan <- errors.New("context closed")
case a.messages <- allocationRequest{p, amount, true, responseChan, nil}:
}
select {
case <-a.ctx.Done():
return errors.New("context closed")
case err := <-responseChan:
return err
}
}

func (a *Allocator) Start() {
go func() {
a.run()
a.cleanup()
}()
}

func (a *Allocator) run() {
for {
select {
case <-a.ctx.Done():
return
case request := <-a.messages:
status, ok := a.peerStatuses[request.p]
if request.isDelloc {
if !ok {
request.response <- errors.New("cannot deallocate from peer with no allocations")
continue
}
a.handleDeallocRequest(request, status)
} else {
if !ok {
status = &peerStatus{
p: request.p,
totalAllocated: 0,
}
a.peerStatusQueue.Push(status)
a.peerStatuses[request.p] = status
}
a.handleAllocRequest(request, status)
}
}
}
}

func (a *Allocator) cleanup() {
for {
if a.peerStatusQueue.Len() == 0 {
return
}
nextPeer := a.peerStatusQueue.Peek().(*peerStatus)
if len(nextPeer.pendingAllocations) == 0 {
return
}
pendingAllocation := nextPeer.pendingAllocations[0]
nextPeer.pendingAllocations = nextPeer.pendingAllocations[1:]
pendingAllocation.response <- errors.New("never allocated")
a.peerStatusQueue.Update(nextPeer.Index())
}
}

func (a *Allocator) handleAllocRequest(request allocationRequest, status *peerStatus) {
if (a.total+request.amount <= a.totalMemoryMax) && (status.totalAllocated+request.amount <= a.perPeerMax) && len(status.pendingAllocations) == 0 {
a.total += request.amount
status.totalAllocated += request.amount
request.response <- nil
} else {
pendingAllocation := pendingAllocation{
allocationRequest: request,
allocIndex: a.nextAllocIndex,
}
a.nextAllocIndex++
status.pendingAllocations = append(status.pendingAllocations, pendingAllocation)
}
a.peerStatusQueue.Update(status.Index())
request.done <- struct{}{}
}

func (a *Allocator) handleDeallocRequest(request allocationRequest, status *peerStatus) {
status.totalAllocated -= request.amount
a.total -= request.amount
a.peerStatusQueue.Update(status.Index())
for a.processNextPendingAllocation() {
}
request.response <- nil
}

func (a *Allocator) processNextPendingAllocation() bool {
if a.peerStatusQueue.Len() == 0 {
return false
}
nextPeer := a.peerStatusQueue.Peek().(*peerStatus)

if len(nextPeer.pendingAllocations) > 0 {
if !a.processNextPendingAllocationForPeer(nextPeer) {
return false
}
a.peerStatusQueue.Update(nextPeer.Index())
} else {
if nextPeer.totalAllocated > 0 {
return false
}
a.peerStatusQueue.Pop()
target := nextPeer.p
delete(a.peerStatuses, target)
}
return true
}

func (a *Allocator) processNextPendingAllocationForPeer(nextPeer *peerStatus) bool {
pendingAllocation := nextPeer.pendingAllocations[0]
if a.total+pendingAllocation.amount > a.totalMemoryMax {
return false
}
if nextPeer.totalAllocated+pendingAllocation.amount > a.perPeerMax {
return false
}
a.total += pendingAllocation.amount
nextPeer.totalAllocated += pendingAllocation.amount
nextPeer.pendingAllocations = nextPeer.pendingAllocations[1:]
pendingAllocation.response <- nil
return true
}

type allocationRequest struct {
p peer.ID
amount uint64
isDelloc bool
response chan error
done chan struct{}
}

type peerStatus struct {
p peer.ID
totalAllocated uint64
index int
pendingAllocations []pendingAllocation
}

type pendingAllocation struct {
allocationRequest
allocIndex uint64
}

// SetIndex stores the int index.
func (ps *peerStatus) SetIndex(index int) {
ps.index = index
}

// Index returns the last given by SetIndex(int).
func (ps *peerStatus) Index() int {
return ps.index
}

func makePeerStatusCompare(maxPerPeer uint64) pq.ElemComparator {
return func(a, b pq.Elem) bool {
pa := a.(*peerStatus)
pb := b.(*peerStatus)
if len(pa.pendingAllocations) == 0 {
if len(pb.pendingAllocations) == 0 {
return pa.totalAllocated < pb.totalAllocated
}
return false
}
if len(pb.pendingAllocations) == 0 {
return true
}
if pa.totalAllocated+pa.pendingAllocations[0].amount > maxPerPeer {
return false
}
if pb.totalAllocated+pb.pendingAllocations[0].amount > maxPerPeer {
return true
}
if pa.pendingAllocations[0].allocIndex < pb.pendingAllocations[0].allocIndex {
return true
}
return false
}
}
Loading

0 comments on commit 24c557b

Please sign in to comment.