Skip to content

Commit

Permalink
feat: boost provider retrieval-transports CLI command
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Aug 23, 2022
1 parent 7942c6e commit b32c4b7
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 34 deletions.
106 changes: 106 additions & 0 deletions cmd/boost/provider_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ import (
"github.com/filecoin-project/boost/cli/ctxutil"
clinode "github.com/filecoin-project/boost/cli/node"
"github.com/filecoin-project/boost/cmd"
"github.com/filecoin-project/boost/retrievalmarket/lp2pimpl"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
// TODO: This multiaddr util library should probably live in its own repo
multiaddrutil "github.com/filecoin-project/go-legs/httpsync/multiaddr"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multiaddr"
"github.com/urfave/cli/v2"
)

Expand All @@ -30,6 +34,7 @@ var providerCmd = &cli.Command{
libp2pInfoCmd,
storageAskCmd,
retrievalAskCmd,
retrievalTransportsCmd,
},
}

Expand Down Expand Up @@ -294,3 +299,104 @@ var retrievalAskCmd = &cli.Command{
return nil
},
}

var retrievalTransportsCmd = &cli.Command{
Name: "retrieval-transports",
Usage: "Query a storage provider's available retrieval transports (libp2p, http, etc)",
ArgsUsage: "[provider]",
Action: func(cctx *cli.Context) error {
ctx := bcli.ReqContext(cctx)

afmt := NewAppFmt(cctx.App)
if cctx.NArg() != 1 {
afmt.Println("Usage: retrieval-transports [provider]")
return nil
}

n, err := clinode.Setup(cctx.String(cmd.FlagRepo.Name))
if err != nil {
return err
}

api, closer, err := lcli.GetGatewayAPI(cctx)
if err != nil {
return fmt.Errorf("cant setup gateway connection: %w", err)
}
defer closer()

maddr, err := address.NewFromString(cctx.Args().First())
if err != nil {
return err
}

addrInfo, err := cmd.GetAddrInfo(ctx, api, maddr)
if err != nil {
return err
}

log.Debugw("found storage provider", "id", addrInfo.ID, "multiaddrs", addrInfo.Addrs, "addr", maddr)

if err := n.Host.Connect(ctx, *addrInfo); err != nil {
return fmt.Errorf("failed to connect to peer %s: %w", addrInfo.ID, err)
}

// Send the query to the Storage Provider
client := lp2pimpl.NewTransportsClient(n.Host)
resp, err := client.SendQuery(ctx, addrInfo.ID)

if cctx.Bool("json") {
type addr struct {
Multiaddr string `json:"multiaddr"`
Address string `json:"address,omitempty"`
}
json := make(map[string]interface{}, len(resp.Protocols))
for _, p := range resp.Protocols {
addrs := make([]addr, 0, len(p.Addresses))
for _, ma := range p.Addresses {
// Get the multiaddress, and also try to get the address
// in the protocol's native format (eg URL format for
// http protocol)
addrs = append(addrs, addr{
Multiaddr: ma.String(),
Address: multiaddrToNative(p.Name, ma),
})
}
json[p.Name] = addrs
}
return cmd.PrintJson(json)
}

if len(resp.Protocols) == 0 {
afmt.Println("No available retrieval protocols")
return nil
}
for _, p := range resp.Protocols {
afmt.Println(p.Name)
for _, a := range p.Addresses {
// Output the multiaddress
afmt.Println(" " + a.String())
// Try to get the address in the protocol's native format
// (eg URL format for http protocol)
nativeFmt := multiaddrToNative(p.Name, a)
if nativeFmt != "" {
afmt.Println(" " + nativeFmt)
}
}
}

return nil
},
}

func multiaddrToNative(proto string, ma multiaddr.Multiaddr) string {
switch proto {
case "http", "https":
u, err := multiaddrutil.ToURL(ma)
if err != nil {
return ""
}
return u.String()
}

return ""
}
45 changes: 45 additions & 0 deletions cmd/boost/provider_cmd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package main

import (
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"testing"
)

