Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
I think it works
Browse files Browse the repository at this point in the history
WIP on fixing tests
  • Loading branch information
Jorropo committed Aug 1, 2022
1 parent fd744f3 commit 3d91ab1
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 35 deletions.
14 changes: 11 additions & 3 deletions client/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
bsmsg "github.com/ipfs/go-bitswap/message"
bmetrics "github.com/ipfs/go-bitswap/metrics"
bsnet "github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-bitswap/tracer"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
Expand Down Expand Up @@ -159,6 +160,15 @@ func WithTaskComparator(comparator TaskComparator) Option {
}
}

// Configures Bitswap to use given tracer.
// This provides methods to access all messages sent and received by Bitswap.
// This interface can be used to implement various statistics (this is original intent).
func WithTracer(tap tracer.Tracer) Option {
return func(bs *Bitswap) {
bs.tracer = tap
}
}

// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate. Runs until context is cancelled or bitswap.Close is called.
Expand Down Expand Up @@ -295,7 +305,7 @@ type Bitswap struct {
allMetric metrics.Histogram

// External statistics interface
tracer Tracer
tracer tracer.Tracer

// the SessionManager routes requests to interested sessions
sm *bssm.SessionManager
Expand Down Expand Up @@ -346,8 +356,6 @@ type counters struct {
blocksRecvd uint64
dupBlocksRecvd uint64
dupDataRecvd uint64
blocksSent uint64
dataSent uint64
dataRecvd uint64
messagesRecvd uint64
}
Expand Down
4 changes: 0 additions & 4 deletions client/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ type Stat struct {
Peers []string
BlocksReceived uint64
DataReceived uint64
BlocksSent uint64
DataSent uint64
DupBlksReceived uint64
DupDataReceived uint64
MessagesReceived uint64
Expand All @@ -30,8 +28,6 @@ func (bs *Bitswap) Stat() (*Stat, error) {
st.BlocksReceived = c.blocksRecvd
st.DupBlksReceived = c.dupBlocksRecvd
st.DupDataReceived = c.dupDataRecvd
st.BlocksSent = c.blocksSent
st.DataSent = c.dataSent
st.DataReceived = c.dataRecvd
st.MessagesReceived = c.messagesRecvd
bs.counterLk.Unlock()
Expand Down
14 changes: 0 additions & 14 deletions decision/iface.go

This file was deleted.

37 changes: 31 additions & 6 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ package server
import (
"context"
"fmt"
"sync"
"time"

"github.com/ipfs/go-bitswap/decision"
"github.com/ipfs/go-bitswap/internal/defaults"
pb "github.com/ipfs/go-bitswap/message/pb"
bmetrics "github.com/ipfs/go-bitswap/metrics"
bsnet "github.com/ipfs/go-bitswap/network"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-bitswap/tracer"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-metrics-interface"
process "github.com/jbenet/goprocess"
procctx "github.com/jbenet/goprocess/context"
"go.uber.org/zap"
)

Expand All @@ -35,6 +35,13 @@ type Server struct {
// network delivers messages on behalf of the session
network bsnet.BitSwapNetwork

// External statistics interface
tracer tracer.Tracer

// Counters for various statistics
counterLk sync.Mutex
counters Stat

// the total number of simultaneous threads sending outgoing messages
taskWorkerCount int
}
Expand All @@ -60,6 +67,12 @@ func TaskWorkerCount(count int) Option {
}
}

func WithTracer(tap tracer.Tracer) Option {
return func(bs *Server) {
bs.tracer = tap
}
}

func (bs *Server) startWorkers(ctx context.Context, px process.Process) {

// Start up workers to handle requests from other nodes for the data on this node
Expand Down Expand Up @@ -107,7 +120,7 @@ func (bs *Server) taskWorker(ctx context.Context, id int) {
}
}

func (bs *Server) logOutgoingBlocks(env *engine.Envelope) {
func (bs *Server) logOutgoingBlocks(env *decision.Envelope) {
if ce := sflog.Check(zap.DebugLevel, "sent message"); ce == nil {
return
}
Expand Down Expand Up @@ -146,7 +159,7 @@ func (bs *Server) logOutgoingBlocks(env *engine.Envelope) {
}
}

func (bs *Server) sendBlocks(ctx context.Context, env *engine.Envelope) {
func (bs *Server) sendBlocks(ctx context.Context, env *decision.Envelope) {
// Blocks need to be sent synchronously to maintain proper backpressure
// throughout the network stack
defer env.Sent()
Expand All @@ -168,9 +181,21 @@ func (bs *Server) sendBlocks(ctx context.Context, env *engine.Envelope) {
dataSent += len(b.RawData())
}
bs.counterLk.Lock()
bs.counters.blocksSent += uint64(len(blocks))
bs.counters.dataSent += uint64(dataSent)
bs.counters.BlocksSent += uint64(len(blocks))
bs.counters.DataSent += uint64(dataSent)
bs.counterLk.Unlock()
bs.sentHistogram.Observe(float64(env.Message.Size()))
log.Debugw("sent message", "peer", env.Peer)
}

type Stat struct {
BlocksSent uint64
DataSent uint64
}

// Stat returns aggregated statistics about bitswap operations
func (bs *Server) Stat() Stat {
bs.counterLk.Lock()
defer bs.counterLk.Unlock()
return bs.counters
}
9 changes: 1 addition & 8 deletions client/tracer.go → tracer/tracer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package client
package tracer

import (
bsmsg "github.com/ipfs/go-bitswap/message"
Expand All @@ -11,10 +11,3 @@ type Tracer interface {
MessageReceived(peer.ID, bsmsg.BitSwapMessage)
MessageSent(peer.ID, bsmsg.BitSwapMessage)
}

// Configures Bitswap to use given tracer.
func WithTracer(tap Tracer) Option {
return func(bs *Bitswap) {
bs.tracer = tap
}
}

0 comments on commit 3d91ab1

Please sign in to comment.