diff --git a/go.mod b/go.mod index bfa5b9e44..853dd90f8 100644 --- a/go.mod +++ b/go.mod @@ -63,8 +63,9 @@ require ( github.com/ipfs/go-metrics-interface v0.0.1 github.com/ipfs/go-unixfs v0.3.1 github.com/ipld/go-car v0.4.1-0.20220707083113-89de8134e58e - github.com/ipld/go-car/v2 v2.4.2-0.20220707083113-89de8134e58e + github.com/ipld/go-car/v2 v2.4.2-0.20220810073527-4f3c172da48b github.com/ipld/go-ipld-prime v0.17.0 + github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73 github.com/ipld/go-ipld-selector-text-lite v0.0.1 github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c github.com/jpillora/backoff v1.0.0 diff --git a/go.sum b/go.sum index 619deeb92..7562addea 100644 --- a/go.sum +++ b/go.sum @@ -1002,8 +1002,8 @@ github.com/ipld/go-car v0.4.1-0.20220707083113-89de8134e58e h1:0cTsDz2E2JTlKASIr github.com/ipld/go-car v0.4.1-0.20220707083113-89de8134e58e/go.mod h1:Uslcn4O9cBKK9wqHm/cLTFacg6RAPv6LZx2mxd2Ypl4= github.com/ipld/go-car/v2 v2.1.1/go.mod h1:+2Yvf0Z3wzkv7NeI69i8tuZ+ft7jyjPYIWZzeVNeFcI= github.com/ipld/go-car/v2 v2.4.1/go.mod h1:zjpRf0Jew9gHqSvjsKVyoq9OY9SWoEKdYCQUKVaaPT0= -github.com/ipld/go-car/v2 v2.4.2-0.20220707083113-89de8134e58e h1:A4ttip3C2PLdE29/owgZAUgSX/qtIU6vphQU9CEPlN4= -github.com/ipld/go-car/v2 v2.4.2-0.20220707083113-89de8134e58e/go.mod h1:sDHqspWMwG6cC0lrid3Lq2xtIR4R6iy6ymCNT0drhaI= +github.com/ipld/go-car/v2 v2.4.2-0.20220810073527-4f3c172da48b h1:QdxArU06XS5rmVwhHOq5MZ/oCfoWWwrvfoAY8mCoA2o= +github.com/ipld/go-car/v2 v2.4.2-0.20220810073527-4f3c172da48b/go.mod h1:zjpRf0Jew9gHqSvjsKVyoq9OY9SWoEKdYCQUKVaaPT0= github.com/ipld/go-codec-dagpb v1.2.0/go.mod h1:6nBN7X7h8EOsEejZGqC7tej5drsdBAXbMHyBT+Fne5s= github.com/ipld/go-codec-dagpb v1.3.0/go.mod h1:ga4JTU3abYApDC3pZ00BC2RSvC3qfBb9MSJkMLSwnhA= github.com/ipld/go-codec-dagpb v1.3.1/go.mod h1:ErNNglIi5KMur/MfFE/svtgQthzVvf+43MrzLbpcIZY= diff --git a/transport/httptransport/libp2p_server.go b/transport/httptransport/libp2p_server.go index 4f603da03..5d7a6444e 100644 --- a/transport/httptransport/libp2p_server.go +++ b/transport/httptransport/libp2p_server.go @@ -10,12 +10,15 @@ import ( "sync" "time" - "github.com/filecoin-project/boost/car" "github.com/filecoin-project/boost/transport/types" "github.com/google/uuid" "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" logging "github.com/ipfs/go-log/v2" + car "github.com/ipld/go-car/v2" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/storage/bsadapter" + selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -36,9 +39,7 @@ type Libp2pCarServer struct { auth *AuthTokenDB bstore blockstore.Blockstore cfg ServerConfig - bicm car.BlockInfoCacheManager - ctx context.Context cancel context.CancelFunc server *http.Server netListener net.Listener @@ -47,21 +48,14 @@ type Libp2pCarServer struct { *transfersMgr } -type ServerConfig struct { - BlockInfoCacheManager car.BlockInfoCacheManager -} +type ServerConfig struct{} func NewLibp2pCarServer(h host.Host, auth *AuthTokenDB, bstore blockstore.Blockstore, cfg ServerConfig) *Libp2pCarServer { - bcim := cfg.BlockInfoCacheManager - if bcim == nil { - bcim = car.NewRefCountBICM() - } return &Libp2pCarServer{ h: h, auth: auth, bstore: bstore, cfg: cfg, - bicm: bcim, transfersMgr: newTransfersManager(), } } @@ -107,7 +101,6 @@ func (s *Libp2pCarServer) Start(ctx context.Context) error { } func (s *Libp2pCarServer) Stop(ctx context.Context) error { - bicmerr := s.bicm.Close() s.cancel() lerr := s.netListener.Close() serr := s.server.Close() @@ -121,7 +114,7 @@ func (s *Libp2pCarServer) Stop(ctx context.Context) error { if serr != nil { return serr } - return bicmerr + return nil } // handler is called by the http library to handle an incoming HTTP request @@ -149,9 +142,7 @@ func (s *Libp2pCarServer) handler(w http.ResponseWriter, r *http.Request) { defer s.h.ConnManager().Unprotect(pid, tag) // Get a block info cache for the CarOffsetWriter - bic := s.bicm.Get(authVal.PayloadCid) - err = s.serveContent(w, r, authToken, authVal, bic) - s.bicm.Unref(authVal.PayloadCid, err) + _ = s.serveContent(w, r, authToken, authVal) } func (s *Libp2pCarServer) checkAuth(r *http.Request) (string, *AuthValue, *httpError) { @@ -183,12 +174,43 @@ func (s *Libp2pCarServer) checkAuth(r *http.Request) (string, *AuthValue, *httpE return authToken, val, nil } -func (s *Libp2pCarServer) serveContent(w http.ResponseWriter, r *http.Request, authToken string, val *AuthValue, bic *car.BlockInfoCache) error { +func (s *Libp2pCarServer) serveContent(w http.ResponseWriter, r *http.Request, authToken string, val *AuthValue) error { ctx := r.Context() - // Create a CarOffsetWriter and a reader for it - cow := car.NewCarOffsetWriter(val.PayloadCid, s.bstore, bic) - content := car.NewCarReaderSeeker(ctx, cow, val.Size) + ls := cidlink.DefaultLinkSystem() + bsa := bsadapter.Adapter{Wrapped: s.bstore} + ls.SetReadStorage(&bsa) + + content, err := car.NewCarV1StreamReader( + ctx, + &ls, + val.PayloadCid, + selectorparse.CommonSelector_ExploreAllRecursively, + car.WithDataPayloadSize(val.Size), + ) + if err != nil { + return err + } + closer, _ := content.(io.Closer) + + cancelContent := func(parentCtx context.Context) (err error) { + cancelComplete := make(chan struct{}, 1) + go func() { + // calling a Seek() on the SkipWriterReaderSeeker from go-car will cancel + // any existing writer and block until that cancel is complete + if closeErr := closer.Close(); closeErr != nil && !errors.Is(closeErr, context.Canceled) { + // context.Canceled can come up through the http Request from a socket close, ignore it + err = closeErr + } + cancelComplete <- struct{}{} + }() + select { + case <-parentCtx.Done(): + return parentCtx.Err() + case <-cancelComplete: + } + return err + } // Set the Content-Type header explicitly so that http.ServeContent doesn't // try to do it implicitly @@ -202,12 +224,12 @@ func (s *Libp2pCarServer) serveContent(w http.ResponseWriter, r *http.Request, a } // Send the CAR file - return s.sendCar(r, w, val, authToken, content) + return s.sendCar(r, w, val, authToken, content, cancelContent) } -func (s *Libp2pCarServer) sendCar(r *http.Request, w http.ResponseWriter, val *AuthValue, authToken string, content *car.CarReaderSeeker) error { +func (s *Libp2pCarServer) sendCar(r *http.Request, w http.ResponseWriter, val *AuthValue, authToken string, content io.ReadSeeker, cancel func(context.Context) error) error { // Create transfer - xfer := newLibp2pTransfer(val, authToken, s.h.ID().String(), r.RemoteAddr, content) + xfer := newLibp2pTransfer(val, authToken, s.h.ID().String(), r.RemoteAddr, cancel) // Add transfer to the list of active transfers fireEvent, err := s.transfersMgr.add(xfer) @@ -571,7 +593,7 @@ type Libp2pTransfer struct { AuthToken string LocalAddr string RemoteAddr string - content *car.CarReaderSeeker + Cancel func(context.Context) error // indicates whether this transfer replaces a previous transfer with the // same id isRestart bool @@ -589,7 +611,7 @@ type Libp2pTransfer struct { eventsDrained chan struct{} } -func newLibp2pTransfer(val *AuthValue, authToken string, localAddr string, remoteAddr string, content *car.CarReaderSeeker) *Libp2pTransfer { +func newLibp2pTransfer(val *AuthValue, authToken string, localAddr string, remoteAddr string, cancel func(context.Context) error) *Libp2pTransfer { return &Libp2pTransfer{ ID: val.ID, PayloadCid: val.PayloadCid, @@ -598,7 +620,7 @@ func newLibp2pTransfer(val *AuthValue, authToken string, localAddr string, remot AuthToken: authToken, LocalAddr: localAddr, RemoteAddr: remoteAddr, - content: content, + Cancel: cancel, status: types.TransferStatusStarted, eventsDrained: make(chan struct{}), } @@ -666,8 +688,7 @@ func (t *Libp2pTransfer) state() types.TransferState { // completed or errored out). func (t *Libp2pTransfer) cancel(ctx context.Context) (*types.TransferState, error) { // Cancel the read / write stream - err := t.content.Cancel(ctx) - if err != nil { + if err := t.Cancel(ctx); err != nil { return nil, err } @@ -684,10 +705,11 @@ func (t *Libp2pTransfer) cancel(ctx context.Context) (*types.TransferState, erro } func (t *Libp2pTransfer) close(ctx context.Context) { + t.closed = true // Cancel the read / write stream - go t.content.Cancel(ctx) //nolint:errcheck + go t.Cancel(ctx) //nolint:errcheck } // eventPublished is called when an event for this transfer is emitted