From 9432115230b5ce765a9f08c46d6a1dc67b65798f Mon Sep 17 00:00:00 2001 From: Simon Zhu Date: Mon, 11 Oct 2021 15:52:53 -0700 Subject: [PATCH] Adds customizable prioritization logic for peertracker and peertaskqueue --- peertask/peertask.go | 4 ++- peertaskqueue.go | 41 +++++++++++++++++++++++++---- peertracker/peertracker.go | 53 ++++++++++++++++++++++++++++++++------ 3 files changed, 84 insertions(+), 14 deletions(-) diff --git a/peertask/peertask.go b/peertask/peertask.go index da85e27..87a786f 100644 --- a/peertask/peertask.go +++ b/peertask/peertask.go @@ -7,6 +7,8 @@ import ( peer "github.com/libp2p/go-libp2p-core/peer" ) +type QueueTaskComparator func(a, b *QueueTask) bool + // FIFOCompare is a basic task comparator that returns tasks in the order created. var FIFOCompare = func(a, b *QueueTask) bool { return a.created.Before(b.created) @@ -23,7 +25,7 @@ var PriorityCompare = func(a, b *QueueTask) bool { // WrapCompare wraps a QueueTask comparison function so it can be used as // comparison for a priority queue -func WrapCompare(f func(a, b *QueueTask) bool) func(a, b pq.Elem) bool { +func WrapCompare(f QueueTaskComparator) func(a, b pq.Elem) bool { return func(a, b pq.Elem) bool { return f(a.(*QueueTask), b.(*QueueTask)) } diff --git a/peertaskqueue.go b/peertaskqueue.go index 99ec33e..8fbaa58 100644 --- a/peertaskqueue.go +++ b/peertaskqueue.go @@ -24,6 +24,8 @@ type hookFunc func(p peer.ID, event peerTaskQueueEvent) // first if priorities are equal. type PeerTaskQueue struct { lock sync.Mutex + peerComparator peertracker.PeerComparator + taskComparator peertask.QueueTaskComparator pQueue pq.PQ peerTrackers map[peer.ID]*peertracker.PeerTracker frozenPeers map[peer.ID]struct{} @@ -112,15 +114,40 @@ func OnPeerRemovedHook(onPeerRemovedHook func(p peer.ID)) Option { return addHook(hook) } +// PeerComparator is an option that specifies custom peer prioritization logic. +func PeerComparator(pc peertracker.PeerComparator) Option { + return func(ptq *PeerTaskQueue) Option { + previous := ptq.peerComparator + ptq.peerComparator = pc + return PeerComparator(previous) + } +} + +// TaskComparator is an option that specifies custom task prioritization logic. +func TaskComparator(tc peertask.QueueTaskComparator) Option { + return func(ptq *PeerTaskQueue) Option { + previous := ptq.taskComparator + ptq.taskComparator = tc + return TaskComparator(previous) + } +} + // New creates a new PeerTaskQueue func New(options ...Option) *PeerTaskQueue { ptq := &PeerTaskQueue{ - peerTrackers: make(map[peer.ID]*peertracker.PeerTracker), - frozenPeers: make(map[peer.ID]struct{}), - pQueue: pq.New(peertracker.PeerCompare), - taskMerger: &peertracker.DefaultTaskMerger{}, + peerComparator: peertracker.DefaultPeerComparator, + peerTrackers: make(map[peer.ID]*peertracker.PeerTracker), + frozenPeers: make(map[peer.ID]struct{}), + taskMerger: &peertracker.DefaultTaskMerger{}, } ptq.Options(options...) + ptq.pQueue = pq.New( + func(a, b pq.Elem) bool { + pa := a.(*peertracker.PeerTracker) + pb := b.(*peertracker.PeerTracker) + return ptq.peerComparator(pa, pb) + }, + ) return ptq } @@ -171,7 +198,11 @@ func (ptq *PeerTaskQueue) PushTasks(to peer.ID, tasks ...peertask.Task) { peerTracker, ok := ptq.peerTrackers[to] if !ok { - peerTracker = peertracker.New(to, ptq.taskMerger, ptq.maxOutstandingWorkPerPeer) + var opts []peertracker.Option + if ptq.taskComparator != nil { + opts = append(opts, peertracker.WithQueueTaskComparator(ptq.taskComparator)) + } + peerTracker = peertracker.New(to, ptq.taskMerger, ptq.maxOutstandingWorkPerPeer, opts...) ptq.pQueue.Push(peerTracker) ptq.peerTrackers[to] = peerTracker ptq.callHooks(to, peerAdded) diff --git a/peertracker/peertracker.go b/peertracker/peertracker.go index dec6451..af7ac39 100644 --- a/peertracker/peertracker.go +++ b/peertracker/peertracker.go @@ -54,30 +54,51 @@ type PeerTracker struct { freezeVal int + queueTaskComparator peertask.QueueTaskComparator + // priority queue of tasks belonging to this peer taskQueue pq.PQ taskMerger TaskMerger } +// Option is a function that configures the peer tracker +type Option func(*PeerTracker) + +// WithQueueTaskComparator sets a custom QueueTask comparison function for the +// peer tracker's task queue. +func WithQueueTaskComparator(f peertask.QueueTaskComparator) Option { + return func(pt *PeerTracker) { + pt.queueTaskComparator = f + } +} + // New creates a new PeerTracker -func New(target peer.ID, taskMerger TaskMerger, maxActiveWorkPerPeer int) *PeerTracker { - return &PeerTracker{ +func New(target peer.ID, taskMerger TaskMerger, maxActiveWorkPerPeer int, opts ...Option) *PeerTracker { + pt := &PeerTracker{ target: target, - taskQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)), + queueTaskComparator: peertask.PriorityCompare, pendingTasks: make(map[peertask.Topic]*peertask.QueueTask), activeTasks: make(map[*peertask.Task]struct{}), taskMerger: taskMerger, maxActiveWorkPerPeer: maxActiveWorkPerPeer, } + + for _, opt := range opts { + opt(pt) + } + + pt.taskQueue = pq.New(peertask.WrapCompare(pt.queueTaskComparator)) + + return pt } -// PeerCompare implements pq.ElemComparator -// returns true if peer 'a' has higher priority than peer 'b' -func PeerCompare(a, b pq.Elem) bool { - pa := a.(*PeerTracker) - pb := b.(*PeerTracker) +// PeerComparator is used for peer prioritization. +// It should return true if peer 'a' has higher priority than peer 'b' +type PeerComparator func(a, b *PeerTracker) bool +// DefaultPeerComparator implements the default peer prioritization logic. +func DefaultPeerComparator(pa, pb *PeerTracker) bool { // having no pending tasks means lowest priority paPending := len(pa.pendingTasks) pbPending := len(pb.pendingTasks) @@ -108,6 +129,22 @@ func PeerCompare(a, b pq.Elem) bool { return pa.activeWork < pb.activeWork } +// TaskPriorityPeerComparator prioritizes peers based on their highest priority task. +func TaskPriorityPeerComparator(comparator peertask.QueueTaskComparator) PeerComparator { + return func(pa, pb *PeerTracker) bool { + ta := pa.taskQueue.Peek() + tb := pb.taskQueue.Peek() + if ta == nil { + return false + } + if tb == nil { + return true + } + + return comparator(ta.(*peertask.QueueTask), tb.(*peertask.QueueTask)) + } +} + // Target returns the peer that this peer tracker tracks tasks for func (p *PeerTracker) Target() peer.ID { return p.target