Skip to content

Commit

Permalink
fix(api): Declare integration Flow as json.RawMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
astefanutti authored and nicolaferraro committed Jul 6, 2020
1 parent fc95c5f commit d8708ca
Show file tree
Hide file tree
Showing 16 changed files with 210 additions and 28 deletions.
4 changes: 1 addition & 3 deletions deploy/crd-integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ spec:
type: array
flows:
items:
description: Flow is an unstructured object representing a Camel Flow
in YAML/JSON DSL
type: string
type: object
type: array
kit:
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ spec:
type: array
flows:
items:
description: Flow is an unstructured object representing a Camel Flow
in YAML/JSON DSL
type: string
type: object
type: array
kit:
type: string
Expand Down
4 changes: 2 additions & 2 deletions deploy/resources.go

Large diffs are not rendered by default.

20 changes: 13 additions & 7 deletions e2e/knative/knative_platform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@ import (
"strings"
"testing"

. "github.com/onsi/gomega"

"github.com/stretchr/testify/assert"

corev1 "k8s.io/api/core/v1"

. "github.com/apache/camel-k/e2e/support"
"github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/util/flow"
"github.com/apache/camel-k/pkg/util/knative"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
)

func TestKnativePlatformTest(t *testing.T) {
Expand All @@ -53,11 +58,12 @@ func TestKnativePlatformTest(t *testing.T) {
// Change something in the integration to produce a redeploy
Expect(UpdateIntegration(ns, "yaml", func(it *v1.Integration) {
it.Spec.Profile = ""
var flows []v1.Flow
for _, flow := range it.Spec.Flows {
flows = append(flows, v1.Flow(strings.ReplaceAll(string(flow), "string!", "string!!!")))
}
it.Spec.Flows = flows
content, err := flow.Marshal(it.Spec.Flows)
assert.NoError(t, err)
newData := strings.ReplaceAll(string(content), "string!", "string!!!")
newFlows, err := flow.UnmarshalString(newData)
assert.NoError(t, err)
it.Spec.Flows = newFlows
})).To(BeNil())
// Spec profile should be reset by "kamel run"
Eventually(IntegrationSpecProfile(ns, "yaml")).Should(Equal(v1.TraitProfile("")))
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1375,12 +1375,16 @@ rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8
rsc.io/letsencrypt v0.0.3/go.mod h1:buyQKZ6IXrRnB7TdkHP0RyEybLx18HHyOSoTyoOLqNY=
sigs.k8s.io/controller-runtime v0.5.2 h1:pyXbUfoTo+HA3jeIfr0vgi+1WtmNh0CwlcnQGLXwsSw=
sigs.k8s.io/controller-runtime v0.5.2/go.mod h1:JZUwSMVbxDupo0lTJSSFP5pimEyxGynROImSsqIOx1A=
sigs.k8s.io/controller-tools v0.0.0-20200528125929-5c0c6ae3b64b/go.mod h1:enhtKGfxZD1GFEoMgP8Fdbu+uKQ/cq1/WGJhdVChfvI=
sigs.k8s.io/controller-tools v0.2.4/go.mod h1:m/ztfQNocGYBgTTCmFdnK94uVvgxeZeE3LtJvd/jIzA=
sigs.k8s.io/controller-tools v0.2.8/go.mod h1:9VKHPszmf2DHz/QmHkcfZoewO6BL7pPs9uAiBVsaJSE=
sigs.k8s.io/controller-tools v0.3.0/go.mod h1:enhtKGfxZD1GFEoMgP8Fdbu+uKQ/cq1/WGJhdVChfvI=
sigs.k8s.io/kustomize v2.0.3+incompatible/go.mod h1:MkjgH3RdOWrievjo6c9T245dYlB5QeXV4WCbnt/PEpU=
sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e h1:4Z09Hglb792X0kfOBBJUPFEyvVfQWrYT/l8h5EKA6JQ=
sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e/go.mod h1:wWxsB5ozmmv/SG7nM11ayaAW51xMvak/t1r0CSlcokI=
sigs.k8s.io/structured-merge-diff/v2 v2.0.1/go.mod h1:Wb7vfKAodbKgf6tn1Kl0VvGj7mRH6DGaRcixXEJXTsE=
sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
vbom.ml/util v0.0.0-20160121211510-db5cfe13f5cc/go.mod h1:so/NYdZXCz+E3ZpW0uAoCj6uzU2+8OWDFv/HxUSs7kI=
4 changes: 1 addition & 3 deletions helm/camel-k/crds/crd-integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ spec:
type: array
flows:
items:
description: Flow is an unstructured object representing a Camel Flow
in YAML/JSON DSL
type: string
type: object
type: array
kit:
type: string
Expand Down
3 changes: 2 additions & 1 deletion pkg/apis/camel/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.13
require (
k8s.io/api v0.18.2
k8s.io/apimachinery v0.18.2
sigs.k8s.io/controller-tools v0.3.0 // indirect
// Required to get https://github.com/kubernetes-sigs/controller-tools/pull/428
sigs.k8s.io/controller-tools v0.0.0-20200528125929-5c0c6ae3b64b // indirect
sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e // indirect
)
2 changes: 2 additions & 0 deletions pkg/apis/camel/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ k8s.io/kube-openapi v0.0.0-20200121204235-bf4fb3bd569c/go.mod h1:GRQhZsXIAJ1xR0C
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89 h1:d4vVOjXm687F1iLSP2q3lyPPuyvTUt3aVoBpi2DqRsU=
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.7/go.mod h1:PHgbrJT7lCHcxMU+mDHEm+nx46H4zuuHZkDP6icnhu0=
sigs.k8s.io/controller-tools v0.0.0-20200528125929-5c0c6ae3b64b h1:jVf/McoMd0tHALAJrr4VgEVakuOhEYQ+m00kJTseL3s=
sigs.k8s.io/controller-tools v0.0.0-20200528125929-5c0c6ae3b64b/go.mod h1:enhtKGfxZD1GFEoMgP8Fdbu+uKQ/cq1/WGJhdVChfvI=
sigs.k8s.io/controller-tools v0.3.0 h1:y3YD99XOyWaXkiF1kd41uRvfp/64teWcrEZFuHxPhJ4=
sigs.k8s.io/controller-tools v0.3.0/go.mod h1:enhtKGfxZD1GFEoMgP8Fdbu+uKQ/cq1/WGJhdVChfvI=
sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e h1:4Z09Hglb792X0kfOBBJUPFEyvVfQWrYT/l8h5EKA6JQ=
Expand Down
7 changes: 6 additions & 1 deletion pkg/apis/camel/v1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ limitations under the License.
package v1

import (
"encoding/json"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -133,4 +135,7 @@ type ResourceCondition interface {
}

// Flow is an unstructured object representing a Camel Flow in YAML/JSON DSL
type Flow string
// +kubebuilder:validation:Type=object
type Flow struct {
json.RawMessage `json:",inline"`
}
27 changes: 26 additions & 1 deletion pkg/apis/camel/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/client/camel/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03
modernc.org/strutil v1.0.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
modernc.org/xc v1.0.0/go.mod h1:mRNCo0bvLjGhHO9WsyuKVU4q0ceiDDDoEeWDJHrNx8I=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.7/go.mod h1:PHgbrJT7lCHcxMU+mDHEm+nx46H4zuuHZkDP6icnhu0=
sigs.k8s.io/controller-tools v0.0.0-20200528125929-5c0c6ae3b64b/go.mod h1:enhtKGfxZD1GFEoMgP8Fdbu+uKQ/cq1/WGJhdVChfvI=
sigs.k8s.io/controller-tools v0.3.0/go.mod h1:enhtKGfxZD1GFEoMgP8Fdbu+uKQ/cq1/WGJhdVChfvI=
sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e h1:4Z09Hglb792X0kfOBBJUPFEyvVfQWrYT/l8h5EKA6JQ=
sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e/go.mod h1:wWxsB5ozmmv/SG7nM11ayaAW51xMvak/t1r0CSlcokI=
Expand Down
7 changes: 6 additions & 1 deletion pkg/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/apache/camel-k/pkg/client"
"github.com/apache/camel-k/pkg/trait"
"github.com/apache/camel-k/pkg/util"
"github.com/apache/camel-k/pkg/util/flow"
"github.com/apache/camel-k/pkg/util/gzip"
"github.com/apache/camel-k/pkg/util/kubernetes"
k8slog "github.com/apache/camel-k/pkg/util/kubernetes/log"
Expand Down Expand Up @@ -471,7 +472,11 @@ func (o *runCmdOptions) updateIntegrationCode(c client.Client, sources []string)
}

if o.UseFlows && (strings.HasSuffix(source, ".yaml") || strings.HasSuffix(source, ".yml")) {
integration.Spec.AddFlows(v1.Flow(data))
flows, err := flow.UnmarshalString(data)
if err != nil {
return nil, err
}
integration.Spec.AddFlows(flows...)
} else {
integration.Spec.AddSources(v1.SourceSpec{
DataSpec: v1.DataSpec{
Expand Down
13 changes: 9 additions & 4 deletions pkg/trait/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (

v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/util"
"github.com/apache/camel-k/pkg/util/flow"
)

const flowsInternalSourceName = "camel-k-embedded-flow-%d.yaml"
const flowsInternalSourceName = "camel-k-embedded-flow.yaml"

// Internal trait
type initTrait struct {
Expand All @@ -52,11 +53,15 @@ func (t *initTrait) Apply(e *Environment) error {
if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) {

// Flows need to be turned into a generated source
for i, flow := range e.Integration.Spec.Flows {
if len(e.Integration.Spec.Flows) > 0 {
content, err := flow.Marshal(e.Integration.Spec.Flows)
if err != nil {
return err
}
e.Integration.Status.AddOrReplaceGeneratedSources(v1.SourceSpec{
DataSpec: v1.DataSpec{
Name: fmt.Sprintf(flowsInternalSourceName, i),
Content: string(flow),
Name: flowsInternalSourceName,
Content: string(content),
},
})
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/util/digest/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/util"
"github.com/apache/camel-k/pkg/util/defaults"
"github.com/apache/camel-k/pkg/util/flow"
)

// ComputeForIntegration a digest of the fields that are relevant for the deployment
Expand Down Expand Up @@ -67,8 +68,12 @@ func ComputeForIntegration(integration *v1.Integration) (string, error) {
}

// Integration flows
for _, flow := range integration.Spec.Flows {
if _, err := hash.Write([]byte(flow)); err != nil {
if len(integration.Spec.Flows) > 0 {
flows, err := flow.Marshal(integration.Spec.Flows)
if err != nil {
return "", err
}
if _, err := hash.Write(flows); err != nil {
return "", err
}
}
Expand Down
76 changes: 76 additions & 0 deletions pkg/util/flow/flow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flow

import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"

yaml2 "gopkg.in/yaml.v2"

"k8s.io/apimachinery/pkg/util/yaml"

v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
)

// UnmarshalString reads flows contained in a string
func UnmarshalString(flowsString string) ([]v1.Flow, error) {
return Unmarshal(bytes.NewReader([]byte(flowsString)))
}

// Unmarshal flows from a stream
func Unmarshal(reader io.Reader) ([]v1.Flow, error) {
buffered, err := ioutil.ReadAll(reader)
if err != nil {
return nil, err
}
var flows []v1.Flow
// Using the Kubernetes decoder to turn them into JSON before unmarshal.
// This avoids having map[interface{}]interface{} objects which are not JSON compatible.
jsonData, err := yaml.ToJSON(buffered)
if err != nil {
return nil, err
}

if err = json.Unmarshal(jsonData, &flows); err != nil {
return nil, err
}
return flows, err
}

// Marshal flows as byte array
func Marshal(flows []v1.Flow) ([]byte, error) {
data, err := json.Marshal(&flows)
if err != nil {
return nil, err
}
jsondata := make([]map[string]interface{}, 0)
err = json.Unmarshal(data, &jsondata)
if err != nil {
return nil, fmt.Errorf("error unmarshalling json: %v", err)
}
yamldata, err := yaml2.Marshal(&jsondata)
if err != nil {
return nil, fmt.Errorf("error marshalling to yaml: %v", err)
}

return yamldata, nil
}
53 changes: 53 additions & 0 deletions pkg/util/flow/flow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flow

import (
"bytes"
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
)

func TestReadWriteYaml(t *testing.T) {
// yaml in conventional form as marshalled by the go runtime
yaml := `- from:
steps:
- to: log:info
uri: timer:tick
`

yamlReader := bytes.NewReader([]byte(yaml))
flows, err := Unmarshal(yamlReader)
assert.NoError(t, err)
assert.NotNil(t, flows)
assert.Len(t, flows, 1)

flow := map[string]interface{}{}
err = json.Unmarshal(flows[0].RawMessage, &flow)
assert.NoError(t, err)

assert.NotNil(t, flow["from"])
assert.Nil(t, flow["xx"])

data, err := Marshal(flows)
assert.NoError(t, err)
assert.NotNil(t, data)
assert.Equal(t, yaml, string(data))
}

0 comments on commit d8708ca

Please sign in to comment.