Skip to content

Commit

Permalink
feat: libp2p retrieval transports endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Aug 22, 2022
1 parent cf9ae30 commit 08d341e
Show file tree
Hide file tree
Showing 9 changed files with 262 additions and 5 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ require (
github.com/ipfs/go-unixfs v0.3.1
github.com/ipld/go-car v0.4.1-0.20220707083113-89de8134e58e
github.com/ipld/go-car/v2 v2.4.2-0.20220707083113-89de8134e58e
github.com/ipld/go-ipld-prime v0.17.0
github.com/ipld/go-ipld-prime v0.18.0
github.com/ipld/go-ipld-selector-text-lite v0.0.1
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/jpillora/backoff v1.0.0
Expand All @@ -83,7 +83,7 @@ require (
github.com/mitchellh/go-homedir v1.1.0
github.com/multiformats/go-multiaddr v0.5.0
github.com/multiformats/go-multibase v0.0.3
github.com/multiformats/go-multihash v0.1.0
github.com/multiformats/go-multihash v0.2.0
github.com/multiformats/go-varint v0.0.6
github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333
github.com/pressly/goose/v3 v3.5.3
Expand Down
9 changes: 6 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1022,8 +1022,9 @@ github.com/ipld/go-ipld-prime v0.14.1/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704n
github.com/ipld/go-ipld-prime v0.14.2/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0=
github.com/ipld/go-ipld-prime v0.14.4-0.20211217152141-008fd70fc96f/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0=
github.com/ipld/go-ipld-prime v0.16.0/go.mod h1:axSCuOCBPqrH+gvXr2w9uAOulJqBPhHPT2PjoiiU1qA=
github.com/ipld/go-ipld-prime v0.17.0 h1:+U2peiA3aQsE7mrXjD2nYZaZrCcakoz2Wge8K42Ld8g=
github.com/ipld/go-ipld-prime v0.17.0/go.mod h1:aYcKm5TIvGfY8P3QBKz/2gKcLxzJ1zDaD+o0bOowhgs=
github.com/ipld/go-ipld-prime v0.18.0 h1:xUk7NUBSWHEXdjiOu2sLXouFJOMs0yoYzeI5RAqhYQo=
github.com/ipld/go-ipld-prime v0.18.0/go.mod h1:735yXW548CKrLwVCYXzqx90p5deRJMVVxM9eJ4Qe+qE=
github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5/go.mod h1:gcvzoEDBjwycpXt3LBE061wT9f46szXGHAmj9uoP6fU=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73 h1:TsyATB2ZRRQGTwafJdgEUQkmjOExRV0DNokcihZxbnQ=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73/go.mod h1:2PJ0JgxyB08t0b2WKrcuqI3di0V+5n6RS/LTUJhkoxY=
Expand Down Expand Up @@ -1736,8 +1737,9 @@ github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUj
github.com/multiformats/go-multihash v0.0.14/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc=
github.com/multiformats/go-multihash v0.0.15/go.mod h1:D6aZrWNLFTV/ynMpKsNtB40mJzmCl4jb1alC0OvHiHg=
github.com/multiformats/go-multihash v0.0.16/go.mod h1:zhfEIgVnB/rPMfxgFw15ZmGoNaKyNUIE4IWHG/kC+Ag=
github.com/multiformats/go-multihash v0.1.0 h1:CgAgwqk3//SVEw3T+6DqI4mWMyRuDwZtOWcJT0q9+EA=
github.com/multiformats/go-multihash v0.1.0/go.mod h1:RJlXsxt6vHGaia+S8We0ErjhojtKzPP2AH4+kYM7k84=
github.com/multiformats/go-multihash v0.2.0 h1:oytJb9ZA1OUW0r0f9ea18GiaPOo4SXyc7p2movyUuo4=
github.com/multiformats/go-multihash v0.2.0/go.mod h1:WxoMcYG85AZVQUyRyo9s4wULvW5qrI9vb2Lt6evduFc=
github.com/multiformats/go-multistream v0.0.1/go.mod h1:fJTiDfXJVmItycydCnNx4+wSzZ5NwG2FEVAI30fiovg=
github.com/multiformats/go-multistream v0.0.4/go.mod h1:fJTiDfXJVmItycydCnNx4+wSzZ5NwG2FEVAI30fiovg=
github.com/multiformats/go-multistream v0.1.0/go.mod h1:fJTiDfXJVmItycydCnNx4+wSzZ5NwG2FEVAI30fiovg=
Expand Down Expand Up @@ -2296,8 +2298,9 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220210151621-f4118a5b28e2/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 h1:kUhD7nTDoI3fVd9G4ORWrbV5NY0liEs/Jg2pv5f+bBA=
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down
4 changes: 4 additions & 0 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/filecoin-project/boost/node/impl/common"
"github.com/filecoin-project/boost/node/modules"
"github.com/filecoin-project/boost/node/modules/dtypes"
"github.com/filecoin-project/boost/retrievalmarket/lp2pimpl"
"github.com/filecoin-project/boost/sealingpipeline"
"github.com/filecoin-project/boost/storagemanager"
"github.com/filecoin-project/boost/storagemarket"
Expand Down Expand Up @@ -137,6 +138,7 @@ const (
HandleMigrateProviderFundsKey
HandleDealsKey
HandleRetrievalKey
HandleRetrievalTransportsKey
RunSectorServiceKey

// boost should be started after legacy markets (HandleDealsKey)
Expand Down Expand Up @@ -514,6 +516,8 @@ func ConfigBoost(cfg *config.Boost) Option {
Override(new(rmnet.RetrievalMarketNetwork), lotus_modules.RetrievalNetwork),
Override(new(retrievalmarket.RetrievalProvider), lotus_modules.RetrievalProvider),
Override(HandleRetrievalKey, lotus_modules.HandleRetrieval),
Override(new(*lp2pimpl.TransportsListener), modules.NewTransportsListener(cfg)),
Override(HandleRetrievalTransportsKey, modules.HandleRetrievalTransports),
Override(new(idxprov.MeshCreator), idxprov.NewMeshCreator),
Override(new(provider.Interface), modules.IndexProvider(cfg.IndexProvider)),

Expand Down
7 changes: 7 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ type DealmakingConfig struct {
MaxTransferDuration Duration
// Whether to do commp on the Boost node (local) or on the Sealer (remote)
RemoteCommp bool

// The public multi-address for retrieving deals with booster-http.
// Note: Must be in multiaddr format, eg /dns6/foo.com/tcp/80/http
HTTPRetrievalMultiAddr string
}

type FeeConfig struct {
Expand Down
61 changes: 61 additions & 0 deletions node/modules/retrieval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package modules

import (
"context"
"fmt"

"github.com/filecoin-project/boost/node/config"
"github.com/filecoin-project/boost/retrievalmarket/lp2pimpl"
"github.com/filecoin-project/boost/retrievalmarket/types"
"github.com/libp2p/go-libp2p-core/host"
"github.com/multiformats/go-multiaddr"
"go.uber.org/fx"
)

func NewTransportsListener(cfg *config.Boost) func(h host.Host) (*lp2pimpl.TransportsListener, error) {
return func(h host.Host) (*lp2pimpl.TransportsListener, error) {
protos := []types.Protocol{}

// Get the libp2p address from the Host
if len(h.Addrs()) > 0 {
// TODO: should this be a list of addresses instead?
protos = append(protos, types.Protocol{
Name: "libp2p",
Endpoint: h.Addrs()[0],
})
}

// If there's an http retrieval address specified, add HTTP to the list
// of supported protocols
if cfg.Dealmaking.HTTPRetrievalMultiAddr != "" {
maddr, err := multiaddr.NewMultiaddr(cfg.Dealmaking.HTTPRetrievalMultiAddr)
if err != nil {
msg := "HTTPRetrievalURL must be in multi-address format. "
msg += "Could not parse '%s' as multiaddr: %w"
return nil, fmt.Errorf(msg, cfg.Dealmaking.HTTPRetrievalMultiAddr, err)
}

protos = append(protos, types.Protocol{
Name: "http",
Endpoint: maddr,
})
}

return lp2pimpl.NewTransportsListener(h, protos), nil
}
}

func HandleRetrievalTransports(lc fx.Lifecycle, l lp2pimpl.TransportsListener) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
log.Debug("starting retrieval transports listener")
l.Start()
return nil
},
OnStop: func(context.Context) error {
log.Debug("stopping retrieval transports listener")
l.Stop()
return nil
},
})
}
120 changes: 120 additions & 0 deletions retrievalmarket/lp2pimpl/transports.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package lp2pimpl

import (
"context"
"fmt"
"time"

"github.com/filecoin-project/boost/retrievalmarket/types"
"github.com/filecoin-project/go-fil-markets/shared"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)

var clog = logging.Logger("boost:lp2p:tspt:client")
var slog = logging.Logger("boost:lp2p:tspt")

// TransportsProtocolID is the protocol for querying which retrieval transports
// the Storage Provider supports (http, libp2p, etc)
const TransportsProtocolID = protocol.ID("/fil/retrieval/transports/1.0.0")

// TransportsListener listens for incoming queries over libp2p
type TransportsListener struct {
host host.Host
protocols []types.Protocol
}

const streamReadDeadline = 10 * time.Second
const streamWriteDeadline = 10 * time.Second

// QueryClientOption is an option for configuring the libp2p storage deal client
type QueryClientOption func(*TransportsClient)

// RetryParameters changes the default parameters around connection reopening
func RetryParameters(minDuration time.Duration, maxDuration time.Duration, attempts float64, backoffFactor float64) QueryClientOption {
return func(c *TransportsClient) {
c.retryStream.SetOptions(shared.RetryParameters(minDuration, maxDuration, attempts, backoffFactor))
}
}

// TransportsClient sends retrieval queries over libp2p
type TransportsClient struct {
retryStream *shared.RetryStream
}

func NewTransportsClient(h host.Host, options ...QueryClientOption) *TransportsClient {
c := &TransportsClient{
retryStream: shared.NewRetryStream(h),
}
for _, option := range options {
option(c)
}
return c
}

// SendQuery sends a retrieval query over a libp2p stream to the peer
func (c *TransportsClient) SendQuery(ctx context.Context, id peer.ID) (*types.QueryResponse, error) {
clog.Debugw("query", "provider-peer", id)

// Create a libp2p stream to the provider
s, err := c.retryStream.OpenStream(ctx, id, []protocol.ID{TransportsProtocolID})
if err != nil {
return nil, err
}

defer s.Close() // nolint

// Set a deadline on reading from the stream so it doesn't hang
_ = s.SetReadDeadline(time.Now().Add(streamReadDeadline))
defer s.SetReadDeadline(time.Time{}) // nolint

// Read the response from the stream
queryResponsei, err := types.BindnodeRegistry.TypeFromReader(s, (*types.QueryResponse)(nil), dagcbor.Decode)
if err != nil {
return nil, fmt.Errorf("reading query response: %w", err)
}
queryResponse := queryResponsei.(*types.QueryResponse)

clog.Debugw("response", "provider-peer", id)

return queryResponse, nil
}

func NewTransportsListener(h host.Host, protos []types.Protocol) *TransportsListener {
return &TransportsListener{
host: h,
protocols: protos,
}
}

func (p *TransportsListener) Start() {
p.host.SetStreamHandler(TransportsProtocolID, p.handleNewQueryStream)
}

func (p *TransportsListener) Stop() {
p.host.RemoveStreamHandler(TransportsProtocolID)
}

// Called when the client opens a libp2p stream
func (l *TransportsListener) handleNewQueryStream(s network.Stream) {
defer s.Close()

slog.Debugw("query", "peer", s.Conn().RemotePeer())

response := types.QueryResponse{Protocols: l.protocols}

// Set a deadline on writing to the stream so it doesn't hang
_ = s.SetWriteDeadline(time.Now().Add(streamWriteDeadline))
defer s.SetWriteDeadline(time.Time{}) // nolint

// Write the response to the client
err := types.BindnodeRegistry.TypeToWriter(response, s, dagcbor.Encode)
if err != nil {
slog.Infow("error writing query response", "peer", s.Conn().RemotePeer(), "err", err)
return
}
}
45 changes: 45 additions & 0 deletions retrievalmarket/types/transports.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package types

import (
_ "embed"
"fmt"

"github.com/filecoin-project/go-address"
"github.com/ipld/go-ipld-prime/node/bindnode"
bindnoderegistry "github.com/ipld/go-ipld-prime/node/bindnode/registry"
"github.com/multiformats/go-multiaddr"
)

type Protocol struct {
// The name of the transport protocol eg "libp2p" or "http"
Name string
// The address of the endpoint in multiaddr format
Endpoint multiaddr.Multiaddr
}

type QueryResponse struct {
Protocols []Protocol
}

//go:embed transports.ipldsch
var embedSchema []byte

// MultiAddrBindnodeOption converts a filecoin Address type to and from a Bytes
// field in a schema
var MultiAddrBindnodeOption = bindnode.TypedBytesConverter(&address.Address{}, multiAddrFromBytes, multiAddrToBytes)

func multiAddrFromBytes(b []byte) (interface{}, error) {
return multiaddr.NewMultiaddrBytes(b)
}

func multiAddrToBytes(iface interface{}) ([]byte, error) {
var ma multiaddr.Multiaddr
ma, ok := iface.(multiaddr.Multiaddr)
if !ok {
return nil, fmt.Errorf("expected *Multiaddr value")
}

return ma.Bytes(), nil
}

var BindnodeRegistry = bindnoderegistry.NewRegistry()
13 changes: 13 additions & 0 deletions retrievalmarket/types/transports.ipldsch
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Defines the response to a query asking which transport protocols a
# Storage Provider supports

type Protocol struct {
# The name of the transport protocol eg "libp2p" or "http"
Name string
# The address of the endpoint in multiaddr format
Endpoint Multiaddr
}

type QueryResponse struct {
Protocols [Protocol]
}

0 comments on commit 08d341e

Please sign in to comment.