Skip to content

Commit

Permalink
Add support for the CLI to push a lambda to monitor SQS queues. (#8649)
Browse files Browse the repository at this point in the history
* Add support for the CLI to push a lambda to monitor SQS queues.

This all support to configure triggers in the configuration file to
allow a lambda function to monitor one or more queue.

Note: we assume that the body of an SQS message is a string, if we receive
a JSON encoded string we can use the `decode_json_fields`.
  • Loading branch information
ph committed Oct 24, 2018
1 parent a90651f commit 0796470
Show file tree
Hide file tree
Showing 14 changed files with 490 additions and 61 deletions.
24 changes: 24 additions & 0 deletions libbeat/common/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package common

import "strings"

// NetString store the byte length of the data that follows, making it easier
// to unambiguously pass text and byte data between programs that could be
// sensitive to values that could be interpreted as delimiters or terminators
Expand All @@ -28,3 +30,25 @@ type NetString []byte
func (n NetString) MarshalText() ([]byte, error) {
return n, nil
}

// RemoveChars takes a string candidate and a string of chars to remove from the candidate.
func RemoveChars(s string, chars string) string {
buf := &strings.Builder{}
for len(s) > 0 {
idx := strings.IndexAny(s, chars)
if idx < 0 {
if buf.Len() > 0 {
buf.WriteString(s)
}
break
}

buf.WriteString(s[:idx])
s = s[idx+1:]
}

if buf.Len() == 0 {
return s
}
return buf.String()
}
64 changes: 64 additions & 0 deletions libbeat/common/string_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package common

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestRemoveChar(t *testing.T) {
tests := []struct {
title string
candidate string
chars string
expected string
}{
{
title: "when we have one char to replace",
candidate: "hello foobar",
chars: "a",
expected: "hello foobr",
},
{
title: "when we have multiple chars to replace",
candidate: "hello foobar",
chars: "all",
expected: "heo foobr",
},
{
title: "when we have no chars to replace",
candidate: "hello foobar",
chars: "x",
expected: "hello foobar",
},
{
title: "when we have an empty string",
candidate: "",
chars: "x",
expected: "",
},
}

for _, test := range tests {
t.Run(test.title, func(t *testing.T) {
assert.Equal(t, test.expected, RemoveChars(test.candidate, test.chars))
})
}
}
3 changes: 1 addition & 2 deletions x-pack/functionbeat/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ include $(ES_BEATS)/libbeat/scripts/Makefile
.PHONY: collect
collect:

# TODO(ph) This is used for debugging until we change the build to create 2 artifacts,
# we will do this in another PR.
# Generate an artifact to be push on serverless provider.
.PHONY: linux
linux:
GOOS=linux go build -o pkg/functionbeat
44 changes: 43 additions & 1 deletion x-pack/functionbeat/_meta/beat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ functionbeat.provider.aws.deploy_bucket: "functionbeat-deploy"

functionbeat.provider.aws.functions:
# Define the list of function availables, each function required to have a unique name.
# Create a function that accepts events coming from cloudwatchlogs.
- name: cloudwatch
enabled: false
type: cloudwatch_logs

# Description of the method to help identify them when you run multiples functions.
Expand All @@ -33,7 +35,7 @@ functionbeat.provider.aws.functions:
#memory_size: 128MiB

# Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue.
# dead_letter_config.target_arn:
#dead_letter_config.target_arn:

# Optional fields that you can specify to add additional information to the
# output. Fields can be scalar values, arrays, dictionaries, or any nested
Expand All @@ -50,3 +52,43 @@ functionbeat.provider.aws.functions:
#processors:
# - dissect:
# tokenizer: "%{key1} %{key2}"

# Create a function that accepts events from SQS queues.
- name: sqs
enabled: false
type: sqs

# Description of the method to help identify them when you run multiples functions.
description: "lambda function for sqs events"

# Concurrency, is the reserved number of instances for that function.
# Default is unreserved.
#
# Note: There is a hard limit of 1000 functions of any kind per account.
#concurrency: 5

# The maximum memory allocated for this function, the configured size must be a factor of 64.
# There is a hard limit of 3008MiB for each function. Default is 128MiB.
#memory_size: 128MiB

# Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue.
#dead_letter_config.target_arn:

# Optional fields that you can specify to add additional information to the
# output. Fields can be scalar values, arrays, dictionaries, or any nested
# combination of these.
#fields:
# env: staging

# List of cloudwatch log group registered to that function.
triggers:
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents

# Define custom processors for this function.
#processors:
# - decode_json_fields:
# fields: ["message"]
# process_array: false
# max_depth: 1
# target: ""
# overwrite_keys: false
93 changes: 70 additions & 23 deletions x-pack/functionbeat/_meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,77 @@
# Configure functions to run on AWS Lambda, currently we assume that the credentials
# are present in the environment to correctly create the function when using the CLI.
#

# Configure which S3 bucket we should upload the lambda artifact.
functionbeat.provider.aws.deploy_bucket: "functionbeat-deploy"

functionbeat.provider.aws.functions:
# Accepts events from a cloudwatch log group.
- name: cloudwatch
# Define the list of function availables, each function required to have a unique name.
# Create a function that accepts events coming from cloudwatchlogs.
- name: fn_cloudwatch_logs
enabled: false
type: cloudwatch_logs
# The IAM role that the lambda will take when executing your function.
role: iam
# List of cloudwatch streams registered to this function.

# Description of the method to help identify them when you run multiples functions.
description: "lambda function for cloudwatch logs"

# Concurrency, is the reserved number of instances for that function.
# Default is unreserved.
#
# Note: There is a hard limit of 1000 functions of any kind per account.
#concurrency: 5

# The maximum memory allocated for this function, the configured size must be a factor of 64.
# There is a hard limit of 3008MiB for each function. Default is 128MiB.
#memory_size: 128MiB

# Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue.
#dead_letter_config.target_arn:

# Optional fields that you can specify to add additional information to the
# output. Fields can be scalar values, arrays, dictionaries, or any nested
# combination of these.
#fields:
# env: staging

# List of cloudwatch log group registered to that function.
triggers:
- log_group_name: /aws/lambda/functionbeat-cloudwatch
filter_name: myfiltername
#filter_pattern: mylog_
- log_group_name: /aws/lambda/functionbeat-cloudwatch_logs
filter_pattern: mylog_

# Accepts events from a SQS queue.
# - name: fn_sqs
# type: sqs
#
# Accepts events form a Kinesis stream
# - name: fn_kinesis
# type: kinesis
#
# Accepts events from an api gateway proxy call.
# - name: fn_apigateway_proxy
# type: api_gateway_proxy
# Define custom processors for this function.
#processors:
# - dissect:
# tokenizer: "%{key1} %{key2}"

# Create a function that accepts events from SQS queues.
- name: sqs
enabled: false
type: sqs

# Description of the method to help identify them when you run multiples functions.
description: "lambda function for SQS events"

# Concurrency, is the reserved number of instances for that function.
# Default is unreserved.
#
# Note: There is a hard limit of 1000 functions of any kind per account.
#concurrency: 5

# The maximum memory allocated for this function, the configured size must be a factor of 64.
# There is a hard limit of 3008MiB for each function. Default is 128MiB.
#memory_size: 128MiB

# Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue.
#dead_letter_config.target_arn:

# Optional fields that you can specify to add additional information to the
# output. Fields can be scalar values, arrays, dictionaries, or any nested
# combination of these.
#fields:
# env: staging

# Define custom processors for this function.
#processors:
# - decode_json_fields:
# fields: ["message"]
# process_array: false
# max_depth: 1
# target: ""
# overwrite_keys: false
44 changes: 43 additions & 1 deletion x-pack/functionbeat/functionbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ functionbeat.provider.aws.deploy_bucket: "functionbeat-deploy"

functionbeat.provider.aws.functions:
# Define the list of function availables, each function required to have a unique name.
# Create a function that accepts events coming from cloudwatchlogs.
- name: cloudwatch
enabled: false
type: cloudwatch_logs

# Description of the method to help identify them when you run multiples functions.
Expand All @@ -33,7 +35,7 @@ functionbeat.provider.aws.functions:
#memory_size: 128MiB

# Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue.
# dead_letter_config.target_arn:
#dead_letter_config.target_arn:

# Optional fields that you can specify to add additional information to the
# output. Fields can be scalar values, arrays, dictionaries, or any nested
Expand All @@ -51,6 +53,46 @@ functionbeat.provider.aws.functions:
# - dissect:
# tokenizer: "%{key1} %{key2}"

# Create a function that accepts events from SQS queues.
- name: sqs
enabled: false
type: sqs

# Description of the method to help identify them when you run multiples functions.
description: "lambda function for sqs events"

# Concurrency, is the reserved number of instances for that function.
# Default is unreserved.
#
# Note: There is a hard limit of 1000 functions of any kind per account.
#concurrency: 5

# The maximum memory allocated for this function, the configured size must be a factor of 64.
# There is a hard limit of 3008MiB for each function. Default is 128MiB.
#memory_size: 128MiB

# Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue.
#dead_letter_config.target_arn:

# Optional fields that you can specify to add additional information to the
# output. Fields can be scalar values, arrays, dictionaries, or any nested
# combination of these.
#fields:
# env: staging

# List of cloudwatch log group registered to that function.
triggers:
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents

# Define custom processors for this function.
#processors:
# - decode_json_fields:
# fields: ["message"]
# process_array: false
# max_depth: 1
# target: ""
# overwrite_keys: false

#================================ General ======================================

# The name of the shipper that publishes the network data. It can be used to group
Expand Down
Loading

0 comments on commit 0796470

Please sign in to comment.