Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: replace CarOffsetWriter with car.NewCarV1StreamReader #693

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ require (
github.com/ipfs/go-metrics-interface v0.0.1
github.com/ipfs/go-unixfs v0.3.1
github.com/ipld/go-car v0.4.1-0.20220707083113-89de8134e58e
github.com/ipld/go-car/v2 v2.4.2-0.20220707083113-89de8134e58e
github.com/ipld/go-car/v2 v2.4.2-0.20220810073527-4f3c172da48b
github.com/ipld/go-ipld-prime v0.17.0
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73
github.com/ipld/go-ipld-selector-text-lite v0.0.1
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/jpillora/backoff v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1002,8 +1002,8 @@ github.com/ipld/go-car v0.4.1-0.20220707083113-89de8134e58e h1:0cTsDz2E2JTlKASIr
github.com/ipld/go-car v0.4.1-0.20220707083113-89de8134e58e/go.mod h1:Uslcn4O9cBKK9wqHm/cLTFacg6RAPv6LZx2mxd2Ypl4=
github.com/ipld/go-car/v2 v2.1.1/go.mod h1:+2Yvf0Z3wzkv7NeI69i8tuZ+ft7jyjPYIWZzeVNeFcI=
github.com/ipld/go-car/v2 v2.4.1/go.mod h1:zjpRf0Jew9gHqSvjsKVyoq9OY9SWoEKdYCQUKVaaPT0=
github.com/ipld/go-car/v2 v2.4.2-0.20220707083113-89de8134e58e h1:A4ttip3C2PLdE29/owgZAUgSX/qtIU6vphQU9CEPlN4=
github.com/ipld/go-car/v2 v2.4.2-0.20220707083113-89de8134e58e/go.mod h1:sDHqspWMwG6cC0lrid3Lq2xtIR4R6iy6ymCNT0drhaI=
github.com/ipld/go-car/v2 v2.4.2-0.20220810073527-4f3c172da48b h1:QdxArU06XS5rmVwhHOq5MZ/oCfoWWwrvfoAY8mCoA2o=
github.com/ipld/go-car/v2 v2.4.2-0.20220810073527-4f3c172da48b/go.mod h1:zjpRf0Jew9gHqSvjsKVyoq9OY9SWoEKdYCQUKVaaPT0=
github.com/ipld/go-codec-dagpb v1.2.0/go.mod h1:6nBN7X7h8EOsEejZGqC7tej5drsdBAXbMHyBT+Fne5s=
github.com/ipld/go-codec-dagpb v1.3.0/go.mod h1:ga4JTU3abYApDC3pZ00BC2RSvC3qfBb9MSJkMLSwnhA=
github.com/ipld/go-codec-dagpb v1.3.1/go.mod h1:ErNNglIi5KMur/MfFE/svtgQthzVvf+43MrzLbpcIZY=
Expand Down
80 changes: 51 additions & 29 deletions transport/httptransport/libp2p_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ import (
"sync"
"time"

"github.com/filecoin-project/boost/car"
"github.com/filecoin-project/boost/transport/types"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
car "github.com/ipld/go-car/v2"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/bsadapter"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -36,9 +39,7 @@ type Libp2pCarServer struct {
auth *AuthTokenDB
bstore blockstore.Blockstore
cfg ServerConfig
bicm car.BlockInfoCacheManager

ctx context.Context
cancel context.CancelFunc
server *http.Server
netListener net.Listener
Expand All @@ -47,21 +48,14 @@ type Libp2pCarServer struct {
*transfersMgr
}

type ServerConfig struct {
BlockInfoCacheManager car.BlockInfoCacheManager
}
type ServerConfig struct{}

func NewLibp2pCarServer(h host.Host, auth *AuthTokenDB, bstore blockstore.Blockstore, cfg ServerConfig) *Libp2pCarServer {
bcim := cfg.BlockInfoCacheManager
if bcim == nil {
bcim = car.NewRefCountBICM()
}
return &Libp2pCarServer{
h: h,
auth: auth,
bstore: bstore,
cfg: cfg,
bicm: bcim,
transfersMgr: newTransfersManager(),
}
}
Expand Down Expand Up @@ -107,7 +101,6 @@ func (s *Libp2pCarServer) Start(ctx context.Context) error {
}

func (s *Libp2pCarServer) Stop(ctx context.Context) error {
bicmerr := s.bicm.Close()
s.cancel()
lerr := s.netListener.Close()
serr := s.server.Close()
Expand All @@ -121,7 +114,7 @@ func (s *Libp2pCarServer) Stop(ctx context.Context) error {
if serr != nil {
return serr
}
return bicmerr
return nil
}

// handler is called by the http library to handle an incoming HTTP request
Expand Down Expand Up @@ -149,9 +142,7 @@ func (s *Libp2pCarServer) handler(w http.ResponseWriter, r *http.Request) {
defer s.h.ConnManager().Unprotect(pid, tag)

// Get a block info cache for the CarOffsetWriter
bic := s.bicm.Get(authVal.PayloadCid)
err = s.serveContent(w, r, authToken, authVal, bic)
s.bicm.Unref(authVal.PayloadCid, err)
_ = s.serveContent(w, r, authToken, authVal)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should log this error somewhere?

}

func (s *Libp2pCarServer) checkAuth(r *http.Request) (string, *AuthValue, *httpError) {
Expand Down Expand Up @@ -183,12 +174,43 @@ func (s *Libp2pCarServer) checkAuth(r *http.Request) (string, *AuthValue, *httpE
return authToken, val, nil
}

func (s *Libp2pCarServer) serveContent(w http.ResponseWriter, r *http.Request, authToken string, val *AuthValue, bic *car.BlockInfoCache) error {
func (s *Libp2pCarServer) serveContent(w http.ResponseWriter, r *http.Request, authToken string, val *AuthValue) error {
ctx := r.Context()

// Create a CarOffsetWriter and a reader for it
cow := car.NewCarOffsetWriter(val.PayloadCid, s.bstore, bic)
content := car.NewCarReaderSeeker(ctx, cow, val.Size)
ls := cidlink.DefaultLinkSystem()
bsa := bsadapter.Adapter{Wrapped: s.bstore}
ls.SetReadStorage(&bsa)

content, err := car.NewCarV1StreamReader(
ctx,
&ls,
val.PayloadCid,
selectorparse.CommonSelector_ExploreAllRecursively,
car.WithDataPayloadSize(val.Size),
)
if err != nil {
return err
}
closer, _ := content.(io.Closer)

cancelContent := func(parentCtx context.Context) (err error) {
cancelComplete := make(chan struct{}, 1)
go func() {
// calling a Seek() on the SkipWriterReaderSeeker from go-car will cancel
// any existing writer and block until that cancel is complete
if closeErr := closer.Close(); closeErr != nil && !errors.Is(closeErr, context.Canceled) {
// context.Canceled can come up through the http Request from a socket close, ignore it
err = closeErr
}
cancelComplete <- struct{}{}
}()
select {
case <-parentCtx.Done():
return parentCtx.Err()
case <-cancelComplete:
}
return err
}
Comment on lines +196 to +213
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd suggest moving the cancelContent function inside of sendCar as it's not used anywhere else


// Set the Content-Type header explicitly so that http.ServeContent doesn't
// try to do it implicitly
Expand All @@ -202,12 +224,12 @@ func (s *Libp2pCarServer) serveContent(w http.ResponseWriter, r *http.Request, a
}

// Send the CAR file
return s.sendCar(r, w, val, authToken, content)
return s.sendCar(r, w, val, authToken, content, cancelContent)
}

func (s *Libp2pCarServer) sendCar(r *http.Request, w http.ResponseWriter, val *AuthValue, authToken string, content *car.CarReaderSeeker) error {
func (s *Libp2pCarServer) sendCar(r *http.Request, w http.ResponseWriter, val *AuthValue, authToken string, content io.ReadSeeker, cancel func(context.Context) error) error {
// Create transfer
xfer := newLibp2pTransfer(val, authToken, s.h.ID().String(), r.RemoteAddr, content)
xfer := newLibp2pTransfer(val, authToken, s.h.ID().String(), r.RemoteAddr, cancel)

// Add transfer to the list of active transfers
fireEvent, err := s.transfersMgr.add(xfer)
Expand Down Expand Up @@ -571,7 +593,7 @@ type Libp2pTransfer struct {
AuthToken string
LocalAddr string
RemoteAddr string
content *car.CarReaderSeeker
Cancel func(context.Context) error
// indicates whether this transfer replaces a previous transfer with the
// same id
isRestart bool
Expand All @@ -589,7 +611,7 @@ type Libp2pTransfer struct {
eventsDrained chan struct{}
}

func newLibp2pTransfer(val *AuthValue, authToken string, localAddr string, remoteAddr string, content *car.CarReaderSeeker) *Libp2pTransfer {
func newLibp2pTransfer(val *AuthValue, authToken string, localAddr string, remoteAddr string, cancel func(context.Context) error) *Libp2pTransfer {
return &Libp2pTransfer{
ID: val.ID,
PayloadCid: val.PayloadCid,
Expand All @@ -598,7 +620,7 @@ func newLibp2pTransfer(val *AuthValue, authToken string, localAddr string, remot
AuthToken: authToken,
LocalAddr: localAddr,
RemoteAddr: remoteAddr,
content: content,
Cancel: cancel,
status: types.TransferStatusStarted,
eventsDrained: make(chan struct{}),
}
Expand Down Expand Up @@ -666,8 +688,7 @@ func (t *Libp2pTransfer) state() types.TransferState {
// completed or errored out).
func (t *Libp2pTransfer) cancel(ctx context.Context) (*types.TransferState, error) {
// Cancel the read / write stream
err := t.content.Cancel(ctx)
if err != nil {
if err := t.Cancel(ctx); err != nil {
return nil, err
}

Expand All @@ -684,10 +705,11 @@ func (t *Libp2pTransfer) cancel(ctx context.Context) (*types.TransferState, erro
}

func (t *Libp2pTransfer) close(ctx context.Context) {

t.closed = true

// Cancel the read / write stream
go t.content.Cancel(ctx) //nolint:errcheck
go t.Cancel(ctx) //nolint:errcheck
}

// eventPublished is called when an event for this transfer is emitted
Expand Down