diff --git a/CHANGELOG.md b/CHANGELOG.md index c1d38d2..6097fd4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +## v1.1 (May 29, 2017) + + * Add option to process messages in parallel ([#2](https://github.com/Codigami/gohaqd/pull/2), [@ApsOps](https://github.com/ApsOps)) + +## v1.0 (May 9, 2017) + + * Initial stable release + ## v0.3 (December 21, 2016) * Instantiate SQS object only once ([#5](https://github.com/ApsOps/gohaqd/pull/5), [@ApsOps](https://github.com/ApsOps)) diff --git a/README.md b/README.md index cbacdfb..6d7c1e0 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ It pulls data off a queue, inserts it into the message body, and sends an HTTP P ## Flags: ``` --aws-region string AWS Region for the SQS queue (default "us-east-1") + --parallel int Number of messages to be consumed in parallel (default 1) -q, --queue-name string queue name to use --sqs-endpoint string SQS Endpoint for using with fake_sqs -u, --url string endpoint to send an HTTP POST request with contents of queue message in the body diff --git a/cmd/root.go b/cmd/root.go index e744b10..64947cc 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -38,6 +38,7 @@ var queueName string var url string var awsRegion string var sqsEndpoint string +var parallelRequests int // RootCmd represents the base command when called without any subcommands var RootCmd = &cobra.Command{ @@ -62,6 +63,7 @@ func init() { RootCmd.PersistentFlags().StringVarP(&url, "url", "u", "", "endpoint to send an HTTP POST request with contents of queue message in the body") RootCmd.PersistentFlags().StringVar(&awsRegion, "aws-region", "us-east-1", "AWS Region for the SQS queue") RootCmd.PersistentFlags().StringVar(&sqsEndpoint, "sqs-endpoint", "", "SQS Endpoint for using with fake_sqs") + RootCmd.PersistentFlags().IntVar(¶llelRequests, "parallel", 1, "Number of messages to be consumed in parallel") RootCmd.MarkPersistentFlagRequired("queuename") RootCmd.MarkPersistentFlagRequired("url") @@ -72,6 +74,7 @@ func init() { var svc *sqs.SQS var msgparams *sqs.ReceiveMessageInput var httpClient *http.Client +var sem chan *sqs.Message func startGohaqd(cmd *cobra.Command, args []string) { var config *aws.Config @@ -97,18 +100,37 @@ func startGohaqd(cmd *cobra.Command, args []string) { QueueUrl: q.QueueUrl, WaitTimeSeconds: aws.Int64(20), } + + // Create semaphore channel for passing messages to consumers + sem = make(chan *sqs.Message) + + // Start multiple goroutines for consumers base on --parallel flag + for i := 0; i < parallelRequests; i++ { + go startConsumer(q.QueueUrl) + } + + // Infinitely poll SQS queue for messages for { - pollSQS(q.QueueUrl) + pollSQS() } } -func pollSQS(queueURL *string) { +// Receives messages from SQS queue and adds to semaphore channel +func pollSQS() { resp, err := svc.ReceiveMessage(msgparams) if err != nil { log.Fatalf(err.Error()) } for _, msg := range resp.Messages { + sem <- msg + } +} + +// Receives messages from semaphore channel and +// deletes a message from SQS queue is it's consumed successfully +func startConsumer(queueURL *string) { + for msg := range sem { if sendMessageToURL(*msg.Body) { _, err := svc.DeleteMessage(&sqs.DeleteMessageInput{ QueueUrl: queueURL, @@ -121,6 +143,7 @@ func pollSQS(queueURL *string) { } } +// Sends a POST request to consumption endpoint with the SQS message as body func sendMessageToURL(msg string) bool { var resp *http.Response var err error