diff --git a/pkg/orm/task.go b/pkg/orm/task.go index 605a11b..fdb6210 100644 --- a/pkg/orm/task.go +++ b/pkg/orm/task.go @@ -238,54 +238,17 @@ func (o *TaskORM) countTasksByWorkerSubscription(ctx context.Context, taskTypes return totalTasks, nil } -// check every 10 mins for expired tasks +// Check every 10 mins for expired tasks func (o *TaskORM) UpdateExpiredTasks(ctx context.Context) { - for range time.Tick(3 * time.Minute) { + for range time.Tick(40 * time.Second) { log.Info().Msg("Checking for expired tasks") o.clientWrapper.BeforeQuery() defer o.clientWrapper.AfterQuery() - currentTime := time.Now() + currentTime := time.Now().UTC() batchSize := 100 // Adjust batch size based on database performance - - // Step 1: Delete expired tasks without TaskResults in batches batchNumber := 0 - startTime := time.Now() // Start timing for delete operation - for { - batchNumber++ - deleteQuery := ` - DELETE FROM "Task" - WHERE "id" IN ( - SELECT "id" FROM "Task" - WHERE "expire_at" <= $1 - AND "status" IN ($2::"TaskStatus", $3::"TaskStatus") - AND "id" NOT IN (SELECT DISTINCT "task_id" FROM "TaskResult") - LIMIT $4 - ) - ` - - // has to include TaskStatusInProgress, to handle Task with in-progress with no results - params := []interface{}{currentTime, db.TaskStatusInProgress, db.TaskStatusExpired, batchSize} - - execResult, err := o.dbClient.Prisma.ExecuteRaw(deleteQuery, params...).Exec(ctx) - if err != nil { - log.Error().Err(err).Msg("Error deleting tasks without TaskResults") - break - } - - if execResult.Count == 0 { - log.Info().Msg("No more expired tasks to delete without TaskResults") - break - } - - log.Info().Msgf("Deleted %v expired tasks without associated TaskResults in batch %d", execResult.Count, batchNumber) - } - deleteDuration := time.Since(startTime) // Calculate total duration for delete operation - log.Info().Msgf("Total time taken to delete expired tasks without TaskResults: %s", deleteDuration) - - // Step 2: Update expired tasks with TaskResults to 'expired' status in batches - batchNumber = 0 - startTime = time.Now() // Start timing for update operation + startTime := time.Now().UTC() // Start timing for update operation for { batchNumber++ updateQuery := `