Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Enable custom task prioritization logic #535

Merged
merged 8 commits into from
Oct 28, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ func SetSimulateDontHavesOnTimeout(send bool) Option {
}
}

// WithTaskComparator configures custom task prioritization logic.
func WithTaskComparator(comparator decision.TaskComparator) Option {
synzhu marked this conversation as resolved.
Show resolved Hide resolved
return func(bs *Bitswap) {
bs.taskComparator = comparator
}
}

// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate. Runs until context is cancelled or bitswap.Close is called.
Expand Down Expand Up @@ -272,6 +279,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
activeEngineGauge,
pendingBlocksGauge,
activeBlocksGauge,
decision.WithTaskComparator(bs.taskComparator),
)
bs.engine.SetSendDontHaves(bs.engineSetSendDontHaves)

Expand Down Expand Up @@ -375,6 +383,8 @@ type Bitswap struct {

// whether we should actually simulate dont haves on request timeout
simulateDontHavesOnTimeout bool

taskComparator decision.TaskComparator
}

type counters struct {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-log v1.0.5
github.com/ipfs/go-metrics-interface v0.0.1
github.com/ipfs/go-peertaskqueue v0.4.0
github.com/ipfs/go-peertaskqueue v0.6.0
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-libp2p v0.14.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ github.com/ipfs/go-log/v2 v2.1.3 h1:1iS3IU7aXRlbgUpN8yTTpJ53NXYjAe37vcI5+5nYrzk=
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg=
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
github.com/ipfs/go-peertaskqueue v0.4.0 h1:x1hFgA4JOUJ3ntPfqLRu6v4k6kKL0p07r3RSg9JNyHI=
github.com/ipfs/go-peertaskqueue v0.4.0/go.mod h1:KL9F49hXJMoXCad8e5anivjN+kWdr+CyGcyh4K6doLc=
github.com/ipfs/go-peertaskqueue v0.6.0 h1:BT1/PuNViVomiz1PnnP5+WmKsTNHrxIDvkZrkj4JhOg=
github.com/ipfs/go-peertaskqueue v0.6.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
Expand Down
78 changes: 76 additions & 2 deletions internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/ipfs/go-metrics-interface"
"github.com/ipfs/go-peertaskqueue"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipfs/go-peertaskqueue/peertracker"
process "github.com/jbenet/goprocess"
"github.com/libp2p/go-libp2p-core/peer"
)
Expand Down Expand Up @@ -175,6 +176,60 @@ type Engine struct {
// used to ensure metrics are reported each fixed number of operation
metricsLock sync.Mutex
metricUpdateCounter int

taskComparator TaskComparator
}

// TaskInfo represents the details of a request from a peer.
type TaskInfo struct {
synzhu marked this conversation as resolved.
Show resolved Hide resolved
Peer peer.ID
// The CID of the block
Cid cid.Cid
// Tasks can be want-have or want-block
IsWantBlock bool
// Whether to immediately send a response if the block is not found
SendDontHave bool
// The size of the block corresponding to the task
BlockSize int
// Whether the block was found
HaveBlock bool
}

// TaskComparator is used for task prioritization.
// It should return true if task 'ta' has higher priority than task 'tb'
type TaskComparator func(ta, tb *TaskInfo) bool

type Option func(*Engine)

func WithTaskComparator(comparator TaskComparator) Option {
return func(e *Engine) {
e.taskComparator = comparator
}
}

// wrapTaskComparator wraps a TaskComparator so it can be used as a QueueTaskComparator
func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator {
return func(a, b *peertask.QueueTask) bool {
taskDataA := a.Task.Data.(*taskData)
taskInfoA := &TaskInfo{
Peer: a.Target,
Cid: a.Task.Topic.(cid.Cid),
IsWantBlock: taskDataA.IsWantBlock,
SendDontHave: taskDataA.SendDontHave,
BlockSize: taskDataA.BlockSize,
HaveBlock: taskDataA.HaveBlock,
}
taskDataB := b.Task.Data.(*taskData)
taskInfoB := &TaskInfo{
Peer: b.Target,
Cid: b.Task.Topic.(cid.Cid),
IsWantBlock: taskDataB.IsWantBlock,
SendDontHave: taskDataB.SendDontHave,
BlockSize: taskDataB.BlockSize,
HaveBlock: taskDataB.HaveBlock,
}
return tc(taskInfoA, taskInfoB)
}
}

// NewEngine creates a new block sending engine for the given block store.
Expand All @@ -192,6 +247,7 @@ func NewEngine(
activeEngineGauge metrics.Gauge,
pendingBlocksGauge metrics.Gauge,
activeBlocksGauge metrics.Gauge,
opts ...Option,
) *Engine {
return newEngine(
ctx,
Expand All @@ -207,6 +263,7 @@ func NewEngine(
activeEngineGauge,
pendingBlocksGauge,
activeBlocksGauge,
opts...,
)
}

Expand All @@ -223,6 +280,7 @@ func newEngine(
activeEngineGauge metrics.Gauge,
pendingBlocksGauge metrics.Gauge,
activeBlocksGauge metrics.Gauge,
opts ...Option,
) *Engine {

if scoreLedger == nil {
Expand All @@ -247,12 +305,28 @@ func newEngine(
}
e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String())
e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String())
e.peerRequestQueue = peertaskqueue.New(

for _, opt := range opts {
opt(e)
}

// default peer task queue options
peerTaskQueueOpts := []peertaskqueue.Option{
peertaskqueue.OnPeerAddedHook(e.onPeerAdded),
peertaskqueue.OnPeerRemovedHook(e.onPeerRemoved),
peertaskqueue.TaskMerger(newTaskMerger()),
peertaskqueue.IgnoreFreezing(true),
peertaskqueue.MaxOutstandingWorkPerPeer(maxOutstandingBytesPerPeer))
peertaskqueue.MaxOutstandingWorkPerPeer(maxOutstandingBytesPerPeer),
}

if e.taskComparator != nil {
queueTaskComparator := wrapTaskComparator(e.taskComparator)
peerTaskQueueOpts = append(peerTaskQueueOpts, peertaskqueue.PeerComparator(peertracker.TaskPriorityPeerComparator(queueTaskComparator)))
peerTaskQueueOpts = append(peerTaskQueueOpts, peertaskqueue.TaskComparator(queueTaskComparator))
}

e.peerRequestQueue = peertaskqueue.New(peerTaskQueueOpts...)

return e
}

Expand Down