From 1a6c291208d5604da37c786ca993512a67d279f7 Mon Sep 17 00:00:00 2001 From: Aman Date: Mon, 29 May 2017 17:07:37 +0530 Subject: [PATCH 1/3] Add option to process messages in parallel --- cmd/root.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index e744b10..cbbd029 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -38,6 +38,7 @@ var queueName string var url string var awsRegion string var sqsEndpoint string +var parallelRequests int // RootCmd represents the base command when called without any subcommands var RootCmd = &cobra.Command{ @@ -62,6 +63,7 @@ func init() { RootCmd.PersistentFlags().StringVarP(&url, "url", "u", "", "endpoint to send an HTTP POST request with contents of queue message in the body") RootCmd.PersistentFlags().StringVar(&awsRegion, "aws-region", "us-east-1", "AWS Region for the SQS queue") RootCmd.PersistentFlags().StringVar(&sqsEndpoint, "sqs-endpoint", "", "SQS Endpoint for using with fake_sqs") + RootCmd.PersistentFlags().IntVar(¶llelRequests, "parallel", 1, "Number of messages to be consumed in parallel") RootCmd.MarkPersistentFlagRequired("queuename") RootCmd.MarkPersistentFlagRequired("url") @@ -72,6 +74,7 @@ func init() { var svc *sqs.SQS var msgparams *sqs.ReceiveMessageInput var httpClient *http.Client +var sem chan *sqs.Message func startGohaqd(cmd *cobra.Command, args []string) { var config *aws.Config @@ -97,18 +100,30 @@ func startGohaqd(cmd *cobra.Command, args []string) { QueueUrl: q.QueueUrl, WaitTimeSeconds: aws.Int64(20), } + sem = make(chan *sqs.Message) + + for i := 0; i < parallelRequests; i++ { + go startConsumer(q.QueueUrl) + } + for { - pollSQS(q.QueueUrl) + pollSQS() } } -func pollSQS(queueURL *string) { +func pollSQS() { resp, err := svc.ReceiveMessage(msgparams) if err != nil { log.Fatalf(err.Error()) } for _, msg := range resp.Messages { + sem <- msg + } +} + +func startConsumer(queueURL *string) { + for msg := range sem { if sendMessageToURL(*msg.Body) { _, err := svc.DeleteMessage(&sqs.DeleteMessageInput{ QueueUrl: queueURL, From ac0a0260cd4212bf2419ef0230524787bc978d6d Mon Sep 17 00:00:00 2001 From: Aman Date: Mon, 29 May 2017 17:12:15 +0530 Subject: [PATCH 2/3] Update README and CHANGELOG --- CHANGELOG.md | 8 ++++++++ README.md | 1 + 2 files changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c1d38d2..6097fd4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +## v1.1 (May 29, 2017) + + * Add option to process messages in parallel ([#2](https://github.com/Codigami/gohaqd/pull/2), [@ApsOps](https://github.com/ApsOps)) + +## v1.0 (May 9, 2017) + + * Initial stable release + ## v0.3 (December 21, 2016) * Instantiate SQS object only once ([#5](https://github.com/ApsOps/gohaqd/pull/5), [@ApsOps](https://github.com/ApsOps)) diff --git a/README.md b/README.md index cbacdfb..6d7c1e0 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ It pulls data off a queue, inserts it into the message body, and sends an HTTP P ## Flags: ``` --aws-region string AWS Region for the SQS queue (default "us-east-1") + --parallel int Number of messages to be consumed in parallel (default 1) -q, --queue-name string queue name to use --sqs-endpoint string SQS Endpoint for using with fake_sqs -u, --url string endpoint to send an HTTP POST request with contents of queue message in the body From 9140e2039feda06d8d4464228168db45f4154ead Mon Sep 17 00:00:00 2001 From: Aman Date: Tue, 30 May 2017 12:40:16 +0530 Subject: [PATCH 3/3] Add comments --- cmd/root.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/cmd/root.go b/cmd/root.go index cbbd029..64947cc 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -100,17 +100,22 @@ func startGohaqd(cmd *cobra.Command, args []string) { QueueUrl: q.QueueUrl, WaitTimeSeconds: aws.Int64(20), } + + // Create semaphore channel for passing messages to consumers sem = make(chan *sqs.Message) + // Start multiple goroutines for consumers base on --parallel flag for i := 0; i < parallelRequests; i++ { go startConsumer(q.QueueUrl) } + // Infinitely poll SQS queue for messages for { pollSQS() } } +// Receives messages from SQS queue and adds to semaphore channel func pollSQS() { resp, err := svc.ReceiveMessage(msgparams) if err != nil { @@ -122,6 +127,8 @@ func pollSQS() { } } +// Receives messages from semaphore channel and +// deletes a message from SQS queue is it's consumed successfully func startConsumer(queueURL *string) { for msg := range sem { if sendMessageToURL(*msg.Body) { @@ -136,6 +143,7 @@ func startConsumer(queueURL *string) { } } +// Sends a POST request to consumption endpoint with the SQS message as body func sendMessageToURL(msg string) bool { var resp *http.Response var err error