Skip to content

Commit

Permalink
Remove Provider from Blockservice. Revert Session changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
hsanjuan committed Nov 20, 2024
1 parent b55366d commit 0ca70f4
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 151 deletions.
132 changes: 63 additions & 69 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
"github.com/ipfs/boxo/provider"
"github.com/ipfs/boxo/verifcid"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -74,21 +73,10 @@ type BoundedBlockService interface {

var _ BoundedBlockService = (*blockService)(nil)

// ProvidingBlockService is a Blockservice which provides new blocks to a provider.
type ProvidingBlockService interface {
BlockService

// Provider can return nil, then no provider is used.
Provider() provider.Provider
}

var _ ProvidingBlockService = (*blockService)(nil)

type blockService struct {
allowlist verifcid.Allowlist
blockstore blockstore.Blockstore
exchange exchange.Interface
provider provider.Provider
// If checkFirst is true then first check that a block doesn't
// already exist to avoid republishing the block on the exchange.
checkFirst bool
Expand All @@ -111,13 +99,6 @@ func WithAllowlist(allowlist verifcid.Allowlist) Option {
}
}

// WithProvider allows to advertise anything that is added through the blockservice.
func WithProvider(prov provider.Provider) Option {
return func(bs *blockService) {
bs.provider = prov
}
}

// New creates a BlockService with given datastore instance.
func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) BlockService {
if exchange == nil {
Expand All @@ -140,11 +121,6 @@ func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option)

// Blockstore returns the blockstore behind this blockservice.
func (s *blockService) Blockstore() blockstore.Blockstore {
if s.provider != nil {
// FIXME: this is a hack remove once ipfs/boxo#567 is solved.
return providingBlockstore{s.blockstore, s.provider}
}

return s.blockstore
}

Expand All @@ -157,13 +133,23 @@ func (s *blockService) Allowlist() verifcid.Allowlist {
return s.allowlist
}

func (s *blockService) Provider() provider.Provider {
return s.provider
// NewSession creates a new session that allows for
// controlled exchange of wantlists to decrease the bandwidth overhead.
// If the current exchange is a SessionExchange, a new exchange
// session will be created. Otherwise, the current exchange will be used
// directly.
// Sessions are lazily setup, this is cheap.
func NewSession(ctx context.Context, bs BlockService) *Session {
ses := grabSessionFromContext(ctx, bs)
if ses != nil {
return ses
}

return newSession(ctx, bs)
}

// NewSession creates a new session that allows for controlled exchange of
// wantlists to decrease the bandwidth overhead.
func NewSession(ctx context.Context, bs BlockService) *Session {
// newSession is like [NewSession] but it does not attempt to reuse session from the existing context.
func newSession(ctx context.Context, bs BlockService) *Session {
return &Session{bs: bs, sesctx: ctx}
}

Expand All @@ -183,7 +169,7 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error {
}
}

if err = s.blockstore.Put(ctx, o); err != nil {
if err := s.blockstore.Put(ctx, o); err != nil {
return err
}

Expand All @@ -194,11 +180,6 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error {
logger.Errorf("NotifyNewBlocks: %s", err.Error())
}
}
if s.provider != nil {
if err := s.provider.Provide(ctx, o.Cid(), true); err != nil {
logger.Errorf("Provide: %s", err.Error())
}
}

return nil
}
Expand Down Expand Up @@ -245,19 +226,16 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
logger.Errorf("NotifyNewBlocks: %s", err.Error())
}
}
if s.provider != nil {
for _, o := range toput {
if err := s.provider.Provide(ctx, o.Cid(), true); err != nil {
logger.Errorf("Provide: %s", err.Error())
}
}
}
return nil
}

// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
if ses := grabSessionFromContext(ctx, s); ses != nil {
return ses.GetBlock(ctx, c)
}

ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

Expand All @@ -275,7 +253,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
return nil, err
}

provider, blockstore := grabProviderAndBlockstoreFromBlockservice(bs)
blockstore := bs.Blockstore()

block, err := blockstore.Get(ctx, c)
switch {
Expand Down Expand Up @@ -309,12 +287,6 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
return nil, err
}
}
if provider != nil {
err = provider.Provide(ctx, blk.Cid(), true)
if err != nil {
return nil, err
}
}
logger.Debugf("BlockService.BlockFetched %s", c)
return blk, nil
}
Expand All @@ -323,6 +295,10 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
// the returned channel.
// NB: No guarantees are made about order.
func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
if ses := grabSessionFromContext(ctx, s); ses != nil {
return ses.GetBlocks(ctx, ks)
}

ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks")
defer span.End()

