Skip to content

Commit

Permalink
chore: logging cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed Jul 28, 2023
1 parent 4fcd77c commit 57a65e4
Show file tree
Hide file tree
Showing 13 changed files with 67 additions and 58 deletions.
30 changes: 15 additions & 15 deletions cmd/cas/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ func main() {

discordHandler, err := notifs.NewDiscordHandler(logger)
if err != nil {
logger.Fatalf("failed to create discord handler: %v", err)
logger.Fatalf("error creating discord handler: %v", err)
}

metricService, err := metrics.NewMetricService(serverCtx, logger)
if err != nil {
logger.Fatalf("failed to create metric service: %v", err)
logger.Fatalf("error creating metric service: %v", err)
}

// Queue publishers
Expand All @@ -81,11 +81,11 @@ func main() {
queue.PublisherOpts{QueueType: queue.QueueType_DLQ, VisibilityTimeout: visibilityTimeout},
)
if err != nil {
logger.Fatalf("failed to create dead-letter queue: %v", err)
logger.Fatalf("error creating dead-letter queue: %v", err)
}
dlqArn, err := queue.GetQueueArn(serverCtx, deadLetterQueue.GetUrl(), sqsClient)
if err != nil {
logger.Fatalf("failed to fetch dead-letter queue arn: %v", err)
logger.Fatalf("error fetching dead-letter queue arn: %v", err)
}
redrivePolicy := &queue.QueueRedrivePolicy{
DeadLetterTargetArn: dlqArn,
Expand All @@ -104,7 +104,7 @@ func main() {
},
)
if err != nil {
logger.Fatalf("failed to create failure queue: %v", err)
logger.Fatalf("error creating failure queue: %v", err)
}
// Validate queue
validateQueue, err := queue.NewPublisher(
Expand All @@ -117,7 +117,7 @@ func main() {
},
)
if err != nil {
logger.Fatalf("failed to create validate queue: %v", err)
logger.Fatalf("error creating validate queue: %v", err)
}
// The Ready and Batch queues will need larger visibility timeouts than the other queues. Requests pulled from the
// Ready queue will remain in flight for the batch linger duration. Batches from the Batch queue will remain in
Expand Down Expand Up @@ -151,7 +151,7 @@ func main() {
},
)
if err != nil {
logger.Fatalf("failed to create ready queue: %v", err)
logger.Fatalf("error creating ready queue: %v", err)
}
batchQueue, err := queue.NewPublisher(
serverCtx,
Expand All @@ -163,7 +163,7 @@ func main() {
},
)
if err != nil {
logger.Fatalf("failed to create batch queue: %v", err)
logger.Fatalf("error creating batch queue: %v", err)
}
// Status queue
statusQueue, err := queue.NewPublisher(
Expand All @@ -176,28 +176,28 @@ func main() {
},
)
if err != nil {
logger.Fatalf("failed to create status queue: %v", err)
logger.Fatalf("error creating status queue: %v", err)
}

// Create utilization gauges for all the queues
if err = metricService.QueueGauge(serverCtx, deadLetterQueue.GetName(), queue.NewMonitor(deadLetterQueue.GetUrl(), sqsClient)); err != nil {
logger.Fatalf("failed to create utilization gauge for dead-letter queue: %v", err)
logger.Fatalf("error creating gauge for dead-letter queue: %v", err)
}
if err = metricService.QueueGauge(serverCtx, failureQueue.GetName(), queue.NewMonitor(failureQueue.GetUrl(), sqsClient)); err != nil {
logger.Fatalf("failed to create utilization gauge for failure queue: %v", err)
logger.Fatalf("error creating gauge for failure queue: %v", err)
}
if err = metricService.QueueGauge(serverCtx, validateQueue.GetName(), queue.NewMonitor(validateQueue.GetUrl(), sqsClient)); err != nil {
logger.Fatalf("failed to create utilization gauge for validate queue: %v", err)
logger.Fatalf("error creating gauge for validate queue: %v", err)
}
if err = metricService.QueueGauge(serverCtx, readyQueue.GetName(), queue.NewMonitor(readyQueue.GetUrl(), sqsClient)); err != nil {
logger.Fatalf("failed to create utilization gauge for ready queue: %v", err)
logger.Fatalf("error creating gauge for ready queue: %v", err)
}
batchMonitor := queue.NewMonitor(batchQueue.GetUrl(), sqsClient)
if err = metricService.QueueGauge(serverCtx, batchQueue.GetName(), batchMonitor); err != nil {
logger.Fatalf("failed to create utilization gauge for batch queue: %v", err)
logger.Fatalf("error creating gauge for batch queue: %v", err)
}
if err = metricService.QueueGauge(serverCtx, statusQueue.GetName(), queue.NewMonitor(statusQueue.GetUrl(), sqsClient)); err != nil {
logger.Fatalf("failed to create utilization gauge for status queue: %v", err)
logger.Fatalf("error creating gauge for status queue: %v", err)
}

