Skip to content

Commit

Permalink
Support Task-level Resources Requirements
Browse files Browse the repository at this point in the history
  • Loading branch information
austinzhao-go committed May 16, 2022
1 parent 9486d48 commit fcd18c3
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 33 deletions.
32 changes: 31 additions & 1 deletion pkg/apis/pipeline/v1beta1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions pkg/apis/pipeline/v1beta1/resource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ type TaskResources struct {
// DeclaredPipelineResources to the input PipelineResources required by the Task.
// +listType=atomic
Outputs []TaskResource `json:"outputs,omitempty"`
// Limits describes the maximum amount of compute resources allowed.
// More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
// +optional
Limits v1.ResourceList `json:"limits,omitempty"`
// Requests describes the minimum amount of compute resources required.
// If Requests is omitted for a container, it defaults to Limits if that is explicitly specified,
// otherwise to an implementation-defined value.
// More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
// +optional
Requests v1.ResourceList `json:"requests,omitempty"`
}

// TaskResource defines an input or output Resource declared as a requirement
Expand Down
16 changes: 16 additions & 0 deletions pkg/apis/pipeline/v1beta1/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2424,6 +2424,14 @@
},
"x-kubernetes-list-type": "atomic"
},
"limits": {
"description": "Limits describes the maximum amount of compute resources allowed. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/",
"type": "object",
"additionalProperties": {
"default": {},
"$ref": "#/definitions/k8s.io.apimachinery.pkg.api.resource.Quantity"
}
},
"outputs": {
"description": "Outputs holds the mapping from the PipelineResources declared in DeclaredPipelineResources to the input PipelineResources required by the Task.",
"type": "array",
Expand All @@ -2432,6 +2440,14 @@
"$ref": "#/definitions/v1beta1.TaskResource"
},
"x-kubernetes-list-type": "atomic"
},
"requests": {
"description": "Requests describes the minimum amount of compute resources required. If Requests is omitted for a container, it defaults to Limits if that is explicitly specified, otherwise to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/",
"type": "object",
"additionalProperties": {
"default": {},
"$ref": "#/definitions/k8s.io.apimachinery.pkg.api.resource.Quantity"
}
}
}
},
Expand Down
14 changes: 14 additions & 0 deletions pkg/apis/pipeline/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 15 additions & 2 deletions pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,23 @@ func (c *Reconciler) prepare(ctx context.Context, tr *v1beta1.TaskRun) (*v1beta1
return nil, nil, controller.NewPermanentError(err)
}

if err := validateTaskSpecRequestResources(ctx, taskSpec); err != nil {
logger.Errorf("TaskRun %s taskSpec request resources are invalid: %v", tr.Name, err)
if isResourcesRequirementsInStepLevel(taskSpec) && isResourcesRequirementsInTaskLevel(taskSpec) {
err := fmt.Errorf("TaskRun %s can't be configured with both step-level and task-level resources requirements", tr.Name)
tr.Status.MarkResourceFailed(podconvert.ReasonFailedValidation, err)
return nil, nil, controller.NewPermanentError(err)
} else if isResourcesRequirementsInStepLevel(taskSpec) {
if err := validateStepLevelResourcesRequirements(taskSpec); err != nil {
tr.Status.MarkResourceFailed(podconvert.ReasonFailedValidation, err)
return nil, nil, controller.NewPermanentError(err)
}
} else if isResourcesRequirementsInTaskLevel(taskSpec) {
// validate task level
if err := validateTaskLevelResourcesRequirements(taskSpec); err != nil {
tr.Status.MarkResourceFailed(podconvert.ReasonFailedValidation, err)
return nil, nil, controller.NewPermanentError(err)
}
// updatewithTask
// updatewithTaskrun
}

if err := ValidateResolvedTaskResources(ctx, tr.Spec.Params, []v1beta1.Param{}, rtr); err != nil {
Expand Down
27 changes: 12 additions & 15 deletions pkg/reconciler/taskrun/taskrun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3912,10 +3912,8 @@ status:
}
}

