diff --git a/go.mod b/go.mod index 5235a5e83..ff8acfbca 100644 --- a/go.mod +++ b/go.mod @@ -64,7 +64,7 @@ require ( github.com/ipfs/go-unixfs v0.3.1 github.com/ipld/go-car v0.4.1-0.20220707083113-89de8134e58e github.com/ipld/go-car/v2 v2.4.2-0.20220707083113-89de8134e58e - github.com/ipld/go-ipld-prime v0.17.0 + github.com/ipld/go-ipld-prime v0.18.0 github.com/ipld/go-ipld-selector-text-lite v0.0.1 github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c github.com/jpillora/backoff v1.0.0 @@ -83,7 +83,7 @@ require ( github.com/mitchellh/go-homedir v1.1.0 github.com/multiformats/go-multiaddr v0.5.0 github.com/multiformats/go-multibase v0.0.3 - github.com/multiformats/go-multihash v0.1.0 + github.com/multiformats/go-multihash v0.2.0 github.com/multiformats/go-varint v0.0.6 github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333 github.com/pressly/goose/v3 v3.5.3 diff --git a/go.sum b/go.sum index bceca945c..b8392c1cb 100644 --- a/go.sum +++ b/go.sum @@ -1022,8 +1022,9 @@ github.com/ipld/go-ipld-prime v0.14.1/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704n github.com/ipld/go-ipld-prime v0.14.2/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0= github.com/ipld/go-ipld-prime v0.14.4-0.20211217152141-008fd70fc96f/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0= github.com/ipld/go-ipld-prime v0.16.0/go.mod h1:axSCuOCBPqrH+gvXr2w9uAOulJqBPhHPT2PjoiiU1qA= -github.com/ipld/go-ipld-prime v0.17.0 h1:+U2peiA3aQsE7mrXjD2nYZaZrCcakoz2Wge8K42Ld8g= github.com/ipld/go-ipld-prime v0.17.0/go.mod h1:aYcKm5TIvGfY8P3QBKz/2gKcLxzJ1zDaD+o0bOowhgs= +github.com/ipld/go-ipld-prime v0.18.0 h1:xUk7NUBSWHEXdjiOu2sLXouFJOMs0yoYzeI5RAqhYQo= +github.com/ipld/go-ipld-prime v0.18.0/go.mod h1:735yXW548CKrLwVCYXzqx90p5deRJMVVxM9eJ4Qe+qE= github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5/go.mod h1:gcvzoEDBjwycpXt3LBE061wT9f46szXGHAmj9uoP6fU= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73 h1:TsyATB2ZRRQGTwafJdgEUQkmjOExRV0DNokcihZxbnQ= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73/go.mod h1:2PJ0JgxyB08t0b2WKrcuqI3di0V+5n6RS/LTUJhkoxY= @@ -1736,8 +1737,9 @@ github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUj github.com/multiformats/go-multihash v0.0.14/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= github.com/multiformats/go-multihash v0.0.15/go.mod h1:D6aZrWNLFTV/ynMpKsNtB40mJzmCl4jb1alC0OvHiHg= github.com/multiformats/go-multihash v0.0.16/go.mod h1:zhfEIgVnB/rPMfxgFw15ZmGoNaKyNUIE4IWHG/kC+Ag= -github.com/multiformats/go-multihash v0.1.0 h1:CgAgwqk3//SVEw3T+6DqI4mWMyRuDwZtOWcJT0q9+EA= github.com/multiformats/go-multihash v0.1.0/go.mod h1:RJlXsxt6vHGaia+S8We0ErjhojtKzPP2AH4+kYM7k84= +github.com/multiformats/go-multihash v0.2.0 h1:oytJb9ZA1OUW0r0f9ea18GiaPOo4SXyc7p2movyUuo4= +github.com/multiformats/go-multihash v0.2.0/go.mod h1:WxoMcYG85AZVQUyRyo9s4wULvW5qrI9vb2Lt6evduFc= github.com/multiformats/go-multistream v0.0.1/go.mod h1:fJTiDfXJVmItycydCnNx4+wSzZ5NwG2FEVAI30fiovg= github.com/multiformats/go-multistream v0.0.4/go.mod h1:fJTiDfXJVmItycydCnNx4+wSzZ5NwG2FEVAI30fiovg= github.com/multiformats/go-multistream v0.1.0/go.mod h1:fJTiDfXJVmItycydCnNx4+wSzZ5NwG2FEVAI30fiovg= @@ -2296,8 +2298,9 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220210151621-f4118a5b28e2/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 h1:kUhD7nTDoI3fVd9G4ORWrbV5NY0liEs/Jg2pv5f+bBA= golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM= +golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= diff --git a/node/builder.go b/node/builder.go index 8712c3abe..abe8243d1 100644 --- a/node/builder.go +++ b/node/builder.go @@ -18,6 +18,7 @@ import ( "github.com/filecoin-project/boost/node/impl/common" "github.com/filecoin-project/boost/node/modules" "github.com/filecoin-project/boost/node/modules/dtypes" + "github.com/filecoin-project/boost/retrievalmarket/lp2pimpl" "github.com/filecoin-project/boost/sealingpipeline" "github.com/filecoin-project/boost/storagemanager" "github.com/filecoin-project/boost/storagemarket" @@ -137,6 +138,7 @@ const ( HandleMigrateProviderFundsKey HandleDealsKey HandleRetrievalKey + HandleRetrievalTransportsKey RunSectorServiceKey // boost should be started after legacy markets (HandleDealsKey) @@ -514,6 +516,8 @@ func ConfigBoost(cfg *config.Boost) Option { Override(new(rmnet.RetrievalMarketNetwork), lotus_modules.RetrievalNetwork), Override(new(retrievalmarket.RetrievalProvider), lotus_modules.RetrievalProvider), Override(HandleRetrievalKey, lotus_modules.HandleRetrieval), + Override(new(*lp2pimpl.TransportsListener), modules.NewTransportsListener(cfg)), + Override(HandleRetrievalTransportsKey, modules.HandleRetrievalTransports), Override(new(idxprov.MeshCreator), idxprov.NewMeshCreator), Override(new(provider.Interface), modules.IndexProvider(cfg.IndexProvider)), diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index cdddee232..f6ab107fe 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -239,6 +239,13 @@ see https://docs.filecoin.io/mine/lotus/miner-configuration/#using-filters-for-f Comment: `Whether to do commp on the Boost node (local) or on the Sealer (remote)`, }, + { + Name: "HTTPRetrievalMultiAddr", + Type: "string", + + Comment: `The public multi-address for retrieving deals with booster-http. +Note: Must be in multiaddr format, eg /dns6/foo.com/tcp/80/http`, + }, }, "FeeConfig": []DocField{ { diff --git a/node/config/types.go b/node/config/types.go index 9b0c6a1b5..828f0c7e3 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -169,6 +169,10 @@ type DealmakingConfig struct { MaxTransferDuration Duration // Whether to do commp on the Boost node (local) or on the Sealer (remote) RemoteCommp bool + + // The public multi-address for retrieving deals with booster-http. + // Note: Must be in multiaddr format, eg /dns6/foo.com/tcp/80/http + HTTPRetrievalMultiAddr string } type FeeConfig struct { diff --git a/node/modules/retrieval.go b/node/modules/retrieval.go new file mode 100644 index 000000000..e581e8041 --- /dev/null +++ b/node/modules/retrieval.go @@ -0,0 +1,61 @@ +package modules + +import ( + "context" + "fmt" + + "github.com/filecoin-project/boost/node/config" + "github.com/filecoin-project/boost/retrievalmarket/lp2pimpl" + "github.com/filecoin-project/boost/retrievalmarket/types" + "github.com/libp2p/go-libp2p-core/host" + "github.com/multiformats/go-multiaddr" + "go.uber.org/fx" +) + +func NewTransportsListener(cfg *config.Boost) func(h host.Host) (*lp2pimpl.TransportsListener, error) { + return func(h host.Host) (*lp2pimpl.TransportsListener, error) { + protos := []types.Protocol{} + + // Get the libp2p address from the Host + if len(h.Addrs()) > 0 { + // TODO: should this be a list of addresses instead? + protos = append(protos, types.Protocol{ + Name: "libp2p", + Endpoint: h.Addrs()[0], + }) + } + + // If there's an http retrieval address specified, add HTTP to the list + // of supported protocols + if cfg.Dealmaking.HTTPRetrievalMultiAddr != "" { + maddr, err := multiaddr.NewMultiaddr(cfg.Dealmaking.HTTPRetrievalMultiAddr) + if err != nil { + msg := "HTTPRetrievalURL must be in multi-address format. " + msg += "Could not parse '%s' as multiaddr: %w" + return nil, fmt.Errorf(msg, cfg.Dealmaking.HTTPRetrievalMultiAddr, err) + } + + protos = append(protos, types.Protocol{ + Name: "http", + Endpoint: maddr, + }) + } + + return lp2pimpl.NewTransportsListener(h, protos), nil + } +} + +func HandleRetrievalTransports(lc fx.Lifecycle, l lp2pimpl.TransportsListener) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + log.Debug("starting retrieval transports listener") + l.Start() + return nil + }, + OnStop: func(context.Context) error { + log.Debug("stopping retrieval transports listener") + l.Stop() + return nil + }, + }) +} diff --git a/retrievalmarket/lp2pimpl/transports.go b/retrievalmarket/lp2pimpl/transports.go new file mode 100644 index 000000000..c7e28ccbc --- /dev/null +++ b/retrievalmarket/lp2pimpl/transports.go @@ -0,0 +1,120 @@ +package lp2pimpl + +import ( + "context" + "fmt" + "time" + + "github.com/filecoin-project/boost/retrievalmarket/types" + "github.com/filecoin-project/go-fil-markets/shared" + logging "github.com/ipfs/go-log/v2" + "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" +) + +var clog = logging.Logger("boost:lp2p:tspt:client") +var slog = logging.Logger("boost:lp2p:tspt") + +// TransportsProtocolID is the protocol for querying which retrieval transports +// the Storage Provider supports (http, libp2p, etc) +const TransportsProtocolID = protocol.ID("/fil/retrieval/transports/1.0.0") + +// TransportsListener listens for incoming queries over libp2p +type TransportsListener struct { + host host.Host + protocols []types.Protocol +} + +const streamReadDeadline = 10 * time.Second +const streamWriteDeadline = 10 * time.Second + +// QueryClientOption is an option for configuring the libp2p storage deal client +type QueryClientOption func(*TransportsClient) + +// RetryParameters changes the default parameters around connection reopening +func RetryParameters(minDuration time.Duration, maxDuration time.Duration, attempts float64, backoffFactor float64) QueryClientOption { + return func(c *TransportsClient) { + c.retryStream.SetOptions(shared.RetryParameters(minDuration, maxDuration, attempts, backoffFactor)) + } +} + +// TransportsClient sends retrieval queries over libp2p +type TransportsClient struct { + retryStream *shared.RetryStream +} + +func NewTransportsClient(h host.Host, options ...QueryClientOption) *TransportsClient { + c := &TransportsClient{ + retryStream: shared.NewRetryStream(h), + } + for _, option := range options { + option(c) + } + return c +} + +// SendQuery sends a retrieval query over a libp2p stream to the peer +func (c *TransportsClient) SendQuery(ctx context.Context, id peer.ID) (*types.QueryResponse, error) { + clog.Debugw("query", "provider-peer", id) + + // Create a libp2p stream to the provider + s, err := c.retryStream.OpenStream(ctx, id, []protocol.ID{TransportsProtocolID}) + if err != nil { + return nil, err + } + + defer s.Close() // nolint + + // Set a deadline on reading from the stream so it doesn't hang + _ = s.SetReadDeadline(time.Now().Add(streamReadDeadline)) + defer s.SetReadDeadline(time.Time{}) // nolint + + // Read the response from the stream + queryResponsei, err := types.BindnodeRegistry.TypeFromReader(s, (*types.QueryResponse)(nil), dagcbor.Decode) + if err != nil { + return nil, fmt.Errorf("reading query response: %w", err) + } + queryResponse := queryResponsei.(*types.QueryResponse) + + clog.Debugw("response", "provider-peer", id) + + return queryResponse, nil +} + +func NewTransportsListener(h host.Host, protos []types.Protocol) *TransportsListener { + return &TransportsListener{ + host: h, + protocols: protos, + } +} + +func (p *TransportsListener) Start() { + p.host.SetStreamHandler(TransportsProtocolID, p.handleNewQueryStream) +} + +func (p *TransportsListener) Stop() { + p.host.RemoveStreamHandler(TransportsProtocolID) +} + +// Called when the client opens a libp2p stream +func (l *TransportsListener) handleNewQueryStream(s network.Stream) { + defer s.Close() + + slog.Debugw("query", "peer", s.Conn().RemotePeer()) + + response := types.QueryResponse{Protocols: l.protocols} + + // Set a deadline on writing to the stream so it doesn't hang + _ = s.SetWriteDeadline(time.Now().Add(streamWriteDeadline)) + defer s.SetWriteDeadline(time.Time{}) // nolint + + // Write the response to the client + err := types.BindnodeRegistry.TypeToWriter(response, s, dagcbor.Encode) + if err != nil { + slog.Infow("error writing query response", "peer", s.Conn().RemotePeer(), "err", err) + return + } +} diff --git a/retrievalmarket/types/transports.go b/retrievalmarket/types/transports.go new file mode 100644 index 000000000..035caf898 --- /dev/null +++ b/retrievalmarket/types/transports.go @@ -0,0 +1,45 @@ +package types + +import ( + _ "embed" + "fmt" + + "github.com/filecoin-project/go-address" + "github.com/ipld/go-ipld-prime/node/bindnode" + bindnoderegistry "github.com/ipld/go-ipld-prime/node/bindnode/registry" + "github.com/multiformats/go-multiaddr" +) + +type Protocol struct { + // The name of the transport protocol eg "libp2p" or "http" + Name string + // The address of the endpoint in multiaddr format + Endpoint multiaddr.Multiaddr +} + +type QueryResponse struct { + Protocols []Protocol +} + +//go:embed transports.ipldsch +var embedSchema []byte + +// MultiAddrBindnodeOption converts a filecoin Address type to and from a Bytes +// field in a schema +var MultiAddrBindnodeOption = bindnode.TypedBytesConverter(&address.Address{}, multiAddrFromBytes, multiAddrToBytes) + +func multiAddrFromBytes(b []byte) (interface{}, error) { + return multiaddr.NewMultiaddrBytes(b) +} + +func multiAddrToBytes(iface interface{}) ([]byte, error) { + var ma multiaddr.Multiaddr + ma, ok := iface.(multiaddr.Multiaddr) + if !ok { + return nil, fmt.Errorf("expected *Multiaddr value") + } + + return ma.Bytes(), nil +} + +var BindnodeRegistry = bindnoderegistry.NewRegistry() diff --git a/retrievalmarket/types/transports.ipldsch b/retrievalmarket/types/transports.ipldsch new file mode 100644 index 000000000..961c9312d --- /dev/null +++ b/retrievalmarket/types/transports.ipldsch @@ -0,0 +1,13 @@ +# Defines the response to a query asking which transport protocols a +# Storage Provider supports + +type Protocol struct { + # The name of the transport protocol eg "libp2p" or "http" + Name string + # The address of the endpoint in multiaddr format + Endpoint Multiaddr +} + +type QueryResponse struct { + Protocols [Protocol] +}