diff --git a/internal/clients/redis/producer.go b/internal/clients/redis/producer.go index 505332c63cc..a848ac4f094 100644 --- a/internal/clients/redis/producer.go +++ b/internal/clients/redis/producer.go @@ -12,9 +12,9 @@ import ( ) const ( - checkUnpublishedEventsInterval = 1 * time.Minute - checkRedisConnectionInterval = 100 * time.Millisecond - maxNumberOfUnpublishedEvents uint64 = 1e6 + unpublishedEventsCheckInterval = 1 * time.Minute + redisConnCheckInterval = 100 * time.Millisecond + maxUnpublishedEvents uint64 = 1e6 ) // Event represents redis event. @@ -44,7 +44,7 @@ type eventStore struct { func NewEventStore(client *redis.Client, streamID string, streamLen int64) Publisher { return &eventStore{ client: client, - unpublishedEvents: make(chan *redis.XAddArgs, maxNumberOfUnpublishedEvents), + unpublishedEvents: make(chan *redis.XAddArgs, maxUnpublishedEvents), streamID: streamID, streamLen: streamLen, } @@ -75,7 +75,7 @@ func (es *eventStore) Publish(ctx context.Context, event Event) error { } func (es *eventStore) StartPublishingRoutine(ctx context.Context) { - ticker := time.NewTicker(checkUnpublishedEventsInterval) + ticker := time.NewTicker(unpublishedEventsCheckInterval) defer ticker.Stop() for { @@ -85,8 +85,9 @@ func (es *eventStore) StartPublishingRoutine(ctx context.Context) { es.mu.Lock() for i := len(es.unpublishedEvents) - 1; i >= 0; i-- { record := <-es.unpublishedEvents - if err := es.client.XAdd(ctx, record).Err(); err == nil { - continue + if err := es.client.XAdd(ctx, record).Err(); err != nil { + es.unpublishedEvents <- record + break } } es.mu.Unlock() @@ -99,7 +100,7 @@ func (es *eventStore) StartPublishingRoutine(ctx context.Context) { func (es *eventStore) checkRedisConnection(ctx context.Context) error { // A timeout is used to avoid blocking the main thread - ctx, cancel := context.WithTimeout(ctx, checkRedisConnectionInterval) + ctx, cancel := context.WithTimeout(ctx, redisConnCheckInterval) defer cancel() return es.client.Ping(ctx).Err()