diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 2e3b02c6d..a21f108b9 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -25,6 +25,8 @@ jobs: dirs: v3/newrelic,v3/internal,v3/examples # v3 integrations + - go-version: 1.19.x + dirs: v3/integrations/nramqp - go-version: 1.19.x dirs: v3/integrations/nrsarama - go-version: 1.19.x diff --git a/v3/integrations/nramqp/examples/consumer/main.go b/v3/integrations/nramqp/examples/consumer/main.go new file mode 100644 index 000000000..5cfc92ec4 --- /dev/null +++ b/v3/integrations/nramqp/examples/consumer/main.go @@ -0,0 +1,78 @@ +package main + +import ( + "fmt" + "log" + "os" + "time" + + "github.com/newrelic/go-agent/v3/integrations/nramqp" + "github.com/newrelic/go-agent/v3/newrelic" + + amqp "github.com/rabbitmq/amqp091-go" +) + +func failOnError(err error, msg string) { + if err != nil { + panic(fmt.Sprintf("%s: %s\n", msg, err)) + } +} + +// a rabit mq server must be running on localhost on port 5672 +func main() { + nrApp, err := newrelic.NewApplication( + newrelic.ConfigAppName("AMQP Consumer Example App"), + newrelic.ConfigLicense(os.Getenv("NEW_RELIC_LICENSE_KEY")), + newrelic.ConfigInfoLogger(os.Stdout), + ) + + if err != nil { + panic(err) + } + + nrApp.WaitForConnection(time.Second * 5) + + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + failOnError(err, "Failed to connect to RabbitMQ") + defer conn.Close() + + ch, err := conn.Channel() + failOnError(err, "Failed to open a channel") + defer ch.Close() + + q, err := ch.QueueDeclare( + "hello", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "Failed to declare a queue") + + var forever chan struct{} + + handleDelivery, msgs, err := nramqp.Consume(nrApp, ch, + q.Name, + "", + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args) + ) + failOnError(err, "Failed to register a consumer") + + go func() { + for d := range msgs { + txn := handleDelivery(d) + log.Printf("Received a message: %s\n", d.Body) + txn.End() + } + }() + + log.Printf(" [*] Waiting for messages. To exit press CTRL+C") + <-forever + + nrApp.Shutdown(time.Second * 10) +} diff --git a/v3/integrations/nramqp/examples/publisher/main.go b/v3/integrations/nramqp/examples/publisher/main.go new file mode 100644 index 000000000..445947a08 --- /dev/null +++ b/v3/integrations/nramqp/examples/publisher/main.go @@ -0,0 +1,124 @@ +package main + +import ( + "fmt" + "io" + "net/http" + "os" + "time" + + "github.com/newrelic/go-agent/v3/integrations/nramqp" + "github.com/newrelic/go-agent/v3/newrelic" + + amqp "github.com/rabbitmq/amqp091-go" +) + +var indexHTML = ` + + + + +

Send a Rabbit MQ Message

