Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multiple performance metrics #3615

Merged
merged 5 commits into from
Mar 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"gx/ipfs/QmX3QZ5jHEPidwUrymXV1iSCSUhdGxj15sm2gP4jKMef7B/client_golang/prometheus"
util "gx/ipfs/QmZuY8aV7zbNXVy6DyN9SmnuH3o9nG852F4aTiSBpts8d1/go-ipfs-util"
iconn "gx/ipfs/QmcYnysCkyGezY6k6MQ1yHHdrRiZaU9x3M9Y1tE9qZ5hD2/go-libp2p-interface-conn"

_ "gx/ipfs/QmV3NSS3A1kX5s28r7yLczhDsXzkgo65cqRgKFXYunWZmD/metrics/runtime"
)

const (
Expand Down
43 changes: 30 additions & 13 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
flags "github.com/ipfs/go-ipfs/flags"
"github.com/ipfs/go-ipfs/thirdparty/delay"

metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
Expand Down Expand Up @@ -47,6 +48,9 @@ var (
HasBlockBufferSize = 256
provideKeysBufferSize = 2048
provideWorkerMax = 512

// the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)

func init() {
Expand Down Expand Up @@ -74,6 +78,11 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
// shouldn't accept a context anymore. Clients should probably use Close()
// exclusively. We should probably find another way to share logging data
ctx, cancelFunc := context.WithCancel(parent)
ctx = metrics.CtxSubScope(ctx, "bitswap")
dupHist := metrics.NewCtx(ctx, "recv_dup_blocks_bytes", "Summary of duplicate"+
" data blocks recived").Histogram(metricsBuckets)
allHist := metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all"+
" data blocks recived").Histogram(metricsBuckets)

notif := notifications.New()
px := process.WithTeardown(func() error {
Expand All @@ -91,6 +100,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
newBlocks: make(chan *cid.Cid, HasBlockBufferSize),
provideKeys: make(chan *cid.Cid, provideKeysBufferSize),
wm: NewWantManager(ctx, network),

dupMetric: dupHist,
allMetric: allHist,
}
go bs.wm.Run()
network.SetDelegate(bs)
Expand Down Expand Up @@ -145,6 +157,10 @@ type Bitswap struct {
blocksRecvd int
dupBlocksRecvd int
dupDataRecvd uint64

// Metrics interface metrics
dupMetric metrics.Histogram
allMetric metrics.Histogram
}

type blockRequest struct {
Expand Down Expand Up @@ -352,9 +368,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
go func(b blocks.Block) {
defer wg.Done()

if err := bs.updateReceiveCounters(b); err != nil {
return // ignore error, is either logged previously, or ErrAlreadyHaveBlock
}
bs.updateReceiveCounters(b)

k := b.Cid()
log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
Expand All @@ -370,24 +384,27 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg

var ErrAlreadyHaveBlock = errors.New("already have block")

func (bs *Bitswap) updateReceiveCounters(b blocks.Block) error {
bs.counterLk.Lock()
defer bs.counterLk.Unlock()
bs.blocksRecvd++
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
blkLen := len(b.RawData())
has, err := bs.blockstore.Has(b.Cid())
if err != nil {
log.Infof("blockstore.Has error: %s", err)
return err
return
}
if err == nil && has {
bs.dupBlocksRecvd++
bs.dupDataRecvd += uint64(len(b.RawData()))

bs.allMetric.Observe(float64(blkLen))
if has {
bs.dupMetric.Observe(float64(blkLen))
}

bs.counterLk.Lock()
defer bs.counterLk.Unlock()

bs.blocksRecvd++
if has {
return ErrAlreadyHaveBlock
bs.dupBlocksRecvd++
bs.dupDataRecvd += uint64(blkLen)
}
return nil
}

// Connected/Disconnected warns bitswap about peer connections
Expand Down
33 changes: 24 additions & 9 deletions exchange/bitswap/wantmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"

metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
peer "gx/ipfs/QmZcUPvPhD1Xvk6mwijYF8AfR3mG31S1YsEfHG4khrFPRr/go-libp2p-peer"
)
Expand All @@ -27,20 +29,29 @@ type WantManager struct {
network bsnet.BitSwapNetwork
ctx context.Context
cancel func()

wantlistGauge metrics.Gauge
sentHistogram metrics.Histogram
}

func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
ctx, cancel := context.WithCancel(ctx)
wantlistGauge := metrics.NewCtx(ctx, "wanlist_total",
"Number of items in wantlist.").Gauge()
sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
" this bitswap").Histogram(metricsBuckets)
return &WantManager{
incoming: make(chan []*bsmsg.Entry, 10),
connect: make(chan peer.ID, 10),
disconnect: make(chan peer.ID, 10),
peerReqs: make(chan chan []peer.ID),
peers: make(map[peer.ID]*msgQueue),
wl: wantlist.NewThreadSafe(),
network: network,
ctx: ctx,
cancel: cancel,
incoming: make(chan []*bsmsg.Entry, 10),
connect: make(chan peer.ID, 10),
disconnect: make(chan peer.ID, 10),
peerReqs: make(chan chan []peer.ID),
peers: make(map[peer.ID]*msgQueue),
wl: wantlist.NewThreadSafe(),
network: network,
ctx: ctx,
cancel: cancel,
wantlistGauge: wantlistGauge,
sentHistogram: sentHistogram,
}
}

Expand Down Expand Up @@ -109,6 +120,8 @@ func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
// throughout the network stack
defer env.Sent()

pm.sentHistogram.Observe(float64(len(env.Block.RawData())))

msg := bsmsg.New(false)
msg.AddBlock(env.Block)
log.Infof("Sending block %s to %s", env.Block, env.Peer)
Expand Down Expand Up @@ -282,10 +295,12 @@ func (pm *WantManager) Run() {
for _, e := range entries {
if e.Cancel {
if pm.wl.Remove(e.Cid) {
pm.wantlistGauge.Dec()
filtered = append(filtered, e)
}
} else {
if pm.wl.AddEntry(e.Entry) {
pm.wantlistGauge.Inc()
filtered = append(filtered, e)
}
}
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@
},
{
"author": "whyrusleeping",
"hash": "QmbUSMTQtK9GRrUbD4ngqJwSzHsquUc8nyDubRWp4vPybH",
"hash": "QmNPv1yzXBqxzqjfTzHCeBoicxxZgHzLezdY2hMCZ3r6EU",
"name": "go-ds-measure",
"version": "1.1.0"
"version": "1.2.0"
},
{
"author": "whyrusleeping",
Expand Down
4 changes: 2 additions & 2 deletions repo/fsrepo/defaultds.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
config "github.com/ipfs/go-ipfs/repo/config"
"github.com/ipfs/go-ipfs/thirdparty/dir"

measure "gx/ipfs/QmNPv1yzXBqxzqjfTzHCeBoicxxZgHzLezdY2hMCZ3r6EU/go-ds-measure"
ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
mount "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/syncmount"
"gx/ipfs/QmXZEfbEv9sXG9JnLoMNhREDMDgkq5Jd7uWJ7d77VJ4pxn/go-ds-flatfs"
flatfs "gx/ipfs/QmXZEfbEv9sXG9JnLoMNhREDMDgkq5Jd7uWJ7d77VJ4pxn/go-ds-flatfs"
levelds "gx/ipfs/QmaHHmfEozrrotyhyN44omJouyuEtx6ahddqV6W5yRaUSQ/go-ds-leveldb"
ldbopts "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb/opt"
measure "gx/ipfs/QmbUSMTQtK9GRrUbD4ngqJwSzHsquUc8nyDubRWp4vPybH/go-ds-measure"
)

const (
Expand Down
5 changes: 3 additions & 2 deletions repo/fsrepo/fsrepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"strings"
"sync"

"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/mitchellh/go-homedir"
keystore "github.com/ipfs/go-ipfs/keystore"
repo "github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/repo/common"
Expand All @@ -21,10 +20,12 @@ import (
serialize "github.com/ipfs/go-ipfs/repo/fsrepo/serialize"
dir "github.com/ipfs/go-ipfs/thirdparty/dir"

"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/mitchellh/go-homedir"

measure "gx/ipfs/QmNPv1yzXBqxzqjfTzHCeBoicxxZgHzLezdY2hMCZ3r6EU/go-ds-measure"
ma "gx/ipfs/QmSWLfmj5frN9xVLMMN846dMDriy5wN5jeghUm7aTW3DAG/go-multiaddr"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
util "gx/ipfs/QmZuY8aV7zbNXVy6DyN9SmnuH3o9nG852F4aTiSBpts8d1/go-ipfs-util"
"gx/ipfs/QmbUSMTQtK9GRrUbD4ngqJwSzHsquUc8nyDubRWp4vPybH/go-ds-measure"
)

var log = logging.Logger("fsrepo")
Expand Down