Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
chore: remove goprocess from blockstoremanager
Browse files Browse the repository at this point in the history
  • Loading branch information
guseggert committed Aug 5, 2022
1 parent 1d1c6bf commit 7c9bc59
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 37 deletions.
1 change: 0 additions & 1 deletion bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,

// Set up decision engine
bs.engine = decision.NewEngine(
ctx,
bstore,
bs.engineBstoreWorkerCount,
bs.engineTaskWorkerCount,
Expand Down
32 changes: 19 additions & 13 deletions internal/decision/blockstoremanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,24 @@ import (
bstore "github.com/ipfs/go-ipfs-blockstore"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-metrics-interface"
process "github.com/jbenet/goprocess"
)

// blockstoreManager maintains a pool of workers that make requests to the blockstore.
type blockstoreManager struct {
bs bstore.Blockstore
workerCount int
jobs chan func()
px process.Process
pendingGauge metrics.Gauge
activeGauge metrics.Gauge

workerWG sync.WaitGroup
stopChan chan struct{}
stopOnce sync.Once
}

// newBlockstoreManager creates a new blockstoreManager with the given context
// and number of workers
func newBlockstoreManager(
ctx context.Context,
bs bstore.Blockstore,
workerCount int,
pendingGauge metrics.Gauge,
Expand All @@ -36,26 +37,31 @@ func newBlockstoreManager(
bs: bs,
workerCount: workerCount,
jobs: make(chan func()),
px: process.WithTeardown(func() error { return nil }),
pendingGauge: pendingGauge,
activeGauge: activeGauge,
stopChan: make(chan struct{}),
}
}

func (bsm *blockstoreManager) start(px process.Process) {
px.AddChild(bsm.px)
// Start up workers
func (bsm *blockstoreManager) start() {
bsm.workerWG.Add(bsm.workerCount)
for i := 0; i < bsm.workerCount; i++ {
bsm.px.Go(func(px process.Process) {
bsm.worker(px)
})
go bsm.worker()
}
}

func (bsm *blockstoreManager) worker(px process.Process) {
func (bsm *blockstoreManager) stop() {
bsm.stopOnce.Do(func() {
close(bsm.stopChan)
bsm.workerWG.Wait()
})
}

func (bsm *blockstoreManager) worker() {
defer bsm.workerWG.Done()
for {
select {
case <-px.Closing():
case <-bsm.stopChan:
return
case job := <-bsm.jobs:
bsm.pendingGauge.Dec()
Expand All @@ -70,7 +76,7 @@ func (bsm *blockstoreManager) addJob(ctx context.Context, job func()) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-bsm.px.Closing():
case <-bsm.stopChan:
return fmt.Errorf("shutting down")
case bsm.jobs <- job:
bsm.pendingGauge.Inc()
Expand Down
26 changes: 11 additions & 15 deletions internal/decision/blockstoremanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ import (
ds_sync "github.com/ipfs/go-datastore/sync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
delay "github.com/ipfs/go-ipfs-delay"
process "github.com/jbenet/goprocess"
)

func newBlockstoreManagerForTesting(
t *testing.T,
ctx context.Context,
bs blockstore.Blockstore,
workerCount int,
) *blockstoreManager {
testPendingBlocksGauge := metrics.NewCtx(ctx, "pending_block_tasks", "Total number of pending blockstore tasks").Gauge()
testActiveBlocksGauge := metrics.NewCtx(ctx, "active_block_tasks", "Total number of active blockstore tasks").Gauge()
return newBlockstoreManager(ctx, bs, workerCount, testPendingBlocksGauge, testActiveBlocksGauge)
bsm := newBlockstoreManager(bs, workerCount, testPendingBlocksGauge, testActiveBlocksGauge)
bsm.start()
t.Cleanup(bsm.stop)
return bsm
}

func TestBlockstoreManagerNotFoundKey(t *testing.T) {
Expand All @@ -36,8 +39,7 @@ func TestBlockstoreManagerNotFoundKey(t *testing.T) {
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManagerForTesting(ctx, bstore, 5)
bsm.start(process.WithTeardown(func() error { return nil }))
bsm := newBlockstoreManagerForTesting(t, ctx, bstore, 5)

cids := testutil.GenerateCids(4)
sizes, err := bsm.getBlockSizes(ctx, cids)
Expand Down Expand Up @@ -75,8 +77,7 @@ func TestBlockstoreManager(t *testing.T) {
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManagerForTesting(ctx, bstore, 5)
bsm.start(process.WithTeardown(func() error { return nil }))
bsm := newBlockstoreManagerForTesting(t, ctx, bstore, 5)

exp := make(map[cid.Cid]blocks.Block)
var blks []blocks.Block
Expand Down Expand Up @@ -159,8 +160,7 @@ func TestBlockstoreManagerConcurrency(t *testing.T) {
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

workerCount := 5
bsm := newBlockstoreManagerForTesting(ctx, bstore, workerCount)
bsm.start(process.WithTeardown(func() error { return nil }))
bsm := newBlockstoreManagerForTesting(t, ctx, bstore, workerCount)

blkSize := int64(8 * 1024)
blks := testutil.GenerateBlocksOfSize(32, blkSize)
Expand Down Expand Up @@ -201,9 +201,7 @@ func TestBlockstoreManagerClose(t *testing.T) {
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManagerForTesting(ctx, bstore, 3)
px := process.WithTeardown(func() error { return nil })
bsm.start(px)
bsm := newBlockstoreManagerForTesting(t, ctx, bstore, 3)

blks := testutil.GenerateBlocksOfSize(10, 1024)
var ks []cid.Cid
Expand All @@ -216,7 +214,7 @@ func TestBlockstoreManagerClose(t *testing.T) {
t.Fatal(err)
}

go px.Close()
bsm.stop()

time.Sleep(5 * time.Millisecond)

Expand All @@ -241,9 +239,7 @@ func TestBlockstoreManagerCtxDone(t *testing.T) {
bstore := blockstore.NewBlockstore(dstore)

ctx := context.Background()
bsm := newBlockstoreManagerForTesting(ctx, bstore, 3)
proc := process.WithTeardown(func() error { return nil })
bsm.start(proc)
bsm := newBlockstoreManagerForTesting(t, ctx, bstore, 3)

blks := testutil.GenerateBlocksOfSize(100, 128)
var ks []cid.Cid
Expand Down
19 changes: 12 additions & 7 deletions internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator {
// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer more tasks if it has some maximum
// work already outstanding.
func NewEngine(
ctx context.Context,
bs bstore.Blockstore,
bstoreWorkerCount,
engineTaskWorkerCount, maxOutstandingBytesPerPeer int,
Expand All @@ -270,7 +269,6 @@ func NewEngine(
opts ...Option,
) *Engine {
return newEngine(
ctx,
bs,
bstoreWorkerCount,
engineTaskWorkerCount,
Expand All @@ -288,7 +286,6 @@ func NewEngine(
}

func newEngine(
ctx context.Context,
bs bstore.Blockstore,
bstoreWorkerCount,
engineTaskWorkerCount, maxOutstandingBytesPerPeer int,
Expand All @@ -310,7 +307,7 @@ func newEngine(
e := &Engine{
ledgerMap: make(map[peer.ID]*ledger),
scoreLedger: scoreLedger,
bsm: newBlockstoreManager(ctx, bs, bstoreWorkerCount, pendingBlocksGauge, activeBlocksGauge),
bsm: newBlockstoreManager(bs, bstoreWorkerCount, pendingBlocksGauge, activeBlocksGauge),
peerTagger: peerTagger,
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}, 1),
Expand Down Expand Up @@ -391,20 +388,28 @@ func (e *Engine) startScoreLedger(px process.Process) {
})
}

func (e *Engine) startBlockstoreManager(px process.Process) {
e.bsm.start()
px.Go(func(ppx process.Process) {
<-ppx.Closing()
e.bsm.stop()
})
}

// Start up workers to handle requests from other nodes for the data on this node
func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
// Start up blockstore manager
e.bsm.start(px)
e.startBlockstoreManager(px)
e.startScoreLedger(px)

e.taskWorkerLock.Lock()
defer e.taskWorkerLock.Unlock()

for i := 0; i < e.taskWorkerCount; i++ {
px.Go(func(px process.Process) {
px.Go(func(_ process.Process) {
e.taskWorker(ctx)
})
}

}

func (e *Engine) onPeerAdded(p peer.ID) {
Expand Down
1 change: 0 additions & 1 deletion internal/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ func newEngineForTesting(
testPendingBlocksGauge := metrics.NewCtx(ctx, "pending_block_tasks", "Total number of pending blockstore tasks").Gauge()
testActiveBlocksGauge := metrics.NewCtx(ctx, "active_block_tasks", "Total number of active blockstore tasks").Gauge()
return newEngine(
ctx,
bs,
bstoreWorkerCount,
engineTaskWorkerCount,
Expand Down

0 comments on commit 7c9bc59

Please sign in to comment.