Skip to content

Commit

Permalink
Allow PipelineResource implementations to modify the entire Pod spec.
Browse files Browse the repository at this point in the history
This change simplifies the interface by removing the GetUpload/Download container
and volume methods and replaces it with a more generic "modifier" system.

This can be cleaned up a bit more still, and is intended for an early review at this point.
  • Loading branch information
dlorenc committed Sep 24, 2019
1 parent cd5b973 commit 4411681
Show file tree
Hide file tree
Showing 19 changed files with 323 additions and 258 deletions.
37 changes: 19 additions & 18 deletions pkg/apis/pipeline/v1alpha1/build_gcs_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,6 @@ func (s BuildGCSResource) GetType() PipelineResourceType { return PipelineResour
// GetSecretParams returns nil because it takes no secret params.
func (s *BuildGCSResource) GetSecretParams() []SecretParam { return nil }

// GetUploadSteps returns nil because it does not support uploading as an
// output resource.
func (s *BuildGCSResource) GetUploadSteps(string) ([]Step, error) { return nil, nil }

// GetUploadVolumeSpec returns nil because it does not support uploading as an
// output resource.
func (s *BuildGCSResource) GetUploadVolumeSpec(*TaskSpec) ([]corev1.Volume, error) { return nil, nil }

// Replacements returns the set of available replacements for this resource.
func (s *BuildGCSResource) Replacements() map[string]string {
return map[string]string{
Expand All @@ -134,23 +126,37 @@ func (s *BuildGCSResource) Replacements() map[string]string {
}
}

// GetDownloadSteps returns the Steps needed to populate the workspace with the
// resource's data.
func (s *BuildGCSResource) GetDownloadSteps(sourcePath string) ([]Step, error) {
// GetInputTaskModifier returns a TaskModifier that prepends a step to a Task to fetch the archive or manifest.
func (s *BuildGCSResource) GetInputTaskModifier(ts *TaskSpec, sourcePath string) (TaskModifier, error) {
args := []string{"--type", string(s.ArtifactType), "--location", s.Location}
// dest_dir is the destination directory for GCS files to be copies"
if sourcePath != "" {
args = append(args, "--dest_dir", sourcePath)
}

return []Step{
steps := []Step{
CreateDirStep(s.Name, sourcePath),
{Container: corev1.Container{
Name: names.SimpleNameGenerator.RestrictLengthWithRandomSuffix(fmt.Sprintf("storage-fetch-%s", s.Name)),
Command: []string{"/ko-app/gcs-fetcher"},
Image: *buildGCSFetcherImage,
Args: args,
}}}, nil
}}}

volumes, err := getStorageVolumeSpec(s, ts)
if err != nil {
return nil, err
}

return &InternalTaskModifier{
StepsToPrepend: steps,
Volumes: volumes,
}, nil
}

// GetOutputTaskModifier returns a No-op TaskModifier.
func (s *BuildGCSResource) GetOutputTaskModifier(ts *TaskSpec, sourcePath string) (TaskModifier, error) {
return &InternalTaskModifier{}, nil
}

func getArtifactType(val string) (GCSArtifactType, error) {
Expand All @@ -162,8 +168,3 @@ func getArtifactType(val string) (GCSArtifactType, error) {
}
return "", xerrors.Errorf("Invalid ArtifactType %s. Should be one of %s", val, validArtifactTypes)
}

// GetDownloadVolumeSpec returns the volumes needed by this resource.
func (s *BuildGCSResource) GetDownloadVolumeSpec(spec *TaskSpec) ([]corev1.Volume, error) {
return getStorageVolumeSpec(s, spec)
}
8 changes: 5 additions & 3 deletions pkg/apis/pipeline/v1alpha1/build_gcs_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func Test_BuildGCSGetReplacements(t *testing.T) {
}
}

func Test_BuildGCSGetDownloadSteps(t *testing.T) {
func Test_BuildGCSGetInputSteps(t *testing.T) {
for _, at := range []v1alpha1.GCSArtifactType{
v1alpha1.GCSArchive,
v1alpha1.GCSZipArchive,
Expand All @@ -150,11 +150,13 @@ func Test_BuildGCSGetDownloadSteps(t *testing.T) {
Command: []string{"/ko-app/gcs-fetcher"},
}}}
names.TestingSeed()
got, err := resource.GetDownloadSteps("/workspace")

ts := v1alpha1.TaskSpec{}
got, err := resource.GetInputTaskModifier(&ts, "/workspace")
if err != nil {
t.Fatalf("GetDownloadSteps: %v", err)
}
if d := cmp.Diff(got, wantSteps); d != "" {
if d := cmp.Diff(got.GetStepsToPrepend(), wantSteps); d != "" {
t.Errorf("Error mismatch between download steps: %s", d)
}
})
Expand Down
13 changes: 6 additions & 7 deletions pkg/apis/pipeline/v1alpha1/cloud_event_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package v1alpha1
import (
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
)

