Skip to content

Commit

Permalink
chore: cleanup and add log
Browse files Browse the repository at this point in the history
  • Loading branch information
codebender37 committed Nov 1, 2024
1 parent dfbf2ba commit 22e63e6
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 77 deletions.
2 changes: 1 addition & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func main() {
loadEnvVars()
go continuouslyReadEnv()
go orm.NewTaskORM().UpdateExpiredTasksInRaw(context.Background())
go orm.NewTaskORM().UpdateExpiredTasks(context.Background())
port := utils.LoadDotEnv("SERVER_PORT")
router := gin.Default()
// read allowedOrigins from environment variable which is a comma separated string
Expand Down
83 changes: 7 additions & 76 deletions pkg/orm/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,87 +202,15 @@ func (o *TaskORM) UpdateExpiredTasks(ctx context.Context) {
for range time.Tick(3 * time.Minute) {
log.Info().Msg("Checking for expired tasks")
o.clientWrapper.BeforeQuery()

// Fetch all expired tasks
tasks, err := o.dbClient.Task.
FindMany(
db.Task.ExpireAt.Lte(time.Now()),
db.Task.Status.Equals(db.TaskStatusInProgress),
).
With(db.Task.TaskResults.Fetch()).
OrderBy(db.Task.CreatedAt.Order(db.SortOrderDesc)).
Exec(ctx)
if err != nil {
log.Error().Err(err).Msg("Error in fetching expired tasks")
continue
}

if len(tasks) == 0 {
log.Info().Msg("No newly expired tasks to update, skipping...")
continue
} else {
log.Info().Msgf("Fetched %v newly expired tasks", len(tasks))
}

// Process tasks in batches without transactions
batchSize := 100
for i := 0; i < len(tasks); i += batchSize {
end := i + batchSize
if end > len(tasks) {
end = len(tasks)
}
batch := tasks[i:end]

// TODO - Need to refactor this
// Update or delete tasks in the current batch
for _, taskModel := range batch {
taskResults := taskModel.TaskResults()
if len(taskResults) == 0 {
// Delete the task if no TaskResults are found
_, err := o.dbClient.Task.FindUnique(
db.Task.ID.Equals(taskModel.ID),
).Delete().Exec(ctx)
if err != nil {
log.Error().Err(err).Msgf("Error deleting task ID %v", taskModel.ID)
} else {
log.Info().Msgf("Deleted task ID %v as it has no associated TaskResults", taskModel.ID)
}
} else {
// Update the task status to expired if TaskResults exist
_, err := o.dbClient.Task.FindUnique(
db.Task.ID.Equals(taskModel.ID),
).Update(
db.Task.Status.Set(db.TaskStatusExpired),
db.Task.UpdatedAt.Set(time.Now()),
).Exec(ctx)
if err != nil {
log.Error().Err(err).Msgf("Error updating task ID %v to expired", taskModel.ID)
}
}
}

log.Info().Msgf("Updated batch of %v tasks to expired status", len(batch))
}

o.clientWrapper.AfterQuery()
}
}

func (o *TaskORM) UpdateExpiredTasksInRaw(ctx context.Context) {
for range time.Tick(1 * time.Minute) {
log.Info().Msg("Checking for expired tasks")
o.clientWrapper.BeforeQuery()
defer o.clientWrapper.AfterQuery()

currentTime := time.Now()
batchSize := 100 // Adjust batch size based on database performance

// Format the status values with single quotes
// statusInProgress := fmt.Sprintf("'%v'", db.TaskStatusInProgress)
// statusExpired := fmt.Sprintf("'%v'", db.TaskStatusExpired)

// Step 1: Delete expired tasks without TaskResults in batches
batchNumber := 0
for {
batchNumber++
deleteQuery := `
DELETE FROM "Task"
WHERE "id" IN (
Expand All @@ -307,11 +235,14 @@ func (o *TaskORM) UpdateExpiredTasksInRaw(ctx context.Context) {
break
}

log.Info().Msgf("Deleted %v expired tasks without associated TaskResults", execResult.Count)
log.Info().Msgf("Deleted %v expired tasks without associated TaskResults in batch %d", execResult.Count, batchNumber)

}

// Step 2: Update expired tasks with TaskResults to 'expired' status in batches
batchNumber = 0
for {
batchNumber++
updateQuery := `
UPDATE "Task"
SET "status" = $1::"TaskStatus", "updated_at" = $2
Expand All @@ -336,7 +267,7 @@ func (o *TaskORM) UpdateExpiredTasksInRaw(ctx context.Context) {
break
}

log.Info().Msgf("Updated %v expired tasks with associated TaskResults to 'expired' status", execResult.Count)
log.Info().Msgf("Updated %v expired tasks with associated TaskResults to 'expired' status in batch %d", execResult.Count, batchNumber)
}
}
}
Expand Down

0 comments on commit 22e63e6

Please sign in to comment.