Skip to content

Commit

Permalink
Merge pull request #38 from jbenet/feat/bitswap-import-cleanup
Browse files Browse the repository at this point in the history
chore(bitswap) tidy up bitswap, ledger
  • Loading branch information
Brian Tiger Chow committed Sep 11, 2014
2 parents dbc3a05 + ad30333 commit 677d790
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 30 deletions.
31 changes: 15 additions & 16 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package bitswap

import (
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
"time"

proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"

blocks "github.com/jbenet/go-ipfs/blocks"
peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing"
dht "github.com/jbenet/go-ipfs/routing/dht"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"

ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"

"time"
)

// PartnerWantListMax is the bound for the number of keys we'll store per
Expand Down Expand Up @@ -44,7 +44,7 @@ type BitSwap struct {
// The Ledger has the peer.ID, and the peer connection works through net.
// Ledgers of known relationships (active or inactive) stored in datastore.
// Changes to the Ledger should be committed to the datastore.
partners map[u.Key]*Ledger
partners LedgerMap

// haveList is the set of keys we have values for. a map for fast lookups.
// haveList KeySet -- not needed. all values in datastore?
Expand Down Expand Up @@ -136,7 +136,7 @@ func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byt
func (bs *BitSwap) HaveBlock(blk *blocks.Block) error {
go func() {
for _, ledger := range bs.partners {
if _, ok := ledger.WantList[blk.Key()]; ok {
if ledger.WantListContains(blk.Key()) {
//send block to node
if ledger.ShouldSend() {
bs.SendBlock(ledger.Partner, blk)
Expand Down Expand Up @@ -189,14 +189,13 @@ func (bs *BitSwap) handleMessages() {
// and then if we do, check the ledger for whether or not we should send it.
func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) {
u.DOut("peer [%s] wants block [%s]\n", p.ID.Pretty(), u.Key(want).Pretty())
ledg := bs.GetLedger(p)
ledger := bs.getLedger(p)

dsk := ds.NewKey(want)
blk_i, err := bs.datastore.Get(dsk)
if err != nil {
if err == ds.ErrNotFound {
// TODO: this needs to be different. We need timeouts.
ledg.WantList[u.Key(want)] = struct{}{}
ledger.Wants(u.Key(want))
}
u.PErr("datastore get error: %v\n", err)
return
Expand All @@ -208,15 +207,15 @@ func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) {
return
}

if ledg.ShouldSend() {
if ledger.ShouldSend() {
u.DOut("Sending block to peer.\n")
bblk, err := blocks.NewBlock(blk)
if err != nil {
u.PErr("newBlock error: %v\n", err)
return
}
bs.SendBlock(p, bblk)
ledg.SentBytes(len(blk))
ledger.SentBytes(len(blk))
} else {
u.DOut("Decided not to send block.")
}
Expand All @@ -236,11 +235,11 @@ func (bs *BitSwap) blockReceive(p *peer.Peer, blk *blocks.Block) {
}
bs.listener.Respond(string(blk.Key()), mes)

ledger := bs.GetLedger(p)
ledger := bs.getLedger(p)
ledger.ReceivedBytes(len(blk.Data))
}

func (bs *BitSwap) GetLedger(p *peer.Peer) *Ledger {
func (bs *BitSwap) getLedger(p *peer.Peer) *Ledger {
l, ok := bs.partners[p.Key()]
if ok {
return l
Expand Down Expand Up @@ -273,7 +272,7 @@ func (bs *BitSwap) Halt() {

func (bs *BitSwap) SetStrategy(sf StrategyFunc) {
bs.strategy = sf
for _, ledg := range bs.partners {
ledg.Strategy = sf
for _, ledger := range bs.partners {
ledger.Strategy = sf
}
}
61 changes: 47 additions & 14 deletions bitswap/ledger.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,34 @@
package bitswap

import (
"sync"
"time"

peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"

"time"
)

// Ledger stores the data exchange relationship between two peers.
type Ledger struct {
lock sync.RWMutex

// Partner is the remote Peer.
Partner *peer.Peer

// Accounting tracks bytes sent and recieved.
Accounting debtRatio

// FirstExchnage is the time of the first data exchange.
FirstExchange time.Time
// firstExchnage is the time of the first data exchange.
firstExchange time.Time

// LastExchange is the time of the last data exchange.
LastExchange time.Time
// lastExchange is the time of the last data exchange.
lastExchange time.Time

// Number of exchanges with this peer
ExchangeCount uint64
// exchangeCount is the number of exchanges with this peer
exchangeCount uint64

// WantList is a (bounded, small) set of keys that Partner desires.
WantList KeySet
// wantList is a (bounded, small) set of keys that Partner desires.
wantList KeySet

Strategy StrategyFunc
}
Expand All @@ -35,17 +37,48 @@ type Ledger struct {
type LedgerMap map[u.Key]*Ledger

func (l *Ledger) ShouldSend() bool {
l.lock.Lock()
defer l.lock.Unlock()

return l.Strategy(l)
}

func (l *Ledger) SentBytes(n int) {
l.ExchangeCount++
l.LastExchange = time.Now()
l.lock.Lock()
defer l.lock.Unlock()

l.exchangeCount++
l.lastExchange = time.Now()
l.Accounting.BytesSent += uint64(n)
}

func (l *Ledger) ReceivedBytes(n int) {
l.ExchangeCount++
l.LastExchange = time.Now()
l.lock.Lock()
defer l.lock.Unlock()

l.exchangeCount++
l.lastExchange = time.Now()
l.Accounting.BytesRecv += uint64(n)
}

// TODO: this needs to be different. We need timeouts.
func (l *Ledger) Wants(k u.Key) {
l.lock.Lock()
defer l.lock.Unlock()

l.wantList[k] = struct{}{}
}

func (l *Ledger) WantListContains(k u.Key) bool {
l.lock.RLock()
defer l.lock.RUnlock()

_, ok := l.wantList[k]
return ok
}

func (l *Ledger) ExchangeCount() uint64 {
l.lock.RLock()
defer l.lock.RUnlock()
return l.exchangeCount
}
23 changes: 23 additions & 0 deletions bitswap/ledger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package bitswap

import (
"sync"
"testing"
)

func TestRaceConditions(t *testing.T) {
const numberOfExpectedExchanges = 10000
l := new(Ledger)
var wg sync.WaitGroup
for i := 0; i < numberOfExpectedExchanges; i++ {
wg.Add(1)
go func() {
defer wg.Done()
l.ReceivedBytes(1)
}()
}
wg.Wait()
if l.ExchangeCount() != numberOfExpectedExchanges {
t.Fail()
}
}

0 comments on commit 677d790

Please sign in to comment.