Skip to content

Commit

Permalink
Merge pull request #2 from AlayaCare/feature/Allow_filtering_by_messa…
Browse files Browse the repository at this point in the history
…ge_attributes

Allows to filter by Message Attributes
  • Loading branch information
natenho authored Dec 30, 2023
2 parents 5cebacf + 3280e41 commit 340590b
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 36 deletions.
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Here are the available options:
- `-after`: Get only messages sent after a certain date in format 2006-01-02T15:04:05Z07:00.
- `-attribute-filter`: Filter messages by a certain attribute using JQ expression.
- `-body-filter`: Filter messages by JSON body field content using JQ expression.
- `-message-attribute-filter`: Filter messages by message attribute using JQ expression.

### Examples

Expand Down Expand Up @@ -76,9 +77,18 @@ aws-sqs-filter-redrive \
-attribute-filter '.AttributeName == "AttributeValue"'
```

#### Delete messages from a queue based on a message attribute. Message attributes have different value types, so you need to specify the type of the value you are filtering.
```bash
aws-sqs-filter-redrive \
-source https://sqs.us-east-1.amazonaws.com/123456789012/source-queue \
-delete \
-message-attribute-filter '.AttributeName.StringValue == "AttributeValue"' \
-count 10000 -polling-duration 60s
```

## Contributing

If you find a bug or have a feature request, please open an issue or submit a pull request. Contributions are always welcome!

## License
[![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2Fnatenho%2Faws-sqs-filter-redrive.svg?type=large)](https://app.fossa.com/projects/git%2Bgithub.com%2Fnatenho%2Faws-sqs-filter-redrive?ref=badge_large)
[![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2Fnatenho%2Faws-sqs-filter-redrive.svg?type=large)](https://app.fossa.com/projects/git%2Bgithub.com%2Fnatenho%2Faws-sqs-filter-redrive?ref=badge_large)
27 changes: 14 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,26 @@ go 1.19

require (
github.com/aws/aws-sdk-go v1.44.253
github.com/aws/aws-sdk-go-v2/config v1.18.22
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.9
github.com/aws/aws-sdk-go-v2/config v1.25.10
github.com/aws/aws-sdk-go-v2/service/sqs v1.29.1
github.com/google/uuid v1.3.0
github.com/itchyny/gojq v0.12.12
go.uber.org/ratelimit v0.2.0
)

require (
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
github.com/aws/aws-sdk-go-v2 v1.18.0 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.21 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.9 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.9 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.18.10 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/aws/aws-sdk-go-v2 v1.23.4 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.16.8 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.8 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.7 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.18.1 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.26.1 // indirect
github.com/aws/smithy-go v1.18.1 // indirect
github.com/itchyny/timefmt-go v0.1.5 // indirect
)
28 changes: 28 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,58 @@ github.com/aws/aws-sdk-go v1.44.253 h1:iqDd0okcH4ShfFexz2zzf4VmeDFf6NOMm07pHnEb8
github.com/aws/aws-sdk-go v1.44.253/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go-v2 v1.18.0 h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY=
github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2 v1.23.4 h1:2P20ZjH0ouSAu/6yZep8oCmTReathLuEu6dwoqEgjts=
github.com/aws/aws-sdk-go-v2 v1.23.4/go.mod h1:t3szzKfP0NeRU27uBFczDivYJjsmSnqI8kIvKyWb9ds=
github.com/aws/aws-sdk-go-v2/config v1.18.22 h1:7vkUEmjjv+giht4wIROqLs+49VWmiQMMHSduxmoNKLU=
github.com/aws/aws-sdk-go-v2/config v1.18.22/go.mod h1:mN7Li1wxaPxSSy4Xkr6stFuinJGf3VZW3ZSNvO0q6sI=
github.com/aws/aws-sdk-go-v2/config v1.25.10 h1:qw/e8emDtNufTkrAU86DlQ18DruMyyM7ttW6Lgwp4v0=
github.com/aws/aws-sdk-go-v2/config v1.25.10/go.mod h1:203YiAtb6XyoGxXMPsUVwEcuxCiTQY/r8P27IDjfvMc=
github.com/aws/aws-sdk-go-v2/credentials v1.13.21 h1:VRiXnPEaaPeGeoFcXvMZOB5K/yfIXOYE3q97Kgb0zbU=
github.com/aws/aws-sdk-go-v2/credentials v1.13.21/go.mod h1:90Dk1lJoMyspa/EDUrldTxsPns0wn6+KpRKpdAWc0uA=
github.com/aws/aws-sdk-go-v2/credentials v1.16.8 h1:phw9nRLy/77bPk6Mfu2SHCOnHwfVB7WWrOa5rZIY2Fc=
github.com/aws/aws-sdk-go-v2/credentials v1.16.8/go.mod h1:MrS4SOin6adbO6wgWhdifyPiq+TX7fPPwyA/ZLC1F5M=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3 h1:jJPgroehGvjrde3XufFIJUZVK5A2L9a3KwSFgKy9n8w=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3/go.mod h1:4Q0UFP0YJf0NrsEuEYHpM9fTSEVnD16Z3uyEF7J9JGM=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.8 h1:tQZLSPC2Zj2CqZHonLmWEvCsbpMX5tQvaYJWHadcPek=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.8/go.mod h1:5+YpvTHDFffykWr5qAGjqwoh8oVYZOddL3sSrEN7lws=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 h1:kG5eQilShqmJbv11XL1VpyDbaEJzWxd4zRiCG30GSn4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.7 h1:eMqD7ku6WGdmcWWXPYun9m6yk6feSULLhJlAtN6rYG4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.7/go.mod h1:0oBIfcDV6LScxEW0VgOqxT3e4aqKRp+SYhB9wAd5E3Q=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 h1:vFQlirhuM8lLlpI7imKOMsjdQLuN9CPi+k44F/OFVsk=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.7 h1:+XYhWhgWs5F3Zx8oa49CXzNvfXrItaDjZB/M172fcHQ=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.7/go.mod h1:L6tcSRyCGxcKfDWUrmv2jv8G1cLDU7d0FUpEFpG9bVE=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34 h1:gGLG7yKaXG02/jBlg210R7VgQIotiQntNhsCFejawx8=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34/go.mod h1:Etz2dj6UHYuw+Xw830KfzCfWGMzqvUTCjUj5b76GVDc=
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1 h1:uR9lXYjdPX0xY+NhvaJ4dD8rpSRz5VY81ccIIoNG+lw=
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1/go.mod h1:6fQQgfuGmw8Al/3M2IgIllycxV7ZW7WCdVSqfBeUiCY=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.3 h1:e3PCNeEaev/ZF01cQyNZgmYE9oYYePIMJs2mWSKG514=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.3/go.mod h1:gIeeNyaL8tIEqZrzAnTeyhHcE0yysCtcaP+N9kxLZ+E=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27 h1:0iKliEXAcCa2qVtRs7Ot5hItA2MsufrphbRFlz1Owxo=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27/go.mod h1:EOwBD4J4S5qYszS5/3DpkejfuK+Z5/1uzICfPaZLtqw=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.7 h1:dU+ZyhvqMB/T/TxjGagHMCdyUiqaThRIaMu3YvKiSQI=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.7/go.mod h1:SGORuNqoXyWfTvTp/gBGJfv8jRvW/+nha0XhnIXVI+o=
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.9 h1:fc/wDZYzYqTtWqOWybwyelGLayiNk6F1b+KEnxUICaY=
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.9/go.mod h1:ujUjm+PrcKUeIiKu2PT7MWjcyY0D6YZRZF3fSswiO+0=
github.com/aws/aws-sdk-go-v2/service/sqs v1.29.1 h1:OZI2aJxnfOZzB0uhyTaYIW6MeRMb1Qd2eLMjh0bFsRg=
github.com/aws/aws-sdk-go-v2/service/sqs v1.29.1/go.mod h1:GiU88YWgOho2cyEyS2YZo3GYz/j4etRYKWbJdcYgpuQ=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.9 h1:GAiaQWuQhQQui76KjuXeShmyXqECwQ0mGRMc/rwsL+c=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.9/go.mod h1:ouy2P4z6sJN70fR3ka3wD3Ro3KezSxU6eKGQI2+2fjI=
github.com/aws/aws-sdk-go-v2/service/sso v1.18.1 h1:V40g2daNO3l1J94JYwqfkyvQMYXi5I25fs3fNQW8iDs=
github.com/aws/aws-sdk-go-v2/service/sso v1.18.1/go.mod h1:0ZWQJP/mBOUxkCvZKybZNz1XmdUKSBxoF0dzgfxtvDs=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.9 h1:TraLwncRJkWqtIBVKI/UqBymq4+hL+3MzUOtUATuzkA=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.9/go.mod h1:AFvkxc8xfBe8XA+5St5XIHHrQQtkxqrRincx4hmMHOk=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.1 h1:uQrj7SpUNC3r55vc1CDh3qV9wJC66lz546xM9dhSo5s=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.1/go.mod h1:oyaTk5xEAOuPXX1kCD7HmIeuLqdj3Bk5yGkqGXtGi14=
github.com/aws/aws-sdk-go-v2/service/sts v1.18.10 h1:6UbNM/KJhMBfOI5+lpVcJ/8OA7cBSz0O6OX37SRKlSw=
github.com/aws/aws-sdk-go-v2/service/sts v1.18.10/go.mod h1:BgQOMsg8av8jset59jelyPW7NoZcZXLVpDsXunGDrk8=
github.com/aws/aws-sdk-go-v2/service/sts v1.26.1 h1:K33V7L0XDdb23FMOZySr8bon1jou5SHn1fiv7NJ1SUg=
github.com/aws/aws-sdk-go-v2/service/sts v1.26.1/go.mod h1:YtXUl/sfnS06VksYhr855hTQf2HphfT1Xv/EwuzbPjg=
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/aws/smithy-go v1.18.1 h1:pOdBTUfXNazOlxLrgeYalVnuTpKreACHtc62xLwIB3c=
github.com/aws/smithy-go v1.18.1/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
58 changes: 36 additions & 22 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,29 @@ var (
date = "unknown"
)
var (
dry = flag.Bool("dry", false, "dry run (only print messages that would be processed)")
delete = flag.Bool("delete", false, "delete messages from source queue")
move = flag.Bool("move", false, "move messages from source queue to target queue")
targetQueue = flag.String("target", "", "the queue URL, e.g. https://sqs.us-east-1.amazonaws.com/123456789012/my-queue")
sourceQueue = flag.String("source", "", "the queue URL, e.g. https://sqs.us-east-1.amazonaws.com/123456789012/my-queue")
beforeFilter = flag.String("before", "", "get only messages sent before a certain date in format 2006-01-02T15:04:05Z07:00")
afterFilter = flag.String("after", "", "get only messages sent after a certain date in format 2006-01-02T15:04:05Z07:00")
numMessages = flag.Int("count", 0, "maximum number of messages to process")
numWorkers = flag.Int("workers", 8, "number of workers")
bodyFilter = flag.String("body-filter", "", "filter messages by JSON body field content using JQ expression")
attributeFilter = flag.String("attribute-filter", "", "filter messages by a certain attribute using JQ expression")
rateLimit = flag.Int("rate-limit", 10, "Max number of messages processed per second")
pollingDuration = flag.Duration("polling-duration", 30*time.Second, "Polling duration")
versionFlag = flag.Bool("version", false, "Print version and exit")
dry = flag.Bool("dry", false, "dry run (only print messages that would be processed)")
delete = flag.Bool("delete", false, "delete messages from source queue")
move = flag.Bool("move", false, "move messages from source queue to target queue")
targetQueue = flag.String("target", "", "the queue URL, e.g. https://sqs.us-east-1.amazonaws.com/123456789012/my-queue")
sourceQueue = flag.String("source", "", "the queue URL, e.g. https://sqs.us-east-1.amazonaws.com/123456789012/my-queue")
beforeFilter = flag.String("before", "", "get only messages sent before a certain date in format 2006-01-02T15:04:05Z07:00")
afterFilter = flag.String("after", "", "get only messages sent after a certain date in format 2006-01-02T15:04:05Z07:00")
numMessages = flag.Int("count", 0, "maximum number of messages to process")
numWorkers = flag.Int("workers", 8, "number of workers")
bodyFilter = flag.String("body-filter", "", "filter messages by JSON body field content using JQ expression")
attributeFilter = flag.String("attribute-filter", "", "filter messages by a certain attribute using JQ expression")
messageAttributeFilter = flag.String("message-attribute-filter", "", "filter messages by a certain message attribute using JQ expression")
rateLimit = flag.Int("rate-limit", 10, "Max number of messages processed per second")
pollingDuration = flag.Duration("polling-duration", 30*time.Second, "Polling duration")
versionFlag = flag.Bool("version", false, "Print version and exit")
)

var (
after time.Time
before time.Time
attributesQuery *gojq.Code
bodyQuery *gojq.Code
after time.Time
before time.Time
attributesQuery *gojq.Code
bodyQuery *gojq.Code
messageAttributesQuery *gojq.Code
)

const maxSQSBatchSize = 10
Expand Down Expand Up @@ -125,6 +127,13 @@ func validateFlags() error {
}
}

if *messageAttributeFilter != "" {
messageAttributesQuery, err = parseAndCompileJQ(*messageAttributeFilter)
if err != nil {
return fmt.Errorf("Invalid message attribute filter %w", err)
}
}

if *bodyFilter != "" {
bodyQuery, err = parseAndCompileJQ(*bodyFilter)
if err != nil {
Expand Down Expand Up @@ -239,7 +248,7 @@ func filterBatch(polledBatch []types.Message) []types.Message {

func printSQSMessages(batch []types.Message) {
for _, msg := range batch {
log.Printf("SentTimestamp:%v Attributes: %v Body: %v\n", getTimestamp(msg), toJSONMap(msg.Attributes), *msg.Body)
log.Printf("SentTimestamp:%v\n Attributes: %v\n MessageAttributes: %v\n Body: %v\n", getTimestamp(msg), toJSONMap(msg.Attributes), toJSONMap(msg.MessageAttributes), *msg.Body)
}
}

Expand Down Expand Up @@ -312,6 +321,10 @@ func shouldProcessMessage(msg types.Message) bool {
return false
}

if messageAttributesQuery != nil && !matchesJqQuery(toJSONMap(msg.MessageAttributes), messageAttributesQuery) {
return false
}

if bodyQuery != nil && !matchesJqQuery(toJSONMap(msg.Body), bodyQuery) {
return false
}
Expand Down Expand Up @@ -370,9 +383,10 @@ func toJSONMap(input any) (jsonMap map[string]any) {

func pollForMessageBatch(ctx context.Context, sqsClient *sqs.Client) ([]types.Message, error) {
result, err := sqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: aws.String(*sourceQueue),
MaxNumberOfMessages: maxSQSBatchSize,
AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll},
QueueUrl: aws.String(*sourceQueue),
MaxNumberOfMessages: maxSQSBatchSize,
AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameAll},
MessageAttributeNames: []string{string(types.QueueAttributeNameAll)},
})

if err != nil {
Expand Down

0 comments on commit 340590b

Please sign in to comment.