Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event types #433

Merged
merged 19 commits into from
May 28, 2018
3 changes: 2 additions & 1 deletion cmd/event-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func main() {

// Implementation of function and subscription services
service := &eventgateway.Service{
EventTypeStore: intstore.NewPrefixed("/serverless-event-gateway/eventtypes", kvstore),
FunctionStore: intstore.NewPrefixed("/serverless-event-gateway/functions", kvstore),
SubscriptionStore: intstore.NewPrefixed("/serverless-event-gateway/subscriptions", kvstore),
Log: log,
Expand All @@ -113,7 +114,7 @@ func main() {
ShutdownGuard: shutdownGuard,
})

httpapi.StartConfigAPI(service, service, httpapi.ServerConfig{
httpapi.StartConfigAPI(service, service, service, httpapi.ServerConfig{
TLSCrt: configTLSCrt,
TLSKey: configTLSKey,
Port: *configPort,
Expand Down
11 changes: 11 additions & 0 deletions docs/prometheus-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,22 @@ Both Events and Configuration API exposes Prometheus metrics. The metrics are ac
| `gateway_events_backlog` | Gauge of asynchronous events count waiting to be processed. | Gauge | |
| `gateway_events_custom_processing_seconds` | Bucketed histogram of processing duration of an event. From receiving the asynchronous custom event to calling a function. | Histogram | |

### Labels

- `space` - space name
- `type` - event type name

## Configuration API Metrics

| Metric Name | Description | Type | Labels |
| ----------------------------------------- | ------------------------------------------------------------ | --------- | --------------------------------- |
| `gateway_eventtypes_total` | Gauge of registered event types count. | Gauge | `space` |
| `gateway_functions_total` | Gauge of registered functions count. | Gauge | `space` |
| `gateway_subscriptions_total` | Gauge of created subscriptions count. | Gauge | `space` |
| `gateway_config_requests_total` | Total of Config API requests. | Counter | `space`, `resource`, `operation` |
| `gateway_config_request_duration_seconds` | Bucketed histogram of request duration of Config API requests. | Histogram | |
### Labels

- `space` - space name
- `resource` - Configuration API resource, possible values: `eventtype`, `function` or `subscription`
- `operation` - Configuration API operation, possible values: `create`, `get`, `delete`, `list`, `update`
34 changes: 34 additions & 0 deletions event/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,40 @@ package event

import "fmt"

// ErrEventTypeNotFound occurs when event type cannot be found.
type ErrEventTypeNotFound struct {
Name TypeName
}

func (e ErrEventTypeNotFound) Error() string {
return fmt.Sprintf("Event Type %q not found.", e.Name)
}

// ErrEventTypeAlreadyExists occurs when event type with specified name already exists.
type ErrEventTypeAlreadyExists struct {
Name TypeName
}

func (e ErrEventTypeAlreadyExists) Error() string {
return fmt.Sprintf("Event Type %q already exists.", e.Name)
}

// ErrEventTypeValidation occurs when event type payload doesn't validate.
type ErrEventTypeValidation struct {
Message string
}

func (e ErrEventTypeValidation) Error() string {
return fmt.Sprintf("Event Type doesn't validate. Validation error: %s", e.Message)
}

// ErrEventTypeHasSubscriptionsError occurs when there are subscription for the event type.
type ErrEventTypeHasSubscriptionsError struct{}

func (e ErrEventTypeHasSubscriptionsError) Error() string {
return fmt.Sprintf("Event type cannot be deleted because there are subscriptions using it.")
}

// ErrParsingCloudEvent occurs when payload is not valid CloudEvent.
type ErrParsingCloudEvent struct {
Message string
Expand Down
16 changes: 5 additions & 11 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,7 @@ import (
"gopkg.in/go-playground/validator.v9"
)

// Type uniquely identifies an event type.
type Type string

const (
// TypeHTTPRequest is a special type of event for sync http subscriptions.
TypeHTTPRequest = Type("http.request")

// TransformationVersion is indicative of the revision of how Event Gateway transforms a request into CloudEvents format.
TransformationVersion = "0.1"

Expand All @@ -35,7 +29,7 @@ const (
// Event is a default event structure. All data that passes through the Event Gateway
// is formatted to a format defined CloudEvents v0.1 spec.
type Event struct {
EventType Type `json:"eventType" validate:"required"`
EventType TypeName `json:"eventType" validate:"required"`
EventTypeVersion string `json:"eventTypeVersion,omitempty"`
CloudEventsVersion string `json:"cloudEventsVersion" validate:"required"`
Source string `json:"source" validate:"uri,required"`
Expand All @@ -48,7 +42,7 @@ type Event struct {
}

// New return new instance of Event.
func New(eventType Type, mimeType string, payload interface{}) *Event {
func New(eventType TypeName, mimeType string, payload interface{}) *Event {
event := &Event{
EventType: eventType,
CloudEventsVersion: CloudEventsVersion,
Expand Down Expand Up @@ -99,12 +93,12 @@ func FromRequest(r *http.Request) (*Event, error) {
if mimeType == mimeJSON { // CloudEvent in Legacy Mode
event, err = parseAsCloudEvent(mimeType, body)
if err != nil {
return New(Type(r.Header.Get("event")), mimeType, body), nil
return New(TypeName(r.Header.Get("event")), mimeType, body), nil
}
return event, err
}

return New(Type(r.Header.Get("event")), mimeType, body), nil
return New(TypeName(r.Header.Get("event")), mimeType, body), nil
}

return New(TypeHTTPRequest, mimeCloudEventsJSON, NewHTTPRequestData(r, body)), nil
Expand Down Expand Up @@ -172,7 +166,7 @@ func isCloudEventsBinaryContentMode(headers http.Header) bool {

func parseAsCloudEventBinary(headers http.Header, payload interface{}) (*Event, error) {
event := &Event{
EventType: Type(headers.Get("CE-EventType")),
EventType: TypeName(headers.Get("CE-EventType")),
EventTypeVersion: headers.Get("CE-EventTypeVersion"),
CloudEventsVersion: headers.Get("CE-CloudEventsVersion"),
Source: headers.Get("CE-Source"),
Expand Down
22 changes: 11 additions & 11 deletions event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestNew(t *testing.T) {

func TestNew_Encoding(t *testing.T) {
for _, testCase := range encodingTests {
result := eventpkg.New(eventpkg.Type("test.event"), testCase.contentType, testCase.body)
result := eventpkg.New(eventpkg.TypeName("test.event"), testCase.contentType, testCase.body)

assert.Equal(t, testCase.expectedBody, result.Data)
}
Expand Down Expand Up @@ -62,18 +62,18 @@ func TestFromRequest(t *testing.T) {

var newTests = []struct {
name string
eventType eventpkg.Type
eventType eventpkg.TypeName
mime string
payload interface{}
expectedEvent eventpkg.Event
}{
{
name: "not CloudEvent",
eventType: eventpkg.Type("user.created"),
eventType: eventpkg.TypeName("user.created"),
mime: "application/json",
payload: []byte("test"),
expectedEvent: eventpkg.Event{
EventType: eventpkg.Type("user.created"),
EventType: eventpkg.TypeName("user.created"),
CloudEventsVersion: "0.1",
Source: "https://serverless.com/event-gateway/#transformationVersion=0.1",
ContentType: "application/json",
Expand All @@ -88,11 +88,11 @@ var newTests = []struct {
},
{
name: "system event",
eventType: eventpkg.Type("user.created"),
eventType: eventpkg.TypeName("user.created"),
mime: "application/json",
payload: eventpkg.SystemEventReceivedData{},
expectedEvent: eventpkg.Event{
EventType: eventpkg.Type("user.created"),
EventType: eventpkg.TypeName("user.created"),
CloudEventsVersion: "0.1",
Source: "https://serverless.com/event-gateway/#transformationVersion=0.1",
ContentType: "application/json",
Expand Down Expand Up @@ -158,7 +158,7 @@ var fromRequestTests = []struct {
"data": "test"
}`),
expectedEvent: &eventpkg.Event{
EventType: eventpkg.Type("user.created"),
EventType: eventpkg.TypeName("user.created"),
CloudEventsVersion: "0.1",
Source: "http://example.com",
ContentType: "text/plain",
Expand Down Expand Up @@ -188,7 +188,7 @@ var fromRequestTests = []struct {
},
requestBody: []byte("hey there"),
expectedEvent: &eventpkg.Event{
EventType: eventpkg.Type("myevent"),
EventType: eventpkg.TypeName("myevent"),
CloudEventsVersion: "0.1",
Source: "https://example.com",
ContentType: "text/plain",
Expand Down Expand Up @@ -217,7 +217,7 @@ var fromRequestTests = []struct {
},
requestBody: []byte("hey there"),
expectedEvent: &eventpkg.Event{
EventType: eventpkg.Type("myevent"),
EventType: eventpkg.TypeName("myevent"),
CloudEventsVersion: "0.1",
Source: "https://serverless.com/event-gateway/#transformationVersion=0.1",
ContentType: "application/octet-stream",
Expand All @@ -241,7 +241,7 @@ var fromRequestTests = []struct {
"data": "test"
}`),
expectedEvent: &eventpkg.Event{
EventType: eventpkg.Type("user.created"),
EventType: eventpkg.TypeName("user.created"),
CloudEventsVersion: "0.1",
Source: "http://example.com",
ContentType: "text/plain",
Expand All @@ -258,7 +258,7 @@ var fromRequestTests = []struct {
"eventType": "user.created"
}`),
expectedEvent: &eventpkg.Event{
EventType: eventpkg.Type("user.created"),
EventType: eventpkg.TypeName("user.created"),
CloudEventsVersion: "0.1",
Source: "https://serverless.com/event-gateway/#transformationVersion=0.1",
ContentType: "application/json",
Expand Down
9 changes: 9 additions & 0 deletions event/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package event

// Service represents service for managing event types.
type Service interface {
CreateEventType(eventType *Type) (*Type, error)
GetEventType(space string, name TypeName) (*Type, error)
GetEventTypes(space string) (Types, error)
DeleteEventType(space string, name TypeName) error
}
8 changes: 4 additions & 4 deletions event/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

// SystemEventReceivedType is a system event emitted when the Event Gateway receives an event.
const SystemEventReceivedType = Type("gateway.event.received")
const SystemEventReceivedType = TypeName("gateway.event.received")

// SystemEventReceivedData struct.
type SystemEventReceivedData struct {
Expand All @@ -15,7 +15,7 @@ type SystemEventReceivedData struct {
}

// SystemFunctionInvokingType is a system event emitted before invoking a function.
const SystemFunctionInvokingType = Type("gateway.function.invoking")
const SystemFunctionInvokingType = TypeName("gateway.function.invoking")

// SystemFunctionInvokingData struct.
type SystemFunctionInvokingData struct {
Expand All @@ -25,7 +25,7 @@ type SystemFunctionInvokingData struct {
}

// SystemFunctionInvokedType is a system event emitted after successful function invocation.
const SystemFunctionInvokedType = Type("gateway.function.invoked")
const SystemFunctionInvokedType = TypeName("gateway.function.invoked")

// SystemFunctionInvokedData struct.
type SystemFunctionInvokedData struct {
Expand All @@ -36,7 +36,7 @@ type SystemFunctionInvokedData struct {
}

// SystemFunctionInvocationFailedType is a system event emitted after successful function invocation.
const SystemFunctionInvocationFailedType = Type("gateway.function.invocationFailed")
const SystemFunctionInvocationFailedType = TypeName("gateway.function.invocationFailed")

// SystemFunctionInvocationFailedData struct.
type SystemFunctionInvocationFailedData struct {
Expand Down
28 changes: 28 additions & 0 deletions event/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package event

import "go.uber.org/zap/zapcore"

const (
// TypeHTTPRequest is a special type of event HTTP requests that are not CloudEvents.
TypeHTTPRequest = TypeName("http.request")
)

// TypeName uniquely identifies an event type.
type TypeName string

// Type is a registered event type.
type Type struct {
Space string `json:"space" validate:"required,min=3,space"`
Name TypeName `json:"name" validate:"required"`
}

// Types is an array of subscriptions.
type Types []*Type

// MarshalLogObject is a part of zapcore.ObjectMarshaler interface
func (t Type) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("space", string(t.Space))
enc.AddString("name", string(t.Name))

return nil
}
4 changes: 3 additions & 1 deletion httpapi/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ import (
"time"

"github.com/julienschmidt/httprouter"
"github.com/serverless/event-gateway/event"
"github.com/serverless/event-gateway/function"
"github.com/serverless/event-gateway/subscription"
)

// StartConfigAPI creates a new configuration API server and listens for requests.
func StartConfigAPI(functions function.Service, subscriptions subscription.Service, config ServerConfig) {
func StartConfigAPI(eventtypes event.Service, functions function.Service, subscriptions subscription.Service, config ServerConfig) {
router := httprouter.New()
api := &HTTPAPI{
EventTypes: eventtypes,
Functions: functions,
Subscriptions: subscriptions,
}
Expand Down
Loading