Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds customizable prioritization logic for peertracker and peertaskqueue #17

Merged
merged 1 commit into from
Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion peertask/peertask.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
)

type QueueTaskComparator func(a, b *QueueTask) bool

// FIFOCompare is a basic task comparator that returns tasks in the order created.
var FIFOCompare = func(a, b *QueueTask) bool {
return a.created.Before(b.created)
Expand All @@ -23,7 +25,7 @@ var PriorityCompare = func(a, b *QueueTask) bool {

// WrapCompare wraps a QueueTask comparison function so it can be used as
// comparison for a priority queue
func WrapCompare(f func(a, b *QueueTask) bool) func(a, b pq.Elem) bool {
func WrapCompare(f QueueTaskComparator) func(a, b pq.Elem) bool {
return func(a, b pq.Elem) bool {
return f(a.(*QueueTask), b.(*QueueTask))
}
Expand Down
41 changes: 36 additions & 5 deletions peertaskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type hookFunc func(p peer.ID, event peerTaskQueueEvent)
// first if priorities are equal.
type PeerTaskQueue struct {
lock sync.Mutex
peerComparator peertracker.PeerComparator
taskComparator peertask.QueueTaskComparator
pQueue pq.PQ
peerTrackers map[peer.ID]*peertracker.PeerTracker
frozenPeers map[peer.ID]struct{}
Expand Down Expand Up @@ -112,15 +114,40 @@ func OnPeerRemovedHook(onPeerRemovedHook func(p peer.ID)) Option {
return addHook(hook)
}

// PeerComparator is an option that specifies custom peer prioritization logic.
func PeerComparator(pc peertracker.PeerComparator) Option {
return func(ptq *PeerTaskQueue) Option {
previous := ptq.peerComparator
ptq.peerComparator = pc
return PeerComparator(previous)
}
}

// TaskComparator is an option that specifies custom task prioritization logic.
func TaskComparator(tc peertask.QueueTaskComparator) Option {
return func(ptq *PeerTaskQueue) Option {
previous := ptq.taskComparator
ptq.taskComparator = tc
return TaskComparator(previous)
}
}

// New creates a new PeerTaskQueue
func New(options ...Option) *PeerTaskQueue {
ptq := &PeerTaskQueue{
peerTrackers: make(map[peer.ID]*peertracker.PeerTracker),
frozenPeers: make(map[peer.ID]struct{}),
pQueue: pq.New(peertracker.PeerCompare),
taskMerger: &peertracker.DefaultTaskMerger{},
peerComparator: peertracker.DefaultPeerComparator,
peerTrackers: make(map[peer.ID]*peertracker.PeerTracker),
frozenPeers: make(map[peer.ID]struct{}),
taskMerger: &peertracker.DefaultTaskMerger{},
}
ptq.Options(options...)
ptq.pQueue = pq.New(
func(a, b pq.Elem) bool {
pa := a.(*peertracker.PeerTracker)
pb := b.(*peertracker.PeerTracker)
return ptq.peerComparator(pa, pb)
},
)
return ptq
}

Expand Down Expand Up @@ -171,7 +198,11 @@ func (ptq *PeerTaskQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {

peerTracker, ok := ptq.peerTrackers[to]
if !ok {
peerTracker = peertracker.New(to, ptq.taskMerger, ptq.maxOutstandingWorkPerPeer)
var opts []peertracker.Option
if ptq.taskComparator != nil {
opts = append(opts, peertracker.WithQueueTaskComparator(ptq.taskComparator))
}
peerTracker = peertracker.New(to, ptq.taskMerger, ptq.maxOutstandingWorkPerPeer, opts...)
ptq.pQueue.Push(peerTracker)
ptq.peerTrackers[to] = peerTracker
ptq.callHooks(to, peerAdded)
Expand Down
53 changes: 45 additions & 8 deletions peertracker/peertracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,30 +54,51 @@ type PeerTracker struct {

freezeVal int

queueTaskComparator peertask.QueueTaskComparator

// priority queue of tasks belonging to this peer
taskQueue pq.PQ

taskMerger TaskMerger
}

// Option is a function that configures the peer tracker
type Option func(*PeerTracker)

// WithQueueTaskComparator sets a custom QueueTask comparison function for the
// peer tracker's task queue.
func WithQueueTaskComparator(f peertask.QueueTaskComparator) Option {
return func(pt *PeerTracker) {
pt.queueTaskComparator = f
}
}

// New creates a new PeerTracker
func New(target peer.ID, taskMerger TaskMerger, maxActiveWorkPerPeer int) *PeerTracker {
return &PeerTracker{
func New(target peer.ID, taskMerger TaskMerger, maxActiveWorkPerPeer int, opts ...Option) *PeerTracker {
pt := &PeerTracker{
target: target,
taskQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)),
queueTaskComparator: peertask.PriorityCompare,
pendingTasks: make(map[peertask.Topic]*peertask.QueueTask),
activeTasks: make(map[*peertask.Task]struct{}),
taskMerger: taskMerger,
maxActiveWorkPerPeer: maxActiveWorkPerPeer,
}

for _, opt := range opts {
opt(pt)
}

pt.taskQueue = pq.New(peertask.WrapCompare(pt.queueTaskComparator))

return pt
}

// PeerCompare implements pq.ElemComparator
// returns true if peer 'a' has higher priority than peer 'b'
func PeerCompare(a, b pq.Elem) bool {
pa := a.(*PeerTracker)
pb := b.(*PeerTracker)
// PeerComparator is used for peer prioritization.
// It should return true if peer 'a' has higher priority than peer 'b'
type PeerComparator func(a, b *PeerTracker) bool

// DefaultPeerComparator implements the default peer prioritization logic.
func DefaultPeerComparator(pa, pb *PeerTracker) bool {
// having no pending tasks means lowest priority
paPending := len(pa.pendingTasks)
pbPending := len(pb.pendingTasks)
Expand Down Expand Up @@ -108,6 +129,22 @@ func PeerCompare(a, b pq.Elem) bool {
return pa.activeWork < pb.activeWork
}

// TaskPriorityPeerComparator prioritizes peers based on their highest priority task.
func TaskPriorityPeerComparator(comparator peertask.QueueTaskComparator) PeerComparator {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you're using this on a trusted network?

Copy link
Contributor Author

@synzhu synzhu Oct 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exactly, but is that actually a problem here? Are you worried that peers may lie about their priority?

I provide the comparator function myself, which does not necessarily need to compare two tasks based on what the requesters say their priority is. It could be based on something else, such as the data being requested.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exactly, but is that actually a problem here? Are you worried that peers may lie about their priority?

Well, the priority always starts at "max priority" and decreases from there:

  1. Yes, a peer could just send everything with "max priority".
  2. Longer-lived peers will always have "less" priority.

So this is only useful if you're planning on running a private network and intend on setting the priority yourself.


I provide the comparator function myself, which does not necessarily need to compare two tasks based on what the requesters say their priority is. It could be based on something else, such as the data being requested.

Got it. In that case, any objections to removing TaskPriorityPeerComparitor? I'm a bit concerned someone will misuse it and wonder why everything is broken.

Copy link
Contributor Author

@synzhu synzhu Oct 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is only useful if you're planning on running a private network and intend on setting the priority yourself.

Indeed, I was only intending to use this when supplying a QueueTaskComparator which does comparison based on something other than the priority sent by a peer. Maybe I should go farther and explicitly document that this is the intended usecase? Perhaps something like:

// TaskPriorityPeerComparator prioritizes peers based on their highest priority task, where priorities
// are determined based on the given QueueTask comparison function, and NOT the priority specified
// by the client.

Or I can rename it to something else if you think the naming is confusing. But yeah, essentially how this is supposed to work is, I provide a QueueTask comparison function, and then Peer comparison logic will be: "the peer with higher priority is the one whose highest priority task has higher priority according to the provided QueueTask comparison function". Hope this makes sense?

If I remove it, I will not be able to achieve this usecase. It's not possible atm for me to define this outside of the package, because taskQueue is private: https://github.com/smnzhu/go-peertaskqueue/blob/9432115230b5ce765a9f08c46d6a1dc67b65798f/peertracker/peertracker.go#L135-L136

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, no. I just somehow completely failed to actually see the code I was reviewing. I need to pay more attention, this code is fine.

return func(pa, pb *PeerTracker) bool {
ta := pa.taskQueue.Peek()
tb := pb.taskQueue.Peek()
if ta == nil {
return false
}
if tb == nil {
return true
}

return comparator(ta.(*peertask.QueueTask), tb.(*peertask.QueueTask))
}
}

// Target returns the peer that this peer tracker tracks tasks for
func (p *PeerTracker) Target() peer.ID {
return p.target
Expand Down