Skip to content

Commit

Permalink
reconnecting the cloudevents client
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <liuweixa@redhat.com>
  • Loading branch information
skeeey committed Sep 20, 2023
1 parent d0a4952 commit f27e8ba
Show file tree
Hide file tree
Showing 13 changed files with 321 additions and 218 deletions.
165 changes: 67 additions & 98 deletions cloudevents/generic/agentclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ import (
"context"
"fmt"
"strconv"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"

"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"

"open-cluster-management.io/api/cloudevents/generic/options"
Expand All @@ -21,14 +19,12 @@ import (
// An agent is a component that handles the deployment of requested resources on the managed cluster and status report
// to the source.
type CloudEventAgentClient[T ResourceObject] struct {
cloudEventsOptions options.CloudEventsOptions
cloudEventsClient cloudevents.Client
lister Lister[T]
codecs map[types.CloudEventsDataType]Codec[T]
statusHashGetter StatusHashGetter[T]
rateLimiter flowcontrol.RateLimiter
agentID string
clusterName string
*baseClient
lister Lister[T]
codecs map[types.CloudEventsDataType]Codec[T]
statusHashGetter StatusHashGetter[T]
agentID string
clusterName string
}

// NewCloudEventAgentClient returns an instance for CloudEventAgentClient. The following arguments are required to
Expand Down Expand Up @@ -56,14 +52,17 @@ func NewCloudEventAgentClient[T ResourceObject](
}

return &CloudEventAgentClient[T]{
cloudEventsOptions: agentOptions.CloudEventsOptions,
cloudEventsClient: cloudEventsClient,
lister: lister,
codecs: evtCodes,
statusHashGetter: statusHashGetter,
rateLimiter: NewRateLimiter(agentOptions.EventRateLimit),
agentID: agentOptions.AgentID,
clusterName: agentOptions.ClusterName,
baseClient: &baseClient{
cloudEventsOptions: agentOptions.CloudEventsOptions,
cloudEventsRateLimiter: NewRateLimiter(agentOptions.EventRateLimit),
cloudEventsClient: cloudEventsClient,
cloudEventsErrChan: agentOptions.CloudEventsOptions.ErrorChan(),
},
lister: lister,
codecs: evtCodes,
statusHashGetter: statusHashGetter,
agentID: agentOptions.AgentID,
clusterName: agentOptions.ClusterName,
}, nil
}

Expand Down Expand Up @@ -101,12 +100,7 @@ func (c *CloudEventAgentClient[T]) Resync(ctx context.Context) error {
return fmt.Errorf("failed to set data to cloud event: %v", err)
}

sendingContext, err := c.cloudEventsOptions.WithContext(ctx, evt.Context)
if err != nil {
return err
}

if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, evt); err != nil {
if err := c.publish(ctx, evt); err != nil {
return err
}
}
Expand All @@ -130,12 +124,7 @@ func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types.
return err
}

sendingContext, err := c.cloudEventsOptions.WithContext(ctx, evt.Context)
if err != nil {
return err
}

