-
Notifications
You must be signed in to change notification settings - Fork 75
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
retry to connect the cloudevents client
Signed-off-by: Wei Liu <liuweixa@redhat.com>
- Loading branch information
Showing
14 changed files
with
391 additions
and
218 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
package generic | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
cloudevents "github.com/cloudevents/sdk-go/v2" | ||
"k8s.io/apimachinery/pkg/util/wait" | ||
"k8s.io/client-go/util/flowcontrol" | ||
"k8s.io/klog/v2" | ||
"k8s.io/utils/clock" | ||
"open-cluster-management.io/api/cloudevents/generic/options" | ||
) | ||
|
||
type receiveFn func(ctx context.Context, evt cloudevents.Event) | ||
|
||
type baseClient struct { | ||
sync.RWMutex | ||
cloudEventsOptions options.CloudEventsOptions | ||
cloudEventsClient cloudevents.Client | ||
cloudEventsErrChan <-chan error | ||
cloudEventsRateLimiter flowcontrol.RateLimiter | ||
subStarted bool | ||
} | ||
|
||
func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error { | ||
now := time.Now() | ||
|
||
if err := c.cloudEventsRateLimiter.Wait(ctx); err != nil { | ||
return fmt.Errorf("client rate limiter Wait returned an error: %w", err) | ||
} | ||
|
||
latency := time.Since(now) | ||
if latency > longThrottleLatency { | ||
klog.Warningf(fmt.Sprintf("Waited for %v due to client-side throttling, not priority and fairness, request: %s", | ||
latency, evt)) | ||
} | ||
|
||
sendingCtx, err := c.cloudEventsOptions.WithContext(ctx, evt.Context) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
klog.V(4).Infof("Sent event: %v\n%s", ctx, evt) | ||
|
||
if result := c.cloudEventsClient.Send(sendingCtx, evt); cloudevents.IsUndelivered(result) { | ||
return fmt.Errorf("failed to send event %s, %v", evt, result) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) { | ||
c.Lock() | ||
defer c.Unlock() | ||
|
||
// make sure there is only one subscription go routine starting for one client. | ||
if c.subStarted { | ||
klog.Warningf("the subscription has already started") | ||
return | ||
} | ||
|
||
// start a go routine to handle cloudevents subscription | ||
go func() { | ||
var receiverCtx context.Context | ||
var receiverCancel context.CancelFunc | ||
var retrying, receiving bool | ||
|
||
clock := &clock.RealClock{} | ||
|
||
// give a long waiting time for main loop | ||
backoffManager := wait.NewJitteredBackoffManager(10*time.Minute, 2.0, clock) | ||
// the reconnect backoff will stop at [5,10) min interval. If we don't backoff for 15min, we reset the backoff. | ||
connBackoffManager := wait.NewExponentialBackoffManager(5*time.Second, 5*time.Minute, 15*time.Minute, 2.0, 1.0, clock) | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
if receiverCancel != nil { | ||
receiverCancel() | ||
} | ||
return | ||
case err, ok := <-c.cloudEventsErrChan: | ||
if !ok { | ||
continue | ||
} | ||
|
||
klog.Errorf("the cloudevents client is disconnected, %v", err) | ||
|
||
// the cloudevents network connection is closed, stop the receiver and start retry | ||
if receiverCancel != nil { | ||
receiverCancel() | ||
} | ||
retrying = true | ||
receiving = false | ||
<-connBackoffManager.Backoff().C() | ||
continue | ||
default: | ||
} | ||
|
||
if retrying { | ||
klog.Infof("reconnecting the cloudevents client") | ||
cloudEventsClient, err := c.cloudEventsOptions.Client(ctx) | ||
if err != nil { | ||
// failed to reconnect, retry agin | ||
// TODO enhance the cloudevents SKD to avoid wrapping the error type, then we can only handle the | ||
// net connection errors here | ||
klog.Errorf("failed to reconnect, %v", err, err) | ||
<-connBackoffManager.Backoff().C() | ||
continue | ||
} | ||
|
||
// the cloudevents network connection is back, stop the retry and refresh the current cloudevents client | ||
retrying = false | ||
c.cloudEventsClient = cloudEventsClient | ||
} | ||
|
||
if !receiving && !retrying { | ||
receiverCtx, receiverCancel = context.WithCancel(context.TODO()) | ||
klog.Infof("starting the cloudevents receiver") | ||
go func() { | ||
if err := c.cloudEventsClient.StartReceiver(receiverCtx, func(evt cloudevents.Event) { | ||
receive(receiverCtx, evt) | ||
}); err != nil { | ||
klog.Errorf("failed to start the cloudevents receiver, %v", err) | ||
} | ||
}() | ||
receiving = true | ||
} | ||
|
||
<-backoffManager.Backoff().C() | ||
} | ||
}() | ||
|
||
c.subStarted = true | ||
} |
Oops, something went wrong.