Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keep connection between booster-bitswap and proxy alive #867

Merged
merged 4 commits into from
Oct 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 30 additions & 42 deletions cmd/booster-bitswap/init.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package main

import (
"context"
"crypto/rand"
"fmt"
"os"
"path/filepath"

lcli "github.com/filecoin-project/lotus/cli"
"github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
Expand All @@ -17,10 +15,11 @@ import (
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
)

func configureRepo(ctx context.Context, cfgDir string, createIfNotExist bool) (peer.ID, crypto.PrivKey, error) {
func configureRepo(cfgDir string, createIfNotExist bool) (peer.ID, crypto.PrivKey, error) {
if cfgDir == "" {
return "", nil, fmt.Errorf("%s is a required flag", FlagRepo.Name)
}
Expand All @@ -42,8 +41,8 @@ func configureRepo(ctx context.Context, cfgDir string, createIfNotExist bool) (p
return selfPid, peerkey, nil
}

func setupHost(ctx context.Context, cfgDir string, port int) (host.Host, error) {
_, peerKey, err := configureRepo(ctx, cfgDir, false)
func setupHost(cfgDir string, port int) (host.Host, error) {
_, peerKey, err := configureRepo(cfgDir, false)
if err != nil {
return nil, err
}
Expand All @@ -62,46 +61,35 @@ func setupHost(ctx context.Context, cfgDir string, port int) (host.Host, error)
}

func loadPeerKey(cfgDir string, createIfNotExists bool) (crypto.PrivKey, error) {
var peerkey crypto.PrivKey
keyPath := filepath.Join(cfgDir, "libp2p.key")
keyFile, err := os.ReadFile(keyPath)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
if !createIfNotExists {
return nil, fmt.Errorf("booster-bitswap has not been initialized. Run the booster-bitswap init command")
}
log.Infof("Generating new peer key...")

key, _, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return nil, err
}
peerkey = key
if err == nil {
return crypto.UnmarshalPrivateKey(keyFile)
}

data, err := crypto.MarshalPrivateKey(key)
if err != nil {
return nil, err
}
if !os.IsNotExist(err) {
return nil, err
}
if !createIfNotExists {
return nil, fmt.Errorf("booster-bitswap has not been initialized. Run the booster-bitswap init command")
}
log.Infof("Generating new peer key...")

if err := os.WriteFile(keyPath, data, 0600); err != nil {
return nil, err
}
} else {
key, err := crypto.UnmarshalPrivateKey(keyFile)
if err != nil {
return nil, err
}
key, _, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return nil, err
}

peerkey = key
data, err := crypto.MarshalPrivateKey(key)
if err != nil {
return nil, err
}

if peerkey == nil {
panic("sanity check: peer key is uninitialized")
if err := os.WriteFile(keyPath, data, 0600); err != nil {
return nil, err
}

return peerkey, nil
return key, nil
}

var initCmd = &cli.Command{
Expand All @@ -110,13 +98,13 @@ var initCmd = &cli.Command{
Before: before,
Flags: []cli.Flag{},
Action: func(cctx *cli.Context) error {
repoDir, err := homedir.Expand(cctx.String(FlagRepo.Name))
if err != nil {
return fmt.Errorf("expanding repo file path: %w", err)
}

ctx := lcli.ReqContext(cctx)

repoDir := cctx.String(FlagRepo.Name)

peerID, _, err := configureRepo(ctx, repoDir, true)
fmt.Println(peerID)
peerID, _, err := configureRepo(repoDir, true)
fmt.Println("Initialized booster-bitswap with libp2p peer ID: " + peerID)
return err
},
}
2 changes: 1 addition & 1 deletion cmd/booster-bitswap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var FlagRepo = &cli.StringFlag{
Name: "repo",
Usage: "repo directory for Booster bitswap",
Value: "~/.booster-bitswap",
EnvVars: []string{"BOOST_BITSWAP_REPO"},
EnvVars: []string{"BOOSTER_BITSWAP_REPO"},
}

func main() {
Expand Down
22 changes: 16 additions & 6 deletions cmd/booster-bitswap/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/boost/tracing"
"github.com/filecoin-project/go-jsonrpc"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/urfave/cli/v2"
)

Expand All @@ -37,6 +38,10 @@ var runCmd = &cli.Command{
Usage: "the endpoint for the boost API",
Required: true,
},
&cli.StringFlag{
Name: "proxy",
Usage: "the multiaddr of the libp2p proxy that this node connects through",
},
&cli.BoolFlag{
Name: "tracing",
Usage: "enables tracing of booster-bitswap calls",
Expand Down Expand Up @@ -85,26 +90,31 @@ var runCmd = &cli.Command{
// Create the server API
port := cctx.Int("port")
repoDir := cctx.String(FlagRepo.Name)
host, err := setupHost(ctx, repoDir, port)
host, err := setupHost(repoDir, port)
if err != nil {
return fmt.Errorf("setting up libp2p host: %w", err)
}
// Start the server

// Create the bitswap server
blockFilter := blockfilter.NewBlockFilter(repoDir)
err = blockFilter.Start(ctx)
if err != nil {
return fmt.Errorf("starting block filter: %w", err)
}
server := NewBitswapServer(remoteStore, host, blockFilter)

addrs, err := bapi.NetAddrsListen(ctx)
if err != nil {
return fmt.Errorf("getting boost API addrs: %w", err)
var proxyAddrInfo *peer.AddrInfo
if cctx.IsSet("proxy") {
proxy := cctx.String("proxy")
proxyAddrInfo, err = peer.AddrInfoFromString(proxy)
if err != nil {
return fmt.Errorf("parsing proxy multiaddr %s: %w", proxy, err)
}
}

// Start the bitswap server
log.Infof("Starting booster-bitswap node on port %d", port)
err = server.Start(ctx, addrs)
err = server.Start(ctx, proxyAddrInfo)
if err != nil {
return err
}
Expand Down
60 changes: 53 additions & 7 deletions cmd/booster-bitswap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"context"
"fmt"
"time"

"github.com/filecoin-project/boost/protocolproxy"
bsnetwork "github.com/ipfs/go-bitswap/network"
Expand All @@ -22,6 +24,7 @@ type BitswapServer struct {
blockFilter BlockFilter
ctx context.Context
cancel context.CancelFunc
proxy *peer.AddrInfo
server *server.Server
host host.Host
}
Expand All @@ -30,16 +33,28 @@ func NewBitswapServer(remoteStore blockstore.Blockstore, host host.Host, blockFi
return &BitswapServer{remoteStore: remoteStore, host: host, blockFilter: blockFilter}
}

func (s *BitswapServer) Start(ctx context.Context, balancer peer.AddrInfo) error {
const protectTag = "bitswap-server-to-proxy"

func (s *BitswapServer) Start(ctx context.Context, proxy *peer.AddrInfo) error {
s.ctx, s.cancel = context.WithCancel(ctx)
s.proxy = proxy

host, err := protocolproxy.NewForwardingHost(ctx, s.host, balancer)
if err != nil {
return err
host := s.host
if proxy != nil {
// If there's a proxy host, connect to the proxy over libp2p
log.Infow("connecting to proxy", "proxy", proxy)
err := s.host.Connect(s.ctx, *proxy)
if err != nil {
return fmt.Errorf("connecting to proxy %s: %w", proxy, err)
}
s.host.ConnManager().Protect(proxy.ID, protectTag)

// Create a forwarding host that registers routes with the proxy
host = protocolproxy.NewForwardingHost(s.host, *proxy)
}

// start a bitswap session on the provider
nilRouter, err := nilrouting.ConstructNilRouting(ctx, nil, nil, nil)
// Start a bitswap server on the provider
nilRouter, err := nilrouting.ConstructNilRouting(s.ctx, nil, nil, nil)
if err != nil {
return err
}
Expand All @@ -50,14 +65,45 @@ func (s *BitswapServer) Start(ctx context.Context, balancer peer.AddrInfo) error
return !filtered && err == nil
})}
net := bsnetwork.NewFromIpfsHost(host, nilRouter)
s.server = server.New(ctx, net, s.remoteStore, bsopts...)
s.server = server.New(s.ctx, net, s.remoteStore, bsopts...)
net.Start(s.server)

log.Infow("bitswap server running", "multiaddrs", host.Addrs(), "peerId", host.ID())
if proxy != nil {
go s.keepProxyConnectionAlive(s.ctx, *proxy)
log.Infow("with proxy", "multiaddrs", proxy.Addrs, "peerId", proxy.ID)
}

return nil
}

func (s *BitswapServer) Stop() error {
if s.proxy != nil {
s.host.ConnManager().Unprotect(s.proxy.ID, protectTag)
}
s.cancel()
return s.server.Close()
}

func (s *BitswapServer) keepProxyConnectionAlive(ctx context.Context, proxy peer.AddrInfo) {
// Periodically ensure that the connection over libp2p to the proxy is alive
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

connected := true
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := s.host.Connect(ctx, proxy)
if err != nil {
connected = false
log.Warnw("failed to connect to proxy", "address", proxy)
} else if !connected {
log.Infow("reconnected to proxy", "address", proxy)
connected = true
}
}
}
}
4 changes: 2 additions & 2 deletions docker/devnet/.env
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
DOCKER_USER=filecoin
LOTUS_IMAGE=${DOCKER_USER}/lotus-dev:1.17.1-rc2
LOTUS_MINER_IMAGE=${DOCKER_USER}/lotus-miner-dev:1.17.1-rc2
LOTUS_IMAGE=${DOCKER_USER}/lotus-dev:1.17.2-rc2
LOTUS_MINER_IMAGE=${DOCKER_USER}/lotus-miner-dev:1.17.2-rc2
BOOST_IMAGE=${DOCKER_USER}/boost-dev:dev
BOOSTER_HTTP_IMAGE=${DOCKER_USER}/booster-http-dev:dev
BOOSTER_BITSWAP_IMAGE=${DOCKER_USER}/booster-bitswap-dev:dev
Expand Down
Loading