Skip to content

Commit

Permalink
move block allocation into message queue (#140)
Browse files Browse the repository at this point in the history
Co-authored-by: acruikshank <acruikshank@example.com>
  • Loading branch information
acruikshank and acruikshank authored Jan 19, 2021
1 parent 319ab7e commit 39076f4
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 113 deletions.
2 changes: 1 addition & 1 deletion impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
asyncLoader := asyncloader.New(ctx, loader, storer)
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners)
responseAssembler := responseassembler.New(ctx, allocator, peerManager)
responseAssembler := responseassembler.New(ctx, peerManager)
peerTaskQueue := peertaskqueue.New()
responseManager := responsemanager.New(ctx, loader, responseAssembler, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests)
graphSync := &GraphSync{
Expand Down
13 changes: 11 additions & 2 deletions messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type MessageNetwork interface {
}

type Allocator interface {
AllocateBlockMemory(p peer.ID, amount uint64) <-chan error
ReleasePeerMemory(p peer.ID) error
ReleaseBlockMemory(p peer.ID, amount uint64) error
}
Expand Down Expand Up @@ -80,8 +81,16 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork, allocator Alloc
}
}

// BuildMessage allows you to work modify the next message that is sent in the queue
func (mq *MessageQueue) BuildMessage(size uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee) {
// AllocateAndBuildMessage allows you to work modify the next message that is sent in the queue.
// If blkSize > 0, message building may block until enough memory has been freed from the queues to allocate the message.
func (mq *MessageQueue) AllocateAndBuildMessage(size uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee) {
if size > 0 {
select {
case <-mq.allocator.AllocateBlockMemory(mq.p, size):
case <-mq.ctx.Done():
return
}
}
if mq.buildMessage(size, buildMessageFn, notifees) {
mq.signalWork()
}
Expand Down
77 changes: 67 additions & 10 deletions messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestStartupAndShutdown(t *testing.T) {
root := testutil.GenerateCids(1)[0]

waitGroup.Add(1)
messageQueue.BuildMessage(0, func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(0, func(b *gsmsg.Builder) {
b.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
}, []notifications.Notifee{})

Expand Down Expand Up @@ -116,7 +116,7 @@ func TestShutdownDuringMessageSend(t *testing.T) {

// setup a message and advance as far as beginning to send it
waitGroup.Add(1)
messageQueue.BuildMessage(0, func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(0, func(b *gsmsg.Builder) {
b.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
}, []notifications.Notifee{})
waitGroup.Wait()
Expand Down Expand Up @@ -168,7 +168,7 @@ func TestProcessingNotification(t *testing.T) {
status := graphsync.RequestCompletedFull
expectedTopic := "testTopic"
notifee, verifier := testutil.NewTestNotifee(expectedTopic, 5)
messageQueue.BuildMessage(0, func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(0, func(b *gsmsg.Builder) {
b.AddResponseCode(responseID, status)
b.AddExtensionData(responseID, extension)
}, []notifications.Notifee{notifee})
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestDedupingMessages(t *testing.T) {
selector := ssb.Matcher().Node()
root := testutil.GenerateCids(1)[0]

messageQueue.BuildMessage(0, func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(0, func(b *gsmsg.Builder) {
b.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
}, []notifications.Notifee{})
// wait for send attempt
Expand All @@ -233,7 +233,7 @@ func TestDedupingMessages(t *testing.T) {
selector3 := ssb.ExploreIndex(0, ssb.Matcher()).Node()
root3 := testutil.GenerateCids(1)[0]

messageQueue.BuildMessage(0, func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(0, func(b *gsmsg.Builder) {
b.AddRequest(gsmsg.NewRequest(id2, root2, selector2, priority2))
b.AddRequest(gsmsg.NewRequest(id3, root3, selector3, priority3))
}, []notifications.Notifee{})
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestDedupingMessages(t *testing.T) {
}
}

func TestResponseAssemblerSendsVeryLargeBlocksResponses(t *testing.T) {
func TestSendsVeryLargeBlocksResponses(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
Expand All @@ -288,7 +288,7 @@ func TestResponseAssemblerSendsVeryLargeBlocksResponses(t *testing.T) {

// generate large blocks before proceeding
blks := testutil.GenerateBlocksOfSize(5, 1000000)
messageQueue.BuildMessage(uint64(len(blks[0].RawData())), func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(uint64(len(blks[0].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[0])
}, []notifications.Notifee{})
waitGroup.Wait()
Expand All @@ -300,13 +300,13 @@ func TestResponseAssemblerSendsVeryLargeBlocksResponses(t *testing.T) {
require.True(t, blks[0].Cid().Equals(msgBlks[0].Cid()))

// Send 3 very large blocks
messageQueue.BuildMessage(uint64(len(blks[1].RawData())), func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(uint64(len(blks[1].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[1])
}, []notifications.Notifee{})
messageQueue.BuildMessage(uint64(len(blks[2].RawData())), func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(uint64(len(blks[2].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[2])
}, []notifications.Notifee{})
messageQueue.BuildMessage(uint64(len(blks[3].RawData())), func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(uint64(len(blks[3].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[3])
}, []notifications.Notifee{})

Expand All @@ -325,3 +325,60 @@ func TestResponseAssemblerSendsVeryLargeBlocksResponses(t *testing.T) {
require.Len(t, msgBlks, 1, "number of blks in first message was not 1")
require.True(t, blks[3].Cid().Equals(msgBlks[0].Cid()))
}

func TestSendsResponsesMemoryPressure(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

p := testutil.GeneratePeers(1)[0]
messagesSent := make(chan gsmsg.GraphSyncMessage, 0)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
messageSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
var waitGroup sync.WaitGroup
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}

// use allocator with very small limit
allocator := allocator2.NewAllocator(1000, 1000)

messageQueue := New(ctx, p, messageNetwork, allocator)
messageQueue.Startup()
waitGroup.Add(1)

// start sending block that exceeds memory limit
blks := testutil.GenerateBlocksOfSize(2, 999)
messageQueue.AllocateAndBuildMessage(uint64(len(blks[0].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[0])
}, []notifications.Notifee{})

finishes := make(chan string, 2)
go func() {
// attempt to send second block. Should block until memory is released
messageQueue.AllocateAndBuildMessage(uint64(len(blks[1].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[1])
}, []notifications.Notifee{})
finishes <- "sent message"
}()

// assert transaction does not complete within 200ms because it is waiting on memory
ctx2, cancel2 := context.WithTimeout(ctx, 200*time.Millisecond)
select {
case <-finishes:
t.Fatal("transaction failed to wait on memory")
case <-ctx2.Done():
}

// Allow first message to complete sending
<-messagesSent

// assert message is now queued within 200ms
ctx2, cancel2 = context.WithTimeout(ctx, 200*time.Millisecond)
defer cancel2()
select {
case <-finishes:
cancel2()
case <-ctx2.Done():
t.Fatal("timeout waiting for transaction to complete")
}
}
7 changes: 4 additions & 3 deletions peermanager/peermessagemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// PeerQueue is a process that sends messages to a peer
type PeerQueue interface {
PeerProcess
BuildMessage(blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee)
AllocateAndBuildMessage(blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee)
}

// PeerQueueFactory provides a function that will create a PeerQueue.
Expand All @@ -33,7 +33,8 @@ func NewMessageManager(ctx context.Context, createPeerQueue PeerQueueFactory) *P
}

// BuildMessage allows you to modify the next message that is sent for the given peer
func (pmm *PeerMessageManager) BuildMessage(p peer.ID, blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee) {
// If blkSize > 0, message building may block until enough memory has been freed from the queues to allocate the message.
func (pmm *PeerMessageManager) AllocateAndBuildMessage(p peer.ID, blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee) {
pq := pmm.GetProcess(p).(PeerQueue)
pq.BuildMessage(blkSize, buildMessageFn, notifees)
pq.AllocateAndBuildMessage(blkSize, buildMessageFn, notifees)
}
8 changes: 4 additions & 4 deletions peermanager/peermessagemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type fakePeer struct {
messagesSent chan messageSent
}

func (fp *fakePeer) BuildMessage(blkSize uint64, buildMessage func(b *gsmsg.Builder), notifees []notifications.Notifee) {
func (fp *fakePeer) AllocateAndBuildMessage(blkSize uint64, buildMessage func(b *gsmsg.Builder), notifees []notifications.Notifee) {
builder := gsmsg.NewBuilder(gsmsg.Topic(0))
buildMessage(builder)
message, err := builder.Build()
Expand Down Expand Up @@ -76,14 +76,14 @@ func TestSendingMessagesToPeers(t *testing.T) {
peerManager := NewMessageManager(ctx, peerQueueFactory)

request := gsmsg.NewRequest(id, root, selector, priority)
peerManager.BuildMessage(tp[0], 0, func(b *gsmsg.Builder) {
peerManager.AllocateAndBuildMessage(tp[0], 0, func(b *gsmsg.Builder) {
b.AddRequest(request)
}, []notifications.Notifee{})
peerManager.BuildMessage(tp[1], 0, func(b *gsmsg.Builder) {
peerManager.AllocateAndBuildMessage(tp[1], 0, func(b *gsmsg.Builder) {
b.AddRequest(request)
}, []notifications.Notifee{})
cancelRequest := gsmsg.CancelRequest(id)
peerManager.BuildMessage(tp[0], 0, func(b *gsmsg.Builder) {
peerManager.AllocateAndBuildMessage(tp[0], 0, func(b *gsmsg.Builder) {
b.AddRequest(cancelRequest)
}, []notifications.Notifee{})

Expand Down
4 changes: 2 additions & 2 deletions requestmanager/requestmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type inProgressRequestStatus struct {

// PeerHandler is an interface that can send requests to peers
type PeerHandler interface {
BuildMessage(p peer.ID, blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee)
AllocateAndBuildMessage(p peer.ID, blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee)
}

// AsyncLoader is an interface for loading links asynchronously, returning
Expand Down Expand Up @@ -566,7 +566,7 @@ const requestNetworkError = "request_network_error"
func (rm *RequestManager) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest) {
sub := notifications.NewTopicDataSubscriber(&reqSubscriber{p, request, rm.networkErrorListeners})
failNotifee := notifications.Notifee{Data: requestNetworkError, Subscriber: sub}
rm.peerHandler.BuildMessage(p, 0, func(builder *gsmsg.Builder) {
rm.peerHandler.AllocateAndBuildMessage(p, 0, func(builder *gsmsg.Builder) {
builder.AddRequest(request)
}, []notifications.Notifee{failNotifee})
}
Expand Down
2 changes: 1 addition & 1 deletion requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type fakePeerHandler struct {
requestRecordChan chan requestRecord
}

func (fph *fakePeerHandler) BuildMessage(p peer.ID, blkSize uint64,
func (fph *fakePeerHandler) AllocateAndBuildMessage(p peer.ID, blkSize uint64,
requestBuilder func(b *gsmsg.Builder), notifees []notifications.Notifee) {
builder := gsmsg.NewBuilder(gsmsg.Topic(0))
requestBuilder(builder)
Expand Down
21 changes: 4 additions & 17 deletions responsemanager/responseassembler/responseassembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,32 +51,26 @@ type ResponseBuilder interface {
}

// PeerMessageHandler is an interface that can queue a response for a given peer to go out over the network
// If blkSize > 0, message building may block until enough memory has been freed from the queues to allocate the message.
type PeerMessageHandler interface {
BuildMessage(p peer.ID, blkSize uint64, buildResponseFn func(*gsmsg.Builder), notifees []notifications.Notifee)
}

// Allocator is an interface that can manage memory allocated for blocks
type Allocator interface {
AllocateBlockMemory(p peer.ID, amount uint64) <-chan error
AllocateAndBuildMessage(p peer.ID, blkSize uint64, buildResponseFn func(*gsmsg.Builder), notifees []notifications.Notifee)
}

// ResponseAssembler manages assembling responses to go out over the network
// in libp2p messages
type ResponseAssembler struct {
*peermanager.PeerManager
allocator Allocator
peerHandler PeerMessageHandler
ctx context.Context
}

// New generates a new ResponseAssembler for sending responses
func New(ctx context.Context, allocator Allocator, peerHandler PeerMessageHandler) *ResponseAssembler {
func New(ctx context.Context, peerHandler PeerMessageHandler) *ResponseAssembler {
return &ResponseAssembler{
PeerManager: peermanager.New(ctx, func(ctx context.Context, p peer.ID) peermanager.PeerHandler {
return newTracker()
}),
ctx: ctx,
allocator: allocator,
peerHandler: peerHandler,
}
}
Expand Down Expand Up @@ -110,14 +104,7 @@ func (ra *ResponseAssembler) execute(p peer.ID, operations []responseOperation,
for _, op := range operations {
size += op.size()
}
if size > 0 {
select {
case <-ra.allocator.AllocateBlockMemory(p, size):
case <-ra.ctx.Done():
return
}
}
ra.peerHandler.BuildMessage(p, size, func(builder *gsmsg.Builder) {
ra.peerHandler.AllocateAndBuildMessage(p, size, func(builder *gsmsg.Builder) {
for _, op := range operations {
op.build(builder)
}
Expand Down
Loading

0 comments on commit 39076f4

Please sign in to comment.