Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kuma-cp): set error when KDS clients fails in goroutine (backport of #7725) #7834

Merged
merged 1 commit into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/kds/global/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func Setup(rt runtime.Runtime) error {
go func() {
if err := kdsServer.StreamKumaResources(session.ServerStream()); err != nil {
log.Error(err, "StreamKumaResources finished with an error")
session.SetError(err)
} else {
log.V(1).Info("StreamKumaResources finished gracefully")
}
Expand All @@ -100,6 +101,7 @@ func Setup(rt runtime.Runtime) error {
go func() {
if err := sink.Receive(); err != nil {
log.Error(err, "KDSSink finished with an error")
session.SetError(err)
} else {
log.V(1).Info("KDSSink finished gracefully")
}
Expand Down
10 changes: 2 additions & 8 deletions pkg/kds/mux/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,7 @@ func (c *client) startGlobalToZoneSync(ctx context.Context, log logr.Logger, con
errorCh <- err
return
}
if err := c.globalToZoneCb.OnGlobalToZoneSyncStarted(stream); err != nil {
errorCh <- errors.Wrap(err, "closing Global to Zone Sync stream after callback error")
return
}
c.globalToZoneCb.OnGlobalToZoneSyncStarted(stream, errorCh)
<-stop
log.Info("Global to Zone Sync rpc stream stopped")
if err := stream.CloseSend(); err != nil {
Expand All @@ -183,10 +180,7 @@ func (c *client) startZoneToGlobalSync(ctx context.Context, log logr.Logger, con
errorCh <- err
return
}
if err := c.zoneToGlobalCb.OnZoneToGlobalSyncStarted(stream); err != nil {
errorCh <- errors.Wrap(err, "closing Zone to Global Sync stream after callback error")
return
}
c.zoneToGlobalCb.OnZoneToGlobalSyncStarted(stream, errorCh)
<-stop
log.Info("Zone to Global Sync rpc stream stopped")
if err := stream.CloseSend(); err != nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/kds/mux/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ func (f OnSessionStartedFunc) OnSessionStarted(session Session) error {
return f(session)
}

type OnGlobalToZoneSyncStartedFunc func(session mesh_proto.KDSSyncService_GlobalToZoneSyncClient) error
type OnGlobalToZoneSyncStartedFunc func(session mesh_proto.KDSSyncService_GlobalToZoneSyncClient, errorCh chan error)

func (f OnGlobalToZoneSyncStartedFunc) OnGlobalToZoneSyncStarted(session mesh_proto.KDSSyncService_GlobalToZoneSyncClient) error {
return f(session)
func (f OnGlobalToZoneSyncStartedFunc) OnGlobalToZoneSyncStarted(session mesh_proto.KDSSyncService_GlobalToZoneSyncClient, errorCh chan error) {
f(session, errorCh)
}

type OnZoneToGlobalSyncStartedFunc func(session mesh_proto.KDSSyncService_ZoneToGlobalSyncClient) error
type OnZoneToGlobalSyncStartedFunc func(session mesh_proto.KDSSyncService_ZoneToGlobalSyncClient, errorCh chan error)

func (f OnZoneToGlobalSyncStartedFunc) OnZoneToGlobalSyncStarted(session mesh_proto.KDSSyncService_ZoneToGlobalSyncClient) error {
return f(session)
func (f OnZoneToGlobalSyncStartedFunc) OnZoneToGlobalSyncStarted(session mesh_proto.KDSSyncService_ZoneToGlobalSyncClient, errorCh chan error) {
f(session, errorCh)
}

type server struct {
Expand Down
9 changes: 5 additions & 4 deletions pkg/kds/mux/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Session interface {
ClientStream() mesh_proto.KumaDiscoveryService_StreamKumaResourcesClient
PeerID() string
Error() <-chan error
SetError(err error)
}

type session struct {
Expand Down Expand Up @@ -77,7 +78,7 @@ func (s *session) handleRecv(stream MultiplexStream) {
s.clientStream.bufferStream.close()
s.serverStream.bufferStream.close()
// Recv always finishes with either an EOF or another error
s.setError(err)
s.SetError(err)
return
}
// convert legacy messages
Expand Down Expand Up @@ -134,19 +135,19 @@ func (s *session) handleSend(stream MultiplexStream, sendTimeout time.Duration)
// 1) This is a malicious client reading stream byte by byte. In this case it's actually better to end the stream
// 2) A client is so overwhelmed that it cannot even let the server know that it's ready to receive more data.
// In this case it's recommended to scale the number of instances.
s.setError(errors.New("timeout while sending a message to peer"))
s.SetError(errors.New("timeout while sending a message to peer"))
}
}()
if err := stream.Send(msgToSend); err != nil {
s.setError(err)
s.SetError(err)
cancel()
return
}
cancel()
}
}

func (s *session) setError(err error) {
func (s *session) SetError(err error) {
// execute this once so writers to this channel won't get stuck or try to write to a closed channel
// We only care about the first error, because it results in broken session anyway.
s.Once.Do(func() {
Expand Down
22 changes: 14 additions & 8 deletions pkg/kds/zone/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ func Setup(rt core_runtime.Runtime) error {
log.Info("new session created")
go func() {
if err := kdsServer.StreamKumaResources(session.ServerStream()); err != nil {
log.Error(err, "StreamKumaResources finished with an error")
session.SetError(errors.Wrap(err, "StreamKumaResources finished with an error"))
} else {
log.V(1).Info("StreamKumaResources finished gracefully")
}
}()
sink := kds_client.NewKDSSink(log, reg.ObjectTypes(model.HasKDSFlag(model.ConsumedByZone)), kds_client.NewKDSStream(session.ClientStream(), zone, string(cfgJson)),
Expand All @@ -103,13 +105,15 @@ func Setup(rt core_runtime.Runtime) error {
)
go func() {
if err := sink.Receive(); err != nil {
log.Error(err, "KDSSink finished with an error")
session.SetError(errors.Wrap(err, "KDSSink finished with an error"))
} else {
log.V(1).Info("KDSSink finished gracefully")
}
}()
return nil
})

onGlobalToZoneSyncStarted := mux.OnGlobalToZoneSyncStartedFunc(func(stream mesh_proto.KDSSyncService_GlobalToZoneSyncClient) error {
onGlobalToZoneSyncStarted := mux.OnGlobalToZoneSyncStartedFunc(func(stream mesh_proto.KDSSyncService_GlobalToZoneSyncClient, errChan chan error) {
log := kdsDeltaZoneLog.WithValues("kds-version", "v2")
syncClient := kds_client_v2.NewKDSSyncClient(log, reg.ObjectTypes(model.HasKDSFlag(model.ConsumedByZone)), kds_client_v2.NewDeltaKDSStream(stream, zone, string(cfgJson)),
kds_sync_store_v2.ZoneSyncCallback(
Expand All @@ -124,22 +128,24 @@ func Setup(rt core_runtime.Runtime) error {
)
go func() {
if err := syncClient.Receive(); err != nil {
log.Error(err, "KDSSyncClient finished with an error")
errChan <- errors.Wrap(err, "GlobalToZoneSyncClient finished with an error")
} else {
log.V(1).Info("GlobalToZoneSyncClient finished gracefully")
}
}()
return nil
})

onZoneToGlobalSyncStarted := mux.OnZoneToGlobalSyncStartedFunc(func(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncClient) error {
onZoneToGlobalSyncStarted := mux.OnZoneToGlobalSyncStartedFunc(func(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncClient, errChan chan error) {
log := kdsDeltaZoneLog.WithValues("kds-version", "v2", "peer-id", "global")
log.Info("ZoneToGlobalSync new session created")
session := kds_server_v2.NewServerStream(stream)
go func() {
if err := kdsServerV2.ZoneToGlobal(session); err != nil {
log.Error(err, "ZoneToGlobalSync finished with an error", err)
errChan <- errors.Wrap(err, "ZoneToGlobalSync finished with an error")
} else {
log.V(1).Info("ZoneToGlobalSync finished gracefully")
}
}()
return nil
})

muxClient := mux.NewClient(
Expand Down