Skip to content

Commit

Permalink
Plugin System (#330). Closes #147
Browse files Browse the repository at this point in the history
  • Loading branch information
mthenw authored Oct 6, 2017
1 parent 4c2ba40 commit 31799d1
Show file tree
Hide file tree
Showing 20 changed files with 905 additions and 161 deletions.
28 changes: 26 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 48 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ yet ready for production applications.*
1. [Subscriptions](#subscriptions)
1. [Events API](#events-api)
1. [Configuration API](#configuration-api)
1. [System Events](#system-events)
1. [Plugin System](#plugin-system)
1. [Client Libraries](#client-libraries)
1. [Versioning](#versioning)
1. [Comparison](#comparison)
Expand Down Expand Up @@ -281,13 +283,6 @@ the data block is base64 encoded.

`invoke` is a built-in type of event allowing to call functions synchronously.

#### System Events

The Event Gateway emits system events allowing to react on internal events. Currently, only one internal event is
emitted. `gateway.info.functionError` happens when function invocation failed.

If you are looking for more system events, please comment [the corresponding issue](https://github.com/serverless/event-gateway/issues/215).

### Emit a Custom Event

Creating a subscription requires `path` property (by default it's "/"). `path` indicates path under which you can push.
Expand Down Expand Up @@ -551,6 +546,52 @@ Dummy endpoint (always returning `200 OK` status code) for checking if the event

`GET <Configuration API URL>/v1/status`

## System Events

System Events are special type of events emitted by the Event Gateway instance. They are emitted on each stage of event
processing flow starting from receiving event to function invocation end. Those events are:

- `gateway.event.received` - the event is emitted when an event was received by Events API. Data fields:
- `event` - event payload
- `path` - Events API path
- `headers` - HTTP request headers
- `gateway.function.invoking` - the event emitted before invoking a function. Data fields:
- `event` - event payload
- `functionId` - registered function ID
- `gateway.function.invoked` - the event emitted after successful function invocation. Data fields:
- `event` - event payload
- `functionId` - registered function ID
- `result` - function response
- `gateway.function.invocationFailed` - the event emitted after failed function invocation. Data fields:
- `event` - event payload
- `functionId` - registered function ID
- `error` - invocation error

## Plugin System

The Event Gateway is built with extensibility in mind. Built-in plugin system allows reacting on system events and
manipulate how an event is processed through the Event Gateway.

_Current implementation supports plugins written only in Golang. We plan to support other languages in the future._

Plugin system is based on [go-plugin](https://github.com/hashicorp/go-plugin). A plugin needs to implement the following
interface:

```go
type Reacter interface {
Subscriptions() []Subscription
React(event event.Event) error
}
```

`Subscription` model indicates the event that plugin subscribes to and the subscription type. A subscription can be either
sync or async. Sync (blocking) subscription means that in case of error returned from `React` method the event won't be
further processed by the Event Gateway.

`React` method is called for every system event that plugin subscribed to.

For more details, see [the example plugin](plugin/example).

## Client Libraries

- [FDK for Node.js](https://github.com/serverless/fdk)
Expand Down
21 changes: 10 additions & 11 deletions api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,33 @@ import (
"time"

"github.com/rs/cors"
"github.com/serverless/event-gateway/internal/cache"
"github.com/serverless/event-gateway/internal/httpapi"
"github.com/serverless/event-gateway/internal/metrics"
"github.com/serverless/event-gateway/router"
)

// StartEventsAPI creates a new gateway endpoint and listens for requests.
func StartEventsAPI(config httpapi.Config) httpapi.Server {
targetCache := cache.NewTarget("/serverless-event-gateway", config.KV, config.Log)
router := router.New(targetCache, metrics.DroppedPubSubEvents, config.Log)
router.StartWorkers()
// EventsAPIConfig stores configuration for Events API. Events API expects cofigured router. That's why new
// configuration object is needed.
type EventsAPIConfig struct {
httpapi.Config
Router http.Handler
}

// StartEventsAPI creates a new gateway endpoint and listens for requests.
func StartEventsAPI(config EventsAPIConfig) httpapi.Server {
handler := &http.Server{
Addr: ":" + strconv.Itoa(int(config.Port)),
Handler: cors.AllowAll().Handler(router),
Handler: cors.AllowAll().Handler(config.Router),
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
}

server := httpapi.Server{
Config: config,
Config: config.Config,
HTTPHandler: handler,
}

config.ShutdownGuard.Add(1)
go func() {
server.Listen()
router.Drain()
config.ShutdownGuard.Done()
}()

Expand Down
62 changes: 48 additions & 14 deletions cmd/event-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,25 @@ import (
"go.uber.org/zap/zapcore"

"github.com/serverless/event-gateway/api"
"github.com/serverless/event-gateway/internal/cache"
"github.com/serverless/event-gateway/internal/embedded"
"github.com/serverless/event-gateway/internal/httpapi"
"github.com/serverless/event-gateway/internal/metrics"
"github.com/serverless/event-gateway/internal/sync"
"github.com/serverless/event-gateway/plugin"
"github.com/serverless/event-gateway/router"
)

var version = "dev"

func init() {
etcd.Register()

prometheus.MustRegister(metrics.RequestDuration)
prometheus.MustRegister(metrics.DroppedPubSubEvents)
}

// nolint: gocyclo
func main() {
showVersion := flag.Bool("version", false, "Show version.")
logLevel := zap.LevelFlag("log-level", zap.InfoLevel, `The level of logging to show after the event gateway has started. The available log levels are "debug", "info", "warn", and "err".`)
Expand All @@ -42,17 +49,16 @@ func main() {
eventsPort := flag.Uint("events-port", 4000, "Port to serve events API on.")
eventsTLSCrt := flag.String("events-tls-cert", "", "Path to events API TLS certificate file.")
eventsTLSKey := flag.String("events-tls-key", "", "Path to events API TLS key file.")
plugins := paths{}
flag.Var(&plugins, "plugin", "Path to a plugin to load.")
flag.Parse()

if *showVersion {
fmt.Printf("Event Gateway version: %s\n", version)
os.Exit(0)
}

prometheus.MustRegister(metrics.RequestDuration)
prometheus.MustRegister(metrics.DroppedPubSubEvents)

log, err := loggerConfiguration(*developmentMode, *logLevel, *logFormat).Build()
log, err := logger(*developmentMode, *logLevel, *logFormat).Build()
if err != nil {
panic(err)
}
Expand All @@ -64,10 +70,9 @@ func main() {
embedded.EmbedEtcd(*embedDataDir, *embedPeerAddr, *embedCliAddr, shutdownGuard)
}

dbHostStrings := strings.Split(*dbHosts, ",")
kv, err := libkv.NewStore(
store.ETCDV3,
dbHostStrings,
strings.Split(*dbHosts, ","),
&store.Config{
ConnectionTimeout: 10 * time.Second,
},
Expand All @@ -76,13 +81,26 @@ func main() {
log.Fatal("Cannot create KV client.", zap.Error(err))
}

eventServer := api.StartEventsAPI(httpapi.Config{
KV: kv,
Log: log,
TLSCrt: eventsTLSCrt,
TLSKey: eventsTLSKey,
Port: *eventsPort,
ShutdownGuard: shutdownGuard,
pluginManager := plugin.NewManager(plugins, log)
err = pluginManager.Connect()
if err != nil {
log.Fatal("Loading plugins failed.", zap.Error(err))
}

targetCache := cache.NewTarget("/serverless-event-gateway", kv, log)
router := router.New(targetCache, pluginManager, metrics.DroppedPubSubEvents, log)
router.StartWorkers()

eventServer := api.StartEventsAPI(api.EventsAPIConfig{
Config: httpapi.Config{
KV: kv,
Log: log,
TLSCrt: eventsTLSCrt,
TLSKey: eventsTLSKey,
Port: *eventsPort,
ShutdownGuard: shutdownGuard,
},
Router: router,
})

configServer := api.StartConfigAPI(httpapi.Config{
Expand All @@ -108,14 +126,19 @@ func main() {
}

shutdownGuard.Wait()
router.Drain()

if pluginManager != nil {
pluginManager.Kill()
}
}

const (
consoleEncoding = "console"
jsonEncoding = "json"
)

func loggerConfiguration(dev bool, level zapcore.Level, format string) zap.Config {
func logger(dev bool, level zapcore.Level, format string) zap.Config {
cfg := zap.Config{
Level: zap.NewAtomicLevelAt(level),
Development: false,
Expand Down Expand Up @@ -153,3 +176,14 @@ func loggerConfiguration(dev bool, level zapcore.Level, format string) zap.Confi

return cfg
}

type paths []string

func (p *paths) String() string {
return strings.Join(*p, ",")
}

func (p *paths) Set(value string) error {
*p = append(*p, value)
return nil
}
21 changes: 21 additions & 0 deletions event/event.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package event

import (
"encoding/json"
"strings"
"time"

"go.uber.org/zap/zapcore"

uuid "github.com/satori/go.uuid"
)

Expand Down Expand Up @@ -34,3 +38,20 @@ const TypeInvoke = Type("invoke")

// TypeHTTP is a special type of event for sync http subscriptions.
const TypeHTTP = Type("http")

// MarshalLogObject is a part of zapcore.ObjectMarshaler interface
func (e Event) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("type", string(e.Type))
enc.AddString("id", e.ID)
enc.AddUint64("receivedAt", e.ReceivedAt)
payload, _ := json.Marshal(e.Data)
enc.AddString("data", string(payload))
enc.AddString("dataType", e.DataType)

return nil
}

// IsSystem indicates if th event is a system event.
func (e Event) IsSystem() bool {
return strings.HasPrefix(string(e.Type), "gateway.")
}
46 changes: 46 additions & 0 deletions event/system.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package event

import (
"net/http"

"github.com/serverless/event-gateway/functions"
)

// SystemEventReceivedType is a system event emmited when the Event Gateway receives an event.
const SystemEventReceivedType = Type("gateway.event.received")

// SystemEventReceivedData struct.
type SystemEventReceivedData struct {
Path string `json:"path"`
Event Event `json:"event"`
Headers http.Header `json:"header"`
}

// SystemFunctionInvokingType is a system event emmited before invoking a function.
const SystemFunctionInvokingType = Type("gateway.function.invoking")

// SystemFunctionInvokingData struct.
type SystemFunctionInvokingData struct {
FunctionID functions.FunctionID `json:"functionId"`
Event Event `json:"event"`
}

// SystemFunctionInvokedType is a system event emmited after successful function invocation.
const SystemFunctionInvokedType = Type("gateway.function.invoked")

// SystemFunctionInvokedData struct.
type SystemFunctionInvokedData struct {
FunctionID functions.FunctionID `json:"functionId"`
Event Event `json:"event"`
Result []byte `json:"result"`
}

// SystemFunctionInvocationFailedType is a system event emmited after successful function invocation.
const SystemFunctionInvocationFailedType = Type("gateway.function.invocationFailed")

// SystemFunctionInvocationFailedData struct.
type SystemFunctionInvocationFailedData struct {
FunctionID functions.FunctionID `json:"functionId"`
Event Event `json:"event"`
Error []byte `json:"result"`
}
Loading

0 comments on commit 31799d1

Please sign in to comment.