if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, *evt); err != nil {
if err := c.publish(ctx, *evt); err != nil {
return err
}

Expand All @@ -145,63 +134,67 @@ func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types.
// Subscribe the events that are from the source status resync request or source resource spec request.
// For status resync request, agent publish the current resources status back as response.
// For resource spec request, agent receives resource spec and handles the spec with resource handlers.
func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) error {
return c.cloudEventsClient.StartReceiver(ctx, func(evt cloudevents.Event) {
klog.V(4).Infof("Received event:\n%s", evt)

eventType, err := types.ParseCloudEventsType(evt.Type())
if err != nil {
klog.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err)
return
}
func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) {
c.subscribe(ctx, func(ctx context.Context, evt cloudevents.Event) {
c.receive(ctx, evt, handlers...)
})
}

if eventType.Action == types.ResyncRequestAction {
if eventType.SubResource != types.SubResourceStatus {
klog.Warningf("unsupported resync event type %s, ignore", eventType)
return
}
func (c *CloudEventAgentClient[T]) receive(ctx context.Context, evt cloudevents.Event, handlers ...ResourceHandler[T]) {
klog.V(4).Infof("Received event:\n%s", evt)

if err := c.respondResyncStatusRequest(ctx, eventType.CloudEventsDataType, evt); err != nil {
klog.Errorf("failed to resync manifestsstatus, %v", err)
}
eventType, err := types.ParseCloudEventsType(evt.Type())
if err != nil {
klog.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err)
return
}

if eventType.Action == types.ResyncRequestAction {
if eventType.SubResource != types.SubResourceStatus {
klog.Warningf("unsupported resync event type %s, ignore", eventType)
return
}

if eventType.SubResource != types.SubResourceSpec {
klog.Warningf("unsupported event type %s, ignore", eventType)
return
if err := c.respondResyncStatusRequest(ctx, eventType.CloudEventsDataType, evt); err != nil {
klog.Errorf("failed to resync manifestsstatus, %v", err)
}

codec, ok := c.codecs[eventType.CloudEventsDataType]
if !ok {
klog.Warningf("failed to find the codec for event %s, ignore", eventType.CloudEventsDataType)
return
}
return
}

obj, err := codec.Decode(&evt)
if err != nil {
klog.Errorf("failed to decode spec, %v", err)
return
}
if eventType.SubResource != types.SubResourceSpec {
klog.Warningf("unsupported event type %s, ignore", eventType)
return
}

action, err := c.specAction(evt.Source(), obj)
if err != nil {
klog.Errorf("failed to generate spec action %s, %v", evt, err)
return
}
codec, ok := c.codecs[eventType.CloudEventsDataType]
if !ok {
klog.Warningf("failed to find the codec for event %s, ignore", eventType.CloudEventsDataType)
return
}

if len(action) == 0 {
// no action is required, ignore
return
}
obj, err := codec.Decode(&evt)
if err != nil {
klog.Errorf("failed to decode spec, %v", err)
return
}

for _, handler := range handlers {
if err := handler(action, obj); err != nil {
klog.Errorf("failed to handle spec event %s, %v", evt, err)
}
action, err := c.specAction(evt.Source(), obj)
if err != nil {
klog.Errorf("failed to generate spec action %s, %v", evt, err)
return
}

if len(action) == 0 {
// no action is required, ignore
return
}

for _, handler := range handlers {
if err := handler(action, obj); err != nil {
klog.Errorf("failed to handle spec event %s, %v", evt, err)
}
})
}
}

// Upon receiving the status resync event, the agent responds by sending resource status events to the broker as
Expand Down Expand Up @@ -287,30 +280,6 @@ func (c *CloudEventAgentClient[T]) specAction(source string, obj T) (evt types.R
return types.Modified, nil
}

func sendEventWithLimit(sendingCtx context.Context, limiter flowcontrol.RateLimiter,
sender cloudevents.Client, evt cloudevents.Event) error {
now := time.Now()

err := limiter.Wait(sendingCtx)
if err != nil {
return fmt.Errorf("client rate limiter Wait returned an error: %w", err)
}

latency := time.Since(now)
if latency > longThrottleLatency {
klog.Warningf(fmt.Sprintf("Waited for %v due to client-side throttling, not priority and fairness, request: %s",
latency, evt))
}

klog.V(4).Infof("Sent event: %v\n%s", sendingCtx, evt)

if result := sender.Send(sendingCtx, evt); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send event %s, %v", evt, result)
}

return nil
}

func getObj[T ResourceObject](resourceID string, objs []T) (obj T, exists bool) {
for _, obj := range objs {
if string(obj.GetUID()) == resourceID {
Expand Down
10 changes: 3 additions & 7 deletions cloudevents/generic/agentclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,7 @@ func TestStatusResyncResponse(t *testing.T) {
t.Errorf("unexpected error %v", err)
}

if err := agent.Subscribe(context.TODO()); err != nil {
t.Errorf("unexpected error %v", err)
}
agent.receive(context.TODO(), c.requestEvent)

c.validate(client.GetSentEvents())
})
Expand Down Expand Up @@ -440,13 +438,11 @@ func TestReceiveResourceSpec(t *testing.T) {

var actualEvent types.ResourceAction
var actualRes *mockResource
if err := agent.Subscribe(context.TODO(), func(event types.ResourceAction, resource *mockResource) error {
agent.receive(context.TODO(), c.requestEvent, func(event types.ResourceAction, resource *mockResource) error {
actualEvent = event
actualRes = resource
return nil
}); err != nil {
t.Errorf("unexpected error %v", err)
}
})

c.validate(actualEvent, actualRes)
})
Expand Down
140 changes: 140 additions & 0 deletions cloudevents/generic/baseclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package generic

