Skip to content

Commit

Permalink
Fix #1107: disable applier code to detect real CI errors
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolaferraro committed Jan 12, 2022
1 parent 5520b63 commit 885d2bd
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 11 deletions.
94 changes: 91 additions & 3 deletions pkg/install/kamelets.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,47 @@ package install

import (
"context"
"errors"
"fmt"
"io/fs"
"net/http"
"os"
"path"
"path/filepath"
"strings"
"sync"
"sync/atomic"

"golang.org/x/sync/errgroup"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"

ctrl "sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/client"
"github.com/apache/camel-k/pkg/util"
"github.com/apache/camel-k/pkg/util/defaults"
"github.com/apache/camel-k/pkg/util/kubernetes"
"k8s.io/apimachinery/pkg/runtime"
"github.com/apache/camel-k/pkg/util/patch"
)

const (
kameletDirEnv = "KAMELET_CATALOG_DIR"
defaultKameletDir = "/kamelets/"
)

var (
log = logf.Log

hasServerSideApply atomic.Value
tryServerSideApply sync.Once
)

// KameletCatalog installs the bundled Kamelets into the specified namespace.
func KameletCatalog(ctx context.Context, c client.Client, namespace string) error {
kameletDir := os.Getenv(kameletDirEnv)
Expand All @@ -58,7 +77,7 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro
}

g, gCtx := errgroup.WithContext(ctx)
applier := c.ServerOrClientSideApplier()

