Skip to content

Commit

Permalink
feat: replace CarOffsetWriter with car.NewCarV1StreamReader
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Aug 10, 2022
1 parent c8f9868 commit 6f65d7a
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 32 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ require (
github.com/ipfs/go-metrics-interface v0.0.1
github.com/ipfs/go-unixfs v0.3.1
github.com/ipld/go-car v0.4.1-0.20220707083113-89de8134e58e
github.com/ipld/go-car/v2 v2.4.2-0.20220707083113-89de8134e58e
github.com/ipld/go-car/v2 v2.4.2-0.20220810073527-4f3c172da48b
github.com/ipld/go-ipld-prime v0.17.0
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73
github.com/ipld/go-ipld-selector-text-lite v0.0.1
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/jpillora/backoff v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1002,8 +1002,8 @@ github.com/ipld/go-car v0.4.1-0.20220707083113-89de8134e58e h1:0cTsDz2E2JTlKASIr
github.com/ipld/go-car v0.4.1-0.20220707083113-89de8134e58e/go.mod h1:Uslcn4O9cBKK9wqHm/cLTFacg6RAPv6LZx2mxd2Ypl4=
github.com/ipld/go-car/v2 v2.1.1/go.mod h1:+2Yvf0Z3wzkv7NeI69i8tuZ+ft7jyjPYIWZzeVNeFcI=
github.com/ipld/go-car/v2 v2.4.1/go.mod h1:zjpRf0Jew9gHqSvjsKVyoq9OY9SWoEKdYCQUKVaaPT0=
github.com/ipld/go-car/v2 v2.4.2-0.20220707083113-89de8134e58e h1:A4ttip3C2PLdE29/owgZAUgSX/qtIU6vphQU9CEPlN4=
github.com/ipld/go-car/v2 v2.4.2-0.20220707083113-89de8134e58e/go.mod h1:sDHqspWMwG6cC0lrid3Lq2xtIR4R6iy6ymCNT0drhaI=
github.com/ipld/go-car/v2 v2.4.2-0.20220810073527-4f3c172da48b h1:QdxArU06XS5rmVwhHOq5MZ/oCfoWWwrvfoAY8mCoA2o=
github.com/ipld/go-car/v2 v2.4.2-0.20220810073527-4f3c172da48b/go.mod h1:zjpRf0Jew9gHqSvjsKVyoq9OY9SWoEKdYCQUKVaaPT0=
github.com/ipld/go-codec-dagpb v1.2.0/go.mod h1:6nBN7X7h8EOsEejZGqC7tej5drsdBAXbMHyBT+Fne5s=
github.com/ipld/go-codec-dagpb v1.3.0/go.mod h1:ga4JTU3abYApDC3pZ00BC2RSvC3qfBb9MSJkMLSwnhA=
github.com/ipld/go-codec-dagpb v1.3.1/go.mod h1:ErNNglIi5KMur/MfFE/svtgQthzVvf+43MrzLbpcIZY=
Expand Down
80 changes: 51 additions & 29 deletions transport/httptransport/libp2p_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ import (
"sync"
"time"

"github.com/filecoin-project/boost/car"
"github.com/filecoin-project/boost/transport/types"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
car "github.com/ipld/go-car/v2"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/bsadapter"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -36,9 +39,7 @@ type Libp2pCarServer struct {
auth *AuthTokenDB
bstore blockstore.Blockstore
cfg ServerConfig
bicm car.BlockInfoCacheManager

ctx context.Context
cancel context.CancelFunc
server *http.Server
netListener net.Listener
Expand All @@ -47,21 +48,14 @@ type Libp2pCarServer struct {
*transfersMgr
}

type ServerConfig struct {
BlockInfoCacheManager car.BlockInfoCacheManager
}
type ServerConfig struct{}

func NewLibp2pCarServer(h host.Host, auth *AuthTokenDB, bstore blockstore.Blockstore, cfg ServerConfig) *Libp2pCarServer {
bcim := cfg.BlockInfoCacheManager
if bcim == nil {
bcim = car.NewRefCountBICM()
}
return &Libp2pCarServer{
h: h,
auth: auth,
bstore: bstore,
cfg: cfg,
bicm: bcim,
transfersMgr: newTransfersManager(),
}
}
Expand Down Expand Up @@ -107,7 +101,6 @@ func (s *Libp2pCarServer) Start(ctx context.Context) error {
}

func (s *Libp2pCarServer) Stop(ctx context.Context) error {
bicmerr := s.bicm.Close()
s.cancel()
lerr := s.netListener.Close()
serr := s.server.Close()
Expand All @@ -121,7 +114,7 @@ func (s *Libp2pCarServer) Stop(ctx context.Context) error {
if serr != nil {
return serr
}
return bicmerr
return nil
}

// handler is called by the http library to handle an incoming HTTP request
Expand Down Expand Up @@ -149,9 +142,7 @@ func (s *Libp2pCarServer) handler(w http.ResponseWriter, r *http.Request) {
defer s.h.ConnManager().Unprotect(pid, tag)

// Get a block info cache for the CarOffsetWriter
bic := s.bicm.Get(authVal.PayloadCid)
err = s.serveContent(w, r, authToken, authVal, bic)
s.bicm.Unref(authVal.PayloadCid, err)
_ = s.serveContent(w, r, authToken, authVal)
}

func (s *Libp2pCarServer) checkAuth(r *http.Request) (string, *AuthValue, *httpError) {
Expand Down Expand Up @@ -183,12 +174,43 @@ func (s *Libp2pCarServer) checkAuth(r *http.Request) (string, *AuthValue, *httpE
return authToken, val, nil
}

func (s *Libp2pCarServer) serveContent(w http.ResponseWriter, r *http.Request, authToken string, val *AuthValue, bic *car.BlockInfoCache) error {
func (s *Libp2pCarServer) serveContent(w http.ResponseWriter, r *http.Request, authToken string, val *AuthValue) error {
ctx := r.Context()

// Create a CarOffsetWriter and a reader for it
cow := car.NewCarOffsetWriter(val.PayloadCid, s.bstore, bic)
content := car.NewCarReaderSeeker(ctx, cow, val.Size)
ls := cidlink.DefaultLinkSystem()
bsa := bsadapter.Adapter{Wrapped: s.bstore}
ls.SetReadStorage(&bsa)

content, err := car.NewCarV1StreamReader(
ctx,
&ls,
val.PayloadCid,
selectorparse.CommonSelector_ExploreAllRecursively,
car.WithDataPayloadSize(val.Size),
)
if err != nil {
return err
}
closer, _ := content.(io.Closer)

cancelContent := func(parentCtx context.Context) (err error) {
cancelComplete := make(chan struct{}, 1)
go func() {
// calling a Seek() on the SkipWriterReaderSeeker from go-car will cancel
// any existing writer and block until that cancel is complete
if closeErr := closer.Close(); closeErr != nil && !errors.Is(closeErr, context.Canceled) {
// context.Canceled can come up through the http Request from a socket close, ignore it
err = closeErr
}
cancelComplete <- struct{}{}
}()
select {
case <-parentCtx.Done():
return parentCtx.Err()
case <-cancelComplete:
}
return err
}

// Set the Content-Type header explicitly so that http.ServeContent doesn't
// try to do it implicitly
Expand All @@ -202,12 +224,12 @@ func (s *Libp2pCarServer) serveContent(w http.ResponseWriter, r *http.Request, a
}

// Send the CAR file
return s.sendCar(r, w, val, authToken, content)
return s.sendCar(r, w, val, authToken, content, cancelContent)
}

func (s *Libp2pCarServer) sendCar(r *http.Request, w http.ResponseWriter, val *AuthValue, authToken string, content *car.CarReaderSeeker) error {
func (s *Libp2pCarServer) sendCar(r *http.Request, w http.ResponseWriter, val *AuthValue, authToken string, content io.ReadSeeker, cancel func(context.Context) error) error {
// Create transfer
xfer := newLibp2pTransfer(val, authToken, s.h.ID().String(), r.RemoteAddr, content)
xfer := newLibp2pTransfer(val, authToken, s.h.ID().String(), r.RemoteAddr, cancel)

// Add transfer to the list of active transfers
fireEvent, err := s.transfersMgr.add(xfer)
Expand Down Expand Up @@ -571,7 +593,7 @@ type Libp2pTransfer struct {
AuthToken string
LocalAddr string
RemoteAddr string
content *car.CarReaderSeeker
Cancel func(context.Context) error
// indicates whether this transfer replaces a previous transfer with the
// same id
isRestart bool
Expand All @@ -589,7 +611,7 @@ type Libp2pTransfer struct {
eventsDrained chan struct{}
}

func newLibp2pTransfer(val *AuthValue, authToken string, localAddr string, remoteAddr string, content *car.CarReaderSeeker) *Libp2pTransfer {
func newLibp2pTransfer(val *AuthValue, authToken string, localAddr string, remoteAddr string, cancel func(context.Context) error) *Libp2pTransfer {
return &Libp2pTransfer{
ID: val.ID,
PayloadCid: val.PayloadCid,
Expand All @@ -598,7 +620,7 @@ func newLibp2pTransfer(val *AuthValue, authToken string, localAddr string, remot
AuthToken: authToken,
LocalAddr: localAddr,
RemoteAddr: remoteAddr,
content: content,
Cancel: cancel,
status: types.TransferStatusStarted,
eventsDrained: make(chan struct{}),
}
Expand Down Expand Up @@ -666,8 +688,7 @@ func (t *Libp2pTransfer) state() types.TransferState {
// completed or errored out).
func (t *Libp2pTransfer) cancel(ctx context.Context) (*types.TransferState, error) {
// Cancel the read / write stream
err := t.content.Cancel(ctx)
if err != nil {
if err := t.Cancel(ctx); err != nil {
return nil, err
}

Expand All @@ -684,10 +705,11 @@ func (t *Libp2pTransfer) cancel(ctx context.Context) (*types.TransferState, erro
}

func (t *Libp2pTransfer) close(ctx context.Context) {

t.closed = true

// Cancel the read / write stream
go t.content.Cancel(ctx) //nolint:errcheck
go t.Cancel(ctx) //nolint:errcheck
}

// eventPublished is called when an event for this transfer is emitted
Expand Down

0 comments on commit 6f65d7a

Please sign in to comment.