diff --git a/Gopkg.lock b/Gopkg.lock index beaa51e..699577d 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -115,6 +115,24 @@ revision = "18d159699f2e83fc5bb9ef2f79465ca3f3122676" version = "v1.2.0" +[[projects]] + branch = "master" + name = "github.com/hashicorp/go-hclog" + packages = ["."] + revision = "8105cc0a3736cc153a2025f5d0d91b80045fc9ff" + +[[projects]] + branch = "master" + name = "github.com/hashicorp/go-plugin" + packages = ["."] + revision = "3e6d191694b5a3a2b99755f31b47fa209e4bcd09" + +[[projects]] + branch = "master" + name = "github.com/hashicorp/yamux" + packages = ["."] + revision = "d1caa6c97c9fc1cc9e83bbe34d0603f9ff0ce8bd" + [[projects]] branch = "master" name = "github.com/jmespath/go-jmespath" @@ -139,6 +157,12 @@ packages = ["pbutil"] revision = "c12348ce28de40eed0136aa2b644d0ee0650e56c" +[[projects]] + branch = "master" + name = "github.com/mitchellh/go-testing-interface" + packages = ["."] + revision = "7bf6f6eaf1bed2fd3c6c63114b18cb64facb9de2" + [[projects]] branch = "master" name = "github.com/petar/GoLLRB" @@ -248,7 +272,7 @@ [[projects]] name = "google.golang.org/grpc" - packages = [".","codes","credentials","grpclog","internal","keepalive","metadata","naming","peer","stats","tap","transport"] + packages = [".","codes","credentials","grpclog","health","health/grpc_health_v1","internal","keepalive","metadata","naming","peer","stats","tap","transport"] revision = "8050b9cbc271307e5a716a9d782803d09b0d6f2d" version = "v1.2.1" @@ -266,6 +290,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "20e4a55db41a69a449aa2de156e25d0075096699977a2bd715302870619c62ee" + inputs-digest = "04eac75ae64d5fdbde36076cfe6d7b8d74207ebf268b7fa1f923fc5947e425a8" solver-name = "gps-cdcl" solver-version = 1 diff --git a/README.md b/README.md index 79e36bc..f6fae91 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,8 @@ yet ready for production applications.* 1. [Subscriptions](#subscriptions) 1. [Events API](#events-api) 1. [Configuration API](#configuration-api) +1. [System Events](#system-events) +1. [Plugin System](#plugin-system) 1. [Client Libraries](#client-libraries) 1. [Versioning](#versioning) 1. [Comparison](#comparison) @@ -281,13 +283,6 @@ the data block is base64 encoded. `invoke` is a built-in type of event allowing to call functions synchronously. -#### System Events - -The Event Gateway emits system events allowing to react on internal events. Currently, only one internal event is -emitted. `gateway.info.functionError` happens when function invocation failed. - -If you are looking for more system events, please comment [the corresponding issue](https://github.com/serverless/event-gateway/issues/215). - ### Emit a Custom Event Creating a subscription requires `path` property (by default it's "/"). `path` indicates path under which you can push. @@ -551,6 +546,52 @@ Dummy endpoint (always returning `200 OK` status code) for checking if the event `GET /v1/status` +## System Events + +System Events are special type of events emitted by the Event Gateway instance. They are emitted on each stage of event +processing flow starting from receiving event to function invocation end. Those events are: + +- `gateway.event.received` - the event is emitted when an event was received by Events API. Data fields: + - `event` - event payload + - `path` - Events API path + - `headers` - HTTP request headers +- `gateway.function.invoking` - the event emitted before invoking a function. Data fields: + - `event` - event payload + - `functionId` - registered function ID +- `gateway.function.invoked` - the event emitted after successful function invocation. Data fields: + - `event` - event payload + - `functionId` - registered function ID + - `result` - function response +- `gateway.function.invocationFailed` - the event emitted after failed function invocation. Data fields: + - `event` - event payload + - `functionId` - registered function ID + - `error` - invocation error + +## Plugin System + +The Event Gateway is built with extensibility in mind. Built-in plugin system allows reacting on system events and +manipulate how an event is processed through the Event Gateway. + +_Current implementation supports plugins written only in Golang. We plan to support other languages in the future._ + +Plugin system is based on [go-plugin](https://github.com/hashicorp/go-plugin). A plugin needs to implement the following +interface: + +```go +type Reacter interface { + Subscriptions() []Subscription + React(event event.Event) error +} +``` + +`Subscription` model indicates the event that plugin subscribes to and the subscription type. A subscription can be either +sync or async. Sync (blocking) subscription means that in case of error returned from `React` method the event won't be +further processed by the Event Gateway. + +`React` method is called for every system event that plugin subscribed to. + +For more details, see [the example plugin](plugin/example). + ## Client Libraries - [FDK for Node.js](https://github.com/serverless/fdk) diff --git a/api/events.go b/api/events.go index 2e369ec..bcf100e 100644 --- a/api/events.go +++ b/api/events.go @@ -6,34 +6,33 @@ import ( "time" "github.com/rs/cors" - "github.com/serverless/event-gateway/internal/cache" "github.com/serverless/event-gateway/internal/httpapi" - "github.com/serverless/event-gateway/internal/metrics" - "github.com/serverless/event-gateway/router" ) -// StartEventsAPI creates a new gateway endpoint and listens for requests. -func StartEventsAPI(config httpapi.Config) httpapi.Server { - targetCache := cache.NewTarget("/serverless-event-gateway", config.KV, config.Log) - router := router.New(targetCache, metrics.DroppedPubSubEvents, config.Log) - router.StartWorkers() +// EventsAPIConfig stores configuration for Events API. Events API expects cofigured router. That's why new +// configuration object is needed. +type EventsAPIConfig struct { + httpapi.Config + Router http.Handler +} +// StartEventsAPI creates a new gateway endpoint and listens for requests. +func StartEventsAPI(config EventsAPIConfig) httpapi.Server { handler := &http.Server{ Addr: ":" + strconv.Itoa(int(config.Port)), - Handler: cors.AllowAll().Handler(router), + Handler: cors.AllowAll().Handler(config.Router), ReadTimeout: 3 * time.Second, WriteTimeout: 3 * time.Second, } server := httpapi.Server{ - Config: config, + Config: config.Config, HTTPHandler: handler, } config.ShutdownGuard.Add(1) go func() { server.Listen() - router.Drain() config.ShutdownGuard.Done() }() diff --git a/cmd/event-gateway/main.go b/cmd/event-gateway/main.go index 62951d7..c9c3f57 100644 --- a/cmd/event-gateway/main.go +++ b/cmd/event-gateway/main.go @@ -15,18 +15,25 @@ import ( "go.uber.org/zap/zapcore" "github.com/serverless/event-gateway/api" + "github.com/serverless/event-gateway/internal/cache" "github.com/serverless/event-gateway/internal/embedded" "github.com/serverless/event-gateway/internal/httpapi" "github.com/serverless/event-gateway/internal/metrics" "github.com/serverless/event-gateway/internal/sync" + "github.com/serverless/event-gateway/plugin" + "github.com/serverless/event-gateway/router" ) var version = "dev" func init() { etcd.Register() + + prometheus.MustRegister(metrics.RequestDuration) + prometheus.MustRegister(metrics.DroppedPubSubEvents) } +// nolint: gocyclo func main() { showVersion := flag.Bool("version", false, "Show version.") logLevel := zap.LevelFlag("log-level", zap.InfoLevel, `The level of logging to show after the event gateway has started. The available log levels are "debug", "info", "warn", and "err".`) @@ -42,6 +49,8 @@ func main() { eventsPort := flag.Uint("events-port", 4000, "Port to serve events API on.") eventsTLSCrt := flag.String("events-tls-cert", "", "Path to events API TLS certificate file.") eventsTLSKey := flag.String("events-tls-key", "", "Path to events API TLS key file.") + plugins := paths{} + flag.Var(&plugins, "plugin", "Path to a plugin to load.") flag.Parse() if *showVersion { @@ -49,10 +58,7 @@ func main() { os.Exit(0) } - prometheus.MustRegister(metrics.RequestDuration) - prometheus.MustRegister(metrics.DroppedPubSubEvents) - - log, err := loggerConfiguration(*developmentMode, *logLevel, *logFormat).Build() + log, err := logger(*developmentMode, *logLevel, *logFormat).Build() if err != nil { panic(err) } @@ -64,10 +70,9 @@ func main() { embedded.EmbedEtcd(*embedDataDir, *embedPeerAddr, *embedCliAddr, shutdownGuard) } - dbHostStrings := strings.Split(*dbHosts, ",") kv, err := libkv.NewStore( store.ETCDV3, - dbHostStrings, + strings.Split(*dbHosts, ","), &store.Config{ ConnectionTimeout: 10 * time.Second, }, @@ -76,13 +81,26 @@ func main() { log.Fatal("Cannot create KV client.", zap.Error(err)) } - eventServer := api.StartEventsAPI(httpapi.Config{ - KV: kv, - Log: log, - TLSCrt: eventsTLSCrt, - TLSKey: eventsTLSKey, - Port: *eventsPort, - ShutdownGuard: shutdownGuard, + pluginManager := plugin.NewManager(plugins, log) + err = pluginManager.Connect() + if err != nil { + log.Fatal("Loading plugins failed.", zap.Error(err)) + } + + targetCache := cache.NewTarget("/serverless-event-gateway", kv, log) + router := router.New(targetCache, pluginManager, metrics.DroppedPubSubEvents, log) + router.StartWorkers() + + eventServer := api.StartEventsAPI(api.EventsAPIConfig{ + Config: httpapi.Config{ + KV: kv, + Log: log, + TLSCrt: eventsTLSCrt, + TLSKey: eventsTLSKey, + Port: *eventsPort, + ShutdownGuard: shutdownGuard, + }, + Router: router, }) configServer := api.StartConfigAPI(httpapi.Config{ @@ -108,6 +126,11 @@ func main() { } shutdownGuard.Wait() + router.Drain() + + if pluginManager != nil { + pluginManager.Kill() + } } const ( @@ -115,7 +138,7 @@ const ( jsonEncoding = "json" ) -func loggerConfiguration(dev bool, level zapcore.Level, format string) zap.Config { +func logger(dev bool, level zapcore.Level, format string) zap.Config { cfg := zap.Config{ Level: zap.NewAtomicLevelAt(level), Development: false, @@ -153,3 +176,14 @@ func loggerConfiguration(dev bool, level zapcore.Level, format string) zap.Confi return cfg } + +type paths []string + +func (p *paths) String() string { + return strings.Join(*p, ",") +} + +func (p *paths) Set(value string) error { + *p = append(*p, value) + return nil +} diff --git a/event/event.go b/event/event.go index 52b45f5..fd1721a 100644 --- a/event/event.go +++ b/event/event.go @@ -1,8 +1,12 @@ package event import ( + "encoding/json" + "strings" "time" + "go.uber.org/zap/zapcore" + uuid "github.com/satori/go.uuid" ) @@ -34,3 +38,20 @@ const TypeInvoke = Type("invoke") // TypeHTTP is a special type of event for sync http subscriptions. const TypeHTTP = Type("http") + +// MarshalLogObject is a part of zapcore.ObjectMarshaler interface +func (e Event) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddString("type", string(e.Type)) + enc.AddString("id", e.ID) + enc.AddUint64("receivedAt", e.ReceivedAt) + payload, _ := json.Marshal(e.Data) + enc.AddString("data", string(payload)) + enc.AddString("dataType", e.DataType) + + return nil +} + +// IsSystem indicates if th event is a system event. +func (e Event) IsSystem() bool { + return strings.HasPrefix(string(e.Type), "gateway.") +} diff --git a/event/system.go b/event/system.go new file mode 100644 index 0000000..e81c484 --- /dev/null +++ b/event/system.go @@ -0,0 +1,46 @@ +package event + +import ( + "net/http" + + "github.com/serverless/event-gateway/functions" +) + +// SystemEventReceivedType is a system event emmited when the Event Gateway receives an event. +const SystemEventReceivedType = Type("gateway.event.received") + +// SystemEventReceivedData struct. +type SystemEventReceivedData struct { + Path string `json:"path"` + Event Event `json:"event"` + Headers http.Header `json:"header"` +} + +// SystemFunctionInvokingType is a system event emmited before invoking a function. +const SystemFunctionInvokingType = Type("gateway.function.invoking") + +// SystemFunctionInvokingData struct. +type SystemFunctionInvokingData struct { + FunctionID functions.FunctionID `json:"functionId"` + Event Event `json:"event"` +} + +// SystemFunctionInvokedType is a system event emmited after successful function invocation. +const SystemFunctionInvokedType = Type("gateway.function.invoked") + +// SystemFunctionInvokedData struct. +type SystemFunctionInvokedData struct { + FunctionID functions.FunctionID `json:"functionId"` + Event Event `json:"event"` + Result []byte `json:"result"` +} + +// SystemFunctionInvocationFailedType is a system event emmited after successful function invocation. +const SystemFunctionInvocationFailedType = Type("gateway.function.invocationFailed") + +// SystemFunctionInvocationFailedData struct. +type SystemFunctionInvocationFailedData struct { + FunctionID functions.FunctionID `json:"functionId"` + Event Event `json:"event"` + Error []byte `json:"result"` +} diff --git a/internal/cache/target.go b/internal/cache/target.go index 4080b89..0e1397b 100644 --- a/internal/cache/target.go +++ b/internal/cache/target.go @@ -12,14 +12,7 @@ import ( "github.com/serverless/event-gateway/internal/pathtree" ) -// Targeter is an interface for retrieving cached configuration for driving performance-sensitive routing decisions. -type Targeter interface { - HTTPBackingFunction(method, path string) (*functions.FunctionID, pathtree.Params) - Function(functionID functions.FunctionID) *functions.Function - SubscribersOfEvent(path string, eventType event.Type) []functions.FunctionID -} - -// Target is an implementation of Targeter using the docker/libkv library for watching data in etcd, zookeeper, and +// Target is an implementation of router.Targeter using the docker/libkv library for watching data in etcd, zookeeper, and // consul. type Target struct { log *zap.Logger diff --git a/plugin/example/main.go b/plugin/example/main.go new file mode 100644 index 0000000..57a6160 --- /dev/null +++ b/plugin/example/main.go @@ -0,0 +1,23 @@ +package main + +import ( + goplugin "github.com/hashicorp/go-plugin" + "github.com/serverless/event-gateway/plugin" +) + +func main() { + pluginMap := map[string]goplugin.Plugin{ + "subscriber": &plugin.SubscriberPlugin{Reacter: &Simple{}}, + } + + handshakeConfig := goplugin.HandshakeConfig{ + ProtocolVersion: 1, + MagicCookieKey: "EVENT_GATEWAY_MAGIC_COOKIE", + MagicCookieValue: "0329c93c-a64c-4eb5-bf72-63172430d433", + } + + goplugin.Serve(&goplugin.ServeConfig{ + HandshakeConfig: handshakeConfig, + Plugins: pluginMap, + }) +} diff --git a/plugin/example/plugin.go b/plugin/example/plugin.go new file mode 100644 index 0000000..4113be9 --- /dev/null +++ b/plugin/example/plugin.go @@ -0,0 +1,40 @@ +package main + +import ( + "encoding/gob" + "log" + + "github.com/serverless/event-gateway/event" + "github.com/serverless/event-gateway/plugin" +) + +func init() { + gob.Register(map[string]interface{}{}) + gob.Register(event.SystemEventReceivedData{}) + gob.Register(event.SystemFunctionInvokingData{}) +} + +// Simple plugin demonstrating how to build plugins. +type Simple struct{} + +// Subscriptions return list of events that plugin listens to. +func (s *Simple) Subscriptions() []plugin.Subscription { + return []plugin.Subscription{ + plugin.Subscription{ + EventType: event.SystemEventReceivedType, + Type: plugin.Sync, + }, + } +} + +// React is called for every event that plugin subscribed to. +func (s *Simple) React(instance event.Event) error { + switch instance.Type { + case event.SystemEventReceivedType: + received := instance.Data.(event.SystemEventReceivedData) + log.Printf("received gateway.received.event for event: %q", received.Event.Type) + break + } + + return nil +} diff --git a/plugin/logger.go b/plugin/logger.go new file mode 100644 index 0000000..8f9afa1 --- /dev/null +++ b/plugin/logger.go @@ -0,0 +1,87 @@ +package plugin + +import ( + "fmt" + "io/ioutil" + "log" + + hclog "github.com/hashicorp/go-hclog" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// Hclog2ZapLogger implements Hashicorp's hclog.Logger interface using Uber's zap.Logger. It's a workaround for plugin +// system. go-plugin doesn't support other logger than hclog. This logger implements only methods used by the go-plugin. +type Hclog2ZapLogger struct { + Zap *zap.Logger +} + +// Trace implementation. +func (l Hclog2ZapLogger) Trace(msg string, args ...interface{}) {} + +// Debug implementation. +func (l Hclog2ZapLogger) Debug(msg string, args ...interface{}) { + l.Zap.Debug(msg, argsToFields(args...)...) +} + +// Info implementation. +func (l Hclog2ZapLogger) Info(msg string, args ...interface{}) { + l.Zap.Info(msg, argsToFields(args...)...) +} + +// Warn implementation. +func (l Hclog2ZapLogger) Warn(msg string, args ...interface{}) { + l.Zap.Warn(msg, argsToFields(args...)...) +} + +// Error implementation. +func (l Hclog2ZapLogger) Error(msg string, args ...interface{}) { + l.Zap.Error(msg, argsToFields(args...)...) +} + +// IsTrace implementation. +func (l Hclog2ZapLogger) IsTrace() bool { return false } + +// IsDebug implementation. +func (l Hclog2ZapLogger) IsDebug() bool { return false } + +// IsInfo implementation. +func (l Hclog2ZapLogger) IsInfo() bool { return false } + +// IsWarn implementation. +func (l Hclog2ZapLogger) IsWarn() bool { return false } + +// IsError implementation. +func (l Hclog2ZapLogger) IsError() bool { return false } + +// With implementation. +func (l Hclog2ZapLogger) With(args ...interface{}) hclog.Logger { + return Hclog2ZapLogger{l.Zap.With(argsToFields(args...)...)} +} + +// Named implementation. +func (l Hclog2ZapLogger) Named(name string) hclog.Logger { + return Hclog2ZapLogger{l.Zap.Named(name)} +} + +// ResetNamed implementation. +func (l Hclog2ZapLogger) ResetNamed(name string) hclog.Logger { + // no need to implement that as go-plugin doesn't use this method. + return Hclog2ZapLogger{} +} + +// StandardLogger implementation. +func (l Hclog2ZapLogger) StandardLogger(opts *hclog.StandardLoggerOptions) *log.Logger { + // no need to implement that as go-plugin doesn't use this method. + return log.New(ioutil.Discard, "", 0) +} + +func argsToFields(args ...interface{}) []zapcore.Field { + fields := []zapcore.Field{} + for i := 0; i < len(args); i += 2 { + fields = append(fields, zap.String(args[i].(string), fmt.Sprintf("%v", args[i+1]))) + } + + return fields +} diff --git a/plugin/manager.go b/plugin/manager.go new file mode 100644 index 0000000..52a65c9 --- /dev/null +++ b/plugin/manager.go @@ -0,0 +1,123 @@ +package plugin + +import ( + "encoding/gob" + "os/exec" + + "go.uber.org/zap" + + goplugin "github.com/hashicorp/go-plugin" + "github.com/serverless/event-gateway/event" +) + +func init() { + gob.Register(map[string]interface{}{}) + gob.Register(event.SystemEventReceivedData{}) + gob.Register(event.SystemFunctionInvokingData{}) + gob.Register(event.SystemFunctionInvokedData{}) +} + +// Plugin is a generic struct for storing info about a plugin. +type Plugin struct { + Path string + Client *goplugin.Client + Reacter Reacter + Subscriptions []Subscription +} + +// Manager handles lifecycle of plugin management. +type Manager struct { + Plugins []*Plugin + Log *zap.Logger +} + +// NewManager creates new Manager. +func NewManager(paths []string, log *zap.Logger) *Manager { + plugins := []*Plugin{} + logger := Hclog2ZapLogger{log} + for _, path := range paths { + client := goplugin.NewClient(&goplugin.ClientConfig{ + HandshakeConfig: handshakeConfig, + Plugins: pluginMap, + Cmd: exec.Command(path), + Logger: logger.Named("PluginManager"), + }) + + plugins = append(plugins, &Plugin{ + Client: client, + Path: path, + }) + } + + return &Manager{ + Plugins: plugins, + Log: log, + } +} + +// Connect connects to plugins. +func (m *Manager) Connect() error { + for _, plugin := range m.Plugins { + rpcClient, err := plugin.Client.Client() + if err != nil { + return err + } + + // Request the plugin + raw, err := rpcClient.Dispense("subscriber") + if err != nil { + return err + } + + plugin.Reacter = raw.(*Subscriber) + plugin.Subscriptions = plugin.Reacter.Subscriptions() + } + + return nil +} + +// Kill disconnects plugins and kill subprocesses. +func (m *Manager) Kill() { + for _, plugin := range m.Plugins { + plugin.Client.Kill() + } +} + +// React call all plugins' React method. It returns when the first error is returned by a plugin. +func (m *Manager) React(event *event.Event) error { + for _, plugin := range m.Plugins { + for _, subscription := range plugin.Subscriptions { + if subscription.EventType == event.Type { + err := plugin.Reacter.React(*event) + if err != nil { + m.Log.Debug("Plugin returned error.", + zap.String("plugin", plugin.Path), + zap.Error(err), + zap.String("subscriptionType", string(subscription.Type))) + if subscription.Type == Sync { + return err + } + } + } + } + } + + return nil +} + +// handshakeConfig is used to just do a basic handshake between a plugin and host. If the handshake fails, a user +// friendly error is shown. This prevents users from executing bad plugins or executing a plugin directory. It is a UX +// feature, not a security feature. +var handshakeConfig = goplugin.HandshakeConfig{ + // The ProtocolVersion is the version that must match between EG core and EG plugins. This should be bumped whenever + // a change happens in one or the other that makes it so that they can't safely communicate. + ProtocolVersion: 1, + // The magic cookie values should NEVER be changed. + MagicCookieKey: "EVENT_GATEWAY_MAGIC_COOKIE", + MagicCookieValue: "0329c93c-a64c-4eb5-bf72-63172430d433", +} + +// pluginMap is the map of plugins we can dispense. +var pluginMap = map[string]goplugin.Plugin{ + "subscriber": &SubscriberPlugin{}, +} diff --git a/plugin/plugin.go b/plugin/plugin.go new file mode 100644 index 0000000..08ac80c --- /dev/null +++ b/plugin/plugin.go @@ -0,0 +1,22 @@ +package plugin + +import ( + "net/rpc" + + goplugin "github.com/hashicorp/go-plugin" +) + +// SubscriberPlugin is the plugin.Plugin implementation. +type SubscriberPlugin struct { + Reacter Reacter +} + +// Server hosts SubscriberServer. +func (s *SubscriberPlugin) Server(*goplugin.MuxBroker) (interface{}, error) { + return &SubscriberServer{Reacter: s.Reacter}, nil +} + +// Client provides Subscriber client. +func (s *SubscriberPlugin) Client(b *goplugin.MuxBroker, c *rpc.Client) (interface{}, error) { + return &Subscriber{client: c}, nil +} diff --git a/plugin/subscriber.go b/plugin/subscriber.go new file mode 100644 index 0000000..1c98665 --- /dev/null +++ b/plugin/subscriber.go @@ -0,0 +1,95 @@ +package plugin + +import ( + "net/rpc" + + goplugin "github.com/hashicorp/go-plugin" + "github.com/serverless/event-gateway/event" +) + +// Reacter provides subscriptions for events that can react to. +type Reacter interface { + Subscriptions() []Subscription + React(event event.Event) error +} + +// Type of a subscription. +type Type string + +const ( + // Async subscription type. Plugin host will not block on it. + Async Type = "async" + // Sync subscription type. Plugin host will use the response from the plugin before proceeding. + Sync Type = "sync" +) + +// Subscription use by plugin to indicate which event it want to react to. +type Subscription struct { + EventType event.Type + Type Type +} + +// Subscriber is a RPC implementation of Reacter. +type Subscriber struct { + client *rpc.Client +} + +// Subscriptions call plugin implementation. +func (s *Subscriber) Subscriptions() []Subscription { + var resp SubscriberSubscriptionsResponse + err := s.client.Call("Plugin.Subscriptions", new(interface{}), &resp) + if err != nil { + return []Subscription{} + } + + return resp.Subscriptions +} + +// SubscriberSubscriptionsResponse RPC response +type SubscriberSubscriptionsResponse struct { + Subscriptions []Subscription +} + +// React calls plugin implementation. +func (s *Subscriber) React(event event.Event) error { + args := &SubscriberReactArgs{Event: event} + var resp SubscriberReactResponse + err := s.client.Call("Plugin.React", args, &resp) + if err != nil { + return err + } + if resp.Error != nil { + err = resp.Error + } + + return err +} + +// SubscriberReactArgs RPC args +type SubscriberReactArgs struct { + Event event.Event +} + +// SubscriberReactResponse RPC response +type SubscriberReactResponse struct { + Error *goplugin.BasicError +} + +// SubscriberServer is a net/rpc compatibile structure for serving a Reacter. +type SubscriberServer struct { + Reacter Reacter +} + +// Subscriptions server implementation. +func (s *SubscriberServer) Subscriptions(_ interface{}, resp *SubscriberSubscriptionsResponse) error { + *resp = SubscriberSubscriptionsResponse{Subscriptions: s.Reacter.Subscriptions()} + return nil +} + +// React server implementation. +func (s *SubscriberServer) React(args *SubscriberReactArgs, resp *SubscriberReactResponse) error { + err := s.Reacter.React(args.Event) + + *resp = SubscriberReactResponse{Error: goplugin.NewBasicError(err)} + return nil +} diff --git a/router/event.go b/router/event.go index 6588b1b..7f78d41 100644 --- a/router/event.go +++ b/router/event.go @@ -2,6 +2,7 @@ package router import ( "encoding/json" + "errors" "io/ioutil" "net/http" @@ -31,9 +32,13 @@ func fromRequest(r *http.Request) (*eventpkg.Event, error) { mime = mimeOctetStrem } - body, err := ioutil.ReadAll(r.Body) - if err != nil { - return nil, err + body := []byte{} + var err error + if r.Body != nil { + body, err = ioutil.ReadAll(r.Body) + if err != nil { + return nil, err + } } event := eventpkg.NewEvent(eventType, mime, body) @@ -41,7 +46,7 @@ func fromRequest(r *http.Request) (*eventpkg.Event, error) { if mime == mimeJSON && len(body) > 0 { err := json.Unmarshal(body, &event.Data) if err != nil { - return nil, err + return nil, errors.New("malformed JSON body") } } @@ -58,8 +63,7 @@ func fromRequest(r *http.Request) (*eventpkg.Event, error) { return event, nil } -type event struct { - path string - eventType eventpkg.Type - payload []byte +type workEvent struct { + path string + event eventpkg.Event } diff --git a/router/integration_test.go b/router/integration_test.go index e96efae..29569c2 100644 --- a/router/integration_test.go +++ b/router/integration_test.go @@ -22,6 +22,7 @@ import ( "github.com/serverless/event-gateway/internal/kv" "github.com/serverless/event-gateway/internal/metrics" "github.com/serverless/event-gateway/internal/sync" + "github.com/serverless/event-gateway/plugin" "github.com/serverless/event-gateway/subscriptions" "github.com/serverless/libkv" "github.com/serverless/libkv/store" @@ -97,27 +98,6 @@ func TestIntegration_AsyncSubscription(t *testing.T) { shutdownGuard.ShutdownAndWait() } -func TestIntegration_AsyncFunctionNotFound(t *testing.T) { - logCfg := zap.NewDevelopmentConfig() - logCfg.DisableStacktrace = true - log, _ := logCfg.Build() - - kv, shutdownGuard := newTestEtcd() - - testAPIServer := newConfigAPIServer(kv, log) - defer testAPIServer.Close() - - router, testRouterServer := newTestRouterServer(kv, log) - defer testRouterServer.Close() - - statusCode, _, body := get(testRouterServer.URL) - assert.Equal(t, statusCode, 404) - assert.Equal(t, body, "Resource not found\n") - - router.Drain() - shutdownGuard.ShutdownAndWait() -} - func TestIntegration_HTTPSubscription(t *testing.T) { logCfg := zap.NewDevelopmentConfig() logCfg.DisableStacktrace = true @@ -215,7 +195,7 @@ func get(url string) (int, http.Header, string) { func newTestRouterServer(kvstore store.Store, log *zap.Logger) (*Router, *httptest.Server) { targetCache := cache.NewTarget("/serverless-event-gateway", kvstore, log) - router := New(targetCache, metrics.DroppedPubSubEvents, log) + router := New(targetCache, plugin.NewManager([]string{}, log), metrics.DroppedPubSubEvents, log) return router, httptest.NewServer(router) } diff --git a/router/mock/mock.go b/router/mock/mock.go new file mode 100644 index 0000000..bf359ae --- /dev/null +++ b/router/mock/mock.go @@ -0,0 +1,3 @@ +//go:generate mockgen -package mock -destination ./targetcache.go github.com/serverless/event-gateway/router Targeter + +package mock diff --git a/router/mock/targetcache.go b/router/mock/targetcache.go new file mode 100644 index 0000000..9a7c19c --- /dev/null +++ b/router/mock/targetcache.go @@ -0,0 +1,63 @@ +// Automatically generated by MockGen. DO NOT EDIT! +// Source: github.com/serverless/event-gateway/router (interfaces: Targeter) + +package mock + +import ( + gomock "github.com/golang/mock/gomock" + event "github.com/serverless/event-gateway/event" + functions "github.com/serverless/event-gateway/functions" + pathtree "github.com/serverless/event-gateway/internal/pathtree" +) + +// Mock of Targeter interface +type MockTargeter struct { + ctrl *gomock.Controller + recorder *_MockTargeterRecorder +} + +// Recorder for MockTargeter (not exported) +type _MockTargeterRecorder struct { + mock *MockTargeter +} + +func NewMockTargeter(ctrl *gomock.Controller) *MockTargeter { + mock := &MockTargeter{ctrl: ctrl} + mock.recorder = &_MockTargeterRecorder{mock} + return mock +} + +func (_m *MockTargeter) EXPECT() *_MockTargeterRecorder { + return _m.recorder +} + +func (_m *MockTargeter) Function(_param0 functions.FunctionID) *functions.Function { + ret := _m.ctrl.Call(_m, "Function", _param0) + ret0, _ := ret[0].(*functions.Function) + return ret0 +} + +func (_mr *_MockTargeterRecorder) Function(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "Function", arg0) +} + +func (_m *MockTargeter) HTTPBackingFunction(_param0 string, _param1 string) (*functions.FunctionID, pathtree.Params) { + ret := _m.ctrl.Call(_m, "HTTPBackingFunction", _param0, _param1) + ret0, _ := ret[0].(*functions.FunctionID) + ret1, _ := ret[1].(pathtree.Params) + return ret0, ret1 +} + +func (_mr *_MockTargeterRecorder) HTTPBackingFunction(arg0, arg1 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "HTTPBackingFunction", arg0, arg1) +} + +func (_m *MockTargeter) SubscribersOfEvent(_param0 string, _param1 event.Type) []functions.FunctionID { + ret := _m.ctrl.Call(_m, "SubscribersOfEvent", _param0, _param1) + ret0, _ := ret[0].([]functions.FunctionID) + return ret0 +} + +func (_mr *_MockTargeterRecorder) SubscribersOfEvent(arg0, arg1 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "SubscribersOfEvent", arg0, arg1) +} diff --git a/router/router.go b/router/router.go index 30f56ee..326fbc0 100644 --- a/router/router.go +++ b/router/router.go @@ -3,6 +3,7 @@ package router import ( "encoding/json" "errors" + "fmt" "net/http" "strings" "sync" @@ -13,32 +14,33 @@ import ( eventpkg "github.com/serverless/event-gateway/event" "github.com/serverless/event-gateway/functions" - "github.com/serverless/event-gateway/internal/cache" + "github.com/serverless/event-gateway/plugin" ) // Router calls a target function when an endpoint is hit, and handles pubsub message delivery. type Router struct { sync.Mutex - targetCache cache.Targeter - dropMetric prometheus.Counter - log *zap.Logger - NWorkers uint - drain chan struct{} - drainWaitGroup sync.WaitGroup - active bool - work chan event - responseWriteTimeout time.Duration + targetCache Targeter + plugins *plugin.Manager + dropMetric prometheus.Counter + log *zap.Logger + workerNumber uint + drain chan struct{} + drainWaitGroup sync.WaitGroup + active bool + work chan workEvent } // New instantiates a new Router -func New(targetCache cache.Targeter, dropMetric prometheus.Counter, log *zap.Logger) *Router { +func New(targetCache Targeter, plugins *plugin.Manager, dropMetric prometheus.Counter, log *zap.Logger) *Router { return &Router{ - targetCache: targetCache, - dropMetric: dropMetric, - log: log, - NWorkers: 20, - drain: make(chan struct{}), - work: nil, + targetCache: targetCache, + plugins: plugins, + dropMetric: dropMetric, + log: log, + workerNumber: 20, + drain: make(chan struct{}), + work: nil, } } @@ -51,25 +53,31 @@ func (router *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) { event, err := fromRequest(r) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + http.Error(w, err.Error(), http.StatusBadRequest) return } - payload, err := json.Marshal(event) + router.log.Debug("Event received.", zap.String("path", r.URL.Path), zap.Object("event", event)) + err = router.emitSystemEventReceived(r.URL.Path, *event, r.Header) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + router.log.Debug("Event processing stopped because sync plugin subscription returned an error.", + zap.Object("event", event), + zap.Error(err)) return } - if event.Type == eventpkg.TypeHTTP || event.Type == eventpkg.TypeInvoke { - router.handleSyncEvent(event, payload, w, r) - } else if r.Method == http.MethodPost { - router.enqueueWork(r.URL.Path, event.Type, payload) + if event.Type == eventpkg.TypeHTTP || (r.Method == http.MethodPost && event.Type == eventpkg.TypeInvoke) { + router.handleSyncEvent(event, w, r) + } else if r.Method == http.MethodPost && !event.IsSystem() { + router.enqueueWork(r.URL.Path, event) w.WriteHeader(http.StatusAccepted) + } else { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprintln(w, "custom event emitted with non POST method") } } -// StartWorkers spins up NWorkers goroutines for processing +// StartWorkers spins up workerNumber goroutines for processing // the event subscriptions. func (router *Router) StartWorkers() { router.Lock() @@ -82,17 +90,16 @@ func (router *Router) StartWorkers() { router.active = true if router.work == nil { - router.work = make(chan event, router.NWorkers*2) + router.work = make(chan workEvent, router.workerNumber*2) } - for i := 0; i < int(router.NWorkers); i++ { + for i := 0; i < int(router.workerNumber); i++ { router.drainWaitGroup.Add(1) go router.loop() } } -// Drain causes new requests to return 503, and blocks until -// the work queue is processed. +// Drain causes new requests to return 503, and blocks until the work queue is processed. func (router *Router) Drain() { // try to close the draining chan router.Lock() @@ -165,20 +172,14 @@ func (router *Router) WaitForSubscriber(path string, eventType eventpkg.Type) <- return updatedChan } -const ( - // headerFunctionID is a header name for specifying function id for sync invocation. - headerFunctionID = "function-id" - - internalFunctionError = "gateway.info.functionError" -) +// headerFunctionID is a header name for specifying function id for sync invocation. +const headerFunctionID = "function-id" var ( errUnableToLookUpRegisteredFunction = errors.New("unable to look up registered function") ) -func (router *Router) handleSyncEvent(event *eventpkg.Event, payload []byte, w http.ResponseWriter, r *http.Request) { - router.log.Debug("Event received.", zap.String("event", string(payload))) - +func (router *Router) handleSyncEvent(event *eventpkg.Event, w http.ResponseWriter, r *http.Request) { var resp []byte var functionID functions.FunctionID @@ -187,40 +188,23 @@ func (router *Router) handleSyncEvent(event *eventpkg.Event, payload []byte, w h } else if event.Type == eventpkg.TypeHTTP { backingFunction, params := router.targetCache.HTTPBackingFunction(strings.ToUpper(r.Method), r.URL.EscapedPath()) if backingFunction == nil { - router.log.Debug("Function not found for HTTP event.", zap.String("event", string(payload))) - http.Error(w, "Resource not found", http.StatusNotFound) + router.log.Debug("Function not found for HTTP event.", zap.Object("event", event)) + http.Error(w, "resource not found", http.StatusNotFound) return } httpdata := event.Data.(*eventpkg.HTTPEvent) httpdata.Params = params event.Data = httpdata - var err error - payload, err = json.Marshal(event) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - functionID = *backingFunction } - router.log.Debug("Function triggered.", zap.String("functionId", string(functionID)), zap.String("event", string(payload))) - - resp, err := router.callFunction(functionID, payload) + resp, err := router.callFunction(functionID, *event) if err != nil { - router.log.Info("Function invocation failed.", - zap.String("functionId", string(functionID)), zap.String("event", string(payload)), zap.Error(err)) - http.Error(w, err.Error(), http.StatusInternalServerError) - router.emitFunctionErrorEvent(functionID, payload, err) return } - router.log.Debug("Function finished.", - zap.String("functionId", string(functionID)), zap.String("event", string(payload)), - zap.String("response", string(resp))) - if event.Type == eventpkg.TypeHTTP { httpResponse := &HTTPResponse{StatusCode: http.StatusOK} err = json.Unmarshal(resp, httpResponse) @@ -247,14 +231,15 @@ func (router *Router) handleSyncEvent(event *eventpkg.Event, payload []byte, w h } } -func (router *Router) enqueueWork(path string, eventType eventpkg.Type, payload []byte) { - router.log.Debug("Event received.", zap.String("path", path), zap.String("event", string(payload))) +func (router *Router) enqueueWork(path string, event *eventpkg.Event) { + if event.IsSystem() { + router.log.Debug("System event received.", zap.Object("event", event)) + } select { - case router.work <- event{ - path: path, - eventType: eventType, - payload: payload, + case router.work <- workEvent{ + path: path, + event: *event, }: default: // We could not submit any work, this is NOT good but we will sacrifice consistency for availability for now. @@ -263,7 +248,7 @@ func (router *Router) enqueueWork(path string, eventType eventpkg.Type, payload } // callFunction looks up a function and calls it. -func (router *Router) callFunction(backingFunctionID functions.FunctionID, payload []byte) ([]byte, error) { +func (router *Router) callFunction(backingFunctionID functions.FunctionID, event eventpkg.Event) ([]byte, error) { backingFunction := router.targetCache.Function(backingFunctionID) if backingFunction == nil { return []byte{}, errUnableToLookUpRegisteredFunction @@ -273,18 +258,45 @@ func (router *Router) callFunction(backingFunctionID functions.FunctionID, paylo if backingFunction.Provider.Type == functions.Weighted { chosen, err := backingFunction.Provider.Weighted.Choose() if err != nil { - return []byte{}, err + return nil, err } chosenFunction = chosen } + router.log.Debug("Invoking function.", zap.String("functionId", string(backingFunctionID)), zap.Object("event", event)) + err := router.emitSystemFunctionInvoking(backingFunctionID, event) + if err != nil { + router.log.Debug("Event processing stopped because sync plugin subscription returned an error.", + zap.Object("event", event), + zap.Error(err)) + return nil, err + } + // Call the target backing function. f := router.targetCache.Function(chosenFunction) if f == nil { return []byte{}, errUnableToLookUpRegisteredFunction } - return f.Call(payload) + payload, err := json.Marshal(event) + if err != nil { + return nil, err + } + + result, err := f.Call(payload) + if err != nil { + router.log.Info("Function invocation failed.", + zap.String("functionId", string(backingFunctionID)), zap.Object("event", event), zap.Error(err)) + + router.emitSystemFunctionInvocationFailed(backingFunctionID, event, err) + } else { + router.log.Debug("Function invoked.", + zap.String("functionId", string(backingFunctionID)), zap.Object("event", event), zap.ByteString("result", result)) + + router.emitSystemFunctionInvoked(backingFunctionID, event, payload) + } + + return result, err } // loop is the main loop for a pub/sub worker goroutine @@ -326,36 +338,49 @@ func (router *Router) loop() { } // processEvent call all functions subscribed for an event -func (router *Router) processEvent(e event) { - subscribers := router.targetCache.SubscribersOfEvent(e.path, e.eventType) +func (router *Router) processEvent(e workEvent) { + subscribers := router.targetCache.SubscribersOfEvent(e.path, e.event.Type) for _, subscriber := range subscribers { - router.log.Debug("Function triggered.", - zap.String("functionId", string(subscriber)), zap.String("path", e.path), zap.String("event", string(e.payload))) + router.callFunction(subscriber, e.event) + } +} - resp, err := router.callFunction(subscriber, e.payload) +func (router *Router) emitSystemEventReceived(path string, event eventpkg.Event, headers http.Header) error { + system := eventpkg.NewEvent( + eventpkg.SystemEventReceivedType, + mimeJSON, + eventpkg.SystemEventReceivedData{Path: path, Event: event, Headers: headers}, + ) + router.enqueueWork("/", system) + return router.plugins.React(system) +} - if err != nil { - router.log.Info("Function invocation failed.", - zap.String("functionId", string(subscriber)), zap.String("path", e.path), zap.String("event", string(e.payload)), zap.Error(err)) - - router.emitFunctionErrorEvent(subscriber, e.payload, err) - } else { - router.log.Debug("Function finished.", - zap.String("functionId", string(subscriber)), zap.String("path", e.path), zap.String("event", string(e.payload)), - zap.String("response", string(resp))) - } - } +func (router *Router) emitSystemFunctionInvoking(functionID functions.FunctionID, event eventpkg.Event) error { + system := eventpkg.NewEvent( + eventpkg.SystemFunctionInvokingType, + mimeJSON, + eventpkg.SystemFunctionInvokingData{FunctionID: functionID, Event: event}, + ) + router.enqueueWork("/", system) + return router.plugins.React(system) } -func (router *Router) emitFunctionErrorEvent(functionID functions.FunctionID, payload []byte, err error) { +func (router *Router) emitSystemFunctionInvoked(functionID functions.FunctionID, event eventpkg.Event, result []byte) error { + system := eventpkg.NewEvent( + eventpkg.SystemFunctionInvokedType, + mimeJSON, + eventpkg.SystemFunctionInvokedData{FunctionID: functionID, Event: event, Result: result}) + router.enqueueWork("/", system) + return router.plugins.React(system) +} + +func (router *Router) emitSystemFunctionInvocationFailed(functionID functions.FunctionID, event eventpkg.Event, err error) { if _, ok := err.(*functions.ErrFunctionError); ok { - internal := eventpkg.NewEvent(internalFunctionError, mimeJSON, struct { + system := eventpkg.NewEvent("gateway.function.invocationFailed", mimeJSON, struct { FunctionID string `json:"functionId"` }{string(functionID)}) - payload, err = json.Marshal(internal) - if err == nil { - router.enqueueWork("/", internal.Type, payload) - } + + router.enqueueWork("/", system) } } diff --git a/router/router_test.go b/router/router_test.go new file mode 100644 index 0000000..aee3412 --- /dev/null +++ b/router/router_test.go @@ -0,0 +1,107 @@ +package router + +import ( + "net/http" + "net/http/httptest" + "strings" + "testing" + + "go.uber.org/zap" + + "github.com/golang/mock/gomock" + "github.com/serverless/event-gateway/event" + "github.com/serverless/event-gateway/functions" + "github.com/serverless/event-gateway/internal/metrics" + "github.com/serverless/event-gateway/internal/pathtree" + "github.com/serverless/event-gateway/plugin" + "github.com/serverless/event-gateway/router/mock" + "github.com/stretchr/testify/assert" +) + +func TestRouterServeHTTP_StatusUnavailableWhenDraining(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + target := mock.NewMockTargeter(ctrl) + router := testrouter(target) + router.Drain() + + req, _ := http.NewRequest(http.MethodGet, "/", nil) + recorder := httptest.NewRecorder() + router.ServeHTTP(recorder, req) + + assert.Equal(t, http.StatusServiceUnavailable, recorder.Code) + assert.Equal(t, "Service Unavailable\n", recorder.Body.String()) +} + +func TestRouterServeHTTP_HTTPEventFunctionNotFound(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + target := mock.NewMockTargeter(ctrl) + target.EXPECT().HTTPBackingFunction(http.MethodGet, "/notfound").Return(nil, pathtree.Params{}).MaxTimes(1) + target.EXPECT().SubscribersOfEvent("/", event.SystemEventReceivedType).Return([]functions.FunctionID{}).MaxTimes(1) + router := testrouter(target) + + req, _ := http.NewRequest(http.MethodGet, "/notfound", nil) + recorder := httptest.NewRecorder() + router.ServeHTTP(recorder, req) + + assert.Equal(t, http.StatusNotFound, recorder.Code) + assert.Equal(t, "resource not found\n", recorder.Body.String()) +} + +func TestRouterServeHTTP_InvokeEventFunctionNotFound(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + target := mock.NewMockTargeter(ctrl) + target.EXPECT().Function(functions.FunctionID("testfunc")).Return(nil).MaxTimes(1) + target.EXPECT().SubscribersOfEvent("/", event.SystemEventReceivedType).Return([]functions.FunctionID{}).MaxTimes(1) + router := testrouter(target) + + req, _ := http.NewRequest(http.MethodPost, "/", nil) + req.Header.Set("event", "invoke") + req.Header.Set("function-id", "testfunc") + recorder := httptest.NewRecorder() + router.ServeHTTP(recorder, req) + + assert.Equal(t, http.StatusInternalServerError, recorder.Code) + assert.Equal(t, "unable to look up registered function\n", recorder.Body.String()) +} + +func TestRouterServeHTTP_ErrorMalformedJSONRequest(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + target := mock.NewMockTargeter(ctrl) + router := testrouter(target) + + req, _ := http.NewRequest(http.MethodPost, "/", strings.NewReader("not json")) + req.Header.Set("content-type", "application/json") + recorder := httptest.NewRecorder() + router.ServeHTTP(recorder, req) + + assert.Equal(t, http.StatusBadRequest, recorder.Code) + assert.Equal(t, "malformed JSON body\n", recorder.Body.String()) +} + +func TestRouterServeHTTP_ErrorOnCustomEventEmittedWithNonPostMethod(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + target := mock.NewMockTargeter(ctrl) + target.EXPECT().SubscribersOfEvent("/", event.SystemEventReceivedType).Return([]functions.FunctionID{}).MaxTimes(1) + router := testrouter(target) + + req, _ := http.NewRequest(http.MethodGet, "/", nil) + req.Header.Set("event", "user.created") + recorder := httptest.NewRecorder() + router.ServeHTTP(recorder, req) + + assert.Equal(t, http.StatusBadRequest, recorder.Code) + assert.Equal(t, "custom event emitted with non POST method\n", recorder.Body.String()) +} + +func testrouter(target Targeter) *Router { + log := zap.NewNop() + plugins := plugin.NewManager([]string{}, log) + router := New(target, plugins, metrics.DroppedPubSubEvents, log) + router.StartWorkers() + return router +} diff --git a/router/targeter.go b/router/targeter.go new file mode 100644 index 0000000..47f940f --- /dev/null +++ b/router/targeter.go @@ -0,0 +1,14 @@ +package router + +import ( + "github.com/serverless/event-gateway/event" + "github.com/serverless/event-gateway/functions" + "github.com/serverless/event-gateway/internal/pathtree" +) + +// Targeter is an interface for retrieving cached configuration for driving performance-sensitive routing decisions. +type Targeter interface { + HTTPBackingFunction(method, path string) (*functions.FunctionID, pathtree.Params) + Function(functionID functions.FunctionID) *functions.Function + SubscribersOfEvent(path string, eventType event.Type) []functions.FunctionID +}