Skip to content

Commit

Permalink
Merge pull request #780 from iamemilio/nramqp
Browse files Browse the repository at this point in the history
AMQP DT support for New Relic
  • Loading branch information
iamemilio authored Oct 19, 2023
2 parents 05ee293 + f896274 commit d54ae74
Show file tree
Hide file tree
Showing 11 changed files with 763 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ jobs:
dirs: v3/newrelic,v3/internal,v3/examples

# v3 integrations
- go-version: 1.19.x
dirs: v3/integrations/nramqp
- go-version: 1.19.x
dirs: v3/integrations/nrsarama
- go-version: 1.19.x
Expand Down
78 changes: 78 additions & 0 deletions v3/integrations/nramqp/examples/consumer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"fmt"
"log"
"os"
"time"

"github.com/newrelic/go-agent/v3/integrations/nramqp"
"github.com/newrelic/go-agent/v3/newrelic"

amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
if err != nil {
panic(fmt.Sprintf("%s: %s\n", msg, err))
}
}

// a rabit mq server must be running on localhost on port 5672
func main() {
nrApp, err := newrelic.NewApplication(
newrelic.ConfigAppName("AMQP Consumer Example App"),
newrelic.ConfigLicense(os.Getenv("NEW_RELIC_LICENSE_KEY")),
newrelic.ConfigInfoLogger(os.Stdout),
)

if err != nil {
panic(err)
}

nrApp.WaitForConnection(time.Second * 5)

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

var forever chan struct{}

handleDelivery, msgs, err := nramqp.Consume(nrApp, ch,
q.Name,
"",
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args)
)
failOnError(err, "Failed to register a consumer")

go func() {
for d := range msgs {
txn := handleDelivery(d)
log.Printf("Received a message: %s\n", d.Body)
txn.End()
}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

nrApp.Shutdown(time.Second * 10)
}
124 changes: 124 additions & 0 deletions v3/integrations/nramqp/examples/publisher/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package main

import (
"fmt"
"io"
"net/http"
"os"
"time"

"github.com/newrelic/go-agent/v3/integrations/nramqp"
"github.com/newrelic/go-agent/v3/newrelic"

amqp "github.com/rabbitmq/amqp091-go"
)

var indexHTML = `
<!DOCTYPE html>
<html>
<body>
<h1>Send a Rabbit MQ Message</h1>
<form>
<label for="msg">Message:</label><br>
<input type="text" id="msg" name="msg"><br>
<input type="submit" formaction="/message" value="Send">
</form>
</body>
</html>
`

func failOnError(err error, msg string) {
if err != nil {
panic(fmt.Sprintf("%s: %s\n", msg, err))
}
}

type amqpServer struct {
ch *amqp.Channel
exchange string
routingKey string
}

func NewServer(channel *amqp.Channel, exchangeName, routingKeyName string) *amqpServer {
return &amqpServer{
channel,
exchangeName,
routingKeyName,
}
}

func (serv *amqpServer) index(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, indexHTML)
}

func (serv *amqpServer) publishPlainTxtMessage(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

// get the message from the HTTP form
r.ParseForm()
message := r.Form.Get("msg")

err := nramqp.PublishWithContext(serv.ch,
ctx,
serv.exchange, // exchange
serv.routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})

if err != nil {
txn := newrelic.FromContext(ctx)
txn.NoticeError(err)
}

serv.index(w, r)
}

// a rabit mq server must be running on localhost on port 5672
func main() {
nrApp, err := newrelic.NewApplication(
newrelic.ConfigAppName("AMQP Publisher Example App"),
newrelic.ConfigFromEnvironment(),
newrelic.ConfigInfoLogger(os.Stdout),
)

if err != nil {
panic(err)
}

nrApp.WaitForConnection(time.Second * 5)

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

server := NewServer(ch, "", q.Name)

http.HandleFunc(newrelic.WrapHandleFunc(nrApp, "/", server.index))
http.HandleFunc(newrelic.WrapHandleFunc(nrApp, "/message", server.publishPlainTxtMessage))

fmt.Println("\n\nlistening on: http://localhost:8000/")
http.ListenAndServe(":8000", nil)

nrApp.Shutdown(time.Second * 10)
}
8 changes: 8 additions & 0 deletions v3/integrations/nramqp/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module github.com/newrelic/go-agent/v3/integrations/nramqp

go 1.19

require (
github.com/newrelic/go-agent/v3 v3.27.0
github.com/rabbitmq/amqp091-go v1.9.0
)
69 changes: 69 additions & 0 deletions v3/integrations/nramqp/headers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package nramqp

import (
"encoding/json"
"fmt"
"net/http"

"github.com/newrelic/go-agent/v3/newrelic"
amqp "github.com/rabbitmq/amqp091-go"
)

const (
MaxHeaderLen = 4096
)

// Adds Distributed Tracing headers to the amqp table object
func injectDtHeaders(txn *newrelic.Transaction, headers amqp.Table) amqp.Table {
dummyHeaders := http.Header{}

txn.InsertDistributedTraceHeaders(dummyHeaders)
if headers == nil {
headers = amqp.Table{}
}

dtHeaders := dummyHeaders.Get(newrelic.DistributedTraceNewRelicHeader)
if dtHeaders != "" {
headers[newrelic.DistributedTraceNewRelicHeader] = dtHeaders
}
traceParent := dummyHeaders.Get(newrelic.DistributedTraceW3CTraceParentHeader)
if traceParent != "" {
headers[newrelic.DistributedTraceW3CTraceParentHeader] = traceParent
}
traceState := dummyHeaders.Get(newrelic.DistributedTraceW3CTraceStateHeader)
if traceState != "" {
headers[newrelic.DistributedTraceW3CTraceStateHeader] = traceState
}

return headers
}

func toHeader(headers amqp.Table) http.Header {
headersHTTP := http.Header{}
if headers == nil {
return headersHTTP
}

for k, v := range headers {
headersHTTP.Set(k, fmt.Sprintf("%v", v))
}

return headersHTTP
}

func getHeadersAttributeString(hdrs amqp.Table) (string, error) {
if len(hdrs) == 0 {
return "", nil
}

delete(hdrs, newrelic.DistributedTraceNewRelicHeader)
delete(hdrs, newrelic.DistributedTraceW3CTraceParentHeader)
delete(hdrs, newrelic.DistributedTraceW3CTraceStateHeader)

if len(hdrs) == 0 {
return "", nil
}

bytes, err := json.Marshal(hdrs)
return string(bytes), err
}
Loading

0 comments on commit d54ae74

Please sign in to comment.