Skip to content

Commit

Permalink
WIP Job Verification
Browse files Browse the repository at this point in the history
  • Loading branch information
moskyb committed Jul 25, 2023
1 parent 86a36a6 commit 856cce4
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 120 deletions.
47 changes: 26 additions & 21 deletions agent/agent_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,32 @@ package agent
// AgentConfiguration is the run-time configuration for an agent that
// has been loaded from the config file and command-line params
type AgentConfiguration struct {
ConfigPath string
BootstrapScript string
BuildPath string
HooksPath string
SocketsPath string
GitMirrorsPath string
GitMirrorsLockTimeout int
GitMirrorsSkipUpdate bool
PluginsPath string
GitCheckoutFlags string
GitCloneFlags string
GitCloneMirrorFlags string
GitCleanFlags string
GitFetchFlags string
GitSubmodules bool
SSHKeyscan bool
CommandEval bool
PluginsEnabled bool
PluginValidation bool
LocalHooksEnabled bool
RunInPty bool
ConfigPath string
BootstrapScript string
BuildPath string
HooksPath string
SocketsPath string
GitMirrorsPath string
GitMirrorsLockTimeout int
GitMirrorsSkipUpdate bool
PluginsPath string
GitCheckoutFlags string
GitCloneFlags string
GitCloneMirrorFlags string
GitCleanFlags string
GitFetchFlags string
GitSubmodules bool
SSHKeyscan bool
CommandEval bool
PluginsEnabled bool
PluginValidation bool
LocalHooksEnabled bool
RunInPty bool

JobVerificationKeyPath string
JobVerificationNoSignatureBehavior string
JobVerificationInvalidSignatureBehavior string

ANSITimestamps bool
TimestampLines bool
HealthCheckAddr string
Expand Down
66 changes: 66 additions & 0 deletions agent/integration/job_verification_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package integration

import (
"os"
"testing"

"github.com/buildkite/agent/v3/agent"
"github.com/buildkite/agent/v3/api"
"github.com/buildkite/agent/v3/internal/pipeline"
)

func Test_WhenJobSignatureIsInvalid_AndInvalidSignatureBehaviorIsFail_ItRefusesTheJob(t *testing.T) {
t.Parallel()

keyFile, err := os.CreateTemp("", "keyfile")
if err != nil {
t.Fatalf("making keyfile: %v", err)
}
defer os.Remove(keyFile.Name())

_, err = keyFile.Write([]byte("llamasrock"))
if err != nil {
t.Fatalf("writing keyfile: %v", err)
}

jobID := "my-job-id"
j := &api.Job{
ID: jobID,
ChunksMaxSizeBytes: 1024,
Step: pipeline.CommandStep{
Command: "echo hello world",
Signature: &pipeline.Signature{
Algorithm: "hmac-sha256",
SignedFields: []string{"command"},
Value: "not-the-real-signature",
},
},
Env: map[string]string{
"BUILDKITE_COMMAND": "echo hello world",
},
}

// create a mock agent API
e := createTestAgentEndpoint()
server := e.server("my-job-id")
defer server.Close()

mb := mockBootstrap(t)
mb.Expect().NotCalled() // The bootstrap won't be called, as the pre-bootstrap hook failed
defer mb.CheckAndClose(t)

runJob(t, j, server, agent.AgentConfiguration{
JobVerificationKeyPath: keyFile.Name(),
JobVerificationInvalidSignatureBehavior: "block",
}, mb)

job := e.finishesFor(t, jobID)[0]

if got, want := job.ExitStatus, "-1"; got != want {
t.Errorf("job.ExitStatus = %q, want %q", got, want)
}

if got, want := job.SignalReason, "agent_refused"; got != want {
t.Errorf("job.SignalReason = %q, want %q", got, want)
}
}
3 changes: 2 additions & 1 deletion agent/integration/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ func (t *testAgentEndpoint) server(jobID string) *httptest.Server {
case "/jobs/" + jobID + "/start":
rw.WriteHeader(http.StatusOK)
case "/jobs/" + jobID + "/chunks":
rw.WriteHeader(http.StatusCreated)
req.
rw.WriteHeader(http.StatusCreated)
case "/jobs/" + jobID + "/finish":
rw.WriteHeader(http.StatusOK)
default:
Expand Down
120 changes: 37 additions & 83 deletions agent/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ const (

// BuildkiteMessageName is the env var name of the build/commit message.
BuildkiteMessageName = "BUILDKITE_MESSAGE"

JobVerificationBehaviourWarn = "warn"
JobVerificationBehaviourBlock = "block"
)

// Certain env can only be set by agent configuration.
Expand Down Expand Up @@ -105,6 +108,10 @@ type JobRunner struct {
// The configuration for the job runner
conf JobRunnerConfig

// How the JobRunner should respond in various signature failure modes
JobVerificationInvalidSignatureBehavior string
JobVerificationNoSignatureBehavior string

// The logger to use
logger logger.Logger

Expand Down Expand Up @@ -158,6 +165,17 @@ func NewJobRunner(l logger.Logger, apiClient APIClient, conf JobRunnerConfig) (j
apiClient: apiClient,
}

var err error
r.JobVerificationInvalidSignatureBehavior, err = r.normalizeJobVerificationBehavior(conf.AgentConfiguration.JobVerificationInvalidSignatureBehavior)
if err != nil {
return nil, fmt.Errorf("setting invalid signature behavior: %w", err)
}

r.JobVerificationNoSignatureBehavior, err = r.normalizeJobVerificationBehavior(conf.AgentConfiguration.JobVerificationNoSignatureBehavior)
if err != nil {
return nil, fmt.Errorf("setting no signature behavior: %w", err)
}

if conf.JobStatusInterval == 0 {
conf.JobStatusInterval = 1 * time.Second
}
Expand Down Expand Up @@ -356,6 +374,25 @@ func NewJobRunner(l logger.Logger, apiClient APIClient, conf JobRunnerConfig) (j
return r, nil
}

func (r *JobRunner) normalizeJobVerificationBehavior(behavior string) (string, error) {
if r.conf.AgentConfiguration.JobVerificationKeyPath == "" {
// We won't be verifying jobs, so it doesn't matter
return "if you're seeing this string, there's a problem with the job verification code in the agent. contact support@buildkite.com", nil
}

switch behavior {
case JobVerificationBehaviourBlock, JobVerificationBehaviourWarn:
return behavior, nil
case "":
// TODO: Should we have a default behavior? "warn" is easy, but less secure. "block" is more secure, but has some
// sharp edges when it comes to initial implementation
return JobVerificationBehaviourBlock, nil
default:
return "", fmt.Errorf("invalid job verification behavior: %q", behavior)
}

}

// Creates the environment variables that will be used in the process and writes a flat environment file
func (r *JobRunner) createEnvironment() ([]string, error) {
// Create a clone of our jobs environment. We'll then set the
Expand Down Expand Up @@ -574,89 +611,6 @@ func (r *JobRunner) startJob(ctx context.Context, startedAt time.Time) error {
})
}

// finishJob finishes the job in the Buildkite Agent API. If the FinishJob call
// cannot return successfully, this will retry for a long time.
func (r *JobRunner) finishJob(ctx context.Context, finishedAt time.Time, exit processExit, failedChunkCount int) error {
r.conf.Job.FinishedAt = finishedAt.UTC().Format(time.RFC3339Nano)
r.conf.Job.ExitStatus = strconv.Itoa(exit.Status)
r.conf.Job.Signal = exit.Signal
r.conf.Job.SignalReason = exit.SignalReason
r.conf.Job.ChunksFailedCount = failedChunkCount

r.logger.Debug("[JobRunner] Finishing job with exit_status=%s, signal=%s and signal_reason=%s",
r.conf.Job.ExitStatus, r.conf.Job.Signal, r.conf.Job.SignalReason)

ctx, cancel := context.WithTimeout(ctx, 48*time.Hour)
defer cancel()

return roko.NewRetrier(
roko.TryForever(),
roko.WithJitter(),
roko.WithStrategy(roko.Constant(1*time.Second)),
).DoWithContext(ctx, func(retrier *roko.Retrier) error {
response, err := r.apiClient.FinishJob(ctx, r.conf.Job)
if err != nil {
// If the API returns with a 422, that means that we
// succesfully tried to finish the job, but Buildkite
// rejected the finish for some reason. This can
// sometimes mean that Buildkite has cancelled the job
// before we get a chance to send the final API call
// (maybe this agent took too long to kill the
// process). In that case, we don't want to keep trying
// to finish the job forever so we'll just bail out and
// go find some more work to do.
if response != nil && response.StatusCode == 422 {
r.logger.Warn("Buildkite rejected the call to finish the job (%s)", err)
retrier.Break()
} else {
r.logger.Warn("%s (%s)", err, retrier)
}
}

return err
})
}

// jobLogStreamer waits for the process to start, then grabs the job output
// every few seconds and sends it back to Buildkite.
func (r *JobRunner) jobLogStreamer(ctx context.Context, wg *sync.WaitGroup) {
ctx, setStat, done := status.AddSimpleItem(ctx, "Job Log Streamer")
defer done()
setStat("🏃 Starting...")

defer func() {
wg.Done()
r.logger.Debug("[JobRunner] Routine that processes the log has finished")
}()

select {
case <-r.process.Started():
case <-ctx.Done():
return
}

for {
setStat("📨 Sending process output to log streamer")

// Send the output of the process to the log streamer
// for processing
r.logStreamer.Process(r.output.ReadAndTruncate())

setStat("😴 Sleeping for a bit")

// Sleep for a bit, or until the job is finished
select {
case <-time.After(1 * time.Second):
case <-ctx.Done():
return
case <-r.process.Done():
return
}
}

// The final output after the process has finished is processed in Run().
}

// jobCancellationChecker waits for the processs to start, then continuously
// polls GetJobState to see if the job has been cancelled server-side. If so,
// it calls r.Cancel.
Expand Down
Loading

0 comments on commit 856cce4

Please sign in to comment.