Skip to content

Commit

Permalink
gateway: Use long lived connections to peers
Browse files Browse the repository at this point in the history
Signed-off-by: Chance Zibolski <chance.zibolski@gmail.com>
  • Loading branch information
chancez committed Apr 28, 2024
1 parent 53809cf commit 7f13c03
Show file tree
Hide file tree
Showing 13 changed files with 1,318 additions and 26 deletions.
126 changes: 101 additions & 25 deletions cmd/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"fmt"
"log/slog"
"net"
"slices"
"strconv"
"sync"
"time"

"github.com/chancez/capper/pkg/capture"
capperpb "github.com/chancez/capper/proto/capper"
mapset "github.com/deckarep/golang-set/v2"
"github.com/gopacket/gopacket"
"github.com/gopacket/gopacket/layers"
"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -70,13 +70,31 @@ func runGateway(ctx context.Context, logger *slog.Logger, listen string, staticP
return fmt.Errorf("failed to listen: %w", err)
}

peerManager := NewPeerManager(logger.With("subsystem", "peer-manager"), peerDnsQuery, 5*time.Second)
pmLogger := logger.With("subsystem", "peer-manager")
peerManager := NewPeerManager(pmLogger, peerDnsQuery, 5*time.Second)
clientPool := NewPeerClientConnPool(logger.With("subsystem", "client-pool"), 5*time.Second)

peerManager.checkCallback = func(removed, added []string) {
for _, peer := range removed {
err := clientPool.Close(peer)
if err != nil {
pmLogger.Error("unable to closing connection to peer", "peer", peer, "err", err)
}
}
for _, peer := range added {
_, err := clientPool.Client(ctx, peer)
if err != nil {
pmLogger.Error("unable to connect to peer", "peer", peer, "err", err)
}
}
}

gatewaySrv := newGRPCServer(logger, &gateway{
clock: clockwork.NewRealClock(),
staticPeers: staticPeers,
log: logger,
connTimeout: 5 * time.Second,
peerManager: peerManager,
clientPool: clientPool,
})

eg, ctx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -109,8 +127,8 @@ type gateway struct {
clock clockwork.Clock
log *slog.Logger
staticPeers []string
connTimeout time.Duration
peerManager *PeerManager
clientPool *PeerClientConnPool
}

