Skip to content

Commit

Permalink
total re-write that is much leaner and more effective
Browse files Browse the repository at this point in the history
  • Loading branch information
iamemilio committed Sep 20, 2023
1 parent 0b721dc commit 9147f9c
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 126 deletions.
176 changes: 68 additions & 108 deletions v3/integrations/nramqp/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,131 +7,54 @@ import (

amqp "github.com/rabbitmq/amqp091-go"

"github.com/newrelic/go-agent/v3/internal"
"github.com/newrelic/go-agent/v3/internal/integrationsupport"
"github.com/newrelic/go-agent/v3/newrelic"
)

type Channel struct {
amqpChan *amqp.Channel
txn *newrelic.Transaction
// exchangeCache stores the name and the type of an exchange
exchangeCache map[string]string
}

func WrappedAMQPChannel(ch *amqp.Channel, txn *newrelic.Transaction) *Channel {
return &Channel{
amqpChan: ch,
txn: txn,
exchangeCache: map[string]string{},
}
}

func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error {
txn := ch.txn

segmentName := "MessageBroker/RabbitMQ/Exchange/Declare/Named/Default"
if name != "" {
segmentName = "MessageBroker/RabbitMQ/Exchange/Declare/Named/" + name
}
excSegment := txn.StartSegment(segmentName)
defer excSegment.End()

ch.exchangeCache[name] = kind
return ch.amqpChan.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args)
}

func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) {
txn := ch.txn

segmentName := "MessageBroker/RabbitMQ/Queue/Declare/Named/Default"
if name != "" {
segmentName = "MessageBroker/RabbitMQ/Queue/Declare/Named/" + name
}
excSegment := txn.StartSegment(segmentName)
defer excSegment.End()

return ch.amqpChan.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args)
}

func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error {
txn := ch.txn

segmentName := "MessageBroker/RabbitMQ/Queue/Bind/Named/Default"
if name != "" {
segmentName = "MessageBroker/RabbitMQ/Queue/Bind/Named/" + name
}

excSegment := txn.StartSegment(segmentName)
defer excSegment.End()

err := ch.amqpChan.QueueBind(name, key, exchange, noWait, args)

txn.SetWebRequest(newrelic.WebRequest{
MessageBrokerAttributes: newrelic.MessageBrokerAttributes{
RoutingKey: key,
},
})

return err
}

// Consume creates a named segment that captures telemetry about this specific consume operation to the transaction.
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) {
txn := ch.txn

name := "MessageBroker/RabbitMQ/Queue/Consume/Named/Default"
if queue != "" {
if consumer != "" {
name = fmt.Sprintf("MessageBroker/RabbitMQ/Queue/Consume/Named/%s/%s", queue, consumer)
} else {
name = fmt.Sprintf("MessageBroker/RabbitMQ/Queue/Consume/Named/%s", queue)
}
}

consumeSegment := txn.StartSegment(name)
defer consumeSegment.End()

txn.SetWebRequest(newrelic.WebRequest{
Transport: newrelic.TransportAMQP,
MessageBrokerAttributes: newrelic.MessageBrokerAttributes{
QueueName: queue,
},
})

return ch.amqpChan.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
}
const (
RabbitMQLibrary = "RabbitMQ"
)

