[filebeat][azure-blob-storage] - Added support for new features and removed partial save mechanism (elastic#36690)

* added new debug logs and updated deprecated library methods

* added support for path prefix, date filter and root level array splitting

* added support for fileselectors, timestamp filtering and expand event field

* updated asciidoc, updated comments

* updated changelog

* addressed PR suggestions

* addressed linting errors

* addressed further PR suggestions

* updated FileSelectors assignment condition

* partial save mechanism removed for now due to concurrency issues

* updated changelog
ShourieG authored and Scholar-Li committed Feb 5, 2024
1 parent 3952e41 commit 409d7a0
Showing 17 changed files with 637 additions and 135 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -221,6 +221,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Disable warning message about ingest pipeline loading when running under Elastic Agent. {pull}36659[36659]
- Add input metrics to http_endpoint input. {issue}36402[36402] {pull}36427[36427]
- Update mito CEL extension library to v1.6.0. {pull}36651[36651]
- Added support for new features & removed partial save mechanism in the Azure Blob Storage input. {issue}35126[35126] {pull}36690[36690]
- Improve template evaluation logging for HTTPJSON input. {pull}36668[36668]
- Add CEL partial value debug function. {pull}36652[36652]

89 changes: 88 additions & 1 deletion x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc
@@ -27,6 +27,8 @@ even though it can get expensive when dealing with a very large number of files.
3. If any major error occurs which stops the main thread, the logs will be appropriately generated,
describing said error.

NOTE: In the latest update, the partial save mechanism has been removed for now due to concurrency issues; it will be added back in the future.

[id="supported-types"]
NOTE: NOTE: Currently only `JSON` and `NDJSON` are supported blob/file formats. Blobs/files may be also be gzip compressed.
As for authentication types, we currently have support for `shared access keys` and `connection strings`.
@@ -138,6 +140,9 @@ Now let's explore the configuration attributes in more detail.
7. <<attrib-max_workers,max_workers>>
8. <<attrib-poll,poll>>
9. <<attrib-poll_interval,poll_interval>>
10. <<attrib-file_selectors,file_selectors>>
11. <<attrib-expand_event_list_from_field,expand_event_list_from_field>>
12. <<attrib-timestamp_epoch,timestamp_epoch>>


[id="attrib-account-name"]
@@ -214,6 +219,88 @@ Example: `10s` would mean polling occurs every 10 seconds.
This attribute can be specified both at the root level of the configuration as well as at the container level. The container level values will always
take priority and override the root level values if both are specified.

[id="attrib-file_selectors"]
[float]
==== `file_selectors`

If the Azure blob storage container has blobs that correspond to files that {beatname_uc} shouldn't process, `file_selectors` can be used to limit
the files that are downloaded. This is a list of selectors based on a `regex` pattern. The `regex` should match the blob name or be a part of the blob name (ideally a prefix). The `regex` syntax is the same as used in the Go programming language. Files that don't match any configured regex won't be processed. This attribute can be specified both at the root level of the configuration as well as at the container level. The container level values will always take priority and override the root level values if both are specified.

[source, yml]
----
filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  containers:
  - name: container_1
    file_selectors:
    - regex: '/CloudTrail/'
    - regex: 'docs/'
    - regex: '/Security-Logs/'
----
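
For illustration, here is a minimal, self-contained sketch of the selection behaviour described above. It uses the standard library's `regexp` package, while the input itself stores the patterns as `match.Matcher` values (see the `config.go` changes below); the helper and blob names are hypothetical:

[source,go]
----
package main

import (
    "fmt"
    "regexp"
)

// fileSelector mirrors the shape of a file_selectors entry: a blob is
// processed only if its name matches at least one configured regex.
type fileSelector struct {
    regex *regexp.Regexp
}

// isSelected reports whether a blob name matches any selector.
// When no selectors are configured, every blob is processed.
func isSelected(selectors []fileSelector, blobName string) bool {
    if len(selectors) == 0 {
        return true
    }
    for _, s := range selectors {
        if s.regex.MatchString(blobName) {
            return true
        }
    }
    return false
}

func main() {
    selectors := []fileSelector{
        {regex: regexp.MustCompile(`/CloudTrail/`)},
        {regex: regexp.MustCompile(`docs/`)},
    }
    for _, name := range []string{"a/CloudTrail/log1.json", "misc/other.json"} {
        fmt.Printf("%s selected: %v\n", name, isSelected(selectors, name))
    }
}
----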

[id="attrib-expand_event_list_from_field"]
[float]
==== `expand_event_list_from_field`

If the file-set using this input expects to receive multiple messages bundled under a specific field, or as an array of objects, then the config option `expand_event_list_from_field` can be specified. This setting splits the messages under the group value into separate events. For example, if
you have logs in JSON format where events are found under the JSON object "Records", setting `expand_event_list_from_field` to "Records" splits them into separate events. This attribute can be specified both at the root level of the configuration as well as at the container level. The container level values will always take priority and override the root level values if both are specified.
["source","json"]
----
{
  "Records": [
    {
      "eventVersion": "1.07",
      "eventTime": "2019-11-14T00:51:00Z",
      "region": "us-east-1",
      "eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE"
    },
    {
      "eventVersion": "1.07",
      "eventTime": "2019-11-14T00:52:00Z",
      "region": "us-east-1",
      "eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE"
    }
  ]
}
----

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  containers:
  - name: container_1
    expand_event_list_from_field: Records
----

NOTE: This attribute is only applicable to JSON file formats. You do not need to specify this attribute if the file has an array of objects at the root level; a root-level array of objects is automatically split into separate events. If failures occur or the input crashes due to some unexpected error, processing resumes from the last successfully processed file/blob.
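
For illustration, the following is a minimal sketch, not the input's actual implementation, of how splitting on a group field such as `Records` can work using only the Go standard library; the `splitEventList` helper is hypothetical:

[source,go]
----
package main

import (
    "encoding/json"
    "fmt"
)

// splitEventList decodes a JSON document and returns one raw message per
// element found under the configured group field (for example "Records").
func splitEventList(data []byte, field string) ([]json.RawMessage, error) {
    var doc map[string]json.RawMessage
    if err := json.Unmarshal(data, &doc); err != nil {
        return nil, err
    }
    group, ok := doc[field]
    if !ok {
        return nil, fmt.Errorf("field %q not found", field)
    }
    var events []json.RawMessage
    if err := json.Unmarshal(group, &events); err != nil {
        return nil, fmt.Errorf("field %q is not an array: %w", field, err)
    }
    return events, nil
}

func main() {
    data := []byte(`{"Records": [{"eventID": "1"}, {"eventID": "2"}]}`)
    events, err := splitEventList(data, "Records")
    if err != nil {
        panic(err)
    }
    for _, e := range events {
        fmt.Println(string(e)) // each element becomes a separate event
    }
}
----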

[id="attrib-timestamp_epoch"]
[float]
==== `timestamp_epoch`

This attribute can be used to filter out files/blobs which have a timestamp older than the specified value. The value of this attribute should be in unix `epoch` (seconds) format. The timestamp value is compared with the `LastModified Timestamp` obtained from the blob metadata. This attribute can be specified both at the root level of the configuration as well as at the container level. The container level values will always take priority and override the root level values if both are specified.

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  containers:
  - name: container_1
    timestamp_epoch: 1627233600
----
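
For illustration, a minimal sketch of the comparison described above, assuming the blob's `LastModified` time has already been read from its metadata; the `isBlobFresh` helper is hypothetical:

[source,go]
----
package main

import (
    "fmt"
    "time"
)

// isBlobFresh reports whether a blob should be processed: blobs whose
// LastModified timestamp is older than the configured epoch are skipped.
// A nil epoch means no filtering is applied.
func isBlobFresh(lastModified time.Time, timestampEpoch *int64) bool {
    if timestampEpoch == nil {
        return true
    }
    return lastModified.Unix() >= *timestampEpoch
}

func main() {
    epoch := int64(1627233600) // the value from the example above
    old := time.Date(2021, time.January, 1, 0, 0, 0, 0, time.UTC)
    recent := time.Date(2022, time.January, 1, 0, 0, 0, 0, time.UTC)
    fmt.Println(isBlobFresh(old, &epoch))    // false: filtered out
    fmt.Println(isBlobFresh(recent, &epoch)) // true: processed
}
----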

[id="container-overrides"]
*The sample configs below will explain the container level overriding of attributes a bit further :-*
Expand Down Expand Up @@ -277,4 +364,4 @@ In this configuration even though we have specified `max_workers = 10`, `poll =
will override these values with their own respective values which are defined as part of their sub attibutes.


NOTE: Since this is a beta input, any feedback is welcome; it will help us optimise and improve it going forward.
40 changes: 27 additions & 13 deletions x-pack/filebeat/input/azureblobstorage/config.go
@@ -6,27 +6,41 @@ package azureblobstorage

import (
    "time"

    "github.com/elastic/beats/v7/libbeat/common/match"
)

// MaxWorkers, Poll & PollInterval can be configured at a global level,
// which applies to all containers, as well as at the container level.
// MaxWorkers, Poll, PollInterval, FileSelectors, TimeStampEpoch & ExpandEventListFromField can
// be configured at a global level, which applies to all containers. They can also be configured at individual container levels.
// Container level configurations will always override global level values.
type config struct {
AccountName string `config:"account_name" validate:"required"`
StorageURL string `config:"storage_url,omitempty"`
Auth authConfig `config:"auth" validate:"required"`
MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"`
Poll *bool `config:"poll,omitempty"`
PollInterval *time.Duration `config:"poll_interval,omitempty"`
Containers []container `config:"containers" validate:"required"`
AccountName string `config:"account_name" validate:"required"`
StorageURL string `config:"storage_url"`
Auth authConfig `config:"auth" validate:"required"`
MaxWorkers *int `config:"max_workers" validate:"max=5000"`
Poll *bool `config:"poll"`
PollInterval *time.Duration `config:"poll_interval"`
Containers []container `config:"containers" validate:"required"`
FileSelectors []fileSelectorConfig `config:"file_selectors"`
TimeStampEpoch *int64 `config:"timestamp_epoch"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
}

// container contains the config for each specific blob storage container in the root account
type container struct {
    Name         string         `config:"name" validate:"required"`
    MaxWorkers   *int           `config:"max_workers,omitempty" validate:"max=5000"`
    Poll         *bool          `config:"poll,omitempty"`
    PollInterval *time.Duration `config:"poll_interval,omitempty"`
    Name                     string               `config:"name" validate:"required"`
    MaxWorkers               *int                 `config:"max_workers" validate:"max=5000"`
    Poll                     *bool                `config:"poll"`
    PollInterval             *time.Duration       `config:"poll_interval"`
    FileSelectors            []fileSelectorConfig `config:"file_selectors"`
    TimeStampEpoch           *int64               `config:"timestamp_epoch"`
    ExpandEventListFromField string               `config:"expand_event_list_from_field"`
}

// fileSelectorConfig helps filter out azure blobs based on a regex pattern
type fileSelectorConfig struct {
    Regex *match.Matcher `config:"regex" validate:"required"`
    // TODO: Add support for reader config in future
}

type authConfig struct {
39 changes: 32 additions & 7 deletions x-pack/filebeat/input/azureblobstorage/input.go
@@ -23,6 +23,12 @@ type azurebsInput struct {
    serviceURL string
}

// defines the range of Unix timestamps considered valid (1970-01-01 to 3000-01-01 UTC)
var (
    minTimestamp = time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC).Unix()
    maxTimestamp = time.Date(3000, time.January, 1, 0, 0, 0, 0, time.UTC).Unix()
)

const (
    inputName string = "azure-blob-storage"
)
@@ -53,12 +59,18 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) {
    var sources []cursor.Source
    for _, c := range config.Containers {
        container := tryOverrideOrDefault(config, c)
        if container.TimeStampEpoch != nil && !isValidUnixTimestamp(*container.TimeStampEpoch) {
            return nil, nil, fmt.Errorf("invalid timestamp epoch: %d", *container.TimeStampEpoch)
        }
        sources = append(sources, &Source{
            AccountName:   config.AccountName,
            ContainerName: c.Name,
            MaxWorkers:    *container.MaxWorkers,
            Poll:          *container.Poll,
            PollInterval:  *container.PollInterval,
            AccountName:              config.AccountName,
            ContainerName:            c.Name,
            MaxWorkers:               *container.MaxWorkers,
            Poll:                     *container.Poll,
            PollInterval:             *container.PollInterval,
            TimeStampEpoch:           container.TimeStampEpoch,
            ExpandEventListFromField: container.ExpandEventListFromField,
            FileSelectors:            container.FileSelectors,
        })
    }

@@ -85,25 +97,38 @@ func tryOverrideOrDefault(cfg config, c container) container {
        }
        c.MaxWorkers = &maxWorkers
    }

    if c.Poll == nil {
        var poll bool
        if cfg.Poll != nil {
            poll = *cfg.Poll
        }
        c.Poll = &poll
    }

    if c.PollInterval == nil {
        interval := time.Second * 300
        if cfg.PollInterval != nil {
            interval = *cfg.PollInterval
        }
        c.PollInterval = &interval
    }
    if c.TimeStampEpoch == nil {
        c.TimeStampEpoch = cfg.TimeStampEpoch
    }
    if c.ExpandEventListFromField == "" {
        c.ExpandEventListFromField = cfg.ExpandEventListFromField
    }
    if len(c.FileSelectors) == 0 && len(cfg.FileSelectors) != 0 {
        c.FileSelectors = cfg.FileSelectors
    }
    return c
}

// isValidUnixTimestamp checks if the timestamp is a valid Unix timestamp
func isValidUnixTimestamp(timestamp int64) bool {
    // checks if the timestamp is within the valid range
    return minTimestamp <= timestamp && timestamp <= maxTimestamp
}

func (input *azurebsInput) Name() string {
    return inputName
}
29 changes: 19 additions & 10 deletions x-pack/filebeat/input/azureblobstorage/input_stateless.go
@@ -7,6 +7,8 @@ package azureblobstorage
import (
    "context"

    "golang.org/x/sync/errgroup"

    v2 "github.com/elastic/beats/v7/filebeat/input/v2"
    cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
    stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
@@ -43,14 +45,18 @@ func (pub statelessPublisher) Publish(event beat.Event, _ interface{}) error {
func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher) error {
    pub := statelessPublisher{wrapped: publisher}
    var source cursor.Source
    var g errgroup.Group
    for _, c := range in.config.Containers {
        container := tryOverrideOrDefault(in.config, c)
        source = &Source{
            AccountName:   in.config.AccountName,
            ContainerName: c.Name,
            MaxWorkers:    *container.MaxWorkers,
            Poll:          *container.Poll,
            PollInterval:  *container.PollInterval,
            AccountName:              in.config.AccountName,
            ContainerName:            c.Name,
            MaxWorkers:               *container.MaxWorkers,
            Poll:                     *container.Poll,
            PollInterval:             *container.PollInterval,
            TimeStampEpoch:           container.TimeStampEpoch,
            ExpandEventListFromField: container.ExpandEventListFromField,
            FileSelectors:            container.FileSelectors,
        }

        st := newState()
@@ -73,10 +79,13 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher
        }

        scheduler := newScheduler(pub, containerClient, credential, currentSource, &in.config, st, in.serviceURL, log)
        err = scheduler.schedule(ctx)
        if err != nil {
            return err
        }
        // Allows multiple containers to be scheduled concurrently while testing.
        // The stateless input is triggered only while testing and, until now, it did not mimic
        // the real-world concurrent execution of multiple containers. This fix allows it to do so.
        g.Go(func() error {
            return scheduler.schedule(ctx)
        })
    }
    return nil
    return g.Wait()
}
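
For context, here is a minimal, self-contained sketch of the `errgroup` fan-out pattern the fix adopts: each container's scheduler runs in its own goroutine and `g.Wait()` blocks until all of them finish, returning the first non-nil error. The container names and the `schedule` stand-in below are illustrative:

[source,go]
----
package main

import (
    "context"
    "fmt"

    "golang.org/x/sync/errgroup"
)

// schedule is a stand-in for scheduler.schedule(ctx); the real scheduler
// polls its container for blobs until the context is cancelled.
func schedule(ctx context.Context, container string) error {
    fmt.Println("scheduling", container)
    return nil
}

func main() {
    ctx := context.Background()
    var g errgroup.Group
    for _, c := range []string{"container_1", "container_2"} {
        c := c // capture the loop variable (pre-Go 1.22 semantics)
        g.Go(func() error {
            return schedule(ctx, c)
        })
    }
    // Wait blocks until all goroutines return and yields the first error.
    if err := g.Wait(); err != nil {
        fmt.Println("scheduling failed:", err)
    }
}
----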
