Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMQP DT support for New Relic #780

Merged
merged 3 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ jobs:
dirs: v3/newrelic,v3/internal,v3/examples

# v3 integrations
- go-version: 1.19.x
dirs: v3/integrations/nramqp
- go-version: 1.19.x
dirs: v3/integrations/nrsarama
- go-version: 1.19.x
Expand Down
78 changes: 78 additions & 0 deletions v3/integrations/nramqp/examples/consumer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"fmt"
"log"
"os"
"time"

"github.com/newrelic/go-agent/v3/integrations/nramqp"
"github.com/newrelic/go-agent/v3/newrelic"

amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
if err != nil {
panic(fmt.Sprintf("%s: %s\n", msg, err))
}
}

// a rabit mq server must be running on localhost on port 5672
func main() {
nrApp, err := newrelic.NewApplication(
newrelic.ConfigAppName("AMQP Consumer Example App"),
newrelic.ConfigLicense(os.Getenv("NEW_RELIC_LICENSE_KEY")),
newrelic.ConfigInfoLogger(os.Stdout),
)

if err != nil {
panic(err)
}

nrApp.WaitForConnection(time.Second * 5)

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

var forever chan struct{}

handleDelivery, msgs, err := nramqp.Consume(nrApp, ch,
q.Name,
"",
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args)
)
failOnError(err, "Failed to register a consumer")

go func() {
for d := range msgs {
txn := handleDelivery(d)
log.Printf("Received a message: %s\n", d.Body)
txn.End()
}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

nrApp.Shutdown(time.Second * 10)
}
124 changes: 124 additions & 0 deletions v3/integrations/nramqp/examples/publisher/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package main

import (
"fmt"
"io"
"net/http"
"os"
"time"

"github.com/newrelic/go-agent/v3/integrations/nramqp"
"github.com/newrelic/go-agent/v3/newrelic"

amqp "github.com/rabbitmq/amqp091-go"
)

var indexHTML = `
<!DOCTYPE html>
<html>
<body>

<h1>Send a Rabbit MQ Message</h1>

<form>
<label for="msg">Message:</label><br>
<input type="text" id="msg" name="msg"><br>
<input type="submit" formaction="/message" value="Send">
</form>

</body>
</html>
`

func failOnError(err error, msg string) {
if err != nil {
panic(fmt.Sprintf("%s: %s\n", msg, err))
}
}

type amqpServer struct {
ch *amqp.Channel
exchange string
routingKey string
}

func NewServer(channel *amqp.Channel, exchangeName, routingKeyName string) *amqpServer {
return &amqpServer{
channel,
exchangeName,
routingKeyName,
}
}

func (serv *amqpServer) index(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, indexHTML)
}

func (serv *amqpServer) publishPlainTxtMessage(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

// get the message from the HTTP form
r.ParseForm()
message := r.Form.Get("msg")

err := nramqp.PublishWithContext(serv.ch,
ctx,
serv.exchange, // exchange
serv.routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})

if err != nil {
txn := newrelic.FromContext(ctx)
txn.NoticeError(err)
}

serv.index(w, r)
}

// a rabit mq server must be running on localhost on port 5672
func main() {
nrApp, err := newrelic.NewApplication(
newrelic.ConfigAppName("AMQP Publisher Example App"),
newrelic.ConfigFromEnvironment(),
newrelic.ConfigInfoLogger(os.Stdout),
)

if err != nil {
panic(err)
}

nrApp.WaitForConnection(time.Second * 5)

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

server := NewServer(ch, "", q.Name)

http.HandleFunc(newrelic.WrapHandleFunc(nrApp, "/", server.index))
http.HandleFunc(newrelic.WrapHandleFunc(nrApp, "/message", server.publishPlainTxtMessage))

fmt.Println("\n\nlistening on: http://localhost:8000/")
http.ListenAndServe(":8000", nil)

nrApp.Shutdown(time.Second * 10)
}
8 changes: 8 additions & 0 deletions v3/integrations/nramqp/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module github.com/newrelic/go-agent/v3/integrations/nramqp

go 1.19

require (
github.com/newrelic/go-agent/v3 v3.27.0
github.com/rabbitmq/amqp091-go v1.9.0
)
69 changes: 69 additions & 0 deletions v3/integrations/nramqp/headers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package nramqp

import (
"encoding/json"
"fmt"
"net/http"

"github.com/newrelic/go-agent/v3/newrelic"
amqp "github.com/rabbitmq/amqp091-go"
)

const (
MaxHeaderLen = 4096
)

// Adds Distributed Tracing headers to the amqp table object
func injectDtHeaders(txn *newrelic.Transaction, headers amqp.Table) amqp.Table {
dummyHeaders := http.Header{}

txn.InsertDistributedTraceHeaders(dummyHeaders)
if headers == nil {
headers = amqp.Table{}
}

dtHeaders := dummyHeaders.Get(newrelic.DistributedTraceNewRelicHeader)
if dtHeaders != "" {
headers[newrelic.DistributedTraceNewRelicHeader] = dtHeaders
}
traceParent := dummyHeaders.Get(newrelic.DistributedTraceW3CTraceParentHeader)
if traceParent != "" {
headers[newrelic.DistributedTraceW3CTraceParentHeader] = traceParent
}
traceState := dummyHeaders.Get(newrelic.DistributedTraceW3CTraceStateHeader)
if traceState != "" {
headers[newrelic.DistributedTraceW3CTraceStateHeader] = traceState
}

return headers
}

func toHeader(headers amqp.Table) http.Header {
headersHTTP := http.Header{}
if headers == nil {
return headersHTTP
}

for k, v := range headers {
headersHTTP.Set(k, fmt.Sprintf("%v", v))
}

return headersHTTP
}

func getHeadersAttributeString(hdrs amqp.Table) (string, error) {
if len(hdrs) == 0 {
return "", nil
}

delete(hdrs, newrelic.DistributedTraceNewRelicHeader)
delete(hdrs, newrelic.DistributedTraceW3CTraceParentHeader)
delete(hdrs, newrelic.DistributedTraceW3CTraceStateHeader)

if len(hdrs) == 0 {
return "", nil
}

bytes, err := json.Marshal(hdrs)
return string(bytes), err
}
Loading
Loading