Skip to content

Commit

Permalink
adding webhook sensor (argoproj#15)
Browse files Browse the repository at this point in the history
* adding webhook as new type of signal
  • Loading branch information
VaibhavPage authored and magaldima committed Jun 8, 2018
1 parent daf02ab commit 47c64a4
Show file tree
Hide file tree
Showing 12 changed files with 428 additions and 4 deletions.
3 changes: 3 additions & 0 deletions cmd/sensor-job/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/blackrock/axis/job/resource"
"github.com/blackrock/axis/pkg/apis/sensor/v1alpha1"
sensorclientset "github.com/blackrock/axis/pkg/client/clientset/versioned"
"github.com/blackrock/axis/job/webhook"
)

func main() {
Expand Down Expand Up @@ -109,6 +110,8 @@ func getSignalRegisters(signals []v1alpha1.Signal) ([]func(*job.ExecutorSession)
registerFuncs = append(registerFuncs, resource.Resource)
case v1alpha1.SignalTypeCalendar:
registerFuncs = append(registerFuncs, calendar.Calendar)
case v1alpha1.SignalTypeWebhook:
registerFuncs = append(registerFuncs, webhook.Webhook)
default:
return registerFuncs, fmt.Errorf("%s signal type not supported", signal.GetType())
}
Expand Down
9 changes: 9 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,12 @@ const (
// EnvVarKubeConfig is the path to the Kubernetes configuration
EnvVarKubeConfig = "KUBE_CONFIG"
)

// SENSOR JOB CONSTANTS
const (
// WebhookServicePort is the port of the service
WebhookServicePort = 9000

// WebhookServiceTargetPort is the port of the targeted job
WebhookServiceTargetPort = 9000
)
5 changes: 5 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ func CreateJobPrefix(name string) string {
return name + "-" + sensor.Singular
}

// CreateServiceSuffix formats the service name backed by sensor job
func CreateServiceSuffix(name string) string {
return name + "-svc"
}

// ParseJobPrefix and return the sensorName
func ParseJobPrefix(job string) string {
// first trim the ID at the end, should be everything after and including the last '-'
Expand Down
41 changes: 39 additions & 2 deletions controller/sensorjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/blackrock/axis/common"
"github.com/blackrock/axis/pkg/apis/sensor/v1alpha1"
"k8s.io/apimachinery/pkg/util/intstr"
)

var (
Expand Down Expand Up @@ -102,11 +103,47 @@ func (soc *sOperationCtx) createSensorExecutorJob() error {
job.Spec.Template.ObjectMeta.Labels[common.LabelKeySensorControllerInstanceID] = soc.controller.Config.InstanceID
}

created, err := soc.controller.kubeClientset.BatchV1().Jobs(soc.s.ObjectMeta.Namespace).Create(&job)
createdJob, err := soc.controller.kubeClientset.BatchV1().Jobs(soc.s.ObjectMeta.Namespace).Create(&job)
if err != nil {
soc.log.Warnf("Failed to create executor job", zap.Error(err))
return err
}
soc.log.Infof("created signal executor job '%s'", created.Name)
soc.log.Infof("created signal executor job '%s'", createdJob.Name)

// If signal is of type Webhook, create a service backed by the job
for _, signal := range soc.s.Spec.Signals {
if signal.GetType() == v1alpha1.SignalTypeWebhook {
targetPort := signal.Webhook.Port
if signal.Webhook.Port == 0 {
targetPort = common.WebhookServiceTargetPort
}
webhookSvc := corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: common.CreateServiceSuffix(soc.s.Name),
Namespace: soc.s.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(soc.s, v1alpha1.SchemaGroupVersionKind),
},
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeLoadBalancer,
Selector: createdJob.Labels,
Ports: []corev1.ServicePort{
{
Protocol: corev1.ProtocolTCP,
Port: common.WebhookServicePort,
TargetPort: intstr.FromInt(targetPort),
},
},
},
}
createdSvc, err := soc.controller.kubeClientset.CoreV1().Services(soc.s.ObjectMeta.Namespace).Create(&webhookSvc)
if err != nil {
soc.log.Warnf("Failed to create executor service", zap.Error(err))
return err
}
soc.log.Infof("Created executor service %s", createdSvc.Name)
}
}
return nil
}
34 changes: 34 additions & 0 deletions examples/webhook-sensor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
apiVersion: core.events/v1alpha1
kind: Sensor
metadata:
name: webhook-example
namespace: cloud-native-scheduler
labels:
sensors.core.events/controller-instanceid: axis
spec:
repeat: true
signals:
- name: webhook
webhook:
port: 9000
endpoint: "/app"
method: "POST"
triggers:
- name: done-workflow
resource:
namespace: cloud-native-scheduler
group: argoproj.io
version: v1alpha1
kind: Workflow
artifactLocation:
s3:
bucket: workflows
key: hello-world.yaml
endpoint: minio-service.cloud-native-scheduler:9000
insecure: true
accessKey:
key: accesskey
name: artifacts-minio
secretKey:
key: secretkey
name: artifacts-minio
51 changes: 51 additions & 0 deletions job/webhook/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright 2018 BlackRock, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package webhook

import (
"time"

"github.com/blackrock/axis/job"
)

type event struct {
job.AbstractEvent
webhook *webhook
payload []byte
requestHost string
timestamp time.Time
}

func (e *event) GetID() string {
return e.webhook.AbstractSignal.GetID()
}

func (e *event) GetSource() string {
return e.requestHost
}

func (e *event) GetSignal() job.Signal {
return e.webhook
}

