Skip to content

Commit

Permalink
Merge branch 'traceparent'
Browse files Browse the repository at this point in the history
  • Loading branch information
kimtore committed Sep 17, 2024
2 parents d1704f5 + 4b017a4 commit a7fa98b
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 191 deletions.
20 changes: 18 additions & 2 deletions cmd/deploy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/nais/deploy/pkg/pb"
"github.com/nais/deploy/pkg/telemetry"
"github.com/nais/deploy/pkg/version"
"go.opentelemetry.io/otel/attribute"
otrace "go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/encoding/protojson"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -52,15 +54,29 @@ func run() error {
}
}()

// Inherit traceparent from pipeline, if any
ctx := telemetry.WithTraceParent(programContext, cfg.Traceparent)
ctx, span := telemetry.Tracer().Start(ctx, "NAIS deploy", otrace.WithSpanKind(otrace.SpanKindClient))
defer span.End()

span.SetAttributes(attribute.KeyValue{
Key: "deploy.client.version",
Value: attribute.StringValue(version.Version()),
})

// Welcome
log.Infof("NAIS deploy %s", version.Version())
ts, err := version.BuildTime()
if err == nil {
span.SetAttributes(attribute.KeyValue{
Key: "deploy.client.build-time",
Value: attribute.StringValue(ts.Local().String()),
})
log.Infof("This version was built %s", ts.Local())
}

// Prepare request
request, err := deployclient.Prepare(programContext, cfg)
request, err := deployclient.Prepare(ctx, cfg)
if err != nil {
return err
}
Expand Down Expand Up @@ -89,5 +105,5 @@ func run() error {
return nil
}

return d.Deploy(programContext, cfg, request)
return d.Deploy(ctx, cfg, request)
}
12 changes: 2 additions & 10 deletions pkg/deployclient/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package deployclient

import (
"encoding/hex"
"fmt"
"os"
"strconv"
"strings"
"time"

"github.com/nais/deploy/pkg/telemetry"
flag "github.com/spf13/pflag"
)

