diff --git a/v3/integrations/nramqp/channel.go b/v3/integrations/nramqp/channel.go index 559d4bb3a..22a7f35f1 100644 --- a/v3/integrations/nramqp/channel.go +++ b/v3/integrations/nramqp/channel.go @@ -7,131 +7,54 @@ import ( amqp "github.com/rabbitmq/amqp091-go" + "github.com/newrelic/go-agent/v3/internal" + "github.com/newrelic/go-agent/v3/internal/integrationsupport" "github.com/newrelic/go-agent/v3/newrelic" ) -type Channel struct { - amqpChan *amqp.Channel - txn *newrelic.Transaction - // exchangeCache stores the name and the type of an exchange - exchangeCache map[string]string -} - -func WrappedAMQPChannel(ch *amqp.Channel, txn *newrelic.Transaction) *Channel { - return &Channel{ - amqpChan: ch, - txn: txn, - exchangeCache: map[string]string{}, - } -} - -func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error { - txn := ch.txn - - segmentName := "MessageBroker/RabbitMQ/Exchange/Declare/Named/Default" - if name != "" { - segmentName = "MessageBroker/RabbitMQ/Exchange/Declare/Named/" + name - } - excSegment := txn.StartSegment(segmentName) - defer excSegment.End() - - ch.exchangeCache[name] = kind - return ch.amqpChan.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args) -} - -func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) { - txn := ch.txn - - segmentName := "MessageBroker/RabbitMQ/Queue/Declare/Named/Default" - if name != "" { - segmentName = "MessageBroker/RabbitMQ/Queue/Declare/Named/" + name - } - excSegment := txn.StartSegment(segmentName) - defer excSegment.End() - - return ch.amqpChan.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args) -} - -func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error { - txn := ch.txn - - segmentName := "MessageBroker/RabbitMQ/Queue/Bind/Named/Default" - if name != "" { - segmentName = "MessageBroker/RabbitMQ/Queue/Bind/Named/" + name - } - - excSegment := txn.StartSegment(segmentName) - defer excSegment.End() - - err := ch.amqpChan.QueueBind(name, key, exchange, noWait, args) - - txn.SetWebRequest(newrelic.WebRequest{ - MessageBrokerAttributes: newrelic.MessageBrokerAttributes{ - RoutingKey: key, - }, - }) - - return err -} - -// Consume creates a named segment that captures telemetry about this specific consume operation to the transaction. -func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) { - txn := ch.txn - - name := "MessageBroker/RabbitMQ/Queue/Consume/Named/Default" - if queue != "" { - if consumer != "" { - name = fmt.Sprintf("MessageBroker/RabbitMQ/Queue/Consume/Named/%s/%s", queue, consumer) - } else { - name = fmt.Sprintf("MessageBroker/RabbitMQ/Queue/Consume/Named/%s", queue) - } - } - - consumeSegment := txn.StartSegment(name) - defer consumeSegment.End() - - txn.SetWebRequest(newrelic.WebRequest{ - Transport: newrelic.TransportAMQP, - MessageBrokerAttributes: newrelic.MessageBrokerAttributes{ - QueueName: queue, - }, - }) - - return ch.amqpChan.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args) -} +const ( + RabbitMQLibrary = "RabbitMQ" +) -func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { - txn := ch.txn +// PublishedWithContext looks for a newrelic transaction in the context object, and if found, creates a message producer segment. +// It will also inject distributed tracing headers into the message. +func WrappedPublishWithContext(ch *amqp.Channel, ctx context.Context, exchange, key string, destinationTemporary, mandatory, immediate bool, msg amqp.Publishing) error { + txn := newrelic.FromContext(ctx) if txn != nil { // Create copy of headers so that we can inject DT headers into original object - copyHeaders := amqp.Table{} + copyHeaders := map[string]string{} for k, v := range msg.Headers { - copyHeaders[k] = v + copyHeaders[k] = fmt.Sprintf("%v", v) } injectDtHeaders(txn, msg.Headers) exchangeName := "Default" + var destinationType newrelic.MessageDestinationType if exchange != "" { exchangeName = exchange + destinationType = newrelic.MessageExchange + } else if key != "" { + exchangeName = key + destinationType = newrelic.MessageQueue + } + + s := newrelic.MessageProducerSegment{ + StartTime: txn.StartSegmentNow(), + Library: RabbitMQLibrary, + DestinationName: exchangeName, + DestinationType: destinationType, + DestinationTemporary: destinationTemporary, } - consumeSegment := txn.StartSegment("MessageBroker/RabbitMQ/Exchange/Produce/Named/" + exchangeName) - defer consumeSegment.End() - - txn.SetWebRequest(newrelic.WebRequest{ - Transport: newrelic.TransportAMQP, - Body: msg.Body, - MessageBrokerAttributes: newrelic.MessageBrokerAttributes{ - RoutingKey: key, - Headers: copyHeaders, - ExchangeType: ch.exchangeCache[exchange], - ReplyTo: msg.ReplyTo, - CorrelationID: msg.CorrelationId, - }, - }) + + integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageRoutingKey, key, nil) + integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageCorrelationID, msg.CorrelationId, nil) + integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageReplyTo, msg.ReplyTo, nil) + + defer s.End() } - return ch.amqpChan.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg) + return ch.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg) } // Adds Distributed Tracing headers to the amqp table object @@ -152,3 +75,40 @@ func injectDtHeaders(txn *newrelic.Transaction, headers amqp.Table) { headers[newrelic.DistributedTraceW3CTraceStateHeader] = traceState } } + +func getDtHeaders(headers amqp.Table) http.Header { + headersHTTP := http.Header{} + for k, v := range headers { + headersHTTP.Set(k, fmt.Sprintf("%v", v)) + } + + return headersHTTP +} + +func WrapConsume(app *newrelic.Application, ch *amqp.Channel, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (func(amqp.Delivery) *newrelic.Transaction, <-chan amqp.Delivery, error) { + var handler func(amqp.Delivery) *newrelic.Transaction + if app != nil { + handler = func(delivery amqp.Delivery) *newrelic.Transaction { + namer := internal.MessageMetricKey{ + Library: RabbitMQLibrary, + DestinationType: string(newrelic.MessageQueue), + DestinationName: queue, + Consumer: true, + } + + txn := app.StartTransaction(namer.Name()) + integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageQueueName, queue, nil) + integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageRoutingKey, delivery.RoutingKey, nil) + integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageCorrelationID, delivery.CorrelationId, nil) + integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageReplyTo, delivery.ReplyTo, nil) + + hdrs := getDtHeaders(delivery.Headers) + txn.AcceptDistributedTraceHeaders(newrelic.TransportAMQP, hdrs) + + return txn + } + } + + msgChan, err := ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args) + return handler, msgChan, err +} diff --git a/v3/newrelic/segments.go b/v3/newrelic/segments.go index 65344033a..1f4c71a95 100644 --- a/v3/newrelic/segments.go +++ b/v3/newrelic/segments.go @@ -130,6 +130,9 @@ type ExternalSegment struct { secureAgentEvent any } +type MessageConsumerSegment struct { +} + // MessageProducerSegment instruments calls to add messages to a queueing system. type MessageProducerSegment struct { StartTime SegmentStartTime diff --git a/v3/newrelic/transaction.go b/v3/newrelic/transaction.go index 32e203a94..3f75945ed 100644 --- a/v3/newrelic/transaction.go +++ b/v3/newrelic/transaction.go @@ -589,21 +589,10 @@ func (tt TransportType) toString() string { } } -// MessageBrokerAttributes is used to provide additional request information for RabbitMQ, AMQP and other message broker web requests -type MessageBrokerAttributes struct { - // the routing key of the message - RoutingKey string - // direct, fanout, topic, or headers - ExchangeType string - // the headers of the message without CAT keys/values - Headers map[string]any - // the name of the AMQP Queue this message was consumed from (only for consume operations) - QueueName string - // the callback queue used in RPC configurations - ReplyTo string - // the application-generated identifier used in RPC configurations - CorrelationID string -} +const ( + RabbitMQMessageBrokerLibrary = "RabbitMQ" + ExchangeDestinationType = "Exchange" +) // WebRequest is used to provide request information to Transaction.SetWebRequest. type WebRequest struct { @@ -622,9 +611,6 @@ type WebRequest struct { // http.Header object and so must be passed separately. Host string - // Attributes for message broker requests - MessageBrokerAttributes - // The following fields are needed for the secure agent's vulnerability // detection features. Body []byte