Skip to content

Commit

Permalink
fix: Allow registering multiple functions with one server for local t…
Browse files Browse the repository at this point in the history
…esting. (#143)

* Allow registering multiple functions with one server for local testing.

* Allow registering multiple functions with one server for local testing.

* Allow registering multiple functions with one server for local testing.

* Let RegisterXXXFunctionContext call registry.Default().RegisterXXX

* fix some nits
  • Loading branch information
jihuin authored Aug 4, 2022
1 parent 5d5bf7a commit 3cab285
Show file tree
Hide file tree
Showing 4 changed files with 382 additions and 120 deletions.
121 changes: 67 additions & 54 deletions funcframework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ const (
fnErrorMessageStderrTmpl = "Function error: %v"
)

var (
handler http.Handler
)

// recoverPanic recovers from a panic in a consistent manner. panicSrc should
// describe what was happening when the panic was encountered, for example
// "user function execution". w is an http.ResponseWriter to write a generic
Expand Down Expand Up @@ -86,72 +82,92 @@ func RegisterEventFunction(path string, fn interface{}) {

// RegisterHTTPFunctionContext registers fn as an HTTP function.
func RegisterHTTPFunctionContext(ctx context.Context, path string, fn func(http.ResponseWriter, *http.Request)) error {
server, err := wrapHTTPFunction(path, fn)
if err == nil {
handler = server
}
return err
funcName := fmt.Sprintf("function_at_path_%q", path)
return registry.Default().RegisterHTTP(funcName, fn, registry.WithPath(path))
}

// RegisterEventFunctionContext registers fn as an event function. The function must have two arguments, a
// context.Context and a struct type depending on the event, and return an error. If fn has the
// wrong signature, RegisterEventFunction returns an error.
func RegisterEventFunctionContext(ctx context.Context, path string, fn interface{}) error {
server, err := wrapEventFunction(path, fn)
if err == nil {
handler = server
}
return err
funcName := fmt.Sprintf("function_at_path_%q", path)
return registry.Default().RegisterEvent(funcName, fn, registry.WithPath(path))
}

// RegisterCloudEventFunctionContext registers fn as an cloudevent function.
func RegisterCloudEventFunctionContext(ctx context.Context, path string, fn func(context.Context, cloudevents.Event) error) error {
server, err := wrapCloudEventFunction(ctx, path, fn)
if err == nil {
handler = server
}
return err
funcName := fmt.Sprintf("function_at_path_%q", path)
return registry.Default().RegisterCloudEvent(funcName, fn, registry.WithPath(path))
}

// Start serves an HTTP server with registered function(s).
func Start(port string) error {
// If FUNCTION_TARGET, try to start with that registered function
// If not set, assume non-declarative functions.
target := os.Getenv("FUNCTION_TARGET")
server, err := initServer()
if err != nil {
return err
}
return http.ListenAndServe(":"+port, server)
}

// Check if we have a function resource set, and if so, log progress.
if os.Getenv("K_SERVICE") == "" {
fmt.Printf("Serving function: %s\n", target)
func initServer() (*http.ServeMux, error) {
server := http.NewServeMux()

// If FUNCTION_TARGET is set, only serve this target function at path "/".
// If not set, serve all functions at the registered paths.
if target := os.Getenv("FUNCTION_TARGET"); len(target) > 0 {
fn, ok := registry.Default().GetRegisteredFunction(target)
if !ok {
return nil, fmt.Errorf("no matching function found with name: %q", target)
}
h, err := wrapFunction(fn)
if err != nil {
return nil, fmt.Errorf("failed to serve function %q: %v", target, err)
}
server.Handle("/", h)
return server, nil
}

// Check if there's a registered function, and use if possible
if fn, ok := registry.Default().GetRegisteredFunction(target); ok {
ctx := context.Background()
if fn.HTTPFn != nil {
server, err := wrapHTTPFunction("/", fn.HTTPFn)
if err != nil {
return fmt.Errorf("unexpected error in registerHTTPFunction: %v", err)
}
handler = server
} else if fn.CloudEventFn != nil {
server, err := wrapCloudEventFunction(ctx, "/", fn.CloudEventFn)
if err != nil {
return fmt.Errorf("unexpected error in registerCloudEventFunction: %v", err)
}
handler = server
fns := registry.Default().GetAllFunctions()
for funcName, fn := range fns {
h, err := wrapFunction(fn)
if err != nil {
return nil, fmt.Errorf("failed to serve function %q: %v", funcName, err)
}
server.Handle(fn.Path, h)
}
return server, nil
}

if handler == nil {
return fmt.Errorf("no matching function found with name: %q", target)
func wrapFunction(fn registry.RegisteredFunction) (http.Handler, error) {
// Check if we have a function resource set, and if so, log progress.
if os.Getenv("K_SERVICE") == "" {
fmt.Printf("Serving function %s\n", fn.Name)
}

return http.ListenAndServe(":"+port, handler)
if fn.HTTPFn != nil {
handler, err := wrapHTTPFunction(fn.HTTPFn)
if err != nil {
return nil, fmt.Errorf("unexpected error in wrapHTTPFunction: %v", err)
}
return handler, nil
} else if fn.CloudEventFn != nil {
handler, err := wrapCloudEventFunction(context.Background(), fn.CloudEventFn)
if err != nil {
return nil, fmt.Errorf("unexpected error in wrapCloudEventFunction: %v", err)
}
return handler, nil
} else if fn.EventFn != nil {
handler, err := wrapEventFunction(fn.EventFn)
if err != nil {
return nil, fmt.Errorf("unexpected error in wrapEventFunction: %v", err)
}
return handler, nil
}
return nil, fmt.Errorf("missing function entry in %v", fn)
}

func wrapHTTPFunction(path string, fn func(http.ResponseWriter, *http.Request)) (http.Handler, error) {
h := http.NewServeMux()
h.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
func wrapHTTPFunction(fn func(http.ResponseWriter, *http.Request)) (http.Handler, error) {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// TODO(b/111823046): Remove following once Cloud Functions does not need flushing the logs anymore.
if os.Getenv("K_SERVICE") != "" {
// Force flush of logs after every function trigger when running on GCF.
Expand All @@ -160,17 +176,15 @@ func wrapHTTPFunction(path string, fn func(http.ResponseWriter, *http.Request))
}
defer recoverPanic(w, "user function execution")
fn(w, r)
})
return h, nil
}), nil
}

func wrapEventFunction(path string, fn interface{}) (http.Handler, error) {
h := http.NewServeMux()
func wrapEventFunction(fn interface{}) (http.Handler, error) {
err := validateEventFunction(fn)
if err != nil {
return nil, err
}
h.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if os.Getenv("K_SERVICE") != "" {
// Force flush of logs after every function trigger when running on GCF.
defer fmt.Println()
Expand All @@ -184,11 +198,10 @@ func wrapEventFunction(path string, fn interface{}) (http.Handler, error) {
}

handleEventFunction(w, r, fn)
})
return h, nil
}), nil
}

func wrapCloudEventFunction(ctx context.Context, path string, fn func(context.Context, cloudevents.Event) error) (http.Handler, error) {
func wrapCloudEventFunction(ctx context.Context, fn func(context.Context, cloudevents.Event) error) (http.Handler, error) {
p, err := cloudevents.NewHTTP()
if err != nil {
return nil, fmt.Errorf("failed to create protocol: %v", err)
Expand Down
Loading

0 comments on commit 3cab285

Please sign in to comment.