Expand All @@ -32,8 +30,7 @@ type Config struct {
Retry bool
RetryInterval time.Duration
Team string
TelemetryInput string
Telemetry *telemetry.PipelineTimings
Traceparent string
Timeout time.Duration
TracingDashboardURL string
OpenTelemetryCollectorURL string
Expand Down Expand Up @@ -61,7 +58,7 @@ func InitConfig(cfg *Config) {
flag.BoolVar(&cfg.Retry, "retry", getEnvBool("RETRY", true), "Retry deploy when encountering transient errors. (env RETRY)")
flag.StringVar(&cfg.Team, "team", os.Getenv("TEAM"), "Team making the deployment. Auto-detected from nais.yaml if possible. (env TEAM)")
flag.StringVar(&cfg.OpenTelemetryCollectorURL, "otel-collector-endpoint", getEnv("OTEL_COLLECTOR_ENDPOINT", DefaultOtelCollectorEndpoint), "OpenTelemetry collector endpoint. (env OTEL_COLLECTOR_ENDPOINT)")
flag.StringVar(&cfg.TelemetryInput, "telemetry", os.Getenv("TELEMETRY"), "Telemetry data from CI pipeline. (env TELEMETRY)")
flag.StringVar(&cfg.Traceparent, "traceparent", os.Getenv("TRACEPARENT"), "The W3C Trace Context traceparent value for the workflow run. (env TRACEPARENT)")
flag.DurationVar(&cfg.Timeout, "timeout", getEnvDuration("TIMEOUT", DefaultDeployTimeout), "Time to wait for successful deployment. (env TIMEOUT)")
flag.StringVar(&cfg.TracingDashboardURL, "tracing-dashboard-url", getEnv("TRACING_DASHBOARD_URL", DefaultTracingDashboardURL), "Base URL to Grafana tracing dashboard onto which the trace ID can be appended (env TRACING_DASHBOARD_URL)")
flag.StringSliceVar(&cfg.Variables, "var", getEnvStringSlice("VAR"), "Template variable in the form KEY=VALUE. Can be specified multiple times. (env VAR)")
Expand Down Expand Up @@ -138,10 +135,5 @@ func (cfg *Config) Validate() error {
return ErrMalformedAPIKey
}

cfg.Telemetry, err = telemetry.ParsePipelineTelemetry(cfg.TelemetryInput)
if err != nil {
return fmt.Errorf("%w: %w", ErrInvalidTelemetryFormat, err)
}

return nil
}
27 changes: 16 additions & 11 deletions pkg/deployclient/deployclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,43 +161,43 @@ func (d *Deployer) Deploy(ctx context.Context, cfg *Config, deployRequest *pb.De

// Root span for tracing.
// All sub-spans must be created from this context.
ctx, rootSpan := cfg.Telemetry.StartTracing(ctx)
defer rootSpan.End()
ctx, span := telemetry.Tracer().Start(ctx, "Send deploy request and wait for completion")
defer span.End()
deployRequest.TraceParent = telemetry.TraceParentHeader(ctx)

log.Infof("Sending deployment request to NAIS deploy at %s...", cfg.DeployServerURL)

sendDeploymentRequest := func() error {
ctx, span := telemetry.Tracer().Start(ctx, "Send to deploy server")
defer span.End()
requestContext, requestSpan := telemetry.Tracer().Start(ctx, "Send to deploy server")
defer requestSpan.End()

err = retryUnavailable(cfg.RetryInterval, cfg.Retry, func() error {
deployStatus, err = d.Client.Deploy(ctx, deployRequest)
deployStatus, err = d.Client.Deploy(requestContext, deployRequest)
return err
})

if err != nil {
code := grpcErrorCode(err)
err = fmt.Errorf(formatGrpcError(err))
if ctx.Err() != nil {
span.SetStatus(ocodes.Error, ctx.Err().Error())
return Errorf(ExitTimeout, "deployment timed out: %s", ctx.Err())
if requestContext.Err() != nil {
requestSpan.SetStatus(ocodes.Error, requestContext.Err().Error())
return Errorf(ExitTimeout, "deployment timed out: %s", requestContext.Err())
}
if code == codes.Unauthenticated {
if !strings.HasSuffix(cfg.Environment, ":"+cfg.Team) {
log.Warnf("hint: team %q does not match namespace in %q", cfg.Team, cfg.Environment)
}
}
span.SetStatus(ocodes.Error, err.Error())
requestSpan.SetStatus(ocodes.Error, err.Error())
return ErrorWrap(ExitNoDeployment, err)
}

log.Infof("Deployment request accepted by NAIS deploy and dispatched to cluster '%s'.", deployStatus.GetRequest().GetCluster())

deployRequest.ID = deployStatus.GetRequest().GetID()
telemetry.AddDeploymentRequestSpanAttributes(rootSpan, deployStatus.GetRequest())
telemetry.AddDeploymentRequestSpanAttributes(span, deployStatus.GetRequest())
traceID := telemetry.TraceID(ctx)
telemetry.AddDeploymentRequestSpanAttributes(requestSpan, deployStatus.GetRequest())
traceID := telemetry.TraceID(requestContext)

urlPrefix := "https://" + strings.Split(cfg.DeployServerURL, ":")[0]
log.Infof("Deployment information:")
Expand All @@ -214,7 +214,12 @@ func (d *Deployer) Deploy(ctx context.Context, cfg *Config, deployRequest *pb.De
}

err = sendDeploymentRequest()

// First handle errors that might have occurred with the request itself.
// Errors from underlying systems are handled later.
if err != nil {
span.SetStatus(ocodes.Error, err.Error())
span.RecordError(err)
return err
}

Expand Down
99 changes: 3 additions & 96 deletions pkg/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ package telemetry

import (
"context"
"fmt"
"runtime"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -124,104 +122,13 @@ func AddDeploymentRequestSpanAttributes(span otrace.Span, request *pb.Deployment
}, attribute.KeyValue{
Key: "deploy.repository",
Value: attribute.StringValue(request.GetRepository().FullName()),
}, attribute.KeyValue{
Key: "deploy.deadline",
Value: attribute.StringValue(request.GetDeadline().AsTime().Local().Format(time.RFC3339)),
},
)
}

// PipelineTimings holds timestamps reported by the CI pipeline, indicating
// when certain steps were started or finished.
//
// If Validate returns nil, the fields are in non-decreasing chronological
// order: LatestCommit, Start, BuildStart, AttestStart, End.
type PipelineTimings struct {
	// LatestCommit is the time of the latest commit to the repository.
	LatestCommit time.Time
	// Start is the time the pipeline started.
	Start time.Time
	// BuildStart is the time the build step started.
	BuildStart time.Time
	// AttestStart is the time the attestation step started.
	AttestStart time.Time
	// End is the time the pipeline finished.
	End time.Time
}

// Validate returns an error unless
//
//	LatestCommit <= Start <= BuildStart <= AttestStart <= End
//
// Every adjacent pair is compared, so a commit timestamp that is later than
// the pipeline start is rejected even when it is earlier than the build start.
// Equal neighbouring timestamps are accepted. Unset fields hold the zero
// time.Time and take part in the comparison like any other value.
func (pt *PipelineTimings) Validate() error {
	ordered := [...]time.Time{
		pt.LatestCommit,
		pt.Start,
		pt.BuildStart,
		pt.AttestStart,
		pt.End,
	}
	for i := 1; i < len(ordered); i++ {
		if ordered[i-1].After(ordered[i]) {
			return fmt.Errorf("pipeline timings are not in expected chronological order, ensure that: latest_commit < pipeline_start < build_start < attest_start < pipeline_end")
		}
	}
	return nil
}

// StartTracing starts the root span "Continuous integration pipeline" and
// returns it together with the context that carries it. The root span is not
// ended here.
//
// With a nil receiver the root span is started without an explicit timestamp
// and no child spans are created. Otherwise the root span is backdated to
// LatestCommit, an event marks that commit, and the following child spans are
// started and immediately ended using the recorded timestamps:
//
//	Github Action: docker-build-push   Start       .. End
//	  Docker: Build and push           BuildStart  .. AttestStart
//	  SLSA: SBOM sign and attest       AttestStart .. End
func (pt *PipelineTimings) StartTracing(ctx context.Context) (context.Context, otrace.Span) {
	const pipelineSpanName = "Continuous integration pipeline"
	asClient := otrace.WithSpanKind(otrace.SpanKindClient)

	// No pipeline timings available: only the root span can be produced.
	if pt == nil {
		return Tracer().Start(ctx, pipelineSpanName, asClient)
	}

	pipelineCtx, pipelineSpan := Tracer().Start(ctx, pipelineSpanName, otrace.WithTimestamp(pt.LatestCommit), asClient)
	pipelineSpan.AddEvent("Latest commit to repository", otrace.WithTimestamp(pt.LatestCommit))

	actionCtx, actionSpan := Tracer().Start(pipelineCtx, "Github Action: docker-build-push", otrace.WithTimestamp(pt.Start), asClient)

	// The build step ends where attestation begins.
	_, build := Tracer().Start(actionCtx, "Docker: Build and push", otrace.WithTimestamp(pt.BuildStart))
	build.End(otrace.WithTimestamp(pt.AttestStart))

	// Attestation runs until the end of the pipeline.
	_, attest := Tracer().Start(actionCtx, "SLSA: SBOM sign and attest", otrace.WithTimestamp(pt.AttestStart))
	attest.End(otrace.WithTimestamp(pt.End))

	actionSpan.End(otrace.WithTimestamp(pt.End))

	return pipelineCtx, pipelineSpan
}

// ParsePipelineTelemetry parses pipeline build timings.
//
// Uses the following input format:
//
//	latest_commit=1726040395,pipeline_start=1726050395,pipeline_end=1726050512,build_start=1726050400,attest_start=1726050492
//
// This output usually comes from `docker-build-push.steps.output.telemetry`.
//
// Each value is a UNIX epoch in whole seconds and is stored as a UTC
// time.Time. Keys may appear in any order; if a key is repeated the last
// occurrence wins. Keys that are absent leave the corresponding field at the
// zero time.Time, which is then subject to Validate like any other value.
//
// If there is no timing data (empty input), both return values will be nil.
// If all timing data is valid, returns a timings object and nil error.
// Otherwise returns nil and an error describing the first problem found:
// a fragment without '=', a value that is not an integer, an unknown key, or
// timings rejected by Validate.
func ParsePipelineTelemetry(s string) (*PipelineTimings, error) {
	if len(s) == 0 {
		return nil, nil
	}

	timings := &PipelineTimings{}
	for _, keyValue := range strings.Split(s, ",") {
		key, value, found := strings.Cut(keyValue, "=")
		if !found {
			return nil, fmt.Errorf("expected 'key=value', found '%s'", keyValue)
		}

		// Parse straight into int64 so the accepted range does not depend on
		// the platform's int size (strconv.Atoi would reject epochs beyond
		// 2038 on 32-bit builds).
		epoch, err := strconv.ParseInt(value, 10, 64)
		if err != nil {
			return nil, fmt.Errorf("expected UNIX epoch, found '%s'", value)
		}
		ts := time.Unix(epoch, 0).UTC()

		switch key {
		case "latest_commit":
			timings.LatestCommit = ts
		case "pipeline_start":
			timings.Start = ts
		case "pipeline_end":
			timings.End = ts
		case "build_start":
			timings.BuildStart = ts
		case "attest_start":
			timings.AttestStart = ts
		default:
			return nil, fmt.Errorf("expected key to be one of 'latest_commit', 'pipeline_start', 'pipeline_end', 'build_start', 'attest_start'; found '%s'", key)
		}
	}

	if err := timings.Validate(); err != nil {
		return nil, err
	}
	return timings, nil
}

func newPropagator() propagation.TextMapPropagator {
return propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
Expand Down
72 changes: 0 additions & 72 deletions pkg/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,83 +3,11 @@ package telemetry_test
import (
"context"
"testing"
"time"

"github.com/nais/deploy/pkg/telemetry"
"github.com/stretchr/testify/assert"
)

// TestParsePipelineTelemetry covers the happy path, every rejection path and
// the empty-input case of telemetry.ParsePipelineTelemetry.
func TestParsePipelineTelemetry(t *testing.T) {
	const (
		errChronology = "pipeline timings are not in expected chronological order, ensure that: latest_commit < pipeline_start < build_start < attest_start < pipeline_end"
		errUnknownKey = "expected key to be one of 'latest_commit', 'pipeline_start', 'pipeline_end', 'build_start', 'attest_start'; found '%KEY%'"
	)
	// unknownKey renders the "unknown key" message for the given key.
	unknownKey := func(key string) string {
		const marker = "%KEY%"
		return errUnknownKey[:len(errUnknownKey)-len(marker)-1] + key + "'"
	}

	t.Run("default case with five timings in correct order without quoting", func(t *testing.T) {
		want := &telemetry.PipelineTimings{
			LatestCommit: time.Date(2024, time.September, 11, 7, 39, 55, 0, time.UTC),
			Start:        time.Date(2024, time.September, 11, 10, 26, 35, 0, time.UTC),
			BuildStart:   time.Date(2024, time.September, 11, 10, 26, 40, 0, time.UTC),
			AttestStart:  time.Date(2024, time.September, 11, 10, 28, 12, 0, time.UTC),
			End:          time.Date(2024, time.September, 11, 10, 28, 32, 0, time.UTC),
		}
		got, err := telemetry.ParsePipelineTelemetry("latest_commit=1726040395,pipeline_start=1726050395,pipeline_end=1726050512,build_start=1726050400,attest_start=1726050492")
		assert.NoError(t, err)
		assert.Equal(t, want, got)
	})

	// Every input of a case must be rejected with wantErr and a nil result.
	failures := []struct {
		name    string
		inputs  []string
		wantErr string
	}{
		{
			name:    "missing some of the timings",
			inputs:  []string{"pipeline_start=1726050395,pipeline_end=1726050512"},
			wantErr: errChronology,
		},
		{
			name: "wrong timing order",
			inputs: []string{
				"pipeline_start=2,build_start=1",
				"build_start=2,attest_start=1",
				"attest_start=2,pipeline_end=1",
				"pipeline_start=2,pipeline_end=1",
			},
			wantErr: errChronology,
		},
		{
			name:    "unexpected timing parameter",
			inputs:  []string{"pipeline_start=1,foobar=2"},
			wantErr: unknownKey("foobar"),
		},
		{
			name:    "timing parameter not an integer",
			inputs:  []string{"pipeline_start=2024-09-11"},
			wantErr: "expected UNIX epoch, found '2024-09-11'",
		},
		{
			name:    "parameter list missing value",
			inputs:  []string{"pipeline_start=1,pipeline_end"},
			wantErr: "expected 'key=value', found 'pipeline_end'",
		},
		{
			name:    "parameter list missing key",
			inputs:  []string{"pipeline_start=1,=2"},
			wantErr: unknownKey(""),
		},
	}
	for _, tc := range failures {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			for _, raw := range tc.inputs {
				timings, err := telemetry.ParsePipelineTelemetry(raw)
				assert.EqualError(t, err, tc.wantErr)
				assert.Nil(t, timings)
			}
		})
	}

	t.Run("no data", func(t *testing.T) {
		timings, err := telemetry.ParsePipelineTelemetry("")
		assert.NoError(t, err)
		assert.Nil(t, timings)
	})
}

func TestTraceID(t *testing.T) {
t.Run("happy case", func(t *testing.T) {
traceParentHeader := "00-ada6313c1a5b6ffdf0d085fadc3265cb-6018288557ffff51-01"
Expand Down

0 comments on commit a7fa98b

Please sign in to comment.