// Create the queue consumers. These consumers will be responsible for scaling event processing up based on load and
Expand Down
2 changes: 1 addition & 1 deletion common/aws/ddb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewJobDb(ctx context.Context, logger models.Logger, ddbClient *dynamodb.Cli
jobTable := "ceramic-" + os.Getenv(cas.Env_Env) + "-ops"
jdb := JobDatabase{ddbClient, jobTable, logger}
if err := jdb.createJobTable(ctx); err != nil {
jdb.logger.Fatalf("table creation failed: %v", err)
jdb.logger.Fatalf("error creating table: %v", err)
}
return &jdb
}
Expand Down
8 changes: 4 additions & 4 deletions common/aws/ddb/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ func NewStateDb(ctx context.Context, logger models.Logger, client *dynamodb.Clie
logger,
}
if err := sdb.createCheckpointTable(ctx); err != nil {
sdb.logger.Fatalf("checkpoint table creation failed: %v", err)
sdb.logger.Fatalf("error creating checkpoint table: %v", err)
} else if err = sdb.createStreamTable(ctx); err != nil {
sdb.logger.Fatalf("stream table creation failed: %v", err)
sdb.logger.Fatalf("error creating stream table: %v", err)
} else if err = sdb.createTipTable(ctx); err != nil {
sdb.logger.Fatalf("tip table creation failed: %v", err)
sdb.logger.Fatalf("error creating tip table: %v", err)
}
return &sdb
}
Expand Down Expand Up @@ -186,7 +186,7 @@ func (sdb *StateDatabase) UpdateCheckpoint(ctx context.Context, checkpointType m
var condUpdErr *types.ConditionalCheckFailedException
if errors.As(err, &condUpdErr) {
// Not an error, just indicate that we couldn't update the entry
sdb.logger.Errorf("could not update checkpoint: %s, %v", checkpointStr, err)
sdb.logger.Errorf("error updating checkpoint: %s, %v", checkpointStr, err)
return false, nil
}
sdb.logger.Errorf("error writing to db: %v", err)
Expand Down
9 changes: 3 additions & 6 deletions common/aws/queue/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,16 @@ func NewConsumer(logger models.Logger, publisher *Publisher, callback gosqs.Mess
}

func (c Consumer) Start() {
c.logger.Infof("%s: consumer starting...", c.queueType)
c.consumer.Start()
c.logger.Infof("%s: consumer started", c.queueType)
c.logger.Infof("%s: started", c.queueType)
}

func (c Consumer) Shutdown() {
c.logger.Infof("%s: consumer stopping...", c.queueType)
c.consumer.Shutdown()
c.logger.Infof("%s: consumer stopped", c.queueType)
c.logger.Infof("%s: stopped", c.queueType)
}

func (c Consumer) WaitForRxShutdown() {
c.logger.Infof("%s: consumer rx stopping...", c.queueType)
c.consumer.WaitForRxShutdown()
c.logger.Infof("%s: consumer rx stopped", c.queueType)
c.logger.Infof("%s: rx stopped", c.queueType)
}
21 changes: 13 additions & 8 deletions common/notifs/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,29 @@ func parseDiscordWebhookUrl(urlEnv string) (webhook.Client, error) {
return nil, nil
}

func (d DiscordHandler) SendAlert(title, desc string) error {
func (d DiscordHandler) SendAlert(title, desc, content string) error {
if d.alertWebhook != nil {
return d.sendNotif(d.alertWebhook, title, desc, DiscordColor_Alert)
return d.sendNotif(d.alertWebhook, title, desc, content, DiscordColor_Alert)
}
// Always duplicate notifications to the test channel, if configured.
if d.testWebhook != nil {
return d.sendNotif(d.testWebhook, title, desc, DiscordColor_Alert)
return d.sendNotif(d.testWebhook, title, desc, content, DiscordColor_Alert)
}
return nil
}

// TODO: Need to make the output more readable for specific errors
func (d DiscordHandler) sendNotif(wh webhook.Client, title, desc string, color DiscordColor) error {
func (d DiscordHandler) sendNotif(wh webhook.Client, title, desc, content string, color DiscordColor) error {
messageEmbed := discord.Embed{
Title: title,
Description: desc,
Type: discord.EmbedTypeRich,
Color: int(color),
Title: title,
Fields: []discord.EmbedField{
{
Name: desc,
Value: content,
},
},
Type: discord.EmbedTypeRich,
Color: int(color),
}
_, err := wh.CreateMessage(discord.NewWebhookMessageCreateBuilder().
SetEmbeds(messageEmbed).
Expand Down
8 changes: 0 additions & 8 deletions models/error.go

This file was deleted.

13 changes: 13 additions & 0 deletions models/notifs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package models

const AlertTitle = "CASv5 Alert"

const (
AlertDesc_DeadLetterQueue = "Dead Letter Queue"
AlertDesc_Unprocessed = "Unprocessed Requests"
)

const (
AlertFmt_DeadLetterQueue string = "%s:\n%s"
AlertFmt_Unprocessed string = "%d requests found between:\n%s\n%s"
)
2 changes: 1 addition & 1 deletion models/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type QueueMonitor interface {
}

type Notifier interface {
SendAlert(title, desc string) error
SendAlert(title, desc, content string) error
}

type MetricService interface {
Expand Down
2 changes: 1 addition & 1 deletion services/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (b BatchingService) batch(ctx context.Context, anchorReqs []*models.AnchorR
batchResults[idx] = results.New[*uuid.UUID](&anchorReqBatch.Id, nil)
}
if _, err := b.batchPublisher.SendMessage(ctx, anchorReqBatch); err != nil {
b.logger.Errorf("failed to send message: %v, %v", anchorReqBatch, err)
b.logger.Errorf("error sending message: %v, %v", anchorReqBatch, err)
return nil, err
}
b.metricService.Count(ctx, models.MetricName_BatchCreated, 1)
Expand Down
7 changes: 6 additions & 1 deletion services/failure.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,10 @@ func (f FailureHandlingService) DLQ(ctx context.Context, msgBody string) error {
msgType = "Batch request"
}
}
return f.notif.SendAlert(models.ErrorTitle, fmt.Sprintf(models.ErrorMessageFmt_DLQ, msgType, msgBody))
text, _ := json.MarshalIndent(msgBody, "", "")
return f.notif.SendAlert(
models.AlertTitle,
models.AlertDesc_DeadLetterQueue,
fmt.Sprintf(models.AlertFmt_DeadLetterQueue, msgType, text),
)
}
13 changes: 5 additions & 8 deletions services/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ func (p RequestPoller) Run(ctx context.Context) {
p.logger.Debugf("found %d requests newer than %s", len(anchorReqs), startCheckpoint)
// Send an alert because we shouldn't have found any old unprocessed requests
err = p.notif.SendAlert(
models.ErrorTitle,
fmt.Sprintf(models.ErrorMessageFmt_Unprocessed, startCheckpoint, endCheckpoint),
models.AlertTitle,
models.AlertDesc_Unprocessed,
fmt.Sprintf(models.AlertFmt_Unprocessed, len(anchorReqs), startCheckpoint, endCheckpoint),
)
if err != nil {
p.logger.Errorf("error sending alert: %v", err)
Expand All @@ -96,11 +97,7 @@ func (p RequestPoller) Run(ctx context.Context) {
// It's possible the checkpoint was updated even if a particular request in the batch failed to be
// queued.
if nextCheckpoint := p.sendRequestMessages(ctx, anchorReqMsgs); nextCheckpoint.After(startCheckpoint) {
p.logger.Debugw(
"checkpoints",
"start", startCheckpoint,
"next", nextCheckpoint,
)
p.logger.Debugf("checkpoints: start=%s, next=%s", startCheckpoint, nextCheckpoint)
if _, err = p.stateDb.UpdateCheckpoint(ctx, models.CheckpointType_RequestPoll, nextCheckpoint); err != nil {
p.logger.Errorf("error updating checkpoint %s: %v", nextCheckpoint, err)
} else {
Expand All @@ -120,7 +117,7 @@ func (p RequestPoller) sendRequestMessages(ctx context.Context, anchorReqs []*mo
processedCheckpoint := anchorReqs[0].CreatedAt.Add(-time.Millisecond)
for _, anchorReq := range anchorReqs {
if _, err := p.validatePublisher.SendMessage(ctx, anchorReq); err != nil {
p.logger.Errorf("failed to send message: %v, %v", anchorReq, err)
p.logger.Errorf("error sending message: %v, %v", anchorReq, err)
break
}
processedCheckpoint = anchorReq.CreatedAt
Expand Down
8 changes: 4 additions & 4 deletions services/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (v ValidationService) Validate(ctx context.Context, msgBody string) error {
// new UUID generated by the Anchor DB.
v.metricService.Count(ctx, models.MetricName_ValidateIngressRequest, 1)
if storedTip, oldTip, err := v.stateDb.UpdateTip(ctx, newTip); err != nil {
v.logger.Errorf("failed to store tip: %v, %v", anchorReq, err)
v.logger.Errorf("error storing tip: %v, %v", anchorReq, err)
return err
} else if !storedTip {
// Mark the current request REPLACED because we found a newer stream/origin timestamp in the DB
Expand All @@ -79,7 +79,7 @@ func (v ValidationService) Validate(ctx context.Context, msgBody string) error {
}
}
if storedCid, err := v.stateDb.StoreCid(ctx, streamCid); err != nil {
v.logger.Errorf("failed to store cid: %v, %v", anchorReq, err)
v.logger.Errorf("error storing cid: %v, %v", anchorReq, err)
return err
} else if !storedCid && !isReprocessedTip {
// Mark the current request REPLACED if we found the stream/CID in the DB and this was not a reprocessed
Expand All @@ -90,7 +90,7 @@ func (v ValidationService) Validate(ctx context.Context, msgBody string) error {
} else
// This request has been fully de-duplicated so send it to the next stage
if _, err = v.readyPublisher.SendMessage(ctx, anchorReq); err != nil {
v.logger.Errorf("failed to send ready message: %v, %v", anchorReq, err)
v.logger.Errorf("error sending ready message: %v, %v", anchorReq, err)
return err
}
}
Expand All @@ -100,7 +100,7 @@ func (v ValidationService) Validate(ctx context.Context, msgBody string) error {
func (v ValidationService) sendStatusMsg(ctx context.Context, id uuid.UUID, status models.RequestStatus) error {
statusMsg := &models.RequestStatusMessage{Id: id, Status: status}
if _, err := v.statusPublisher.SendMessage(ctx, statusMsg); err != nil {
v.logger.Errorf("failed to send status message: %v, %v", statusMsg, err)
v.logger.Errorf("error sending status message: %v, %v", statusMsg, err)
return err
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion services/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (w WorkerService) Run(ctx context.Context) {
return
case <-tick.C:
if err := w.createJobs(ctx); err != nil {
w.logger.Errorf("failed to create jobs: %v", err)
w.logger.Errorf("error creating jobs: %v", err)
}
}
}
Expand Down

0 comments on commit 57a65e4

Please sign in to comment.