Skip to content

Commit

Permalink
Check For Non NIL Error on Publishing
Browse files Browse the repository at this point in the history
Signed-off-by: rodneyosodo <blackd0t@protonmail.com>
  • Loading branch information
rodneyosodo committed Jul 28, 2023
1 parent 9f2d4dc commit e4bf53e
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions internal/clients/redis/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
)

const (
checkUnpublishedEventsInterval = 1 * time.Minute
checkRedisConnectionInterval = 100 * time.Millisecond
maxNumberOfUnpublishedEvents uint64 = 1e6
unpublishedEventsCheckInterval = 1 * time.Minute
redisConnCheckInterval = 100 * time.Millisecond
maxUnpublishedEvents uint64 = 1e6
)

// Event represents redis event.
Expand Down Expand Up @@ -44,7 +44,7 @@ type eventStore struct {
func NewEventStore(client *redis.Client, streamID string, streamLen int64) Publisher {
return &eventStore{
client: client,
unpublishedEvents: make(chan *redis.XAddArgs, maxNumberOfUnpublishedEvents),
unpublishedEvents: make(chan *redis.XAddArgs, maxUnpublishedEvents),
streamID: streamID,
streamLen: streamLen,
}
Expand Down Expand Up @@ -75,7 +75,7 @@ func (es *eventStore) Publish(ctx context.Context, event Event) error {
}

func (es *eventStore) StartPublishingRoutine(ctx context.Context) {
ticker := time.NewTicker(checkUnpublishedEventsInterval)
ticker := time.NewTicker(unpublishedEventsCheckInterval)
defer ticker.Stop()

for {
Expand All @@ -85,8 +85,9 @@ func (es *eventStore) StartPublishingRoutine(ctx context.Context) {
es.mu.Lock()
for i := len(es.unpublishedEvents) - 1; i >= 0; i-- {
record := <-es.unpublishedEvents
if err := es.client.XAdd(ctx, record).Err(); err == nil {
continue
if err := es.client.XAdd(ctx, record).Err(); err != nil {
es.unpublishedEvents <- record
break
}
}
es.mu.Unlock()
Expand All @@ -99,7 +100,7 @@ func (es *eventStore) StartPublishingRoutine(ctx context.Context) {

func (es *eventStore) checkRedisConnection(ctx context.Context) error {
// A timeout is used to avoid blocking the main thread
ctx, cancel := context.WithTimeout(ctx, checkRedisConnectionInterval)
ctx, cancel := context.WithTimeout(ctx, redisConnCheckInterval)
defer cancel()

return es.client.Ping(ctx).Err()
Expand Down

0 comments on commit e4bf53e

Please sign in to comment.