func TestMultiaddrToNative(t *testing.T) {
testCases := []struct {
name string
proto string
ma string
expected string
}{{
name: "http",
proto: "http",
ma: "/dns/foo.com/http",
expected: "http://foo.com",
}, {
name: "http IP 4 address",
proto: "http",
ma: "/ip4/192.168.0.1/tcp/80/http",
expected: "http://192.168.0.1:80",
}, {
name: "https",
proto: "https",
ma: "/dns/foo.com/tcp/443/https",
expected: "https://foo.com:443",
}, {
name: "unknown protocol",
proto: "fancynewproto",
ma: "/dns/foo.com/tcp/80/http",
expected: "",
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ma, err := multiaddr.NewMultiaddr(tc.ma)
require.NoError(t, err)
res := multiaddrToNative(tc.proto, ma)
require.Equal(t, tc.expected, res)
})
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-markets v1.23.2
github.com/filecoin-project/go-jsonrpc v0.1.5
github.com/filecoin-project/go-legs v0.4.9 // indirect
github.com/filecoin-project/go-padreader v0.0.1
github.com/filecoin-project/go-paramfetch v0.0.4
github.com/filecoin-project/go-state-types v0.1.10
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ github.com/filecoin-project/go-jsonrpc v0.1.5 h1:ckxqZ09ivBAVf5CSmxxrqqNHC7PJm3G
github.com/filecoin-project/go-jsonrpc v0.1.5/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4=
github.com/filecoin-project/go-legs v0.4.4 h1:mpMmAOOnamaz0CV9rgeKhEWA8j9kMC+f+UGCGrxKaZo=
github.com/filecoin-project/go-legs v0.4.4/go.mod h1:JQ3hA6xpJdbR8euZ2rO0jkxaMxeidXf0LDnVuqPAe9s=
github.com/filecoin-project/go-legs v0.4.9 h1:9ccbv5zDPqMviEpSpf0TdfKKI64TMYGSiuY2A1EXHFY=
github.com/filecoin-project/go-legs v0.4.9/go.mod h1:JQ3hA6xpJdbR8euZ2rO0jkxaMxeidXf0LDnVuqPAe9s=
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak=
github.com/filecoin-project/go-padreader v0.0.1 h1:8h2tVy5HpoNbr2gBRr+WD6zV6VD6XHig+ynSGJg8ZOs=
github.com/filecoin-project/go-padreader v0.0.1/go.mod h1:VYVPJqwpsfmtoHnAmPx6MUwmrK6HIcDqZJiuZhtmfLQ=
Expand Down
4 changes: 2 additions & 2 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ type DealmakingConfig struct {
RemoteCommp bool

// The public multi-address for retrieving deals with booster-http.
// Note: Must be in multiaddr format, eg /dns6/foo.com/tcp/80/http
HTTPRetrievalMultiAddr string
// Note: Must be in multiaddr format, eg /dns/foo.com/tcp/443/https
HTTPRetrievalMultiaddr string
}

type FeeConfig struct {
Expand Down
6 changes: 3 additions & 3 deletions node/modules/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ func NewTransportsListener(cfg *config.Boost) func(h host.Host) (*lp2pimpl.Trans

// If there's an http retrieval address specified, add HTTP to the list
// of supported protocols
if cfg.Dealmaking.HTTPRetrievalMultiAddr != "" {
maddr, err := multiaddr.NewMultiaddr(cfg.Dealmaking.HTTPRetrievalMultiAddr)
if cfg.Dealmaking.HTTPRetrievalMultiaddr != "" {
maddr, err := multiaddr.NewMultiaddr(cfg.Dealmaking.HTTPRetrievalMultiaddr)
if err != nil {
msg := "HTTPRetrievalURL must be in multi-address format. "
msg += "Could not parse '%s' as multiaddr: %w"
return nil, fmt.Errorf(msg, cfg.Dealmaking.HTTPRetrievalMultiAddr, err)
return nil, fmt.Errorf(msg, cfg.Dealmaking.HTTPRetrievalMultiaddr, err)
}

protos = append(protos, types.Protocol{
Expand Down
10 changes: 5 additions & 5 deletions retrievalmarket/lp2pimpl/transports.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ type TransportsListener struct {
protocols []types.Protocol
}

const streamReadDeadline = 10 * time.Second
const streamWriteDeadline = 10 * time.Second
const streamReadDeadline = 30 * time.Second
const streamWriteDeadline = 30 * time.Second

// QueryClientOption is an option for configuring the libp2p storage deal client
type QueryClientOption func(*TransportsClient)
Expand Down Expand Up @@ -58,7 +58,7 @@ func NewTransportsClient(h host.Host, options ...QueryClientOption) *TransportsC

// SendQuery sends a retrieval query over a libp2p stream to the peer
func (c *TransportsClient) SendQuery(ctx context.Context, id peer.ID) (*types.QueryResponse, error) {
clog.Debugw("query", "provider-peer", id)
clog.Debugw("query", "peer", id)

// Create a libp2p stream to the provider
s, err := c.retryStream.OpenStream(ctx, id, []protocol.ID{TransportsProtocolID})
Expand All @@ -79,7 +79,7 @@ func (c *TransportsClient) SendQuery(ctx context.Context, id peer.ID) (*types.Qu
}
queryResponse := queryResponsei.(*types.QueryResponse)

clog.Debugw("response", "provider-peer", id)
clog.Debugw("response", "peer", id)

return queryResponse, nil
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func (l *TransportsListener) handleNewQueryStream(s network.Stream) {
defer s.SetWriteDeadline(time.Time{}) // nolint

// Write the response to the client
err := types.BindnodeRegistry.TypeToWriter(response, s, dagcbor.Encode)
err := types.BindnodeRegistry.TypeToWriter(&response, s, dagcbor.Encode)
if err != nil {
slog.Infow("error writing query response", "peer", s.Conn().RemotePeer(), "err", err)
return
Expand Down
37 changes: 13 additions & 24 deletions retrievalmarket/types/transports.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,41 +24,30 @@ type QueryResponse struct {
var embedSchema []byte

func multiAddrFromBytes(b []byte) (interface{}, error) {
return multiaddr.NewMultiaddrBytes(b)
ma, err := multiaddr.NewMultiaddrBytes(b)
if err != nil {
return nil, err
}
return &ma, err
}

func multiAddrToBytes(iface interface{}) ([]byte, error) {
var ma multiaddr.Multiaddr
ma, ok := iface.(multiaddr.Multiaddr)
ma, ok := iface.(*multiaddr.Multiaddr)
if !ok {
return nil, fmt.Errorf("expected *Multiaddr value")
}

return ma.Bytes(), nil
}

// MultiAddrBindnodeOption converts a filecoin Multiaddr type to and from a Bytes
// field in a schema
var dummyMa multiaddr.Multiaddr
var MultiAddrBindnodeOption = bindnode.TypedBytesConverter(&dummyMa, multiAddrFromBytes, multiAddrToBytes)

var bindnodeOptions = []bindnode.Option{
MultiAddrBindnodeOption,
return (*ma).Bytes(), nil
}

var BindnodeRegistry = bindnoderegistry.NewRegistry()

func init() {
for _, r := range []struct {
typ interface{}
typName string
}{
{(*QueryResponse)(nil), "QueryResponse"},
{(*Protocol)(nil), "Protocol"},
{(multiaddr.Multiaddr)(nil), "Multiaddr"},
} {
if err := BindnodeRegistry.RegisterType(r.typ, string(embedSchema), r.typName, bindnodeOptions...); err != nil {
panic(err.Error())
}
var dummyMa multiaddr.Multiaddr
var bindnodeOptions = []bindnode.Option{
bindnode.TypedBytesConverter(&dummyMa, multiAddrFromBytes, multiAddrToBytes),
}
if err := BindnodeRegistry.RegisterType((*QueryResponse)(nil), string(embedSchema), "QueryResponse", bindnodeOptions...); err != nil {
panic(err.Error())
}
}
1 change: 1 addition & 0 deletions retrievalmarket/types/transports.ipldsch
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Defines the response to a query asking which transport protocols a
# Storage Provider supports
type Multiaddr bytes

type Protocol struct {
# The name of the transport protocol eg "libp2p" or "http"
Expand Down

0 comments on commit b32c4b7

Please sign in to comment.