This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

chore: remove goprocess from blockstoremanager #572

Merged 1 commit on Aug 5, 2022
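This PR removes blockstoreManager's dependency on github.com/jbenet/goprocess. Worker lifecycle is instead managed with three pieces of the standard library: a stop channel that is closed exactly once (guarded by sync.Once) and a sync.WaitGroup that start and stop use to launch and join the workers. The decision engine keeps its goprocess-based public API and bridges to the new start/stop pair internally.

For orientation before the diffs, here is a minimal, self-contained sketch of the shutdown pattern the PR adopts. It is illustrative only: pool, newPool, and main are hypothetical names, not code from this repository.

```go
package main

import (
	"fmt"
	"sync"
)

// pool mirrors the shape of blockstoreManager after this PR:
// a jobs channel, a stop channel closed once, and a WaitGroup.
type pool struct {
	jobs     chan func()
	stopChan chan struct{}
	stopOnce sync.Once
	wg       sync.WaitGroup
}

func newPool(workers int) *pool {
	p := &pool{jobs: make(chan func()), stopChan: make(chan struct{})}
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go p.worker()
	}
	return p
}

func (p *pool) worker() {
	defer p.wg.Done()
	for {
		select {
		case <-p.stopChan: // a closed channel is always ready, so workers drain out
			return
		case job := <-p.jobs:
			job()
		}
	}
}

// stop is idempotent: Once guards the close, and Wait joins all workers.
func (p *pool) stop() {
	p.stopOnce.Do(func() { close(p.stopChan) })
	p.wg.Wait()
}

func main() {
	p := newPool(3)
	done := make(chan struct{})
	p.jobs <- func() { fmt.Println("job ran"); close(done) }
	<-done
	p.stop()
	p.stop() // safe: the second call skips the close and returns immediately
}
```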
1 change: 0 additions & 1 deletion bitswap.go
@@ -286,7 +286,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
 
 	// Set up decision engine
 	bs.engine = decision.NewEngine(
-		ctx,
 		bstore,
 		bs.engineBstoreWorkerCount,
 		bs.engineTaskWorkerCount,

32 changes: 19 additions & 13 deletions internal/decision/blockstoremanager.go
@@ -10,23 +10,24 @@ import (
 	bstore "github.com/ipfs/go-ipfs-blockstore"
 	ipld "github.com/ipfs/go-ipld-format"
 	"github.com/ipfs/go-metrics-interface"
-	process "github.com/jbenet/goprocess"
 )
 
 // blockstoreManager maintains a pool of workers that make requests to the blockstore.
 type blockstoreManager struct {
 	bs           bstore.Blockstore
 	workerCount  int
 	jobs         chan func()
-	px           process.Process
 	pendingGauge metrics.Gauge
 	activeGauge  metrics.Gauge
+
+	workerWG sync.WaitGroup
+	stopChan chan struct{}
+	stopOnce sync.Once
 }
 
 // newBlockstoreManager creates a new blockstoreManager with the given context
 // and number of workers
 func newBlockstoreManager(
-	ctx context.Context,
 	bs bstore.Blockstore,
 	workerCount int,
 	pendingGauge metrics.Gauge,
@@ -36,26 +37,31 @@ func newBlockstoreManager(
 		bs:           bs,
 		workerCount:  workerCount,
 		jobs:         make(chan func()),
-		px:           process.WithTeardown(func() error { return nil }),
 		pendingGauge: pendingGauge,
 		activeGauge:  activeGauge,
+		stopChan:     make(chan struct{}),
 	}
 }
 
-func (bsm *blockstoreManager) start(px process.Process) {
-	px.AddChild(bsm.px)
-	// Start up workers
+func (bsm *blockstoreManager) start() {
+	bsm.workerWG.Add(bsm.workerCount)
 	for i := 0; i < bsm.workerCount; i++ {
-		bsm.px.Go(func(px process.Process) {
-			bsm.worker(px)
-		})
+		go bsm.worker()
 	}
 }
 
-func (bsm *blockstoreManager) worker(px process.Process) {
+func (bsm *blockstoreManager) stop() {
+	bsm.stopOnce.Do(func() {
+		close(bsm.stopChan)
+	})
+	bsm.workerWG.Wait()
+}
+
+func (bsm *blockstoreManager) worker() {
+	defer bsm.workerWG.Done()
 	for {
 		select {
-		case <-px.Closing():
+		case <-bsm.stopChan:
 			return
 		case job := <-bsm.jobs:
 			bsm.pendingGauge.Dec()
@@ -70,7 +76,7 @@ func (bsm *blockstoreManager) addJob(ctx context.Context, job func()) error {
 	select {
 	case <-ctx.Done():
 		return ctx.Err()
-	case <-bsm.px.Closing():
+	case <-bsm.stopChan:
 		return fmt.Errorf("shutting down")
 	case bsm.jobs <- job:
 		bsm.pendingGauge.Inc()

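The third hunk shows the submission side: addJob blocks until a worker accepts the job, the caller's context is cancelled, or the manager is stopping, whichever comes first. A hedged, standalone sketch of that select (submit is an illustrative stand-in; the real addJob also increments the pending gauge on success):

```go
package example

import (
	"context"
	"fmt"
)

// submit is an illustrative stand-in for addJob: it offers job on jobs,
// giving up if the caller's ctx is cancelled or stopChan is closed.
func submit(ctx context.Context, jobs chan<- func(), stopChan <-chan struct{}, job func()) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // caller cancelled or timed out
	case <-stopChan:
		return fmt.Errorf("shutting down") // manager stopped
	case jobs <- job:
		return nil // a worker has accepted the job
	}
}
```

One property worth noting: Go's select chooses randomly among ready cases, so once stopChan is closed, a worker that also has a job ready may still run that job before returning. Shutdown is prompt, but it is not a strict barrier against in-flight jobs.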
26 changes: 11 additions & 15 deletions internal/decision/blockstoremanager_test.go
@@ -17,17 +17,20 @@ import (
 	ds_sync "github.com/ipfs/go-datastore/sync"
 	blockstore "github.com/ipfs/go-ipfs-blockstore"
 	delay "github.com/ipfs/go-ipfs-delay"
-	process "github.com/jbenet/goprocess"
 )
 
 func newBlockstoreManagerForTesting(
+	t *testing.T,
 	ctx context.Context,
 	bs blockstore.Blockstore,
 	workerCount int,
 ) *blockstoreManager {
 	testPendingBlocksGauge := metrics.NewCtx(ctx, "pending_block_tasks", "Total number of pending blockstore tasks").Gauge()
 	testActiveBlocksGauge := metrics.NewCtx(ctx, "active_block_tasks", "Total number of active blockstore tasks").Gauge()
-	return newBlockstoreManager(ctx, bs, workerCount, testPendingBlocksGauge, testActiveBlocksGauge)
+	bsm := newBlockstoreManager(bs, workerCount, testPendingBlocksGauge, testActiveBlocksGauge)
+	bsm.start()
+	t.Cleanup(bsm.stop)
+	return bsm
 }
 
 func TestBlockstoreManagerNotFoundKey(t *testing.T) {
@@ -36,8 +39,7 @@ func TestBlockstoreManagerNotFoundKey(t *testing.T) {
 	dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
 	bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
 
-	bsm := newBlockstoreManagerForTesting(ctx, bstore, 5)
-	bsm.start(process.WithTeardown(func() error { return nil }))
+	bsm := newBlockstoreManagerForTesting(t, ctx, bstore, 5)
 
 	cids := testutil.GenerateCids(4)
 	sizes, err := bsm.getBlockSizes(ctx, cids)
@@ -75,8 +77,7 @@ func TestBlockstoreManager(t *testing.T) {
 	dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
 	bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
 
-	bsm := newBlockstoreManagerForTesting(ctx, bstore, 5)
-	bsm.start(process.WithTeardown(func() error { return nil }))
+	bsm := newBlockstoreManagerForTesting(t, ctx, bstore, 5)
 
 	exp := make(map[cid.Cid]blocks.Block)
 	var blks []blocks.Block
@@ -159,8 +160,7 @@ func TestBlockstoreManagerConcurrency(t *testing.T) {
 	bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
 
 	workerCount := 5
-	bsm := newBlockstoreManagerForTesting(ctx, bstore, workerCount)
-	bsm.start(process.WithTeardown(func() error { return nil }))
+	bsm := newBlockstoreManagerForTesting(t, ctx, bstore, workerCount)
 
 	blkSize := int64(8 * 1024)
 	blks := testutil.GenerateBlocksOfSize(32, blkSize)
@@ -201,9 +201,7 @@ func TestBlockstoreManagerClose(t *testing.T) {
 	dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
 	bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
 
-	bsm := newBlockstoreManagerForTesting(ctx, bstore, 3)
-	px := process.WithTeardown(func() error { return nil })
-	bsm.start(px)
+	bsm := newBlockstoreManagerForTesting(t, ctx, bstore, 3)
 
 	blks := testutil.GenerateBlocksOfSize(10, 1024)
 	var ks []cid.Cid
@@ -216,7 +214,7 @@
 		t.Fatal(err)
 	}
 
-	go px.Close()
+	bsm.stop()
 
 	time.Sleep(5 * time.Millisecond)
 
@@ -241,9 +239,7 @@ func TestBlockstoreManagerCtxDone(t *testing.T) {
 	bstore := blockstore.NewBlockstore(dstore)
 
 	ctx := context.Background()
-	bsm := newBlockstoreManagerForTesting(ctx, bstore, 3)
-	proc := process.WithTeardown(func() error { return nil })
-	bsm.start(proc)
+	bsm := newBlockstoreManagerForTesting(t, ctx, bstore, 3)
 
 	blks := testutil.GenerateBlocksOfSize(100, 128)
 	var ks []cid.Cid

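The test changes all follow from one fixture decision: newBlockstoreManagerForTesting now takes the *testing.T, starts the manager itself, and registers bsm.stop with t.Cleanup, so every test gets automatic teardown instead of hand-rolled process.WithTeardown plumbing. The generic shape, as a hedged sketch (component and its methods are hypothetical names):

```go
package example

import "testing"

type component struct{ stopChan chan struct{} }

func newComponent() *component { return &component{stopChan: make(chan struct{})} }
func (c *component) start()    {}
func (c *component) stop()     { close(c.stopChan) }

// newComponentForTesting hands teardown to the test framework: t.Cleanup
// functions run after the test and its subtests finish, even on failure
// or t.Fatal, so no test can leak running workers.
func newComponentForTesting(t *testing.T) *component {
	t.Helper()
	c := newComponent()
	c.start()
	t.Cleanup(c.stop)
	return c
}
```

Note that TestBlockstoreManagerClose still calls bsm.stop directly; the second stop that t.Cleanup triggers later is harmless because stop is guarded by sync.Once.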
19 changes: 12 additions & 7 deletions internal/decision/engine.go
@@ -256,7 +256,6 @@ func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator {
 // maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer more tasks if it has some maximum
 // work already outstanding.
 func NewEngine(
-	ctx context.Context,
 	bs bstore.Blockstore,
 	bstoreWorkerCount,
 	engineTaskWorkerCount, maxOutstandingBytesPerPeer int,
@@ -270,7 +269,6 @@ func NewEngine(
 	opts ...Option,
 ) *Engine {
 	return newEngine(
-		ctx,
 		bs,
 		bstoreWorkerCount,
 		engineTaskWorkerCount,
@@ -288,7 +286,6 @@
 }
 
 func newEngine(
-	ctx context.Context,
 	bs bstore.Blockstore,
 	bstoreWorkerCount,
 	engineTaskWorkerCount, maxOutstandingBytesPerPeer int,
@@ -310,7 +307,7 @@ func newEngine(
 	e := &Engine{
 		ledgerMap:   make(map[peer.ID]*ledger),
 		scoreLedger: scoreLedger,
-		bsm:         newBlockstoreManager(ctx, bs, bstoreWorkerCount, pendingBlocksGauge, activeBlocksGauge),
+		bsm:         newBlockstoreManager(bs, bstoreWorkerCount, pendingBlocksGauge, activeBlocksGauge),
 		peerTagger:  peerTagger,
 		outbox:      make(chan (<-chan *Envelope), outboxChanBuffer),
 		workSignal:  make(chan struct{}, 1),
@@ -391,20 +388,28 @@ func (e *Engine) startScoreLedger(px process.Process) {
 	})
 }
 
+func (e *Engine) startBlockstoreManager(px process.Process) {
+	e.bsm.start()
+	px.Go(func(ppx process.Process) {
+		<-ppx.Closing()
+		e.bsm.stop()
+	})
+}
+
 // Start up workers to handle requests from other nodes for the data on this node
 func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
-	// Start up blockstore manager
-	e.bsm.start(px)
+	e.startBlockstoreManager(px)
 	e.startScoreLedger(px)
 
 	e.taskWorkerLock.Lock()
 	defer e.taskWorkerLock.Unlock()
 
 	for i := 0; i < e.taskWorkerCount; i++ {
-		px.Go(func(px process.Process) {
+		px.Go(func(_ process.Process) {
 			e.taskWorker(ctx)
 		})
 	}
+
 }
 
 func (e *Engine) onPeerAdded(p peer.ID) {

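The engine itself still runs under goprocess, so startBlockstoreManager is a small adapter: it starts the manager immediately, then parks a child goroutine that waits for the process to begin closing and calls stop. Because px.Go registers that goroutine as a child of px, the parent's Close does not return until bsm.stop has joined all the workers. A hedged sketch of the same adapter in isolation (stoppable and runUnder are illustrative names, not from this PR):

```go
package example

import (
	process "github.com/jbenet/goprocess"
)

// stoppable is any component with the new explicit lifecycle.
type stoppable interface {
	start()
	stop()
}

// runUnder ties a start/stop component to a goprocess tree, mirroring
// startBlockstoreManager above: start now, stop when the process closes.
func runUnder(s stoppable, px process.Process) {
	s.start()
	px.Go(func(ppx process.Process) {
		<-ppx.Closing() // fires once px.Close (or a parent's Close) is called
		s.stop()        // px.Close blocks until this child returns
	})
}
```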
1 change: 0 additions & 1 deletion internal/decision/engine_test.go
@@ -201,7 +201,6 @@ func newEngineForTesting(
 	testPendingBlocksGauge := metrics.NewCtx(ctx, "pending_block_tasks", "Total number of pending blockstore tasks").Gauge()
 	testActiveBlocksGauge := metrics.NewCtx(ctx, "active_block_tasks", "Total number of active blockstore tasks").Gauge()
 	return newEngine(
-		ctx,
 		bs,
 		bstoreWorkerCount,
 		engineTaskWorkerCount,