Skip to content

Commit

Permalink
feat: keep connection between booster-bitswap and proxy alive
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Oct 4, 2022
1 parent e33ed18 commit 0ed7213
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 12 deletions.
48 changes: 44 additions & 4 deletions cmd/booster-bitswap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"context"
"fmt"
"time"

"github.com/filecoin-project/boost/protocolproxy"
bsnetwork "github.com/ipfs/go-bitswap/network"
Expand All @@ -22,6 +24,7 @@ type BitswapServer struct {
blockFilter BlockFilter
ctx context.Context
cancel context.CancelFunc
proxy peer.AddrInfo
server *server.Server
host host.Host
}
Expand All @@ -30,16 +33,27 @@ func NewBitswapServer(remoteStore blockstore.Blockstore, host host.Host, blockFi
return &BitswapServer{remoteStore: remoteStore, host: host, blockFilter: blockFilter}
}

func (s *BitswapServer) Start(ctx context.Context, balancer peer.AddrInfo) error {
const protectTag = "bitswap-server-to-proxy"

func (s *BitswapServer) Start(ctx context.Context, proxy peer.AddrInfo) error {
s.ctx, s.cancel = context.WithCancel(ctx)
s.proxy = proxy

host, err := protocolproxy.NewForwardingHost(ctx, s.host, balancer)
// Connect to the proxy over libp2p
log.Infow("connecting to proxy", "proxy", proxy)
err := s.host.Connect(s.ctx, proxy)
if err != nil {
return fmt.Errorf("connecting to proxy %s: %w", proxy, err)
}
s.host.ConnManager().Protect(proxy.ID, protectTag)

host, err := protocolproxy.NewForwardingHost(s.host, proxy)
if err != nil {
return err
}

// start a bitswap session on the provider
nilRouter, err := nilrouting.ConstructNilRouting(ctx, nil, nil, nil)
nilRouter, err := nilrouting.ConstructNilRouting(s.ctx, nil, nil, nil)
if err != nil {
return err
}
Expand All @@ -50,14 +64,40 @@ func (s *BitswapServer) Start(ctx context.Context, balancer peer.AddrInfo) error
return !filtered && err == nil
})}
net := bsnetwork.NewFromIpfsHost(host, nilRouter)
s.server = server.New(ctx, net, s.remoteStore, bsopts...)
s.server = server.New(s.ctx, net, s.remoteStore, bsopts...)
net.Start(s.server)

go s.keepProxyConnectionAlive(s.ctx, proxy)

log.Infow("bitswap server running", "multiaddrs", host.Addrs(), "peerId", host.ID())
return nil
}

func (s *BitswapServer) Stop() error {
s.host.ConnManager().Unprotect(s.proxy.ID, protectTag)
s.cancel()
return s.server.Close()
}

func (s *BitswapServer) keepProxyConnectionAlive(ctx context.Context, proxy peer.AddrInfo) {
// Periodically ensure that the connection over libp2p to the proxy is alive
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

connected := true
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := s.host.Connect(ctx, proxy)
if err != nil {
connected = false
log.Warnw("failed to connect to proxy", "address", proxy)
} else if !connected {
log.Infow("reconnected to proxy", "address", proxy)
connected = true
}
}
}
}
6 changes: 1 addition & 5 deletions protocolproxy/forwardinghost.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@ type ForwardingHost struct {
// NewForwardingHost node constructs a service node connected to the given proxy on the passed
// in host. A forwarding host behaves exactly like a host.Host but setting up new protocol handlers
// registers routes on the proxy node.
func NewForwardingHost(ctx context.Context, h host.Host, proxy peer.AddrInfo) (host.Host, error) {
err := h.Connect(ctx, proxy)
if err != nil {
return nil, err
}
func NewForwardingHost(h host.Host, proxy peer.AddrInfo) (host.Host, error) {
fh := &ForwardingHost{
Host: h,
proxy: proxy.ID,
Expand Down
4 changes: 2 additions & 2 deletions protocolproxy/forwardinghost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestSetStreamHandler(t *testing.T) {
tn := setupTestNet(ctx, t, peers)

// setup a mock load balancer for routing
fh, err := NewForwardingHost(ctx, tn.serviceNode, peer.AddrInfo{
fh, err := NewForwardingHost(tn.serviceNode, peer.AddrInfo{
ID: peers.proxyNode.id,
Addrs: []multiaddr.Multiaddr{peers.proxyNode.multiAddr},
})
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestNewStream(t *testing.T) {
require.NoError(t, err)
}
})
fh, err := NewForwardingHost(ctx, tn.serviceNode, peer.AddrInfo{
fh, err := NewForwardingHost(tn.serviceNode, peer.AddrInfo{
ID: peers.proxyNode.id,
Addrs: []multiaddr.Multiaddr{peers.proxyNode.multiAddr},
})
Expand Down
7 changes: 6 additions & 1 deletion protocolproxy/protocolproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,13 @@ func (pp *ProtocolProxy) Start(ctx context.Context) {
log.Infof("started protocol proxy with protocols:\n%s", msg)
}

const routedHostTag = "protocol-proxy-routed-host"

func (pp *ProtocolProxy) Close() {
pp.h.RemoveStreamHandler(ForwardingProtocolID)
for id := range pp.supportedProtocols {
for id, pid := range pp.supportedProtocols {
pp.h.RemoveStreamHandler(id)
pp.h.ConnManager().Unprotect(pid, routedHostTag)
}
pp.activeRoutesLk.Lock()
pp.activeRoutes = map[protocol.ID]peer.ID{}
Expand Down Expand Up @@ -97,6 +100,8 @@ func (pp *ProtocolProxy) Connected(n network.Network, c network.Conn) {
for _, id := range protocols {
pp.activeRoutes[id] = p
}

pp.h.ConnManager().Protect(p, routedHostTag)
}

// Disconnected checks the peersConfig and removes listening when a service node disconnects
Expand Down

0 comments on commit 0ed7213

Please sign in to comment.