Skip to content

Commit

Permalink
Move waiting for platform to controller #797
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli authored and astefanutti committed Jul 4, 2019
1 parent cb988c7 commit daf9c0f
Show file tree
Hide file tree
Showing 21 changed files with 356 additions and 142 deletions.
8 changes: 6 additions & 2 deletions pkg/apis/camel/v1alpha1/build_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,12 @@ const (
// BuildKind --
BuildKind string = "Build"

// BuildPhaseInitial --
BuildPhaseInitial BuildPhase = ""
// BuildPhaseNone --
BuildPhaseNone BuildPhase = ""
// BuildPhaseInitialization --
BuildPhaseInitialization BuildPhase = "initialization"
// BuildPhaseWaitingForPlatform --
BuildPhaseWaitingForPlatform BuildPhase = "Waiting For Platform"
// BuildPhaseScheduling --
BuildPhaseScheduling BuildPhase = "Scheduling"
// BuildPhasePending --
Expand Down
14 changes: 12 additions & 2 deletions pkg/apis/camel/v1alpha1/common_types_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,22 @@ package v1alpha1

import (
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func (in *Artifact) String() string {
return in.ID
}

func (spec ConfigurationSpec) String() string {
return fmt.Sprintf("%s=%s", spec.Type, spec.Value)
func (in *ConfigurationSpec) String() string {
return fmt.Sprintf("%s=%s", in.Type, in.Value)
}

// NewErrorFailure --
func NewErrorFailure(err error) *Failure {
return &Failure{
Reason: err.Error(),
Time: metav1.Now(),
}
}
6 changes: 4 additions & 2 deletions pkg/apis/camel/v1alpha1/integration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ const (
// IntegrationKind --
IntegrationKind string = "Integration"

// IntegrationPhaseInitial --
IntegrationPhaseInitial IntegrationPhase = ""
// IntegrationPhaseNone --
IntegrationPhaseNone IntegrationPhase = ""
// IntegrationPhaseInitialization --
IntegrationPhaseInitialization IntegrationPhase = "initialization"
// IntegrationPhaseWaitingForPlatform --
IntegrationPhaseWaitingForPlatform IntegrationPhase = "Waiting For Platform"
// IntegrationPhaseBuildingKit --
Expand Down
6 changes: 4 additions & 2 deletions pkg/apis/camel/v1alpha1/integrationkit_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ const (
// IntegrationKitTypeExternal --
IntegrationKitTypeExternal = "external"

// IntegrationKitPhaseInitial --
IntegrationKitPhaseInitial IntegrationKitPhase = ""
// IntegrationKitPhaseNone --
IntegrationKitPhaseNone IntegrationKitPhase = ""
// IntegrationKitPhaseInitialization --
IntegrationKitPhaseInitialization IntegrationKitPhase = "initialization"
// IntegrationKitPhaseWaitingForPlatform --
IntegrationKitPhaseWaitingForPlatform IntegrationKitPhase = "Waiting For Platform"
// IntegrationKitPhaseBuildSubmitted --
Expand Down
61 changes: 44 additions & 17 deletions pkg/controller/build/build_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"
"time"

"github.com/apache/camel-k/pkg/platform"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -161,37 +162,58 @@ func (r *ReconcileBuild) Reconcile(request reconcile.Request) (reconcile.Result,
return reconcile.Result{}, err
}

target := instance.DeepCopy()
targetLog := rlog.ForBuild(target)

if target.Status.Phase == v1alpha1.BuildPhaseNone || target.Status.Phase == v1alpha1.BuildPhaseWaitingForPlatform {
pl, err := platform.GetCurrentPlatform(ctx, r.client, target.Namespace)
switch {
case err != nil:
target.Status.Phase = v1alpha1.BuildPhaseError
target.Status.Failure = v1alpha1.NewErrorFailure(err)
case pl.Status.Phase != v1alpha1.IntegrationPlatformPhaseReady:
target.Status.Phase = v1alpha1.BuildPhaseWaitingForPlatform
default:
target.Status.Phase = v1alpha1.BuildPhaseInitialization
}

if instance.Status.Phase != target.Status.Phase {
err = r.update(ctx, target)
if err != nil {
if k8serrors.IsConflict(err) {
targetLog.Error(err, "conflict")
err = nil
}
}
}

return reconcile.Result{}, err
}

actions := []Action{
NewInitializeAction(),
NewScheduleRoutineAction(r.reader, r.builder, &r.routines),
NewSchedulePodAction(r.reader),
NewMonitorRoutineAction(&r.routines),
NewMonitorPodAction(),
NewErrorRecoveryAction(),
NewErrorAction(),
}

var err error

target := instance.DeepCopy()
targetPhase := target.Status.Phase
targetLog := rlog.ForBuild(target)

for _, a := range actions {
a.InjectClient(r.client)
a.InjectLogger(targetLog)

if a.CanHandle(target) {
targetLog.Infof("Invoking action %s", a.Name())

phaseFrom := target.Status.Phase

target, err = a.Handle(ctx, target)
newTarget, err := a.Handle(ctx, target)
if err != nil {
return reconcile.Result{}, err
}

if target != nil {
if err := r.client.Status().Update(ctx, target); err != nil {
if newTarget != nil {
if err := r.update(ctx, newTarget); err != nil {
if k8serrors.IsConflict(err) {
targetLog.Error(err, "conflict")
return reconcile.Result{
Expand All @@ -202,15 +224,15 @@ func (r *ReconcileBuild) Reconcile(request reconcile.Request) (reconcile.Result,
return reconcile.Result{}, err
}

targetPhase = target.Status.Phase

if targetPhase != phaseFrom {
if newTarget.Status.Phase != target.Status.Phase {
targetLog.Info(
"state transition",
"phase-from", phaseFrom,
"phase-to", target.Status.Phase,
"phase-from", target.Status.Phase,
"phase-to", newTarget.Status.Phase,
)
}

target = newTarget
}

// handle one action at time so the resource
Expand All @@ -220,11 +242,16 @@ func (r *ReconcileBuild) Reconcile(request reconcile.Request) (reconcile.Result,
}

// Requeue scheduling build so that it re-enters the build working queue
if targetPhase == v1alpha1.BuildPhaseScheduling || targetPhase == v1alpha1.BuildPhaseFailed {
if target.Status.Phase == v1alpha1.BuildPhaseScheduling || target.Status.Phase == v1alpha1.BuildPhaseFailed {
return reconcile.Result{
RequeueAfter: 5 * time.Second,
}, nil
}

return reconcile.Result{}, nil
}

// Update --
func (r *ReconcileBuild) update(ctx context.Context, target *v1alpha1.Build) error {
return r.client.Status().Update(ctx, target)
}
48 changes: 48 additions & 0 deletions pkg/controller/build/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package build

import (
"context"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
)

// NewErrorAction creates a new error action for scheduled routine
func NewErrorAction() Action {
return &errorAction{}
}

type errorAction struct {
baseAction
}

// Name returns a common name of the action
func (action *errorAction) Name() string {
return "error"
}

// CanHandle tells whether this action can handle the build
func (action *errorAction) CanHandle(build *v1alpha1.Build) bool {
return build.Status.Phase == v1alpha1.BuildPhaseError
}

// Handle handles the builds
func (action *errorAction) Handle(ctx context.Context, build *v1alpha1.Build) (*v1alpha1.Build, error) {
return nil, nil
}
2 changes: 1 addition & 1 deletion pkg/controller/build/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (action *initializeAction) Name() string {

// CanHandle tells whether this action can handle the build
func (action *initializeAction) CanHandle(build *v1alpha1.Build) bool {
return build.Status.Phase == v1alpha1.BuildPhaseInitial
return build.Status.Phase == v1alpha1.BuildPhaseInitialization
}

// Handle handles the builds
Expand Down
10 changes: 1 addition & 9 deletions pkg/controller/build/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/platform"

"github.com/jpillora/backoff"
)

Expand Down Expand Up @@ -56,12 +54,6 @@ func (action *errorRecoveryAction) CanHandle(build *v1alpha1.Build) bool {
}

func (action *errorRecoveryAction) Handle(ctx context.Context, build *v1alpha1.Build) (*v1alpha1.Build, error) {
// The integration platform must be initialized before handling the error recovery
if _, err := platform.GetCurrentPlatform(ctx, action.client, build.Namespace); err != nil {
action.L.Info("Waiting for an integration platform to be initialized")
return nil, nil
}

if build.Status.Failure == nil {
build.Status.Failure = &v1alpha1.Failure{
Reason: build.Status.Error,
Expand Down Expand Up @@ -96,7 +88,7 @@ func (action *errorRecoveryAction) Handle(ctx context.Context, build *v1alpha1.B
}

build.Status = v1alpha1.BuildStatus{}
build.Status.Phase = v1alpha1.BuildPhaseInitial
build.Status.Phase = v1alpha1.BuildPhaseInitialization
build.Status.Failure.Recovery.Attempt++
build.Status.Failure.Recovery.AttemptTime = metav1.Now()

Expand Down
3 changes: 1 addition & 2 deletions pkg/controller/integration/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ func (action *deployAction) Handle(ctx context.Context, integration *v1alpha1.In
return nil, err
}

target := integration.DeepCopy()
target.Status.Phase = v1alpha1.IntegrationPhaseRunning
integration.Status.Phase = v1alpha1.IntegrationPhaseRunning

return integration, nil
}
61 changes: 61 additions & 0 deletions pkg/controller/integration/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
"context"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/util/digest"
)

// NewErrorAction creates a new error action for an integration
func NewErrorAction() Action {
return &errorAction{}
}

type errorAction struct {
baseAction
}

func (action *errorAction) Name() string {
return "error"
}

func (action *errorAction) CanHandle(integration *v1alpha1.Integration) bool {
return integration.Status.Phase == v1alpha1.IntegrationPhaseError
}

func (action *errorAction) Handle(ctx context.Context, integration *v1alpha1.Integration) (*v1alpha1.Integration, error) {
hash, err := digest.ComputeForIntegration(integration)
if err != nil {
return nil, err
}

if hash != integration.Status.Digest {
action.L.Info("Integration needs a rebuild")

integration.Status.Digest = hash
integration.Status.Phase = v1alpha1.IntegrationPhaseInitialization

return integration, nil
}

// TODO check also if deployment matches (e.g. replicas)
return nil, nil
}
34 changes: 1 addition & 33 deletions pkg/controller/integration/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/platform"
"github.com/apache/camel-k/pkg/trait"
)

Expand All @@ -41,42 +40,11 @@ func (action *initializeAction) Name() string {

// CanHandle tells whether this action can handle the integration
func (action *initializeAction) CanHandle(integration *v1alpha1.Integration) bool {
return integration.Status.Phase == v1alpha1.IntegrationPhaseInitial || integration.Status.Phase == v1alpha1.IntegrationPhaseWaitingForPlatform
return integration.Status.Phase == v1alpha1.IntegrationPhaseInitialization
}

// Handle handles the integrations
func (action *initializeAction) Handle(ctx context.Context, integration *v1alpha1.Integration) (*v1alpha1.Integration, error) {
pl, err := platform.GetCurrentPlatform(ctx, action.client, integration.Namespace)
if err != nil {
return nil, err
}

// The integration platform needs to be ready before starting to create integrations
if pl.Status.Phase != v1alpha1.IntegrationPlatformPhaseReady {
action.L.Info("Waiting for the integration platform to be initialized")

if integration.Status.Phase != v1alpha1.IntegrationPhaseWaitingForPlatform {
integration.Status.Phase = v1alpha1.IntegrationPhaseWaitingForPlatform
return integration, nil
}

return nil, nil
}

//
// restore phase to initial phase as traits are not aware of
// WaitingForPlatform phase
//
if integration.Status.Phase == v1alpha1.IntegrationPhaseWaitingForPlatform {
integration.Status.Phase = v1alpha1.IntegrationPhaseInitial

return integration, nil
}

// better not changing the spec section of the target because it may be used for comparison by a
// higher level controller (e.g. Knative source controller)

// execute custom initialization
if _, err := trait.Apply(ctx, action.client, integration, nil); err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit daf9c0f

Please sign in to comment.