Add allocator for memory backpressure #108
Conversation
I'm generally concerned here about never releasing allocations in some cases.
- Message sending fails.
- Peer disconnects.
This could cause us to gradually slow down and eventually stop.
The main solution I can think of is a finalizer. That is, pass the block into the allocator and get back a "tracked block". The "tracked block" would have some form of "deallocate" method that would, at a minimum, be called on free.
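A rough sketch of what that tracked-block wrapper could look like, assuming hypothetical names (TrackedBlock, Deallocate, release) rather than the PR's actual API, and omitting the deallocate-on-free backstop (e.g. a runtime finalizer):

import "sync"

// TrackedBlock pairs a block's bytes with the callback that returns its
// reservation to the allocator, so the memory is released no matter how the
// block leaves the send path (sent, send failure, or peer disconnect).
type TrackedBlock struct {
	Data    []byte
	release func()    // handed out by the allocator when the block is registered
	once    sync.Once // makes Deallocate safe to call from multiple paths
}

// Deallocate returns the reserved memory; calling it more than once is a no-op.
func (t *TrackedBlock) Deallocate() {
	t.once.Do(t.release)
}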
allocResponse := prs.allocator.AllocateBlockMemory(prs.p, blkSize)
select {
case <-prs.ctx.Done():
	return false
Hm. Can't this cause us to never release the memory?
There are two cases where this could happen. The first is if the whole graphsync instance shuts down, in which case, who cares. The second is if the peer itself shuts down; I am working on a fix for this now.
add tests of large file processing to benchmarks
add an allocator that manages global memory allocations on responder and blocks peerresponsesenders as needed
Force-pushed from 24c557b to 6b1db1a
@Stebalien this should now be ready for review, I think. The main addition is that allocations for a peer are now released when it disconnects; you can see this in the ReleasePeerMemory method on the allocator. (I also removed the allocation in the blkSize = 0 case: why block for nothing?) So, regarding your comment:
Also, I felt it was pretty important to prove to myself that we've actually solved the allocation problem, so you can see I've made some changes in the benchmarks. Specifically, I added a large file transfer test (1GB). I've run it both as it currently stands -- with a 128MB max for allocations (artificially low) -- and with the regular 4GB default, which means no backpressure during the request. Note that the connection bandwidth in the test is extremely slow, so not much data is sent over the wire in the 10 seconds the test runs.

At the time the test stops, without backpressure we should have recreated the conditions that trigger the memory issue people are seeing: the selector traversal is done, and we've queued up all the response messages, but very few have gone out. With backpressure, the selector traversal should be held up, so that only some of the blocks have been read out of the blockstore and we've only queued up messages up to the memory limit.

So, here are the results: I've outlined the area where you can see the different results are as expected. Note that I had to use Badger to replicate this case; with the in-memory data store, reading blocks has no allocation penalty.
I haven't reviewed everything but this looks reasonable. I guess my main concerns are:
- This may lead to a lot of work/allocations per block sent.
- Related: we're playing a lot of games with channels just to avoid a lock. A single mutex would likely be faster and involve less code (rough sketch below).
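For illustration only, a minimal sketch of the mutex-based shape being suggested, assuming a hypothetical allocator that tracks just a global total (no per-peer accounting and no context cancellation):

import "sync"

type mutexAllocator struct {
	mu       sync.Mutex
	cond     *sync.Cond // signalled whenever memory is released
	total    uint64     // bytes currently reserved
	totalMax uint64     // global limit
}

func newMutexAllocator(totalMax uint64) *mutexAllocator {
	a := &mutexAllocator{totalMax: totalMax}
	a.cond = sync.NewCond(&a.mu)
	return a
}

// AllocateBlockMemory blocks the caller until blkSize bytes fit under the
// global limit, then reserves them.
func (a *mutexAllocator) AllocateBlockMemory(blkSize uint64) {
	a.mu.Lock()
	defer a.mu.Unlock()
	for a.total+blkSize > a.totalMax {
		a.cond.Wait()
	}
	a.total += blkSize
}

// ReleaseBlockMemory returns blkSize bytes and wakes any blocked callers.
func (a *mutexAllocator) ReleaseBlockMemory(blkSize uint64) {
	a.mu.Lock()
	a.total -= blkSize
	a.mu.Unlock()
	a.cond.Broadcast()
}

The trade-off is that sync.Cond has no built-in way to abort on context cancellation, which is part of what the channel-based version provides.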
impl/graphsync.go
Outdated
const defaultTotalMaxMemory = uint64(4 * 1 << 30)
const defaultMaxMemoryPerPeer = uint64(1 << 30)
These defaults seem way too high. I'd expect something on the order of megabytes (e.g., 8MiB per peer, 128MiB total). That should be more than enough, right?
(ideally we'd profile the effects on performance somehow)
I'm thinking 256MB / 16MB. I just don't want the jitteriness of holding up a selector traversal to end up with a pipe that's not maxed out. I could be super wrong? (Seems like a thing that definitely merits profiling in the future.)
done := make(chan struct{}, 1)
select {
case <-a.ctx.Done():
	responseChan <- errors.New("context closed")
a.ctx.Err()?
responseChan := make(chan error, 1)
select {
case <-a.ctx.Done():
	responseChan <- errors.New("context closed")
return a.ctx.Err()?
responseChan := make(chan error, 1)
select {
case <-a.ctx.Done():
	responseChan <- errors.New("context closed")
ditto.
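In all three spots the suggestion is simply to surface the context's own error (context.Canceled or context.DeadlineExceeded) rather than constructing a new one, roughly (only the changed case shown):

case <-a.ctx.Done():
	responseChan <- a.ctx.Err() // or, where the function returns an error directly: return a.ctx.Err()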
func (a *Allocator) Start() {
	go func() {
		a.run()
		a.cleanup()
nit: Any reason not to move this into run() as a defer? (Then just invoke go a.run() here.)
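That is, roughly this shape (a sketch; run's body is elided):

func (a *Allocator) Start() {
	go a.run()
}

func (a *Allocator) run() {
	defer a.cleanup()
	// ... existing allocation-processing loop ...
}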
for a.processNextPendingAllocation() {
}
The external loop here is strange.
@@ -383,6 +394,14 @@ func (prs *peerResponseSender) FinishWithCancel(requestID graphsync.RequestID) {
}

func (prs *peerResponseSender) buildResponse(blkSize uint64, buildResponseFn func(*responsebuilder.ResponseBuilder), notifees []notifications.Notifee) bool {
	if blkSize > 0 {
		allocResponse := prs.allocator.AllocateBlockMemory(prs.p, blkSize)
Do we have a max request limit? IIRC we do, so this shouldn't be a huge issue. However, in the future, we should consider something like:
- Take a ticket to allow allocating. We'd have limits (per-peer and total) on the number of outstanding tickets.
- Load the block.
- Allocate space.
- Release the ticket.
The ticket effectively accounts for the maximum amount of space we might need; once the block is loaded, we can convert it to a real allocation of the actual size and free up the ticket (rough sketch below).
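A rough sketch of the ticket idea, using a weighted semaphore as the ticket pool; the names here (loadAndQueue, the allocator interface) and the limit of 64 outstanding tickets are illustrative assumptions, not part of this PR:

import (
	"context"

	"golang.org/x/sync/semaphore"
)

// allocator stands in for whatever hands out byte reservations;
// AllocateBlockMemory is assumed to return a channel that yields once
// the bytes have been reserved.
type allocator interface {
	AllocateBlockMemory(size uint64) <-chan error
}

// tickets caps how many blocks may sit in the "loaded but not yet
// allocated" state, bounding worst-case memory use while loading.
var tickets = semaphore.NewWeighted(64)

func loadAndQueue(ctx context.Context, a allocator, load func() ([]byte, error)) ([]byte, error) {
	// 1. Take a ticket before touching the blockstore.
	if err := tickets.Acquire(ctx, 1); err != nil {
		return nil, err
	}
	// 4. Release the ticket once it has been converted (or on failure).
	defer tickets.Release(1)

	// 2. Load the block while holding the ticket.
	data, err := load()
	if err != nil {
		return nil, err
	}

	// 3. Convert the ticket into a real allocation of the exact size.
	select {
	case err := <-a.AllocateBlockMemory(uint64(len(data))):
		if err != nil {
			return nil, err
		}
	case <-ctx.Done():
		return nil, ctx.Err()
	}
	return data, nil
}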
select {
case <-prs.ctx.Done():
	return false
case <-allocResponse:
}
ultra-stylistic-nit-you-should-probably-ignore:
select {
case <-prs.allocator.AllocateBlockMemory(prs.p, blkSize):
case <-prs.ctx.Done():
return false
}
refactor allocator to remove go routine and address a few PR comments
Force-pushed from 52b9127 to 5bf1d94
LGTM!
impl/graphsync.go
Outdated
const defaultTotalMaxMemory = uint64(1 << 28)
const defaultMaxMemoryPerPeer = uint64(1 << 24)
nit: Personally, I think in MiB, GiB, etc. I'd find these easier to read as uint64(256<<20) and uint64(16<<20).
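For reference, the two spellings denote the same values (1 << 28 is 256 MiB and 1 << 24 is 16 MiB):

const defaultTotalMaxMemory = uint64(256 << 20)  // == uint64(1 << 28), 256 MiB
const defaultMaxMemoryPerPeer = uint64(16 << 20) // == uint64(1 << 24), 16 MiB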
Co-authored-by: Steven Allen <steven@stebalien.com>
* feat(benchmarks): add rudimentary benchmarks (add a benchmarking framework to measure data transfer performance; also update graphsync)
* fix(benchmarks): setup accurate heap profiling
* ci(circle): update to go 1.14
* style(lint): cleanup imports
Goals
With slow receiving peers, we can sometimes make major memory allocations in a selector traversal if the blocks we're traversing aren't being sent. We would like to slow down traversals with backpressure to prevent global allocations from going over a certain amount.
Implementation