From 07964706101c629bfaa0bea515dda0a2fc3d1fbf Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Wed, 24 Oct 2018 09:15:42 -0400 Subject: [PATCH] Add support for the CLI to push a lambda to monitor SQS queues. (#8649) * Add support for the CLI to push a lambda to monitor SQS queues. This all support to configure triggers in the configuration file to allow a lambda function to monitor one or more queue. Note: we assume that the body of an SQS message is a string, if we receive a JSON encoded string we can use the `decode_json_fields`. --- libbeat/common/string.go | 24 +++++ libbeat/common/string_test.go | 64 +++++++++++++ x-pack/functionbeat/Makefile | 3 +- x-pack/functionbeat/_meta/beat.reference.yml | 44 ++++++++- x-pack/functionbeat/_meta/beat.yml | 93 +++++++++++++----- .../functionbeat/functionbeat.reference.yml | 44 ++++++++- x-pack/functionbeat/functionbeat.yml | 95 ++++++++++++++----- .../functionbeat/provider/aws/cli_manager.go | 9 +- .../provider/aws/cli_manager_test.go | 31 ++++++ .../provider/aws/cloudwatch_logs.go | 7 +- x-pack/functionbeat/provider/aws/sqs.go | 60 +++++++++++- x-pack/functionbeat/provider/aws/sqs_test.go | 5 + x-pack/functionbeat/provider/registry.go | 4 + x-pack/functionbeat/provider/registry_test.go | 68 +++++++++++++ 14 files changed, 490 insertions(+), 61 deletions(-) create mode 100644 libbeat/common/string_test.go diff --git a/libbeat/common/string.go b/libbeat/common/string.go index d9cd6215a26f..e2c4017a876f 100644 --- a/libbeat/common/string.go +++ b/libbeat/common/string.go @@ -17,6 +17,8 @@ package common +import "strings" + // NetString store the byte length of the data that follows, making it easier // to unambiguously pass text and byte data between programs that could be // sensitive to values that could be interpreted as delimiters or terminators @@ -28,3 +30,25 @@ type NetString []byte func (n NetString) MarshalText() ([]byte, error) { return n, nil } + +// RemoveChars takes a string candidate and a string of chars to remove from the candidate. +func RemoveChars(s string, chars string) string { + buf := &strings.Builder{} + for len(s) > 0 { + idx := strings.IndexAny(s, chars) + if idx < 0 { + if buf.Len() > 0 { + buf.WriteString(s) + } + break + } + + buf.WriteString(s[:idx]) + s = s[idx+1:] + } + + if buf.Len() == 0 { + return s + } + return buf.String() +} diff --git a/libbeat/common/string_test.go b/libbeat/common/string_test.go new file mode 100644 index 000000000000..2ad2ed5a4b5f --- /dev/null +++ b/libbeat/common/string_test.go @@ -0,0 +1,64 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package common + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRemoveChar(t *testing.T) { + tests := []struct { + title string + candidate string + chars string + expected string + }{ + { + title: "when we have one char to replace", + candidate: "hello foobar", + chars: "a", + expected: "hello foobr", + }, + { + title: "when we have multiple chars to replace", + candidate: "hello foobar", + chars: "all", + expected: "heo foobr", + }, + { + title: "when we have no chars to replace", + candidate: "hello foobar", + chars: "x", + expected: "hello foobar", + }, + { + title: "when we have an empty string", + candidate: "", + chars: "x", + expected: "", + }, + } + + for _, test := range tests { + t.Run(test.title, func(t *testing.T) { + assert.Equal(t, test.expected, RemoveChars(test.candidate, test.chars)) + }) + } +} diff --git a/x-pack/functionbeat/Makefile b/x-pack/functionbeat/Makefile index 7a9f26670caf..0fc8cf629236 100644 --- a/x-pack/functionbeat/Makefile +++ b/x-pack/functionbeat/Makefile @@ -15,8 +15,7 @@ include $(ES_BEATS)/libbeat/scripts/Makefile .PHONY: collect collect: -# TODO(ph) This is used for debugging until we change the build to create 2 artifacts, -# we will do this in another PR. +# Generate an artifact to be push on serverless provider. .PHONY: linux linux: GOOS=linux go build -o pkg/functionbeat diff --git a/x-pack/functionbeat/_meta/beat.reference.yml b/x-pack/functionbeat/_meta/beat.reference.yml index a230bd50ed2b..e29d6061d8b8 100644 --- a/x-pack/functionbeat/_meta/beat.reference.yml +++ b/x-pack/functionbeat/_meta/beat.reference.yml @@ -16,7 +16,9 @@ functionbeat.provider.aws.deploy_bucket: "functionbeat-deploy" functionbeat.provider.aws.functions: # Define the list of function availables, each function required to have a unique name. + # Create a function that accepts events coming from cloudwatchlogs. - name: cloudwatch + enabled: false type: cloudwatch_logs # Description of the method to help identify them when you run multiples functions. @@ -33,7 +35,7 @@ functionbeat.provider.aws.functions: #memory_size: 128MiB # Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue. - # dead_letter_config.target_arn: + #dead_letter_config.target_arn: # Optional fields that you can specify to add additional information to the # output. Fields can be scalar values, arrays, dictionaries, or any nested @@ -50,3 +52,43 @@ functionbeat.provider.aws.functions: #processors: # - dissect: # tokenizer: "%{key1} %{key2}" + + # Create a function that accepts events from SQS queues. + - name: sqs + enabled: false + type: sqs + + # Description of the method to help identify them when you run multiples functions. + description: "lambda function for sqs events" + + # Concurrency, is the reserved number of instances for that function. + # Default is unreserved. + # + # Note: There is a hard limit of 1000 functions of any kind per account. + #concurrency: 5 + + # The maximum memory allocated for this function, the configured size must be a factor of 64. + # There is a hard limit of 3008MiB for each function. Default is 128MiB. + #memory_size: 128MiB + + # Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue. + #dead_letter_config.target_arn: + + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + # List of cloudwatch log group registered to that function. + triggers: + - event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents + + # Define custom processors for this function. + #processors: + # - decode_json_fields: + # fields: ["message"] + # process_array: false + # max_depth: 1 + # target: "" + # overwrite_keys: false diff --git a/x-pack/functionbeat/_meta/beat.yml b/x-pack/functionbeat/_meta/beat.yml index 05dfbf9fb6ef..a57bc650f710 100644 --- a/x-pack/functionbeat/_meta/beat.yml +++ b/x-pack/functionbeat/_meta/beat.yml @@ -12,30 +12,77 @@ # Configure functions to run on AWS Lambda, currently we assume that the credentials # are present in the environment to correctly create the function when using the CLI. # - -# Configure which S3 bucket we should upload the lambda artifact. -functionbeat.provider.aws.deploy_bucket: "functionbeat-deploy" - functionbeat.provider.aws.functions: -# Accepts events from a cloudwatch log group. - - name: cloudwatch + # Define the list of function availables, each function required to have a unique name. + # Create a function that accepts events coming from cloudwatchlogs. + - name: fn_cloudwatch_logs + enabled: false type: cloudwatch_logs - # The IAM role that the lambda will take when executing your function. - role: iam - # List of cloudwatch streams registered to this function. + + # Description of the method to help identify them when you run multiples functions. + description: "lambda function for cloudwatch logs" + + # Concurrency, is the reserved number of instances for that function. + # Default is unreserved. + # + # Note: There is a hard limit of 1000 functions of any kind per account. + #concurrency: 5 + + # The maximum memory allocated for this function, the configured size must be a factor of 64. + # There is a hard limit of 3008MiB for each function. Default is 128MiB. + #memory_size: 128MiB + + # Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue. + #dead_letter_config.target_arn: + + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + # List of cloudwatch log group registered to that function. triggers: - - log_group_name: /aws/lambda/functionbeat-cloudwatch - filter_name: myfiltername - #filter_pattern: mylog_ + - log_group_name: /aws/lambda/functionbeat-cloudwatch_logs + filter_pattern: mylog_ -# Accepts events from a SQS queue. -# - name: fn_sqs -# type: sqs -# -# Accepts events form a Kinesis stream -# - name: fn_kinesis -# type: kinesis -# -# Accepts events from an api gateway proxy call. -# - name: fn_apigateway_proxy -# type: api_gateway_proxy + # Define custom processors for this function. + #processors: + # - dissect: + # tokenizer: "%{key1} %{key2}" + + # Create a function that accepts events from SQS queues. + - name: sqs + enabled: false + type: sqs + + # Description of the method to help identify them when you run multiples functions. + description: "lambda function for SQS events" + + # Concurrency, is the reserved number of instances for that function. + # Default is unreserved. + # + # Note: There is a hard limit of 1000 functions of any kind per account. + #concurrency: 5 + + # The maximum memory allocated for this function, the configured size must be a factor of 64. + # There is a hard limit of 3008MiB for each function. Default is 128MiB. + #memory_size: 128MiB + + # Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue. + #dead_letter_config.target_arn: + + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + # Define custom processors for this function. + #processors: + # - decode_json_fields: + # fields: ["message"] + # process_array: false + # max_depth: 1 + # target: "" + # overwrite_keys: false diff --git a/x-pack/functionbeat/functionbeat.reference.yml b/x-pack/functionbeat/functionbeat.reference.yml index 960fee60703e..0528c87a4f2c 100644 --- a/x-pack/functionbeat/functionbeat.reference.yml +++ b/x-pack/functionbeat/functionbeat.reference.yml @@ -16,7 +16,9 @@ functionbeat.provider.aws.deploy_bucket: "functionbeat-deploy" functionbeat.provider.aws.functions: # Define the list of function availables, each function required to have a unique name. + # Create a function that accepts events coming from cloudwatchlogs. - name: cloudwatch + enabled: false type: cloudwatch_logs # Description of the method to help identify them when you run multiples functions. @@ -33,7 +35,7 @@ functionbeat.provider.aws.functions: #memory_size: 128MiB # Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue. - # dead_letter_config.target_arn: + #dead_letter_config.target_arn: # Optional fields that you can specify to add additional information to the # output. Fields can be scalar values, arrays, dictionaries, or any nested @@ -51,6 +53,46 @@ functionbeat.provider.aws.functions: # - dissect: # tokenizer: "%{key1} %{key2}" + # Create a function that accepts events from SQS queues. + - name: sqs + enabled: false + type: sqs + + # Description of the method to help identify them when you run multiples functions. + description: "lambda function for sqs events" + + # Concurrency, is the reserved number of instances for that function. + # Default is unreserved. + # + # Note: There is a hard limit of 1000 functions of any kind per account. + #concurrency: 5 + + # The maximum memory allocated for this function, the configured size must be a factor of 64. + # There is a hard limit of 3008MiB for each function. Default is 128MiB. + #memory_size: 128MiB + + # Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue. + #dead_letter_config.target_arn: + + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + # List of cloudwatch log group registered to that function. + triggers: + - event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents + + # Define custom processors for this function. + #processors: + # - decode_json_fields: + # fields: ["message"] + # process_array: false + # max_depth: 1 + # target: "" + # overwrite_keys: false + #================================ General ====================================== # The name of the shipper that publishes the network data. It can be used to group diff --git a/x-pack/functionbeat/functionbeat.yml b/x-pack/functionbeat/functionbeat.yml index 99833f8d8919..1e7e79c8da64 100644 --- a/x-pack/functionbeat/functionbeat.yml +++ b/x-pack/functionbeat/functionbeat.yml @@ -12,33 +12,80 @@ # Configure functions to run on AWS Lambda, currently we assume that the credentials # are present in the environment to correctly create the function when using the CLI. # - -# Configure which S3 bucket we should upload the lambda artifact. -functionbeat.provider.aws.deploy_bucket: "functionbeat-deploy" - functionbeat.provider.aws.functions: -# Accepts events from a cloudwatch log group. - - name: cloudwatch + # Define the list of function availables, each function required to have a unique name. + # Create a function that accepts events coming from cloudwatchlogs. + - name: fn_cloudwatch_logs + enabled: false type: cloudwatch_logs - # The IAM role that the lambda will take when executing your function. - role: iam - # List of cloudwatch streams registered to this function. - triggers: - - log_group_name: /aws/lambda/functionbeat-cloudwatch - filter_name: myfiltername - #filter_pattern: mylog_ -# Accepts events from a SQS queue. -# - name: fn_sqs -# type: sqs -# -# Accepts events form a Kinesis stream -# - name: fn_kinesis -# type: kinesis -# -# Accepts events from an api gateway proxy call. -# - name: fn_apigateway_proxy -# type: api_gateway_proxy + # Description of the method to help identify them when you run multiples functions. + description: "lambda function for cloudwatch logs" + + # Concurrency, is the reserved number of instances for that function. + # Default is unreserved. + # + # Note: There is a hard limit of 1000 functions of any kind per account. + #concurrency: 5 + + # The maximum memory allocated for this function, the configured size must be a factor of 64. + # There is a hard limit of 3008MiB for each function. Default is 128MiB. + #memory_size: 128MiB + + # Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue. + #dead_letter_config.target_arn: + + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + # List of cloudwatch log group registered to that function. + triggers: + - log_group_name: /aws/lambda/functionbeat-cloudwatch_logs + filter_pattern: mylog_ + + # Define custom processors for this function. + #processors: + # - dissect: + # tokenizer: "%{key1} %{key2}" + + # Create a function that accepts events from SQS queues. + - name: sqs + enabled: false + type: sqs + + # Description of the method to help identify them when you run multiples functions. + description: "lambda function for SQS events" + + # Concurrency, is the reserved number of instances for that function. + # Default is unreserved. + # + # Note: There is a hard limit of 1000 functions of any kind per account. + #concurrency: 5 + + # The maximum memory allocated for this function, the configured size must be a factor of 64. + # There is a hard limit of 3008MiB for each function. Default is 128MiB. + #memory_size: 128MiB + + # Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue. + #dead_letter_config.target_arn: + + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + # Define custom processors for this function. + #processors: + # - decode_json_fields: + # fields: ["message"] + # process_array: false + # max_depth: 1 + # target: "" + # overwrite_keys: false #================================ General ===================================== diff --git a/x-pack/functionbeat/provider/aws/cli_manager.go b/x-pack/functionbeat/provider/aws/cli_manager.go index 6971b241f0ed..c70b73b26860 100644 --- a/x-pack/functionbeat/provider/aws/cli_manager.go +++ b/x-pack/functionbeat/provider/aws/cli_manager.go @@ -25,6 +25,9 @@ const ( // AWS lambda currently support go 1.x as a runtime. runtime = "go1.x" handlerName = "functionbeat" + + // invalidChars for resource name + invalidChars = ":-/" ) // AWSLambdaFunction add 'dependsOn' as a serializable parameters, for no good reason it's @@ -98,7 +101,7 @@ func (c *CLIManager) template(function installer, name, templateLoc string) *clo }, }, Path: "/", - RoleName: "functionbeat-lambda", + RoleName: "functionbeat-lambda-" + name, // Allow the lambda to write log to cloudwatch logs. // doc: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-iam-policy.html Policies: []cloudformation.AWSIAMRole_Policy{ @@ -346,6 +349,10 @@ func mergeTemplate(to, from *cloudformation.Template) error { return nil } +func normalizeResourceName(s string) string { + return common.RemoveChars(s, invalidChars) +} + func codeKey(name string, content []byte) string { sha := sha256.Sum256(content) checksum := base64.RawURLEncoding.EncodeToString(sha[:]) diff --git a/x-pack/functionbeat/provider/aws/cli_manager_test.go b/x-pack/functionbeat/provider/aws/cli_manager_test.go index f2d46159a7c7..6f4a2906d728 100644 --- a/x-pack/functionbeat/provider/aws/cli_manager_test.go +++ b/x-pack/functionbeat/provider/aws/cli_manager_test.go @@ -38,3 +38,34 @@ func TestCodeKey(t *testing.T) { assert.NotEqual(t, codeKey(name, content), codeKey(name, other)) }) } + +func TestNormalize(t *testing.T) { + tests := []struct { + title string + candidate string + chars string + expected string + }{ + { + title: "when the string contains invalid chars", + candidate: "/var/log-alpha/tmp:ok", + expected: "varlogalphatmpok", + }, + { + title: "when we have an empty string", + candidate: "", + expected: "", + }, + { + title: "when we don't have any invalid chars", + candidate: "hello", + expected: "hello", + }, + } + + for _, test := range tests { + t.Run(test.title, func(t *testing.T) { + assert.Equal(t, test.expected, normalizeResourceName(test.candidate)) + }) + } +} diff --git a/x-pack/functionbeat/provider/aws/cloudwatch_logs.go b/x-pack/functionbeat/provider/aws/cloudwatch_logs.go index 82f8d21e6a15..01e0d9043aa9 100644 --- a/x-pack/functionbeat/provider/aws/cloudwatch_logs.go +++ b/x-pack/functionbeat/provider/aws/cloudwatch_logs.go @@ -11,7 +11,6 @@ import ( "fmt" "regexp" "strconv" - "strings" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" @@ -197,12 +196,8 @@ func (c *CloudwatchLogs) Template() *cloudformation.Template { ), } - normalize := func(c string) string { - return strings.Replace(c, "/", "", -1) - } - // doc: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-logs-subscriptionfilter.html - template.Resources[prefix("SubscriptionFilter"+normalize(string(trigger.LogGroupName)))] = &AWSLogsSubscriptionFilter{ + template.Resources[prefix("SubscriptionFilter"+normalizeResourceName(string(trigger.LogGroupName)))] = &AWSLogsSubscriptionFilter{ DestinationArn: cloudformation.GetAtt(prefix(""), "Arn"), FilterPattern: trigger.FilterPattern, LogGroupName: string(trigger.LogGroupName), diff --git a/x-pack/functionbeat/provider/aws/sqs.go b/x-pack/functionbeat/provider/aws/sqs.go index 1117ffe5fa2f..fd7a93c07a8f 100644 --- a/x-pack/functionbeat/provider/aws/sqs.go +++ b/x-pack/functionbeat/provider/aws/sqs.go @@ -6,9 +6,11 @@ package aws import ( "context" + "errors" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" + "github.com/awslabs/goformation/cloudformation" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -17,14 +19,42 @@ import ( "github.com/elastic/beats/x-pack/functionbeat/provider/aws/transformer" ) +const batchSize = 10 + +// SQSConfig is the configuration for the SQS event type. +type SQSConfig struct { + Triggers []*SQSTriggerConfig `config:"triggers"` + Description string `config:"description"` + Name string `config:"name" validate:"nonzero,required"` + LambdaConfig *lambdaConfig `config:",inline"` +} + +// SQSTriggerConfig configuration for the current trigger. +type SQSTriggerConfig struct { + EventSourceArn string `config:"event_source_arn"` +} + +// Validate validates the configuration. +func (cfg *SQSConfig) Validate() error { + if len(cfg.Triggers) == 0 { + return errors.New("you need to specify at least one trigger") + } + return nil +} + // SQS receives events from the web service and forward them to elasticsearch. type SQS struct { - log *logp.Logger + log *logp.Logger + config *SQSConfig } // NewSQS creates a new function to receives events from a SQS queue. -func NewSQS(provider provider.Provider, config *common.Config) (provider.Function, error) { - return &SQS{log: logp.NewLogger("sqs")}, nil +func NewSQS(provider provider.Provider, cfg *common.Config) (provider.Function, error) { + config := &SQSConfig{LambdaConfig: DefaultLambdaConfig} + if err := cfg.Unpack(config); err != nil { + return nil, err + } + return &SQS{log: logp.NewLogger("sqs"), config: config}, nil } // Run starts the lambda function and wait for web triggers. @@ -51,3 +81,27 @@ func (s *SQS) createHandler(client core.Client) func(request events.SQSEvent) er func (s *SQS) Name() string { return "sqs" } + +// Template returns the cloudformation template for configuring the service with the specified triggers. +func (s *SQS) Template() *cloudformation.Template { + template := cloudformation.NewTemplate() + + prefix := func(suffix string) string { + return "fnb" + s.config.Name + suffix + } + + for _, trigger := range s.config.Triggers { + resourceName := prefix("SQS") + normalizeResourceName(trigger.EventSourceArn) + template.Resources[resourceName] = &cloudformation.AWSLambdaEventSourceMapping{ + BatchSize: batchSize, + EventSourceArn: trigger.EventSourceArn, + FunctionName: cloudformation.GetAtt(prefix(""), "Arn"), + } + } + return template +} + +// LambdaConfig returns the configuration to use when creating the lambda. +func (s *SQS) LambdaConfig() *lambdaConfig { + return s.config.LambdaConfig +} diff --git a/x-pack/functionbeat/provider/aws/sqs_test.go b/x-pack/functionbeat/provider/aws/sqs_test.go index 85a2ed944c6a..db4aa93dc05e 100644 --- a/x-pack/functionbeat/provider/aws/sqs_test.go +++ b/x-pack/functionbeat/provider/aws/sqs_test.go @@ -18,6 +18,11 @@ import ( func TestSQS(t *testing.T) { cfg := common.MustNewConfigFrom(map[string]interface{}{ "name": "foobar", + "triggers": []map[string]interface{}{ + map[string]interface{}{ + "event_source_arn": "abc1234", + }, + }, }) t.Run("when publish is succesful", func(t *testing.T) { diff --git a/x-pack/functionbeat/provider/registry.go b/x-pack/functionbeat/provider/registry.go index a1de06fbc9b4..b36809bcf720 100644 --- a/x-pack/functionbeat/provider/registry.go +++ b/x-pack/functionbeat/provider/registry.go @@ -164,6 +164,10 @@ func FindFunctionByName( continue } + if !c.Enabled { + return nil, fmt.Errorf("function '%s' not enabled for provider '%s'", name, provider.Name()) + } + f, err := registry.LookupFunction(provider.Name(), c.Type) if err != nil { return nil, err diff --git a/x-pack/functionbeat/provider/registry_test.go b/x-pack/functionbeat/provider/registry_test.go index 22e86cabc052..9340a5917e2b 100644 --- a/x-pack/functionbeat/provider/registry_test.go +++ b/x-pack/functionbeat/provider/registry_test.go @@ -206,3 +206,71 @@ func testStrInSlice(t *testing.T) { assert.Equal(t, -1, strInSlice(haystack, "robert")) }) } + +func TestFindFunctionByName(t *testing.T) { + t.Run("when the function is not enabled", withRegistry(func( + t *testing.T, + global *feature.Registry, + wrapper *Registry, + ) { + configs := []*common.Config{ + common.MustNewConfigFrom(map[string]interface{}{ + "name": "mysqs", + "type": "sqs", + "enabled": false, + }), + } + + myprovider := &mockProvider{} + + _, err := FindFunctionByName(wrapper, myprovider, configs, "mysqs") + assert.Error(t, err) + })) + + t.Run("when the function is enabled", withRegistry(func( + t *testing.T, + global *feature.Registry, + wrapper *Registry, + ) { + fnName := "sqs" + configs := []*common.Config{ + common.MustNewConfigFrom(map[string]interface{}{ + "name": "mysqs", + "type": fnName, + "enabled": true, + }), + } + + name := "myprovider" + myprovider := &mockProvider{name: name} + + providerFn := func(log *logp.Logger, registry *Registry, config *common.Config) (Provider, error) { + return myprovider, nil + } + f := Feature(name, providerFn, feature.NewDetails(name, "provider for testing", feature.Experimental)) + + myfunction := &mockFunction{name} + functionFn := func(provider Provider, config *common.Config) (Function, error) { + return myfunction, nil + } + + fnFeature := FunctionFeature(name, fnName, functionFn, feature.NewDetails( + name, + "provider for testing", + feature.Experimental, + )) + + err := global.Register(f) + if !assert.NoError(t, err) { + return + } + + err = global.Register(fnFeature) + if !assert.NoError(t, err) { + return + } + + _, err = FindFunctionByName(wrapper, myprovider, configs, "mysqs") + assert.NoError(t, err) + })) +}