import (
"context"
"fmt"
"sync"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"

"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"
"k8s.io/utils/clock"

"open-cluster-management.io/api/cloudevents/generic/options"
)

type receiveFn func(ctx context.Context, evt cloudevents.Event)

type baseClient struct {
sync.RWMutex
cloudEventsOptions options.CloudEventsOptions
cloudEventsClient cloudevents.Client
cloudEventsErrChan <-chan error
cloudEventsRateLimiter flowcontrol.RateLimiter
subStarted bool
}

func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error {
now := time.Now()

if err := c.cloudEventsRateLimiter.Wait(ctx); err != nil {
return fmt.Errorf("client rate limiter Wait returned an error: %w", err)
}

latency := time.Since(now)
if latency > longThrottleLatency {
klog.Warningf(fmt.Sprintf("Waited for %v due to client-side throttling, not priority and fairness, request: %s",
latency, evt))
}

sendingCtx, err := c.cloudEventsOptions.WithContext(ctx, evt.Context)
if err != nil {
return err
}

klog.V(4).Infof("Sent event: %v\n%s", ctx, evt)

if result := c.cloudEventsClient.Send(sendingCtx, evt); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send event %s, %v", evt, result)
}

return nil
}

func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
c.Lock()
defer c.Unlock()

// make sure there is only one subscription go routine starting for one client.
if c.subStarted {
klog.Warningf("the subscription has already started")
return
}

// start a go routine to handle cloudevents subscription
go func() {
var receiverCtx context.Context
var receiverCancel context.CancelFunc
var retrying, receiving bool

// the reconnect backoff will stop at [1,5) min interval. If we don't backoff for 10min, we reset the backoff.
connBackoffManager := wait.NewExponentialBackoffManager(5*time.Second, 1*time.Minute, 10*time.Minute, 5.0, 1.0, &clock.RealClock{})

for {
select {
case <-ctx.Done():
if receiverCancel != nil {
receiverCancel()
}
return
case err, ok := <-c.cloudEventsErrChan:
if !ok {
continue
}

klog.Errorf("the cloudevents client is disconnected, %v", err)

// the cloudevents network connection is closed, stop the receiver and start retry
if receiverCancel != nil {
receiverCancel()
}
retrying = true
receiving = false
<-connBackoffManager.Backoff().C()
continue
default:
}

if retrying {
klog.V(4).Infof("reconnecting the cloudevents client")
cloudEventsClient, err := c.cloudEventsOptions.Client(ctx)
if err != nil {
// failed to reconnect, retry agin
// TODO enhance the cloudevents SKD to avoid wrapping the error type, then we can only handle the
// net connection errors here
klog.Errorf("failed to reconnect, %v", err)
<-connBackoffManager.Backoff().C()
continue
}

// the cloudevents network connection is back, stop the retry and refresh the current cloudevents client
retrying = false
c.cloudEventsClient = cloudEventsClient
}

if !receiving && !retrying {
receiverCtx, receiverCancel = context.WithCancel(context.TODO())
klog.V(4).Infof("starting the cloudevents receiver")

// TODO send a resync request

go func() {
if err := c.cloudEventsClient.StartReceiver(receiverCtx, func(evt cloudevents.Event) {
receive(receiverCtx, evt)
}); err != nil {
runtime.HandleError(fmt.Errorf("failed to receive cloudevents, %v", err))
}
}()
receiving = true
}

<-time.After(time.Second)
}
}()

c.subStarted = true
}
Loading

0 comments on commit f27e8ba

Please sign in to comment.