func Test_validateTaskSpecRequestResources_ValidResources(t *testing.T) {
ctx := context.Background()

tcs := []struct {
func Test_validateStepLevelResourcesRequirements_validResourcesRequirements(t *testing.T) {
taskSpecs := []struct {
name string
taskSpec *v1beta1.TaskSpec
}{{
Expand Down Expand Up @@ -4017,19 +4015,18 @@ func Test_validateTaskSpecRequestResources_ValidResources(t *testing.T) {
},
}}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
if err := validateTaskSpecRequestResources(ctx, tc.taskSpec); err != nil {
t.Fatalf("Expected to see error when validating invalid TaskSpec resources but saw none")
for _, taskSpec := range taskSpecs {
t.Run(taskSpec.name, func(t *testing.T) {
if err := validateStepLevelResourcesRequirements(taskSpec.taskSpec); err != nil {
t.Errorf("Expected no errors when validating valid step-level resources requirements, but got: %v.", err)
}
})
}

}

func Test_validateTaskSpecRequestResources_InvalidResources(t *testing.T) {
ctx := context.Background()
tcs := []struct {
func Test_validateStepLevelResourcesRequirements_invalidResourcesRequirements(t *testing.T) {
taskSpecs := []struct {
name string
taskSpec *v1beta1.TaskSpec
}{{
Expand Down Expand Up @@ -4073,10 +4070,10 @@ func Test_validateTaskSpecRequestResources_InvalidResources(t *testing.T) {
},
}}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
if err := validateTaskSpecRequestResources(ctx, tc.taskSpec); err == nil {
t.Fatalf("Expected to see error when validating invalid TaskSpec resources but saw none")
for _, taskSpec := range taskSpecs {
t.Run(taskSpec.name, func(t *testing.T) {
if err := validateStepLevelResourcesRequirements(taskSpec.taskSpec); err == nil {
t.Errorf("Expected errors when validating invalid step-level resources requirements, but got none.")
}
})
}
Expand Down
79 changes: 64 additions & 15 deletions pkg/reconciler/taskrun/validate_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,23 +223,26 @@ func ValidateResolvedTaskResources(ctx context.Context, params []v1beta1.Param,
return nil
}

func validateTaskSpecRequestResources(ctx context.Context, taskSpec *v1beta1.TaskSpec) error {
if taskSpec != nil {
for _, step := range taskSpec.Steps {
for k, request := range step.Resources.Requests {
// First validate the limit in step
if limit, ok := step.Resources.Limits[k]; ok {
// validateStepLevelResourcesRequirements() validates step-level resources requirements under 'Task.TaskSpec.Step' and 'Task.TaskSpec.StepTemplate'
func validateStepLevelResourcesRequirements(taskSpec *v1beta1.TaskSpec) error {
if !isResourcesRequirementsInStepLevel(taskSpec) {
return nil
}

for _, step := range taskSpec.Steps {
for key, request := range step.Resources.Requests {
// First validate the limit in step
if limit, ok := step.Resources.Limits[key]; ok {
// Check if limit < request (so out of limit)
if (&limit).Cmp(request) == -1 {
return fmt.Errorf("invalid request resource value: %v must be less or equal to limit %v", request.String(), limit.String())
}
} else if taskSpec.StepTemplate != nil {
// If step doesn't configure the limit, validate the limit in stepTemplate
if limit, ok := taskSpec.StepTemplate.Resources.Limits[key]; ok {
if (&limit).Cmp(request) == -1 {
return fmt.Errorf("Invalid request resource value: %v must be less or equal to limit %v", request.String(), limit.String())
}
} else if taskSpec.StepTemplate != nil {
// If step doesn't configure the limit, validate the limit in stepTemplate
if limit, ok := taskSpec.StepTemplate.Resources.Limits[k]; ok {
if (&limit).Cmp(request) == -1 {
return fmt.Errorf("Invalid request resource value: %v must be less or equal to limit %v", request.String(), limit.String())
}
return fmt.Errorf("invalid request resource value: %v must be less or equal to limit %v", request.String(), limit.String())
}

}

}
Expand All @@ -249,6 +252,52 @@ func validateTaskSpecRequestResources(ctx context.Context, taskSpec *v1beta1.Tas
return nil
}

// isResourcesRequirementsInStepLevel() checks if any resources requirements specified under step-level
func isResourcesRequirementsInStepLevel(taskSpec *v1beta1.TaskSpec) bool {
if taskSpec == nil {
return false
}

for _, step := range taskSpec.Steps {
if step.Resources.String() != "" {
return true
}
}

return taskSpec.StepTemplate != nil && taskSpec.StepTemplate.Resources.String() != ""
}

// isResourcesRequirementsInTaskLevel() checks if any resources requirements specified under task-level
func isResourcesRequirementsInTaskLevel(taskSpec *v1beta1.TaskSpec) bool {
if taskSpec == nil || taskSpec.Resources == nil {
return false
}
return taskSpec.Resources.Limits != nil || taskSpec.Resources.Requests != nil
}

// validateTaskLevelResourcesRequirements() validates task-level resource requirements under 'Task.TaskSpec'
func validateTaskLevelResourcesRequirements(taskSpec *v1beta1.TaskSpec) error {
if !isResourcesRequirementsInTaskLevel(taskSpec) {
return nil
}

for key, request := range taskSpec.Resources.Requests {
if limit, ok := taskSpec.Resources.Limits[key]; ok {
// Check if limit < request (so out of limit)
if (&limit).Cmp(request) == -1 {
return fmt.Errorf("invalid request resource value: %v must be less or equal to limit %v", request.String(), limit.String())
}
}
}

return nil
}

//
func updateByTaskLevelRequestResources(taskSpec *v1beta1.TaskSpec) {

}

// validateOverrides validates that all stepOverrides map to valid steps, and likewise for sidecarOverrides
func validateOverrides(ctx context.Context, ts *v1beta1.TaskSpec, trs *v1beta1.TaskRunSpec) error {
stepErr := validateStepOverrides(ctx, ts, trs)
Expand Down
Loading

0 comments on commit fcd18c3

Please sign in to comment.