func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
txn := ch.txn
// PublishedWithContext looks for a newrelic transaction in the context object, and if found, creates a message producer segment.
// It will also inject distributed tracing headers into the message.
func WrappedPublishWithContext(ch *amqp.Channel, ctx context.Context, exchange, key string, destinationTemporary, mandatory, immediate bool, msg amqp.Publishing) error {
txn := newrelic.FromContext(ctx)
if txn != nil {
// Create copy of headers so that we can inject DT headers into original object
copyHeaders := amqp.Table{}
copyHeaders := map[string]string{}
for k, v := range msg.Headers {
copyHeaders[k] = v
copyHeaders[k] = fmt.Sprintf("%v", v)
}

injectDtHeaders(txn, msg.Headers)

exchangeName := "Default"
var destinationType newrelic.MessageDestinationType
if exchange != "" {
exchangeName = exchange
destinationType = newrelic.MessageExchange
} else if key != "" {
exchangeName = key
destinationType = newrelic.MessageQueue
}

s := newrelic.MessageProducerSegment{
StartTime: txn.StartSegmentNow(),
Library: RabbitMQLibrary,
DestinationName: exchangeName,
DestinationType: destinationType,
DestinationTemporary: destinationTemporary,
}
consumeSegment := txn.StartSegment("MessageBroker/RabbitMQ/Exchange/Produce/Named/" + exchangeName)
defer consumeSegment.End()

txn.SetWebRequest(newrelic.WebRequest{
Transport: newrelic.TransportAMQP,
Body: msg.Body,
MessageBrokerAttributes: newrelic.MessageBrokerAttributes{
RoutingKey: key,
Headers: copyHeaders,
ExchangeType: ch.exchangeCache[exchange],
ReplyTo: msg.ReplyTo,
CorrelationID: msg.CorrelationId,
},
})

integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageRoutingKey, key, nil)
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageCorrelationID, msg.CorrelationId, nil)
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageReplyTo, msg.ReplyTo, nil)

defer s.End()
}

return ch.amqpChan.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg)
return ch.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg)
}

// Adds Distributed Tracing headers to the amqp table object
Expand All @@ -152,3 +75,40 @@ func injectDtHeaders(txn *newrelic.Transaction, headers amqp.Table) {
headers[newrelic.DistributedTraceW3CTraceStateHeader] = traceState
}
}

func getDtHeaders(headers amqp.Table) http.Header {
headersHTTP := http.Header{}
for k, v := range headers {
headersHTTP.Set(k, fmt.Sprintf("%v", v))
}

return headersHTTP
}

func WrapConsume(app *newrelic.Application, ch *amqp.Channel, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (func(amqp.Delivery) *newrelic.Transaction, <-chan amqp.Delivery, error) {
var handler func(amqp.Delivery) *newrelic.Transaction
if app != nil {
handler = func(delivery amqp.Delivery) *newrelic.Transaction {
namer := internal.MessageMetricKey{
Library: RabbitMQLibrary,
DestinationType: string(newrelic.MessageQueue),
DestinationName: queue,
Consumer: true,
}

txn := app.StartTransaction(namer.Name())
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageQueueName, queue, nil)
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageRoutingKey, delivery.RoutingKey, nil)
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageCorrelationID, delivery.CorrelationId, nil)
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageReplyTo, delivery.ReplyTo, nil)

hdrs := getDtHeaders(delivery.Headers)
txn.AcceptDistributedTraceHeaders(newrelic.TransportAMQP, hdrs)

return txn
}
}

msgChan, err := ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
return handler, msgChan, err
}
3 changes: 3 additions & 0 deletions v3/newrelic/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ type ExternalSegment struct {
secureAgentEvent any
}

type MessageConsumerSegment struct {
}

// MessageProducerSegment instruments calls to add messages to a queueing system.
type MessageProducerSegment struct {
StartTime SegmentStartTime
Expand Down
22 changes: 4 additions & 18 deletions v3/newrelic/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,21 +589,10 @@ func (tt TransportType) toString() string {
}
}

// MessageBrokerAttributes is used to provide additional request information for RabbitMQ, AMQP and other message broker web requests
type MessageBrokerAttributes struct {
// the routing key of the message
RoutingKey string
// direct, fanout, topic, or headers
ExchangeType string
// the headers of the message without CAT keys/values
Headers map[string]any
// the name of the AMQP Queue this message was consumed from (only for consume operations)
QueueName string
// the callback queue used in RPC configurations
ReplyTo string
// the application-generated identifier used in RPC configurations
CorrelationID string
}
const (
RabbitMQMessageBrokerLibrary = "RabbitMQ"
ExchangeDestinationType = "Exchange"
)

// WebRequest is used to provide request information to Transaction.SetWebRequest.
type WebRequest struct {
Expand All @@ -622,9 +611,6 @@ type WebRequest struct {
// http.Header object and so must be passed separately.
Host string

// Attributes for message broker requests
MessageBrokerAttributes

// The following fields are needed for the secure agent's vulnerability
// detection features.
Body []byte
Expand Down

0 comments on commit 9147f9c

Please sign in to comment.