Skip to content

Commit

Permalink
feat: remove wf dep (#758)
Browse files Browse the repository at this point in the history
* feat: remove wf dep

* docs: add updated go sum

* feat: remove openfaas cli from sensor

* chore: update argo-cli binary and remove faas-cli from assets
  • Loading branch information
VaibhavPage authored Jul 19, 2020
1 parent f3f28a4 commit 073995c
Show file tree
Hide file tree
Showing 13 changed files with 51 additions and 256 deletions.
Binary file modified assets/argo-linux-amd64
Binary file not shown.
Binary file removed assets/faas-cli
Binary file not shown.
3 changes: 0 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ require (
github.com/ahmetb/gen-crd-api-reference-docs v0.2.0
github.com/antonmedv/expr v1.8.8
github.com/apache/openwhisk-client-go v0.0.0-20190915054138-716c6f973eb2
github.com/argoproj/argo v2.5.2+incompatible
github.com/argoproj/argo-cd v1.5.1
github.com/argoproj/argo-rollouts v0.7.2
github.com/argoproj/pkg v0.0.0-20200319004004-f46beff7cd54 // indirect
github.com/aws/aws-sdk-go v1.30.7
github.com/cloudevents/sdk-go/v2 v2.1.0
Expand Down
84 changes: 0 additions & 84 deletions go.sum

Large diffs are not rendered by default.

10 changes: 0 additions & 10 deletions sensors/cmd/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,20 +1,10 @@
FROM centos:8
RUN yum -y update && yum -y install ca-certificates openssh openssh-server openssh-clients openssl-libs curl git

# OpenFass CLI
COPY assets/faas-cli /usr/local/bin/faas

# Argo Workflow CLI
COPY assets/argo-linux-amd64 /usr/local/bin/argo

RUN argo version
RUN faas version

RUN mkdir /.openfaas
RUN chmod 777 /.openfaas

RUN mkdir /bin/workflows
RUN chmod -R 777 /bin/workflows

COPY dist/sensor /bin/
ENTRYPOINT [ "/bin/sensor" ]
51 changes: 18 additions & 33 deletions sensors/triggers/argo-workflow/argo-workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ limitations under the License.
package argo_workflow

import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/exec"

wf_v1alpha1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/ghodss/yaml"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/argoproj/argo-events/sensors/policy"
"github.com/argoproj/argo-events/sensors/triggers"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -31,10 +32,6 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"

"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/argoproj/argo-events/sensors/policy"
"github.com/argoproj/argo-events/sensors/triggers"
)

// ArgoWorkflowTrigger implements Trigger interface for Argo workflow
Expand Down Expand Up @@ -67,7 +64,7 @@ func NewArgoWorkflowTrigger(k8sClient kubernetes.Interface, dynamicClient dynami
// FetchResource fetches the trigger resource from external source
func (t *ArgoWorkflowTrigger) FetchResource() (interface{}, error) {
trigger := t.Trigger
return triggers.FetchKubernetesResource(t.K8sClient, trigger.Template.ArgoWorkflow.Source, t.Sensor.Namespace, trigger.Template.ArgoWorkflow.GroupVersionResource)
return triggers.FetchKubernetesResource(t.K8sClient, trigger.Template.ArgoWorkflow.Source, t.Sensor.Namespace)
}

// ApplyResourceParameters applies parameters to the trigger resource
Expand Down Expand Up @@ -96,26 +93,19 @@ func (t *ArgoWorkflowTrigger) Execute(events map[string]*v1alpha1.Event, resourc
return nil, err
}

var workflow *wf_v1alpha1.Workflow
if err := json.Unmarshal(jObj, &workflow); err != nil {
return nil, errors.Wrap(err, "internal un-marshalling of the trigger resource failed")
name := obj.GetName()

if name == "" {
name = obj.GetGenerateName()
}
if name == "" {
return nil, fmt.Errorf("failed to trigger the workflow, no name is given")
}

namespace := obj.GetNamespace()
// Defaults to sensor's namespace
if namespace == "" {
namespace = t.Sensor.Namespace
}
obj.SetNamespace(namespace)

if workflow.Name == "" && workflow.GenerateName == "" {
return nil, errors.New("workflow is malformed. neither name nor generateName is specified")
}

name := workflow.Name
if name == "" && workflow.GenerateName != "" {
name = workflow.GenerateName
}

op := v1alpha1.Submit
if trigger.Template.ArgoWorkflow.Operation != "" {
Expand All @@ -126,19 +116,14 @@ func (t *ArgoWorkflowTrigger) Execute(events map[string]*v1alpha1.Event, resourc

switch op {
case v1alpha1.Submit:
file, err := ioutil.TempFile("/bin/workflows", workflow.Name)
file, err := ioutil.TempFile("", name)
if err != nil {
return nil, errors.Wrapf(err, "failed to create a temp file for the workflow %s", name)
return nil, errors.Wrapf(err, "failed to create a temp file for the workflow %s", obj.GetName())
}
defer os.Remove(file.Name())

workflowYaml, err := yaml.Marshal(workflow)
if err != nil {
return nil, errors.Wrap(err, "internal marshalling to YAML of the trigger resource failed")
}

if _, err := file.Write(workflowYaml); err != nil {
return nil, errors.Wrapf(err, "failed to write workflow yaml %s to the temp file %s", name, file.Name())
if _, err := file.Write(jObj); err != nil {
return nil, errors.Wrapf(err, "failed to write workflow json %s to the temp file %s", name, file.Name())
}
cmd = exec.Command("argo", "-n", namespace, "submit", file.Name())
case v1alpha1.Resubmit:
Expand All @@ -160,8 +145,8 @@ func (t *ArgoWorkflowTrigger) Execute(events map[string]*v1alpha1.Event, resourc
}

t.namespableDynamicClient = t.DynamicClient.Resource(schema.GroupVersionResource{
Group: workflow.GroupVersionKind().Group,
Version: workflow.GroupVersionKind().Version,
Group: "argoproj.io",
Version: "v1alpha1",
Resource: "workflows",
})

Expand Down
7 changes: 3 additions & 4 deletions sensors/triggers/argo-workflow/argo-workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ package argo_workflow
import (
"testing"

"github.com/argoproj/argo-events/common/logging"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
dynamicFake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/fake"

"github.com/argoproj/argo-events/common/logging"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
)

var sensorObj = &v1alpha1.Sensor{
Expand Down
5 changes: 2 additions & 3 deletions sensors/triggers/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ package triggers

import (
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/kubernetes"

"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/argoproj/argo-events/store"
)

func FetchKubernetesResource(client kubernetes.Interface, source *v1alpha1.ArtifactLocation, namespace string, gvr metav1.GroupVersionResource) (*unstructured.Unstructured, error) {
func FetchKubernetesResource(client kubernetes.Interface, source *v1alpha1.ArtifactLocation, namespace string) (*unstructured.Unstructured, error) {
if source == nil {
return nil, errors.Errorf("trigger source for k8s is empty")
}
Expand All @@ -38,7 +37,7 @@ func FetchKubernetesResource(client kubernetes.Interface, source *v1alpha1.Artif
if err != nil {
return nil, err
}
uObj, err := store.FetchArtifact(reader, gvr)
uObj, err := store.FetchArtifact(reader)
if err != nil {
return nil, err
}
Expand Down
7 changes: 1 addition & 6 deletions sensors/triggers/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"

apicommon "github.com/argoproj/argo-events/pkg/apis/common"
Expand All @@ -33,11 +32,7 @@ func TestFetchKubernetesResource(t *testing.T) {
sensorObj.Spec.Triggers[0].Template.K8s.Source = &v1alpha1.ArtifactLocation{
Resource: &artifact,
}
uObj, err := FetchKubernetesResource(fake.NewSimpleClientset(), sensorObj.Spec.Triggers[0].Template.K8s.Source, sensorObj.Namespace, metav1.GroupVersionResource{
Group: "argoproj.io",
Version: "v1alpha1",
Resource: "workflows",
})
uObj, err := FetchKubernetesResource(fake.NewSimpleClientset(), sensorObj.Spec.Triggers[0].Template.K8s.Source, sensorObj.Namespace)
assert.Nil(t, err)
assert.NotNil(t, uObj)
assert.Equal(t, deployment.GetName(), uObj.GetName())
Expand Down
2 changes: 1 addition & 1 deletion sensors/triggers/standard-k8s/standar-k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (k8sTrigger *StandardK8sTrigger) FetchResource() (interface{}, error) {

// uObj will either hold the resource definition stored in the trigger or just
// a stub to provide enough information to fetch the object from K8s cluster
uObj, err := store.FetchArtifact(reader, trigger.Template.K8s.GroupVersionResource)
uObj, err := store.FetchArtifact(reader)
if err != nil {
return nil, err
}
Expand Down
23 changes: 9 additions & 14 deletions store/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,26 @@ package store
import (
"testing"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/ghodss/yaml"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/smartystreets/goconvey/convey"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"

"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
)

func TestConfigmapReader_Read(t *testing.T) {
kubeClientset := fake.NewSimpleClientset()
key := "wf"

cmArtifact := &v1alpha1.ConfigmapArtifact{
Name: "wf-configmap",
Namespace: "argo-events",
Name: "fake-cm",
Namespace: "fake-ns",
Key: key,
}

configmap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: cmArtifact.Name,
Namespace: cmArtifact.Namespace,
Name: "fake-cm",
Namespace: "fake-ns",
},
Data: map[string]string{
key: `apiVersion: argoproj.io/v1alpha1
Expand All @@ -46,7 +43,7 @@ spec:
}

convey.Convey("Given a configmap", t, func() {
cm, err := kubeClientset.CoreV1().ConfigMaps(cmArtifact.Namespace).Create(configmap)
cm, err := kubeClientset.CoreV1().ConfigMaps("fake-ns").Create(configmap)
convey.So(err, convey.ShouldBeNil)
convey.So(cm, convey.ShouldNotBeNil)

Expand All @@ -58,11 +55,9 @@ spec:
convey.Convey("Create a workflow from configmap minio", func() {
resourceBody, err := cmReader.Read()
convey.So(err, convey.ShouldBeNil)

var wf *wfv1.Workflow
err = yaml.Unmarshal(resourceBody, &wf)
obj, err := decodeAndUnstructure(resourceBody)
convey.So(err, convey.ShouldBeNil)
convey.So(wf.Name, convey.ShouldEqual, "hello-world")
convey.So(obj.GetName(), convey.ShouldEqual, "hello-world")
})
})
})
Expand Down
60 changes: 9 additions & 51 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,10 @@ package store
import (
"fmt"

cd_v1alpha1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
rollouts_v1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
wf_v1alpha1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/ghodss/yaml"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"

gw_v1alpha1 "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1"
ss_v1alpha1 "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
)

// NOTE: custom resources must be manually added here
func init() {
if err := wf_v1alpha1.AddToScheme(scheme.Scheme); err != nil {
panic(err)
}
if err := ss_v1alpha1.AddToScheme(scheme.Scheme); err != nil {
panic(err)
}
if err := gw_v1alpha1.AddToScheme(scheme.Scheme); err != nil {
panic(err)
}
if err := rollouts_v1alpha1.AddToScheme(scheme.Scheme); err != nil {
panic(err)
}
if err := cd_v1alpha1.AddToScheme(scheme.Scheme); err != nil {
panic(err)
}
}

var (
registry = runtime.NewEquivalentResourceRegistry()
)

// ArtifactReader enables reading artifacts from an external store
Expand All @@ -62,18 +31,18 @@ type ArtifactReader interface {
}

// FetchArtifact from the location, decode it using explicit types, and unstructure it
func FetchArtifact(reader ArtifactReader, gvr metav1.GroupVersionResource) (*unstructured.Unstructured, error) {
func FetchArtifact(reader ArtifactReader) (*unstructured.Unstructured, error) {
var err error
var obj []byte
obj, err = reader.Read()
if err != nil {
return nil, err
}
return decodeAndUnstructure(obj, gvr)
return decodeAndUnstructure(obj)
}

// GetArtifactReader returns the ArtifactReader for this location
func GetArtifactReader(loc *ss_v1alpha1.ArtifactLocation, creds *Credentials, clientset kubernetes.Interface) (ArtifactReader, error) {
func GetArtifactReader(loc *v1alpha1.ArtifactLocation, creds *Credentials, clientset kubernetes.Interface) (ArtifactReader, error) {
if loc.S3 != nil {
return NewS3Reader(loc.S3, creds)
}
Expand All @@ -98,21 +67,10 @@ func GetArtifactReader(loc *ss_v1alpha1.ArtifactLocation, creds *Credentials, cl
return nil, fmt.Errorf("unknown artifact location: %v", *loc)
}

func decodeAndUnstructure(b []byte, gvr metav1.GroupVersionResource) (*unstructured.Unstructured, error) {
gvk := registry.KindFor(schema.GroupVersionResource{
Group: gvr.Group,
Version: gvr.Version,
Resource: gvr.Resource,
}, "")

obj, _, err := scheme.Codecs.UniversalDeserializer().Decode(b, &gvk, nil)
if err != nil {
return nil, err
}

uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
func decodeAndUnstructure(b []byte) (*unstructured.Unstructured, error) {
var result map[string]interface{}
if err := yaml.Unmarshal(b, &result); err != nil {
return nil, err
}
return &unstructured.Unstructured{Object: uObj}, nil
return &unstructured.Unstructured{Object: result}, nil
}
Loading

0 comments on commit 073995c

Please sign in to comment.