Skip to content

Commit

Permalink
Wait for tracer to be done before finishing workflow (#4068)
Browse files Browse the repository at this point in the history
  • Loading branch information
anbraten authored Aug 30, 2024
1 parent e2a43e8 commit 599dd97
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 19 deletions.
18 changes: 9 additions & 9 deletions agent/rpc/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ func (c *client) Version(ctx context.Context) (*rpc.Version, error) {
}

// Next returns the next workflow in the queue.
func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Workflow, error) {
func (c *client) Next(ctx context.Context, filter rpc.Filter) (*rpc.Workflow, error) {
var res *proto.NextResponse
var err error
retry := c.newBackOff()
req := new(proto.NextRequest)
req.Filter = new(proto.Filter)
req.Filter.Labels = f.Labels
req.Filter.Labels = filter.Labels
for {
res, err = c.client.Next(ctx, req)
if err == nil {
Expand Down Expand Up @@ -135,10 +135,10 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Workflow, error)
}

// Wait blocks until the workflow is complete.
func (c *client) Wait(ctx context.Context, id string) (err error) {
func (c *client) Wait(ctx context.Context, workflowID string) (err error) {
retry := c.newBackOff()
req := new(proto.WaitRequest)
req.Id = id
req.Id = workflowID
for {
_, err = c.client.Wait(ctx, req)
if err == nil {
Expand Down Expand Up @@ -273,10 +273,10 @@ func (c *client) Done(ctx context.Context, workflowID string, state rpc.Workflow
}

// Extend extends the workflow deadline.
func (c *client) Extend(ctx context.Context, id string) (err error) {
func (c *client) Extend(ctx context.Context, workflowID string) (err error) {
retry := c.newBackOff()
req := new(proto.ExtendRequest)
req.Id = id
req.Id = workflowID
for {
_, err = c.client.Extend(ctx, req)
if err == nil {
Expand Down Expand Up @@ -317,10 +317,10 @@ func (c *client) Extend(ctx context.Context, id string) (err error) {
}

// Update updates the workflow state.
func (c *client) Update(ctx context.Context, id string, state rpc.StepState) (err error) {
func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepState) (err error) {
retry := c.newBackOff()
req := new(proto.UpdateRequest)
req.Id = id
req.Id = workflowID
req.State = new(proto.StepState)
req.State.StepUuid = state.StepUUID
req.State.Started = state.Started
Expand Down Expand Up @@ -367,7 +367,7 @@ func (c *client) Update(ctx context.Context, id string, state rpc.StepState) (er
return nil
}

// Log writes the workflow log entry.
// Log writes the step log entry.
func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) {
retry := c.newBackOff()
req := new(proto.LogRequest)
Expand Down
10 changes: 4 additions & 6 deletions agent/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
if err := r.client.Wait(workflowCtx, workflow.ID); err != nil {
canceled = true
logger.Warn().Err(err).Msg("cancel signal received")

cancel()
} else {
logger.Debug().Msg("done listening for cancel signal")
Expand All @@ -117,11 +116,10 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
select {
case <-workflowCtx.Done():
logger.Debug().Msg("pipeline done")

return

case <-time.After(time.Minute):
logger.Debug().Msg("pipeline lease renewed")

if err := r.client.Extend(workflowCtx, workflow.ID); err != nil {
log.Error().Err(err).Msg("extending pipeline deadline failed")
}
Expand All @@ -144,7 +142,7 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
pipeline.WithContext(workflowCtx),
pipeline.WithTaskUUID(fmt.Sprint(workflow.ID)),
pipeline.WithLogger(r.createLogger(logger, &uploads, workflow)),
pipeline.WithTracer(r.createTracer(ctxMeta, logger, workflow)),
pipeline.WithTracer(r.createTracer(ctxMeta, &uploads, logger, workflow)),
pipeline.WithBackend(*r.backend),
pipeline.WithDescription(map[string]string{
"workflow_id": workflow.ID,
Expand All @@ -170,9 +168,9 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
Bool("canceled", canceled).
Msg("workflow finished")

logger.Debug().Msg("uploading logs ...")
logger.Debug().Msg("uploading logs and traces / states ...")
uploads.Wait()
logger.Debug().Msg("uploaded logs")
logger.Debug().Msg("uploaded logs and traces / states")

logger.Debug().
Str("error", state.Error).
Expand Down
8 changes: 6 additions & 2 deletions agent/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"runtime"
"strconv"
"sync"
"time"

"github.com/rs/zerolog"
Expand All @@ -26,11 +27,13 @@ import (
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
)

func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc {
func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc {
return func(state *pipeline.State) error {
uploads.Add(1)

stepLogger := logger.With().
Str("image", state.Pipeline.Step.Image).
Str("workflowID", workflow.ID).
Str("workflow_id", workflow.ID).
Err(state.Process.Error).
Int("exit_code", state.Process.ExitCode).
Bool("exited", state.Process.Exited).
Expand All @@ -57,6 +60,7 @@ func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, wo
}

stepLogger.Debug().Msg("update step status complete")
uploads.Done()
}()
if state.Process.Exited {
return nil
Expand Down
4 changes: 2 additions & 2 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,9 @@ func (r *Runtime) exec(step *backend.Step) (*backend.State, error) {
return nil, nil
}

// Some pipeline backends, such as local, will close the pipe from Tail on Wait,
// so first make sure all reading has finished.
// We wait until all data was logged. (Needed for some backends like local as WaitStep kills the log stream)
wg.Wait()

waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID)
if err != nil {
if errors.Is(err, context.Canceled) {
Expand Down

0 comments on commit 599dd97

Please sign in to comment.