func (e *event) GetBody() []byte {
return e.payload
}

func (e *event) GetTimestamp() time.Time {
return e.timestamp
}
21 changes: 21 additions & 0 deletions job/webhook/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package webhook

import (
"github.com/blackrock/axis/job"
"github.com/blackrock/axis/pkg/apis/sensor/v1alpha1"
"go.uber.org/zap"
)

type factory struct{}

func (f *factory) Create(abstract job.AbstractSignal) job.Signal {
abstract.Log.Info("creating signal", zap.String("endpoint", abstract.Webhook.Endpoint))
return &webhook{
AbstractSignal: abstract,
}
}

// Webhook will be added to the executor session
func Webhook(es *job.ExecutorSession) {
es.AddFactory(v1alpha1.SignalTypeWebhook, &factory{})
}
90 changes: 90 additions & 0 deletions job/webhook/signal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
Copyright 2018 BlackRock, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package webhook

import (
"net/http"
"go.uber.org/zap"

"github.com/blackrock/axis/job"
"io"
"strconv"
"fmt"
"time"
"io/ioutil"
)

type webhook struct {
job.AbstractSignal
events chan job.Event
server *http.Server
payload io.ReadCloser
}

// Handler for the http rest endpoint
func (w *webhook) handler(writer http.ResponseWriter, request *http.Request) {
w.Log.Info("received a request from", zap.String("host", request.Host))
if request.Method == w.Webhook.Method {
payload, err := ioutil.ReadAll(request.Body)
if err != nil {
w.Log.Warn("unable to process request payload. Cause: %v", zap.Error(err))
writer.WriteHeader(http.StatusInternalServerError)
return
}
event := &event{
webhook: w,
requestHost: request.Host,
timestamp: time.Now().UTC(),
payload: payload,
}
w.events <- event
writer.WriteHeader(http.StatusOK)
} else {
w.Log.Warn("HTTP method mismatch", zap.String("actual", request.Method), zap.String("expected", w.Webhook.Method))
writer.WriteHeader(http.StatusBadRequest)
}
}

// Start signal
func (w *webhook) Start(events chan job.Event) error {
w.events = events
port := strconv.Itoa(w.AbstractSignal.Webhook.Port)
endpoint := w.AbstractSignal.Webhook.Endpoint
// Attach handler
http.HandleFunc(endpoint, w.handler)
w.server = &http.Server{Addr: fmt.Sprintf(":%s", port)}

// Start http server
go func() {
err := w.server.ListenAndServe()
if err == http.ErrServerClosed {
w.Log.Info("server successfully shutdown")
} else {
panic(fmt.Errorf("error occurred while server listening. Cause: %v", err))
}
}()
return nil
}

// Stop signal
func (w *webhook) Stop() error {
err := w.server.Shutdown(nil)
if err != nil {
return fmt.Errorf("unable to shutdown server. Cause: %v", err)
}
return nil
}
89 changes: 89 additions & 0 deletions job/webhook/signal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
Copyright 2018 BlackRock, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package webhook

import (
"testing"
"github.com/blackrock/axis/job"
"go.uber.org/zap"
"github.com/blackrock/axis/pkg/apis/sensor/v1alpha1"
"github.com/stretchr/testify/assert"
"github.com/blackrock/axis/common"
"net/http"
"fmt"
"strings"
"net"
"time"
)

var client = &http.Client{}

func createWebhookSignal(t *testing.T, httpMethod string, endpoint string) job.Signal {
es := job.New(nil, nil, zap.NewNop())
Webhook(es)
webhookFactory, ok := es.GetFactory(v1alpha1.SignalTypeWebhook)
assert.True(t, ok, "webhook factory not found")
abstractSignal := job.AbstractSignal{
Signal: v1alpha1.Signal{
Webhook: &v1alpha1.WebhookSignal{
Port: common.WebhookServiceTargetPort,
Endpoint: endpoint,
Method: httpMethod,
},
},
Log: zap.NewNop(),
Session: es,
}
webhookSignal := webhookFactory.Create(abstractSignal)
return webhookSignal
}

func handleEvent(t *testing.T, testEventChan chan job.Event) {
event := <-testEventChan
assert.Equal(t, event.GetSource(), fmt.Sprintf("localhost:%d", common.WebhookServicePort))
}

func makeAPIRequest(t *testing.T, httpMethod string, endpoint string) {
webhookSignal := createWebhookSignal(t, httpMethod, endpoint)
testEventChan := make(chan job.Event)
webhookSignal.Start(testEventChan)

go handleEvent(t, testEventChan)

conn, err := net.DialTimeout("tcp", fmt.Sprintf(":%d", common.WebhookServicePort), time.Second * 5)
// Server has started
if err == nil {
conn.Close()
request, err := http.NewRequest(httpMethod, fmt.Sprintf("http://localhost:%d%s", common.WebhookServicePort, endpoint), strings.NewReader("{name: x}"))
if err != nil {
assert.Fail(t, "unable to create http request", err)
}
resp, err := client.Do(request)
assert.Nil(t, err)
assert.Equal(t, resp.Status, "200 OK")
err = webhookSignal.Stop()
assert.Equal(t, err, nil)
} else {
assert.Fail(t, "unable to connect to http server")
}
}

func TestSignal(t *testing.T) {
makeAPIRequest(t, http.MethodPost, "/post")
makeAPIRequest(t, http.MethodPut, "/put")
makeAPIRequest(t, http.MethodDelete, "/delete")
}
Loading

0 comments on commit 47c64a4

Please sign in to comment.