Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bitswap sessions #3867

Merged
merged 11 commits into from
Jul 7, 2017
4 changes: 2 additions & 2 deletions blocks/blocksutil/block_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func (bg *BlockGenerator) Next() *blocks.BasicBlock {
}

// Blocks generates as many BasicBlocks as specified by n.
func (bg *BlockGenerator) Blocks(n int) []*blocks.BasicBlock {
blocks := make([]*blocks.BasicBlock, 0)
func (bg *BlockGenerator) Blocks(n int) []blocks.Block {
blocks := make([]blocks.Block, 0, n)
for i := 0; i < n; i++ {
b := bg.Next()
blocks = append(blocks, b)
Expand Down
59 changes: 53 additions & 6 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (

"github.com/ipfs/go-ipfs/blocks/blockstore"
exchange "github.com/ipfs/go-ipfs/exchange"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"

logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
)

Expand Down Expand Up @@ -77,6 +78,23 @@ func (bs *blockService) Exchange() exchange.Interface {
return bs.exchange
}

// NewSession creates a bitswap session that allows for controlled exchange of
// wantlists to decrease the bandwidth overhead.
func NewSession(ctx context.Context, bs BlockService) *Session {
exchange := bs.Exchange()
if bswap, ok := exchange.(*bitswap.Bitswap); ok {
ses := bswap.NewSession(ctx)
return &Session{
ses: ses,
bs: bs.Blockstore(),
}
}
return &Session{
ses: exchange,
bs: bs.Blockstore(),
}
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
Expand Down Expand Up @@ -141,16 +159,25 @@ func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
log.Debugf("BlockService GetBlock: '%s'", c)

block, err := s.blockstore.Get(c)
var f exchange.Fetcher
if s.exchange != nil {
f = s.exchange
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed, can't we just pass s.exchange?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope, because then the response wouldnt be nil (it would be a typed nil, which in interface form is != nil)


return getBlock(ctx, c, s.blockstore, f)
}

func getBlock(ctx context.Context, c *cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) (blocks.Block, error) {
block, err := bs.Get(c)
if err == nil {
return block, nil
}

if err == blockstore.ErrNotFound && s.exchange != nil {
if err == blockstore.ErrNotFound && f != nil {
// TODO be careful checking ErrNotFound. If the underlying
// implementation changes, this will break.
log.Debug("Blockservice: Searching bitswap")
blk, err := s.exchange.GetBlock(ctx, c)
blk, err := f.GetBlock(ctx, c)
if err != nil {
if err == blockstore.ErrNotFound {
return nil, ErrNotFound
Expand All @@ -172,12 +199,16 @@ func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block,
// the returned channel.
// NB: No guarantees are made about order.
func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
return getBlocks(ctx, ks, s.blockstore, s.exchange)
}

func getBlocks(ctx context.Context, ks []*cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) <-chan blocks.Block {
out := make(chan blocks.Block)
go func() {
defer close(out)
var misses []*cid.Cid
for _, c := range ks {
hit, err := s.blockstore.Get(c)
hit, err := bs.Get(c)
if err != nil {
misses = append(misses, c)
continue
Expand All @@ -194,7 +225,7 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc
return
}

rblocks, err := s.exchange.GetBlocks(ctx, misses)
rblocks, err := f.GetBlocks(ctx, misses)
if err != nil {
log.Debugf("Error with GetBlocks: %s", err)
return
Expand All @@ -220,3 +251,19 @@ func (s *blockService) Close() error {
log.Debug("blockservice is shutting down...")
return s.exchange.Close()
}

// Session is a helper type to provide higher level access to bitswap sessions
type Session struct {
bs blockstore.Blockstore
ses exchange.Fetcher
}

// GetBlock gets a block in the context of a request session
func (s *Session) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
return getBlock(ctx, c, s.bs, s.ses)
}

// GetBlocks gets blocks in the context of a request session
func (s *Session) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
return getBlocks(ctx, ks, s.bs, s.ses)
}
11 changes: 10 additions & 1 deletion core/commands/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ var unwantCmd = &cmds.Command{
ks = append(ks, c)
}

bs.CancelWants(ks)
// TODO: This should maybe find *all* sessions for this request and cancel them?
// (why): in reality, i think this command should be removed. Its
// messing with the internal state of bitswap. You should cancel wants
// by killing the command that caused the want.
bs.CancelWants(ks, 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does canceling the context not cancel the associated wants?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, cancelling the context of any request command cancels the associated wants. So this command really should just get removed, it has no real purpose

},
}

Expand Down Expand Up @@ -107,6 +111,11 @@ Print out all blocks currently on the bitswap wantlist for the local peer.`,
res.SetError(err, cmds.ErrNormal)
return
}
if pid == nd.Identity {
res.SetOutput(&KeyList{bs.GetWantlist()})
return
}

res.SetOutput(&KeyList{bs.WantlistForPeer(pid)})
} else {
res.SetOutput(&KeyList{bs.GetWantlist()})
Expand Down
2 changes: 1 addition & 1 deletion core/corehttp/gateway_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
node "gx/ipfs/QmPAKbSsgEX5B6fpmxa61jXYnoWzZr5sNafd3qgPiSH8Uv/go-ipld-format"
humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
multibase "gx/ipfs/QmcxkxTVuURV2Ptse8TvkqH5BQDwV62X1x19JqqvbBzwUM/go-multibase"
multibase "gx/ipfs/Qme4T6BE4sQxg7ZouamF5M7Tx1ZFTqzcns7BkyQPXpoT99/go-multibase"
)

const (
Expand Down
Loading