Skip to content

Commit

Permalink
Improve error messages in s3 input (elastic#18824)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaiyan-sheng authored May 28, 2020
1 parent 0d356d8 commit ed0979c
Showing 1 changed file with 14 additions and 13 deletions.
27 changes: 14 additions & 13 deletions x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func handleSQSMessage(m sqs.Message) ([]s3Info, error) {
// Unescape substrings from s3 log name. For example, convert "%3D" back to "="
filename, err := url.QueryUnescape(record.S3.object.Key)
if err != nil {
return nil, errors.Wrapf(err, "url.QueryUnescape failed")
return nil, errors.Wrapf(err, "url.QueryUnescape failed for '%s'", record.S3.object.Key)
}

s3Infos = append(s3Infos, s3Info{
Expand Down Expand Up @@ -419,17 +419,17 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
// If the SDK can determine the request or retry delay was canceled
// by a context the ErrCodeRequestCanceled error will be returned.
if awsErr.Code() == awssdk.ErrCodeRequestCanceled {
err = errors.Wrap(err, "S3 GetObjectRequest canceled")
err = errors.Wrapf(err, "S3 GetObjectRequest canceled for '%s' from S3 bucket '%s'", info.key, info.name)
p.logger.Error(err)
return err
}

if awsErr.Code() == "NoSuchKey" {
p.logger.Warn("Cannot find s3 file")
p.logger.Warnf("Cannot find s3 file '%s' from S3 bucket '%s'", info.key, info.name)
return nil
}
}
return errors.Wrap(err, "S3 GetObjectRequest failed")
return errors.Wrapf(err, "S3 GetObjectRequest failed for '%s' from S3 bucket '%s'", info.key, info.name)
}

defer resp.Body.Close()
Expand All @@ -448,7 +448,7 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
decoder := json.NewDecoder(reader)
err := p.decodeJSONWithKey(decoder, objectHash, info, s3Ctx)
if err != nil {
err = errors.Wrap(err, "decodeJSONWithKey failed")
err = errors.Wrapf(err, "decodeJSONWithKey failed for '%s' from S3 bucket '%s'", info.key, info.name)
p.logger.Error(err)
return err
}
Expand All @@ -459,7 +459,7 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
if (resp.ContentType != nil && *resp.ContentType == "application/x-gzip") || strings.HasSuffix(info.key, ".gz") {
gzipReader, err := gzip.NewReader(resp.Body)
if err != nil {
err = errors.Wrap(err, "gzip.NewReader failed")
err = errors.Wrapf(err, "gzip.NewReader failed for '%s' from S3 bucket '%s'", info.key, info.name)
p.logger.Error(err)
return err
}
Expand Down Expand Up @@ -519,36 +519,37 @@ func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3
// get logs from expand_event_list_from_field
textValues, ok := jsonFields[p.config.ExpandEventListFromField]
if !ok {
err = errors.Wrapf(err, fmt.Sprintf("key '%s' not found", p.config.ExpandEventListFromField))
err = errors.Wrapf(err, "key '%s' not found", p.config.ExpandEventListFromField)
p.logger.Error(err)
return err
}

for _, v := range textValues {
err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx)
if err != nil {
err = errors.Wrap(err, "convertJSONToEvent failed")
err = errors.Wrapf(err, "convertJSONToEvent failed for '%s' from S3 bucket '%s'", s3Info.key, s3Info.name)
p.logger.Error(err)
return err
}
}
} else if err != nil {
// decode json failed, skip this log file
p.logger.Warnf(fmt.Sprintf("Decode json failed for '%s', skipping this file: %s", s3Info.key, err))
err = errors.Wrapf(err, "decode json failed for '%s' from S3 bucket '%s', skipping this file", s3Info.key, s3Info.name)
p.logger.Warn(err)
return nil
}

textValues, ok := jsonFields[p.config.ExpandEventListFromField]
if !ok {
err = errors.Wrapf(err, fmt.Sprintf("Key '%s' not found", p.config.ExpandEventListFromField))
err = errors.Wrapf(err, "Key '%s' not found", p.config.ExpandEventListFromField)
p.logger.Error(err)
return err
}

for _, v := range textValues {
err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx)
if err != nil {
err = errors.Wrapf(err, fmt.Sprintf("Key '%s' not found", p.config.ExpandEventListFromField))
err = errors.Wrapf(err, "Key '%s' not found", p.config.ExpandEventListFromField)
p.logger.Error(err)
return err
}
Expand All @@ -564,7 +565,7 @@ func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectH

err = p.forwardEvent(event)
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("forwardEvent failed"))
err = errors.Wrap(err, "forwardEvent failed")
p.logger.Error(err)
return err
}
Expand Down Expand Up @@ -596,7 +597,7 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == awssdk.ErrCodeRequestCanceled {
return nil
}
return errors.Wrap(err, "SQS DeleteMessageRequest failed")
return errors.Wrapf(err, "SQS DeleteMessageRequest failed in queue %s", queueURL)
}
return nil
}
Expand Down

0 comments on commit ed0979c

Please sign in to comment.