Skip to content

Commit

Permalink
staging commit nramqp
Browse files Browse the repository at this point in the history
  • Loading branch information
iamemilio committed Sep 13, 2023
1 parent e3d6c12 commit 0b721dc
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 39 deletions.
154 changes: 154 additions & 0 deletions v3/integrations/nramqp/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package nramqp

import (
"context"
"fmt"
"net/http"

amqp "github.com/rabbitmq/amqp091-go"

"github.com/newrelic/go-agent/v3/newrelic"
)

type Channel struct {
amqpChan *amqp.Channel
txn *newrelic.Transaction
// exchangeCache stores the name and the type of an exchange
exchangeCache map[string]string
}

func WrappedAMQPChannel(ch *amqp.Channel, txn *newrelic.Transaction) *Channel {
return &Channel{
amqpChan: ch,
txn: txn,
exchangeCache: map[string]string{},
}
}

func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error {
txn := ch.txn

segmentName := "MessageBroker/RabbitMQ/Exchange/Declare/Named/Default"
if name != "" {
segmentName = "MessageBroker/RabbitMQ/Exchange/Declare/Named/" + name
}
excSegment := txn.StartSegment(segmentName)
defer excSegment.End()

ch.exchangeCache[name] = kind
return ch.amqpChan.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args)
}

func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) {
txn := ch.txn

segmentName := "MessageBroker/RabbitMQ/Queue/Declare/Named/Default"
if name != "" {
segmentName = "MessageBroker/RabbitMQ/Queue/Declare/Named/" + name
}
excSegment := txn.StartSegment(segmentName)
defer excSegment.End()

return ch.amqpChan.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args)
}

func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error {
txn := ch.txn

segmentName := "MessageBroker/RabbitMQ/Queue/Bind/Named/Default"
if name != "" {
segmentName = "MessageBroker/RabbitMQ/Queue/Bind/Named/" + name
}

excSegment := txn.StartSegment(segmentName)
defer excSegment.End()

err := ch.amqpChan.QueueBind(name, key, exchange, noWait, args)

txn.SetWebRequest(newrelic.WebRequest{
MessageBrokerAttributes: newrelic.MessageBrokerAttributes{
RoutingKey: key,
},
})

return err
}

// Consume creates a named segment that captures telemetry about this specific consume operation to the transaction.
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) {
txn := ch.txn

name := "MessageBroker/RabbitMQ/Queue/Consume/Named/Default"
if queue != "" {
if consumer != "" {
name = fmt.Sprintf("MessageBroker/RabbitMQ/Queue/Consume/Named/%s/%s", queue, consumer)
} else {
name = fmt.Sprintf("MessageBroker/RabbitMQ/Queue/Consume/Named/%s", queue)
}
}

consumeSegment := txn.StartSegment(name)
defer consumeSegment.End()

txn.SetWebRequest(newrelic.WebRequest{
Transport: newrelic.TransportAMQP,
MessageBrokerAttributes: newrelic.MessageBrokerAttributes{
QueueName: queue,
},
})

return ch.amqpChan.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
}

func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
txn := ch.txn
if txn != nil {
// Create copy of headers so that we can inject DT headers into original object
copyHeaders := amqp.Table{}
for k, v := range msg.Headers {
copyHeaders[k] = v
}

injectDtHeaders(txn, msg.Headers)

exchangeName := "Default"
if exchange != "" {
exchangeName = exchange
}
consumeSegment := txn.StartSegment("MessageBroker/RabbitMQ/Exchange/Produce/Named/" + exchangeName)
defer consumeSegment.End()

txn.SetWebRequest(newrelic.WebRequest{
Transport: newrelic.TransportAMQP,
Body: msg.Body,
MessageBrokerAttributes: newrelic.MessageBrokerAttributes{
RoutingKey: key,
Headers: copyHeaders,
ExchangeType: ch.exchangeCache[exchange],
ReplyTo: msg.ReplyTo,
CorrelationID: msg.CorrelationId,
},
})
}

return ch.amqpChan.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg)
}

// Adds Distributed Tracing headers to the amqp table object
func injectDtHeaders(txn *newrelic.Transaction, headers amqp.Table) {
dummyHeaders := http.Header{}
txn.InsertDistributedTraceHeaders(dummyHeaders)

dtHeaders := dummyHeaders.Get(newrelic.DistributedTraceNewRelicHeader)
if dtHeaders != "" {
headers[newrelic.DistributedTraceNewRelicHeader] = dtHeaders
}
traceParent := dummyHeaders.Get(newrelic.DistributedTraceW3CTraceParentHeader)
if traceParent != "" {
headers[newrelic.DistributedTraceW3CTraceParentHeader] = traceParent
}
traceState := dummyHeaders.Get(newrelic.DistributedTraceW3CTraceStateHeader)
if traceState != "" {
headers[newrelic.DistributedTraceW3CTraceStateHeader] = traceState
}
}
16 changes: 9 additions & 7 deletions v3/integrations/nramqp/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/newrelic/go-agent/v3/integrations/nramqp

go 1.21.0
go 1.19.0

require (
github.com/newrelic/go-agent/v3 v3.24.1
Expand All @@ -9,10 +9,12 @@ require (

require (
github.com/golang/protobuf v1.5.3 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
google.golang.org/grpc v1.54.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230803162519-f966b187b2e5 // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)

replace github.com/newrelic/go-agent/v3 => ../../.
32 changes: 0 additions & 32 deletions v3/integrations/nramqp/nramqp.go

This file was deleted.

20 changes: 20 additions & 0 deletions v3/newrelic/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ func (txn *Transaction) InsertDistributedTraceHeaders(hdrs http.Header) {
if txn == nil || txn.thread == nil {
return
}

txn.thread.CreateDistributedTracePayload(hdrs)
}

Expand Down Expand Up @@ -588,6 +589,22 @@ func (tt TransportType) toString() string {
}
}

// MessageBrokerAttributes is used to provide additional request information for RabbitMQ, AMQP and other message broker web requests
type MessageBrokerAttributes struct {
// the routing key of the message
RoutingKey string
// direct, fanout, topic, or headers
ExchangeType string
// the headers of the message without CAT keys/values
Headers map[string]any
// the name of the AMQP Queue this message was consumed from (only for consume operations)
QueueName string
// the callback queue used in RPC configurations
ReplyTo string
// the application-generated identifier used in RPC configurations
CorrelationID string
}

// WebRequest is used to provide request information to Transaction.SetWebRequest.
type WebRequest struct {
// Header may be nil if you don't have any headers or don't want to
Expand All @@ -605,6 +622,9 @@ type WebRequest struct {
// http.Header object and so must be passed separately.
Host string

// Attributes for message broker requests
MessageBrokerAttributes

// The following fields are needed for the secure agent's vulnerability
// detection features.
Body []byte
Expand Down

0 comments on commit 0b721dc

Please sign in to comment.