Skip to content

Commit

Permalink
feat(kds): better error handling (#7868)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <jakub.dyszkiewicz@gmail.com>
  • Loading branch information
jakubdyszkiewicz authored Sep 26, 2023
1 parent 1ba981d commit 3ab585c
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 11 deletions.
1 change: 1 addition & 0 deletions pkg/kds/mux/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (s *server) Start(stop <-chan struct{}) error {
}
}

// StreamMessage handle Mux messages for KDS V1. It's not used in KDS V2
func (s *server) StreamMessage(stream mesh_proto.MultiplexService_StreamMessageServer) error {
clientID, err := util.ClientIDFromIncomingCtx(stream.Context())
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions pkg/kds/mux/zone_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService
if status.Code(err) == codes.Unimplemented {
return errors.Wrap(err, "GlobalToZoneSync rpc stream failed, because Global CP does not implement this rpc. Upgrade Global CP.")
}
return errors.Wrap(err, "GlobalToZoneSync rpc stream failed prematurely, will restart in background")
clientLog.Error(err, "GlobalToZoneSync rpc stream failed prematurely, will restart in background")
return status.Error(codes.Internal, "stream failed")
}
}

Expand All @@ -82,6 +83,7 @@ func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService
if status.Code(err) == codes.Unimplemented {
return errors.Wrap(err, "ZoneToGlobalSync rpc stream failed, because Global CP does not implement this rpc. Upgrade Global CP.")
}
return errors.Wrap(err, "ZoneToGlobalSync rpc stream failed prematurely, will restart in background")
clientLog.Error(err, "ZoneToGlobalSync rpc stream failed prematurely, will restart in background")
return status.Error(codes.Internal, "stream failed")
}
}
26 changes: 19 additions & 7 deletions pkg/kds/service/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package service
import (
"context"
"fmt"
"io"
"time"

"github.com/sethvargo/go-retry"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/api/system/v1alpha1"
Expand All @@ -20,6 +23,8 @@ import (
util_grpc "github.com/kumahq/kuma/pkg/util/grpc"
)

var log = core.Log.WithName("kds-service")

type GlobalKDSServiceServer struct {
envoyAdminRPCs EnvoyAdminRPCs
resManager manager.ResourceManager
Expand Down Expand Up @@ -67,30 +72,37 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC(
) error {
zone, err := util.ClientIDFromIncomingCtx(stream.Context())
if err != nil {
return err
return status.Error(codes.InvalidArgument, err.Error())
}
clientID := ClientID(stream.Context(), zone)
core.Log.Info("Envoy Admin RPC stream started", "rpc", rpcName, "clientID", clientID)
logger := log.WithValues("rpc", rpcName, "clientID", clientID)
logger.Info("Envoy Admin RPC stream started")
rpc.ClientConnected(clientID, stream)
if err := g.storeStreamConnection(stream.Context(), zone, rpcName, g.instanceID); err != nil {
return err
logger.Error(err, "could not store stream connection")
return status.Error(codes.Internal, "could not store stream connection")
}
defer func() {
rpc.ClientDisconnected(clientID)
// stream.Context() is cancelled here, we need to use another ctx
ctx := multitenant.CopyIntoCtx(stream.Context(), context.Background())
if err := g.storeStreamConnection(ctx, zone, rpcName, ""); err != nil {
core.Log.Error(err, "could not clear stream connection information in ZoneInsight", "rpc", rpcName, "clientID", clientID, "rpc", rpcName)
logger.Error(err, "could not clear stream connection information in ZoneInsight")
}
}()
for {
resp, err := recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
logger.Error(err, "could not receive a message")
return status.Error(codes.Internal, "could not receive a message")
}
core.Log.V(1).Info("Envoy Admin RPC response received", "rpc", rpc, "clientID", clientID, "requestId", resp.GetRequestId())
logger.V(1).Info("Envoy Admin RPC response received", "requestId", resp.GetRequestId())
if err := rpc.ResponseReceived(clientID, resp); err != nil {
return err
logger.Error(err, "could not mark the response as received")
return status.Error(codes.InvalidArgument, "could not mark the response as received")
}
}
}
Expand Down
46 changes: 46 additions & 0 deletions pkg/kds/v2/server/error_recorder_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package server

import (
"io"
"sync"

envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

// ErrorRecorderStream is a DeltaStream that records an error
// We need this because go-control-plane@v0.11.1/pkg/server/delta/v3/server.go:190 swallows an error on Recv()
type ErrorRecorderStream interface {
stream.DeltaStream
Err() error
}

type errorRecorderStream struct {
stream.DeltaStream
err error
sync.Mutex
}

var _ stream.DeltaStream = &errorRecorderStream{}

func NewErrorRecorderStream(s stream.DeltaStream) ErrorRecorderStream {
return &errorRecorderStream{
DeltaStream: s,
}
}

func (e *errorRecorderStream) Recv() (*envoy_sd.DeltaDiscoveryRequest, error) {
res, err := e.DeltaStream.Recv()
if err != nil && err != io.EOF { // do not consider "end of stream" an error
e.Lock()
e.err = err
e.Unlock()
}
return res, err
}

func (e *errorRecorderStream) Err() error {
e.Lock()
defer e.Unlock()
return e.err
}
14 changes: 12 additions & 2 deletions pkg/kds/v2/server/kds.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ type server struct {
}

func (s *server) GlobalToZoneSync(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer) error {
return s.Server.DeltaStreamHandler(stream, "")
errorStream := NewErrorRecorderStream(stream)
err := s.Server.DeltaStreamHandler(errorStream, "")
if err == nil {
err = errorStream.Err()
}
return err
}

// Delta xDS server expects `KDSSyncService_ZoneToGlobalSyncServer` to have Send(*v3.DeltaDiscoveryResponse)
Expand All @@ -45,5 +50,10 @@ func (s *server) ZoneToGlobalSync(stream mesh_proto.KDSSyncService_ZoneToGlobalS
// on zone while kds.proto has different definition of `KDSSyncService_ZoneToGlobalSyncServer` then
// expected by delta xDS server.
func (s *server) ZoneToGlobal(stream stream.DeltaStream) error {
return s.Server.DeltaStreamHandler(stream, "")
errorStream := NewErrorRecorderStream(stream)
err := s.Server.DeltaStreamHandler(errorStream, "")
if err == nil {
err = errorStream.Err()
}
return err
}
1 change: 1 addition & 0 deletions pkg/kds/v2/server/streamwrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type serverStream struct {
stream mesh_proto.KDSSyncService_ZoneToGlobalSyncClient
}

// NewServerStream converts client stream to a server's DeltaStream, so it can be used in DeltaStreamHandler
func NewServerStream(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncClient) ServerStream {
s := &serverStream{
stream: stream,
Expand Down

0 comments on commit 3ab585c

Please sign in to comment.