func (s *gateway) Capture(req *capperpb.CaptureRequest, stream capperpb.Capper_CaptureServer) error {
Expand All @@ -123,29 +141,21 @@ func (s *gateway) Capture(req *capperpb.CaptureRequest, stream capperpb.Capper_C
peers = append(peers, dynamicPeers...)

if len(peers) == 0 {
s.log.Error("no peers found")
s.log.Error("no peers")
return status.Error(codes.Internal, "no peers")
} else {
s.log.Debug("found peers", "peers", peers)
}

s.log.Debug("found peers", "peers", peers)

var sources []capture.NamedPacketSource
var linkType layers.LinkType
for _, peer := range peers {
s.log.Debug("connecting to peer", "peer", peer)
connCtx := ctx
if s.connTimeout != 0 {
var connCancel context.CancelFunc
connCtx, connCancel = context.WithTimeout(ctx, s.connTimeout)
defer connCancel()
}
conn, err := grpc.DialContext(connCtx, peer, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := s.clientPool.Client(ctx, peer)
if err != nil {
return fmt.Errorf("error connecting to peer: %w", err)
return err
}
defer conn.Close()
c := capperpb.NewCapperClient(conn)

c := capperpb.NewCapperClient(conn)
s.log.Debug("starting stream", "peer", peer)
peerStream, err := c.Capture(ctx, req)
if err != nil {
Expand Down Expand Up @@ -202,14 +212,75 @@ func (s *gateway) Capture(req *capperpb.CaptureRequest, stream capperpb.Capper_C
return nil
}

// PeerClientConnPool keeps one gRPC client connection per peer address so
// that callers can reuse a connection instead of dialing for every request.
type PeerClientConnPool struct {
// log receives the pool's connect/close messages.
log *slog.Logger
// connTimeout, when non-zero, bounds the context handed to the dial in
// Client.
connTimeout time.Duration

// mu guards clientConns.
mu sync.Mutex
// clientConns maps a peer address to the connection created for it.
clientConns map[string]*grpc.ClientConn
}

// NewPeerClientConnPool returns an empty pool. log is used for the pool's
// messages and connTimeout is stored for use when dialing peers.
func NewPeerClientConnPool(log *slog.Logger, connTimeout time.Duration) *PeerClientConnPool {
	pool := new(PeerClientConnPool)
	pool.log = log
	pool.connTimeout = connTimeout
	// The map must be allocated up front: Client writes into it.
	pool.clientConns = map[string]*grpc.ClientConn{}
	return pool
}

// Client returns the pooled connection for peer, dialing and storing a new
// one when the pool has none for that address.
//
// The pool mutex is held for the whole call, including the dial. When
// connTimeout is non-zero the dial is given a context derived from ctx with
// that timeout. A dial failure is returned wrapped and nothing is stored.
func (pcm *PeerClientConnPool) Client(ctx context.Context, peer string) (*grpc.ClientConn, error) {
	pcm.mu.Lock()
	defer pcm.mu.Unlock()

	// Fast path: a connection for this peer already exists.
	if existing, found := pcm.clientConns[peer]; found {
		return existing, nil
	}

	pcm.log.Debug("connecting to peer", "peer", peer)

	dialCtx := ctx
	if pcm.connTimeout != 0 {
		var cancel context.CancelFunc
		dialCtx, cancel = context.WithTimeout(ctx, pcm.connTimeout)
		defer cancel()
	}

	created, err := grpc.DialContext(dialCtx, peer, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, fmt.Errorf("error connecting to peer: %w", err)
	}

	pcm.log.Info("established connection to peer", "peer", peer)
	pcm.clientConns[peer] = created
	return created, nil
}

// Close closes the pooled connection for peer and removes it from the pool.
// It returns nil when the pool holds no connection for peer, otherwise the
// result of closing the connection.
func (pcm *PeerClientConnPool) Close(peer string) error {
	pcm.mu.Lock()
	defer pcm.mu.Unlock()
	conn, ok := pcm.clientConns[peer]
	if !ok {
		return nil
	}
	pcm.log.Debug("closing connection to peer", "peer", peer)
	// Drop the entry before closing: leaving it in the map would make a
	// later Client(peer) hand back this closed connection instead of
	// dialing a fresh one (e.g. when a peer disappears and then returns).
	delete(pcm.clientConns, peer)
	return conn.Close()
}

// CloseAll closes every pooled connection and empties the pool. The errors
// from the individual closes are combined with errors.Join, so the result is
// nil when every close succeeded.
//
// The peer parameter is not used; it is kept so existing callers continue to
// compile.
func (pcm *PeerClientConnPool) CloseAll(peer string) error {
	pcm.mu.Lock()
	defer pcm.mu.Unlock()
	errs := make([]error, 0, len(pcm.clientConns))
	for _, conn := range pcm.clientConns {
		errs = append(errs, conn.Close())
	}
	// Forget the closed connections so that a later Client call dials a new
	// one rather than returning a closed connection, and so that a second
	// CloseAll does not close them again.
	clear(pcm.clientConns)
	return errors.Join(errs...)
}

type PeerManager struct {
clock clockwork.Clock
log *slog.Logger

peersMu sync.Mutex
peers []string
peers mapset.Set[string]

checkInterval time.Duration
checkCallback func(removed, added []string)
query string
}

Expand All @@ -219,6 +290,7 @@ func NewPeerManager(log *slog.Logger, query string, checkInterval time.Duration)
log: log,
query: query,
checkInterval: checkInterval,
peers: mapset.NewSet[string](),
}
}

Expand All @@ -230,20 +302,24 @@ func (pm *PeerManager) Start(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-ticker.Chan():
var peers []string
peers := mapset.NewSet[string]()
_, records, err := net.DefaultResolver.LookupSRV(ctx, "", "", pm.query)
if err != nil {
pm.log.Error("error resolving peers", "error", err)
continue
}
for _, srv := range records {
p := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
peers = append(peers, p)
peerAddr := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
peers.Add(peerAddr)
}

if !slices.Equal(pm.peers, peers) {
added := peers.Difference(pm.peers)
removed := pm.peers.Difference(peers)

if added.Cardinality() != 0 || removed.Cardinality() != 0 {
pm.peersMu.Lock()
pm.log.Debug("updating peers list", "old", pm.peers, "new", peers)
pm.log.Debug("updating peers list", "added", added.ToSlice(), "removed", removed.ToSlice(), "peers", peers.ToSlice())
pm.checkCallback(removed.ToSlice(), added.ToSlice())
pm.peers = peers
pm.peersMu.Unlock()
}
Expand All @@ -254,5 +330,5 @@ func (pm *PeerManager) Start(ctx context.Context) error {
func (pm *PeerManager) Peers() []string {
pm.peersMu.Lock()
defer pm.peersMu.Unlock()
return pm.peers
return pm.peers.ToSlice()
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ go 1.22.2

require (
github.com/containerd/containerd v1.7.15
github.com/deckarep/golang-set/v2 v2.6.0
github.com/gopacket/gopacket v1.2.0
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0
github.com/jonboulle/clockwork v0.4.0
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
golang.org/x/sync v0.7.0
golang.org/x/sys v0.19.0
google.golang.org/grpc v1.63.2
Expand Down Expand Up @@ -46,7 +48,6 @@ require (
github.com/opencontainers/selinux v1.11.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions vendor/github.com/deckarep/golang-set/v2/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions vendor/github.com/deckarep/golang-set/v2/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7f13c03

Please sign in to comment.