diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 55e90b2df228..60c26d236094 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -221,6 +221,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Disable warning message about ingest pipeline loading when running under Elastic Agent. {pull}36659[36659] - Add input metrics to http_endpoint input. {issue}36402[36402] {pull}36427[36427] - Update mito CEL extension library to v1.6.0. {pull}36651[36651] +- Add support for the `file_selectors`, `expand_event_list_from_field` and `timestamp_epoch` attributes and remove the partial save mechanism in the Azure Blob Storage input. {issue}35126[35126] {pull}36690[36690] - Improve template evaluation logging for HTTPJSON input. {pull}36668[36668] - Add CEL partial value debug function. {pull}36652[36652] diff --git a/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc b/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc index ec91b25aaf9e..9208a1ce6116 100644 --- a/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc @@ -27,6 +27,8 @@ even though it can get expensive with dealing with a very large number of files. 3. If any major error occurs which stops the main thread, the logs will be appropriately generated, describing said error. +NOTE: The partial save mechanism has been removed for now due to concurrency issues; it will be added back in a future update. + [id="supported-types"] NOTE: Currently only `JSON` and `NDJSON` are supported blob/file formats. Blobs/files may also be gzip compressed. As for authentication types, we currently have support for `shared access keys` and `connection strings`. @@ -138,6 +140,9 @@ Now let's explore the configuration attributes a bit more elaborately. 7. <> 8. <> 9. <> + 10. <> + 11. <> + 12. <> [id="attrib-account-name"] @@ -214,6 +219,88 @@ Example : `10s` would mean we would like the polling to occur every 10 seconds. This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified. +[id="attrib-file_selectors"] +[float] +==== `file_selectors` + +If the Azure Blob Storage container has blobs that correspond to files that {beatname_uc} shouldn't process, `file_selectors` can be used to limit +the files that are downloaded. This is a list of selectors, each based on a `regex` pattern. The `regex` should match the blob name or be a part of the blob name (ideally a prefix). The `regex` syntax is the same as used in the Go programming language. Files that don't match any configured regex won't be processed. This attribute can be specified both at the root level of the configuration as well as at the container level. The container level values will always take priority and override the root level values if both are specified.
+ +[source, yml] +---- +filebeat.inputs: +- type: azure-blob-storage + id: my-azureblobstorage-id + enabled: true + account_name: some_account + auth.shared_credentials.account_key: some_key + containers: + - name: container_1 + file_selectors: + - regex: '/CloudTrail/' + - regex: 'docs/' + - regex: '/Security-Logs/' +---- + +[id="attrib-expand_event_list_from_field"] +[float] +==== `expand_event_list_from_field` + +If the file set using this input expects to receive multiple messages bundled under a specific field or an array of objects, then the `expand_event_list_from_field` config option can be specified. This setting splits the messages under the group value into separate events. For example, if +you have logs in JSON format where the events are found under the JSON object "Records", the config option `expand_event_list_from_field` can be set to "Records" to split the events into separate events. This attribute can be specified both at the root level of the configuration as well as at the container level. The container level values will always take priority and override the root level values if both are specified. +["source","json"] +---- +{ + "Records": [ + { + "eventVersion": "1.07", + "eventTime": "2019-11-14T00:51:00Z", + "region": "us-east-1", + "eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE" + }, + { + "eventVersion": "1.07", + "eventTime": "2019-11-14T00:52:00Z", + "region": "us-east-1", + "eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE" + } + ] +} +---- + +["source","yaml",subs="attributes"] +---- +filebeat.inputs: +- type: azure-blob-storage + id: my-azureblobstorage-id + enabled: true + account_name: some_account + auth.shared_credentials.account_key: some_key + containers: + - name: container_1 + expand_event_list_from_field: Records +---- + +NOTE: This attribute is only applicable to JSON file formats. You do not need to specify this attribute if the file has an array of objects at the root level. A root-level array of objects is automatically split into separate events. If failures occur or the input crashes due to some unexpected error, the processing will resume from the last successfully processed file/blob. + +[id="attrib-timestamp_epoch"] +[float] +==== `timestamp_epoch` + +This attribute can be used to filter out files/blobs which have a timestamp older than the specified value. The value of this attribute should be in unix `epoch` (seconds) format. The timestamp value is compared with the `LastModified Timestamp` obtained from the blob metadata. This attribute can be specified both at the root level of the configuration as well as at the container level. The container level values will always take priority and override the root level values if both are specified. + +["source","yaml",subs="attributes"] +---- +filebeat.inputs: +- type: azure-blob-storage + id: my-azureblobstorage-id + enabled: true + account_name: some_account + auth.shared_credentials.account_key: some_key + containers: + - name: container_1 + timestamp_epoch: 1627233600 +---- [id="container-overrides"] *The sample configs below will explain the container level overriding of attributes a bit further :-* @@ -277,4 +364,4 @@ In this configuration even though we have specified `max_workers = 10`, `poll = will override these values with their own respective values which are defined as part of their sub attributes. -NOTE: Since this is a beta input, any feedback is welcome, which will help us optimise and make it better going forward.
\ No newline at end of file +NOTE: Since this is a beta input, any feedback is welcome, which will help us optimise and make it better going forward. diff --git a/x-pack/filebeat/input/azureblobstorage/config.go b/x-pack/filebeat/input/azureblobstorage/config.go index 22e8efd7ec28..a780e60216d2 100644 --- a/x-pack/filebeat/input/azureblobstorage/config.go +++ b/x-pack/filebeat/input/azureblobstorage/config.go @@ -6,27 +6,41 @@ package azureblobstorage import ( "time" + + "github.com/elastic/beats/v7/libbeat/common/match" ) -// MaxWorkers, Poll & PollInterval can be configured at a global level, -// which applies to all containers, as well as at the container level. +// MaxWorkers, Poll, PollInterval, FileSelectors, TimeStampEpoch & ExpandEventListFromField can +// be configured at a global level, which applies to all containers. They can also be configured at individual container levels. // Container level configurations will always override global level values. type config struct { - AccountName string `config:"account_name" validate:"required"` - StorageURL string `config:"storage_url,omitempty"` - Auth authConfig `config:"auth" validate:"required"` - MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` - Poll *bool `config:"poll,omitempty"` - PollInterval *time.Duration `config:"poll_interval,omitempty"` - Containers []container `config:"containers" validate:"required"` + AccountName string `config:"account_name" validate:"required"` + StorageURL string `config:"storage_url"` + Auth authConfig `config:"auth" validate:"required"` + MaxWorkers *int `config:"max_workers" validate:"max=5000"` + Poll *bool `config:"poll"` + PollInterval *time.Duration `config:"poll_interval"` + Containers []container `config:"containers" validate:"required"` + FileSelectors []fileSelectorConfig `config:"file_selectors"` + TimeStampEpoch *int64 `config:"timestamp_epoch"` + ExpandEventListFromField string `config:"expand_event_list_from_field"` } // container contains the config for each specific blob storage container in the root account type container struct { - Name string `config:"name" validate:"required"` - MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` - Poll *bool `config:"poll,omitempty"` - PollInterval *time.Duration `config:"poll_interval,omitempty"` + Name string `config:"name" validate:"required"` + MaxWorkers *int `config:"max_workers" validate:"max=5000"` + Poll *bool `config:"poll"` + PollInterval *time.Duration `config:"poll_interval"` + FileSelectors []fileSelectorConfig `config:"file_selectors"` + TimeStampEpoch *int64 `config:"timestamp_epoch"` + ExpandEventListFromField string `config:"expand_event_list_from_field"` +} + +// fileSelectorConfig helps filter out azure blobs based on a regex pattern +type fileSelectorConfig struct { + Regex *match.Matcher `config:"regex" validate:"required"` + // TODO: Add support for reader config in future } type authConfig struct { diff --git a/x-pack/filebeat/input/azureblobstorage/input.go b/x-pack/filebeat/input/azureblobstorage/input.go index 3cca206174d2..28484c037f70 100644 --- a/x-pack/filebeat/input/azureblobstorage/input.go +++ b/x-pack/filebeat/input/azureblobstorage/input.go @@ -23,6 +23,12 @@ type azurebsInput struct { serviceURL string } +// defines the valid range for Unix timestamps for 64 bit integers +var ( + minTimestamp = time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC).Unix() + maxTimestamp = time.Date(3000, time.January, 1, 0, 0, 0, 0, time.UTC).Unix() +) + const ( inputName string = 
"azure-blob-storage" ) @@ -53,12 +59,18 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) { var sources []cursor.Source for _, c := range config.Containers { container := tryOverrideOrDefault(config, c) + if container.TimeStampEpoch != nil && !isValidUnixTimestamp(*container.TimeStampEpoch) { + return nil, nil, fmt.Errorf("invalid timestamp epoch: %d", *container.TimeStampEpoch) + } sources = append(sources, &Source{ - AccountName: config.AccountName, - ContainerName: c.Name, - MaxWorkers: *container.MaxWorkers, - Poll: *container.Poll, - PollInterval: *container.PollInterval, + AccountName: config.AccountName, + ContainerName: c.Name, + MaxWorkers: *container.MaxWorkers, + Poll: *container.Poll, + PollInterval: *container.PollInterval, + TimeStampEpoch: container.TimeStampEpoch, + ExpandEventListFromField: container.ExpandEventListFromField, + FileSelectors: container.FileSelectors, }) } @@ -85,7 +97,6 @@ func tryOverrideOrDefault(cfg config, c container) container { } c.MaxWorkers = &maxWorkers } - if c.Poll == nil { var poll bool if cfg.Poll != nil { @@ -93,7 +104,6 @@ func tryOverrideOrDefault(cfg config, c container) container { } c.Poll = &poll } - if c.PollInterval == nil { interval := time.Second * 300 if cfg.PollInterval != nil { @@ -101,9 +111,24 @@ func tryOverrideOrDefault(cfg config, c container) container { } c.PollInterval = &interval } + if c.TimeStampEpoch == nil { + c.TimeStampEpoch = cfg.TimeStampEpoch + } + if c.ExpandEventListFromField == "" { + c.ExpandEventListFromField = cfg.ExpandEventListFromField + } + if len(c.FileSelectors) == 0 && len(cfg.FileSelectors) != 0 { + c.FileSelectors = cfg.FileSelectors + } return c } +// isValidUnixTimestamp checks if the timestamp is a valid Unix timestamp +func isValidUnixTimestamp(timestamp int64) bool { + // checks if the timestamp is within the valid range + return minTimestamp <= timestamp && timestamp <= maxTimestamp +} + func (input *azurebsInput) Name() string { return inputName } diff --git a/x-pack/filebeat/input/azureblobstorage/input_stateless.go b/x-pack/filebeat/input/azureblobstorage/input_stateless.go index a2cfc487081f..73ae14a62e58 100644 --- a/x-pack/filebeat/input/azureblobstorage/input_stateless.go +++ b/x-pack/filebeat/input/azureblobstorage/input_stateless.go @@ -7,6 +7,8 @@ package azureblobstorage import ( "context" + "golang.org/x/sync/errgroup" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" @@ -43,14 +45,18 @@ func (pub statelessPublisher) Publish(event beat.Event, _ interface{}) error { func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher) error { pub := statelessPublisher{wrapped: publisher} var source cursor.Source + var g errgroup.Group for _, c := range in.config.Containers { container := tryOverrideOrDefault(in.config, c) source = &Source{ - AccountName: in.config.AccountName, - ContainerName: c.Name, - MaxWorkers: *container.MaxWorkers, - Poll: *container.Poll, - PollInterval: *container.PollInterval, + AccountName: in.config.AccountName, + ContainerName: c.Name, + MaxWorkers: *container.MaxWorkers, + Poll: *container.Poll, + PollInterval: *container.PollInterval, + TimeStampEpoch: container.TimeStampEpoch, + ExpandEventListFromField: container.ExpandEventListFromField, + FileSelectors: container.FileSelectors, } st := newState() @@ -73,10 +79,13 @@ func (in *statelessInput) Run(inputCtx v2.Context, 
publisher stateless.Publisher } scheduler := newScheduler(pub, containerClient, credential, currentSource, &in.config, st, in.serviceURL, log) - err = scheduler.schedule(ctx) - if err != nil { - return err - } + // allows multiple containers to be scheduled concurrently while testing + // the stateless input is triggered only while testing and till now it did not mimic + // the real world concurrent execution of multiple containers. This fix allows it to do so. + g.Go(func() error { + return scheduler.schedule(ctx) + }) + } - return nil + return g.Wait() } diff --git a/x-pack/filebeat/input/azureblobstorage/input_test.go b/x-pack/filebeat/input/azureblobstorage/input_test.go index b313f6fabd9a..d4b004980b8d 100644 --- a/x-pack/filebeat/input/azureblobstorage/input_test.go +++ b/x-pack/filebeat/input/azureblobstorage/input_test.go @@ -35,6 +35,7 @@ const ( beatsJSONContainer = "beatsjsoncontainer" beatsNdJSONContainer = "beatsndjsoncontainer" beatsGzJSONContainer = "beatsgzjsoncontainer" + beatsJSONWithArrayContainer = "beatsjsonwitharraycontainer" ) func Test_StorageClient(t *testing.T) { @@ -316,6 +317,160 @@ func Test_StorageClient(t *testing.T) { mock.BeatsFilesContainer_multiline_json_gz[1]: true, }, }, + { + name: "ReadJSONWithRootAsArray", + baseConfig: map[string]interface{}{ + "account_name": "beatsblobnew", + "auth.shared_credentials.account_key": "7pfLm1betGiRyyABEM/RFrLYlafLZHbLtGhB52LkWVeBxE7la9mIvk6YYAbQKYE/f0GdhiaOZeV8+AStsAdr/Q==", + "max_workers": 1, + "poll": true, + "poll_interval": "10s", + "containers": []map[string]interface{}{ + { + "name": beatsJSONWithArrayContainer, + }, + }, + }, + mockHandler: mock.AzureStorageFileServer, + expected: map[string]bool{ + mock.BeatsFilesContainer_json_array[0]: true, + mock.BeatsFilesContainer_json_array[1]: true, + mock.BeatsFilesContainer_json_array[2]: true, + mock.BeatsFilesContainer_json_array[3]: true, + }, + }, + { + name: "FilterByTimeStampEpoch", + baseConfig: map[string]interface{}{ + "account_name": "beatsblobnew", + "auth.shared_credentials.account_key": "7pfLm1betGiRyyABEM/RFrLYlafLZHbLtGhB52LkWVeBxE7la9mIvk6YYAbQKYE/f0GdhiaOZeV8+AStsAdr/Q==", + "timestamp_epoch": 1663157564, + "max_workers": 2, + "poll": false, + "poll_interval": "10s", + "containers": []map[string]interface{}{ + { + "name": beatsContainer, + }, + }, + }, + mockHandler: mock.AzureStorageServer, + expected: map[string]bool{ + mock.Beatscontainer_blob_data3_json: true, + mock.Beatscontainer_blob_docs_ata_json: true, + }, + }, + { + name: "FilterByFileSelectorRegexSingle", + baseConfig: map[string]interface{}{ + "account_name": "beatsblobnew", + "auth.shared_credentials.account_key": "7pfLm1betGiRyyABEM/RFrLYlafLZHbLtGhB52LkWVeBxE7la9mIvk6YYAbQKYE/f0GdhiaOZeV8+AStsAdr/Q==", + "max_workers": 2, + "poll": false, + "poll_interval": "10s", + "file_selectors": []map[string]interface{}{ + { + "regex": "docs/", + }, + }, + "containers": []map[string]interface{}{ + { + "name": beatsContainer, + }, + }, + }, + mockHandler: mock.AzureStorageServer, + expected: map[string]bool{ + mock.Beatscontainer_blob_docs_ata_json: true, + }, + }, + { + name: "FilterByFileSelectorRegexMulti", + baseConfig: map[string]interface{}{ + "account_name": "beatsblobnew", + "auth.shared_credentials.account_key": "7pfLm1betGiRyyABEM/RFrLYlafLZHbLtGhB52LkWVeBxE7la9mIvk6YYAbQKYE/f0GdhiaOZeV8+AStsAdr/Q==", + "max_workers": 2, + "poll": false, + "poll_interval": "10s", + "file_selectors": []map[string]interface{}{ + { + "regex": "docs/", + }, + { + "regex": "data", + }, + }, + "containers": 
[]map[string]interface{}{ + { + "name": beatsContainer, + }, + }, + }, + mockHandler: mock.AzureStorageServer, + expected: map[string]bool{ + mock.Beatscontainer_blob_data3_json: true, + mock.Beatscontainer_blob_docs_ata_json: true, + }, + }, + { + name: "ExpandEventListFromField", + baseConfig: map[string]interface{}{ + "account_name": "beatsblobnew", + "auth.shared_credentials.account_key": "7pfLm1betGiRyyABEM/RFrLYlafLZHbLtGhB52LkWVeBxE7la9mIvk6YYAbQKYE/f0GdhiaOZeV8+AStsAdr/Q==", + "max_workers": 2, + "poll": true, + "poll_interval": "10s", + "expand_event_list_from_field": "Events", + "file_selectors": []map[string]interface{}{ + { + "regex": "events-array", + }, + }, + "containers": []map[string]interface{}{ + { + "name": beatsJSONContainer, + }, + }, + }, + mockHandler: mock.AzureStorageFileServer, + expected: map[string]bool{ + mock.BeatsFilesContainer_events_array_json[0]: true, + mock.BeatsFilesContainer_events_array_json[1]: true, + }, + }, + { + name: "MultiContainerWithMultiFileSelectors", + baseConfig: map[string]interface{}{ + "account_name": "beatsblobnew", + "auth.shared_credentials.account_key": "7pfLm1betGiRyyABEM/RFrLYlafLZHbLtGhB52LkWVeBxE7la9mIvk6YYAbQKYE/f0GdhiaOZeV8+AStsAdr/Q==", + "max_workers": 2, + "poll": true, + "poll_interval": "10s", + "containers": []map[string]interface{}{ + { + "name": beatsContainer, + "file_selectors": []map[string]interface{}{ + { + "regex": "docs/", + }, + }, + }, + { + "name": beatsContainer2, + "file_selectors": []map[string]interface{}{ + { + "regex": "data_3", + }, + }, + }, + }, + }, + mockHandler: mock.AzureStorageServer, + expected: map[string]bool{ + mock.Beatscontainer_blob_docs_ata_json: true, + mock.Beatscontainer_2_blob_data3_json: true, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/x-pack/filebeat/input/azureblobstorage/job.go b/x-pack/filebeat/input/azureblobstorage/job.go index 07698a6b0cf2..77dbe1ed882a 100644 --- a/x-pack/filebeat/input/azureblobstorage/job.go +++ b/x-pack/filebeat/input/azureblobstorage/job.go @@ -12,9 +12,11 @@ import ( "crypto/sha256" "encoding/hex" "encoding/json" + "errors" "fmt" "io" "time" + "unicode" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" @@ -29,16 +31,26 @@ import ( const jobErrString = "job with jobId %s encountered an error: %w" type job struct { - client *blob.Client - blob *azcontainer.BlobItem - blobURL string - hash string - offset int64 + // client is an azure blob handle + client *blob.Client + // blob is an azure blob item handle + blob *azcontainer.BlobItem + // azure blob url for the resource + blobURL string + // object hash, used in setting event id + hash string + // flag to denote if object is gzip compressed or not isCompressed bool - state *state - src *Source - publisher cursor.Publisher - log *logp.Logger + // flag to denote if object's root element is of an array type + isRootArray bool + // blob state + state *state + // container source struct used for storing container related data + src *Source + // publisher is used to publish a beat event to the output stream + publisher cursor.Publisher + // custom logger + log *logp.Logger } // newJob, returns an instance of a job, which is a unit of work that can be assigned to a go routine @@ -107,18 +119,14 @@ func (j *job) timestamp() *time.Time { } func (j *job) processAndPublishData(ctx context.Context, id string) error { - var err error - downloadOptions := &blob.DownloadStreamOptions{} - if !j.isCompressed { - 
downloadOptions.Range.Offset = j.offset - } - - get, err := j.client.DownloadStream(ctx, downloadOptions) + get, err := j.client.DownloadStream(ctx, &blob.DownloadStreamOptions{}) if err != nil { return fmt.Errorf("failed to download data from blob with error: %w", err) } - - reader := get.NewRetryReader(context.Background(), &azblob.RetryReaderOptions{}) + const maxRetries = 3 + reader := get.NewRetryReader(ctx, &azblob.RetryReaderOptions{ + MaxRetries: maxRetries, + }) defer func() { err = reader.Close() if err != nil { @@ -126,8 +134,7 @@ func (j *job) processAndPublishData(ctx context.Context, id string) error { } }() - updatedReader, err := j.addGzipDecoderIfNeeded(reader) - err = j.readJsonAndPublish(ctx, updatedReader, id) + err = j.readJsonAndPublish(ctx, reader, id) if err != nil { return fmt.Errorf("failed to read data from blob with error: %w", err) } @@ -135,6 +142,120 @@ func (j *job) processAndPublishData(ctx context.Context, id string) error { return err } +func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) error { + r, err := j.addGzipDecoderIfNeeded(bufio.NewReader(r)) + if err != nil { + return fmt.Errorf("failed to add gzip decoder to blob: %s, with error: %w", *j.blob.Name, err) + } + + // checks if the root element is an array or not + r, j.isRootArray, err = evaluateJSON(bufio.NewReader(r)) + if err != nil { + return fmt.Errorf("failed to evaluate json for blob: %s, with error: %w", *j.blob.Name, err) + } + + dec := json.NewDecoder(r) + dec.UseNumber() + // If array is present at root then read json token and advance decoder + if j.isRootArray { + _, err := dec.Token() + if err != nil { + return fmt.Errorf("failed to read JSON token for object: %s, with error: %w", *j.blob.Name, err) + } + } + + for dec.More() && ctx.Err() == nil { + var item json.RawMessage + offset := dec.InputOffset() + if err := dec.Decode(&item); err != nil { + return fmt.Errorf("failed to decode json: %w", err) + } + // if expand_event_list_from_field is set, then split the event list + if j.src.ExpandEventListFromField != "" { + if err := j.splitEventList(j.src.ExpandEventListFromField, item, offset, j.hash, id); err != nil { + return err + } + } + + data, err := item.MarshalJSON() + if err != nil { + return err + } + evt := j.createEvent(string(data), offset) + + if !dec.More() { + // if this is the last object, then perform a complete state save + cp, done := j.state.saveForTx(*j.blob.Name, *j.blob.Properties.LastModified) + if err := j.publisher.Publish(evt, cp); err != nil { + j.log.Errorf(jobErrString, id, err) + } + done() + } else { + // since we don't update the cursor checkpoint, lack of a lock here should be fine + if err := j.publisher.Publish(evt, nil); err != nil { + j.log.Errorf(jobErrString, id, err) + } + } + } + return nil +} + +// splitEventList splits the event list into individual events and publishes them +func (j *job) splitEventList(key string, raw json.RawMessage, offset int64, objHash string, id string) error { + var jsonObject map[string]json.RawMessage + if err := json.Unmarshal(raw, &jsonObject); err != nil { + return err + } + + raw, found := jsonObject[key] + if !found { + return fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key) + } + + dec := json.NewDecoder(bytes.NewReader(raw)) + dec.UseNumber() + + tok, err := dec.Token() + if err != nil { + return err + } + delim, ok := tok.(json.Delim) + if !ok || delim != '[' { + return fmt.Errorf("expand_event_list_from_field <%v> is not an array", key) + } + + for 
dec.More() { + arrayOffset := dec.InputOffset() + + var item json.RawMessage + if err := dec.Decode(&item); err != nil { + return fmt.Errorf("failed to decode array item at offset %d: %w", offset+arrayOffset, err) + } + + data, err := item.MarshalJSON() + if err != nil { + return err + } + evt := j.createEvent(string(data), offset+arrayOffset) + + if !dec.More() { + // if this is the last object, then save checkpoint + cp, done := j.state.saveForTx(*j.blob.Name, *j.blob.Properties.LastModified) + if err := j.publisher.Publish(evt, cp); err != nil { + j.log.Errorf(jobErrString, id, err) + } + done() + } else { + // since we don't update the cursor checkpoint, lack of a lock here should be fine + if err := j.publisher.Publish(evt, nil); err != nil { + j.log.Errorf(jobErrString, id, err) + } + } + } + + return nil +} + // addGzipDecoderIfNeeded determines whether the given stream of bytes (encapsulated in a buffered reader) // represents gzipped content or not and adds gzipped decoder if needed. A buffered reader is used // so the function can peek into the byte stream without consuming it. This makes it convenient for @@ -161,53 +282,38 @@ func (j *job) addGzipDecoderIfNeeded(body io.Reader) (io.Reader, error) { return gzip.NewReader(bufReader) } -func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) error { - dec := json.NewDecoder(r) - dec.UseNumber() - var offset int64 - var relativeOffset int64 - // uncompressed files use the client to directly set the offset, this - // in turn causes the offset to reset to 0 for the new stream, hence why - // we need to keep relative offsets to keep track of the actual offset - if !j.isCompressed { - relativeOffset = j.offset - } - for dec.More() && ctx.Err() == nil { - var item json.RawMessage - offset = dec.InputOffset() - if err := dec.Decode(&item); err != nil { - return fmt.Errorf("failed to decode json: %w", err) - } - // manually seek offset only if file is compressed - if j.isCompressed && offset < j.offset { - continue - } - - data, err := item.MarshalJSON() - if err != nil { - return err +// evaluateJSON uses a bufio.NewReader & reader.Peek to evaluate if the +// data stream contains a json array as the root element or not, without +// advancing the reader. If the data stream contains an array as the root +// element, the value of the boolean return type is set to true. 
+func evaluateJSON(reader *bufio.Reader) (io.Reader, bool, error) { + eof := false + // readSize is the constant value in the incremental read operation, this value is arbitrary + // but works well for our use case + const readSize = 5 + for i := 0; ; i++ { + b, err := reader.Peek((i + 1) * readSize) + if errors.Is(err, io.EOF) { + eof = true } - evt := j.createEvent(string(data), offset+relativeOffset) - // updates the offset after reading the file - // this avoids duplicates for the last read when resuming operation - offset = dec.InputOffset() - var ( - cp *Checkpoint - done func() - ) - if !dec.More() { - // if this is the last object, then peform a complete state save - cp, done = j.state.saveForTx(*j.blob.Name, *j.blob.Properties.LastModified) - } else { - // partially saves read state using offset - cp, done = j.state.savePartialForTx(*j.blob.Name, offset+relativeOffset) + startByte := i * readSize + for j := 0; j < len(b[startByte:]); j++ { + char := b[startByte+j : startByte+j+1] + switch { + case bytes.Equal(char, []byte("[")): + return reader, true, nil + case bytes.Equal(char, []byte("{")): + return reader, false, nil + case unicode.IsSpace(bytes.Runes(char)[0]): + continue + default: + return nil, false, fmt.Errorf("unexpected error: JSON data is malformed") + } } - if err := j.publisher.Publish(evt, cp); err != nil { - j.log.Errorf(jobErrString, id, err) + if eof { + return nil, false, fmt.Errorf("unexpected error: JSON data is malformed") } - done() } - return nil } func (j *job) createEvent(message string, offset int64) beat.Event { @@ -227,8 +333,8 @@ func (j *job) createEvent(message string, offset int64) beat.Event { "name": j.src.ContainerName, }, "blob": mapstr.M{ - "name": j.blob.Name, - "content_type": j.blob.Properties.ContentType, + "name": *j.blob.Name, + "content_type": *j.blob.Properties.ContentType, }, }, }, diff --git a/x-pack/filebeat/input/azureblobstorage/mock/data.go b/x-pack/filebeat/input/azureblobstorage/mock/data.go index a566e4bbc037..1132df169002 100644 --- a/x-pack/filebeat/input/azureblobstorage/mock/data.go +++ b/x-pack/filebeat/input/azureblobstorage/mock/data.go @@ -47,7 +47,7 @@ var fetchContainer = map[string]string{ ata.json - Wed, 14 Sep 2022 12:12:28 GMT + Wed, 12 Sep 2022 12:12:28 GMT 0x8DA964A64516C82 643 application/json @@ -83,7 +83,7 @@ var fetchContainer = map[string]string{ docs/ata.json - Wed, 14 Sep 2022 12:13:07 GMT + Wed, 15 Sep 2022 12:13:07 GMT 0x8DA964A7B8D8862 643 application/json diff --git a/x-pack/filebeat/input/azureblobstorage/mock/data_files.go b/x-pack/filebeat/input/azureblobstorage/mock/data_files.go index ed6f3ad7bb3a..3273d329412f 100644 --- a/x-pack/filebeat/input/azureblobstorage/mock/data_files.go +++ b/x-pack/filebeat/input/azureblobstorage/mock/data_files.go @@ -9,6 +9,7 @@ const ( beatsJSONContainer = "beatsjsoncontainer" beatsNdJSONContainer = "beatsndjsoncontainer" beatsGzJSONContainer = "beatsgzjsoncontainer" + beatsJSONWithArrayContainer = "beatsjsonwitharraycontainer" ) var fileContainers = map[string]bool{ @@ -16,6 +17,7 @@ var fileContainers = map[string]bool{ beatsJSONContainer: true, beatsNdJSONContainer: true, beatsGzJSONContainer: true, + beatsJSONWithArrayContainer: true, } var availableFileBlobs = map[string]map[string]bool{ @@ -32,6 +34,10 @@ var availableFileBlobs = map[string]map[string]bool{ beatsGzJSONContainer: { "multiline.json.gz": true, }, + beatsJSONWithArrayContainer: { + "array-at-root.json": true, + "nested-arrays.json": true, + }, } var fetchFilesContainer = map[string]string{ @@ 
-125,6 +131,48 @@ var fetchFilesContainer = map[string]string{ `, + beatsJSONWithArrayContainer: ` + + + + array-at-root.json + + Wed, 14 Sep 2022 12:12:28 GMT + 0x8DA964A64516C82 + 643 + application/json + + + UjQX73kQRTHx+UyXZDvVkg== + + + BlockBlob + unlocked + available + + + + + nested-arrays.json + + Wed, 14 Sep 2022 12:12:28 GMT + 0x8DA964A64516C83 + 643 + application/json + + + UjQX73kQRTHx+UyXZDvVkg== + + + BlockBlob + unlocked + available + + + + + + `, beatsGzJSONContainer: ` @@ -167,6 +215,18 @@ var BeatsFilesContainer_log_ndjson = []string{ `{"@timestamp":"2021-05-25T17:25:51.391Z","log.level":"info","message":"available space is 44.3gb"}`, } +var BeatsFilesContainer_events_array_json = []string{ + "{\n \"time\": \"2021-05-25 18:20:58 UTC\",\n \"msg\": \"hello\"\n }", + "{\n \"time\": \"2021-05-26 22:21:40 UTC\",\n \"msg\": \"world\"\n }", +} + +var BeatsFilesContainer_json_array = []string{ + "{\n \"time\": \"2021-05-25 18:20:58 UTC\",\n \"msg\": \"hello\"\n }", + "{\n \"time\": \"2021-05-26 22:21:40 UTC\",\n \"msg\": \"world\"\n }", + "[\n {\n \"time\": \"2021-05-25 18:20:58 UTC\",\n \"msg\": \"hello\"\n },\n {\n \"time\": \"2021-05-26 22:21:40 UTC\",\n \"msg\": \"world\"\n }\n ]", + "[\n {\n \"time\": \"2021-05-25 18:20:58 UTC\",\n \"msg\": \"hi\"\n },\n {\n \"time\": \"2021-05-26 22:21:40 UTC\",\n \"msg\": \"seoul\"\n }\n ]", +} + var BeatsFilesContainer_multiline_json_gz = []string{ "{\n \"@timestamp\": \"2021-05-25T17:25:42.806Z\",\n \"log.level\": \"error\",\n \"message\": \"error making http request\"\n}", "{\n \"@timestamp\": \"2021-05-25T17:25:51.391Z\",\n \"log.level\": \"info\",\n \"message\": \"available disk space 44.3gb\"\n}", diff --git a/x-pack/filebeat/input/azureblobstorage/mock/data_random.go b/x-pack/filebeat/input/azureblobstorage/mock/data_random.go index f52fe2fdbdb7..b4d92735e691 100644 --- a/x-pack/filebeat/input/azureblobstorage/mock/data_random.go +++ b/x-pack/filebeat/input/azureblobstorage/mock/data_random.go @@ -66,7 +66,7 @@ func generateMetadata() []byte { // Helper function to create a random Blob func createRandomBlob(i int) Blob { - rand.Seed(time.Now().UnixNano()) + rand.New(rand.NewSource(12345)) return Blob{ Name: fmt.Sprintf("data_%d.json", i), @@ -104,7 +104,7 @@ func generateRandomBlob() []byte { } func createRandomData() MyData { - rand.Seed(time.Now().UnixNano()) + rand.New(rand.NewSource(12345)) return MyData{ ID: rand.Intn(1000) + 1, @@ -119,6 +119,7 @@ func getRandomString(options []string) string { if len(options) == 0 { return "" } - rand.Seed(time.Now().UnixNano()) + + rand.New(rand.NewSource(time.Now().UnixNano())) return options[rand.Intn(len(options))] } diff --git a/x-pack/filebeat/input/azureblobstorage/mock/testdata/array-at-root.json b/x-pack/filebeat/input/azureblobstorage/mock/testdata/array-at-root.json new file mode 100644 index 000000000000..8d22df6aeb8a --- /dev/null +++ b/x-pack/filebeat/input/azureblobstorage/mock/testdata/array-at-root.json @@ -0,0 +1,10 @@ +[ + { + "time": "2021-05-25 18:20:58 UTC", + "msg": "hello" + }, + { + "time": "2021-05-26 22:21:40 UTC", + "msg": "world" + } +] diff --git a/x-pack/filebeat/input/azureblobstorage/mock/testdata/log.json b/x-pack/filebeat/input/azureblobstorage/mock/testdata/log.json index f6aaf5ec64d0..b88e05452843 100644 --- a/x-pack/filebeat/input/azureblobstorage/mock/testdata/log.json +++ b/x-pack/filebeat/input/azureblobstorage/mock/testdata/log.json @@ -1 +1 @@ -{"@timestamp":"2021-05-25T17:25:42.806Z","log.level":"error","message":"error making http 
request"}{"@timestamp":"2021-05-25T17:25:51.391Z","log.level":"info","message":"available disk space 44.3gb"} \ No newline at end of file +{"@timestamp":"2021-05-25T17:25:42.806Z","log.level":"error","message":"error making http request"}{"@timestamp":"2021-05-25T17:25:51.391Z","log.level":"info","message":"available disk space 44.3gb"} diff --git a/x-pack/filebeat/input/azureblobstorage/mock/testdata/multiline.json b/x-pack/filebeat/input/azureblobstorage/mock/testdata/multiline.json index 974d296762ac..32d319af2bc7 100644 --- a/x-pack/filebeat/input/azureblobstorage/mock/testdata/multiline.json +++ b/x-pack/filebeat/input/azureblobstorage/mock/testdata/multiline.json @@ -7,4 +7,4 @@ "@timestamp": "2021-05-25T17:25:51.391Z", "log.level": "info", "message": "available space 44.3gb" -} \ No newline at end of file +} diff --git a/x-pack/filebeat/input/azureblobstorage/mock/testdata/nested-arrays.json b/x-pack/filebeat/input/azureblobstorage/mock/testdata/nested-arrays.json new file mode 100644 index 000000000000..588e549867a1 --- /dev/null +++ b/x-pack/filebeat/input/azureblobstorage/mock/testdata/nested-arrays.json @@ -0,0 +1,23 @@ +[ + [ + { + "time": "2021-05-25 18:20:58 UTC", + "msg": "hello" + }, + { + "time": "2021-05-26 22:21:40 UTC", + "msg": "world" + } + ], + + [ + { + "time": "2021-05-25 18:20:58 UTC", + "msg": "hi" + }, + { + "time": "2021-05-26 22:21:40 UTC", + "msg": "seoul" + } + ] +] diff --git a/x-pack/filebeat/input/azureblobstorage/scheduler.go b/x-pack/filebeat/input/azureblobstorage/scheduler.go index b796a6f15b43..ba433b78f416 100644 --- a/x-pack/filebeat/input/azureblobstorage/scheduler.go +++ b/x-pack/filebeat/input/azureblobstorage/scheduler.go @@ -74,7 +74,6 @@ func newScheduler(publisher cursor.Publisher, client *azcontainer.Client, // schedule, is responsible for fetching & scheduling jobs using the workerpool model func (s *scheduler) schedule(ctx context.Context) error { - defer s.limiter.wait() if !s.src.Poll { return s.scheduleOnce(ctx) } @@ -93,16 +92,30 @@ func (s *scheduler) schedule(ctx context.Context) error { } func (s *scheduler) scheduleOnce(ctx context.Context) error { + defer s.limiter.wait() pager := s.fetchBlobPager(int32(s.src.MaxWorkers)) + fileSelectorLen := len(s.src.FileSelectors) + var numBlobs, numJobs int + for pager.More() { resp, err := pager.NextPage(ctx) if err != nil { return err } + numBlobs += len(resp.Segment.BlobItems) + s.log.Debugf("scheduler: %d blobs fetched for current batch", len(resp.Segment.BlobItems)) + var jobs []*job for _, v := range resp.Segment.BlobItems { - + // if file selectors are present, then only select the files that match the regex + if fileSelectorLen != 0 && !s.isFileSelected(*v.Name) { + continue + } + // date filter is applied on last modified time of the blob + if s.src.TimeStampEpoch != nil && v.Properties.LastModified.Unix() < *s.src.TimeStampEpoch { + continue + } blobURL := s.serviceURL + s.src.ContainerName + "/" + *v.Name blobCreds := &blobCredentials{ serviceCreds: s.credential, @@ -125,6 +138,8 @@ func (s *scheduler) scheduleOnce(ctx context.Context) error { jobs = s.moveToLastSeenJob(jobs) } + s.log.Debugf("scheduler: %d jobs scheduled for current batch", len(jobs)) + // distributes jobs among workers with the help of a limiter for i, job := range jobs { id := fetchJobID(i, s.src.ContainerName, job.name()) @@ -135,6 +150,11 @@ func (s *scheduler) scheduleOnce(ctx context.Context) error { job.do(ctx, id) }() } + + s.log.Debugf("scheduler: total objects read till now: %d\nscheduler: total jobs 
scheduled till now: %d", numBlobs, numJobs) + if len(jobs) != 0 { + s.log.Debugf("scheduler: first job in current batch: %s\nscheduler: last job in current batch: %s", jobs[0].name(), jobs[len(jobs)-1].name()) + } } return nil @@ -177,10 +197,7 @@ func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { ignore := false for _, job := range jobs { - switch offset, isPartial := s.state.cp.PartiallyProcessed[*job.blob.Name]; { - case isPartial: - job.offset = offset - latestJobs = append(latestJobs, job) + switch { case job.timestamp().After(s.state.checkpoint().LatestEntryTime): latestJobs = append(latestJobs, job) case job.name() == s.state.checkpoint().BlobName: @@ -200,12 +217,21 @@ func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { jobsToReturn = jobs } - // in a senario where there are some jobs which have a later time stamp + // in a senario where there are some jobs which have a greater timestamp // but lesser alphanumeric order and some jobs have greater alphanumeric order - // than the current checkpoint or partially completed jobs are present + // than the current checkpoint blob name, then we append the latest jobs if len(jobsToReturn) != len(jobs) && len(latestJobs) > 0 { jobsToReturn = append(latestJobs, jobsToReturn...) } return jobsToReturn } + +func (s *scheduler) isFileSelected(name string) bool { + for _, sel := range s.src.FileSelectors { + if sel.Regex == nil || sel.Regex.MatchString(name) { + return true + } + } + return false +} diff --git a/x-pack/filebeat/input/azureblobstorage/state.go b/x-pack/filebeat/input/azureblobstorage/state.go index de1873e9042a..43fdf822f2d3 100644 --- a/x-pack/filebeat/input/azureblobstorage/state.go +++ b/x-pack/filebeat/input/azureblobstorage/state.go @@ -25,15 +25,11 @@ type Checkpoint struct { BlobName string // timestamp to denote which is the latest blob LatestEntryTime time.Time - // map to contain offset data - PartiallyProcessed map[string]int64 } func newState() *state { return &state{ - cp: &Checkpoint{ - PartiallyProcessed: make(map[string]int64), - }, + cp: &Checkpoint{}, } } @@ -43,7 +39,6 @@ func newState() *state { // more than once. func (s *state) saveForTx(name string, lastModifiedOn time.Time) (cp *Checkpoint, done func()) { s.mu.Lock() - delete(s.cp.PartiallyProcessed, name) if len(s.cp.BlobName) == 0 { s.cp.BlobName = name } else if strings.ToLower(name) > strings.ToLower(s.cp.BlobName) { @@ -57,21 +52,8 @@ func (s *state) saveForTx(name string, lastModifiedOn time.Time) (cp *Checkpoint return s.cp, func() { s.mu.Unlock() } } -// savePartialForTx partially updates and returns the current state checkpoint, locks the state -// and returns an unlock function done(). The caller must call done when -// s and cp are no longer needed in a locked state. done may not be called -// more than once. 
-func (s *state) savePartialForTx(name string, offset int64) (cp *Checkpoint, done func()) { - s.mu.Lock() - s.cp.PartiallyProcessed[name] = offset - return s.cp, func() { s.mu.Unlock() } -} - // setCheckpoint sets checkpoint from source to current state instance func (s *state) setCheckpoint(chkpt *Checkpoint) { - if chkpt.PartiallyProcessed == nil { - chkpt.PartiallyProcessed = make(map[string]int64) - } s.cp = chkpt } diff --git a/x-pack/filebeat/input/azureblobstorage/types.go b/x-pack/filebeat/input/azureblobstorage/types.go index ef3e2480fc4a..67012198423e 100644 --- a/x-pack/filebeat/input/azureblobstorage/types.go +++ b/x-pack/filebeat/input/azureblobstorage/types.go @@ -13,11 +13,14 @@ import ( // Source, it is the cursor source type Source struct { - ContainerName string - AccountName string - MaxWorkers int - Poll bool - PollInterval time.Duration + ContainerName string + AccountName string + MaxWorkers int + Poll bool + PollInterval time.Duration + TimeStampEpoch *int64 + FileSelectors []fileSelectorConfig + ExpandEventListFromField string } func (s *Source) Name() string {
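To tie the documentation changes above together, the sketch below shows the three new attributes applied at the root level of the input, where they act as defaults for every configured container. This is an illustrative example rather than part of the patch: the account, key, and container names are placeholders, and the chosen values (worker count, poll interval, epoch) are arbitrary; only the attribute names and shapes follow the config keys added in this change.

["source","yaml"]
----
filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  # root-level values act as defaults for all containers below
  max_workers: 3
  poll: true
  poll_interval: 15s
  timestamp_epoch: 1627233600
  expand_event_list_from_field: Records
  file_selectors:
  - regex: 'docs/'
  containers:
  - name: container_1
  - name: container_2
----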
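A second illustrative sketch (again with placeholder names and arbitrary values) shows container-level overrides: as the documentation above states, values set under a container always take priority over the root-level ones, so each container can filter and split blobs independently.

["source","yaml"]
----
filebeat.inputs:
- type: azure-blob-storage
  id: my-azureblobstorage-id
  enabled: true
  account_name: some_account
  auth.shared_credentials.account_key: some_key
  # root-level defaults
  file_selectors:
  - regex: 'logs/'
  timestamp_epoch: 1627233600
  containers:
  - name: container_1
    # overrides the root-level file_selectors and timestamp_epoch for this container only,
    # and additionally enables event-list splitting
    file_selectors:
    - regex: 'docs/'
    timestamp_epoch: 1663157564
    expand_event_list_from_field: Records
  - name: container_2
    # inherits the root-level file_selectors and timestamp_epoch
----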