Skip to content

Commit

Permalink
Fix a few issues with functionbeat (elastic#8713)
Browse files Browse the repository at this point in the history
- Allow users to define the S3 bucket used for artifact
- Replace the log group name with valid string
- Enforce function name to only contains some chars
- Enforce log group name validation
- Add better code handling when waiting for cloudformation status.
  • Loading branch information
ph committed Oct 24, 2018
1 parent c64c630 commit b735836
Show file tree
Hide file tree
Showing 15 changed files with 289 additions and 76 deletions.
9 changes: 6 additions & 3 deletions x-pack/functionbeat/_meta/beat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
# Configure functions to run on AWS Lambda, currently we assume that the credentials
# are present in the environment to correctly create the function when using the CLI.
#
# Configure which S3 bucket we should upload the lambda artifact.
functionbeat.provider.aws.deploy_bucket: "functionbeat-deploy"

functionbeat.provider.aws.functions:
# Define the list of function availables, each function required to have a unique name.
- name: fn_cloudwatch_logs
- name: cloudwatch
type: cloudwatch_logs

# Description of the method to help identify them when you run multiples functions.
Expand All @@ -40,8 +43,8 @@ functionbeat.provider.aws.functions:

# List of cloudwatch log group registered to that function.
triggers:
- log_group_name: /aws/lambda/functionbeat-cloudwatch_logs
filter_pattern: mylog_
- log_group_name: /aws/lambda/functionbeat-cloudwatch
#filter_pattern: mylog_

# Define custom processors for this function.
#processors:
Expand Down
10 changes: 7 additions & 3 deletions x-pack/functionbeat/_meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,21 @@
# Configure functions to run on AWS Lambda, currently we assume that the credentials
# are present in the environment to correctly create the function when using the CLI.
#

# Configure which S3 bucket we should upload the lambda artifact.
functionbeat.provider.aws.deploy_bucket: "functionbeat-deploy"

functionbeat.provider.aws.functions:
# Accepts events from a cloudwatch log group.
- name: fn_cloudwatch_logs
- name: cloudwatch
type: cloudwatch_logs
# The IAM role that the lambda will take when executing your function.
role: iam
# List of cloudwatch streams registered to this function.
triggers:
- log_group_name: /aws/lambda/functionbeat-cloudwatch_logs
- log_group_name: /aws/lambda/functionbeat-cloudwatch
filter_name: myfiltername
filter_pattern: mylog_
#filter_pattern: mylog_

# Accepts events from a SQS queue.
# - name: fn_sqs
Expand Down
31 changes: 27 additions & 4 deletions x-pack/functionbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@ package config

import (
"fmt"
"regexp"

"github.com/elastic/beats/libbeat/common"
)

var (
functionPattern = "^[A-Za-z][A-Za-z0-9\\-]{0,139}$"
functionRE = regexp.MustCompile(functionPattern)
)

// ConfigOverrides overrides the defaults provided by libbeat.
var ConfigOverrides = common.MustNewConfigFrom(map[string]interface{}{
"path.data": "/tmp",
Expand Down Expand Up @@ -41,9 +47,9 @@ type ProviderConfig struct {

// FunctionConfig minimal configuration from each function.
type FunctionConfig struct {
Type string `config:"type"`
Name string `config:"name"`
Enabled bool `config:"enabled"`
Type string `config:"type"`
Name functionName `config:"name"`
Enabled bool `config:"enabled"`
}

// DefaultConfig is the default configuration for Functionbeat.
Expand All @@ -54,9 +60,26 @@ var DefaultFunctionConfig = FunctionConfig{
Enabled: true,
}

type functionName string

func (f *functionName) Unpack(s string) error {
if !functionRE.MatchString(s) {
return fmt.Errorf(
"invalid name: '%s', name must match [a-zA-Z0-9-] and be at most 140 characters",
s,
)
}
*f = functionName(s)
return nil
}

func (f *functionName) String() string {
return string(*f)
}

// Validate enforces that function names are unique.
func (p *ProviderConfig) Validate() error {
names := make(map[string]bool)
names := make(map[functionName]bool)
for _, rawfn := range p.Functions {
fc := FunctionConfig{}
rawfn.Unpack(&fc)
Expand Down
17 changes: 17 additions & 0 deletions x-pack/functionbeat/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,20 @@ func TestNameMustBeUnique(t *testing.T) {
})
}
}

func TestFunctionName(t *testing.T) {
t.Run("valid function name", func(t *testing.T) {
f := functionName("")
err := f.Unpack("hello-world")
if !assert.NoError(t, err) {
return
}
assert.Equal(t, functionName("hello-world"), f)
})

t.Run("invalid function name", func(t *testing.T) {
f := functionName("")
err := f.Unpack("hello world")
assert.Error(t, err)
})
}
9 changes: 6 additions & 3 deletions x-pack/functionbeat/functionbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
# Configure functions to run on AWS Lambda, currently we assume that the credentials
# are present in the environment to correctly create the function when using the CLI.
#
# Configure which S3 bucket we should upload the lambda artifact.
functionbeat.provider.aws.deploy_bucket: "functionbeat-deploy"

functionbeat.provider.aws.functions:
# Define the list of function availables, each function required to have a unique name.
- name: fn_cloudwatch_logs
- name: cloudwatch
type: cloudwatch_logs

# Description of the method to help identify them when you run multiples functions.
Expand All @@ -40,8 +43,8 @@ functionbeat.provider.aws.functions:

# List of cloudwatch log group registered to that function.
triggers:
- log_group_name: /aws/lambda/functionbeat-cloudwatch_logs
filter_pattern: mylog_
- log_group_name: /aws/lambda/functionbeat-cloudwatch
#filter_pattern: mylog_

# Define custom processors for this function.
#processors:
Expand Down
10 changes: 7 additions & 3 deletions x-pack/functionbeat/functionbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,21 @@
# Configure functions to run on AWS Lambda, currently we assume that the credentials
# are present in the environment to correctly create the function when using the CLI.
#

# Configure which S3 bucket we should upload the lambda artifact.
functionbeat.provider.aws.deploy_bucket: "functionbeat-deploy"

functionbeat.provider.aws.functions:
# Accepts events from a cloudwatch log group.
- name: fn_cloudwatch_logs
- name: cloudwatch
type: cloudwatch_logs
# The IAM role that the lambda will take when executing your function.
role: iam
# List of cloudwatch streams registered to this function.
triggers:
- log_group_name: /aws/lambda/functionbeat-cloudwatch_logs
- log_group_name: /aws/lambda/functionbeat-cloudwatch
filter_name: myfiltername
filter_pattern: mylog_
#filter_pattern: mylog_

# Accepts events from a SQS queue.
# - name: fn_sqs
Expand Down
26 changes: 18 additions & 8 deletions x-pack/functionbeat/provider/aws/cli_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
const (
// AWS lambda currently support go 1.x as a runtime.
runtime = "go1.x"
bucket = "functionbeat-deploy"
handlerName = "functionbeat"
)

Expand All @@ -47,6 +46,7 @@ type CLIManager struct {
provider provider.Provider
awsCfg aws.Config
log *logp.Logger
config *Config
}

func (c *CLIManager) findFunction(name string) (installer, error) {
Expand Down Expand Up @@ -132,7 +132,7 @@ func (c *CLIManager) template(function installer, name, templateLoc string) *clo
template.Resources[prefix("")] = &AWSLambdaFunction{
AWSLambdaFunction: &cloudformation.AWSLambdaFunction{
Code: &cloudformation.AWSLambdaFunction_Code{
S3Bucket: bucket,
S3Bucket: c.bucket(),
S3Key: templateLoc,
},
Description: lambdaConfig.Description,
Expand Down Expand Up @@ -198,33 +198,33 @@ func (c *CLIManager) deployTemplate(update bool, name string) error {
c.log.Debugf("Using cloudformation template:\n%s", json)

executer := newExecutor(c.log)
executer.Add(newOpEnsureBucket(c.log, c.awsCfg, bucket))
executer.Add(newOpUploadToBucket(c.log, c.awsCfg, bucket, codeLoc, content))
executer.Add(newOpEnsureBucket(c.log, c.awsCfg, c.bucket()))
executer.Add(newOpUploadToBucket(c.log, c.awsCfg, c.bucket(), codeLoc, content))
executer.Add(newOpUploadToBucket(
c.log,
c.awsCfg,
bucket,
c.bucket(),
"functionbeat-deployment/"+name+"/cloudformation-template-create.json",
json,
))
if update {
executer.Add(newOpUpdateCloudFormation(
c.log,
c.awsCfg,
"https://s3.amazonaws.com/"+bucket+"/functionbeat-deployment/"+name+"/cloudformation-template-create.json",
"https://s3.amazonaws.com/"+c.bucket()+"/functionbeat-deployment/"+name+"/cloudformation-template-create.json",
c.stackName(name),
))
} else {
executer.Add(newOpCreateCloudFormation(
c.log,
c.awsCfg,
"https://s3.amazonaws.com/"+bucket+"/functionbeat-deployment/"+name+"/cloudformation-template-create.json",
"https://s3.amazonaws.com/"+c.bucket()+"/functionbeat-deployment/"+name+"/cloudformation-template-create.json",
c.stackName(name),
))
}

executer.Add(newOpWaitCloudFormation(c.log, c.awsCfg, c.stackName(name)))
executer.Add(newOpDeleteFileBucket(c.log, c.awsCfg, bucket, codeLoc))
executer.Add(newOpDeleteFileBucket(c.log, c.awsCfg, c.bucket(), codeLoc))

if err := executer.Execute(); err != nil {
if rollbackErr := executer.Rollback(); rollbackErr != nil {
Expand Down Expand Up @@ -278,6 +278,10 @@ func (c *CLIManager) Remove(name string) error {
return nil
}

func (c *CLIManager) bucket() string {
return string(c.config.DeployBucket)
}

// NewCLI returns the interface to manage function on Amazon lambda.
func NewCLI(
log *logp.Logger,
Expand All @@ -289,7 +293,13 @@ func NewCLI(
return nil, err
}

config := &Config{}
if err := cfg.Unpack(config); err != nil {
return nil, err
}

return &CLIManager{
config: config,
provider: provider,
awsCfg: awsCfg,
log: logp.NewLogger("aws lambda cli"),
Expand Down
44 changes: 39 additions & 5 deletions x-pack/functionbeat/provider/aws/cloudwatch_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"regexp"
"strconv"
"strings"

Expand All @@ -22,6 +24,11 @@ import (
"github.com/elastic/beats/x-pack/functionbeat/provider/aws/transformer"
)

var (
logGroupNamePattern = "^[\\.\\-_/#A-Za-z0-9]+$"
logGroupNameRE = regexp.MustCompile(logGroupNamePattern)
)

// CloudwatchLogsConfig is the configuration for the cloudwatchlogs event type.
type CloudwatchLogsConfig struct {
Triggers []*CloudwatchLogsTriggerConfig `config:"triggers"`
Expand All @@ -32,8 +39,8 @@ type CloudwatchLogsConfig struct {

// CloudwatchLogsTriggerConfig is the configuration for the specific triggers for cloudwatch.
type CloudwatchLogsTriggerConfig struct {
LogGroupName string `config:"log_group_name" validate:"nonzero,required"`
FilterPattern string `config:"filter_pattern"`
LogGroupName logGroupName `config:"log_group_name" validate:"nonzero,required"`
FilterPattern string `config:"filter_pattern"`
}

// Validate validates the configuration.
Expand All @@ -44,6 +51,33 @@ func (cfg *CloudwatchLogsConfig) Validate() error {
return nil
}

// DOC: see validations rules at https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html
type logGroupName string

// Unpack takes a string and validate the log group format.
func (l *logGroupName) Unpack(s string) error {
const max = 512
const min = 1

if len(s) > max {
return fmt.Errorf("log group name '%s' is too long, maximum length is %d", s, max)
}

if len(s) < min {
return fmt.Errorf("log group name too short, minimum length is %d", min)
}

if !logGroupNameRE.MatchString(s) {
return fmt.Errorf(
"invalid characters in log group name '%s', name must match regular expression: '%s'",
s,
logGroupNamePattern,
)
}
*l = logGroupName(s)
return nil
}

// CloudwatchLogs receives CloudwatchLogs events from a lambda function and forward the logs to
// an Elasticsearch cluster.
type CloudwatchLogs struct {
Expand Down Expand Up @@ -157,7 +191,7 @@ func (c *CloudwatchLogs) Template() *cloudformation.Template {
":",
cloudformation.Ref("AWS::AccountId"),
":log-group:",
trigger.LogGroupName,
string(trigger.LogGroupName),
":*",
},
),
Expand All @@ -168,10 +202,10 @@ func (c *CloudwatchLogs) Template() *cloudformation.Template {
}

// doc: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-logs-subscriptionfilter.html
template.Resources[prefix("SubscriptionFilter"+normalize(trigger.LogGroupName))] = &AWSLogsSubscriptionFilter{
template.Resources[prefix("SubscriptionFilter"+normalize(string(trigger.LogGroupName)))] = &AWSLogsSubscriptionFilter{
DestinationArn: cloudformation.GetAtt(prefix(""), "Arn"),
FilterPattern: trigger.FilterPattern,
LogGroupName: trigger.LogGroupName,
LogGroupName: string(trigger.LogGroupName),
}
}
return template
Expand Down
31 changes: 31 additions & 0 deletions x-pack/functionbeat/provider/aws/cloudwatch_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,34 @@ func generateCloudwatchLogRawEvent() events.CloudwatchLogsEvent {
},
}
}

func TestLogGroupName(t *testing.T) {
t.Run("valid name", func(t *testing.T) {
l := logGroupName("")
err := l.Unpack("helloworld")
if !assert.NoError(t, err) {
return
}

assert.Equal(t, logGroupName("helloworld"), l)
})

t.Run("fail if contains invalid chars", func(t *testing.T) {
l := logGroupName("")
err := l.Unpack("hello world")
assert.Error(t, err)
})

t.Run("fail if too short", func(t *testing.T) {
l := logGroupName("")
err := l.Unpack("")
assert.Error(t, err)
})

t.Run("fail if above 512 chars", func(t *testing.T) {
r, _ := common.RandomBytes(513)
l := logGroupName("")
err := l.Unpack(string(r[:513]))
assert.Error(t, err)
})
}
Loading

0 comments on commit b735836

Please sign in to comment.