Skip to content

Commit

Permalink
feat(allocator): refactor w/ mutex
Browse files Browse the repository at this point in the history
refactor allocator to remove go routine and address a few PR comments
  • Loading branch information
hannahhoward committed Oct 27, 2020
1 parent 6b1db1a commit 5bf1d94
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 187 deletions.
7 changes: 3 additions & 4 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
var log = logging.Logger("graphsync")

const maxRecursionDepth = 100
const defaultTotalMaxMemory = uint64(4 * 1 << 30)
const defaultMaxMemoryPerPeer = uint64(1 << 30)
const defaultTotalMaxMemory = uint64(1 << 28)
const defaultMaxMemoryPerPeer = uint64(1 << 24)

// GraphSync is an instance of a GraphSync exchange that implements
// the graphsync protocol.
Expand Down Expand Up @@ -140,7 +140,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
for _, option := range options {
option(graphSync)
}
allocator := allocator.NewAllocator(ctx, graphSync.totalMaxMemory, graphSync.maxMemoryPerPeer)
allocator := allocator.NewAllocator(graphSync.totalMaxMemory, graphSync.maxMemoryPerPeer)
graphSync.allocator = allocator
createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
return peerresponsemanager.NewResponseSender(ctx, p, peerManager, allocator)
Expand All @@ -150,7 +150,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners)
graphSync.responseManager = responseManager

allocator.Start()
asyncLoader.Startup()
requestManager.SetDelegate(peerManager)
requestManager.Startup()
Expand Down
224 changes: 64 additions & 160 deletions responsemanager/allocator/allocator.go
Original file line number Diff line number Diff line change
@@ -1,196 +1,114 @@
package allocator

import (
"context"
"errors"
"sync"

pq "github.com/ipfs/go-ipfs-pq"
peer "github.com/libp2p/go-libp2p-peer"
)

type Allocator struct {
ctx context.Context
totalMemoryMax uint64
perPeerMax uint64
totalMemoryMax uint64
perPeerMax uint64

allocLk sync.Mutex
total uint64
nextAllocIndex uint64
messages chan allocationRequest
peerStatuses map[peer.ID]*peerStatus
peerStatusQueue pq.PQ
}

