Support upload outputs and use needs context (#133)
See [Example usage of the needs context](https://docs.github.com/en/actions/learn-github-actions/contexts#example-usage-of-the-needs-context).

Related to:
- [actions-proto-def #5](https://gitea.com/gitea/actions-proto-def/pulls/5)
- [gitea #24230](go-gitea/gitea#24230)
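
For illustration, a minimal self-contained sketch (hypothetical job ID, output name, and value; not taken from the diff below) of the data a task now carries for each job it `needs`, which the runner turns into a resolvable `needs` context:

```go
package main

import (
	"fmt"

	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
)

func main() {
	task := &runnerv1.Task{
		Needs: map[string]*runnerv1.TaskNeed{
			"build": {
				Outputs: map[string]string{"image_tag": "v1.2.3"},
				Result:  runnerv1.Result_RESULT_SUCCESS,
			},
		},
	}
	// In the job being run, ${{ needs.build.outputs.image_tag }} then
	// resolves to "v1.2.3" and ${{ needs.build.result }} to "success".
	fmt.Println(task.Needs["build"].Outputs["image_tag"])
}
```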

Reviewed-on: https://gitea.com/gitea/act_runner/pulls/133
Reviewed-by: Lunny Xiao <xiaolunwen@gmail.com>
Reviewed-by: Zettat123 <zettat123@noreply.gitea.io>
Co-authored-by: Jason Song <i@wolfogre.com>
Co-committed-by: Jason Song <i@wolfogre.com>
wolfogre authored and lunny committed Apr 20, 2023
1 parent ed86e2f commit 83ec0ba
Showing 4 changed files with 199 additions and 25 deletions.
12 changes: 4 additions & 8 deletions internal/app/run/runner.go
@@ -4,7 +4,6 @@
package run

import (
"bytes"
"context"
"encoding/json"
"fmt"
@@ -112,16 +111,11 @@ func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.

reporter.Logf("%s(version:%s) received task %v of job %v, be triggered by event: %s", r.name, ver.Version(), task.Id, task.Context.Fields["job"].GetStringValue(), task.Context.Fields["event_name"].GetStringValue())

workflow, err := model.ReadWorkflow(bytes.NewReader(task.WorkflowPayload))
workflow, jobID, err := generateWorkflow(task)
if err != nil {
return err
}

jobIDs := workflow.GetJobIDs()
if len(jobIDs) != 1 {
return fmt.Errorf("multiple jobs found: %v", jobIDs)
}
jobID := jobIDs[0]
plan, err := model.CombineWorkflowPlanner(workflow).PlanJob(jobID)
if err != nil {
return err
@@ -209,5 +203,7 @@ func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.
// add logger recorders
ctx = common.WithLoggerHook(ctx, reporter)

return executor(ctx)
execErr := executor(ctx)
reporter.SetOutputs(job.Outputs)
return execErr
}
54 changes: 54 additions & 0 deletions internal/app/run/workflow.go
@@ -0,0 +1,54 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package run

import (
"bytes"
"fmt"
"sort"
"strings"

runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"github.com/nektos/act/pkg/model"
"gopkg.in/yaml.v3"
)

func generateWorkflow(task *runnerv1.Task) (*model.Workflow, string, error) {
workflow, err := model.ReadWorkflow(bytes.NewReader(task.WorkflowPayload))
if err != nil {
return nil, "", err
}

jobIDs := workflow.GetJobIDs()
if len(jobIDs) != 1 {
return nil, "", fmt.Errorf("multiple jobs found: %v", jobIDs)
}
jobID := jobIDs[0]

needJobIDs := make([]string, 0, len(task.Needs))
for id, need := range task.Needs {
needJobIDs = append(needJobIDs, id)
needJob := &model.Job{
Outputs: need.Outputs,
Result: strings.ToLower(strings.TrimPrefix(need.Result.String(), "RESULT_")),
}
workflow.Jobs[id] = needJob
}
sort.Strings(needJobIDs)

rawNeeds := yaml.Node{
Kind: yaml.SequenceNode,
Content: make([]*yaml.Node, 0, len(needJobIDs)),
}
for _, id := range needJobIDs {
rawNeeds.Content = append(rawNeeds.Content, &yaml.Node{
Kind: yaml.ScalarNode,
Value: id,
})
}

workflow.Jobs[jobID].RawNeeds = rawNeeds

return workflow, jobID, nil
}
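
For reference, a tiny self-contained sketch (not part of the diff) of the `Result`-to-string conversion used above: the protobuf enum name, e.g. `RESULT_SUCCESS`, is lowered to the value that `needs.<job_id>.result` expressions expect.

```go
package main

import (
	"fmt"
	"strings"

	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
)

func main() {
	// Same transformation as in generateWorkflow: trim the RESULT_ prefix
	// from the enum name and lower-case the remainder.
	toNeedsResult := func(r runnerv1.Result) string {
		return strings.ToLower(strings.TrimPrefix(r.String(), "RESULT_"))
	}
	fmt.Println(toNeedsResult(runnerv1.Result_RESULT_SUCCESS))   // success
	fmt.Println(toNeedsResult(runnerv1.Result_RESULT_CANCELLED)) // cancelled
}
```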
74 changes: 74 additions & 0 deletions internal/app/run/workflow_test.go
@@ -0,0 +1,74 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package run

import (
"testing"

runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"github.com/nektos/act/pkg/model"
"github.com/stretchr/testify/require"
"gotest.tools/v3/assert"
)

func Test_generateWorkflow(t *testing.T) {
type args struct {
task *runnerv1.Task
}
tests := []struct {
name string
args args
assert func(t *testing.T, wf *model.Workflow)
want1 string
wantErr bool
}{
{
name: "has needs",
args: args{
task: &runnerv1.Task{
WorkflowPayload: []byte(`
name: Build and deploy
on: push
jobs:
job9:
needs: build
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- run: ./deploy --build ${{ needs.job1.outputs.output1 }}
- run: ./deploy --build ${{ needs.job2.outputs.output2 }}
`),
Needs: map[string]*runnerv1.TaskNeed{
"job1": {
Outputs: map[string]string{
"output1": "output1 value",
},
Result: runnerv1.Result_RESULT_SUCCESS,
},
"job2": {
Outputs: map[string]string{
"output2": "output2 value",
},
Result: runnerv1.Result_RESULT_SUCCESS,
},
},
},
},
assert: func(t *testing.T, wf *model.Workflow) {
assert.DeepEqual(t, wf.GetJob("job9").Needs(), []string{"job1", "job2"})
},
want1: "job9",
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, got1, err := generateWorkflow(tt.args.task)
require.NoError(t, err)
tt.assert(t, got)
assert.Equal(t, got1, tt.want1)
})
}
}
84 changes: 67 additions & 17 deletions internal/pkg/report/reporter.go
@@ -31,8 +31,10 @@ type Reporter struct {
logOffset int
logRows []*runnerv1.LogRow
logReplacer *strings.Replacer
state *runnerv1.TaskState
stateM sync.RWMutex

state *runnerv1.TaskState
stateMu sync.RWMutex
outputs sync.Map
}

func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.Client, task *runnerv1.Task) *Reporter {
@@ -56,8 +58,8 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.C
}

func (r *Reporter) ResetSteps(l int) {
r.stateM.Lock()
defer r.stateM.Unlock()
r.stateMu.Lock()
defer r.stateMu.Unlock()
for i := 0; i < l; i++ {
r.state.Steps = append(r.state.Steps, &runnerv1.StepState{
Id: int64(i),
@@ -70,8 +72,8 @@ func (r *Reporter) Levels() []log.Level {
}

func (r *Reporter) Fire(entry *log.Entry) error {
r.stateM.Lock()
defer r.stateM.Unlock()
r.stateMu.Lock()
defer r.stateMu.Unlock()

log.WithFields(entry.Data).Trace(entry.Message)

@@ -155,9 +157,13 @@ func (r *Reporter) RunDaemon() {
}

func (r *Reporter) Logf(format string, a ...interface{}) {
r.stateM.Lock()
defer r.stateM.Unlock()
r.stateMu.Lock()
defer r.stateMu.Unlock()

r.logf(format, a...)
}

func (r *Reporter) logf(format string, a ...interface{}) {
if !r.duringSteps() {
r.logRows = append(r.logRows, &runnerv1.LogRow{
Time: timestamppb.Now(),
@@ -166,10 +172,30 @@ func (r *Reporter) Logf(format string, a ...interface{}) {
}
}

func (r *Reporter) SetOutputs(outputs map[string]string) {
r.stateMu.Lock()
defer r.stateMu.Unlock()

for k, v := range outputs {
if len(k) > 255 {
r.logf("ignore output because the key is too long: %q", k)
continue
}
if l := len(v); l > 1024*1024 {
log.Println("ignore output because the value is too long:", k, l)
r.logf("ignore output because the value %q is too long: %d", k, l)
}
if _, ok := r.outputs.Load(k); ok {
continue
}
r.outputs.Store(k, v)
}
}

func (r *Reporter) Close(lastWords string) error {
r.closed = true

r.stateM.Lock()
r.stateMu.Lock()
if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
if lastWords == "" {
lastWords = "Early termination"
@@ -191,7 +217,7 @@
Content: lastWords,
})
}
r.stateM.Unlock()
r.stateMu.Unlock()

return retry.Do(func() error {
if err := r.ReportLog(true); err != nil {
@@ -205,9 +231,9 @@ func (r *Reporter) ReportLog(noMore bool) error {
r.clientM.Lock()
defer r.clientM.Unlock()

r.stateM.RLock()
r.stateMu.RLock()
rows := r.logRows
r.stateM.RUnlock()
r.stateMu.RUnlock()

resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{
TaskId: r.state.Id,
@@ -224,10 +250,10 @@
return fmt.Errorf("submitted logs are lost")
}

r.stateM.Lock()
r.stateMu.Lock()
r.logRows = r.logRows[ack-r.logOffset:]
r.logOffset = ack
r.stateM.Unlock()
r.stateMu.Unlock()

if noMore && ack < r.logOffset+len(rows) {
return fmt.Errorf("not all logs are submitted")
@@ -240,21 +266,45 @@ func (r *Reporter) ReportState() error {
r.clientM.Lock()
defer r.clientM.Unlock()

r.stateM.RLock()
r.stateMu.RLock()
state := proto.Clone(r.state).(*runnerv1.TaskState)
r.stateM.RUnlock()
r.stateMu.RUnlock()

outputs := make(map[string]string)
r.outputs.Range(func(k, v interface{}) bool {
if val, ok := v.(string); ok {
outputs[k.(string)] = val
}
return true
})

resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
State: state,
State: state,
Outputs: outputs,
}))
if err != nil {
return err
}

for _, k := range resp.Msg.SentOutputs {
r.outputs.Store(k, struct{}{})
}

if resp.Msg.State != nil && resp.Msg.State.Result == runnerv1.Result_RESULT_CANCELLED {
r.cancel()
}

var noSent []string
r.outputs.Range(func(k, v interface{}) bool {
if _, ok := v.(string); ok {
noSent = append(noSent, k.(string))
}
return true
})
if len(noSent) > 0 {
return fmt.Errorf("there are still outputs that have not been sent: %v", noSent)
}

return nil
}
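
As a side note, a small self-contained sketch (assumed keys and values; not part of the diff) of the acknowledgement pattern `ReportState` uses above: pending outputs live in the `sync.Map` as string values, each key echoed back in `SentOutputs` is overwritten with `struct{}{}`, and a final sweep flags anything that was never acknowledged.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var outputs sync.Map
	outputs.Store("output1", "value1")
	outputs.Store("output2", "value2")

	// Suppose the server's UpdateTask response listed only "output1"
	// in SentOutputs: mark it as sent by replacing its value.
	for _, k := range []string{"output1"} {
		outputs.Store(k, struct{}{})
	}

	// Anything still stored as a string was never acknowledged.
	var notSent []string
	outputs.Range(func(k, v interface{}) bool {
		if _, ok := v.(string); ok {
			notSent = append(notSent, k.(string))
		}
		return true
	})
	fmt.Println(notSent) // [output2]
}
```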