err = filepath.WalkDir(kameletDir, func(p string, f fs.DirEntry, err error) error {
if err != nil {
return err
Expand All @@ -75,9 +94,31 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro
if err != nil {
return err
}
if err := applier.Apply(gCtx, kamelet); err != nil {
once := false
tryServerSideApply.Do(func() {
once = true
if err = serverSideApply(gCtx, c, kamelet); err != nil {
if isIncompatibleServerError(err) {
log.Info("Fallback to client-side apply for installing bundled Kamelets")
hasServerSideApply.Store(false)
err = nil
} else {
tryServerSideApply = sync.Once{}
}
} else {
hasServerSideApply.Store(true)
}
})
if err != nil {
return err
}
if v := hasServerSideApply.Load(); v.(bool) {
if !once {
return serverSideApply(gCtx, c, kamelet)
}
} else {
return clientSideApply(gCtx, c, kamelet)
}
return nil
})
return nil
Expand All @@ -89,6 +130,53 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro
return g.Wait()
}

func serverSideApply(ctx context.Context, c client.Client, resource runtime.Object) error {
target, err := patch.PositiveApplyPatch(resource)
if err != nil {
return err
}
return c.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator"))
}

func clientSideApply(ctx context.Context, c client.Client, resource ctrl.Object) error {
err := c.Create(ctx, resource)
if err == nil {
return nil
} else if !k8serrors.IsAlreadyExists(err) {
return fmt.Errorf("error during create resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err)
}
object := &unstructured.Unstructured{}
object.SetNamespace(resource.GetNamespace())
object.SetName(resource.GetName())
object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind())
err = c.Get(ctx, ctrl.ObjectKeyFromObject(object), object)
if err != nil {
return err
}
p, err := patch.PositiveMergePatch(object, resource)
if err != nil {
return err
} else if len(p) == 0 {
return nil
}
return c.Patch(ctx, resource, ctrl.RawPatch(types.MergePatchType, p))
}

func isIncompatibleServerError(err error) bool {
// First simpler check for older servers (i.e. OpenShift 3.11)
if strings.Contains(err.Error(), "415: Unsupported Media Type") {
return true
}
// 415: Unsupported media type means we're talking to a server which doesn't
// support server-side apply.
var serr *k8serrors.StatusError
if errors.As(err, &serr) {
return serr.Status().Code == http.StatusUnsupportedMediaType
}
// Non-StatusError means the error isn't because the server is incompatible.
return false
}

func loadKamelet(path string, namespace string, scheme *runtime.Scheme) (*v1alpha1.Kamelet, error) {
content, err := util.ReadFile(path)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/resources/resources.go

Large diffs are not rendered by default.

108 changes: 106 additions & 2 deletions pkg/trait/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,22 @@ limitations under the License.

package trait

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"

ctrl "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/apache/camel-k/pkg/util/patch"
)

// The deployer trait is responsible for deploying the resources owned by the integration, and can be used
// to explicitly select the underlying controller that will manage the integration pods.
//
Expand All @@ -29,6 +45,8 @@ type deployerTrait struct {

var _ ControllerStrategySelector = &deployerTrait{}

var hasServerSideApply = true

func newDeployerTrait() Trait {
return &deployerTrait{
BaseTrait: NewBaseTrait("deployer", 900),
Expand All @@ -42,9 +60,28 @@ func (t *deployerTrait) Configure(e *Environment) (bool, error) {
func (t *deployerTrait) Apply(e *Environment) error {
// Register a post action that patches the resources generated by the traits
e.PostActions = append(e.PostActions, func(env *Environment) error {
applier := e.Client.ServerOrClientSideApplier()
for _, resource := range env.Resources.Items() {
if err := applier.Apply(e.Ctx, resource); err != nil {
// We assume that server-side apply is enabled by default.
// It is currently convoluted to check pro-actively whether server-side apply
// is enabled. This is possible to fetch the OpenAPI endpoint, which returns
// the entire server API document, then lookup the resource PATCH endpoint, and
// check its list of accepted MIME types.
// As a simpler solution, we fall back to client-side apply at the first
// 415 error, and assume server-side apply is not available globally.
if hasServerSideApply {
err := t.serverSideApply(env, resource)
switch {
case err == nil:
continue
case isIncompatibleServerError(err):
t.L.Info("Fallback to client-side apply to patch resources")
hasServerSideApply = false
default:
// Keep server-side apply unless server is incompatible with it
return err
}
}
if err := t.clientSideApply(env, resource); err != nil {
return err
}
}
Expand All @@ -54,6 +91,73 @@ func (t *deployerTrait) Apply(e *Environment) error {
return nil
}

func (t *deployerTrait) serverSideApply(env *Environment, resource ctrl.Object) error {
target, err := patch.PositiveApplyPatch(resource)
if err != nil {
return err
}
err = env.Client.Patch(env.Ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator"))
if err != nil {
return fmt.Errorf("error during apply resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err)
}
// Update the resource with the response returned from the API server
return t.unstructuredToRuntimeObject(target, resource)
}

func (t *deployerTrait) clientSideApply(env *Environment, resource ctrl.Object) error {
err := env.Client.Create(env.Ctx, resource)
if err == nil {
return nil
} else if !k8serrors.IsAlreadyExists(err) {
return fmt.Errorf("error during create resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err)
}
object := &unstructured.Unstructured{}
object.SetNamespace(resource.GetNamespace())
object.SetName(resource.GetName())
object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind())
err = env.Client.Get(env.Ctx, ctrl.ObjectKeyFromObject(object), object)
if err != nil {
return err
}
p, err := patch.PositiveMergePatch(object, resource)
if err != nil {
return err
} else if len(p) == 0 {
// Update the resource with the object returned from the API server
return t.unstructuredToRuntimeObject(object, resource)
}
err = env.Client.Patch(env.Ctx, resource, ctrl.RawPatch(types.MergePatchType, p))
if err != nil {
return fmt.Errorf("error during patch %s/%s: %w", resource.GetNamespace(), resource.GetName(), err)
}
return nil
}

func (t *deployerTrait) unstructuredToRuntimeObject(u *unstructured.Unstructured, obj ctrl.Object) error {
data, err := json.Marshal(u)
if err != nil {
return err
}
return json.Unmarshal(data, obj)
}

func isIncompatibleServerError(err error) bool {
// First simpler check for older servers (i.e. OpenShift 3.11)
if strings.Contains(err.Error(), "415: Unsupported Media Type") {
return true
}

// 415: Unsupported media type means we're talking to a server which doesn't
// support server-side apply.
var serr *k8serrors.StatusError
if errors.As(err, &serr) {
return serr.Status().Code == http.StatusUnsupportedMediaType
}

// Non-StatusError means the error isn't because the server is incompatible.
return false
}

func (t *deployerTrait) SelectControllerStrategy(e *Environment) (*ControllerStrategy, error) {
if IsFalse(t.Enabled) {
return nil, nil
Expand Down

0 comments on commit 885d2bd

Please sign in to comment.