Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kds): better error handling (backport of #7868) #7877

Merged
merged 1 commit into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kds/mux/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (s *server) Start(stop <-chan struct{}) error {
}
}

// StreamMessage handles Mux messages for KDS V1. It is not used in KDS V2.
func (s *server) StreamMessage(stream mesh_proto.MultiplexService_StreamMessageServer) error {
clientID, err := util.ClientIDFromIncomingCtx(stream.Context())
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions pkg/kds/mux/zone_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService
if status.Code(err) == codes.Unimplemented {
return errors.Wrap(err, "GlobalToZoneSync rpc stream failed, because Global CP does not implement this rpc. Upgrade Global CP.")
}
return errors.Wrap(err, "GlobalToZoneSync rpc stream failed prematurely, will restart in background")
clientLog.Error(err, "GlobalToZoneSync rpc stream failed prematurely, will restart in background")
return status.Error(codes.Internal, "stream failed")
}
}

Expand All @@ -82,6 +83,7 @@ func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService
if status.Code(err) == codes.Unimplemented {
return errors.Wrap(err, "ZoneToGlobalSync rpc stream failed, because Global CP does not implement this rpc. Upgrade Global CP.")
}
return errors.Wrap(err, "ZoneToGlobalSync rpc stream failed prematurely, will restart in background")
clientLog.Error(err, "ZoneToGlobalSync rpc stream failed prematurely, will restart in background")
return status.Error(codes.Internal, "stream failed")
}
}
26 changes: 19 additions & 7 deletions pkg/kds/service/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package service
import (
"context"
"fmt"
"io"
"time"

"github.com/sethvargo/go-retry"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/api/system/v1alpha1"
Expand All @@ -20,6 +23,8 @@ import (
util_grpc "github.com/kumahq/kuma/pkg/util/grpc"
)

var log = core.Log.WithName("kds-service")

type GlobalKDSServiceServer struct {
envoyAdminRPCs EnvoyAdminRPCs
resManager manager.ResourceManager
Expand Down Expand Up @@ -67,30 +72,37 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC(
) error {
zone, err := util.ClientIDFromIncomingCtx(stream.Context())
if err != nil {
return err
return status.Error(codes.InvalidArgument, err.Error())
}
clientID := ClientID(stream.Context(), zone)
core.Log.Info("Envoy Admin RPC stream started", "rpc", rpcName, "clientID", clientID)
logger := log.WithValues("rpc", rpcName, "clientID", clientID)
logger.Info("Envoy Admin RPC stream started")
rpc.ClientConnected(clientID, stream)
if err := g.storeStreamConnection(stream.Context(), zone, rpcName, g.instanceID); err != nil {
return err
logger.Error(err, "could not store stream connection")
return status.Error(codes.Internal, "could not store stream connection")
}
defer func() {
rpc.ClientDisconnected(clientID)
// stream.Context() is cancelled here, we need to use another ctx
ctx := multitenant.CopyIntoCtx(stream.Context(), context.Background())
if err := g.storeStreamConnection(ctx, zone, rpcName, ""); err != nil {
core.Log.Error(err, "could not clear stream connection information in ZoneInsight", "rpc", rpcName, "clientID", clientID, "rpc", rpcName)
logger.Error(err, "could not clear stream connection information in ZoneInsight")
}
}()
for {
resp, err := recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
logger.Error(err, "could not receive a message")
return status.Error(codes.Internal, "could not receive a message")
}
core.Log.V(1).Info("Envoy Admin RPC response received", "rpc", rpc, "clientID", clientID, "requestId", resp.GetRequestId())
logger.V(1).Info("Envoy Admin RPC response received", "requestId", resp.GetRequestId())
if err := rpc.ResponseReceived(clientID, resp); err != nil {
return err
logger.Error(err, "could not mark the response as received")
return status.Error(codes.InvalidArgument, "could not mark the response as received")
}
}
}
Expand Down
46 changes: 46 additions & 0 deletions pkg/kds/v2/server/error_recorder_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package server

import (
"io"
"sync"

envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

// ErrorRecorderStream is a DeltaStream that records an error so that the
// caller can inspect it after the stream handler has returned.
// We need this because go-control-plane@v0.11.1/pkg/server/delta/v3/server.go:190 swallows an error on Recv()
type ErrorRecorderStream interface {
// DeltaStream is embedded so the recorder can be passed anywhere a plain
// delta stream is accepted.
stream.DeltaStream
// Err reports the error recorded by the stream. The implementation in this
// file returns the last non-EOF error seen by Recv, or nil if there was none.
Err() error
}

// errorRecorderStream decorates a DeltaStream and remembers the most recent
// non-EOF error returned by its Recv method.
type errorRecorderStream struct {
	// DeltaStream is the wrapped stream; every method other than Recv is
	// promoted from it unchanged.
	stream.DeltaStream

	// Mutex guards err, which is written by Recv and read by Err.
	sync.Mutex
	err error
}

// Compile-time check that *errorRecorderStream satisfies stream.DeltaStream.
var _ stream.DeltaStream = (*errorRecorderStream)(nil)

// NewErrorRecorderStream wraps s in a recorder whose Err method reports the
// last non-EOF error returned by Recv. No error is recorded initially.
func NewErrorRecorderStream(s stream.DeltaStream) ErrorRecorderStream {
	recorder := new(errorRecorderStream)
	recorder.DeltaStream = s
	return recorder
}

// Recv delegates to the wrapped stream and returns its result untouched.
// Any error other than io.EOF is additionally stored so that it can be
// retrieved later through Err.
func (e *errorRecorderStream) Recv() (*envoy_sd.DeltaDiscoveryRequest, error) {
	req, recvErr := e.DeltaStream.Recv()
	if recvErr == nil || recvErr == io.EOF {
		// Success, or a regular end of stream: there is nothing to record.
		return req, recvErr
	}

	// Store under the lock so that a concurrent Err call sees a consistent value.
	e.Lock()
	e.err = recvErr
	e.Unlock()
	return req, recvErr
}

// Err returns the error most recently recorded by Recv, or nil when no
// non-EOF error has been seen.
func (e *errorRecorderStream) Err() error {
	e.Lock()
	recorded := e.err
	e.Unlock()
	return recorded
}
14 changes: 12 additions & 2 deletions pkg/kds/v2/server/kds.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ type server struct {
}

// GlobalToZoneSync serves the GlobalToZoneSync delta xDS stream.
//
// The stream is wrapped in an ErrorRecorderStream before it is handed to
// DeltaStreamHandler. When the handler itself returns nil, the error recorded
// by the wrapper (a non-EOF Recv failure, if any) is returned instead, so a
// broken stream is not reported as a clean shutdown.
func (s *server) GlobalToZoneSync(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer) error {
	errorStream := NewErrorRecorderStream(stream)
	err := s.Server.DeltaStreamHandler(errorStream, "")
	if err == nil {
		// The handler reported success; fall back to whatever Recv recorded.
		err = errorStream.Err()
	}
	return err
}

// Delta xDS server expects `KDSSyncService_ZoneToGlobalSyncServer` to have Send(*v3.DeltaDiscoveryResponse)
Expand All @@ -45,5 +50,10 @@ func (s *server) ZoneToGlobalSync(stream mesh_proto.KDSSyncService_ZoneToGlobalS
// on zone while kds.proto has a different definition of `KDSSyncService_ZoneToGlobalSyncServer` than
// the one expected by the delta xDS server.
// ZoneToGlobal runs the delta xDS stream handler over the given stream.
//
// The stream is wrapped in an ErrorRecorderStream before it is handed to
// DeltaStreamHandler. When the handler itself returns nil, the error recorded
// by the wrapper (a non-EOF Recv failure, if any) is returned instead, so a
// broken stream is not reported as a clean shutdown.
func (s *server) ZoneToGlobal(stream stream.DeltaStream) error {
	errorStream := NewErrorRecorderStream(stream)
	err := s.Server.DeltaStreamHandler(errorStream, "")
	if err == nil {
		// The handler reported success; fall back to whatever Recv recorded.
		err = errorStream.Err()
	}
	return err
}
1 change: 1 addition & 0 deletions pkg/kds/v2/server/streamwrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type serverStream struct {
stream mesh_proto.KDSSyncService_ZoneToGlobalSyncClient
}

// NewServerStream converts a client stream into a server-side DeltaStream, so that it can be used with DeltaStreamHandler.
func NewServerStream(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncClient) ServerStream {
s := &serverStream{
stream: stream,
Expand Down
Loading