Skip to content

Commit

Permalink
refactor(blooms): Improve task progress logging in builder (#13394)
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts authored Jul 8, 2024
1 parent 304db10 commit 2affa48
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 28 deletions.
81 changes: 53 additions & 28 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,23 +191,27 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro
return fmt.Errorf("failed to receive task from planner: %w", err)
}

logger := log.With(b.logger, "task", protoTask.Task.Id)

b.metrics.taskStarted.Inc()
start := time.Now()
status := statusSuccess

newMetas, err := b.processTask(c.Context(), protoTask.Task)
task, err := protos.FromProtoTask(protoTask.Task)
if err != nil {
status = statusFailure
level.Error(logger).Log("msg", "failed to process task", "err", err)
task = &protos.Task{ID: protoTask.Task.Id}
err = fmt.Errorf("failed to convert proto task to task: %w", err)
b.logTaskCompleted(task, nil, err, start)
if err = b.notifyTaskCompletedToPlanner(c, task, nil, err); err != nil {
return fmt.Errorf("failed to notify task completion to planner: %w", err)
}
continue
}

b.metrics.taskCompleted.WithLabelValues(status).Inc()
b.metrics.taskDuration.WithLabelValues(status).Observe(time.Since(start).Seconds())
newMetas, err := b.processTask(c.Context(), task)
if err != nil {
err = fmt.Errorf("failed to process task: %w", err)
}

// Acknowledge task completion to planner
if err = b.notifyTaskCompletedToPlanner(c, protoTask.Task.Id, newMetas, err); err != nil {
b.logTaskCompleted(task, newMetas, err, start)
if err = b.notifyTaskCompletedToPlanner(c, task, newMetas, err); err != nil {
return fmt.Errorf("failed to notify task completion to planner: %w", err)
}
}
Expand All @@ -216,14 +220,44 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro
return nil
}

// logTaskCompleted records the outcome of a finished task.
//
// It increments the taskCompleted counter and observes the taskDuration
// histogram (in seconds) under the failure status label when err is non-nil
// and the success label otherwise, then emits a debug-level log line through
// the task-scoped logger. Failed tasks log the error; successful tasks log
// the number of metas produced.
//
// start is the instant the task began; the elapsed time since start is what
// gets reported as the task duration.
func (b *Builder) logTaskCompleted(
	task *protos.Task,
	metas []bloomshipper.Meta,
	err error,
	start time.Time,
) {
	// Measure once so the histogram sample and the logged duration agree.
	elapsed := time.Since(start)
	logger := task.GetLogger(b.logger)

	status := statusSuccess
	if err != nil {
		status = statusFailure
	}
	b.metrics.taskCompleted.WithLabelValues(status).Inc()
	b.metrics.taskDuration.WithLabelValues(status).Observe(elapsed.Seconds())

	if err != nil {
		level.Debug(logger).Log(
			"msg", "task failed",
			"duration", elapsed.String(),
			"err", err,
		)
		return
	}

	level.Debug(logger).Log(
		"msg", "task completed",
		"duration", elapsed.String(),
		"metas", len(metas),
	)
}

func (b *Builder) notifyTaskCompletedToPlanner(
c protos.PlannerForBuilder_BuilderLoopClient,
taskID string,
task *protos.Task,
metas []bloomshipper.Meta,
err error,
) error {
logger := task.GetLogger(b.logger)

result := &protos.TaskResult{
TaskID: taskID,
TaskID: task.ID,
Error: err,
CreatedMetas: metas,
}
Expand All @@ -239,14 +273,15 @@ func (b *Builder) notifyTaskCompletedToPlanner(
break
}

level.Error(b.logger).Log("msg", "failed to acknowledge task completion to planner. Retrying", "err", err)
level.Error(logger).Log("msg", "failed to acknowledge task completion to planner. Retrying", "err", err)
retries.Wait()
}

if err := retries.Err(); err != nil {
return fmt.Errorf("failed to acknowledge task completion to planner: %w", err)
}

level.Debug(logger).Log("msg", "acknowledged task completion to planner")
return nil
}

Expand All @@ -261,28 +296,18 @@ func (b *Builder) notifyTaskCompletedToPlanner(
// accelerate bloom generation for the new blocks.
func (b *Builder) processTask(
ctx context.Context,
protoTask *protos.ProtoTask,
task *protos.Task,
) ([]bloomshipper.Meta, error) {
task, err := protos.FromProtoTask(protoTask)
if err != nil {
return nil, fmt.Errorf("failed to convert proto task to task: %w", err)
}
tenant := task.Tenant
logger := task.GetLogger(b.logger)
level.Debug(logger).Log("msg", "task started")

client, err := b.bloomStore.Client(task.Table.ModelTime())
if err != nil {
level.Error(b.logger).Log("msg", "failed to get client", "err", err)
level.Error(logger).Log("msg", "failed to get client", "err", err)
return nil, fmt.Errorf("failed to get client: %w", err)
}

tenant := task.Tenant
logger := log.With(
b.logger,
"tenant", tenant,
"task", task.ID,
"tsdb", task.TSDB.Name(),
)
level.Debug(logger).Log("msg", "received task")

blockEnc, err := chunkenc.ParseEncoding(b.limits.BloomBlockEncoding(task.Tenant))
if err != nil {
return nil, fmt.Errorf("failed to parse block encoding: %w", err)
Expand Down
10 changes: 10 additions & 0 deletions pkg/bloombuild/protos/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package protos
import (
"fmt"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"

Expand Down Expand Up @@ -126,6 +127,15 @@ func (t *Task) ToProtoTask() *ProtoTask {
}
}

// GetLogger returns a logger derived from the given one that carries this
// task's identifying fields on every line: the task ID, the tenant, the
// table (via its String form) and the TSDB (via its Name).
func (t *Task) GetLogger(logger log.Logger) log.Logger {
	taskFields := []interface{}{
		"task", t.ID,
		"tenant", t.Tenant,
		"table", t.Table.String(),
		"tsdb", t.TSDB.Name(),
	}
	return log.With(logger, taskFields...)
}

type TaskResult struct {
TaskID string
Error error
Expand Down

0 comments on commit 2affa48

Please sign in to comment.