Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Generic patching of integration resources in running phase #1100

Merged
merged 13 commits into from
Nov 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 4 additions & 26 deletions pkg/controller/integration/integration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package integration
import (
"context"

"github.com/pkg/errors"

appsv1 "k8s.io/api/apps/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -257,7 +255,7 @@ func (r *ReconcileIntegration) Reconcile(request reconcile.Request) (reconcile.R
target.SetIntegrationPlatform(pl)
}

return r.update(ctx, targetLog, target)
return r.update(ctx, &instance, target)
}

return reconcile.Result{}, err
Expand All @@ -280,21 +278,11 @@ func (r *ReconcileIntegration) Reconcile(request reconcile.Request) (reconcile.R

newTarget, err := a.Handle(ctx, target)
if err != nil {
// Some traits, like the deployment and knative service ones,
// update owned resources in the running phase, so it's better
// handling update conflicts gracefully, consistently with the
// primary integration update requests.
if cause := errors.Cause(err); k8serrors.IsConflict(cause) {
log.Error(cause, "conflict")
return reconcile.Result{
Requeue: true,
}, nil
}
return reconcile.Result{}, err
}

if newTarget != nil {
if r, err := r.update(ctx, targetLog, newTarget); err != nil {
if r, err := r.update(ctx, &instance, newTarget); err != nil {
return r, err
}

Expand All @@ -316,25 +304,15 @@ func (r *ReconcileIntegration) Reconcile(request reconcile.Request) (reconcile.R
return reconcile.Result{}, nil
}

// Update --
func (r *ReconcileIntegration) update(ctx context.Context, log log.Logger, target *v1alpha1.Integration) (reconcile.Result, error) {
func (r *ReconcileIntegration) update(ctx context.Context, base *v1alpha1.Integration, target *v1alpha1.Integration) (reconcile.Result, error) {
dgst, err := digest.ComputeForIntegration(target)
if err != nil {
return reconcile.Result{}, err
}

target.Status.Digest = dgst

err = r.client.Status().Update(ctx, target)
if err != nil {
if k8serrors.IsConflict(err) {
log.Error(err, "conflict")

return reconcile.Result{
Requeue: true,
}, nil
}
}
err = r.client.Status().Patch(ctx, target, k8sclient.MergeFrom(base))

return reconcile.Result{}, err
}
3 changes: 1 addition & 2 deletions pkg/controller/integration/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1alpha1.I
return integration, nil
}

// Run traits that are enabled for the running phase,
// such as the deployment, garbage collector and Knative service traits.
// Run traits that are enabled for the running phase
_, err = trait.Apply(ctx, action.client, integration, nil)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/trait/affinity.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (t *affinityTrait) Configure(e *Environment) (bool, error) {
return false, fmt.Errorf("both pod affinity and pod anti-affinity can't be set simultaneously")
}

return e.IntegrationInPhase(v1alpha1.IntegrationPhaseDeploying), nil
return e.IntegrationInPhase(v1alpha1.IntegrationPhaseDeploying, v1alpha1.IntegrationPhaseRunning), nil
}

func (t *affinityTrait) Apply(e *Environment) (err error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/trait/classpath.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func (t *classpathTrait) Configure(e *Environment) (bool, error) {
return false, nil
}

return e.InPhase(v1alpha1.IntegrationKitPhaseReady, v1alpha1.IntegrationPhaseDeploying), nil
return e.InPhase(v1alpha1.IntegrationKitPhaseReady, v1alpha1.IntegrationPhaseDeploying) ||
e.InPhase(v1alpha1.IntegrationKitPhaseReady, v1alpha1.IntegrationPhaseRunning), nil
}

func (t *classpathTrait) Apply(e *Environment) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/trait/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (t *containerTrait) Configure(e *Environment) (bool, error) {
return false, nil
}

if !e.IntegrationInPhase(v1alpha1.IntegrationPhaseDeploying) {
if !e.IntegrationInPhase(v1alpha1.IntegrationPhaseDeploying, v1alpha1.IntegrationPhaseRunning) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/trait/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func newDebugTrait() *debugTrait {

func (t *debugTrait) Configure(e *Environment) (bool, error) {
if t.Enabled != nil && *t.Enabled {
return e.IntegrationInPhase(v1alpha1.IntegrationPhaseDeploying), nil
return e.IntegrationInPhase(v1alpha1.IntegrationPhaseDeploying, v1alpha1.IntegrationPhaseRunning), nil
}

return false, nil
Expand Down
130 changes: 129 additions & 1 deletion pkg/trait/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,23 @@ limitations under the License.

package trait

import (
"reflect"

"github.com/pkg/errors"

jsonpatch "github.com/evanphx/json-patch"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"

"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/util/kubernetes"
)

// The deployer trait can be used to explicitly select the kind of high level resource that
// will deploy the integration.
//
Expand All @@ -34,14 +51,125 @@ func newDeployerTrait() *deployerTrait {
}

func (t *deployerTrait) Configure(e *Environment) (bool, error) {
return true, nil
return e.IntegrationInPhase(
v1alpha1.IntegrationPhaseInitialization,
v1alpha1.IntegrationPhaseDeploying,
v1alpha1.IntegrationPhaseRunning,
), nil
}

func (t *deployerTrait) Apply(e *Environment) error {
switch e.Integration.Status.Phase {

case v1alpha1.IntegrationPhaseInitialization, v1alpha1.IntegrationPhaseDeploying:
// Register a post action that updates the resources generated by the traits
e.PostActions = append(e.PostActions, func(env *Environment) error {
if err := kubernetes.ReplaceResources(env.C, env.Client, env.Resources.Items()); err != nil {
return errors.Wrap(err, "error during replace resource")
}
return nil
})

case v1alpha1.IntegrationPhaseRunning:
// Register a post action that patches the resources generated by the traits
e.PostActions = append(e.PostActions, func(env *Environment) error {
for _, resource := range env.Resources.Items() {
key, err := client.ObjectKeyFromObject(resource)
if err != nil {
return err
}

object := resource.DeepCopyObject()
err = env.Client.Get(env.C, key, object)
if err != nil {
return err
}

patch, err := positiveMergePatch(object, resource)
if err != nil {
return err
} else if len(patch) == 0 {
// Avoid triggering a patch request for nothing
continue
}

err = env.Client.Patch(env.C, resource, client.ConstantPatch(types.MergePatchType, patch))
if err != nil {
return errors.Wrap(err, "error during patch resource")
}
}
return nil
})
}

return nil
}

// IsPlatformTrait overrides base class method
func (t *deployerTrait) IsPlatformTrait() bool {
return true
}

func positiveMergePatch(source runtime.Object, target runtime.Object) ([]byte, error) {
sourceJSON, err := json.Marshal(source)
if err != nil {
return nil, err
}

targetJSON, err := json.Marshal(target)
if err != nil {
return nil, err
}

mergePatch, err := jsonpatch.CreateMergePatch(sourceJSON, targetJSON)
if err != nil {
return nil, err
}

var positivePatch map[string]interface{}
err = json.Unmarshal(mergePatch, &positivePatch)
if err != nil {
return nil, err
}

// The following is a work-around to remove null fields from the JSON merge patch,
// so that values defaulted by controllers server-side are not deleted.
// It's generally acceptable as these values are orthogonal to the values managed
// by the traits.
removeNilValues(reflect.ValueOf(positivePatch), reflect.Value{})

// Return an empty patch if no keys remain
if len(positivePatch) == 0 {
return make([]byte, 0), nil
}

return json.Marshal(positivePatch)
}

func removeNilValues(v reflect.Value, parent reflect.Value) {
for v.Kind() == reflect.Ptr || v.Kind() == reflect.Interface {
v = v.Elem()
}
switch v.Kind() {
case reflect.Array, reflect.Slice:
for i := 0; i < v.Len(); i++ {
removeNilValues(v.Index(i), v)
}
case reflect.Map:
for _, k := range v.MapKeys() {
switch c := v.MapIndex(k); {
case !c.IsValid():
// Skip keys previously deleted
continue
case c.IsNil(), c.Elem().Kind() == reflect.Map && len(c.Elem().MapKeys()) == 0:
v.SetMapIndex(k, reflect.Value{})
default:
removeNilValues(c, v)
}
}
// Back process the parent map in case it has been emptied so that it's deleted as well
if len(v.MapKeys()) == 0 && parent.Kind() == reflect.Map {
removeNilValues(parent, reflect.Value{})
}
}
}
46 changes: 15 additions & 31 deletions pkg/trait/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,10 @@ limitations under the License.
package trait

import (
"context"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
)

Expand Down Expand Up @@ -97,7 +93,6 @@ func (t *deploymentTrait) Configure(e *Environment) (bool, error) {
func (t *deploymentTrait) Apply(e *Environment) error {
if e.IntegrationKitInPhase(v1alpha1.IntegrationKitPhaseReady) &&
e.IntegrationInPhase(v1alpha1.IntegrationPhaseBuildingKit, v1alpha1.IntegrationPhaseResolvingKit) {

e.PostProcessors = append(e.PostProcessors, func(environment *Environment) error {
// trigger integration deploy
e.Integration.Status.Phase = v1alpha1.IntegrationPhaseDeploying
Expand All @@ -107,44 +102,33 @@ func (t *deploymentTrait) Apply(e *Environment) error {
return nil
}

if e.InPhase(v1alpha1.IntegrationKitPhaseReady, v1alpha1.IntegrationPhaseDeploying) {
if e.InPhase(v1alpha1.IntegrationKitPhaseReady, v1alpha1.IntegrationPhaseDeploying) ||
e.InPhase(v1alpha1.IntegrationKitPhaseReady, v1alpha1.IntegrationPhaseRunning) {
maps := e.ComputeConfigMaps()
depl := t.getDeploymentFor(e)
deployment := t.getDeploymentFor(e)

e.Resources.AddAll(maps)
e.Resources.Add(depl)
e.Resources.Add(deployment)

e.Integration.Status.SetCondition(
v1alpha1.IntegrationConditionDeploymentAvailable,
corev1.ConditionTrue,
v1alpha1.IntegrationConditionDeploymentAvailableReason,
depl.Name,
deployment.Name,
)

return nil
}

if e.IntegrationInPhase(v1alpha1.IntegrationPhaseRunning) {
// Reconcile the deployment replicas
deployment := &appsv1.Deployment{}
err := t.client.Get(context.TODO(), client.ObjectKey{Namespace: e.Integration.Namespace, Name: e.Integration.Name}, deployment)
if err != nil {
return err
}
replicas := e.Integration.Spec.Replicas
// Deployment replicas defaults to 1, so we avoid forcing
// an update to nil that will result to another update cycle
// back to that default value by the Deployment controller.
if replicas == nil && *deployment.Spec.Replicas != 1 ||
replicas != nil && *deployment.Spec.Replicas != *replicas {
deployment.Spec.Replicas = replicas
err := t.client.Update(context.TODO(), deployment)
if err != nil {
return err
if e.IntegrationInPhase(v1alpha1.IntegrationPhaseRunning) {
// Reconcile the deployment replicas
replicas := e.Integration.Spec.Replicas
// Deployment replicas defaults to 1, so we avoid forcing
// an update to nil that will result to another update cycle
// back to that default value by the Deployment controller.
if replicas == nil {
one := int32(1)
replicas = &one
}
deployment.Spec.Replicas = replicas
}

return nil
}

return nil
Expand Down
10 changes: 4 additions & 6 deletions pkg/trait/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ import (
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/util/kubernetes"
"github.com/apache/camel-k/pkg/util/test"
"github.com/stretchr/testify/assert"

"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/stretchr/testify/assert"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -132,14 +131,13 @@ func TestApplyDeploymentTraitWhileRunningIntegrationDoesSucceed(t *testing.T) {

assert.Nil(t, err)

deployment := &appsv1.Deployment{}
err = deploymentTrait.client.Get(context.TODO(), client.ObjectKey{Namespace: "namespace", Name: "integration-name"}, deployment)
assert.Nil(t, err)
deployment := environment.Resources.GetDeployment(func(deployment *appsv1.Deployment) bool { return true })
assert.NotNil(t, deployment)
assert.Equal(t, "integration-name", deployment.Name)
assert.Equal(t, int32(3), *deployment.Spec.Replicas)
}

func createNominalDeploymentTest() (*deploymentTrait, *Environment) {

trait := newDeploymentTrait()
enabled := true
trait.Enabled = &enabled
Expand Down
2 changes: 1 addition & 1 deletion pkg/trait/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func newEnvironmentTrait() *environmentTrait {

func (t *environmentTrait) Configure(e *Environment) (bool, error) {
if t.Enabled == nil || *t.Enabled {
return e.IntegrationInPhase(v1alpha1.IntegrationPhaseDeploying), nil
return e.IntegrationInPhase(v1alpha1.IntegrationPhaseDeploying, v1alpha1.IntegrationPhaseRunning), nil
}

return false, nil
Expand Down
Loading