[filebeat][azure-blob-storage] - Added support for new features and removed partial save mechanism #36690

Merged · 16 commits · Oct 2, 2023
Changes from 1 commit
11 changes: 4 additions & 7 deletions x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc
@@ -222,8 +222,7 @@ take priority and override the root level values if both are specified.
==== `file_selectors`

If the Azure blob storage container will have blobs that correspond to files that {beatname_uc} shouldn't process, `file_selectors` can be used to limit
the files that are downloaded. This is a list of selectors which are based on a `regex` pattern. The `regex` should match the blob name or should be a part of the blob name (ideally a prefix). The `regex` syntax is the same as used in the Go programming language. Files that don't match one of the regex's won't be processed.
This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified.
the files that are downloaded. This is a list of selectors which are based on a `regex` pattern. The `regex` should match the blob name or should be a part of the blob name (ideally a prefix). The `regex` syntax is the same as used in the Go programming language. Files that don't match any configured regex won't be processed. This attribute can be specified both at the root level of the configuration as well as at the container level. The container level values will always take priority and override the root level values if both are specified.
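
To make the selector behaviour concrete, here is a minimal Go sketch of regex-based blob filtering; the `fileSelector` type, helper name, and sample blob names are illustrative, not the input's actual types:

[source, go]
----
package main

import (
	"fmt"
	"regexp"
)

// fileSelector is an illustrative stand-in for one configured selector.
type fileSelector struct {
	re *regexp.Regexp
}

// shouldProcess reports whether a blob name matches at least one selector.
// With no selectors configured, every blob is processed.
func shouldProcess(name string, selectors []fileSelector) bool {
	if len(selectors) == 0 {
		return true
	}
	for _, s := range selectors {
		// MatchString succeeds if the pattern matches anywhere in the name,
		// so a prefix such as "docs/" is enough.
		if s.re.MatchString(name) {
			return true
		}
	}
	return false
}

func main() {
	selectors := []fileSelector{
		{re: regexp.MustCompile("docs/")},
		{re: regexp.MustCompile(`data_3.*\.json$`)},
	}
	for _, name := range []string{"docs/ata.json", "data_3.json", "logs/app.log"} {
		fmt.Printf("%-14s process=%v\n", name, shouldProcess(name, selectors))
	}
}
----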

[source, yml]
----
@@ -245,8 +244,7 @@ filebeat.inputs:
[float]
==== `expand_event_list_from_field`

If the file-set using this input expects to receive multiple messages bundled under a specific field or an array of objects then the config option for `expand_event_list_from_field` can be specified. This setting will be able to split the messages under the group value into separate events.
For example, CloudTrail logs are in JSON format and events are found under the JSON object "Records". To split the events into separate events, the config option `expand_event_list_from_field` can be set to "Records". Using this setting, prevents partial state saves from occurring. If failures occur or the input crashes due to some unexpected error, the processing will resume from the last successfully processed event. This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified.
If the file-set using this input expects to receive multiple messages bundled under a specific field or an array of objects then the config option for `expand_event_list_from_field` can be specified. This setting splits the messages under the group value into separate events. For example, CloudTrail logs are in JSON format and events are found under the JSON object "Records". To split the events into separate events, the config option `expand_event_list_from_field` can be set to "Records". This attribute can be specified both at the root level of the configuration as well as at the container level. The container level values will always take priority and override the root level values if both are specified.
["source","json"]
----
{
Expand Down Expand Up @@ -280,14 +278,13 @@ filebeat.inputs:
expand_event_list_from_field: Records
----

NOTE: This attribute is only applicable for JSON file formats. You do not require to specify this attribute if the file has an array of objects at the root level. Root level array of objects are automatically split into separate events, with support for partial state saves.
NOTE: This attribute is only applicable for JSON file formats. You do not need to specify this attribute if the file has an array of objects at the root level. Root level arrays of objects are automatically split into separate events, with support for partial state saves. Using this setting, by contrast, prevents partial state saves from occurring. If failures occur or the input crashes due to some unexpected error, processing will resume from the last successfully processed file/blob.
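
As a rough illustration of what splitting on a group field amounts to, the following standalone sketch (hypothetical payload, standard library only) pulls the objects out of a `Records` array and treats each one as a separate event:

[source, go]
----
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// A CloudTrail-style payload: individual events are bundled under "Records".
	payload := []byte(`{"Records":[{"eventName":"PutObject"},{"eventName":"GetObject"}]}`)

	var doc map[string]json.RawMessage
	if err := json.Unmarshal(payload, &doc); err != nil {
		panic(err)
	}

	// expand_event_list_from_field: Records
	var records []json.RawMessage
	if err := json.Unmarshal(doc["Records"], &records); err != nil {
		panic(err)
	}

	// Each element of the array becomes its own event.
	for i, rec := range records {
		fmt.Printf("event %d: %s\n", i, rec)
	}
}
----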

[id="attrib-timestamp_epoch"]
[float]
==== `timestamp_epoch`

This attribute can be used to filter out files/blobs which have a timestamp greater than the specified value. The value of this attribute should be in unix `epoch`
(seconds) format. The timestamp value is compared with the `LastModified Timestamp` obtained from the blob metadata. This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified.
This attribute can be used to filter out files/blobs which have a timestamp greater than the specified value. The value of this attribute should be in unix `epoch` (seconds) format. The timestamp value is compared with the `LastModified Timestamp` obtained from the blob metadata. This attribute can be specified both at the root level of the configuration as well as at the container level. The container level values will always take priority and override the root level values if both are specified.
Member commented:

v.Properties.LastModified.Unix() < *s.src.TimeStampEpoch { continue

The description doesn't match up with the code I saw. It seems like it filters blobs that are older than the value.

What is the use case for filtering based on a static value? I've seen inputs offer a relative time based filter when user specifies a duration since now like ignore_older: 168h or since: -168h.

@ShourieG (Contributor Author) commented on Sep 29, 2023:

Have updated the description. The use case here is that some customers in the past specifically asked for the ability to filter out based on a specific date time, hence this was added.
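
For context, here is a minimal standalone sketch of the comparison quoted above, with illustrative variable names and timestamps, showing that blobs whose `LastModified` time is older than the configured epoch are skipped:

[source, go]
----
package main

import (
	"fmt"
	"time"
)

func main() {
	// timestamp_epoch, in Unix seconds (illustrative value, roughly July 2023).
	timestampEpoch := int64(1690000000)

	blobs := map[string]time.Time{
		"old.json": time.Unix(1600000000, 0), // LastModified before the epoch -> skipped
		"new.json": time.Unix(1700000000, 0), // LastModified after the epoch  -> processed
	}
	for name, lastModified := range blobs {
		if lastModified.Unix() < timestampEpoch {
			fmt.Println("skip   ", name)
			continue
		}
		fmt.Println("process", name)
	}
}
----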


["source","yaml",subs="attributes"]
----
12 changes: 7 additions & 5 deletions x-pack/filebeat/input/azureblobstorage/input.go
@@ -23,6 +23,12 @@ type azurebsInput struct {
serviceURL string
}

// defines the valid range for Unix timestamps for 64 bit integers
var (
minTimestamp = time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC).Unix()
maxTimestamp = time.Date(3000, time.January, 1, 0, 0, 0, 0, time.UTC).Unix()
)

const (
inputName string = "azure-blob-storage"
)
@@ -119,12 +125,8 @@ func tryOverrideOrDefault(cfg config, c container) container {

// isValidUnixTimestamp checks if the timestamp is a valid Unix timestamp
func isValidUnixTimestamp(timestamp int64) bool {
// defines the valid range for Unix timestamps
minTimestamp := time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC).Unix()
maxTimestamp := time.Date(2106, time.February, 7, 6, 28, 15, 0, time.UTC).Unix()

// checks if the timestamp is within the valid range
return timestamp >= minTimestamp && timestamp <= maxTimestamp
return minTimestamp <= timestamp && timestamp <= maxTimestamp
Member commented:

What determines the valid range? I can understand validating that the value is non-negative. But why is there an upper range?

The only reason I can think of for checking the upper value is as a sanity check. It probably does not make sense to configure any value that is in the future.

The bound checking can be added to the struct tag and ucfg will enforce it. Like validate:"min=1" for >=1. https://pkg.go.dev/github.com/elastic/go-ucfg#Config.Unpack

Contributor Author commented:

Due to this being an optional pointer value, struct tag validation causes a nil pointer dereference. And yes, this is used as a sanity check for permissible timestamp values.
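
A minimal sketch of the manual sanity check described here, assuming an optional `*int64` field and a `Validate` method invoked during config unpacking; the field and type names are illustrative, not the input's actual config struct:

[source, go]
----
package main

import (
	"fmt"
	"time"
)

// config mirrors the idea of an optional epoch field: nil means "not configured".
type config struct {
	TimestampEpoch *int64
}

// Permissible range used for the sanity check (mirrors the values in input.go).
var (
	minTimestamp = time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC).Unix()
	maxTimestamp = time.Date(3000, time.January, 1, 0, 0, 0, 0, time.UTC).Unix()
)

// Validate only dereferences the pointer when the field is actually set,
// which avoids the nil-pointer problem a struct-tag check would hit.
func (c config) Validate() error {
	if c.TimestampEpoch == nil {
		return nil
	}
	if ts := *c.TimestampEpoch; ts < minTimestamp || ts > maxTimestamp {
		return fmt.Errorf("invalid timestamp_epoch value: %d", ts)
	}
	return nil
}

func main() {
	epoch := int64(1690000000)
	fmt.Println(config{}.Validate())                       // <nil>: unset is fine
	fmt.Println(config{TimestampEpoch: &epoch}.Validate()) // <nil>: within range
}
----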

}

func (input *azurebsInput) Name() string {
13 changes: 8 additions & 5 deletions x-pack/filebeat/input/azureblobstorage/input_stateless.go
@@ -11,6 +11,7 @@ import (
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/beat"
"golang.org/x/sync/errgroup"
)

type statelessInput struct {
@@ -43,6 +44,7 @@ func (pub statelessPublisher) Publish(event beat.Event, _ interface{}) error {
func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher) error {
pub := statelessPublisher{wrapped: publisher}
var source cursor.Source
var g errgroup.Group
for _, c := range in.config.Containers {
container := tryOverrideOrDefault(in.config, c)
source = &Source{
@@ -76,10 +78,11 @@ }
}

scheduler := newScheduler(pub, containerClient, credential, currentSource, &in.config, st, in.serviceURL, log)
err = scheduler.schedule(ctx)
if err != nil {
return err
}
// allows multiple containers to be scheduled concurrently while testing
g.Go(func() error {
return scheduler.schedule(ctx)
})

}
return nil
return g.Wait()
}
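
For readers unfamiliar with `errgroup`, here is a minimal standalone sketch of the fan-out pattern used above, with an illustrative `schedule` function standing in for `scheduler.schedule(ctx)`:

[source, go]
----
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// schedule stands in for scheduler.schedule(ctx): it runs until done or error.
func schedule(ctx context.Context, container string) error {
	select {
	case <-time.After(100 * time.Millisecond):
		fmt.Println("finished polling", container)
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx := context.Background()
	var g errgroup.Group
	for _, c := range []string{"beatscontainer", "beatscontainer-2"} {
		c := c // capture the loop variable for the goroutine (pre-Go 1.22 semantics)
		g.Go(func() error {
			return schedule(ctx, c)
		})
	}
	// Wait blocks until every scheduler returns and yields the first non-nil error.
	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}
----

The zero-value `errgroup.Group` is ready to use, and `Wait` surfaces only the first error, which keeps the error-return behaviour close to the previous sequential loop.
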
33 changes: 33 additions & 0 deletions x-pack/filebeat/input/azureblobstorage/input_test.go
@@ -438,6 +438,39 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesContainer_events_array_json[1]: true,
},
},
{
name: "MultiContainerWithMultiFileSelectors",
baseConfig: map[string]interface{}{
"account_name": "beatsblobnew",
"auth.shared_credentials.account_key": "7pfLm1betGiRyyABEM/RFrLYlafLZHbLtGhB52LkWVeBxE7la9mIvk6YYAbQKYE/f0GdhiaOZeV8+AStsAdr/Q==",
"max_workers": 2,
"poll": true,
"poll_interval": "10s",
"containers": []map[string]interface{}{
{
"name": beatsContainer,
"file_selectors": []map[string]interface{}{
{
"regex": "docs/",
},
},
},
{
"name": beatsContainer2,
"file_selectors": []map[string]interface{}{
{
"regex": "data_3",
},
},
},
},
},
mockHandler: mock.AzureStorageServer,
expected: map[string]bool{
mock.Beatscontainer_blob_docs_ata_json: true,
mock.Beatscontainer_2_blob_data3_json: true,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
17 changes: 8 additions & 9 deletions x-pack/filebeat/input/azureblobstorage/job.go
@@ -42,7 +42,7 @@ type job struct {
// offset value for an object, it points to the location inside the data stream
// from where we can start processing the object.
offset int64
// flag to denote if object is gZipped compressed or not
// flag to denote if object is gZip compressed or not
isCompressed bool
// flag to denote if object's root element is of an array type
isRootArray bool
@@ -136,8 +136,9 @@ func (j *job) processAndPublishData(ctx context.Context, id string) error {
return fmt.Errorf("failed to download data from blob with error: %w", err)
}
// we hardcode a retry count of 3 here
const maxRetries = 3
reader := get.NewRetryReader(ctx, &azblob.RetryReaderOptions{
MaxRetries: 3,
MaxRetries: maxRetries,
})
defer func() {
err = reader.Close()
@@ -155,8 +156,7 @@
}

func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) error {
var err error
r, err = j.addGzipDecoderIfNeeded(bufio.NewReader(r))
r, err := j.addGzipDecoderIfNeeded(bufio.NewReader(r))
if err != nil {
return fmt.Errorf("failed to add gzip decoder to blob: %s, with error: %w", *j.blob.Name, err)
}
@@ -174,6 +174,7 @@ }
}

dec := json.NewDecoder(r)
dec.UseNumber()
// If array is present at root then read json token and advance decoder
if j.isRootArray {
_, err := dec.Token()
@@ -189,7 +190,6 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er
if !j.isCompressed && !j.isRootArray {
relativeOffset = j.offset
}
dec.UseNumber()
for dec.More() && ctx.Err() == nil {
var item json.RawMessage
offset = dec.InputOffset()
@@ -237,15 +237,14 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er
return nil
}
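
A small self-contained sketch of the decoding pattern used in `readJsonAndPublish` for root-level arrays: consume the opening token, enable `UseNumber`, then walk the elements while tracking the decoder offset (the sample data is illustrative):

[source, go]
----
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

func main() {
	// A blob whose root element is a JSON array of objects.
	body := strings.NewReader(`[{"id": 1, "msg": "hello"}, {"id": 2, "msg": "world"}]`)

	dec := json.NewDecoder(body)
	dec.UseNumber() // keep numbers as json.Number instead of float64

	// Advance past the opening '[' so More/Decode walk the individual elements.
	if _, err := dec.Token(); err != nil {
		panic(err)
	}

	for dec.More() {
		var item json.RawMessage
		if err := dec.Decode(&item); err != nil {
			panic(err)
		}
		// InputOffset reports the byte offset after the element just decoded,
		// which is what makes offset/state tracking possible.
		fmt.Printf("offset %d: %s\n", dec.InputOffset(), item)
	}
}
----

Note that the input itself records the offset before each element is decoded; the sketch reads it afterwards only to keep the example short.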

// splitEventList, splits the event list into individual events and publishes them
// splitEventList splits the event list into individual events and publishes them
func (j *job) splitEventList(key string, raw json.RawMessage, offset int64, objHash string, id string) error {
var jsonObject map[string]json.RawMessage
if err := json.Unmarshal(raw, &jsonObject); err != nil {
return err
}

var found bool
raw, found = jsonObject[key]
raw, found := jsonObject[key]
if !found {
return fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key)
}
@@ -321,7 +320,7 @@ func (j *job) addGzipDecoderIfNeeded(body io.Reader) (io.Reader, error) {
return gzip.NewReader(bufReader)
}

// evaluateJSON, uses a bufio.NewReader & reader.Peek to evaluate if the
// evaluateJSON uses a bufio.NewReader & reader.Peek to evaluate if the
// data stream contains a json array as the root element or not, without
// advancing the reader. If the data stream contains an array as the root
// element, the value of the boolean return type is set to true.
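
A minimal sketch of the peek-without-advancing idea the `evaluateJSON` comment describes, using only `bufio.Reader.Peek`; the helper name and sample input are illustrative:

[source, go]
----
package main

import (
	"bufio"
	"fmt"
	"strings"
	"unicode"
)

// isRootArray reports whether the stream's root JSON element is an array,
// peeking ahead without advancing the reader.
func isRootArray(r *bufio.Reader) (bool, error) {
	for i := 1; ; i++ {
		buf, err := r.Peek(i)
		if err != nil {
			return false, err
		}
		c := buf[i-1]
		if unicode.IsSpace(rune(c)) {
			continue // skip leading whitespace
		}
		return c == '[', nil
	}
}

func main() {
	r := bufio.NewReader(strings.NewReader(`  [{"msg": "hello"}]`))
	isArray, err := isRootArray(r)
	fmt.Println(isArray, err) // true <nil>

	// The reader has not been advanced, so decoding can still start at byte 0.
	b, _ := r.ReadByte()
	fmt.Printf("next byte: %q\n", b) // ' '
}
----
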
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/azureblobstorage/mock/data_random.go
@@ -66,7 +66,7 @@ func generateMetadata() []byte {

// Helper function to create a random Blob
func createRandomBlob(i int) Blob {
rand.Seed(time.Now().UnixNano())
rand.New(rand.NewSource(12345))

return Blob{
Name: fmt.Sprintf("data_%d.json", i),
@@ -104,7 +104,7 @@ func generateRandomBlob() []byte {
}

func createRandomData() MyData {
rand.New(rand.NewSource(time.Now().UnixNano()))
rand.New(rand.NewSource(12345))

return MyData{
ID: rand.Intn(1000) + 1,
@@ -7,4 +7,4 @@
"time": "2021-05-26 22:21:40 UTC",
"msg": "world"
}
]
]
@@ -1 +1 @@
{"@timestamp":"2021-05-25T17:25:42.806Z","log.level":"error","message":"error making http request"}{"@timestamp":"2021-05-25T17:25:51.391Z","log.level":"info","message":"available disk space 44.3gb"}
{"@timestamp":"2021-05-25T17:25:42.806Z","log.level":"error","message":"error making http request"}{"@timestamp":"2021-05-25T17:25:51.391Z","log.level":"info","message":"available disk space 44.3gb"}
@@ -7,4 +7,4 @@
"@timestamp": "2021-05-25T17:25:51.391Z",
"log.level": "info",
"message": "available space 44.3gb"
}
}