Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add taskRef remote resolution support #4859

Merged
merged 1 commit into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions docs/taskruns.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,6 @@ cli *(coming soon)*.

**([alpha only](https://github.com/tektoncd/pipeline/blob/main/docs/install.md#alpha-features))**

**Warning: This feature is still in very early stage of development and is not yet functional. Do not use it.**

A `taskRef` field may specify a Task in a remote location such as git.
Support for specific types of remote will depend on the Resolvers your
cluster's operator has installed. The below example demonstrates
Expand All @@ -179,7 +177,7 @@ spec:
taskRef:
resolver: git
resource:
- name: repo
- name: url
value: https://github.com/tektoncd/catalog.git
- name: commit
value: abc123
Expand Down
21 changes: 11 additions & 10 deletions pkg/apis/pipeline/v1beta1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ func (pt PipelineTask) validateBundle() (errs *apis.FieldError) {

// validateTask validates a pipeline task or a final task for taskRef and taskSpec
func (pt PipelineTask) validateTask(ctx context.Context) (errs *apis.FieldError) {
cfg := config.FromContextOrDefaults(ctx)
// Validate TaskSpec if it's present
if pt.TaskSpec != nil {
errs = errs.Also(pt.TaskSpec.Validate(ctx).ViaField("taskSpec"))
Expand All @@ -287,21 +288,21 @@ func (pt PipelineTask) validateTask(ctx context.Context) (errs *apis.FieldError)
if errSlice := validation.IsQualifiedName(pt.TaskRef.Name); len(errSlice) != 0 {
errs = errs.Also(apis.ErrInvalidValue(strings.Join(errSlice, ","), "name"))
}
} else {
} else if pt.TaskRef.Resolver == "" {
errs = errs.Also(apis.ErrInvalidValue("taskRef must specify name", "taskRef.name"))
}
// fail if bundle is present when EnableTektonOCIBundles feature flag is off (as it won't be allowed nor used)
if pt.TaskRef.Bundle != "" {
if !cfg.FeatureFlags.EnableTektonOCIBundles && pt.TaskRef.Bundle != "" {
errs = errs.Also(apis.ErrDisallowedFields("taskref.bundle"))
}
// fail if resolver or resource are present regardless
// of enabled api fields because remote resolution is
// not implemented yet for PipelineTasks.
if pt.TaskRef.Resolver != "" {
errs = errs.Also(apis.ErrDisallowedFields("taskref.resolver"))
}
if len(pt.TaskRef.Resource) > 0 {
errs = errs.Also(apis.ErrDisallowedFields("taskref.resource"))
if cfg.FeatureFlags.EnableAPIFields != config.AlphaAPIFields {
// fail if resolver or resource are present when enable-api-fields is false.
if pt.TaskRef.Resolver != "" {
errs = errs.Also(apis.ErrDisallowedFields("taskref.resolver"))
}
if len(pt.TaskRef.Resource) > 0 {
errs = errs.Also(apis.ErrDisallowedFields("taskref.resource"))
}
}
}
return errs
Expand Down
44 changes: 37 additions & 7 deletions pkg/apis/pipeline/v1beta1/pipeline_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,10 @@ func TestPipelineTask_ValidateBundle_Failure(t *testing.T) {

func TestPipelineTask_ValidateRegularTask_Success(t *testing.T) {
tests := []struct {
name string
tasks PipelineTask
name string
tasks PipelineTask
enableAPIFields bool
enableBundles bool
}{{
name: "pipeline task - valid taskRef name",
tasks: PipelineTask{
Expand All @@ -257,10 +259,40 @@ func TestPipelineTask_ValidateRegularTask_Success(t *testing.T) {
Name: "foo",
TaskSpec: &EmbeddedTask{TaskSpec: getTaskSpec()},
},
}, {
name: "pipeline task - use of resolver with the feature flag set",
tasks: PipelineTask{
TaskRef: &TaskRef{Name: "boo", ResolverRef: ResolverRef{Resolver: "bar"}},
},
enableAPIFields: true,
}, {
name: "pipeline task - use of resource with the feature flag set",
tasks: PipelineTask{
TaskRef: &TaskRef{Name: "boo", ResolverRef: ResolverRef{Resource: []ResolverParam{{}}}},
},
enableAPIFields: true,
}, {
name: "pipeline task - use of bundle with the feature flag set",
tasks: PipelineTask{
Name: "foo",
TaskRef: &TaskRef{Name: "bar", Bundle: "docker.io/foo"},
},
enableBundles: true,
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.tasks.validateTask(context.Background())
ctx := context.Background()
cfg := &config.Config{
FeatureFlags: &config.FeatureFlags{},
}
if tt.enableAPIFields {
cfg.FeatureFlags.EnableAPIFields = config.AlphaAPIFields
}
if tt.enableBundles {
cfg.FeatureFlags.EnableTektonOCIBundles = true
}
ctx = config.ToContext(ctx, cfg)
err := tt.tasks.validateTask(ctx)
if err != nil {
t.Errorf("PipelineTask.validateTask() returned error for valid pipeline task: %v", err)
}
Expand Down Expand Up @@ -311,16 +343,14 @@ func TestPipelineTask_ValidateRegularTask_Failure(t *testing.T) {
},
expectedError: *apis.ErrDisallowedFields("taskref.bundle"),
}, {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add some test cases for use of resolver and resource with the alpha feature flag set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, will do.

name: "pipeline task - use of resolver",
name: "pipeline task - use of resolver without the feature flag set",
task: PipelineTask{
Name: "foo",
TaskRef: &TaskRef{Name: "boo", ResolverRef: ResolverRef{Resolver: "bar"}},
},
expectedError: *apis.ErrDisallowedFields("taskref.resolver"),
}, {
name: "pipeline task - use of resource",
name: "pipeline task - use of resource without the feature flag set",
task: PipelineTask{
Name: "foo",
TaskRef: &TaskRef{Name: "boo", ResolverRef: ResolverRef{Resource: []ResolverParam{{}}}},
},
expectedError: *apis.ErrDisallowedFields("taskref.resource"),
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/pipeline/v1beta1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ const (
TaskRunReasonCancelled TaskRunReason = "TaskRunCancelled"
// TaskRunReasonTimedOut is the reason set when the Taskrun has timed out
TaskRunReasonTimedOut TaskRunReason = "TaskRunTimeout"
// TaskRunReasonResolvingTaskRef indicates that the TaskRun is waiting for
// its taskRef to be asynchronously resolved.
TaskRunReasonResolvingTaskRef = "ResolvingTaskRef"
)

func (t TaskRunReason) String() string {
Expand Down
16 changes: 14 additions & 2 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,10 @@ func (c *Reconciler) resolvePipelineState(
pst := resources.PipelineRunState{}
// Resolve each task individually because they each could have a different reference context (remote or local).
for _, task := range tasks {
fn, err := tresources.GetTaskFunc(ctx, c.KubeClientSet, c.PipelineClientSet, task.TaskRef, pr.Namespace, pr.Spec.ServiceAccountName)
// We need the TaskRun name to ensure that we don't perform an additional remote resolution request for a PipelineTask
// in the TaskRun reconciler.
trName := resources.GetTaskRunName(pr.Status.TaskRuns, pr.Status.ChildReferences, task.Name, pr.Name)
fn, err := tresources.GetTaskFunc(ctx, c.KubeClientSet, c.PipelineClientSet, c.resolutionRequester, pr, task.TaskRef, trName, pr.Namespace, pr.Spec.ServiceAccountName)
if err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.MarkFailed(ReasonCouldntGetTask, "Pipeline %s/%s can't be Run; task %s could not be fetched: %s",
Expand All @@ -303,6 +306,9 @@ func (c *Reconciler) resolvePipelineState(
if tresources.IsGetTaskErrTransient(err) {
return nil, err
}
if errors.Is(err, remote.ErrorRequestInProgress) {
return nil, err
}
switch err := err.(type) {
case *resources.TaskNotFoundError:
pr.Status.MarkFailed(ReasonCouldntGetTask,
Expand Down Expand Up @@ -467,8 +473,14 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get
tasks = append(tasks, pipelineSpec.Finally...)
}
pipelineRunState, err := c.resolvePipelineState(ctx, tasks, pipelineMeta, pr, providedResources)
if err != nil {
switch {
case errors.Is(err, remote.ErrorRequestInProgress):
message := fmt.Sprintf("PipelineRun %s/%s awaiting remote resource", pr.Namespace, pr.Name)
pr.Status.MarkRunning(v1beta1.TaskRunReasonResolvingTaskRef, message)
return nil
case err != nil:
return err
default:
}

// Build PipelineRunFacts with a list of resolved pipeline tasks,
Expand Down
78 changes: 78 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7178,6 +7178,84 @@ spec:
checkPipelineRunConditionStatusAndReason(t, updatedPipelineRun, corev1.ConditionUnknown, v1beta1.PipelineRunReasonRunning.String())
}

// TestReconcileWithTaskResolver checks that a PipelineRun with a populated Resolver
// field for a Task creates a ResolutionRequest object for that Resolver's type, and
// that when the request is successfully resolved the PipelineRun begins running.
func TestReconcileWithTaskResolver(t *testing.T) {
resolverName := "foobar"
pr := parse.MustParsePipelineRun(t, `
metadata:
name: pr
namespace: default
spec:
pipelineSpec:
tasks:
- name: some-task
taskRef:
resolver: foobar
serviceAccountName: default
`)

cms := []*corev1.ConfigMap{withEnabledAlphaAPIFields(newFeatureFlagsConfigMap())}

d := test.Data{
ConfigMaps: cms,
PipelineRuns: []*v1beta1.PipelineRun{pr},
ServiceAccounts: []*corev1.ServiceAccount{{
ObjectMeta: metav1.ObjectMeta{Name: pr.Spec.ServiceAccountName, Namespace: "foo"},
}},
}

prt := newPipelineRunTest(d, t)
defer prt.Cancel()

wantEvents := []string(nil)
pipelinerun, _ := prt.reconcileRun(pr.Namespace, pr.Name, wantEvents, false)
checkPipelineRunConditionStatusAndReason(t, pipelinerun, corev1.ConditionUnknown, v1beta1.TaskRunReasonResolvingTaskRef)

client := prt.TestAssets.Clients.ResolutionRequests.ResolutionV1alpha1().ResolutionRequests("default")
resolutionrequests, err := client.List(prt.TestAssets.Ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("unexpected error listing resource requests: %v", err)
}
numResolutionRequests := len(resolutionrequests.Items)
if numResolutionRequests != 1 {
t.Fatalf("expected exactly 1 resource request but found %d", numResolutionRequests)
}

resreq := &resolutionrequests.Items[0]
resolutionRequestType := resreq.ObjectMeta.Labels["resolution.tekton.dev/type"]
if resolutionRequestType != resolverName {
t.Fatalf("expected resource request type %q but saw %q", resolutionRequestType, resolverName)
}

taskBytes := []byte(`
kind: Task
apiVersion: tekton.dev/v1beta1
metadata:
name: foo
spec:
steps:
- name: step1
image: ubuntu
script: |
echo "hello world!"
`)

resreq.Status.ResolutionRequestStatusFields.Data = base64.StdEncoding.Strict().EncodeToString(taskBytes)
resreq.Status.MarkSucceeded()
resreq, err = client.UpdateStatus(prt.TestAssets.Ctx, resreq, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("unexpected error updating resource request with resolved pipeline data: %v", err)
}

// Check that the resolved pipeline was recognized by the
// PipelineRun reconciler and that the PipelineRun has now
// started executing.
updatedPipelineRun, _ := prt.reconcileRun("default", "pr", nil, false)
checkPipelineRunConditionStatusAndReason(t, updatedPipelineRun, corev1.ConditionUnknown, v1beta1.PipelineRunReasonRunning.String())
}

func getTaskRunWithTaskSpec(tr, pr, p, t string, labels, annotations map[string]string) *v1beta1.TaskRun {
om := taskRunObjectMeta(tr, "foo", pr, p, t, false)
for k, v := range labels {
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/pipelinerun/resources/pipelineref.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func GetPipelineFunc(ctx context.Context, k8s kubernetes.Interface, tekton clien
for _, p := range pr.Resource {
params[p.Name] = p.Value
}
resolver := resolution.NewResolver(requester, pipelineRun, string(pr.Resolver), params)
resolver := resolution.NewResolver(requester, pipelineRun, string(pr.Resolver), "", "", params)
return resolvePipeline(ctx, resolver, name)
}, nil
default:
Expand Down
23 changes: 14 additions & 9 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@ package resources

import (
"context"
"errors"
"fmt"
"strconv"

"knative.dev/pkg/kmeta"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
resourcev1alpha1 "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1"
"github.com/tektoncd/pipeline/pkg/list"
"github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources"
"k8s.io/apimachinery/pkg/api/errors"
"github.com/tektoncd/pipeline/pkg/remote"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"knative.dev/pkg/apis"
"knative.dev/pkg/kmeta"
)

const (
Expand Down Expand Up @@ -482,7 +483,7 @@ func ResolvePipelineRunTask(
if rprt.IsCustomTask() {
rprt.RunName = getRunName(pipelineRun.Status.Runs, pipelineRun.Status.ChildReferences, task.Name, pipelineRun.Name)
run, err := getRun(rprt.RunName)
if err != nil && !errors.IsNotFound(err) {
if err != nil && !kerrors.IsNotFound(err) {
return nil, fmt.Errorf("error retrieving Run %s: %w", rprt.RunName, err)
}
rprt.Run = run
Expand All @@ -500,7 +501,7 @@ func ResolvePipelineRunTask(

taskRun, err := getTaskRun(rprt.TaskRunName)
if err != nil {
if !errors.IsNotFound(err) {
if !kerrors.IsNotFound(err) {
return nil, fmt.Errorf("error retrieving TaskRun %s: %w", rprt.TaskRunName, err)
}
}
Expand All @@ -515,14 +516,18 @@ func ResolvePipelineRunTask(
taskName = task.TaskRef.Name
} else {
t, err = getTask(ctx, task.TaskRef.Name)
if err != nil {
switch {
case errors.Is(err, remote.ErrorRequestInProgress):
return nil, err
case err != nil:
return nil, &TaskNotFoundError{
Name: task.TaskRef.Name,
Msg: err.Error(),
}
default:
spec = t.TaskSpec()
taskName = t.TaskMetadata().Name
}
spec = t.TaskSpec()
taskName = t.TaskMetadata().Name
}
kind = task.TaskRef.Kind
} else {
Expand Down Expand Up @@ -623,7 +628,7 @@ func resolveConditionChecks(pt *v1beta1.PipelineTask, taskRunStatus map[string]*
// TODO(#3133): Also handle Custom Task Runs (getRun here)
cctr, err := getTaskRun(conditionCheckName)
if err != nil {
if !errors.IsNotFound(err) {
if !kerrors.IsNotFound(err) {
return nil, fmt.Errorf("error retrieving ConditionCheck %s for taskRun name %s : %w", conditionCheckName, taskRunName, err)
}
}
Expand Down
27 changes: 16 additions & 11 deletions pkg/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/taskrunmetrics"
resolutionclient "github.com/tektoncd/resolution/pkg/client/injection/client"
resolutioninformer "github.com/tektoncd/resolution/pkg/client/injection/informers/resolution/v1alpha1/resolutionrequest"
resolution "github.com/tektoncd/resolution/pkg/resource"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
Expand All @@ -50,6 +53,7 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
podInformer := filteredpodinformer.Get(ctx, v1beta1.ManagedByLabelKey)
resourceInformer := resourceinformer.Get(ctx)
limitrangeInformer := limitrangeinformer.Get(ctx)
resolutionInformer := resolutioninformer.Get(ctx)
configStore := config.NewStore(logger.Named("config-store"), taskrunmetrics.MetricsOnStore(logger))
configStore.WatchConfigs(cmw)

Expand All @@ -59,17 +63,18 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
}

c := &Reconciler{
KubeClientSet: kubeclientset,
PipelineClientSet: pipelineclientset,
Images: opts.Images,
Clock: clock,
taskRunLister: taskRunInformer.Lister(),
resourceLister: resourceInformer.Lister(),
limitrangeLister: limitrangeInformer.Lister(),
cloudEventClient: cloudeventclient.Get(ctx),
metrics: taskrunmetrics.Get(ctx),
entrypointCache: entrypointCache,
pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger),
KubeClientSet: kubeclientset,
PipelineClientSet: pipelineclientset,
Images: opts.Images,
Clock: clock,
taskRunLister: taskRunInformer.Lister(),
resourceLister: resourceInformer.Lister(),
limitrangeLister: limitrangeInformer.Lister(),
cloudEventClient: cloudeventclient.Get(ctx),
metrics: taskrunmetrics.Get(ctx),
entrypointCache: entrypointCache,
pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger),
resolutionRequester: resolution.NewCRDRequester(resolutionclient.Get(ctx), resolutionInformer.Lister()),
}
impl := taskrunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {
return controller.Options{
Expand Down
Loading