From 9ae6ed1e3de11dad7597a6768b8e752f55c84e1c Mon Sep 17 00:00:00 2001 From: subham sarkar Date: Tue, 1 Oct 2024 22:52:26 +0530 Subject: [PATCH] x-pack/filebeat/input/salesforce: Bug fixes and improvements (#41015) --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/salesforce/input.go | 100 +++++++++++------- .../filebeat/input/salesforce/input_test.go | 4 +- 3 files changed, 68 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e3a4111751a..8c5e0d74d5f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -168,6 +168,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Ensure netflow custom field configuration is applied. {issue}40735[40735] {pull}40730[40730] - Fix replace processor handling of zero string replacement validation. {pull}40751[40751] - Fix long filepaths in diagnostics exceeding max path limits on Windows. {pull}40909[40909] +- Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015] *Heartbeat* diff --git a/x-pack/filebeat/input/salesforce/input.go b/x-pack/filebeat/input/salesforce/input.go index 2029429ace0..081b45dfd13 100644 --- a/x-pack/filebeat/input/salesforce/input.go +++ b/x-pack/filebeat/input/salesforce/input.go @@ -119,6 +119,7 @@ func (s *salesforceInput) Setup(env v2.Context, src inputcursor.Source, cursor * // and based on the configuration, it will run the different methods -- EventLogFile // or Object to collect events at defined intervals. func (s *salesforceInput) run() error { + s.log.Info("Starting Salesforce input run") if s.srcConfig.EventMonitoringMethod.EventLogFile.isEnabled() { err := s.RunEventLogFile() if err != nil { @@ -160,12 +161,18 @@ func (s *salesforceInput) run() error { case <-s.ctx.Done(): return s.isError(s.ctx.Err()) case <-eventLogFileTicker.C: + s.log.Info("Running EventLogFile collection") if err := s.RunEventLogFile(); err != nil { s.log.Errorf("Problem running EventLogFile collection: %s", err) + } else { + s.log.Info("EventLogFile collection completed successfully") } case <-objectMethodTicker.C: + s.log.Info("Running Object collection") if err := s.RunObject(); err != nil { s.log.Errorf("Problem running Object collection: %s", err) + } else { + s.log.Info("Object collection completed successfully") } } } @@ -181,6 +188,7 @@ func (s *salesforceInput) isError(err error) error { } func (s *salesforceInput) SetupSFClientConnection() (*soql.Resource, error) { + s.log.Info("Setting up Salesforce client connection") if s.sfdcConfig == nil { return nil, errors.New("internal error: salesforce configuration is not set properly") } @@ -188,8 +196,9 @@ func (s *salesforceInput) SetupSFClientConnection() (*soql.Resource, error) { // Open creates a session using the configuration. session, err := session.Open(*s.sfdcConfig) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to open salesforce connection: %w", err) } + s.log.Info("Salesforce session opened successfully") // Set clientSession for re-use. s.clientSession = session @@ -209,8 +218,6 @@ func (s *salesforceInput) FormQueryWithCursor(queryConfig *QueryConfig, cursor m return nil, err } - s.log.Infof("Salesforce query: %s", qr) - return &querier{Query: qr}, err } @@ -222,7 +229,7 @@ func isZero[T comparable](v T) bool { // RunObject runs the Object method of the Event Monitoring API to collect events. func (s *salesforceInput) RunObject() error { - s.log.Debugf("scrape object(s) every %s", s.srcConfig.EventMonitoringMethod.Object.Interval) + s.log.Infof("Running Object collection with interval: %s", s.srcConfig.EventMonitoringMethod.Object.Interval) var cursor mapstr.M if !(isZero(s.cursor.Object.FirstEventTime) && isZero(s.cursor.Object.LastEventTime)) { @@ -241,6 +248,8 @@ func (s *salesforceInput) RunObject() error { return fmt.Errorf("error forming query based on cursor: %w", err) } + s.log.Infof("Query formed: %s", query.Query) + res, err := s.soqlr.Query(query, false) if err != nil { return err @@ -282,7 +291,7 @@ func (s *salesforceInput) RunObject() error { return err } } - s.log.Debugf("Total events: %d", totalEvents) + s.log.Infof("Total events: %d", totalEvents) return nil } @@ -290,7 +299,7 @@ func (s *salesforceInput) RunObject() error { // RunEventLogFile runs the EventLogFile method of the Event Monitoring API to // collect events. func (s *salesforceInput) RunEventLogFile() error { - s.log.Debugf("scrape eventLogFile(s) every %s", s.srcConfig.EventMonitoringMethod.EventLogFile.Interval) + s.log.Infof("Running EventLogFile collection with interval: %s", s.srcConfig.EventMonitoringMethod.EventLogFile.Interval) var cursor mapstr.M if !(isZero(s.cursor.EventLogFile.FirstEventTime) && isZero(s.cursor.EventLogFile.LastEventTime)) { @@ -309,6 +318,8 @@ func (s *salesforceInput) RunEventLogFile() error { return fmt.Errorf("error forming query based on cursor: %w", err) } + s.log.Infof("Query formed: %s", query.Query) + res, err := s.soqlr.Query(query, false) if err != nil { return err @@ -324,9 +335,14 @@ func (s *salesforceInput) RunEventLogFile() error { totalEvents, firstEvent := 0, true for res.TotalSize() > 0 { for _, rec := range res.Records() { - req, err := http.NewRequestWithContext(s.ctx, http.MethodGet, s.config.URL+rec.Record().Fields()["LogFile"].(string), nil) + logfile, ok := rec.Record().Fields()["LogFile"].(string) + if !ok { + return fmt.Errorf("LogFile field not found or not a string in Salesforce event log file: %v", rec.Record().Fields()) + } + + req, err := http.NewRequestWithContext(s.ctx, http.MethodGet, s.config.URL+logfile, nil) if err != nil { - return err + return fmt.Errorf("error creating request for log file: %w", err) } s.clientSession.AuthorizationHeader(req) @@ -341,19 +357,23 @@ func (s *salesforceInput) RunEventLogFile() error { resp, err := s.sfdcConfig.Client.Do(req) if err != nil { - return err + return fmt.Errorf("error fetching log file: %w", err) } - body, err := io.ReadAll(resp.Body) - if err != nil { + if resp.StatusCode != http.StatusOK { resp.Body.Close() - return err + return fmt.Errorf("unexpected status code %d for log file", resp.StatusCode) } + + body, err := io.ReadAll(resp.Body) resp.Body.Close() + if err != nil { + return fmt.Errorf("error reading log file body: %w", err) + } - recs, err := decodeAsCSV(body) + recs, err := s.decodeAsCSV(body) if err != nil { - return err + return fmt.Errorf("error decoding CSV: %w", err) } if timestamp, ok := rec.Record().Fields()[s.config.EventMonitoringMethod.EventLogFile.Cursor.Field].(string); ok { @@ -366,12 +386,11 @@ func (s *salesforceInput) RunEventLogFile() error { for _, val := range recs { jsonStrEvent, err := json.Marshal(val) if err != nil { - return err + return fmt.Errorf("error json marshaling event: %w", err) } - err = publishEvent(s.publisher, s.cursor, jsonStrEvent, "EventLogFile") - if err != nil { - return err + if err := publishEvent(s.publisher, s.cursor, jsonStrEvent, "EventLogFile"); err != nil { + return fmt.Errorf("error publishing event: %w", err) } totalEvents++ } @@ -384,10 +403,10 @@ func (s *salesforceInput) RunEventLogFile() error { res, err = res.Next() if err != nil { - return err + return fmt.Errorf("error getting next page: %w", err) } } - s.log.Debugf("Total events: %d", totalEvents) + s.log.Infof("Total events processed: %d", totalEvents) return nil } @@ -405,6 +424,7 @@ func (s *salesforceInput) getSFDCConfig(cfg *config) (*sfdc.Configuration, error switch { case cfg.Auth.OAuth2.JWTBearerFlow != nil && cfg.Auth.OAuth2.JWTBearerFlow.isEnabled(): + s.log.Info("Using JWT Bearer Flow for authentication") pemBytes, err := os.ReadFile(cfg.Auth.OAuth2.JWTBearerFlow.ClientKeyPath) if err != nil { return nil, fmt.Errorf("problem with client key path for JWT auth: %w", err) @@ -428,6 +448,7 @@ func (s *salesforceInput) getSFDCConfig(cfg *config) (*sfdc.Configuration, error } case cfg.Auth.OAuth2.UserPasswordFlow != nil && cfg.Auth.OAuth2.UserPasswordFlow.isEnabled(): + s.log.Info("Using User Password Flow for authentication") passCreds := credentials.PasswordCredentials{ URL: cfg.Auth.OAuth2.UserPasswordFlow.TokenURL, Username: cfg.Auth.OAuth2.UserPasswordFlow.Username, @@ -533,21 +554,29 @@ type textContextError struct { body []byte } -// decodeAsCSV decodes p as a headed CSV document into dst. -func decodeAsCSV(p []byte) ([]map[string]string, error) { +// decodeAsCSV decodes the provided byte slice as a CSV and returns a slice of +// maps, where each map represents a row in the CSV with the header fields as +// keys and the row values as values. +func (s *salesforceInput) decodeAsCSV(p []byte) ([]map[string]string, error) { r := csv.NewReader(bytes.NewReader(p)) // To share the backing array for performance. r.ReuseRecord = true + // Lazy quotes are enabled to allow for quoted fields with commas. More flexible + // in handling CSVs. + // NOTE(shmsr): Although, we didn't face any issue with LazyQuotes == false, but I + // think we should keep it enabled to avoid any issues in the future. + r.LazyQuotes = true + // Header row is always expected, otherwise we can't map values to keys in // the event. header, err := r.Read() if err != nil { - if err == io.EOF { //nolint:errorlint // csv.Reader never wraps io.EOF. + if errors.Is(err, io.EOF) { return nil, nil } - return nil, err + return nil, fmt.Errorf("failed to read CSV header: %w", err) } // As buffer reuse is enabled, copying header is important. @@ -561,22 +590,21 @@ func decodeAsCSV(p []byte) ([]map[string]string, error) { // so that future records must have the same field count. // So, if len(header) != len(event), the Read will return an error and hence // we need not put an explicit check. - event, err := r.Read() - for ; err == nil; event, err = r.Read() { + for { + record, err := r.Read() if err != nil { - continue - } - o := make(map[string]string, len(header)) - for i, h := range header { - o[h] = event[i] + if errors.Is(err, io.EOF) { + break + } + s.log.Errorf("failed to read CSV record: %v\n%s", err, p) + return nil, textContextError{error: fmt.Errorf("failed to read CSV record: %w for: %v", err, record), body: p} } - results = append(results, o) - } - if err != nil { - if err != io.EOF { //nolint:errorlint // csv.Reader never wraps io.EOF. - return nil, textContextError{error: err, body: p} + event := make(map[string]string, len(header)) + for i, h := range header { + event[h] = record[i] } + results = append(results, event) } return results, nil diff --git a/x-pack/filebeat/input/salesforce/input_test.go b/x-pack/filebeat/input/salesforce/input_test.go index 5d4ef44bf43..3f3966b477d 100644 --- a/x-pack/filebeat/input/salesforce/input_test.go +++ b/x-pack/filebeat/input/salesforce/input_test.go @@ -467,7 +467,9 @@ func TestDecodeAsCSV(t *testing.T) { "Login","20231218054831.655","4u6LyuMrDvb_G-l1cJIQk-","00D5j00000DgAYG","0055j00000AT6I1","1219","127","/services/oauth2/token","","bY5Wfv8t/Ith7WVE","Standard","","1051271151","i","Go-http-client/1.1","","9998.0","salesforceinstance@devtest.in","TLSv1.2","ECDHE-RSA-AES256-GCM-SHA384","","","2023-12-18T05:48:31.655Z","0055j00000AT6I1AAL","Salesforce.com IP","","LOGIN_NO_ERROR","103.108.207.58" "Login","20231218054832.003","4u6LyuHSDv8LLVl1cJOqGV","00D5j00000DgAYG","0055j00000AT6I1","1277","104","/services/oauth2/token","","u60el7VqW8CSSKcW","Standard","","674857427","i","Go-http-client/1.1","","9998.0","salesforceinstance@devtest.in","TLSv1.2","ECDHE-RSA-AES256-GCM-SHA384","","","2023-12-18T05:48:32.003Z","0055j00000AT6I1AAL","103.108.207.58","","LOGIN_NO_ERROR","103.108.207.58"` - mp, err := decodeAsCSV([]byte(sampleELF)) + s := &salesforceInput{log: logp.NewLogger("salesforceInput")} + + mp, err := s.decodeAsCSV([]byte(sampleELF)) assert.NoError(t, err) wantNumOfEvents := 2