Expand Down Expand Up @@ -360,7 +336,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
ks = ks2
}

provider, bs := grabProviderAndBlockstoreFromBlockservice(blockservice)
bs := blockservice.Blockstore()

var misses []cid.Cid
for _, c := range ks {
Expand Down Expand Up @@ -419,14 +395,6 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
cache[0] = nil // early gc
}

if provider != nil {
err = provider.Provide(ctx, b.Cid(), true)
if err != nil {
logger.Errorf("could not tell the provider about new blocks: %s", err)
return
}
}

select {
case out <- b:
case <-ctx.Done():
Expand Down Expand Up @@ -506,21 +474,47 @@ func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Blo

var _ BlockGetter = (*Session)(nil)

// ContextWithSession is a helper which creates a context with an embded session,
// future calls to [BlockGetter.GetBlock], [BlockGetter.GetBlocks] and [NewSession] with the same [BlockService]
// will be redirected to this same session instead.
// Sessions are lazily setup, this is cheap.
// It wont make a new session if one exists already in the context.
func ContextWithSession(ctx context.Context, bs BlockService) context.Context {
if grabSessionFromContext(ctx, bs) != nil {
return ctx
}
return EmbedSessionInContext(ctx, newSession(ctx, bs))
}

// EmbedSessionInContext is like [ContextWithSession] but it allows to embed an existing session.
func EmbedSessionInContext(ctx context.Context, ses *Session) context.Context {
// use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice.
return context.WithValue(ctx, ses.bs, ses)
}

// grabSessionFromContext returns nil if the session was not found
// This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety,
// if this API is public it is too easy to forget to pass a [BlockService] or [Session] object around in your app.
// By having this private we allow consumers to follow the trace of where the blockservice is passed and used.
func grabSessionFromContext(ctx context.Context, bs BlockService) *Session {
s := ctx.Value(bs)
if s == nil {
return nil
}

ss, ok := s.(*Session)
if !ok {
// idk what to do here, that kinda sucks, giveup
return nil
}

return ss
}

// grabAllowlistFromBlockservice never returns nil
func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist {
if bbs, ok := bs.(BoundedBlockService); ok {
return bbs.Allowlist()
}
return verifcid.DefaultAllowlist
}

// grabProviderAndBlockstoreFromBlockservice can return nil if no provider is used.
func grabProviderAndBlockstoreFromBlockservice(bs BlockService) (provider.Provider, blockstore.Blockstore) {
if bbs, ok := bs.(*blockService); ok {
return bbs.provider, bbs.blockstore
}
if bbs, ok := bs.(ProvidingBlockService); ok {
return bbs.Provider(), bbs.Blockstore()
}
return nil, bs.Blockstore()
}
127 changes: 46 additions & 81 deletions blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,102 +288,67 @@ func TestAllowlist(t *testing.T) {
check(NewSession(ctx, blockservice).GetBlock)
}

type wrappedBlockservice struct {
BlockService
type fakeIsNewSessionCreateExchange struct {
ses exchange.Fetcher
newSessionWasCalled bool
}

type mockProvider []cid.Cid
var _ exchange.SessionExchange = (*fakeIsNewSessionCreateExchange)(nil)

func (p *mockProvider) Provide(ctx context.Context, c cid.Cid, announce bool) error {
*p = append(*p, c)
func (*fakeIsNewSessionCreateExchange) Close() error {
return nil
}

func TestProviding(t *testing.T) {
t.Parallel()
a := assert.New(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
func (*fakeIsNewSessionCreateExchange) GetBlock(context.Context, cid.Cid) (blocks.Block, error) {
panic("should call on the session")
}

blocks := random.BlocksOfSize(12, blockSize)
func (*fakeIsNewSessionCreateExchange) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block, error) {
panic("should call on the session")
}

exchange := blockstore.NewBlockstore(ds.NewMapDatastore())
func (f *fakeIsNewSessionCreateExchange) NewSession(context.Context) exchange.Fetcher {
f.newSessionWasCalled = true
return f.ses
}

prov := mockProvider{}
blockservice := New(blockstore.NewBlockstore(ds.NewMapDatastore()), offline.Exchange(exchange), WithProvider(&prov))
var added []cid.Cid
func (*fakeIsNewSessionCreateExchange) NotifyNewBlocks(context.Context, ...blocks.Block) error {
return nil
}

// Adding one block provide it.
a.NoError(blockservice.AddBlock(ctx, blocks[0]))
added = append(added, blocks[0].Cid())
blocks = blocks[1:]
func TestContextSession(t *testing.T) {
t.Parallel()
a := assert.New(t)

// Adding multiple blocks provide them.
a.NoError(blockservice.AddBlocks(ctx, blocks[0:2]))
added = append(added, blocks[0].Cid(), blocks[1].Cid())
blocks = blocks[2:]
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Downloading one block provide it.
a.NoError(exchange.Put(ctx, blocks[0]))
_, err := blockservice.GetBlock(ctx, blocks[0].Cid())
a.NoError(err)
added = append(added, blocks[0].Cid())
blocks = blocks[1:]

// Downloading multiple blocks provide them.
a.NoError(exchange.PutMany(ctx, blocks[0:2]))
cids := []cid.Cid{blocks[0].Cid(), blocks[1].Cid()}
var got []cid.Cid
for b := range blockservice.GetBlocks(ctx, cids) {
got = append(got, b.Cid())
}
added = append(added, cids...)
a.ElementsMatch(cids, got)
blocks = blocks[2:]
blks := random.BlocksOfSize(2, blockSize)
block1 := blks[0]
block2 := blks[1]

session := NewSession(ctx, blockservice)
bs := blockstore.NewBlockstore(ds.NewMapDatastore())
a.NoError(bs.Put(ctx, block1))
a.NoError(bs.Put(ctx, block2))
sesEx := &fakeIsNewSessionCreateExchange{ses: offline.Exchange(bs)}

// Downloading one block over a session provide it.
a.NoError(exchange.Put(ctx, blocks[0]))
_, err = session.GetBlock(ctx, blocks[0].Cid())
a.NoError(err)
added = append(added, blocks[0].Cid())
blocks = blocks[1:]

// Downloading multiple blocks over a session provide them.
a.NoError(exchange.PutMany(ctx, blocks[0:2]))
cids = []cid.Cid{blocks[0].Cid(), blocks[1].Cid()}
got = nil
for b := range session.GetBlocks(ctx, cids) {
got = append(got, b.Cid())
}
a.ElementsMatch(cids, got)
added = append(added, cids...)
blocks = blocks[2:]
service := New(blockstore.NewBlockstore(ds.NewMapDatastore()), sesEx)

// Test wrapping the blockservice like nopfs does.
session = NewSession(ctx, wrappedBlockservice{blockservice})
ctx = ContextWithSession(ctx, service)

// Downloading one block over a wrapped blockservice session provide it.
a.NoError(exchange.Put(ctx, blocks[0]))
_, err = session.GetBlock(ctx, blocks[0].Cid())
b, err := service.GetBlock(ctx, block1.Cid())
a.NoError(err)
added = append(added, blocks[0].Cid())
blocks = blocks[1:]

// Downloading multiple blocks over a wrapped blockservice session provide them.
a.NoError(exchange.PutMany(ctx, blocks[0:2]))
cids = []cid.Cid{blocks[0].Cid(), blocks[1].Cid()}
got = nil
for b := range session.GetBlocks(ctx, cids) {
got = append(got, b.Cid())
}
a.ElementsMatch(cids, got)
added = append(added, cids...)
blocks = blocks[2:]

a.Empty(blocks)

a.ElementsMatch(added, []cid.Cid(prov))
a.Equal(b.RawData(), block1.RawData())
a.True(sesEx.newSessionWasCalled, "new session from context should be created")
sesEx.newSessionWasCalled = false

bchan := service.GetBlocks(ctx, []cid.Cid{block2.Cid()})
a.Equal((<-bchan).RawData(), block2.RawData())
a.False(sesEx.newSessionWasCalled, "session should be reused in context")

a.Equal(
NewSession(ctx, service),
NewSession(ContextWithSession(ctx, service), service),
"session must be deduped in all invocations on the same context",
)
}
Loading

0 comments on commit 0ca70f4

Please sign in to comment.