// CloudEventResource is an event sink to which events are delivered when a TaskRun has finished
Expand Down Expand Up @@ -80,9 +78,10 @@ func (s *CloudEventResource) Replacements() map[string]string {
}
}

func (s *CloudEventResource) GetUploadSteps(string) ([]Step, error) { return nil, nil }
func (s *CloudEventResource) GetDownloadSteps(string) ([]Step, error) { return nil, nil }
func (s *CloudEventResource) GetUploadVolumeSpec(*TaskSpec) ([]corev1.Volume, error) { return nil, nil }
func (s *CloudEventResource) GetDownloadVolumeSpec(*TaskSpec) ([]corev1.Volume, error) {
return nil, nil
func (s *CloudEventResource) GetInputTaskModifier(_ *TaskSpec, _ string) (TaskModifier, error) {
return &InternalTaskModifier{}, nil
}

func (s *CloudEventResource) GetOutputTaskModifier(_ *TaskSpec, _ string) (TaskModifier, error) {
return &InternalTaskModifier{}, nil
}
12 changes: 6 additions & 6 deletions pkg/apis/pipeline/v1alpha1/cloud_event_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,29 +88,29 @@ func Test_CloudEventGetReplacements(t *testing.T) {
}
}

func Test_CloudEventDownloadContainerSpec(t *testing.T) {
func Test_CloudEventInputContainerSpec(t *testing.T) {
r := &v1alpha1.CloudEventResource{
Name: "cloud-event-resource",
TargetURI: "http://fake-uri",
Type: v1alpha1.PipelineResourceTypeCloudEvent,
}
d, e := r.GetDownloadSteps("")
if d != nil {
d, e := r.GetInputTaskModifier(&v1alpha1.TaskSpec{}, "")
if d.GetStepsToPrepend() != nil {
t.Errorf("Did not expect a download container for CloudEventResource")
}
if e != nil {
t.Errorf("Did not expect an error %s when getting a download container for CloudEventResource", e)
}
}

func Test_CloudEventUploadContainerSpec(t *testing.T) {
func Test_CloudEventOutputContainerSpec(t *testing.T) {
r := &v1alpha1.CloudEventResource{
Name: "cloud-event-resource",
TargetURI: "http://fake-uri",
Type: v1alpha1.PipelineResourceTypeCloudEvent,
}
d, e := r.GetUploadSteps("")
if d != nil {
d, e := r.GetOutputTaskModifier(&v1alpha1.TaskSpec{}, "")
if d.GetStepsToAppend() != nil {
t.Errorf("Did not expect an upload container for CloudEventResource")
}
if e != nil {
Expand Down
16 changes: 9 additions & 7 deletions pkg/apis/pipeline/v1alpha1/cluster_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,11 @@ func (s ClusterResource) String() string {
return string(json)
}

func (s *ClusterResource) GetUploadSteps(string) ([]Step, error) { return nil, nil }
func (s *ClusterResource) GetOutputTaskModifier(_ *TaskSpec, _ string) (TaskModifier, error) {
return &InternalTaskModifier{}, nil
}

func (s *ClusterResource) GetDownloadSteps(sourcePath string) ([]Step, error) {
func (s *ClusterResource) GetInputTaskModifier(ts *TaskSpec, path string) (TaskModifier, error) {
var envVars []corev1.EnvVar
for _, sec := range s.Secrets {
ev := corev1.EnvVar{
Expand All @@ -160,16 +162,16 @@ func (s *ClusterResource) GetDownloadSteps(sourcePath string) ([]Step, error) {
}
envVars = append(envVars, ev)
}
return []Step{{Container: corev1.Container{
step := Step{Container: corev1.Container{
Name: names.SimpleNameGenerator.RestrictLengthWithRandomSuffix("kubeconfig"),
Image: *kubeconfigWriterImage,
Command: []string{"/ko-app/kubeconfigwriter"},
Args: []string{
"-clusterConfig", s.String(),
},
Env: envVars,
}}}, nil
}}
return &InternalTaskModifier{
StepsToPrepend: []Step{step},
}, nil
}

func (s *ClusterResource) GetUploadVolumeSpec(*TaskSpec) ([]corev1.Volume, error) { return nil, nil }
func (s *ClusterResource) GetDownloadVolumeSpec(*TaskSpec) ([]corev1.Volume, error) { return nil, nil }
9 changes: 6 additions & 3 deletions pkg/apis/pipeline/v1alpha1/cluster_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestNewClusterResource(t *testing.T) {
}
}

func Test_ClusterResource_GetDownloadSteps(t *testing.T) {
func Test_ClusterResource_GetInputTaskModifier(t *testing.T) {
names.TestingSeed()
clusterResource := &v1alpha1.ClusterResource{
Name: "test-cluster-resource",
Expand All @@ -146,6 +146,8 @@ func Test_ClusterResource_GetDownloadSteps(t *testing.T) {
SecretName: "secret1",
}},
}

ts := v1alpha1.TaskSpec{}
wantSteps := []v1alpha1.Step{{Container: corev1.Container{
Name: "kubeconfig-9l9zj",
Image: "override-with-kubeconfig-writer:latest",
Expand All @@ -163,11 +165,12 @@ func Test_ClusterResource_GetDownloadSteps(t *testing.T) {
},
}},
}}}
got, err := clusterResource.GetDownloadSteps("")

got, err := clusterResource.GetInputTaskModifier(&ts, "")
if err != nil {
t.Fatalf("GetDownloadSteps: %v", err)
}
if d := cmp.Diff(got, wantSteps); d != "" {
if d := cmp.Diff(got.GetStepsToPrepend(), wantSteps); d != "" {
t.Errorf("Error mismatch between download steps: %s", d)
}
}
53 changes: 31 additions & 22 deletions pkg/apis/pipeline/v1alpha1/gcs_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,57 +97,66 @@ func (s *GCSResource) Replacements() map[string]string {
}
}

// GetUploadSteps gets container spec for gcs resource to be uploaded like
// set environment variable from secret params and set volume mounts for those secrets
func (s *GCSResource) GetUploadSteps(sourcePath string) ([]Step, error) {
func (s *GCSResource) GetOutputTaskModifier(ts *TaskSpec, path string) (TaskModifier, error) {
var args []string
if s.TypeDir {
args = []string{"-args", fmt.Sprintf("rsync -d -r %s %s", sourcePath, s.Location)}
args = []string{"-args", fmt.Sprintf("rsync -d -r %s %s", path, s.Location)}
} else {
args = []string{"-args", fmt.Sprintf("cp %s %s", filepath.Join(sourcePath, "*"), s.Location)}
args = []string{"-args", fmt.Sprintf("cp %s %s", filepath.Join(path, "*"), s.Location)}
}

envVars, secretVolumeMount := getSecretEnvVarsAndVolumeMounts(s.Name, gcsSecretVolumeMountPath, s.Secrets)

return []Step{{Container: corev1.Container{
step := Step{Container: corev1.Container{
Name: names.SimpleNameGenerator.RestrictLengthWithRandomSuffix(fmt.Sprintf("upload-%s", s.Name)),
Image: *gsutilImage,
Command: []string{"/ko-app/gsutil"},
Args: args,
VolumeMounts: secretVolumeMount,
Env: envVars,
}}}, nil
Env: envVars},
}

volumes, err := getStorageVolumeSpec(s, ts)
if err != nil {
return nil, err
}

return &InternalTaskModifier{
StepsToAppend: []Step{step},
Volumes: volumes,
}, nil
}

// GetDownloadSteps returns an array of container specs to download gcs storage object
func (s *GCSResource) GetDownloadSteps(sourcePath string) ([]Step, error) {
if sourcePath == "" {
func (s *GCSResource) GetInputTaskModifier(ts *TaskSpec, path string) (TaskModifier, error) {
if path == "" {
return nil, xerrors.Errorf("GCSResource: Expect Destination Directory param to be set %s", s.Name)
}
var args []string
if s.TypeDir {
args = []string{"-args", fmt.Sprintf("rsync -d -r %s %s", s.Location, sourcePath)}
args = []string{"-args", fmt.Sprintf("rsync -d -r %s %s", s.Location, path)}
} else {
args = []string{"-args", fmt.Sprintf("cp %s %s", s.Location, sourcePath)}
args = []string{"-args", fmt.Sprintf("cp %s %s", s.Location, path)}
}

envVars, secretVolumeMount := getSecretEnvVarsAndVolumeMounts(s.Name, gcsSecretVolumeMountPath, s.Secrets)
return []Step{
CreateDirStep(s.Name, sourcePath),
steps := []Step{
CreateDirStep(s.Name, path),
{Container: corev1.Container{
Name: names.SimpleNameGenerator.RestrictLengthWithRandomSuffix(fmt.Sprintf("fetch-%s", s.Name)),
Image: *gsutilImage,
Command: []string{"/ko-app/gsutil"},
Args: args,
Env: envVars,
VolumeMounts: secretVolumeMount,
}}}, nil
}
}}}

func (s *GCSResource) GetUploadVolumeSpec(spec *TaskSpec) ([]corev1.Volume, error) {
return getStorageVolumeSpec(s, spec)
}
volumes, err := getStorageVolumeSpec(s, ts)
if err != nil {
return nil, err
}

func (s *GCSResource) GetDownloadVolumeSpec(spec *TaskSpec) ([]corev1.Volume, error) {
return getStorageVolumeSpec(s, spec)
return &InternalTaskModifier{
StepsToPrepend: steps,
Volumes: volumes,
}, nil
}
16 changes: 9 additions & 7 deletions pkg/apis/pipeline/v1alpha1/gcs_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func Test_GetParams(t *testing.T) {
}
}

func Test_GetDownloadSteps(t *testing.T) {
func Test_GetInputSteps(t *testing.T) {
names.TestingSeed()

for _, tc := range []struct {
Expand Down Expand Up @@ -217,18 +217,19 @@ func Test_GetDownloadSteps(t *testing.T) {
}}},
}} {
t.Run(tc.name, func(t *testing.T) {
gotContainers, err := tc.gcsResource.GetDownloadSteps("/workspace")
ts := v1alpha1.TaskSpec{}
gotSpec, err := tc.gcsResource.GetInputTaskModifier(&ts, "/workspace")
if tc.wantErr && err == nil {
t.Fatalf("Expected error to be %t but got %v:", tc.wantErr, err)
}
if d := cmp.Diff(gotContainers, tc.wantSteps); d != "" {
if d := cmp.Diff(gotSpec.GetStepsToPrepend(), tc.wantSteps); d != "" {
t.Errorf("Error mismatch between download containers spec: %s", d)
}
})
}
}

func Test_GetUploadSteps(t *testing.T) {
func Test_GetOutputTaskModifier(t *testing.T) {
names.TestingSeed()

for _, tc := range []struct {
Expand Down Expand Up @@ -302,12 +303,13 @@ func Test_GetUploadSteps(t *testing.T) {
}}},
}} {
t.Run(tc.name, func(t *testing.T) {
gotContainers, err := tc.gcsResource.GetUploadSteps("/workspace/")
if tc.wantErr && err == nil {
ts := v1alpha1.TaskSpec{}
got, err := tc.gcsResource.GetOutputTaskModifier(&ts, "/workspace/")
if (err != nil) != tc.wantErr {
t.Fatalf("Expected error to be %t but got %v:", tc.wantErr, err)
}

if d := cmp.Diff(gotContainers, tc.wantSteps); d != "" {
if d := cmp.Diff(got.GetStepsToAppend(), tc.wantSteps); d != "" {
t.Errorf("Error mismatch between upload containers spec: %s", d)
}
})
Expand Down
29 changes: 17 additions & 12 deletions pkg/apis/pipeline/v1alpha1/git_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,27 @@ func (s *GitResource) Replacements() map[string]string {
}
}

func (s *GitResource) GetDownloadSteps(sourcePath string) ([]Step, error) {
func (s *GitResource) GetInputTaskModifier(_ *TaskSpec, path string) (TaskModifier, error) {
args := []string{"-url", s.URL,
"-revision", s.Revision,
}

args = append(args, []string{"-path", sourcePath}...)
args = append(args, []string{"-path", path}...)

return []Step{{Container: corev1.Container{
Name: names.SimpleNameGenerator.RestrictLengthWithRandomSuffix(gitSource + "-" + s.Name),
Image: *gitImage,
Command: []string{"/ko-app/git-init"},
Args: args,
WorkingDir: WorkspaceDir,
}}}, nil
step := Step{
Container: corev1.Container{
Name: names.SimpleNameGenerator.RestrictLengthWithRandomSuffix(gitSource + "-" + s.Name),
Image: *gitImage,
Command: []string{"/ko-app/git-init"},
Args: args,
WorkingDir: WorkspaceDir,
},
}
return &InternalTaskModifier{
StepsToPrepend: []Step{step},
}, nil
}

func (s *GitResource) GetUploadSteps(sourcePath string) ([]Step, error) { return nil, nil }
func (s *GitResource) GetUploadVolumeSpec(spec *TaskSpec) ([]corev1.Volume, error) { return nil, nil }
func (s *GitResource) GetDownloadVolumeSpec(spec *TaskSpec) ([]corev1.Volume, error) { return nil, nil }
func (s *GitResource) GetOutputTaskModifier(_ *TaskSpec, _ string) (TaskModifier, error) {
return &InternalTaskModifier{}, nil
}
Loading

0 comments on commit 4411681

Please sign in to comment.