func NewAllocator(ctx context.Context, totalMemoryMax uint64, perPeerMax uint64) *Allocator {
func NewAllocator(totalMemoryMax uint64, perPeerMax uint64) *Allocator {
return &Allocator{
ctx: ctx,
totalMemoryMax: totalMemoryMax,
perPeerMax: perPeerMax,
total: 0,
peerStatuses: make(map[peer.ID]*peerStatus),
peerStatusQueue: pq.New(makePeerStatusCompare(perPeerMax)),
messages: make(chan allocationRequest, 16),
}
}

func (a *Allocator) AllocateBlockMemory(p peer.ID, amount uint64) <-chan error {
responseChan := make(chan error, 1)
done := make(chan struct{}, 1)
select {
case <-a.ctx.Done():
responseChan <- errors.New("context closed")
case a.messages <- allocationRequest{p, amount, allocOperation, responseChan, done}:
}
select {
case <-a.ctx.Done():
case <-done:
}
return responseChan
}

func (a *Allocator) ReleaseBlockMemory(p peer.ID, amount uint64) error {
responseChan := make(chan error, 1)
select {
case <-a.ctx.Done():
responseChan <- errors.New("context closed")
case a.messages <- allocationRequest{p, amount, deallocOperation, responseChan, nil}:
}
select {
case <-a.ctx.Done():
return errors.New("context closed")
case err := <-responseChan:
return err
}
}

func (a *Allocator) ReleasePeerMemory(p peer.ID) error {
responseChan := make(chan error, 1)
select {
case <-a.ctx.Done():
responseChan <- errors.New("context closed")
case a.messages <- allocationRequest{p, 0, deallocPeerOperation, responseChan, nil}:
}
select {
case <-a.ctx.Done():
return errors.New("context closed")
case err := <-responseChan:
return err
}
}

func (a *Allocator) Start() {
go func() {
a.run()
a.cleanup()
}()
}

func (a *Allocator) run() {
for {
select {
case <-a.ctx.Done():
return
case request := <-a.messages:
status, ok := a.peerStatuses[request.p]
switch request.operation {
case allocOperation:
if !ok {
status = &peerStatus{
p: request.p,
totalAllocated: 0,
}
a.peerStatusQueue.Push(status)
a.peerStatuses[request.p] = status
}
a.handleAllocRequest(request, status)
case deallocOperation:
if !ok {
request.response <- errors.New("cannot deallocate from peer with no allocations")
continue
}
a.handleDeallocRequest(request, status)
case deallocPeerOperation:
if !ok {
request.response <- errors.New("cannot deallocate from peer with no allocations")
continue
}
a.handleDeallocPeerRequest(request, status)
}
}
}
}

func (a *Allocator) cleanup() {
for {
if a.peerStatusQueue.Len() == 0 {
return
}
nextPeer := a.peerStatusQueue.Peek().(*peerStatus)
if len(nextPeer.pendingAllocations) == 0 {
return
a.allocLk.Lock()
defer a.allocLk.Unlock()

status, ok := a.peerStatuses[p]
if !ok {
status = &peerStatus{
p: p,
totalAllocated: 0,
}
pendingAllocation := nextPeer.pendingAllocations[0]
nextPeer.pendingAllocations = nextPeer.pendingAllocations[1:]
pendingAllocation.response <- errors.New("never allocated")
a.peerStatusQueue.Update(nextPeer.Index())
a.peerStatusQueue.Push(status)
a.peerStatuses[p] = status
}
}

func (a *Allocator) handleAllocRequest(request allocationRequest, status *peerStatus) {
if (a.total+request.amount <= a.totalMemoryMax) && (status.totalAllocated+request.amount <= a.perPeerMax) && len(status.pendingAllocations) == 0 {
a.total += request.amount
status.totalAllocated += request.amount
request.response <- nil
if (a.total+amount <= a.totalMemoryMax) && (status.totalAllocated+amount <= a.perPeerMax) && len(status.pendingAllocations) == 0 {
a.total += amount
status.totalAllocated += amount
responseChan <- nil
} else {
pendingAllocation := pendingAllocation{
allocationRequest: request,
allocIndex: a.nextAllocIndex,
}
pendingAllocation := pendingAllocation{p, amount, responseChan, a.nextAllocIndex}
a.nextAllocIndex++
status.pendingAllocations = append(status.pendingAllocations, pendingAllocation)
}
a.peerStatusQueue.Update(status.Index())
request.done <- struct{}{}
return responseChan
}

func (a *Allocator) handleDeallocRequest(request allocationRequest, status *peerStatus) {
status.totalAllocated -= request.amount
a.total -= request.amount
a.peerStatusQueue.Update(status.Index())
for a.processNextPendingAllocation() {
func (a *Allocator) ReleaseBlockMemory(p peer.ID, amount uint64) error {
a.allocLk.Lock()
defer a.allocLk.Unlock()

status, ok := a.peerStatuses[p]
if !ok {
return errors.New("cannot deallocate from peer with no allocations")
}
request.response <- nil
status.totalAllocated -= amount
a.total -= amount
a.peerStatusQueue.Update(status.Index())
a.processPendingAllocations()
return nil
}

func (a *Allocator) handleDeallocPeerRequest(request allocationRequest, status *peerStatus) {
func (a *Allocator) ReleasePeerMemory(p peer.ID) error {
a.allocLk.Lock()
defer a.allocLk.Unlock()
status, ok := a.peerStatuses[p]
if !ok {
return errors.New("cannot deallocate peer with no allocations")
}
a.peerStatusQueue.Remove(status.Index())
for _, pendingAllocation := range status.pendingAllocations {
pendingAllocation.response <- errors.New("Peer has been deallocated")
}
a.total -= status.totalAllocated
for a.processNextPendingAllocation() {
}
request.response <- nil
a.processPendingAllocations()
return nil
}

func (a *Allocator) processNextPendingAllocation() bool {
if a.peerStatusQueue.Len() == 0 {
return false
}
nextPeer := a.peerStatusQueue.Peek().(*peerStatus)

if len(nextPeer.pendingAllocations) > 0 {
if !a.processNextPendingAllocationForPeer(nextPeer) {
return false
func (a *Allocator) processPendingAllocations() {
for {
if a.peerStatusQueue.Len() == 0 {
return
}
a.peerStatusQueue.Update(nextPeer.Index())
} else {
if nextPeer.totalAllocated > 0 {
return false
nextPeer := a.peerStatusQueue.Peek().(*peerStatus)

if len(nextPeer.pendingAllocations) > 0 {
if !a.processNextPendingAllocationForPeer(nextPeer) {
return
}
a.peerStatusQueue.Update(nextPeer.Index())
} else {
if nextPeer.totalAllocated > 0 {
return
}
a.peerStatusQueue.Pop()
target := nextPeer.p
delete(a.peerStatuses, target)
}
a.peerStatusQueue.Pop()
target := nextPeer.p
delete(a.peerStatuses, target)
}
return true
}

func (a *Allocator) processNextPendingAllocationForPeer(nextPeer *peerStatus) bool {
Expand All @@ -208,22 +126,6 @@ func (a *Allocator) processNextPendingAllocationForPeer(nextPeer *peerStatus) bo
return true
}

type operationType uint64

const (
allocOperation operationType = iota
deallocOperation
deallocPeerOperation
)

type allocationRequest struct {
p peer.ID
amount uint64
operation operationType
response chan error
done chan struct{}
}

type peerStatus struct {
p peer.ID
totalAllocated uint64
Expand All @@ -232,7 +134,9 @@ type peerStatus struct {
}

type pendingAllocation struct {
allocationRequest
p peer.ID
amount uint64
response chan error
allocIndex uint64
}

Expand Down
8 changes: 1 addition & 7 deletions responsemanager/allocator/allocator_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package allocator_test

import (
"context"
"testing"
"time"

"github.com/ipfs/go-graphsync/responsemanager/allocator"
"github.com/ipfs/go-graphsync/testutil"
Expand All @@ -14,7 +12,6 @@ import (

func TestAllocator(t *testing.T) {
peers := testutil.GeneratePeers(3)
ctx := context.Background()
testCases := map[string]struct {
total uint64
maxPerPeer uint64
Expand Down Expand Up @@ -160,10 +157,7 @@ func TestAllocator(t *testing.T) {
}
for testCase, data := range testCases {
t.Run(testCase, func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
allocator := allocator.NewAllocator(ctx, data.total, data.maxPerPeer)
allocator.Start()
allocator := allocator.NewAllocator(data.total, data.maxPerPeer)
totals := map[peer.ID]uint64{}
currentTotal := 0
var pending []pendingResult
Expand Down
3 changes: 1 addition & 2 deletions responsemanager/peerresponsemanager/peerresponsesender.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,11 +395,10 @@ func (prs *peerResponseSender) FinishWithCancel(requestID graphsync.RequestID) {

func (prs *peerResponseSender) buildResponse(blkSize uint64, buildResponseFn func(*responsebuilder.ResponseBuilder), notifees []notifications.Notifee) bool {
if blkSize > 0 {
allocResponse := prs.allocator.AllocateBlockMemory(prs.p, blkSize)
select {
case <-prs.allocator.AllocateBlockMemory(prs.p, blkSize):
case <-prs.ctx.Done():
return false
case <-allocResponse:
}
}
prs.responseBuildersLk.Lock()
Expand Down
Loading

0 comments on commit 5bf1d94

Please sign in to comment.