Skip to content

Commit

Permalink
Merge branch 'master' into release/v0.6.0
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed Jan 21, 2021
2 parents 90303a5 + 2d101ff commit de70694
Show file tree
Hide file tree
Showing 11 changed files with 507 additions and 333 deletions.
53 changes: 36 additions & 17 deletions responsemanager/allocator/allocator.go → allocator/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,34 @@ type Allocator struct {
totalMemoryMax uint64
perPeerMax uint64

allocLk sync.Mutex
total uint64
nextAllocIndex uint64
peerStatuses map[peer.ID]*peerStatus
peerStatusQueue pq.PQ
allocLk sync.RWMutex
totalAllocatedAllPeers uint64
nextAllocIndex uint64
peerStatuses map[peer.ID]*peerStatus
peerStatusQueue pq.PQ
}

func NewAllocator(totalMemoryMax uint64, perPeerMax uint64) *Allocator {
return &Allocator{
totalMemoryMax: totalMemoryMax,
perPeerMax: perPeerMax,
total: 0,
peerStatuses: make(map[peer.ID]*peerStatus),
peerStatusQueue: pq.New(makePeerStatusCompare(perPeerMax)),
totalMemoryMax: totalMemoryMax,
perPeerMax: perPeerMax,
totalAllocatedAllPeers: 0,
peerStatuses: make(map[peer.ID]*peerStatus),
peerStatusQueue: pq.New(makePeerStatusCompare(perPeerMax)),
}
}

func (a *Allocator) AllocatedForPeer(p peer.ID) uint64 {
a.allocLk.RLock()
defer a.allocLk.RUnlock()

status, ok := a.peerStatuses[p]
if !ok {
return 0
}
return status.totalAllocated
}

func (a *Allocator) AllocateBlockMemory(p peer.ID, amount uint64) <-chan error {
responseChan := make(chan error, 1)
a.allocLk.Lock()
Expand All @@ -44,8 +55,8 @@ func (a *Allocator) AllocateBlockMemory(p peer.ID, amount uint64) <-chan error {
a.peerStatuses[p] = status
}

if (a.total+amount <= a.totalMemoryMax) && (status.totalAllocated+amount <= a.perPeerMax) && len(status.pendingAllocations) == 0 {
a.total += amount
if (a.totalAllocatedAllPeers+amount <= a.totalMemoryMax) && (status.totalAllocated+amount <= a.perPeerMax) && len(status.pendingAllocations) == 0 {
a.totalAllocatedAllPeers += amount
status.totalAllocated += amount
responseChan <- nil
} else {
Expand All @@ -65,8 +76,16 @@ func (a *Allocator) ReleaseBlockMemory(p peer.ID, amount uint64) error {
if !ok {
return errors.New("cannot deallocate from peer with no allocations")
}
status.totalAllocated -= amount
a.total -= amount
if status.totalAllocated > amount {
status.totalAllocated -= amount
} else {
status.totalAllocated = 0
}
if a.totalAllocatedAllPeers > amount {
a.totalAllocatedAllPeers -= amount
} else {
a.totalAllocatedAllPeers = 0
}
a.peerStatusQueue.Update(status.Index())
a.processPendingAllocations()
return nil
Expand All @@ -84,7 +103,7 @@ func (a *Allocator) ReleasePeerMemory(p peer.ID) error {
for _, pendingAllocation := range status.pendingAllocations {
pendingAllocation.response <- errors.New("Peer has been deallocated")
}
a.total -= status.totalAllocated
a.totalAllocatedAllPeers -= status.totalAllocated
a.processPendingAllocations()
return nil
}
Expand All @@ -111,13 +130,13 @@ func (a *Allocator) processPendingAllocations() {

func (a *Allocator) processNextPendingAllocationForPeer(nextPeer *peerStatus) bool {
pendingAllocation := nextPeer.pendingAllocations[0]
if a.total+pendingAllocation.amount > a.totalMemoryMax {
if a.totalAllocatedAllPeers+pendingAllocation.amount > a.totalMemoryMax {
return false
}
if nextPeer.totalAllocated+pendingAllocation.amount > a.perPeerMax {
return false
}
a.total += pendingAllocation.amount
a.totalAllocatedAllPeers += pendingAllocation.amount
nextPeer.totalAllocated += pendingAllocation.amount
nextPeer.pendingAllocations = nextPeer.pendingAllocations[1:]
pendingAllocation.response <- nil
Expand Down
Loading

0 comments on commit de70694

Please sign in to comment.