+ +
+
+
+ +
+ + + + ` + +func failOnError(err error, msg string) { + if err != nil { + panic(fmt.Sprintf("%s: %s\n", msg, err)) + } +} + +type amqpServer struct { + ch *amqp.Channel + exchange string + routingKey string +} + +func NewServer(channel *amqp.Channel, exchangeName, routingKeyName string) *amqpServer { + return &amqpServer{ + channel, + exchangeName, + routingKeyName, + } +} + +func (serv *amqpServer) index(w http.ResponseWriter, r *http.Request) { + io.WriteString(w, indexHTML) +} + +func (serv *amqpServer) publishPlainTxtMessage(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + // get the message from the HTTP form + r.ParseForm() + message := r.Form.Get("msg") + + err := nramqp.PublishWithContext(serv.ch, + ctx, + serv.exchange, // exchange + serv.routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "text/plain", + Body: []byte(message), + }) + + if err != nil { + txn := newrelic.FromContext(ctx) + txn.NoticeError(err) + } + + serv.index(w, r) +} + +// a rabit mq server must be running on localhost on port 5672 +func main() { + nrApp, err := newrelic.NewApplication( + newrelic.ConfigAppName("AMQP Publisher Example App"), + newrelic.ConfigFromEnvironment(), + newrelic.ConfigInfoLogger(os.Stdout), + ) + + if err != nil { + panic(err) + } + + nrApp.WaitForConnection(time.Second * 5) + + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + failOnError(err, "Failed to connect to RabbitMQ") + defer conn.Close() + + ch, err := conn.Channel() + failOnError(err, "Failed to open a channel") + defer ch.Close() + + q, err := ch.QueueDeclare( + "hello", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "Failed to declare a queue") + + server := NewServer(ch, "", q.Name) + + http.HandleFunc(newrelic.WrapHandleFunc(nrApp, "/", server.index)) + http.HandleFunc(newrelic.WrapHandleFunc(nrApp, "/message", server.publishPlainTxtMessage)) + + fmt.Println("\n\nlistening on: http://localhost:8000/") + http.ListenAndServe(":8000", nil) + + nrApp.Shutdown(time.Second * 10) +} diff --git a/v3/integrations/nramqp/go.mod b/v3/integrations/nramqp/go.mod new file mode 100644 index 000000000..c099244b4 --- /dev/null +++ b/v3/integrations/nramqp/go.mod @@ -0,0 +1,8 @@ +module github.com/newrelic/go-agent/v3/integrations/nramqp + +go 1.19 + +require ( + github.com/newrelic/go-agent/v3 v3.27.0 + github.com/rabbitmq/amqp091-go v1.9.0 +) diff --git a/v3/integrations/nramqp/headers.go b/v3/integrations/nramqp/headers.go new file mode 100644 index 000000000..d3604e493 --- /dev/null +++ b/v3/integrations/nramqp/headers.go @@ -0,0 +1,69 @@ +package nramqp + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/newrelic/go-agent/v3/newrelic" + amqp "github.com/rabbitmq/amqp091-go" +) + +const ( + MaxHeaderLen = 4096 +) + +// Adds Distributed Tracing headers to the amqp table object +func injectDtHeaders(txn *newrelic.Transaction, headers amqp.Table) amqp.Table { + dummyHeaders := http.Header{} + + txn.InsertDistributedTraceHeaders(dummyHeaders) + if headers == nil { + headers = amqp.Table{} + } + + dtHeaders := dummyHeaders.Get(newrelic.DistributedTraceNewRelicHeader) + if dtHeaders != "" { + headers[newrelic.DistributedTraceNewRelicHeader] = dtHeaders + } + traceParent := dummyHeaders.Get(newrelic.DistributedTraceW3CTraceParentHeader) + if traceParent != "" { + headers[newrelic.DistributedTraceW3CTraceParentHeader] = traceParent + } + traceState := dummyHeaders.Get(newrelic.DistributedTraceW3CTraceStateHeader) + if traceState != "" { + headers[newrelic.DistributedTraceW3CTraceStateHeader] = traceState + } + + return headers +} + +func toHeader(headers amqp.Table) http.Header { + headersHTTP := http.Header{} + if headers == nil { + return headersHTTP + } + + for k, v := range headers { + headersHTTP.Set(k, fmt.Sprintf("%v", v)) + } + + return headersHTTP +} + +func getHeadersAttributeString(hdrs amqp.Table) (string, error) { + if len(hdrs) == 0 { + return "", nil + } + + delete(hdrs, newrelic.DistributedTraceNewRelicHeader) + delete(hdrs, newrelic.DistributedTraceW3CTraceParentHeader) + delete(hdrs, newrelic.DistributedTraceW3CTraceStateHeader) + + if len(hdrs) == 0 { + return "", nil + } + + bytes, err := json.Marshal(hdrs) + return string(bytes), err +} diff --git a/v3/integrations/nramqp/headers_test.go b/v3/integrations/nramqp/headers_test.go new file mode 100644 index 000000000..b5ee3548b --- /dev/null +++ b/v3/integrations/nramqp/headers_test.go @@ -0,0 +1,296 @@ +package nramqp + +import ( + "encoding/json" + "testing" + "time" + + "github.com/newrelic/go-agent/v3/internal" + "github.com/newrelic/go-agent/v3/internal/integrationsupport" + "github.com/newrelic/go-agent/v3/newrelic" + + amqp "github.com/rabbitmq/amqp091-go" +) + +var replyFn = func(reply *internal.ConnectReply) { + reply.SetSampleEverything() + reply.AccountID = "123" + reply.TrustedAccountKey = "123" + reply.PrimaryAppID = "456" +} + +var cfgFn = func(cfg *newrelic.Config) { + cfg.Attributes.Include = append(cfg.Attributes.Include, + newrelic.AttributeMessageRoutingKey, + newrelic.AttributeMessageQueueName, + newrelic.AttributeMessageExchangeType, + newrelic.AttributeMessageReplyTo, + newrelic.AttributeMessageCorrelationID, + newrelic.AttributeMessageHeaders, + ) +} + +func createTestApp() integrationsupport.ExpectApp { + return integrationsupport.NewTestApp(replyFn, cfgFn, integrationsupport.ConfigFullTraces, newrelic.ConfigCodeLevelMetricsEnabled(false)) +} + +func TestAddHeaderAttribute(t *testing.T) { + app := createTestApp() + txn := app.StartTransaction("test") + + hdrs := amqp.Table{ + "str": "hello", + "int": 5, + "bool": true, + "nil": nil, + "time": time.Now(), + "bytes": []byte("a slice of bytes"), + "decimal": amqp.Decimal{Scale: 2, Value: 12345}, + "zero decimal": amqp.Decimal{Scale: 0, Value: 12345}, + } + attrStr, err := getHeadersAttributeString(hdrs) + if err != nil { + t.Fatal(err) + } + integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageHeaders, attrStr, hdrs) + + txn.End() + + app.ExpectTxnTraces(t, []internal.WantTxnTrace{ + { + AgentAttributes: map[string]interface{}{ + newrelic.AttributeMessageHeaders: attrStr, + }, + }, + }) +} + +func TestInjectHeaders(t *testing.T) { + nrApp := createTestApp() + txn := nrApp.StartTransaction("test txn") + defer txn.End() + + msg := amqp.Publishing{} + msg.Headers = injectDtHeaders(txn, msg.Headers) + + if len(msg.Headers) != 3 { + t.Error("Expected DT headers to be injected into Headers object") + } +} + +func TestInjectHeadersPreservesExistingHeaders(t *testing.T) { + nrApp := createTestApp() + txn := nrApp.StartTransaction("test txn") + defer txn.End() + + msg := amqp.Publishing{ + Headers: amqp.Table{ + "one": 1, + "two": 2, + }, + } + msg.Headers = injectDtHeaders(txn, msg.Headers) + + if len(msg.Headers) != 5 { + t.Error("Expected DT headers to be injected into Headers object") + } +} + +func TestToHeader(t *testing.T) { + nrApp := createTestApp() + txn := nrApp.StartTransaction("test txn") + defer txn.End() + + msg := amqp.Publishing{ + Headers: amqp.Table{ + "one": 1, + "two": 2, + }, + } + msg.Headers = injectDtHeaders(txn, msg.Headers) + + hdr := toHeader(msg.Headers) + + if v := hdr.Get(newrelic.DistributedTraceNewRelicHeader); v == "" { + t.Errorf("header did not contain a DT header with the key %s", newrelic.DistributedTraceNewRelicHeader) + } + if v := hdr.Get(newrelic.DistributedTraceW3CTraceParentHeader); v == "" { + t.Errorf("header did not contain a DT header with the key %s", newrelic.DistributedTraceW3CTraceParentHeader) + } + if v := hdr.Get(newrelic.DistributedTraceW3CTraceStateHeader); v == "" { + t.Errorf("header did not contain a DT header with the key %s", newrelic.DistributedTraceW3CTraceStateHeader) + } +} + +func BenchmarkGetAttributeHeaders(b *testing.B) { + hdrs := amqp.Table{ + "str": "hello", + "int": 5, + "bool": true, + "nil": nil, + "time": time.Now(), + "bytes": []byte("a slice of bytes"), + "decimal": amqp.Decimal{Scale: 2, Value: 12345}, + "zero decimal": amqp.Decimal{Scale: 0, Value: 12345}, + } + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + getHeadersAttributeString(hdrs) + } +} + +func TestGetAttributeHeaders(t *testing.T) { + ti := time.Now() + hdrs := amqp.Table{ + "str": "hello", + "int": 5, + "bool": true, + "nil": nil, + "time": ti, + "bytes": []byte("a slice of bytes"), + "decimal": amqp.Decimal{Scale: 2, Value: 12345}, + "zero decimal": amqp.Decimal{Scale: 0, Value: 12345}, + "array": []interface{}{5, true, "hi", ti}, + } + + hdrStr, err := getHeadersAttributeString(hdrs) + if err != nil { + t.Fatal(err) + } + + t.Log(hdrStr) + + var v map[string]any + err = json.Unmarshal([]byte(hdrStr), &v) + if err != nil { + t.Fatal(err) + } + + if len(v) != 9 { + t.Errorf("expected 6 key value pairs, but got %d", len(v)) + } + + _, ok := v["str"] + if !ok { + t.Error("string header key value pair was dropped") + } + + _, ok = v["bytes"] + if !ok { + t.Error("bytes header key value pair was dropped") + } + + _, ok = v["int"] + if !ok { + t.Error("int header key value pair was dropped") + } + + _, ok = v["bool"] + if !ok { + t.Error("bool header key value pair was dropped") + } + + _, ok = v["nil"] + if !ok { + t.Error("nil header key value pair was dropped") + } + + _, ok = v["decimal"] + if !ok { + t.Error("decimal header key value pair was dropped") + } + + _, ok = v["zero decimal"] + if !ok { + t.Error("zero decimal header key value pair was dropped") + } + + _, ok = v["array"] + if !ok { + t.Error("array header key value pair was dropped") + } + + _, ok = v["time"] + if !ok { + t.Error("time header key value pair was dropped") + } +} + +func TestGetAttributeHeadersEmpty(t *testing.T) { + hdrs := amqp.Table{} + + hdrStr, err := getHeadersAttributeString(hdrs) + if err != nil { + t.Fatal(err) + } + if hdrStr != "" { + t.Errorf("should return empty string for empty or nil header table, instead got: %s", hdrStr) + } +} + +func TestGetAttributeHeadersNil(t *testing.T) { + hdrStr, err := getHeadersAttributeString(nil) + if err != nil { + t.Fatal(err) + } + if hdrStr != "" { + t.Errorf("should return empty string for empty or nil header table, instead got: %s", hdrStr) + } +} + +func TestGetAttributeHeadersIgnoresDT(t *testing.T) { + app := createTestApp() + txn := app.StartTransaction("test") + defer txn.End() + + hdrs := amqp.Table{ + "str": "hello", + } + + injectDtHeaders(txn, hdrs) + + hdrStr, err := getHeadersAttributeString(hdrs) + if err != nil { + t.Fatal(err) + } + t.Log(hdrStr) + + var v map[string]any + err = json.Unmarshal([]byte(hdrStr), &v) + if err != nil { + t.Fatal(err) + } + + if len(v) != 1 { + t.Errorf("expected 1 key value pair, but got %d", len(v)) + } + + val, ok := v["str"] + if !ok { + t.Error("string header key value pair was dropped") + } else if val.(string) != "hello" { + t.Error("string header value was corrupted") + } +} + +func TestGetAttributeHeadersEmptyAfterStrippingDT(t *testing.T) { + app := createTestApp() + txn := app.StartTransaction("test") + defer txn.End() + + hdrs := amqp.Table{} + + injectDtHeaders(txn, hdrs) + + hdrStr, err := getHeadersAttributeString(hdrs) + if err != nil { + t.Fatal(err) + } + + if hdrStr != "" { + t.Errorf("expected an empty header string, but got: %s", hdrStr) + } +} diff --git a/v3/integrations/nramqp/nramqp.go b/v3/integrations/nramqp/nramqp.go new file mode 100644 index 000000000..7fd3ab258 --- /dev/null +++ b/v3/integrations/nramqp/nramqp.go @@ -0,0 +1,104 @@ +package nramqp + +import ( + "context" + + amqp "github.com/rabbitmq/amqp091-go" + + "github.com/newrelic/go-agent/v3/internal" + "github.com/newrelic/go-agent/v3/internal/integrationsupport" + "github.com/newrelic/go-agent/v3/newrelic" +) + +const ( + RabbitMQLibrary = "RabbitMQ" +) + +func creatProducerSegment(exchange, key string) *newrelic.MessageProducerSegment { + s := newrelic.MessageProducerSegment{ + Library: RabbitMQLibrary, + DestinationName: "Default", + DestinationType: newrelic.MessageQueue, + } + + if exchange != "" { + s.DestinationName = exchange + s.DestinationType = newrelic.MessageExchange + } else if key != "" { + s.DestinationName = key + } + + return &s +} + +// PublishedWithContext looks for a newrelic transaction in the context object, and if found, creates a message producer segment. +// It will also inject distributed tracing headers into the message. +func PublishWithContext(ch *amqp.Channel, ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { + txn := newrelic.FromContext(ctx) + if txn != nil { + // generate message broker segment + s := creatProducerSegment(exchange, key) + + // capture telemetry for AMQP producer + if msg.Headers != nil && len(msg.Headers) > 0 { + hdrStr, err := getHeadersAttributeString(msg.Headers) + if err != nil { + return err + } + integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageHeaders, hdrStr) + } + + integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageRoutingKey, key) + integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageCorrelationID, msg.CorrelationId) + integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageReplyTo, msg.ReplyTo) + + // inject DT headers into headers object + msg.Headers = injectDtHeaders(txn, msg.Headers) + + s.StartTime = txn.StartSegmentNow() + err := ch.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg) + s.End() + return err + } else { + return ch.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg) + } +} + +// Consume performs a consume request on the provided amqp Channel, and returns a consume function, a consumer channel, and an error. +// The consumer function should be applied to each amqp Delivery that is read from the consume Channel, in order to collect tracing data +// on that message. The consume function will then return a transaction for that message. +func Consume(app *newrelic.Application, ch *amqp.Channel, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (func(amqp.Delivery) *newrelic.Transaction, <-chan amqp.Delivery, error) { + var handler func(amqp.Delivery) *newrelic.Transaction + if app != nil { + handler = func(delivery amqp.Delivery) *newrelic.Transaction { + namer := internal.MessageMetricKey{ + Library: RabbitMQLibrary, + DestinationType: string(newrelic.MessageExchange), + DestinationName: queue, + Consumer: true, + } + + txn := app.StartTransaction(namer.Name()) + + hdrs := toHeader(delivery.Headers) + txn.AcceptDistributedTraceHeaders(newrelic.TransportAMQP, hdrs) + + if delivery.Headers != nil && len(delivery.Headers) > 0 { + hdrStr, err := getHeadersAttributeString(delivery.Headers) + if err == nil { + integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageHeaders, hdrStr, nil) + } + } + + integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageQueueName, queue, nil) + integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageRoutingKey, delivery.RoutingKey, nil) + integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageCorrelationID, delivery.CorrelationId, nil) + integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageReplyTo, delivery.ReplyTo, nil) + + return txn + } + } + + msgChan, err := ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args) + return handler, msgChan, err +} diff --git a/v3/integrations/nramqp/nramqp_test.go b/v3/integrations/nramqp/nramqp_test.go new file mode 100644 index 000000000..3db9e4ce9 --- /dev/null +++ b/v3/integrations/nramqp/nramqp_test.go @@ -0,0 +1,78 @@ +package nramqp + +import ( + "testing" + + "github.com/newrelic/go-agent/v3/newrelic" +) + +func BenchmarkCreateProducerSegment(b *testing.B) { + app := createTestApp() + txn := app.StartTransaction("test") + defer txn.End() + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + creatProducerSegment("exchange", "key") + } +} + +func TestCreateProducerSegment(t *testing.T) { + app := createTestApp() + txn := app.StartTransaction("test") + defer txn.End() + + type testObject struct { + exchange string + key string + expect newrelic.MessageProducerSegment + } + + tests := []testObject{ + { + "test exchange", + "", + newrelic.MessageProducerSegment{ + DestinationName: "test exchange", + DestinationType: newrelic.MessageExchange, + }, + }, + { + "", + "test queue", + newrelic.MessageProducerSegment{ + DestinationName: "test queue", + DestinationType: newrelic.MessageQueue, + }, + }, + { + "", + "", + newrelic.MessageProducerSegment{ + DestinationName: "Default", + DestinationType: newrelic.MessageQueue, + }, + }, + { + "test exchange", + "test queue", + newrelic.MessageProducerSegment{ + DestinationName: "test exchange", + DestinationType: newrelic.MessageExchange, + }, + }, + } + + for _, test := range tests { + s := creatProducerSegment(test.exchange, test.key) + if s.DestinationName != test.expect.DestinationName { + t.Errorf("expected destination name %s, got %s", test.expect.DestinationName, s.DestinationName) + } + if s.DestinationType != test.expect.DestinationType { + t.Errorf("expected destination type %s, got %s", test.expect.DestinationType, s.DestinationType) + } + } + +} diff --git a/v3/newrelic/attributes.go b/v3/newrelic/attributes.go index 258d23b4b..0221b5795 100644 --- a/v3/newrelic/attributes.go +++ b/v3/newrelic/attributes.go @@ -127,6 +127,8 @@ const ( AttributeMessageReplyTo = "message.replyTo" // The application-generated identifier used in RPC configurations. AttributeMessageCorrelationID = "message.correlationId" + // The headers of the message without CAT keys/values + AttributeMessageHeaders = "message.headers" ) // Attributes destined for Span Events. These attributes appear only on Span diff --git a/v3/newrelic/attributes_from_internal.go b/v3/newrelic/attributes_from_internal.go index b9a016300..64012b93b 100644 --- a/v3/newrelic/attributes_from_internal.go +++ b/v3/newrelic/attributes_from_internal.go @@ -49,6 +49,7 @@ var ( AttributeAWSLambdaEventSourceARN: usualDests, AttributeMessageRoutingKey: usualDests, AttributeMessageQueueName: usualDests, + AttributeMessageHeaders: usualDests, AttributeMessageExchangeType: destNone, AttributeMessageReplyTo: destNone, AttributeMessageCorrelationID: destNone, diff --git a/v3/newrelic/transaction.go b/v3/newrelic/transaction.go index d1d519c63..d553f68b1 100644 --- a/v3/newrelic/transaction.go +++ b/v3/newrelic/transaction.go @@ -352,6 +352,7 @@ func (txn *Transaction) InsertDistributedTraceHeaders(hdrs http.Header) { if txn == nil || txn.thread == nil { return } + txn.thread.CreateDistributedTracePayload(hdrs) }