From cb93f29b89fb5ff3aa5dab481d15b7137d1b6a26 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Thu, 21 Sep 2023 13:15:15 +0530 Subject: [PATCH 01/11] added new debug logs and updated deprecated library methods --- .../input/azureblobstorage/mock/data_random.go | 5 +++-- .../input/azureblobstorage/scheduler.go | 17 ++++++++++++++--- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/x-pack/filebeat/input/azureblobstorage/mock/data_random.go b/x-pack/filebeat/input/azureblobstorage/mock/data_random.go index f52fe2fdbdb..bc5db7d661e 100644 --- a/x-pack/filebeat/input/azureblobstorage/mock/data_random.go +++ b/x-pack/filebeat/input/azureblobstorage/mock/data_random.go @@ -104,7 +104,7 @@ func generateRandomBlob() []byte { } func createRandomData() MyData { - rand.Seed(time.Now().UnixNano()) + rand.New(rand.NewSource(time.Now().UnixNano())) return MyData{ ID: rand.Intn(1000) + 1, @@ -119,6 +119,7 @@ func getRandomString(options []string) string { if len(options) == 0 { return "" } - rand.Seed(time.Now().UnixNano()) + + rand.New(rand.NewSource(time.Now().UnixNano())) return options[rand.Intn(len(options))] } diff --git a/x-pack/filebeat/input/azureblobstorage/scheduler.go b/x-pack/filebeat/input/azureblobstorage/scheduler.go index b796a6f15b4..bfa13821dd4 100644 --- a/x-pack/filebeat/input/azureblobstorage/scheduler.go +++ b/x-pack/filebeat/input/azureblobstorage/scheduler.go @@ -74,7 +74,6 @@ func newScheduler(publisher cursor.Publisher, client *azcontainer.Client, // schedule, is responsible for fetching & scheduling jobs using the workerpool model func (s *scheduler) schedule(ctx context.Context) error { - defer s.limiter.wait() if !s.src.Poll { return s.scheduleOnce(ctx) } @@ -93,16 +92,21 @@ func (s *scheduler) schedule(ctx context.Context) error { } func (s *scheduler) scheduleOnce(ctx context.Context) error { + defer s.limiter.wait() pager := s.fetchBlobPager(int32(s.src.MaxWorkers)) + var numBlobs, numJobs int + for pager.More() { resp, err := pager.NextPage(ctx) if err != nil { return err } + numBlobs += len(resp.Segment.BlobItems) + s.log.Debugf("scheduler: %d blobs fetched for current batch", len(resp.Segment.BlobItems)) + var jobs []*job for _, v := range resp.Segment.BlobItems { - blobURL := s.serviceURL + s.src.ContainerName + "/" + *v.Name blobCreds := &blobCredentials{ serviceCreds: s.credential, @@ -125,6 +129,8 @@ func (s *scheduler) scheduleOnce(ctx context.Context) error { jobs = s.moveToLastSeenJob(jobs) } + s.log.Debugf("scheduler: %d jobs scheduled for current batch", len(jobs)) + // distributes jobs among workers with the help of a limiter for i, job := range jobs { id := fetchJobID(i, s.src.ContainerName, job.name()) @@ -135,6 +141,11 @@ func (s *scheduler) scheduleOnce(ctx context.Context) error { job.do(ctx, id) }() } + + s.log.Debugf("scheduler: total objects read till now: %d\nscheduler: total jobs scheduled till now: %d", numBlobs, numJobs) + if len(jobs) != 0 { + s.log.Debugf("scheduler: first job in current batch: %s\nscheduler: last job in current batch: %s", jobs[0].name(), jobs[len(jobs)-1].name()) + } } return nil @@ -200,7 +211,7 @@ func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { jobsToReturn = jobs } - // in a senario where there are some jobs which have a later time stamp + // in a senario where there are some jobs which have a later timestamp // but lesser alphanumeric order and some jobs have greater alphanumeric order // than the current checkpoint or partially completed jobs are present if len(jobsToReturn) != len(jobs) && len(latestJobs) > 0 { From 4164ba1de63e269294fb7ead84d4ebacc2dbadd2 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Tue, 26 Sep 2023 19:41:33 +0530 Subject: [PATCH 02/11] added support for path prefix, date filter and root level array splitting --- .../filebeat/input/azureblobstorage/config.go | 30 ++-- .../filebeat/input/azureblobstorage/input.go | 20 ++- .../input/azureblobstorage/input_stateless.go | 12 +- .../input/azureblobstorage/input_test.go | 64 +++++++ x-pack/filebeat/input/azureblobstorage/job.go | 162 +++++++++++++----- .../input/azureblobstorage/mock/data.go | 4 +- .../input/azureblobstorage/mock/data_files.go | 55 ++++++ .../mock/testdata/array-at-root.json | 10 ++ .../mock/testdata/nested-arrays.json | 23 +++ .../input/azureblobstorage/scheduler.go | 8 + .../filebeat/input/azureblobstorage/state.go | 26 ++- .../filebeat/input/azureblobstorage/types.go | 12 +- 12 files changed, 351 insertions(+), 75 deletions(-) create mode 100644 x-pack/filebeat/input/azureblobstorage/mock/testdata/array-at-root.json create mode 100644 x-pack/filebeat/input/azureblobstorage/mock/testdata/nested-arrays.json diff --git a/x-pack/filebeat/input/azureblobstorage/config.go b/x-pack/filebeat/input/azureblobstorage/config.go index 22e8efd7ec2..d10c7c00786 100644 --- a/x-pack/filebeat/input/azureblobstorage/config.go +++ b/x-pack/filebeat/input/azureblobstorage/config.go @@ -8,25 +8,29 @@ import ( "time" ) -// MaxWorkers, Poll & PollInterval can be configured at a global level, -// which applies to all containers, as well as at the container level. +// MaxWorkers, Poll, PollInterval & PathPrefix can be configured at a global level, +// which applies to all containers. They can also be configured at individual container levels. // Container level configurations will always override global level values. type config struct { - AccountName string `config:"account_name" validate:"required"` - StorageURL string `config:"storage_url,omitempty"` - Auth authConfig `config:"auth" validate:"required"` - MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` - Poll *bool `config:"poll,omitempty"` - PollInterval *time.Duration `config:"poll_interval,omitempty"` - Containers []container `config:"containers" validate:"required"` + AccountName string `config:"account_name" validate:"required"` + StorageURL string `config:"storage_url,omitempty"` + Auth authConfig `config:"auth" validate:"required"` + MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` + Poll *bool `config:"poll,omitempty"` + PollInterval *time.Duration `config:"poll_interval,omitempty"` + Containers []container `config:"containers" validate:"required"` + PathPrefix string `config:"path_prefix,omitempty"` + TimeStampEpoch *int64 `config:"timestamp_epoch,omitempty"` } // container contains the config for each specific blob storage container in the root account type container struct { - Name string `config:"name" validate:"required"` - MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` - Poll *bool `config:"poll,omitempty"` - PollInterval *time.Duration `config:"poll_interval,omitempty"` + Name string `config:"name" validate:"required"` + MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` + Poll *bool `config:"poll,omitempty"` + PollInterval *time.Duration `config:"poll_interval,omitempty"` + PathPrefix string `config:"path_prefix,omitempty"` + TimeStampEpoch *int64 `config:"timestamp_epoch,omitempty"` } type authConfig struct { diff --git a/x-pack/filebeat/input/azureblobstorage/input.go b/x-pack/filebeat/input/azureblobstorage/input.go index 3cca206174d..a6134f73141 100644 --- a/x-pack/filebeat/input/azureblobstorage/input.go +++ b/x-pack/filebeat/input/azureblobstorage/input.go @@ -54,11 +54,13 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) { for _, c := range config.Containers { container := tryOverrideOrDefault(config, c) sources = append(sources, &Source{ - AccountName: config.AccountName, - ContainerName: c.Name, - MaxWorkers: *container.MaxWorkers, - Poll: *container.Poll, - PollInterval: *container.PollInterval, + AccountName: config.AccountName, + ContainerName: c.Name, + MaxWorkers: *container.MaxWorkers, + Poll: *container.Poll, + PollInterval: *container.PollInterval, + PathPrefix: container.PathPrefix, + TimeStampEpoch: container.TimeStampEpoch, }) } @@ -101,6 +103,14 @@ func tryOverrideOrDefault(cfg config, c container) container { } c.PollInterval = &interval } + + if c.PathPrefix == "" { + c.PathPrefix = cfg.PathPrefix + } + + if c.TimeStampEpoch == nil { + c.TimeStampEpoch = cfg.TimeStampEpoch + } return c } diff --git a/x-pack/filebeat/input/azureblobstorage/input_stateless.go b/x-pack/filebeat/input/azureblobstorage/input_stateless.go index a2cfc487081..0c0e9810bd2 100644 --- a/x-pack/filebeat/input/azureblobstorage/input_stateless.go +++ b/x-pack/filebeat/input/azureblobstorage/input_stateless.go @@ -46,11 +46,13 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher for _, c := range in.config.Containers { container := tryOverrideOrDefault(in.config, c) source = &Source{ - AccountName: in.config.AccountName, - ContainerName: c.Name, - MaxWorkers: *container.MaxWorkers, - Poll: *container.Poll, - PollInterval: *container.PollInterval, + AccountName: in.config.AccountName, + ContainerName: c.Name, + MaxWorkers: *container.MaxWorkers, + Poll: *container.Poll, + PollInterval: *container.PollInterval, + PathPrefix: container.PathPrefix, + TimeStampEpoch: container.TimeStampEpoch, } st := newState() diff --git a/x-pack/filebeat/input/azureblobstorage/input_test.go b/x-pack/filebeat/input/azureblobstorage/input_test.go index b313f6fabd9..d33ca519c49 100644 --- a/x-pack/filebeat/input/azureblobstorage/input_test.go +++ b/x-pack/filebeat/input/azureblobstorage/input_test.go @@ -35,6 +35,7 @@ const ( beatsJSONContainer = "beatsjsoncontainer" beatsNdJSONContainer = "beatsndjsoncontainer" beatsGzJSONContainer = "beatsgzjsoncontainer" + beatsJSONWithArrayContainer = "beatsjsonwitharraycontainer" ) func Test_StorageClient(t *testing.T) { @@ -316,6 +317,69 @@ func Test_StorageClient(t *testing.T) { mock.BeatsFilesContainer_multiline_json_gz[1]: true, }, }, + { + name: "ReadJSONWithRootAsArray", + baseConfig: map[string]interface{}{ + "account_name": "beatsblobnew", + "auth.shared_credentials.account_key": "7pfLm1betGiRyyABEM/RFrLYlafLZHbLtGhB52LkWVeBxE7la9mIvk6YYAbQKYE/f0GdhiaOZeV8+AStsAdr/Q==", + "max_workers": 1, + "poll": true, + "poll_interval": "10s", + "containers": []map[string]interface{}{ + { + "name": beatsJSONWithArrayContainer, + }, + }, + }, + mockHandler: mock.AzureStorageFileServer, + expected: map[string]bool{ + mock.BeatsFilesContainer_json_array[0]: true, + mock.BeatsFilesContainer_json_array[1]: true, + mock.BeatsFilesContainer_json_array[2]: true, + mock.BeatsFilesContainer_json_array[3]: true, + }, + }, + { + name: "PathPrefix", + baseConfig: map[string]interface{}{ + "account_name": "beatsblobnew", + "auth.shared_credentials.account_key": "7pfLm1betGiRyyABEM/RFrLYlafLZHbLtGhB52LkWVeBxE7la9mIvk6YYAbQKYE/f0GdhiaOZeV8+AStsAdr/Q==", + "path_prefix": "docs/", + "max_workers": 2, + "poll": false, + "poll_interval": "10s", + "containers": []map[string]interface{}{ + { + "name": beatsContainer, + }, + }, + }, + mockHandler: mock.AzureStorageServer, + expected: map[string]bool{ + mock.Beatscontainer_blob_docs_ata_json: true, + }, + }, + { + name: "TimeStampEpoch", + baseConfig: map[string]interface{}{ + "account_name": "beatsblobnew", + "auth.shared_credentials.account_key": "7pfLm1betGiRyyABEM/RFrLYlafLZHbLtGhB52LkWVeBxE7la9mIvk6YYAbQKYE/f0GdhiaOZeV8+AStsAdr/Q==", + "timestamp_epoch": 1663157564, + "max_workers": 2, + "poll": false, + "poll_interval": "10s", + "containers": []map[string]interface{}{ + { + "name": beatsContainer, + }, + }, + }, + mockHandler: mock.AzureStorageServer, + expected: map[string]bool{ + mock.Beatscontainer_blob_data3_json: true, + mock.Beatscontainer_blob_docs_ata_json: true, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/x-pack/filebeat/input/azureblobstorage/job.go b/x-pack/filebeat/input/azureblobstorage/job.go index 07698a6b0cf..60d106b219c 100644 --- a/x-pack/filebeat/input/azureblobstorage/job.go +++ b/x-pack/filebeat/input/azureblobstorage/job.go @@ -12,9 +12,11 @@ import ( "crypto/sha256" "encoding/hex" "encoding/json" + "errors" "fmt" "io" "time" + "unicode" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" @@ -29,16 +31,29 @@ import ( const jobErrString = "job with jobId %s encountered an error: %w" type job struct { - client *blob.Client - blob *azcontainer.BlobItem - blobURL string - hash string - offset int64 + // azure blob client handle + client *blob.Client + // azure blob item handle + blob *azcontainer.BlobItem + // azure blob url for the resource + blobURL string + // object hash, used in setting event id + hash string + // offset value for an object, it points to the location inside the data stream + // from where we can start processing the object. + offset int64 + // flag to denote if object is gZipped compressed or not isCompressed bool - state *state - src *Source - publisher cursor.Publisher - log *logp.Logger + // flag to denote if object's root element is of an array type + isRootArray bool + // blob state + state *state + // container source struct used for storing container related data + src *Source + // publisher is used to publish a beat event to the output stream + publisher cursor.Publisher + // custom logger + log *logp.Logger } // newJob, returns an instance of a job, which is a unit of work that can be assigned to a go routine @@ -72,6 +87,9 @@ func (j *job) do(ctx context.Context, id string) { if *j.blob.Properties.ContentType == gzType || (j.blob.Properties.ContentEncoding != nil && *j.blob.Properties.ContentEncoding == encodingGzip) { j.isCompressed = true } + isRootArray, done := j.state.isRootArray(*j.blob.Name) + done() + j.isRootArray = isRootArray err := j.processAndPublishData(ctx, id) if err != nil { j.log.Errorf(jobErrString, id, err) @@ -109,7 +127,7 @@ func (j *job) timestamp() *time.Time { func (j *job) processAndPublishData(ctx context.Context, id string) error { var err error downloadOptions := &blob.DownloadStreamOptions{} - if !j.isCompressed { + if !j.isCompressed && !j.isRootArray { downloadOptions.Range.Offset = j.offset } @@ -117,8 +135,10 @@ func (j *job) processAndPublishData(ctx context.Context, id string) error { if err != nil { return fmt.Errorf("failed to download data from blob with error: %w", err) } - - reader := get.NewRetryReader(context.Background(), &azblob.RetryReaderOptions{}) + // we hardcode a retry count of 3 here + reader := get.NewRetryReader(ctx, &azblob.RetryReaderOptions{ + MaxRetries: 3, + }) defer func() { err = reader.Close() if err != nil { @@ -126,8 +146,7 @@ func (j *job) processAndPublishData(ctx context.Context, id string) error { } }() - updatedReader, err := j.addGzipDecoderIfNeeded(reader) - err = j.readJsonAndPublish(ctx, updatedReader, id) + err = j.readJsonAndPublish(ctx, reader, id) if err != nil { return fmt.Errorf("failed to read data from blob with error: %w", err) } @@ -135,51 +154,50 @@ func (j *job) processAndPublishData(ctx context.Context, id string) error { return err } -// addGzipDecoderIfNeeded determines whether the given stream of bytes (encapsulated in a buffered reader) -// represents gzipped content or not and adds gzipped decoder if needed. A buffered reader is used -// so the function can peek into the byte stream without consuming it. This makes it convenient for -// code executed after this function call to consume the stream if it wants. -func (j *job) addGzipDecoderIfNeeded(body io.Reader) (io.Reader, error) { - bufReader := bufio.NewReader(body) - isStreamGzipped := false - // check if stream is gziped or not - buf, err := bufReader.Peek(3) +func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) error { + var err error + r, err = j.addGzipDecoderIfNeeded(bufio.NewReader(r)) if err != nil { - if err == io.EOF { - err = nil - } - return bufReader, err + return fmt.Errorf("failed to add gzip decoder to blob: %s, with error: %w", *j.blob.Name, err) } - // gzip magic number (1f 8b) and the compression method (08 for DEFLATE). - isStreamGzipped = bytes.Equal(buf, []byte{0x1F, 0x8B, 0x08}) - - if !isStreamGzipped { - return bufReader, nil + // if offset == 0, then this is a new stream which has not been processed previously + if j.offset == 0 { + r, j.isRootArray, err = evaluateJSON(bufio.NewReader(r)) + if err != nil { + return fmt.Errorf("failed to evaluate json for blob: %s, with error: %w", *j.blob.Name, err) + } + if j.isRootArray { + done := j.state.setRootArray(*j.blob.Name) + done() + } } - return gzip.NewReader(bufReader) -} - -func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) error { dec := json.NewDecoder(r) - dec.UseNumber() - var offset int64 - var relativeOffset int64 + // If array is present at root then read json token and advance decoder + if j.isRootArray { + _, err := dec.Token() + if err != nil { + return fmt.Errorf("failed to read JSON token for object: %s, with error: %w", *j.blob.Name, err) + } + } + + var offset, relativeOffset int64 // uncompressed files use the client to directly set the offset, this // in turn causes the offset to reset to 0 for the new stream, hence why // we need to keep relative offsets to keep track of the actual offset - if !j.isCompressed { + if !j.isCompressed && !j.isRootArray { relativeOffset = j.offset } + dec.UseNumber() for dec.More() && ctx.Err() == nil { var item json.RawMessage offset = dec.InputOffset() if err := dec.Decode(&item); err != nil { return fmt.Errorf("failed to decode json: %w", err) } - // manually seek offset only if file is compressed - if j.isCompressed && offset < j.offset { + // manually seek offset only if file is compressed or if root element is an array + if (j.isCompressed || j.isRootArray) && offset < j.offset { continue } @@ -210,6 +228,62 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er return nil } +// addGzipDecoderIfNeeded determines whether the given stream of bytes (encapsulated in a buffered reader) +// represents gzipped content or not and adds gzipped decoder if needed. A buffered reader is used +// so the function can peek into the byte stream without consuming it. This makes it convenient for +// code executed after this function call to consume the stream if it wants. +func (j *job) addGzipDecoderIfNeeded(body io.Reader) (io.Reader, error) { + bufReader := bufio.NewReader(body) + isStreamGzipped := false + // check if stream is gziped or not + buf, err := bufReader.Peek(3) + if err != nil { + if err == io.EOF { + err = nil + } + return bufReader, err + } + + // gzip magic number (1f 8b) and the compression method (08 for DEFLATE). + isStreamGzipped = bytes.Equal(buf, []byte{0x1F, 0x8B, 0x08}) + + if !isStreamGzipped { + return bufReader, nil + } + + return gzip.NewReader(bufReader) +} + +// evaluateJSON, uses a bufio.NewReader & reader.Peek to evaluate if the +// data stream contains a json array as the root element or not, without +// advancing the reader. If the data stream contains an array as the root +// element, the value of the boolean return type is set to true. +func evaluateJSON(reader *bufio.Reader) (io.Reader, bool, error) { + eof := false + for i := 0; ; i++ { + b, err := reader.Peek((i + 1) * 5) + if errors.Is(err, io.EOF) { + eof = true + } + startByte := i * 5 + for j := 0; j < len(b[startByte:]); j++ { + char := b[startByte+j : startByte+j+1] + switch { + case bytes.Equal(char, []byte("[")): + return reader, true, nil + case bytes.Equal(char, []byte("{")): + return reader, false, nil + case unicode.IsSpace(bytes.Runes(char)[0]): + continue + default: + return nil, false, fmt.Errorf("unexpected error: JSON data is malformed") + } + } + if eof { + return nil, false, fmt.Errorf("unexpected error: JSON data is malformed") + } + } +} func (j *job) createEvent(message string, offset int64) beat.Event { event := beat.Event{ Timestamp: time.Now(), @@ -227,8 +301,8 @@ func (j *job) createEvent(message string, offset int64) beat.Event { "name": j.src.ContainerName, }, "blob": mapstr.M{ - "name": j.blob.Name, - "content_type": j.blob.Properties.ContentType, + "name": *j.blob.Name, + "content_type": *j.blob.Properties.ContentType, }, }, }, diff --git a/x-pack/filebeat/input/azureblobstorage/mock/data.go b/x-pack/filebeat/input/azureblobstorage/mock/data.go index a566e4bbc03..1132df16900 100644 --- a/x-pack/filebeat/input/azureblobstorage/mock/data.go +++ b/x-pack/filebeat/input/azureblobstorage/mock/data.go @@ -47,7 +47,7 @@ var fetchContainer = map[string]string{ ata.json - Wed, 14 Sep 2022 12:12:28 GMT + Wed, 12 Sep 2022 12:12:28 GMT 0x8DA964A64516C82 643 application/json @@ -83,7 +83,7 @@ var fetchContainer = map[string]string{ docs/ata.json - Wed, 14 Sep 2022 12:13:07 GMT + Wed, 15 Sep 2022 12:13:07 GMT 0x8DA964A7B8D8862 643 application/json diff --git a/x-pack/filebeat/input/azureblobstorage/mock/data_files.go b/x-pack/filebeat/input/azureblobstorage/mock/data_files.go index ed6f3ad7bb3..693cd89c610 100644 --- a/x-pack/filebeat/input/azureblobstorage/mock/data_files.go +++ b/x-pack/filebeat/input/azureblobstorage/mock/data_files.go @@ -9,6 +9,7 @@ const ( beatsJSONContainer = "beatsjsoncontainer" beatsNdJSONContainer = "beatsndjsoncontainer" beatsGzJSONContainer = "beatsgzjsoncontainer" + beatsJSONWithArrayContainer = "beatsjsonwitharraycontainer" ) var fileContainers = map[string]bool{ @@ -16,6 +17,7 @@ var fileContainers = map[string]bool{ beatsJSONContainer: true, beatsNdJSONContainer: true, beatsGzJSONContainer: true, + beatsJSONWithArrayContainer: true, } var availableFileBlobs = map[string]map[string]bool{ @@ -32,6 +34,10 @@ var availableFileBlobs = map[string]map[string]bool{ beatsGzJSONContainer: { "multiline.json.gz": true, }, + beatsJSONWithArrayContainer: { + "array-at-root.json": true, + "nested-arrays.json": true, + }, } var fetchFilesContainer = map[string]string{ @@ -125,6 +131,48 @@ var fetchFilesContainer = map[string]string{ `, + beatsJSONWithArrayContainer: ` + + + + array-at-root.json + + Wed, 14 Sep 2022 12:12:28 GMT + 0x8DA964A64516C82 + 643 + application/json + + + UjQX73kQRTHx+UyXZDvVkg== + + + BlockBlob + unlocked + available + + + + + nested-arrays.json + + Wed, 14 Sep 2022 12:12:28 GMT + 0x8DA964A64516C83 + 643 + application/json + + + UjQX73kQRTHx+UyXZDvVkg== + + + BlockBlob + unlocked + available + + + + + + `, beatsGzJSONContainer: ` @@ -167,6 +215,13 @@ var BeatsFilesContainer_log_ndjson = []string{ `{"@timestamp":"2021-05-25T17:25:51.391Z","log.level":"info","message":"available space is 44.3gb"}`, } +var BeatsFilesContainer_json_array = []string{ + "{\n \"time\": \"2021-05-25 18:20:58 UTC\",\n \"msg\": \"hello\"\n }", + "{\n \"time\": \"2021-05-26 22:21:40 UTC\",\n \"msg\": \"world\"\n }", + "[\n {\n \"time\": \"2021-05-25 18:20:58 UTC\",\n \"msg\": \"hello\"\n },\n {\n \"time\": \"2021-05-26 22:21:40 UTC\",\n \"msg\": \"world\"\n }\n ]", + "[\n {\n \"time\": \"2021-05-25 18:20:58 UTC\",\n \"msg\": \"hi\"\n },\n {\n \"time\": \"2021-05-26 22:21:40 UTC\",\n \"msg\": \"seoul\"\n }\n ]", +} + var BeatsFilesContainer_multiline_json_gz = []string{ "{\n \"@timestamp\": \"2021-05-25T17:25:42.806Z\",\n \"log.level\": \"error\",\n \"message\": \"error making http request\"\n}", "{\n \"@timestamp\": \"2021-05-25T17:25:51.391Z\",\n \"log.level\": \"info\",\n \"message\": \"available disk space 44.3gb\"\n}", diff --git a/x-pack/filebeat/input/azureblobstorage/mock/testdata/array-at-root.json b/x-pack/filebeat/input/azureblobstorage/mock/testdata/array-at-root.json new file mode 100644 index 00000000000..68cd2b41e7b --- /dev/null +++ b/x-pack/filebeat/input/azureblobstorage/mock/testdata/array-at-root.json @@ -0,0 +1,10 @@ +[ + { + "time": "2021-05-25 18:20:58 UTC", + "msg": "hello" + }, + { + "time": "2021-05-26 22:21:40 UTC", + "msg": "world" + } +] \ No newline at end of file diff --git a/x-pack/filebeat/input/azureblobstorage/mock/testdata/nested-arrays.json b/x-pack/filebeat/input/azureblobstorage/mock/testdata/nested-arrays.json new file mode 100644 index 00000000000..588e549867a --- /dev/null +++ b/x-pack/filebeat/input/azureblobstorage/mock/testdata/nested-arrays.json @@ -0,0 +1,23 @@ +[ + [ + { + "time": "2021-05-25 18:20:58 UTC", + "msg": "hello" + }, + { + "time": "2021-05-26 22:21:40 UTC", + "msg": "world" + } + ], + + [ + { + "time": "2021-05-25 18:20:58 UTC", + "msg": "hi" + }, + { + "time": "2021-05-26 22:21:40 UTC", + "msg": "seoul" + } + ] +] diff --git a/x-pack/filebeat/input/azureblobstorage/scheduler.go b/x-pack/filebeat/input/azureblobstorage/scheduler.go index bfa13821dd4..7b09cc49fd7 100644 --- a/x-pack/filebeat/input/azureblobstorage/scheduler.go +++ b/x-pack/filebeat/input/azureblobstorage/scheduler.go @@ -7,6 +7,7 @@ package azureblobstorage import ( "context" "fmt" + "strings" "sync" azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" @@ -107,6 +108,13 @@ func (s *scheduler) scheduleOnce(ctx context.Context) error { var jobs []*job for _, v := range resp.Segment.BlobItems { + if s.src.PathPrefix != "" && !strings.HasPrefix(*v.Name, s.src.PathPrefix) { + continue + } + // date filter is applied on last modified time of the blob + if s.src.TimeStampEpoch != nil && v.Properties.LastModified.Unix() < *s.src.TimeStampEpoch { + continue + } blobURL := s.serviceURL + s.src.ContainerName + "/" + *v.Name blobCreds := &blobCredentials{ serviceCreds: s.credential, diff --git a/x-pack/filebeat/input/azureblobstorage/state.go b/x-pack/filebeat/input/azureblobstorage/state.go index de1873e9042..4090ba6feec 100644 --- a/x-pack/filebeat/input/azureblobstorage/state.go +++ b/x-pack/filebeat/input/azureblobstorage/state.go @@ -25,7 +25,10 @@ type Checkpoint struct { BlobName string // timestamp to denote which is the latest blob LatestEntryTime time.Time - // map to contain offset data + // a mapping from object name to whether the object is having an array type as it's root. + IsRootArray map[string]bool + // a mapping from object name to an array index that contains the last processed offset for that object. + // if isRootArray == true for object, then PartiallyProcessed will treat offset as an array index PartiallyProcessed map[string]int64 } @@ -33,6 +36,7 @@ func newState() *state { return &state{ cp: &Checkpoint{ PartiallyProcessed: make(map[string]int64), + IsRootArray: make(map[string]bool), }, } } @@ -44,6 +48,7 @@ func newState() *state { func (s *state) saveForTx(name string, lastModifiedOn time.Time) (cp *Checkpoint, done func()) { s.mu.Lock() delete(s.cp.PartiallyProcessed, name) + delete(s.cp.IsRootArray, name) if len(s.cp.BlobName) == 0 { s.cp.BlobName = name } else if strings.ToLower(name) > strings.ToLower(s.cp.BlobName) { @@ -67,11 +72,30 @@ func (s *state) savePartialForTx(name string, offset int64) (cp *Checkpoint, don return s.cp, func() { s.mu.Unlock() } } +// setRootArray, sets boolean true for objects that have their roots defined as an array type, locks the state +// and returns an unlock function done(). The caller must call done when s is no longer needed in a locked state. +func (s *state) setRootArray(name string) (done func()) { + s.mu.Lock() + s.cp.IsRootArray[name] = true + return func() { s.mu.Unlock() } +} + +// isRootArray, returns true if the object has it's root defined as an array type, locks the state +// and returns an unlock function done(). The caller must call done when s and result are no longer needed in a locked state. +func (s *state) isRootArray(name string) (result bool, done func()) { + s.mu.Lock() + result = s.cp.IsRootArray[name] + return result, func() { s.mu.Unlock() } +} + // setCheckpoint sets checkpoint from source to current state instance func (s *state) setCheckpoint(chkpt *Checkpoint) { if chkpt.PartiallyProcessed == nil { chkpt.PartiallyProcessed = make(map[string]int64) } + if chkpt.IsRootArray == nil { + chkpt.IsRootArray = make(map[string]bool) + } s.cp = chkpt } diff --git a/x-pack/filebeat/input/azureblobstorage/types.go b/x-pack/filebeat/input/azureblobstorage/types.go index ef3e2480fc4..2f6365fa9a6 100644 --- a/x-pack/filebeat/input/azureblobstorage/types.go +++ b/x-pack/filebeat/input/azureblobstorage/types.go @@ -13,11 +13,13 @@ import ( // Source, it is the cursor source type Source struct { - ContainerName string - AccountName string - MaxWorkers int - Poll bool - PollInterval time.Duration + ContainerName string + AccountName string + MaxWorkers int + Poll bool + PollInterval time.Duration + PathPrefix string + TimeStampEpoch *int64 } func (s *Source) Name() string { From 6488a4c5e9d9e87ec4777f7b20ce94588c3c44c5 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Wed, 27 Sep 2023 18:33:09 +0530 Subject: [PATCH 03/11] added support for fileselectors, timestamp filtering and expand event field --- .../filebeat/input/azureblobstorage/config.go | 41 ++++--- .../filebeat/input/azureblobstorage/input.go | 41 ++++--- .../input/azureblobstorage/input_stateless.go | 15 +-- .../input/azureblobstorage/input_test.go | 66 ++++++++++- x-pack/filebeat/input/azureblobstorage/job.go | 104 +++++++++++++++--- .../input/azureblobstorage/mock/data_files.go | 5 + .../input/azureblobstorage/scheduler.go | 14 ++- .../filebeat/input/azureblobstorage/state.go | 4 +- .../filebeat/input/azureblobstorage/types.go | 15 +-- 9 files changed, 236 insertions(+), 69 deletions(-) diff --git a/x-pack/filebeat/input/azureblobstorage/config.go b/x-pack/filebeat/input/azureblobstorage/config.go index d10c7c00786..19de36c8369 100644 --- a/x-pack/filebeat/input/azureblobstorage/config.go +++ b/x-pack/filebeat/input/azureblobstorage/config.go @@ -6,31 +6,42 @@ package azureblobstorage import ( "time" + + "github.com/elastic/beats/v7/libbeat/common/match" ) // MaxWorkers, Poll, PollInterval & PathPrefix can be configured at a global level, // which applies to all containers. They can also be configured at individual container levels. // Container level configurations will always override global level values. type config struct { - AccountName string `config:"account_name" validate:"required"` - StorageURL string `config:"storage_url,omitempty"` - Auth authConfig `config:"auth" validate:"required"` - MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` - Poll *bool `config:"poll,omitempty"` - PollInterval *time.Duration `config:"poll_interval,omitempty"` - Containers []container `config:"containers" validate:"required"` - PathPrefix string `config:"path_prefix,omitempty"` - TimeStampEpoch *int64 `config:"timestamp_epoch,omitempty"` + AccountName string `config:"account_name" validate:"required"` + StorageURL string `config:"storage_url,omitempty"` + Auth authConfig `config:"auth" validate:"required"` + MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` + Poll *bool `config:"poll,omitempty"` + PollInterval *time.Duration `config:"poll_interval,omitempty"` + Containers []container `config:"containers" validate:"required"` + FileSelectors []fileSelectorConfig `config:"file_selectors"` + TimeStampEpoch *int64 `config:"timestamp_epoch,omitempty"` + ExpandEventListFromField string `config:"expand_event_list_from_field"` } // container contains the config for each specific blob storage container in the root account type container struct { - Name string `config:"name" validate:"required"` - MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` - Poll *bool `config:"poll,omitempty"` - PollInterval *time.Duration `config:"poll_interval,omitempty"` - PathPrefix string `config:"path_prefix,omitempty"` - TimeStampEpoch *int64 `config:"timestamp_epoch,omitempty"` + Name string `config:"name" validate:"required"` + MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` + Poll *bool `config:"poll,omitempty"` + PollInterval *time.Duration `config:"poll_interval,omitempty"` + FileSelectors []fileSelectorConfig `config:"file_selectors"` + TimeStampEpoch *int64 `config:"timestamp_epoch,omitempty"` + ExpandEventListFromField string `config:"expand_event_list_from_field"` +} + +// fileSelectorConfig defines reader configuration that applies to a subset +// of azure blobs whose name matches the given regex. +type fileSelectorConfig struct { + Regex *match.Matcher `config:"regex" validate:"required"` + // TODO: Add support for reader config in future } type authConfig struct { diff --git a/x-pack/filebeat/input/azureblobstorage/input.go b/x-pack/filebeat/input/azureblobstorage/input.go index a6134f73141..457376799af 100644 --- a/x-pack/filebeat/input/azureblobstorage/input.go +++ b/x-pack/filebeat/input/azureblobstorage/input.go @@ -53,14 +53,18 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) { var sources []cursor.Source for _, c := range config.Containers { container := tryOverrideOrDefault(config, c) + if container.TimeStampEpoch != nil && !isValidUnixTimestamp(*container.TimeStampEpoch) { + return nil, nil, fmt.Errorf("invalid timestamp epoch: %d", *container.TimeStampEpoch) + } sources = append(sources, &Source{ - AccountName: config.AccountName, - ContainerName: c.Name, - MaxWorkers: *container.MaxWorkers, - Poll: *container.Poll, - PollInterval: *container.PollInterval, - PathPrefix: container.PathPrefix, - TimeStampEpoch: container.TimeStampEpoch, + AccountName: config.AccountName, + ContainerName: c.Name, + MaxWorkers: *container.MaxWorkers, + Poll: *container.Poll, + PollInterval: *container.PollInterval, + TimeStampEpoch: container.TimeStampEpoch, + ExpandEventListFromField: container.ExpandEventListFromField, + FileSelectors: container.FileSelectors, }) } @@ -87,7 +91,6 @@ func tryOverrideOrDefault(cfg config, c container) container { } c.MaxWorkers = &maxWorkers } - if c.Poll == nil { var poll bool if cfg.Poll != nil { @@ -95,7 +98,6 @@ func tryOverrideOrDefault(cfg config, c container) container { } c.Poll = &poll } - if c.PollInterval == nil { interval := time.Second * 300 if cfg.PollInterval != nil { @@ -103,17 +105,28 @@ func tryOverrideOrDefault(cfg config, c container) container { } c.PollInterval = &interval } - - if c.PathPrefix == "" { - c.PathPrefix = cfg.PathPrefix - } - if c.TimeStampEpoch == nil { c.TimeStampEpoch = cfg.TimeStampEpoch } + if c.ExpandEventListFromField == "" { + c.ExpandEventListFromField = cfg.ExpandEventListFromField + } + if len(c.FileSelectors) == 0 && len(cfg.FileSelectors) > 0 { + c.FileSelectors = append(c.FileSelectors, cfg.FileSelectors...) + } return c } +// isValidUnixTimestamp checks if the timestamp is a valid Unix timestamp +func isValidUnixTimestamp(timestamp int64) bool { + // defines the valid range for Unix timestamps + minTimestamp := time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC).Unix() + maxTimestamp := time.Date(2106, time.February, 7, 6, 28, 15, 0, time.UTC).Unix() + + // checks if the timestamp is within the valid range + return timestamp >= minTimestamp && timestamp <= maxTimestamp +} + func (input *azurebsInput) Name() string { return inputName } diff --git a/x-pack/filebeat/input/azureblobstorage/input_stateless.go b/x-pack/filebeat/input/azureblobstorage/input_stateless.go index 0c0e9810bd2..381c026cf82 100644 --- a/x-pack/filebeat/input/azureblobstorage/input_stateless.go +++ b/x-pack/filebeat/input/azureblobstorage/input_stateless.go @@ -46,13 +46,14 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher for _, c := range in.config.Containers { container := tryOverrideOrDefault(in.config, c) source = &Source{ - AccountName: in.config.AccountName, - ContainerName: c.Name, - MaxWorkers: *container.MaxWorkers, - Poll: *container.Poll, - PollInterval: *container.PollInterval, - PathPrefix: container.PathPrefix, - TimeStampEpoch: container.TimeStampEpoch, + AccountName: in.config.AccountName, + ContainerName: c.Name, + MaxWorkers: *container.MaxWorkers, + Poll: *container.Poll, + PollInterval: *container.PollInterval, + TimeStampEpoch: container.TimeStampEpoch, + ExpandEventListFromField: container.ExpandEventListFromField, + FileSelectors: container.FileSelectors, } st := newState() diff --git a/x-pack/filebeat/input/azureblobstorage/input_test.go b/x-pack/filebeat/input/azureblobstorage/input_test.go index d33ca519c49..89b55d8b17b 100644 --- a/x-pack/filebeat/input/azureblobstorage/input_test.go +++ b/x-pack/filebeat/input/azureblobstorage/input_test.go @@ -340,11 +340,11 @@ func Test_StorageClient(t *testing.T) { }, }, { - name: "PathPrefix", + name: "FilterByTimeStampEpoch", baseConfig: map[string]interface{}{ "account_name": "beatsblobnew", "auth.shared_credentials.account_key": "7pfLm1betGiRyyABEM/RFrLYlafLZHbLtGhB52LkWVeBxE7la9mIvk6YYAbQKYE/f0GdhiaOZeV8+AStsAdr/Q==", - "path_prefix": "docs/", + "timestamp_epoch": 1663157564, "max_workers": 2, "poll": false, "poll_interval": "10s", @@ -356,18 +356,50 @@ func Test_StorageClient(t *testing.T) { }, mockHandler: mock.AzureStorageServer, expected: map[string]bool{ + mock.Beatscontainer_blob_data3_json: true, mock.Beatscontainer_blob_docs_ata_json: true, }, }, { - name: "TimeStampEpoch", + name: "FilterByFileSelectorRegexSingle", baseConfig: map[string]interface{}{ "account_name": "beatsblobnew", "auth.shared_credentials.account_key": "7pfLm1betGiRyyABEM/RFrLYlafLZHbLtGhB52LkWVeBxE7la9mIvk6YYAbQKYE/f0GdhiaOZeV8+AStsAdr/Q==", - "timestamp_epoch": 1663157564, "max_workers": 2, "poll": false, "poll_interval": "10s", + "file_selectors": []map[string]interface{}{ + { + "regex": "docs/", + }, + }, + "containers": []map[string]interface{}{ + { + "name": beatsContainer, + }, + }, + }, + mockHandler: mock.AzureStorageServer, + expected: map[string]bool{ + mock.Beatscontainer_blob_docs_ata_json: true, + }, + }, + { + name: "FilterByFileSelectorRegexMulti", + baseConfig: map[string]interface{}{ + "account_name": "beatsblobnew", + "auth.shared_credentials.account_key": "7pfLm1betGiRyyABEM/RFrLYlafLZHbLtGhB52LkWVeBxE7la9mIvk6YYAbQKYE/f0GdhiaOZeV8+AStsAdr/Q==", + "max_workers": 2, + "poll": false, + "poll_interval": "10s", + "file_selectors": []map[string]interface{}{ + { + "regex": "docs/", + }, + { + "regex": "data", + }, + }, "containers": []map[string]interface{}{ { "name": beatsContainer, @@ -380,6 +412,32 @@ func Test_StorageClient(t *testing.T) { mock.Beatscontainer_blob_docs_ata_json: true, }, }, + { + name: "ExpandEventListFromField", + baseConfig: map[string]interface{}{ + "account_name": "beatsblobnew", + "auth.shared_credentials.account_key": "7pfLm1betGiRyyABEM/RFrLYlafLZHbLtGhB52LkWVeBxE7la9mIvk6YYAbQKYE/f0GdhiaOZeV8+AStsAdr/Q==", + "max_workers": 2, + "poll": true, + "poll_interval": "10s", + "expand_event_list_from_field": "Events", + "file_selectors": []map[string]interface{}{ + { + "regex": "events-array", + }, + }, + "containers": []map[string]interface{}{ + { + "name": beatsJSONContainer, + }, + }, + }, + mockHandler: mock.AzureStorageFileServer, + expected: map[string]bool{ + mock.BeatsFilesContainer_events_array_json[0]: true, + mock.BeatsFilesContainer_events_array_json[1]: true, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/x-pack/filebeat/input/azureblobstorage/job.go b/x-pack/filebeat/input/azureblobstorage/job.go index 60d106b219c..b4be6b77281 100644 --- a/x-pack/filebeat/input/azureblobstorage/job.go +++ b/x-pack/filebeat/input/azureblobstorage/job.go @@ -196,35 +196,102 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er if err := dec.Decode(&item); err != nil { return fmt.Errorf("failed to decode json: %w", err) } - // manually seek offset only if file is compressed or if root element is an array - if (j.isCompressed || j.isRootArray) && offset < j.offset { + // if expand_event_list_from_field is set, then split the event list + // here we assume that the field is always an array and it is the only field in the object + if j.src.ExpandEventListFromField != "" { + if err := j.splitEventList(j.src.ExpandEventListFromField, item, offset, j.hash, id); err != nil { + return err + } continue + } else { + // manually seek offset only if file is compressed or if root element is an array + if (j.isCompressed || j.isRootArray) && offset < j.offset { + continue + } + + data, err := item.MarshalJSON() + if err != nil { + return err + } + evt := j.createEvent(string(data), offset+relativeOffset) + // updates the offset after reading the file + // this avoids duplicates for the last read when resuming operation + offset = dec.InputOffset() + var ( + cp *Checkpoint + done func() + ) + if !dec.More() { + // if this is the last object, then peform a complete state save + cp, done = j.state.saveForTx(*j.blob.Name, *j.blob.Properties.LastModified) + } else { + // partially saves read state using offset + cp, done = j.state.savePartialForTx(*j.blob.Name, offset+relativeOffset) + } + if err := j.publisher.Publish(evt, cp); err != nil { + j.log.Errorf(jobErrString, id, err) + } + done() + } + } + return nil +} + +// splitEventList, splits the event list into individual events and publishes them +func (j *job) splitEventList(key string, raw json.RawMessage, offset int64, objHash string, id string) error { + var jsonObject map[string]json.RawMessage + if err := json.Unmarshal(raw, &jsonObject); err != nil { + return err + } + + var found bool + raw, found = jsonObject[key] + if !found { + return fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key) + } + + dec := json.NewDecoder(bytes.NewReader(raw)) + dec.UseNumber() + + tok, err := dec.Token() + if err != nil { + return err + } + delim, ok := tok.(json.Delim) + if !ok || delim != '[' { + return fmt.Errorf("expand_event_list_from_field <%v> is not an array", key) + } + + for dec.More() { + arrayOffset := dec.InputOffset() + + var item json.RawMessage + if err := dec.Decode(&item); err != nil { + return fmt.Errorf("failed to decode array item at offset %d: %w", offset+arrayOffset, err) } data, err := item.MarshalJSON() if err != nil { return err } - evt := j.createEvent(string(data), offset+relativeOffset) - // updates the offset after reading the file - // this avoids duplicates for the last read when resuming operation - offset = dec.InputOffset() - var ( - cp *Checkpoint - done func() - ) + evt := j.createEvent(string(data), offset+arrayOffset) + if !dec.More() { - // if this is the last object, then peform a complete state save - cp, done = j.state.saveForTx(*j.blob.Name, *j.blob.Properties.LastModified) + // if this is the last object, then save checkpoint + // we are not supporting partial saves atm for split event list op + cp, done := j.state.saveForTx(*j.blob.Name, *j.blob.Properties.LastModified) + if err := j.publisher.Publish(evt, cp); err != nil { + j.log.Errorf(jobErrString, id, err) + } + done() } else { - // partially saves read state using offset - cp, done = j.state.savePartialForTx(*j.blob.Name, offset+relativeOffset) - } - if err := j.publisher.Publish(evt, cp); err != nil { - j.log.Errorf(jobErrString, id, err) + // since we don't update the cursor checkpoint, lack of a lock here should be fine + if err := j.publisher.Publish(evt, nil); err != nil { + j.log.Errorf(jobErrString, id, err) + } } - done() } + return nil } @@ -284,6 +351,7 @@ func evaluateJSON(reader *bufio.Reader) (io.Reader, bool, error) { } } } + func (j *job) createEvent(message string, offset int64) beat.Event { event := beat.Event{ Timestamp: time.Now(), diff --git a/x-pack/filebeat/input/azureblobstorage/mock/data_files.go b/x-pack/filebeat/input/azureblobstorage/mock/data_files.go index 693cd89c610..3273d329412 100644 --- a/x-pack/filebeat/input/azureblobstorage/mock/data_files.go +++ b/x-pack/filebeat/input/azureblobstorage/mock/data_files.go @@ -215,6 +215,11 @@ var BeatsFilesContainer_log_ndjson = []string{ `{"@timestamp":"2021-05-25T17:25:51.391Z","log.level":"info","message":"available space is 44.3gb"}`, } +var BeatsFilesContainer_events_array_json = []string{ + "{\n \"time\": \"2021-05-25 18:20:58 UTC\",\n \"msg\": \"hello\"\n }", + "{\n \"time\": \"2021-05-26 22:21:40 UTC\",\n \"msg\": \"world\"\n }", +} + var BeatsFilesContainer_json_array = []string{ "{\n \"time\": \"2021-05-25 18:20:58 UTC\",\n \"msg\": \"hello\"\n }", "{\n \"time\": \"2021-05-26 22:21:40 UTC\",\n \"msg\": \"world\"\n }", diff --git a/x-pack/filebeat/input/azureblobstorage/scheduler.go b/x-pack/filebeat/input/azureblobstorage/scheduler.go index 7b09cc49fd7..a3da3eab8f8 100644 --- a/x-pack/filebeat/input/azureblobstorage/scheduler.go +++ b/x-pack/filebeat/input/azureblobstorage/scheduler.go @@ -7,7 +7,6 @@ package azureblobstorage import ( "context" "fmt" - "strings" "sync" azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" @@ -95,6 +94,7 @@ func (s *scheduler) schedule(ctx context.Context) error { func (s *scheduler) scheduleOnce(ctx context.Context) error { defer s.limiter.wait() pager := s.fetchBlobPager(int32(s.src.MaxWorkers)) + fileSelectorLen := len(s.src.FileSelectors) var numBlobs, numJobs int for pager.More() { @@ -108,7 +108,8 @@ func (s *scheduler) scheduleOnce(ctx context.Context) error { var jobs []*job for _, v := range resp.Segment.BlobItems { - if s.src.PathPrefix != "" && !strings.HasPrefix(*v.Name, s.src.PathPrefix) { + // if file selectors are present, then only select the files that match the regex + if fileSelectorLen != 0 && !s.isFileSelected(*v.Name) { continue } // date filter is applied on last modified time of the blob @@ -228,3 +229,12 @@ func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { return jobsToReturn } + +func (s *scheduler) isFileSelected(name string) bool { + for _, sel := range s.src.FileSelectors { + if sel.Regex == nil || sel.Regex.MatchString(name) { + return true + } + } + return false +} diff --git a/x-pack/filebeat/input/azureblobstorage/state.go b/x-pack/filebeat/input/azureblobstorage/state.go index 4090ba6feec..0a1229837ef 100644 --- a/x-pack/filebeat/input/azureblobstorage/state.go +++ b/x-pack/filebeat/input/azureblobstorage/state.go @@ -80,8 +80,8 @@ func (s *state) setRootArray(name string) (done func()) { return func() { s.mu.Unlock() } } -// isRootArray, returns true if the object has it's root defined as an array type, locks the state -// and returns an unlock function done(). The caller must call done when s and result are no longer needed in a locked state. +// isRootArray, returns true if the object has it's root defined as an array type and has been partially processed, it also locks the state +// and returns an unlock function done(). The caller must call done when 's' and 'result' are no longer needed in a locked state. func (s *state) isRootArray(name string) (result bool, done func()) { s.mu.Lock() result = s.cp.IsRootArray[name] diff --git a/x-pack/filebeat/input/azureblobstorage/types.go b/x-pack/filebeat/input/azureblobstorage/types.go index 2f6365fa9a6..67012198423 100644 --- a/x-pack/filebeat/input/azureblobstorage/types.go +++ b/x-pack/filebeat/input/azureblobstorage/types.go @@ -13,13 +13,14 @@ import ( // Source, it is the cursor source type Source struct { - ContainerName string - AccountName string - MaxWorkers int - Poll bool - PollInterval time.Duration - PathPrefix string - TimeStampEpoch *int64 + ContainerName string + AccountName string + MaxWorkers int + Poll bool + PollInterval time.Duration + TimeStampEpoch *int64 + FileSelectors []fileSelectorConfig + ExpandEventListFromField string } func (s *Source) Name() string { From 0e12e5574556c0c5578498bf4775aa218f5c7b12 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Wed, 27 Sep 2023 19:25:17 +0530 Subject: [PATCH 04/11] updated asciidoc, updated comments --- .../inputs/input-azure-blob-storage.asciidoc | 87 +++++++++++++++++++ .../filebeat/input/azureblobstorage/config.go | 7 +- 2 files changed, 90 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc b/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc index ec91b25aaf9..2bcbd0a751c 100644 --- a/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc @@ -138,6 +138,9 @@ Now let's explore the configuration attributes a bit more elaborately. 7. <> 8. <> 9. <> + 10. <> + 11. <> + 12. <> [id="attrib-account-name"] @@ -214,6 +217,90 @@ Example : `10s` would mean we would like the polling to occur every 10 seconds. This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified. +[id="attrib-file_selectors"] +[float] +==== `file_selectors` + +If the Azure blob storage container will have blobs that correspond to files that {beatname_uc} shouldn't process, `file_selectors` can be used to limit +the files that are downloaded. This is a list of selectors which are based on a `regex` pattern. The `regex` should match the blob name or should be a part of the blob name (ideally a prefix). The `regex` syntax is the same as used in the Go programming language. Files that don't match one of the regex's won't be processed. +This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified. + +[source, yml] +---- +filebeat.inputs: +- type: azure-blob-storage + id: my-azureblobstorage-id + enabled: true + account_name: some_account + auth.shared_credentials.account_key: some_key + containers: + - name: container_1 + file_selectors: + - regex: '/CloudTrail/' + - regex: 'docs/' + - regex: '/Security-Logs/' +---- + +[id="attrib-expand_event_list_from_field"] +[float] +==== `expand_event_list_from_field` + +If the file-set using this input expects to receive multiple messages bundled under a specific field or an array of objects then the config option for `expand_event_list_from_field` can be specified. This setting will be able to split the messages under the group value into separate events. +For example, CloudTrail logs are in JSON format and events are found under the JSON object "Records". To split the events into separate events, the config option `expand_event_list_from_field` can be set to "Records". Using this setting, prevents partial state saves from occurring. If failures occur or the input crashes due to some unexpected error, the processing will resume from the last successfully processed event. This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified. +["source","json"] +---- +{ + "Records": [ + { + "eventVersion": "1.07", + "eventTime": "2019-11-14T00:51:00Z", + "awsRegion": "us-east-1", + "eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE", + }, + { + "eventVersion": "1.07", + "eventTime": "2019-11-14T00:52:00Z", + "awsRegion": "us-east-1", + "eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE", + } + ] +} +---- + +["source","yaml",subs="attributes"] +---- +filebeat.inputs: +- type: azure-blob-storage + id: my-azureblobstorage-id + enabled: true + account_name: some_account + auth.shared_credentials.account_key: some_key + containers: + - name: container_1 + expand_event_list_from_field: Records +---- + +NOTE: This attribute is only applicable for JSON file formats. You do not require to specify this attribute if the file has an array of objects at the root level. Root level array of objects are automatically split into separate events, with support for partial state saves. + +[id="attrib-timestamp_epoch"] +[float] +==== `timestamp_epoch` + +This attribute can be used to filter out files/blobs which have a timestamp greater than the specified value. The value of this attribute should be in unix `epoch` +(seconds) format. The timestamp value is compared with the `LastModified Timestamp` obtained from the blob metadata. This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified. + +["source","yaml",subs="attributes"] +---- +filebeat.inputs: +- type: azure-blob-storage + id: my-azureblobstorage-id + enabled: true + account_name: some_account + auth.shared_credentials.account_key: some_key + containers: + - name: container_1 + timestamp_epoch: 1627233600 +---- [id="container-overrides"] *The sample configs below will explain the container level overriding of attributes a bit further :-* diff --git a/x-pack/filebeat/input/azureblobstorage/config.go b/x-pack/filebeat/input/azureblobstorage/config.go index 19de36c8369..c4c62016a20 100644 --- a/x-pack/filebeat/input/azureblobstorage/config.go +++ b/x-pack/filebeat/input/azureblobstorage/config.go @@ -10,8 +10,8 @@ import ( "github.com/elastic/beats/v7/libbeat/common/match" ) -// MaxWorkers, Poll, PollInterval & PathPrefix can be configured at a global level, -// which applies to all containers. They can also be configured at individual container levels. +// MaxWorkers, Poll, PollInterval, FileSelectors, TimeStampEpoch & ExpandEventListFromField can +// be configured at a global level, which applies to all containers. They can also be configured at individual container levels. // Container level configurations will always override global level values. type config struct { AccountName string `config:"account_name" validate:"required"` @@ -37,8 +37,7 @@ type container struct { ExpandEventListFromField string `config:"expand_event_list_from_field"` } -// fileSelectorConfig defines reader configuration that applies to a subset -// of azure blobs whose name matches the given regex. +// fileSelectorConfig helps filter out azure blobs based on a regex pattern type fileSelectorConfig struct { Regex *match.Matcher `config:"regex" validate:"required"` // TODO: Add support for reader config in future From e69e99887c5c22b804fb9793acb96c207bf2af6c Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Wed, 27 Sep 2023 19:50:44 +0530 Subject: [PATCH 05/11] updated changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 228d201f74f..f04f97d6c55 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -218,6 +218,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Disable warning message about ingest pipeline loading when running under Elastic Agent. {pull}36659[36659] - Add input metrics to http_endpoint input. {issue}36402[36402] {pull}36427[36427] - Update mito CEL extension library to v1.6.0. {pull}36651[36651] +- Added support for new features in the Azure Blob Storage input. {issue}35126[35126] {pull}36690[36690] *Auditbeat* From 2dc42120cddc0ced27e44fd6d7182dd732d3af04 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Thu, 28 Sep 2023 18:58:31 +0530 Subject: [PATCH 06/11] addressed PR suggestions --- .../inputs/input-azure-blob-storage.asciidoc | 11 +++---- .../filebeat/input/azureblobstorage/input.go | 12 ++++--- .../input/azureblobstorage/input_stateless.go | 13 +++++--- .../input/azureblobstorage/input_test.go | 33 +++++++++++++++++++ x-pack/filebeat/input/azureblobstorage/job.go | 17 +++++----- .../azureblobstorage/mock/data_random.go | 4 +-- .../mock/testdata/array-at-root.json | 2 +- .../azureblobstorage/mock/testdata/log.json | 2 +- .../mock/testdata/multiline.json | 2 +- 9 files changed, 65 insertions(+), 31 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc b/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc index 2bcbd0a751c..dff63da340d 100644 --- a/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc @@ -222,8 +222,7 @@ take priority and override the root level values if both are specified. ==== `file_selectors` If the Azure blob storage container will have blobs that correspond to files that {beatname_uc} shouldn't process, `file_selectors` can be used to limit -the files that are downloaded. This is a list of selectors which are based on a `regex` pattern. The `regex` should match the blob name or should be a part of the blob name (ideally a prefix). The `regex` syntax is the same as used in the Go programming language. Files that don't match one of the regex's won't be processed. -This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified. +the files that are downloaded. This is a list of selectors which are based on a `regex` pattern. The `regex` should match the blob name or should be a part of the blob name (ideally a prefix). The `regex` syntax is the same as used in the Go programming language. Files that don't match any configured regex won't be processed.This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified. [source, yml] ---- @@ -245,8 +244,7 @@ filebeat.inputs: [float] ==== `expand_event_list_from_field` -If the file-set using this input expects to receive multiple messages bundled under a specific field or an array of objects then the config option for `expand_event_list_from_field` can be specified. This setting will be able to split the messages under the group value into separate events. -For example, CloudTrail logs are in JSON format and events are found under the JSON object "Records". To split the events into separate events, the config option `expand_event_list_from_field` can be set to "Records". Using this setting, prevents partial state saves from occurring. If failures occur or the input crashes due to some unexpected error, the processing will resume from the last successfully processed event. This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified. +If the file-set using this input expects to receive multiple messages bundled under a specific field or an array of objects then the config option for `expand_event_list_from_field` can be specified. This setting will be able to split the messages under the group value into separate events. For example, CloudTrail logs are in JSON format and events are found under the JSON object "Records". To split the events into separate events, the config option `expand_event_list_from_field` can be set to "Records". This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified. ["source","json"] ---- { @@ -280,14 +278,13 @@ filebeat.inputs: expand_event_list_from_field: Records ---- -NOTE: This attribute is only applicable for JSON file formats. You do not require to specify this attribute if the file has an array of objects at the root level. Root level array of objects are automatically split into separate events, with support for partial state saves. +NOTE: This attribute is only applicable for JSON file formats. You do not require to specify this attribute if the file has an array of objects at the root level. Root level array of objects are automatically split into separate events, with support for partial state saves. Using this setting, prevents partial state saves from occurring. If failures occur or the input crashes due to some unexpected error, the processing will resume from the last successfully processed file/blob. [id="attrib-timestamp_epoch"] [float] ==== `timestamp_epoch` -This attribute can be used to filter out files/blobs which have a timestamp greater than the specified value. The value of this attribute should be in unix `epoch` -(seconds) format. The timestamp value is compared with the `LastModified Timestamp` obtained from the blob metadata. This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified. +This attribute can be used to filter out files/blobs which have a timestamp greater than the specified value. The value of this attribute should be in unix `epoch` (seconds) format. The timestamp value is compared with the `LastModified Timestamp` obtained from the blob metadata. This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified. ["source","yaml",subs="attributes"] ---- diff --git a/x-pack/filebeat/input/azureblobstorage/input.go b/x-pack/filebeat/input/azureblobstorage/input.go index 457376799af..daf01666087 100644 --- a/x-pack/filebeat/input/azureblobstorage/input.go +++ b/x-pack/filebeat/input/azureblobstorage/input.go @@ -23,6 +23,12 @@ type azurebsInput struct { serviceURL string } +// defines the valid range for Unix timestamps for 64 bit integers +var ( + minTimestamp = time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC).Unix() + maxTimestamp = time.Date(3000, time.January, 1, 0, 0, 0, 0, time.UTC).Unix() +) + const ( inputName string = "azure-blob-storage" ) @@ -119,12 +125,8 @@ func tryOverrideOrDefault(cfg config, c container) container { // isValidUnixTimestamp checks if the timestamp is a valid Unix timestamp func isValidUnixTimestamp(timestamp int64) bool { - // defines the valid range for Unix timestamps - minTimestamp := time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC).Unix() - maxTimestamp := time.Date(2106, time.February, 7, 6, 28, 15, 0, time.UTC).Unix() - // checks if the timestamp is within the valid range - return timestamp >= minTimestamp && timestamp <= maxTimestamp + return minTimestamp <= timestamp && timestamp <= maxTimestamp } func (input *azurebsInput) Name() string { diff --git a/x-pack/filebeat/input/azureblobstorage/input_stateless.go b/x-pack/filebeat/input/azureblobstorage/input_stateless.go index 381c026cf82..91b21011acf 100644 --- a/x-pack/filebeat/input/azureblobstorage/input_stateless.go +++ b/x-pack/filebeat/input/azureblobstorage/input_stateless.go @@ -11,6 +11,7 @@ import ( cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/beat" + "golang.org/x/sync/errgroup" ) type statelessInput struct { @@ -43,6 +44,7 @@ func (pub statelessPublisher) Publish(event beat.Event, _ interface{}) error { func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher) error { pub := statelessPublisher{wrapped: publisher} var source cursor.Source + var g errgroup.Group for _, c := range in.config.Containers { container := tryOverrideOrDefault(in.config, c) source = &Source{ @@ -76,10 +78,11 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher } scheduler := newScheduler(pub, containerClient, credential, currentSource, &in.config, st, in.serviceURL, log) - err = scheduler.schedule(ctx) - if err != nil { - return err - } + // allows multiple containers to be scheduled concurrently while testing + g.Go(func() error { + return scheduler.schedule(ctx) + }) + } - return nil + return g.Wait() } diff --git a/x-pack/filebeat/input/azureblobstorage/input_test.go b/x-pack/filebeat/input/azureblobstorage/input_test.go index 89b55d8b17b..d4b004980b8 100644 --- a/x-pack/filebeat/input/azureblobstorage/input_test.go +++ b/x-pack/filebeat/input/azureblobstorage/input_test.go @@ -438,6 +438,39 @@ func Test_StorageClient(t *testing.T) { mock.BeatsFilesContainer_events_array_json[1]: true, }, }, + { + name: "MultiContainerWithMultiFileSelectors", + baseConfig: map[string]interface{}{ + "account_name": "beatsblobnew", + "auth.shared_credentials.account_key": "7pfLm1betGiRyyABEM/RFrLYlafLZHbLtGhB52LkWVeBxE7la9mIvk6YYAbQKYE/f0GdhiaOZeV8+AStsAdr/Q==", + "max_workers": 2, + "poll": true, + "poll_interval": "10s", + "containers": []map[string]interface{}{ + { + "name": beatsContainer, + "file_selectors": []map[string]interface{}{ + { + "regex": "docs/", + }, + }, + }, + { + "name": beatsContainer2, + "file_selectors": []map[string]interface{}{ + { + "regex": "data_3", + }, + }, + }, + }, + }, + mockHandler: mock.AzureStorageServer, + expected: map[string]bool{ + mock.Beatscontainer_blob_docs_ata_json: true, + mock.Beatscontainer_2_blob_data3_json: true, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/x-pack/filebeat/input/azureblobstorage/job.go b/x-pack/filebeat/input/azureblobstorage/job.go index b4be6b77281..9465fef83a4 100644 --- a/x-pack/filebeat/input/azureblobstorage/job.go +++ b/x-pack/filebeat/input/azureblobstorage/job.go @@ -42,7 +42,7 @@ type job struct { // offset value for an object, it points to the location inside the data stream // from where we can start processing the object. offset int64 - // flag to denote if object is gZipped compressed or not + // flag to denote if object is gZip compressed or not isCompressed bool // flag to denote if object's root element is of an array type isRootArray bool @@ -136,8 +136,9 @@ func (j *job) processAndPublishData(ctx context.Context, id string) error { return fmt.Errorf("failed to download data from blob with error: %w", err) } // we hardcode a retry count of 3 here + const maxRetries = 3 reader := get.NewRetryReader(ctx, &azblob.RetryReaderOptions{ - MaxRetries: 3, + MaxRetries: maxRetries, }) defer func() { err = reader.Close() @@ -155,8 +156,7 @@ func (j *job) processAndPublishData(ctx context.Context, id string) error { } func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) error { - var err error - r, err = j.addGzipDecoderIfNeeded(bufio.NewReader(r)) + r, err := j.addGzipDecoderIfNeeded(bufio.NewReader(r)) if err != nil { return fmt.Errorf("failed to add gzip decoder to blob: %s, with error: %w", *j.blob.Name, err) } @@ -174,6 +174,7 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er } dec := json.NewDecoder(r) + dec.UseNumber() // If array is present at root then read json token and advance decoder if j.isRootArray { _, err := dec.Token() @@ -189,7 +190,6 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er if !j.isCompressed && !j.isRootArray { relativeOffset = j.offset } - dec.UseNumber() for dec.More() && ctx.Err() == nil { var item json.RawMessage offset = dec.InputOffset() @@ -237,15 +237,14 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er return nil } -// splitEventList, splits the event list into individual events and publishes them +// splitEventList splits the event list into individual events and publishes them func (j *job) splitEventList(key string, raw json.RawMessage, offset int64, objHash string, id string) error { var jsonObject map[string]json.RawMessage if err := json.Unmarshal(raw, &jsonObject); err != nil { return err } - var found bool - raw, found = jsonObject[key] + raw, found := jsonObject[key] if !found { return fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key) } @@ -321,7 +320,7 @@ func (j *job) addGzipDecoderIfNeeded(body io.Reader) (io.Reader, error) { return gzip.NewReader(bufReader) } -// evaluateJSON, uses a bufio.NewReader & reader.Peek to evaluate if the +// evaluateJSON uses a bufio.NewReader & reader.Peek to evaluate if the // data stream contains a json array as the root element or not, without // advancing the reader. If the data stream contains an array as the root // element, the value of the boolean return type is set to true. diff --git a/x-pack/filebeat/input/azureblobstorage/mock/data_random.go b/x-pack/filebeat/input/azureblobstorage/mock/data_random.go index bc5db7d661e..b4d92735e69 100644 --- a/x-pack/filebeat/input/azureblobstorage/mock/data_random.go +++ b/x-pack/filebeat/input/azureblobstorage/mock/data_random.go @@ -66,7 +66,7 @@ func generateMetadata() []byte { // Helper function to create a random Blob func createRandomBlob(i int) Blob { - rand.Seed(time.Now().UnixNano()) + rand.New(rand.NewSource(12345)) return Blob{ Name: fmt.Sprintf("data_%d.json", i), @@ -104,7 +104,7 @@ func generateRandomBlob() []byte { } func createRandomData() MyData { - rand.New(rand.NewSource(time.Now().UnixNano())) + rand.New(rand.NewSource(12345)) return MyData{ ID: rand.Intn(1000) + 1, diff --git a/x-pack/filebeat/input/azureblobstorage/mock/testdata/array-at-root.json b/x-pack/filebeat/input/azureblobstorage/mock/testdata/array-at-root.json index 68cd2b41e7b..8d22df6aeb8 100644 --- a/x-pack/filebeat/input/azureblobstorage/mock/testdata/array-at-root.json +++ b/x-pack/filebeat/input/azureblobstorage/mock/testdata/array-at-root.json @@ -7,4 +7,4 @@ "time": "2021-05-26 22:21:40 UTC", "msg": "world" } -] \ No newline at end of file +] diff --git a/x-pack/filebeat/input/azureblobstorage/mock/testdata/log.json b/x-pack/filebeat/input/azureblobstorage/mock/testdata/log.json index f6aaf5ec64d..b88e0545284 100644 --- a/x-pack/filebeat/input/azureblobstorage/mock/testdata/log.json +++ b/x-pack/filebeat/input/azureblobstorage/mock/testdata/log.json @@ -1 +1 @@ -{"@timestamp":"2021-05-25T17:25:42.806Z","log.level":"error","message":"error making http request"}{"@timestamp":"2021-05-25T17:25:51.391Z","log.level":"info","message":"available disk space 44.3gb"} \ No newline at end of file +{"@timestamp":"2021-05-25T17:25:42.806Z","log.level":"error","message":"error making http request"}{"@timestamp":"2021-05-25T17:25:51.391Z","log.level":"info","message":"available disk space 44.3gb"} diff --git a/x-pack/filebeat/input/azureblobstorage/mock/testdata/multiline.json b/x-pack/filebeat/input/azureblobstorage/mock/testdata/multiline.json index 974d296762a..32d319af2bc 100644 --- a/x-pack/filebeat/input/azureblobstorage/mock/testdata/multiline.json +++ b/x-pack/filebeat/input/azureblobstorage/mock/testdata/multiline.json @@ -7,4 +7,4 @@ "@timestamp": "2021-05-25T17:25:51.391Z", "log.level": "info", "message": "available space 44.3gb" -} \ No newline at end of file +} From 5f53d052fdd60f5a754971167f6e0fe9f3cef881 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Thu, 28 Sep 2023 19:11:32 +0530 Subject: [PATCH 07/11] addressed linting errors --- x-pack/filebeat/input/azureblobstorage/input_stateless.go | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/filebeat/input/azureblobstorage/input_stateless.go b/x-pack/filebeat/input/azureblobstorage/input_stateless.go index 91b21011acf..d0d9500a8a4 100644 --- a/x-pack/filebeat/input/azureblobstorage/input_stateless.go +++ b/x-pack/filebeat/input/azureblobstorage/input_stateless.go @@ -11,6 +11,7 @@ import ( cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/beat" + "golang.org/x/sync/errgroup" ) From 72503ec622d0a750d1404f0dcf10e61809888606 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 29 Sep 2023 12:32:30 +0530 Subject: [PATCH 08/11] addressed further PR suggestions --- .../inputs/input-azure-blob-storage.asciidoc | 9 +++++---- .../filebeat/input/azureblobstorage/config.go | 18 +++++++++--------- .../filebeat/input/azureblobstorage/input.go | 2 +- .../input/azureblobstorage/input_stateless.go | 6 ++++-- x-pack/filebeat/input/azureblobstorage/job.go | 17 +++++++++-------- 5 files changed, 28 insertions(+), 24 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc b/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc index dff63da340d..09d8f22380e 100644 --- a/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc @@ -244,7 +244,8 @@ filebeat.inputs: [float] ==== `expand_event_list_from_field` -If the file-set using this input expects to receive multiple messages bundled under a specific field or an array of objects then the config option for `expand_event_list_from_field` can be specified. This setting will be able to split the messages under the group value into separate events. For example, CloudTrail logs are in JSON format and events are found under the JSON object "Records". To split the events into separate events, the config option `expand_event_list_from_field` can be set to "Records". This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified. +If the file-set using this input expects to receive multiple messages bundled under a specific field or an array of objects then the config option for `expand_event_list_from_field` can be specified. This setting will be able to split the messages under the group value into separate events. For example, if +you have logs that are in JSON format and events are found under the JSON object "Records". To split the events into separate events, the config option `expand_event_list_from_field` can be set to "Records". This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified. ["source","json"] ---- { @@ -252,13 +253,13 @@ If the file-set using this input expects to receive multiple messages bundled un { "eventVersion": "1.07", "eventTime": "2019-11-14T00:51:00Z", - "awsRegion": "us-east-1", + "region": "us-east-1", "eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE", }, { "eventVersion": "1.07", "eventTime": "2019-11-14T00:52:00Z", - "awsRegion": "us-east-1", + "region": "us-east-1", "eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE", } ] @@ -284,7 +285,7 @@ NOTE: This attribute is only applicable for JSON file formats. You do not requir [float] ==== `timestamp_epoch` -This attribute can be used to filter out files/blobs which have a timestamp greater than the specified value. The value of this attribute should be in unix `epoch` (seconds) format. The timestamp value is compared with the `LastModified Timestamp` obtained from the blob metadata. This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified. +This attribute can be used to filter out files/blobs which have a timestamp older than the specified value. The value of this attribute should be in unix `epoch` (seconds) format. The timestamp value is compared with the `LastModified Timestamp` obtained from the blob metadata. This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified. ["source","yaml",subs="attributes"] ---- diff --git a/x-pack/filebeat/input/azureblobstorage/config.go b/x-pack/filebeat/input/azureblobstorage/config.go index c4c62016a20..a780e60216d 100644 --- a/x-pack/filebeat/input/azureblobstorage/config.go +++ b/x-pack/filebeat/input/azureblobstorage/config.go @@ -15,25 +15,25 @@ import ( // Container level configurations will always override global level values. type config struct { AccountName string `config:"account_name" validate:"required"` - StorageURL string `config:"storage_url,omitempty"` + StorageURL string `config:"storage_url"` Auth authConfig `config:"auth" validate:"required"` - MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` - Poll *bool `config:"poll,omitempty"` - PollInterval *time.Duration `config:"poll_interval,omitempty"` + MaxWorkers *int `config:"max_workers" validate:"max=5000"` + Poll *bool `config:"poll"` + PollInterval *time.Duration `config:"poll_interval"` Containers []container `config:"containers" validate:"required"` FileSelectors []fileSelectorConfig `config:"file_selectors"` - TimeStampEpoch *int64 `config:"timestamp_epoch,omitempty"` + TimeStampEpoch *int64 `config:"timestamp_epoch"` ExpandEventListFromField string `config:"expand_event_list_from_field"` } // container contains the config for each specific blob storage container in the root account type container struct { Name string `config:"name" validate:"required"` - MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` - Poll *bool `config:"poll,omitempty"` - PollInterval *time.Duration `config:"poll_interval,omitempty"` + MaxWorkers *int `config:"max_workers" validate:"max=5000"` + Poll *bool `config:"poll"` + PollInterval *time.Duration `config:"poll_interval"` FileSelectors []fileSelectorConfig `config:"file_selectors"` - TimeStampEpoch *int64 `config:"timestamp_epoch,omitempty"` + TimeStampEpoch *int64 `config:"timestamp_epoch"` ExpandEventListFromField string `config:"expand_event_list_from_field"` } diff --git a/x-pack/filebeat/input/azureblobstorage/input.go b/x-pack/filebeat/input/azureblobstorage/input.go index daf01666087..d78939e177c 100644 --- a/x-pack/filebeat/input/azureblobstorage/input.go +++ b/x-pack/filebeat/input/azureblobstorage/input.go @@ -118,7 +118,7 @@ func tryOverrideOrDefault(cfg config, c container) container { c.ExpandEventListFromField = cfg.ExpandEventListFromField } if len(c.FileSelectors) == 0 && len(cfg.FileSelectors) > 0 { - c.FileSelectors = append(c.FileSelectors, cfg.FileSelectors...) + c.FileSelectors = cfg.FileSelectors } return c } diff --git a/x-pack/filebeat/input/azureblobstorage/input_stateless.go b/x-pack/filebeat/input/azureblobstorage/input_stateless.go index d0d9500a8a4..73ae14a62e5 100644 --- a/x-pack/filebeat/input/azureblobstorage/input_stateless.go +++ b/x-pack/filebeat/input/azureblobstorage/input_stateless.go @@ -7,12 +7,12 @@ package azureblobstorage import ( "context" + "golang.org/x/sync/errgroup" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/beat" - - "golang.org/x/sync/errgroup" ) type statelessInput struct { @@ -80,6 +80,8 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher scheduler := newScheduler(pub, containerClient, credential, currentSource, &in.config, st, in.serviceURL, log) // allows multiple containers to be scheduled concurrently while testing + // the stateless input is triggered only while testing and till now it did not mimic + // the real world concurrent execution of multiple containers. This fix allows it to do so. g.Go(func() error { return scheduler.schedule(ctx) }) diff --git a/x-pack/filebeat/input/azureblobstorage/job.go b/x-pack/filebeat/input/azureblobstorage/job.go index 9465fef83a4..975ee6ea7e8 100644 --- a/x-pack/filebeat/input/azureblobstorage/job.go +++ b/x-pack/filebeat/input/azureblobstorage/job.go @@ -31,9 +31,9 @@ import ( const jobErrString = "job with jobId %s encountered an error: %w" type job struct { - // azure blob client handle + // client is an azure blob handle client *blob.Client - // azure blob item handle + // blob is an azure blob item handle blob *azcontainer.BlobItem // azure blob url for the resource blobURL string @@ -42,7 +42,7 @@ type job struct { // offset value for an object, it points to the location inside the data stream // from where we can start processing the object. offset int64 - // flag to denote if object is gZip compressed or not + // flag to denote if object is gzip compressed or not isCompressed bool // flag to denote if object's root element is of an array type isRootArray bool @@ -135,7 +135,6 @@ func (j *job) processAndPublishData(ctx context.Context, id string) error { if err != nil { return fmt.Errorf("failed to download data from blob with error: %w", err) } - // we hardcode a retry count of 3 here const maxRetries = 3 reader := get.NewRetryReader(ctx, &azblob.RetryReaderOptions{ MaxRetries: maxRetries, @@ -218,8 +217,8 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er // this avoids duplicates for the last read when resuming operation offset = dec.InputOffset() var ( - cp *Checkpoint done func() + cp *Checkpoint ) if !dec.More() { // if this is the last object, then peform a complete state save @@ -277,7 +276,6 @@ func (j *job) splitEventList(key string, raw json.RawMessage, offset int64, objH if !dec.More() { // if this is the last object, then save checkpoint - // we are not supporting partial saves atm for split event list op cp, done := j.state.saveForTx(*j.blob.Name, *j.blob.Properties.LastModified) if err := j.publisher.Publish(evt, cp); err != nil { j.log.Errorf(jobErrString, id, err) @@ -326,12 +324,15 @@ func (j *job) addGzipDecoderIfNeeded(body io.Reader) (io.Reader, error) { // element, the value of the boolean return type is set to true. func evaluateJSON(reader *bufio.Reader) (io.Reader, bool, error) { eof := false + // readSize is the constant value in the incremental read operation, this value is arbitrary + // but works well for our use case + const readSize = 5 for i := 0; ; i++ { - b, err := reader.Peek((i + 1) * 5) + b, err := reader.Peek((i + 1) * readSize) if errors.Is(err, io.EOF) { eof = true } - startByte := i * 5 + startByte := i * readSize for j := 0; j < len(b[startByte:]); j++ { char := b[startByte+j : startByte+j+1] switch { From 2d4a4695a445eeea6b9b9ae5eb4e68737deada59 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 29 Sep 2023 12:39:09 +0530 Subject: [PATCH 09/11] updated FileSelectors assignment condition --- x-pack/filebeat/input/azureblobstorage/input.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/azureblobstorage/input.go b/x-pack/filebeat/input/azureblobstorage/input.go index d78939e177c..28484c037f7 100644 --- a/x-pack/filebeat/input/azureblobstorage/input.go +++ b/x-pack/filebeat/input/azureblobstorage/input.go @@ -117,7 +117,7 @@ func tryOverrideOrDefault(cfg config, c container) container { if c.ExpandEventListFromField == "" { c.ExpandEventListFromField = cfg.ExpandEventListFromField } - if len(c.FileSelectors) == 0 && len(cfg.FileSelectors) > 0 { + if len(c.FileSelectors) == 0 && len(cfg.FileSelectors) != 0 { c.FileSelectors = cfg.FileSelectors } return c From 615425959f0983e68b850d0c67e780e8a30ed5c7 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 29 Sep 2023 14:01:14 +0530 Subject: [PATCH 10/11] partial save mechanism removed for now due to concurrency issues --- .../inputs/input-azure-blob-storage.asciidoc | 6 +- x-pack/filebeat/input/azureblobstorage/job.go | 78 +++++-------------- .../input/azureblobstorage/scheduler.go | 9 +-- .../filebeat/input/azureblobstorage/state.go | 44 +---------- 4 files changed, 29 insertions(+), 108 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc b/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc index 09d8f22380e..9208a1ce611 100644 --- a/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc @@ -27,6 +27,8 @@ even though it can get expensive with dealing with a very large number of files. 3. If any major error occurs which stops the main thread, the logs will be appropriately generated, describing said error. +NOTE: In the latest update the partial save mechanism has been removed for now due to concurrency issues, but will be added back in the future. + [id="supported-types"] NOTE: NOTE: Currently only `JSON` and `NDJSON` are supported blob/file formats. Blobs/files may be also be gzip compressed. As for authentication types, we currently have support for `shared access keys` and `connection strings`. @@ -279,7 +281,7 @@ filebeat.inputs: expand_event_list_from_field: Records ---- -NOTE: This attribute is only applicable for JSON file formats. You do not require to specify this attribute if the file has an array of objects at the root level. Root level array of objects are automatically split into separate events, with support for partial state saves. Using this setting, prevents partial state saves from occurring. If failures occur or the input crashes due to some unexpected error, the processing will resume from the last successfully processed file/blob. +NOTE: This attribute is only applicable for JSON file formats. You do not require to specify this attribute if the file has an array of objects at the root level. Root level array of objects are automatically split into separate events. If failures occur or the input crashes due to some unexpected error, the processing will resume from the last successfully processed file/blob. [id="attrib-timestamp_epoch"] [float] @@ -362,4 +364,4 @@ In this configuration even though we have specified `max_workers = 10`, `poll = will override these values with their own respective values which are defined as part of their sub attibutes. -NOTE: Since this is a beta input, any feedback is welcome, which will help us optimise and make it better going forward. \ No newline at end of file +NOTE: Since this is a beta input, any feedback is welcome, which will help us optimise and make it better going forward. diff --git a/x-pack/filebeat/input/azureblobstorage/job.go b/x-pack/filebeat/input/azureblobstorage/job.go index 975ee6ea7e8..77dbe1ed882 100644 --- a/x-pack/filebeat/input/azureblobstorage/job.go +++ b/x-pack/filebeat/input/azureblobstorage/job.go @@ -39,9 +39,6 @@ type job struct { blobURL string // object hash, used in setting event id hash string - // offset value for an object, it points to the location inside the data stream - // from where we can start processing the object. - offset int64 // flag to denote if object is gzip compressed or not isCompressed bool // flag to denote if object's root element is of an array type @@ -87,9 +84,6 @@ func (j *job) do(ctx context.Context, id string) { if *j.blob.Properties.ContentType == gzType || (j.blob.Properties.ContentEncoding != nil && *j.blob.Properties.ContentEncoding == encodingGzip) { j.isCompressed = true } - isRootArray, done := j.state.isRootArray(*j.blob.Name) - done() - j.isRootArray = isRootArray err := j.processAndPublishData(ctx, id) if err != nil { j.log.Errorf(jobErrString, id, err) @@ -125,13 +119,7 @@ func (j *job) timestamp() *time.Time { } func (j *job) processAndPublishData(ctx context.Context, id string) error { - var err error - downloadOptions := &blob.DownloadStreamOptions{} - if !j.isCompressed && !j.isRootArray { - downloadOptions.Range.Offset = j.offset - } - - get, err := j.client.DownloadStream(ctx, downloadOptions) + get, err := j.client.DownloadStream(ctx, &blob.DownloadStreamOptions{}) if err != nil { return fmt.Errorf("failed to download data from blob with error: %w", err) } @@ -160,16 +148,10 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er return fmt.Errorf("failed to add gzip decoder to blob: %s, with error: %w", *j.blob.Name, err) } - // if offset == 0, then this is a new stream which has not been processed previously - if j.offset == 0 { - r, j.isRootArray, err = evaluateJSON(bufio.NewReader(r)) - if err != nil { - return fmt.Errorf("failed to evaluate json for blob: %s, with error: %w", *j.blob.Name, err) - } - if j.isRootArray { - done := j.state.setRootArray(*j.blob.Name) - done() - } + // checks if the root element is an array or not + r, j.isRootArray, err = evaluateJSON(bufio.NewReader(r)) + if err != nil { + return fmt.Errorf("failed to evaluate json for blob: %s, with error: %w", *j.blob.Name, err) } dec := json.NewDecoder(r) @@ -182,55 +164,37 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er } } - var offset, relativeOffset int64 - // uncompressed files use the client to directly set the offset, this - // in turn causes the offset to reset to 0 for the new stream, hence why - // we need to keep relative offsets to keep track of the actual offset - if !j.isCompressed && !j.isRootArray { - relativeOffset = j.offset - } for dec.More() && ctx.Err() == nil { var item json.RawMessage - offset = dec.InputOffset() + offset := dec.InputOffset() if err := dec.Decode(&item); err != nil { return fmt.Errorf("failed to decode json: %w", err) } // if expand_event_list_from_field is set, then split the event list - // here we assume that the field is always an array and it is the only field in the object if j.src.ExpandEventListFromField != "" { if err := j.splitEventList(j.src.ExpandEventListFromField, item, offset, j.hash, id); err != nil { return err } - continue - } else { - // manually seek offset only if file is compressed or if root element is an array - if (j.isCompressed || j.isRootArray) && offset < j.offset { - continue - } + } - data, err := item.MarshalJSON() - if err != nil { - return err - } - evt := j.createEvent(string(data), offset+relativeOffset) - // updates the offset after reading the file - // this avoids duplicates for the last read when resuming operation - offset = dec.InputOffset() - var ( - done func() - cp *Checkpoint - ) - if !dec.More() { - // if this is the last object, then peform a complete state save - cp, done = j.state.saveForTx(*j.blob.Name, *j.blob.Properties.LastModified) - } else { - // partially saves read state using offset - cp, done = j.state.savePartialForTx(*j.blob.Name, offset+relativeOffset) - } + data, err := item.MarshalJSON() + if err != nil { + return err + } + evt := j.createEvent(string(data), offset) + + if !dec.More() { + // if this is the last object, then perform a complete state save + cp, done := j.state.saveForTx(*j.blob.Name, *j.blob.Properties.LastModified) if err := j.publisher.Publish(evt, cp); err != nil { j.log.Errorf(jobErrString, id, err) } done() + } else { + // since we don't update the cursor checkpoint, lack of a lock here should be fine + if err := j.publisher.Publish(evt, nil); err != nil { + j.log.Errorf(jobErrString, id, err) + } } } return nil diff --git a/x-pack/filebeat/input/azureblobstorage/scheduler.go b/x-pack/filebeat/input/azureblobstorage/scheduler.go index a3da3eab8f8..ba433b78f41 100644 --- a/x-pack/filebeat/input/azureblobstorage/scheduler.go +++ b/x-pack/filebeat/input/azureblobstorage/scheduler.go @@ -197,10 +197,7 @@ func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { ignore := false for _, job := range jobs { - switch offset, isPartial := s.state.cp.PartiallyProcessed[*job.blob.Name]; { - case isPartial: - job.offset = offset - latestJobs = append(latestJobs, job) + switch { case job.timestamp().After(s.state.checkpoint().LatestEntryTime): latestJobs = append(latestJobs, job) case job.name() == s.state.checkpoint().BlobName: @@ -220,9 +217,9 @@ func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job { jobsToReturn = jobs } - // in a senario where there are some jobs which have a later timestamp + // in a senario where there are some jobs which have a greater timestamp // but lesser alphanumeric order and some jobs have greater alphanumeric order - // than the current checkpoint or partially completed jobs are present + // than the current checkpoint blob name, then we append the latest jobs if len(jobsToReturn) != len(jobs) && len(latestJobs) > 0 { jobsToReturn = append(latestJobs, jobsToReturn...) } diff --git a/x-pack/filebeat/input/azureblobstorage/state.go b/x-pack/filebeat/input/azureblobstorage/state.go index 0a1229837ef..43fdf822f2d 100644 --- a/x-pack/filebeat/input/azureblobstorage/state.go +++ b/x-pack/filebeat/input/azureblobstorage/state.go @@ -25,19 +25,11 @@ type Checkpoint struct { BlobName string // timestamp to denote which is the latest blob LatestEntryTime time.Time - // a mapping from object name to whether the object is having an array type as it's root. - IsRootArray map[string]bool - // a mapping from object name to an array index that contains the last processed offset for that object. - // if isRootArray == true for object, then PartiallyProcessed will treat offset as an array index - PartiallyProcessed map[string]int64 } func newState() *state { return &state{ - cp: &Checkpoint{ - PartiallyProcessed: make(map[string]int64), - IsRootArray: make(map[string]bool), - }, + cp: &Checkpoint{}, } } @@ -47,8 +39,6 @@ func newState() *state { // more than once. func (s *state) saveForTx(name string, lastModifiedOn time.Time) (cp *Checkpoint, done func()) { s.mu.Lock() - delete(s.cp.PartiallyProcessed, name) - delete(s.cp.IsRootArray, name) if len(s.cp.BlobName) == 0 { s.cp.BlobName = name } else if strings.ToLower(name) > strings.ToLower(s.cp.BlobName) { @@ -62,40 +52,8 @@ func (s *state) saveForTx(name string, lastModifiedOn time.Time) (cp *Checkpoint return s.cp, func() { s.mu.Unlock() } } -// savePartialForTx partially updates and returns the current state checkpoint, locks the state -// and returns an unlock function done(). The caller must call done when -// s and cp are no longer needed in a locked state. done may not be called -// more than once. -func (s *state) savePartialForTx(name string, offset int64) (cp *Checkpoint, done func()) { - s.mu.Lock() - s.cp.PartiallyProcessed[name] = offset - return s.cp, func() { s.mu.Unlock() } -} - -// setRootArray, sets boolean true for objects that have their roots defined as an array type, locks the state -// and returns an unlock function done(). The caller must call done when s is no longer needed in a locked state. -func (s *state) setRootArray(name string) (done func()) { - s.mu.Lock() - s.cp.IsRootArray[name] = true - return func() { s.mu.Unlock() } -} - -// isRootArray, returns true if the object has it's root defined as an array type and has been partially processed, it also locks the state -// and returns an unlock function done(). The caller must call done when 's' and 'result' are no longer needed in a locked state. -func (s *state) isRootArray(name string) (result bool, done func()) { - s.mu.Lock() - result = s.cp.IsRootArray[name] - return result, func() { s.mu.Unlock() } -} - // setCheckpoint sets checkpoint from source to current state instance func (s *state) setCheckpoint(chkpt *Checkpoint) { - if chkpt.PartiallyProcessed == nil { - chkpt.PartiallyProcessed = make(map[string]int64) - } - if chkpt.IsRootArray == nil { - chkpt.IsRootArray = make(map[string]bool) - } s.cp = chkpt } From 7a3f0722e2e74445efce97cb577af1b274d53f7a Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 29 Sep 2023 14:03:14 +0530 Subject: [PATCH 11/11] updated changelog --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a12246654d1..60c26d23609 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -221,7 +221,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Disable warning message about ingest pipeline loading when running under Elastic Agent. {pull}36659[36659] - Add input metrics to http_endpoint input. {issue}36402[36402] {pull}36427[36427] - Update mito CEL extension library to v1.6.0. {pull}36651[36651] -- Added support for new features in the Azure Blob Storage input. {issue}35126[35126] {pull}36690[36690] +- Added support for new features & removed partial save mechanism in the Azure Blob Storage input. {issue}35126[35126] {pull}36690[36690] - Improve template evaluation logging for HTTPJSON input. {pull}36668[36668] - Add CEL partial value debug function. {pull}36652[36652]