Skip to content

Commit

Permalink
set max retries, locallog improvement (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesReate authored Jan 28, 2024
1 parent 34b9604 commit 68871b3
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions internal/controllers/download_data_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"context"
"errors"

"encoding/json"
"time"
Expand Down Expand Up @@ -75,15 +76,16 @@ func (d *DataDownloadController) DataDownloadHandler(c *fiber.Ctx) error {
}

func (d *DataDownloadController) DataDownloadConsumer(ctx context.Context) error {
sub, err := d.NATSSvc.JetStream.PullSubscribe(d.NATSSvc.JetStreamSubject, d.NATSSvc.DurableConsumer, nats.AckWait(d.NATSSvc.AckTimeout))
sub, err := d.NATSSvc.JetStream.PullSubscribe(d.NATSSvc.JetStreamSubject, d.NATSSvc.DurableConsumer,
nats.AckWait(d.NATSSvc.AckTimeout), nats.MaxDeliver(2))
if err != nil {
return err
}

for {
msgs, err := sub.Fetch(1)
if err != nil {
if err == nats.ErrTimeout {
if errors.Is(err, nats.ErrTimeout) {
continue
}
d.log.Err(err).Msg("error fetching from data download stream")
Expand All @@ -109,8 +111,9 @@ func (d *DataDownloadController) DataDownloadConsumer(ctx context.Context) error
d.log.Error().Msgf("unable to parse query parameters: %+v", err)
continue
}
localLog := d.log.With().Str("userId", params.UserID).Str("userDeviceID", params.UserDeviceID).Logger()

d.log.Info().Str("userId", params.UserID).Str("userDeviceID", params.UserDeviceID).Msg("data download initiated")
localLog.Info().Msg("data download initiated")
d.inProgress(msg, params)

nestedCtx, cancel := context.WithCancel(ctx)
Expand All @@ -130,7 +133,7 @@ func (d *DataDownloadController) DataDownloadConsumer(ctx context.Context) error
s3link, err := d.QuerySvc.StreamDataToS3(ctx, params.UserDeviceID)
if err != nil {
d.nak(msg, &params)
d.log.Err(err).Str("userId", params.UserID).Str("userDeviceID", params.UserDeviceID).Msg("error while fetching data from elasticsearch")
localLog.Err(err).Msg("error while fetching data from elasticsearch")
cancel()
continue
}
Expand All @@ -141,12 +144,12 @@ func (d *DataDownloadController) DataDownloadConsumer(ctx context.Context) error
err = d.EmailSvc.SendEmail(params.UserID, s3link)
if err != nil {
d.nak(msg, &params)
d.log.Err(err).Msg("unable to put send email")
localLog.Err(err).Msg("unable to put send email")
continue
}

d.ack(msg, params)
d.log.Info().Str("userId", params.UserID).Str("userDeviceID", params.UserDeviceID).Uint64("numDelivered", mtd.NumDelivered).Msg("data download completed")
localLog.Info().Uint64("numDelivered", mtd.NumDelivered).Msg("data download completed")
}
}
}
Expand Down

0 comments on commit 68871b3

Please sign in to comment.