
[filebeat][azure-blob-storage] - Added support for new features and removed partial save mechanism #36690

Merged 16 commits on Oct 2, 2023
Changes from 9 commits
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -219,6 +219,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Disable warning message about ingest pipeline loading when running under Elastic Agent. {pull}36659[36659]
- Add input metrics to http_endpoint input. {issue}36402[36402] {pull}36427[36427]
- Update mito CEL extension library to v1.6.0. {pull}36651[36651]
- Added support for new features in the Azure Blob Storage input. {issue}35126[35126] {pull}36690[36690]
- Improve template evaluation logging for HTTPJSON input. {pull}36668[36668]

*Auditbeat*
84 changes: 84 additions & 0 deletions x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc
@@ -138,6 +138,9 @@ Now let's explore the configuration attributes a bit more elaborately.
7. <<attrib-max_workers,max_workers>>
8. <<attrib-poll,poll>>
9. <<attrib-poll_interval,poll_interval>>
10. <<attrib-file_selectors,file_selectors>>
11. <<attrib-expand_event_list_from_field,expand_event_list_from_field>>
12. <<attrib-timestamp_epoch,timestamp_epoch>>


[id="attrib-account-name"]
@@ -214,6 +217,87 @@ Example: `10s` means polling occurs every 10 seconds.
This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always
take priority and override the root level values if both are specified.

[id="attrib-file_selectors"]
[float]
==== `file_selectors`

If the Azure Blob Storage container holds blobs that correspond to files {beatname_uc} shouldn't process, `file_selectors` can be used to limit the files that are downloaded. This is a list of selectors based on `regex` patterns. The `regex` should match the blob name or a part of it (ideally a prefix). The `regex` syntax is the same as that of the Go programming language. Files that don't match any configured regex are not processed. This attribute can be specified both at the root level of the configuration and at the container level. Container level values always take priority and override root level values if both are specified.

[source, yml]
----
filebeat.inputs:
- type: azure-blob-storage
id: my-azureblobstorage-id
enabled: true
account_name: some_account
auth.shared_credentials.account_key: some_key
containers:
- name: container_1
file_selectors:
- regex: '/CloudTrail/'
- regex: 'docs/'
- regex: '/Security-Logs/'
----

[id="attrib-expand_event_list_from_field"]
[float]
==== `expand_event_list_from_field`

If the file-set using this input receives multiple messages bundled under a specific field, or an array of objects, the `expand_event_list_from_field` option can be specified. This setting splits the messages under the given field into separate events. For example, CloudTrail logs are in JSON format and events are found under the JSON object "Records"; setting `expand_event_list_from_field` to "Records" splits those events into separate events. This attribute can be specified both at the root level of the configuration and at the container level. Container level values always take priority and override root level values if both are specified.
["source","json"]
----
{
    "Records": [
        {
            "eventVersion": "1.07",
            "eventTime": "2019-11-14T00:51:00Z",
            "awsRegion": "us-east-1",
            "eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE"
        },
        {
            "eventVersion": "1.07",
            "eventTime": "2019-11-14T00:52:00Z",
            "awsRegion": "us-east-1",
            "eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE"
        }
    ]
}
----

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: azure-blob-storage
id: my-azureblobstorage-id
enabled: true
account_name: some_account
auth.shared_credentials.account_key: some_key
containers:
- name: container_1
expand_event_list_from_field: Records
----

NOTE: This attribute is only applicable to JSON file formats. You do not need to specify this attribute if the file has an array of objects at the root level; root level arrays of objects are automatically split into separate events, with support for partial state saves. Using this setting prevents partial state saves from occurring; if failures occur or the input crashes due to an unexpected error, processing resumes from the last successfully processed file/blob.
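The splitting behaviour can be sketched as follows. This is a simplified, in-memory illustration, not the input's actual implementation, which streams blob contents; `splitEvents` is a hypothetical helper name:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// splitEvents unmarshals a JSON blob and returns one raw event per element
// of the array found under the configured field, mirroring the behaviour
// described for expand_event_list_from_field.
func splitEvents(data []byte, field string) ([][]byte, error) {
	var doc map[string]json.RawMessage
	if err := json.Unmarshal(data, &doc); err != nil {
		return nil, err
	}
	var items []json.RawMessage
	if err := json.Unmarshal(doc[field], &items); err != nil {
		return nil, fmt.Errorf("field %q is not an array: %w", field, err)
	}
	events := make([][]byte, len(items))
	for i, item := range items {
		events[i] = item
	}
	return events, nil
}

func main() {
	blob := []byte(`{"Records":[{"eventID":"a"},{"eventID":"b"}]}`)
	events, err := splitEvents(blob, "Records")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(events)) // 2: each element of "Records" becomes its own event
}
```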

[id="attrib-timestamp_epoch"]
[float]
==== `timestamp_epoch`

This attribute can be used to filter out files/blobs that are older than the specified value: only blobs whose `LastModified` timestamp (obtained from the blob metadata) is greater than or equal to the value are processed. The value of this attribute should be in Unix `epoch` (seconds) format. This attribute can be specified both at the root level of the configuration and at the container level. Container level values always take priority and override root level values if both are specified.
Review comment (Member):

> v.Properties.LastModified.Unix() < *s.src.TimeStampEpoch { continue

The description doesn't match up with the code I saw. It seems like it filters blobs that are older than the value.

What is the use case for filtering based on a static value? I've seen inputs offer a relative time based filter where the user specifies a duration since now, like `ignore_older: 168h` or `since: -168h`.

Reply (@ShourieG, Contributor Author, Sep 29, 2023):

Have updated the description. The use case here is that some customers in the past specifically asked for the ability to filter based on a specific date/time, hence this was added.

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: azure-blob-storage
id: my-azureblobstorage-id
enabled: true
account_name: some_account
auth.shared_credentials.account_key: some_key
containers:
- name: container_1
timestamp_epoch: 1627233600
----

[id="container-overrides"]
*The sample configs below explain container level overriding of attributes in more detail:*
40 changes: 27 additions & 13 deletions x-pack/filebeat/input/azureblobstorage/config.go
@@ -6,27 +6,41 @@ package azureblobstorage

import (
"time"

"github.com/elastic/beats/v7/libbeat/common/match"
)

// MaxWorkers, Poll & PollInterval can be configured at a global level,
// which applies to all containers, as well as at the container level.
// MaxWorkers, Poll, PollInterval, FileSelectors, TimeStampEpoch & ExpandEventListFromField can
// be configured at a global level, which applies to all containers. They can also be configured at individual container levels.
// Container level configurations will always override global level values.
type config struct {
AccountName string `config:"account_name" validate:"required"`
StorageURL string `config:"storage_url,omitempty"`
Auth authConfig `config:"auth" validate:"required"`
MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"`
Poll *bool `config:"poll,omitempty"`
PollInterval *time.Duration `config:"poll_interval,omitempty"`
Containers []container `config:"containers" validate:"required"`
AccountName string `config:"account_name" validate:"required"`
StorageURL string `config:"storage_url,omitempty"`
Auth authConfig `config:"auth" validate:"required"`
MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"`
Poll *bool `config:"poll,omitempty"`
PollInterval *time.Duration `config:"poll_interval,omitempty"`
Containers []container `config:"containers" validate:"required"`
FileSelectors []fileSelectorConfig `config:"file_selectors"`
TimeStampEpoch *int64 `config:"timestamp_epoch,omitempty"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
}

// container contains the config for each specific blob storage container in the root account
type container struct {
Name string `config:"name" validate:"required"`
MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"`
Poll *bool `config:"poll,omitempty"`
PollInterval *time.Duration `config:"poll_interval,omitempty"`
Name string `config:"name" validate:"required"`
MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"`
Poll *bool `config:"poll,omitempty"`
PollInterval *time.Duration `config:"poll_interval,omitempty"`
FileSelectors []fileSelectorConfig `config:"file_selectors"`
TimeStampEpoch *int64 `config:"timestamp_epoch,omitempty"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
}

// fileSelectorConfig helps filter out azure blobs based on a regex pattern
type fileSelectorConfig struct {
Regex *match.Matcher `config:"regex" validate:"required"`
// TODO: Add support for reader config in future
}

type authConfig struct {
Expand Down
39 changes: 32 additions & 7 deletions x-pack/filebeat/input/azureblobstorage/input.go
@@ -23,6 +23,12 @@ type azurebsInput struct {
serviceURL string
}

// defines the valid range for Unix timestamps for 64 bit integers
var (
minTimestamp = time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC).Unix()
maxTimestamp = time.Date(3000, time.January, 1, 0, 0, 0, 0, time.UTC).Unix()
)

const (
inputName string = "azure-blob-storage"
)
@@ -53,12 +59,18 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) {
var sources []cursor.Source
for _, c := range config.Containers {
container := tryOverrideOrDefault(config, c)
if container.TimeStampEpoch != nil && !isValidUnixTimestamp(*container.TimeStampEpoch) {
return nil, nil, fmt.Errorf("invalid timestamp epoch: %d", *container.TimeStampEpoch)
}
sources = append(sources, &Source{
AccountName: config.AccountName,
ContainerName: c.Name,
MaxWorkers: *container.MaxWorkers,
Poll: *container.Poll,
PollInterval: *container.PollInterval,
AccountName: config.AccountName,
ContainerName: c.Name,
MaxWorkers: *container.MaxWorkers,
Poll: *container.Poll,
PollInterval: *container.PollInterval,
TimeStampEpoch: container.TimeStampEpoch,
ExpandEventListFromField: container.ExpandEventListFromField,
FileSelectors: container.FileSelectors,
})
}

@@ -85,25 +97,38 @@ func tryOverrideOrDefault(cfg config, c container) container {
}
c.MaxWorkers = &maxWorkers
}

if c.Poll == nil {
var poll bool
if cfg.Poll != nil {
poll = *cfg.Poll
}
c.Poll = &poll
}

if c.PollInterval == nil {
interval := time.Second * 300
if cfg.PollInterval != nil {
interval = *cfg.PollInterval
}
c.PollInterval = &interval
}
if c.TimeStampEpoch == nil {
c.TimeStampEpoch = cfg.TimeStampEpoch
}
if c.ExpandEventListFromField == "" {
c.ExpandEventListFromField = cfg.ExpandEventListFromField
}
if len(c.FileSelectors) == 0 && len(cfg.FileSelectors) > 0 {
c.FileSelectors = append(c.FileSelectors, cfg.FileSelectors...)
}
return c
}

// isValidUnixTimestamp checks if the timestamp is a valid Unix timestamp
func isValidUnixTimestamp(timestamp int64) bool {
// checks if the timestamp is within the valid range
return minTimestamp <= timestamp && timestamp <= maxTimestamp
}

Review comment (Member):

> return minTimestamp <= timestamp && timestamp <= maxTimestamp

What determines the valid range? I can understand validating that the value is non-negative, but why is there an upper bound? The only reason I can think of for checking the upper value is as a sanity check; it probably does not make sense to configure any value that is in the future.

The bound checking can be added to the struct tag and ucfg will enforce it, like `validate:"min=1"` for >=1. https://pkg.go.dev/github.com/elastic/go-ucfg#Config.Unpack

Reply (Contributor Author):

Because this is an optional pointer value, struct tag validation causes a nil pointer dereference. And yes, this is used as a sanity check for permissible timestamp values.

func (input *azurebsInput) Name() string {
return inputName
}
26 changes: 16 additions & 10 deletions x-pack/filebeat/input/azureblobstorage/input_stateless.go
@@ -6,11 +6,12 @@

import (
"context"

[CI annotation, GitHub Actions / lint (linux, windows), line 9 in x-pack/filebeat/input/azureblobstorage/input_stateless.go]: File is not `goimports`-ed with -local github.com/elastic (goimports)
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/beat"
"golang.org/x/sync/errgroup"
)

type statelessInput struct {
@@ -43,14 +44,18 @@
func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher) error {
pub := statelessPublisher{wrapped: publisher}
var source cursor.Source
var g errgroup.Group
for _, c := range in.config.Containers {
container := tryOverrideOrDefault(in.config, c)
source = &Source{
AccountName: in.config.AccountName,
ContainerName: c.Name,
MaxWorkers: *container.MaxWorkers,
Poll: *container.Poll,
PollInterval: *container.PollInterval,
AccountName: in.config.AccountName,
ContainerName: c.Name,
MaxWorkers: *container.MaxWorkers,
Poll: *container.Poll,
PollInterval: *container.PollInterval,
TimeStampEpoch: container.TimeStampEpoch,
ExpandEventListFromField: container.ExpandEventListFromField,
FileSelectors: container.FileSelectors,
}

st := newState()
@@ -73,10 +78,11 @@
}

scheduler := newScheduler(pub, containerClient, credential, currentSource, &in.config, st, in.serviceURL, log)
err = scheduler.schedule(ctx)
if err != nil {
return err
}
// allows multiple containers to be scheduled concurrently while testing
g.Go(func() error {
return scheduler.schedule(ctx)
})

}
return nil
return g.Wait()
}