From 93f8432d8728ae922be75cc5270b4908a45cd827 Mon Sep 17 00:00:00 2001 From: Jacek Kucharczyk Date: Fri, 21 Jul 2023 23:11:57 +0200 Subject: [PATCH] pubsub/natspubsub: NATS v2.2.0+ native message headers and message encoding (#3282) --- internal/website/data/examples.json | 16 ++ pubsub/natspubsub/example_test.go | 77 +++++++ pubsub/natspubsub/go.mod | 8 +- pubsub/natspubsub/go.sum | 21 +- pubsub/natspubsub/nats.go | 338 ++++++++++++++++++++++++---- pubsub/natspubsub/nats_test.go | 257 ++++++++++++++++----- samples/go.mod | 2 +- samples/go.sum | 10 +- 8 files changed, 603 insertions(+), 126 deletions(-) diff --git a/internal/website/data/examples.json b/internal/website/data/examples.json index 4a0179ec35..a6a27d8c21 100644 --- a/internal/website/data/examples.json +++ b/internal/website/data/examples.json @@ -271,10 +271,18 @@ "imports": "import (\n\t\"context\"\n\n\t\"github.com/nats-io/nats.go\"\n\t\"gocloud.dev/pubsub/natspubsub\"\n)", "code": "natsConn, err := nats.Connect(\"nats://nats.example.com\")\nif err != nil {\n\treturn err\n}\ndefer natsConn.Close()\n\nsubscription, err := natspubsub.OpenSubscription(\n\tnatsConn,\n\t\"example.mysubject\",\n\tnil)\nif err != nil {\n\treturn err\n}\ndefer subscription.Shutdown(ctx)" }, + "gocloud.dev/pubsub/natspubsub.ExampleOpenSubscriptionV2": { + "imports": "import (\n\t\"context\"\n\n\t\"github.com/nats-io/nats.go\"\n\t\"gocloud.dev/pubsub/natspubsub\"\n)", + "code": "subscription, err := natspubsub.OpenSubscriptionV2(\n\tnatsConn,\n\t\"example.mysubject\",\n\tnil)\nif err != nil {\n\treturn err\n}\ndefer subscription.Shutdown(ctx)" + }, "gocloud.dev/pubsub/natspubsub.ExampleOpenTopic": { "imports": "import (\n\t\"context\"\n\n\t\"github.com/nats-io/nats.go\"\n\t\"gocloud.dev/pubsub/natspubsub\"\n)", "code": "natsConn, err := nats.Connect(\"nats://nats.example.com\")\nif err != nil {\n\treturn err\n}\ndefer natsConn.Close()\n\ntopic, err := natspubsub.OpenTopic(natsConn, \"example.mysubject\", nil)\nif err != nil {\n\treturn err\n}\ndefer topic.Shutdown(ctx)" }, + "gocloud.dev/pubsub/natspubsub.ExampleOpenTopicV2": { + "imports": "import (\n\t\"context\"\n\n\t\"github.com/nats-io/nats.go\"\n\t\"gocloud.dev/pubsub/natspubsub\"\n)", + "code": "natsConn, err := nats.Connect(\"nats://nats.example.com\")\nif err != nil {\n\treturn err\n}\ndefer natsConn.Close()\n\ntopic, err := natspubsub.OpenTopicV2(natsConn, \"example.mysubject\", nil)\nif err != nil {\n\treturn err\n}\ndefer topic.Shutdown(ctx)" + }, "gocloud.dev/pubsub/natspubsub.Example_openQueueSubscriptionFromURL": { "imports": "import (\n\t\"context\"\n\n\t\"gocloud.dev/pubsub\"\n\t_ \"gocloud.dev/pubsub/natspubsub\"\n)", "code": "// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.\n// This URL will Dial the NATS server at the URL in the environment variable\n// NATS_SERVER_URL and receive messages with subject \"example.mysubject\"\n// This URL will be parsed and the queue attribute will be used as the Queue parameter when creating the NATS Subscription.\nsubscription, err := pubsub.OpenSubscription(ctx, \"nats://example.mysubject?queue=myqueue\")\nif err != nil {\n\treturn err\n}\ndefer subscription.Shutdown(ctx)" @@ -283,10 +291,18 @@ "imports": "import (\n\t\"context\"\n\n\t\"gocloud.dev/pubsub\"\n\t_ \"gocloud.dev/pubsub/natspubsub\"\n)", "code": "// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.\n// This URL will Dial the NATS server at the URL in the environment variable\n// NATS_SERVER_URL and receive messages with subject \"example.mysubject\".\nsubscription, err := pubsub.OpenSubscription(ctx, \"nats://example.mysubject\")\nif err != nil {\n\treturn err\n}\ndefer subscription.Shutdown(ctx)" }, + "gocloud.dev/pubsub/natspubsub.Example_openSubscriptionV2FromURL": { + "imports": "import (\n\t\"context\"\n\n\t\"gocloud.dev/pubsub\"\n\t_ \"gocloud.dev/pubsub/natspubsub\"\n)", + "code": "// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.\n// This URL will Dial the NATS server at the URL in the environment variable\n// NATS_SERVER_URL and receive messages with subject \"example.mysubject\".\n// This URL will be parsed and the natsv2 attribute will be used to\n// use NATS v2.2.0+ native message headers as the message metadata.\nsubscription, err := pubsub.OpenSubscription(ctx, \"nats://example.mysubject?natsv2\")\nif err != nil {\n\treturn err\n}\ndefer subscription.Shutdown(ctx)" + }, "gocloud.dev/pubsub/natspubsub.Example_openTopicFromURL": { "imports": "import (\n\t\"context\"\n\n\t\"gocloud.dev/pubsub\"\n\t_ \"gocloud.dev/pubsub/natspubsub\"\n)", "code": "// pubsub.OpenTopic creates a *pubsub.Topic from a URL.\n// This URL will Dial the NATS server at the URL in the environment variable\n// NATS_SERVER_URL and send messages with subject \"example.mysubject\".\ntopic, err := pubsub.OpenTopic(ctx, \"nats://example.mysubject\")\nif err != nil {\n\treturn err\n}\ndefer topic.Shutdown(ctx)" }, + "gocloud.dev/pubsub/natspubsub.Example_openTopicV2FromURL": { + "imports": "import (\n\t\"context\"\n\n\t\"gocloud.dev/pubsub\"\n\t_ \"gocloud.dev/pubsub/natspubsub\"\n)", + "code": "// pubsub.OpenTopic creates a *pubsub.Topic from a URL.\n// This URL will Dial the NATS server at the URL in the environment variable\n// NATS_SERVER_URL and send messages with subject \"example.mysubject\".\n// This URL will be parsed and the natsv2 attribute will be used to\n// use NATS v2.2.0+ native message headers as the message metadata.\ntopic, err := pubsub.OpenTopic(ctx, \"nats://example.mysubject?natsv2\")\nif err != nil {\n\treturn err\n}\ndefer topic.Shutdown(ctx)" + }, "gocloud.dev/pubsub/rabbitpubsub.ExampleOpenSubscription": { "imports": "import (\n\t\"context\"\n\n\tamqp \"github.com/rabbitmq/amqp091-go\"\n\t\"gocloud.dev/pubsub/rabbitpubsub\"\n)", "code": "rabbitConn, err := amqp.Dial(\"amqp://guest:guest@localhost:5672/\")\nif err != nil {\n\treturn err\n}\ndefer rabbitConn.Close()\nsubscription := rabbitpubsub.OpenSubscription(rabbitConn, \"myqueue\", nil)\ndefer subscription.Shutdown(ctx)" diff --git a/pubsub/natspubsub/example_test.go b/pubsub/natspubsub/example_test.go index 684a3eb29e..397d29e012 100644 --- a/pubsub/natspubsub/example_test.go +++ b/pubsub/natspubsub/example_test.go @@ -19,6 +19,7 @@ import ( "log" "github.com/nats-io/nats.go" + "gocloud.dev/pubsub" "gocloud.dev/pubsub/natspubsub" ) @@ -131,3 +132,79 @@ func Example_openQueueSubscriptionFromURL() { } defer subscription.Shutdown(ctx) } + +func ExampleOpenSubscriptionV2() { + // PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored. + // PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/natspubsub" + // PRAGMA: On gocloud.dev, hide lines until the next blank line. + ctx := context.Background() + natsConn, err := nats.Connect("nats://nats.example.com") + if err != nil { + log.Fatal(err) + } + defer natsConn.Close() + + subscription, err := natspubsub.OpenSubscriptionV2( + natsConn, + "example.mysubject", + nil) + if err != nil { + log.Fatal(err) + } + defer subscription.Shutdown(ctx) +} + +func ExampleOpenTopicV2() { + // PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored. + // PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/natspubsub" + // PRAGMA: On gocloud.dev, hide lines until the next blank line. + ctx := context.Background() + + natsConn, err := nats.Connect("nats://nats.example.com") + if err != nil { + log.Fatal(err) + } + defer natsConn.Close() + + topic, err := natspubsub.OpenTopicV2(natsConn, "example.mysubject", nil) + if err != nil { + log.Fatal(err) + } + defer topic.Shutdown(ctx) +} + +func Example_openTopicV2FromURL() { + // PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored. + // PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/natspubsub" + // PRAGMA: On gocloud.dev, hide lines until the next blank line. + ctx := context.Background() + + // pubsub.OpenTopic creates a *pubsub.Topic from a URL. + // This URL will Dial the NATS server at the URL in the environment variable + // NATS_SERVER_URL and send messages with subject "example.mysubject". + // This URL will be parsed and the natsv2 attribute will be used to + // use NATS v2.2.0+ native message headers as the message metadata. + topic, err := pubsub.OpenTopic(ctx, "nats://example.mysubject?natsv2") + if err != nil { + log.Fatal(err) + } + defer topic.Shutdown(ctx) +} + +func Example_openSubscriptionV2FromURL() { + // PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored. + // PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/natspubsub" + // PRAGMA: On gocloud.dev, hide lines until the next blank line. + ctx := context.Background() + + // pubsub.OpenSubscription creates a *pubsub.Subscription from a URL. + // This URL will Dial the NATS server at the URL in the environment variable + // NATS_SERVER_URL and receive messages with subject "example.mysubject". + // This URL will be parsed and the natsv2 attribute will be used to + // use NATS v2.2.0+ native message headers as the message metadata. + subscription, err := pubsub.OpenSubscription(ctx, "nats://example.mysubject?natsv2") + if err != nil { + log.Fatal(err) + } + defer subscription.Shutdown(ctx) +} diff --git a/pubsub/natspubsub/go.mod b/pubsub/natspubsub/go.mod index 6bf3eb786a..17577710b4 100644 --- a/pubsub/natspubsub/go.mod +++ b/pubsub/natspubsub/go.mod @@ -18,8 +18,8 @@ go 1.19 require ( github.com/google/go-cmp v0.5.9 - github.com/nats-io/nats-server/v2 v2.7.2 - github.com/nats-io/nats.go v1.27.1 + github.com/nats-io/nats-server/v2 v2.9.20 + github.com/nats-io/nats.go v1.28.0 gocloud.dev v0.32.0 ) @@ -28,8 +28,8 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/klauspost/compress v1.16.7 // indirect - github.com/minio/highwayhash v1.0.1 // indirect - github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 // indirect + github.com/minio/highwayhash v1.0.2 // indirect + github.com/nats-io/jwt/v2 v2.4.1 // indirect github.com/nats-io/nkeys v0.4.4 // indirect github.com/nats-io/nuid v1.0.1 // indirect go.opencensus.io v0.24.0 // indirect diff --git a/pubsub/natspubsub/go.sum b/pubsub/natspubsub/go.sum index 835a6678cb..9c09a56629 100644 --- a/pubsub/natspubsub/go.sum +++ b/pubsub/natspubsub/go.sum @@ -47,15 +47,14 @@ github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56 github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= -github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= -github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= -github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats-server/v2 v2.7.2 h1:+LEN8m0+jdCkiGc884WnDuxR+qj80/5arj+szKuRpRI= -github.com/nats-io/nats-server/v2 v2.7.2/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8= -github.com/nats-io/nats.go v1.27.1 h1:OuYnal9aKVSnOzLQIzf7554OXMCG7KbaTkCSBHRcSoo= -github.com/nats-io/nats.go v1.27.1/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= -github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= +github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= +github.com/nats-io/nats-server/v2 v2.9.20 h1:bt1dW6xsL1hWWwv7Hovm+EJt5L6iplyqlgEFkoEUk0k= +github.com/nats-io/nats-server/v2 v2.9.20/go.mod h1:aTb/xtLCGKhfTFLxP591CMWfkdgBmcUUSkiSOe5A3gw= +github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c= +github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -72,7 +71,6 @@ go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -85,7 +83,6 @@ golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -100,10 +97,8 @@ golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= diff --git a/pubsub/natspubsub/nats.go b/pubsub/natspubsub/nats.go index ec09c5f865..645ff9ff85 100644 --- a/pubsub/natspubsub/nats.go +++ b/pubsub/natspubsub/nats.go @@ -23,6 +23,27 @@ // for the scheme "nats". // The default URL opener will connect to a default server based on the // environment variable "NATS_SERVER_URL". +// +// For servers that support it (NATS Server 2.2.0 or later), messages can +// be encoded using native NATS message headers, and native message content. +// This provides full support for non-Go clients. Versions prior to 2.2.0 +// uses gob.Encoder to encode the message headers and content, which limits +// the subscribers only to Go clients. +// To use this feature, set the query parameter "natsv2" in the URL. +// If no value is provided, it assumes the value is true. Otherwise, the value +// needs to be parsable as a boolean. For example: +// - nats://mysubject?natsv2 +// - nats://mysubject?natsv2=true +// +// This feature can also be enabled by setting the UseV2 field in the +// URLOpener. +// If the server does not support this feature, any attempt to use it will +// result in an error. +// Using native NATS message headers and content is more efficient than using +// gob.Encoder, and allows non-Go clients to subscribe to the topic and +// receive messages. It is recommended to use this feature if the server +// supports it. +// // To customize the URL opener, or for more details on the URL format, // see URLOpener. // See https://gocloud.dev/concepts/urls/ for background information. @@ -39,7 +60,7 @@ // natspubsub exposes the following types for As: // - Topic: *nats.Conn // - Subscription: *nats.Subscription -// - Message.BeforeSend: None. +// - Message.BeforeSend: None for v1, *nats.Msg for v2. // - Message.AfterSend: None. // - Message: *nats.Msg package natspubsub // import "gocloud.dev/pubsub/natspubsub" @@ -53,11 +74,14 @@ import ( "net/url" "os" "path" + "regexp" + "strconv" "strings" "sync" "time" "github.com/nats-io/nats.go" + "gocloud.dev/gcerrors" "gocloud.dev/pubsub" "gocloud.dev/pubsub/batcher" @@ -93,12 +117,14 @@ func init() { // defaultDialer dials a default NATS server based on the environment // variable "NATS_SERVER_URL". type defaultDialer struct { - init sync.Once - opener *URLOpener - err error + init sync.Once + err error + + opener URLOpener + openerV2 URLOpener } -func (o *defaultDialer) defaultConn(ctx context.Context) (*URLOpener, error) { +func (o *defaultDialer) defaultConn(ctx context.Context) error { o.init.Do(func() { serverURL := os.Getenv("NATS_SERVER_URL") if serverURL == "" { @@ -110,31 +136,76 @@ func (o *defaultDialer) defaultConn(ctx context.Context) (*URLOpener, error) { o.err = fmt.Errorf("failed to dial NATS_SERVER_URL %q: %v", serverURL, err) return } - o.opener = &URLOpener{Connection: conn} + o.opener = URLOpener{Connection: conn} + o.openerV2 = URLOpener{Connection: conn, UseV2: true} }) - return o.opener, o.err + return o.err +} + +type serverVersion struct { + major, minor, patch int } func (o *defaultDialer) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) { - opener, err := o.defaultConn(ctx) + err := o.defaultConn(ctx) if err != nil { return nil, fmt.Errorf("open topic %v: failed to open default connection: %v", u, err) } - return opener.OpenTopicURL(ctx, u) + useV2, err := queryUseV2(u.Query()) + if err != nil { + return nil, fmt.Errorf("open topic %v: %v", u, err) + } + if useV2 { + return o.openerV2.OpenTopicURL(ctx, u) + } + return o.opener.OpenTopicURL(ctx, u) } func (o *defaultDialer) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error) { - opener, err := o.defaultConn(ctx) + err := o.defaultConn(ctx) if err != nil { return nil, fmt.Errorf("open subscription %v: failed to open default connection: %v", u, err) } - return opener.OpenSubscriptionURL(ctx, u) + useV2, err := queryUseV2(u.Query()) + if err != nil { + return nil, fmt.Errorf("open subscription %v: %v", u, err) + } + if useV2 { + return o.openerV2.OpenSubscriptionURL(ctx, u) + } + return o.opener.OpenSubscriptionURL(ctx, u) +} + +var semVerRegexp = regexp.MustCompile(`\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?`) + +func parseServerVersion(version string) (serverVersion, error) { + m := semVerRegexp.FindStringSubmatch(version) + if m == nil { + return serverVersion{}, errors.New("failed to parse server version") + } + var ( + major, minor, patch int + err error + ) + major, err = strconv.Atoi(m[1]) + if err != nil { + return serverVersion{}, fmt.Errorf("failed to parse server version major number %q: %v", m[1], err) + } + minor, err = strconv.Atoi(m[2]) + if err != nil { + return serverVersion{}, fmt.Errorf("failed to parse server version minor number %q: %v", m[2], err) + } + patch, err = strconv.Atoi(m[3]) + if err != nil { + return serverVersion{}, fmt.Errorf("failed to parse server version patch number %q: %v", m[3], err) + } + return serverVersion{major: major, minor: minor, patch: patch}, nil } // Scheme is the URL scheme natspubsub registers its URLOpeners under on pubsub.DefaultMux. const Scheme = "nats" -// URLOpener opens NATS URLs like "nats://mysubject". +// URLOpener opens NATS URLs like "nats://mysubject?natsv2=true". // // The URL host+path is used as the subject. // @@ -146,14 +217,24 @@ type URLOpener struct { TopicOptions TopicOptions // SubscriptionOptions specifies the options to pass to OpenSubscription. SubscriptionOptions SubscriptionOptions + // UseV2 indicates whether the NATS Server is at least version 2.2.0. + UseV2 bool } +const natsV2QueryParameter = "natsv2" + // OpenTopicURL opens a pubsub.Topic based on u. func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) { for param := range u.Query() { + if strings.ToLower(param) == natsV2QueryParameter { + continue + } return nil, fmt.Errorf("open topic %v: invalid query parameter %s", u, param) } subject := path.Join(u.Host, u.Path) + if o.UseV2 { + return OpenTopicV2(o.Connection, subject, &o.TopicOptions) + } return OpenTopic(o.Connection, subject, &o.TopicOptions) } @@ -161,13 +242,22 @@ func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error) { opts := o.SubscriptionOptions for param, values := range u.Query() { - if strings.ToLower(param) == "queue" && values != nil { + switch strings.ToLower(param) { + case natsV2QueryParameter: + continue + case "queue": + if len(values) != 1 { + return nil, fmt.Errorf("open subscription %v: invalid query parameter %s", u, param) + } opts.Queue = values[0] - } else { + default: return nil, fmt.Errorf("open subscription %v: invalid query parameter %s", u, param) } } subject := path.Join(u.Host, u.Path) + if o.UseV2 { + return OpenSubscriptionV2(o.Connection, subject, &opts) + } return OpenSubscription(o.Connection, subject, &opts) } @@ -183,15 +273,30 @@ type SubscriptionOptions struct { } type topic struct { - nc *nats.Conn - subj string + useV2 bool + nc *nats.Conn + subj string } // OpenTopic returns a *pubsub.Topic for use with NATS. // The subject is the NATS Subject; for more info, see // https://nats.io/documentation/writing_applications/subjects. func OpenTopic(nc *nats.Conn, subject string, _ *TopicOptions) (*pubsub.Topic, error) { - dt, err := openTopic(nc, subject) + dt, err := openTopic(nc, subject, false) + if err != nil { + return nil, err + } + return pubsub.NewTopic(dt, nil), nil +} + +// OpenTopicV2 returns a *pubsub.Topic for use with NATS at least version 2.2.0. +// This changes the encoding of the message as, starting with version 2.2.0, NATS supports message headers. +// In previous versions the message headers were encoded along with the message content using gob.Encoder, +// which limits the subscribers only to Go clients. +// This implementation uses native NATS message headers, and native message content, which provides full support +// for non-Go clients. +func OpenTopicV2(nc *nats.Conn, subject string, _ *TopicOptions) (*pubsub.Topic, error) { + dt, err := openTopic(nc, subject, true) if err != nil { return nil, err } @@ -200,11 +305,21 @@ func OpenTopic(nc *nats.Conn, subject string, _ *TopicOptions) (*pubsub.Topic, e // openTopic returns the driver for OpenTopic. This function exists so the test // harness can get the driver interface implementation if it needs to. -func openTopic(nc *nats.Conn, subject string) (driver.Topic, error) { +func openTopic(nc *nats.Conn, subject string, useV2 bool) (driver.Topic, error) { if nc == nil { return nil, errors.New("natspubsub: nats.Conn is required") } - return &topic{nc, subject}, nil + if useV2 { + sv, err := parseServerVersion(nc.ConnectedServerVersion()) + if err != nil { + return nil, fmt.Errorf("failed to parse NATS server version %q: %v", nc.ConnectedServerVersion(), err) + } + // Check if the server version is at least 2.2.0. + if sv.major < 2 && sv.minor < 2 { + return nil, fmt.Errorf("natspubsub: NATS server version %q is not supported", nc.ConnectedServerVersion()) + } + } + return &topic{nc: nc, subj: subject, useV2: useV2}, nil } // SendBatch implements driver.Topic.SendBatch. @@ -214,30 +329,19 @@ func (t *topic) SendBatch(ctx context.Context, msgs []*driver.Message) error { } for _, m := range msgs { - if err := ctx.Err(); err != nil { - return err - } - // TODO(jba): benchmark message encoding to see if it's - // worth reusing a buffer. - payload, err := encodeMessage(m) + err := ctx.Err() if err != nil { return err } - if m.BeforeSend != nil { - asFunc := func(i interface{}) bool { return false } - if err := m.BeforeSend(asFunc); err != nil { - return err - } + + if t.useV2 { + err = t.sendMessageV2(m) + } else { + err = t.sendMessage(m) } - if err := t.nc.Publish(t.subj, payload); err != nil { + if err != nil { return err } - if m.AfterSend != nil { - asFunc := func(i interface{}) bool { return false } - if err := m.AfterSend(asFunc); err != nil { - return err - } - } } // Per specification this is supposed to only return after // a message has been sent. Normally NATS is very efficient @@ -249,6 +353,59 @@ func (t *topic) SendBatch(ctx context.Context, msgs []*driver.Message) error { return nil } +func (t *topic) sendMessage(m *driver.Message) error { + // TODO(jba): benchmark message encoding to see if it's + // worth reusing a buffer. + payload, err := encodeMessage(m) + if err != nil { + return err + } + if m.BeforeSend != nil { + asFunc := func(i interface{}) bool { return false } + if err := m.BeforeSend(asFunc); err != nil { + return err + } + } + if err = t.nc.Publish(t.subj, payload); err != nil { + return err + } + if m.AfterSend != nil { + asFunc := func(i interface{}) bool { return false } + if err := m.AfterSend(asFunc); err != nil { + return err + } + } + return nil +} + +func (t *topic) sendMessageV2(m *driver.Message) error { + msg := encodeMessageV2(m, t.subj) + if m.BeforeSend != nil { + asFunc := func(i interface{}) bool { + if nm, ok := i.(**nats.Msg); ok { + *nm = msg + return true + } + return false + } + if err := m.BeforeSend(asFunc); err != nil { + return err + } + } + + if err := t.nc.PublishMsg(msg); err != nil { + return err + } + + if m.AfterSend != nil { + asFunc := func(i interface{}) bool { return false } + if err := m.AfterSend(asFunc); err != nil { + return err + } + } + return nil +} + // IsRetryable implements driver.Topic.IsRetryable. func (*topic) IsRetryable(error) bool { return false } @@ -290,6 +447,7 @@ func (*topic) ErrorCode(err error) gcerrors.ErrorCode { func (*topic) Close() error { return nil } type subscription struct { + useV2 bool nc *nats.Conn nsub *nats.Subscription nextID int @@ -299,14 +457,29 @@ type subscription struct { // The subject is the NATS Subject to subscribe to; // for more info, see https://nats.io/documentation/writing_applications/subjects. func OpenSubscription(nc *nats.Conn, subject string, opts *SubscriptionOptions) (*pubsub.Subscription, error) { - ds, err := openSubscription(nc, subject, opts) + ds, err := openSubscription(nc, subject, opts, false) + if err != nil { + return nil, err + } + return pubsub.NewSubscription(ds, recvBatcherOpts, nil), nil +} + +// OpenSubscriptionV2 returns a *pubsub.Subscription representing a NATS subscription or NATS queue subscription +// for use with NATS at least version 2.2.0. +// This changes the encoding of the message as, starting with version 2.2.0, NATS supports message headers. +// In previous versions the message headers were encoded along with the message content using gob.Encoder, +// which limits the subscribers only to Go clients. +// This implementation uses native NATS message headers, and native message content, which provides full support +// for non-Go clients. +func OpenSubscriptionV2(nc *nats.Conn, subject string, opts *SubscriptionOptions) (*pubsub.Subscription, error) { + ds, err := openSubscription(nc, subject, opts, true) if err != nil { return nil, err } return pubsub.NewSubscription(ds, recvBatcherOpts, nil), nil } -func openSubscription(nc *nats.Conn, subject string, opts *SubscriptionOptions) (driver.Subscription, error) { +func openSubscription(nc *nats.Conn, subject string, opts *SubscriptionOptions, useV2 bool) (driver.Subscription, error) { var sub *nats.Subscription var err error if opts != nil && opts.Queue != "" { @@ -317,11 +490,11 @@ func openSubscription(nc *nats.Conn, subject string, opts *SubscriptionOptions) if err != nil { return nil, err } - return &subscription{nc, sub, 1}, nil + return &subscription{nc: nc, nsub: sub, nextID: 1, useV2: useV2}, nil } // ReceiveBatch implements driver.ReceiveBatch. -func (s *subscription) ReceiveBatch(ctx context.Context, maxMessages int) ([]*driver.Message, error) { +func (s *subscription) ReceiveBatch(ctx context.Context, _ int) ([]*driver.Message, error) { if s == nil || s.nsub == nil { return nil, nats.ErrBadSubscription } @@ -333,7 +506,13 @@ func (s *subscription) ReceiveBatch(ctx context.Context, maxMessages int) ([]*dr } return nil, err } - dm, err := decode(msg) + + var dm *driver.Message + if s.useV2 { + dm, err = decodeMessageV2(msg) + } else { + dm, err = decode(msg) + } if err != nil { return nil, err } @@ -357,7 +536,7 @@ func decode(msg *nats.Msg) (*driver.Message, error) { } func messageAsFunc(msg *nats.Msg) func(interface{}) bool { - return func(i interface{}) bool { + return func(i any) bool { p, ok := i.(**nats.Msg) if !ok { return false @@ -450,3 +629,76 @@ func decodeMessage(data []byte, dm *driver.Message) error { } return dec.Decode(&dm.Body) } + +func queryUseV2(q url.Values) (bool, error) { + if len(q) == 0 { + return false, nil + } + v, ok := q[natsV2QueryParameter] + if !ok { + return false, nil + } + + if len(v) == 0 { + // If the query parameter was provided without any value i.e. nats://mysubject?natsv2 + // it assumes the value is true. + return true, nil + } + if len(v) > 1 { + return false, fmt.Errorf("invalid query parameter %s - multiple values provided", natsV2QueryParameter) + } + if v[0] == "" { + return true, nil + } + useV2, err := strconv.ParseBool(v[0]) + if err != nil { + return false, fmt.Errorf("invalid query parameter %s - value either needs to be parsable as a boolean or empty", natsV2QueryParameter) + } + return useV2, nil +} + +func encodeMessageV2(dm *driver.Message, sub string) *nats.Msg { + var header nats.Header + if dm.Metadata != nil { + header = nats.Header{} + for k, v := range dm.Metadata { + header[url.QueryEscape(k)] = []string{url.QueryEscape(v)} + } + } + return &nats.Msg{ + Subject: sub, + Data: dm.Body, + Header: header, + } +} + +func decodeMessageV2(msg *nats.Msg) (*driver.Message, error) { + if msg == nil { + return nil, nats.ErrInvalidMsg + } + + dm := driver.Message{ + AsFunc: messageAsFunc(msg), + Body: msg.Data, + } + + if msg.Header != nil { + dm.Metadata = map[string]string{} + for k, v := range msg.Header { + var sv string + if len(v) > 0 { + sv = v[0] + } + kb, err := url.QueryUnescape(k) + if err != nil { + return nil, err + } + vb, err := url.QueryUnescape(sv) + if err != nil { + return nil, err + } + dm.Metadata[kb] = vb + } + } + return &dm, nil +} diff --git a/pubsub/natspubsub/nats_test.go b/pubsub/natspubsub/nats_test.go index 97387e6098..d41ba50954 100644 --- a/pubsub/natspubsub/nats_test.go +++ b/pubsub/natspubsub/nats_test.go @@ -38,8 +38,9 @@ const ( ) type harness struct { - s *server.Server - nc *nats.Conn + s *server.Server + nc *nats.Conn + useV2 bool } func newHarness(ctx context.Context, t *testing.T) (drivertest.Harness, error) { @@ -50,12 +51,23 @@ func newHarness(ctx context.Context, t *testing.T) (drivertest.Harness, error) { if err != nil { return nil, err } - return &harness{s, nc}, nil + return &harness{s: s, nc: nc, useV2: false}, nil +} + +func newHarnessV2(ctx context.Context, t *testing.T) (drivertest.Harness, error) { + opts := gnatsd.DefaultTestOptions + opts.Port = testPort + s := gnatsd.RunServer(&opts) + nc, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", testPort)) + if err != nil { + return nil, err + } + return &harness{s: s, nc: nc, useV2: true}, nil } func (h *harness) CreateTopic(ctx context.Context, testName string) (driver.Topic, func(), error) { cleanup := func() {} - dt, err := openTopic(h.nc, testName) + dt, err := openTopic(h.nc, testName, h.useV2) if err != nil { return nil, nil, err } @@ -68,7 +80,7 @@ func (h *harness) MakeNonexistentTopic(ctx context.Context) (driver.Topic, error } func (h *harness) CreateSubscription(ctx context.Context, dt driver.Topic, testName string) (driver.Subscription, func(), error) { - ds, err := openSubscription(h.nc, testName, nil) + ds, err := openSubscription(h.nc, testName, nil, h.useV2) if err != nil { return nil, nil, err } @@ -82,7 +94,7 @@ func (h *harness) CreateSubscription(ctx context.Context, dt driver.Topic, testN } func (h *harness) CreateQueueSubscription(ctx context.Context, dt driver.Topic, testName string) (driver.Subscription, func(), error) { - ds, err := openSubscription(h.nc, testName, &SubscriptionOptions{Queue: testName}) + ds, err := openSubscription(h.nc, testName, &SubscriptionOptions{Queue: testName}, h.useV2) if err != nil { return nil, nil, err } @@ -108,7 +120,9 @@ func (h *harness) MaxBatchSizes() (int, int) { return 0, 0 } func (*harness) SupportsMultipleSubscriptions() bool { return true } -type natsAsTest struct{} +type natsAsTest struct { + useV2 bool +} func (natsAsTest) Name() string { return "nats test" @@ -166,7 +180,19 @@ func (natsAsTest) MessageCheck(m *pubsub.Message) error { return nil } -func (natsAsTest) BeforeSend(as func(interface{}) bool) error { +func (n natsAsTest) BeforeSend(as func(interface{}) bool) error { + if !n.useV2 { + return nil + } + var pm nats.Msg + if as(&pm) { + return fmt.Errorf("cast succeeded for %T, want failure", &pm) + } + + var ppm *nats.Msg + if !as(&ppm) { + return fmt.Errorf("cast failed for %T", &ppm) + } return nil } @@ -179,6 +205,11 @@ func TestConformance(t *testing.T) { drivertest.RunConformanceTests(t, newHarness, asTests) } +func TestConformanceV2(t *testing.T) { + asTests := []drivertest.AsTest{natsAsTest{useV2: true}} + drivertest.RunConformanceTests(t, newHarnessV2, asTests) +} + // These are natspubsub specific to increase coverage. // If we only send a body we should be able to get that from a direct NATS subscriber. @@ -231,6 +262,65 @@ func TestInteropWithDirectNATS(t *testing.T) { } } +// These are natspubsub specific to increase coverage. + +// If we only send a body we should be able to get that from a direct NATS subscriber. +func TestInteropWithDirectNATSV2(t *testing.T) { + ctx := context.Background() + dh, err := newHarnessV2(ctx, t) + if err != nil { + t.Fatal(err) + } + defer dh.Close() + conn := dh.(*harness).nc + + const topic = "foo" + // In version V2 we can use metadata which will be natively used in the nats message. + md := map[string]string{"a": "1", "b": "2", "c": "3"} + body := []byte("hello") + + // Send a message using Go CDK and receive it using NATS directly. + pt, err := OpenTopicV2(conn, topic, nil) + if err != nil { + t.Fatal(err) + } + defer pt.Shutdown(ctx) + nsub, _ := conn.SubscribeSync(topic) + if err = pt.Send(ctx, &pubsub.Message{Body: body, Metadata: md}); err != nil { + t.Fatal(err) + } + m, err := nsub.NextMsgWithContext(ctx) + if err != nil { + t.Fatalf(err.Error()) + } + if !bytes.Equal(m.Data, body) { + t.Fatalf("Data did not match. %q vs %q\n", m.Data, body) + } + for k, v := range md { + if m.Header.Get(k) != v { + t.Fatalf("Metadata %q did not match. %q vs %q\n", k, m.Header.Get(k), v) + } + } + + // Send a message using NATS directly and receive it using Go CDK. + ps, err := OpenSubscriptionV2(conn, topic, nil) + if err != nil { + t.Fatal(err) + } + defer ps.Shutdown(ctx) + if err := conn.Publish(topic, body); err != nil { + t.Fatal(err) + } + msg, err := ps.Receive(ctx) + if err != nil { + t.Fatal(err) + } + defer msg.Ack() + if !bytes.Equal(msg.Body, body) { + t.Fatalf("Data did not match. %q vs %q\n", m.Data, body) + } +} + func TestErrorCode(t *testing.T) { ctx := context.Background() dh, err := newHarness(ctx, t) @@ -241,7 +331,7 @@ func TestErrorCode(t *testing.T) { h := dh.(*harness) // Topics - dt, err := openTopic(h.nc, "bar") + dt, err := openTopic(h.nc, "bar", false) if err != nil { t.Fatal(err) } @@ -266,7 +356,7 @@ func TestErrorCode(t *testing.T) { } // Subscriptions - ds, err := openSubscription(h.nc, "bar", nil) + ds, err := openSubscription(h.nc, "bar", nil, false) if err != nil { t.Fatal(err) } @@ -299,7 +389,7 @@ func TestErrorCode(t *testing.T) { } // Queue Subscription - qs, err := openSubscription(h.nc, "bar", &SubscriptionOptions{Queue: t.Name()}) + qs, err := openSubscription(h.nc, "bar", &SubscriptionOptions{Queue: t.Name()}, false) if err != nil { t.Fatal(err) } @@ -346,25 +436,34 @@ func BenchmarkNatsQueuePubSub(b *testing.B) { } defer nc.Close() - h := &harness{s, nc} - dt, cleanup, err := h.CreateTopic(ctx, b.Name()) - if err != nil { - b.Fatal(err) - } - defer cleanup() - - qs, cleanup, err := h.CreateQueueSubscription(ctx, dt, b.Name()) - if err != nil { - b.Fatal(err) + for _, tc := range []struct { + name string + h *harness + }{ + {name: "V1", h: &harness{s: s, nc: nc, useV2: false}}, + {name: "V2", h: &harness{s: s, nc: nc, useV2: true}}, + } { + b.Run(tc.name, func(b *testing.B) { + dt, cleanup, err := tc.h.CreateTopic(ctx, b.Name()) + if err != nil { + b.Fatal(err) + } + defer cleanup() + + qs, cleanup, err := tc.h.CreateQueueSubscription(ctx, dt, b.Name()) + if err != nil { + b.Fatal(err) + } + defer cleanup() + + topic := pubsub.NewTopic(dt, nil) + defer topic.Shutdown(ctx) + queueSub := pubsub.NewSubscription(qs, recvBatcherOpts, nil) + defer queueSub.Shutdown(ctx) + + drivertest.RunBenchmarks(b, topic, queueSub) + }) } - defer cleanup() - - topic := pubsub.NewTopic(dt, nil) - defer topic.Shutdown(ctx) - queueSub := pubsub.NewSubscription(qs, recvBatcherOpts, nil) - defer queueSub.Shutdown(ctx) - - drivertest.RunBenchmarks(b, topic, queueSub) } func BenchmarkNatsPubSub(b *testing.B) { @@ -381,24 +480,33 @@ func BenchmarkNatsPubSub(b *testing.B) { } defer nc.Close() - h := &harness{s, nc} - dt, cleanup, err := h.CreateTopic(ctx, b.Name()) - if err != nil { - b.Fatal(err) - } - defer cleanup() - ds, cleanup, err := h.CreateSubscription(ctx, dt, b.Name()) - if err != nil { - b.Fatal(err) + for _, tc := range []struct { + name string + h *harness + }{ + {name: "V1", h: &harness{s: s, nc: nc, useV2: false}}, + {name: "V2", h: &harness{s: s, nc: nc, useV2: true}}, + } { + b.Run(tc.name, func(b *testing.B) { + dt, cleanup, err := tc.h.CreateTopic(ctx, b.Name()) + if err != nil { + b.Fatal(err) + } + defer cleanup() + ds, cleanup, err := tc.h.CreateSubscription(ctx, dt, b.Name()) + if err != nil { + b.Fatal(err) + } + defer cleanup() + + topic := pubsub.NewTopic(dt, nil) + defer topic.Shutdown(ctx) + sub := pubsub.NewSubscription(ds, recvBatcherOpts, nil) + defer sub.Shutdown(ctx) + + drivertest.RunBenchmarks(b, topic, sub) + }) } - defer cleanup() - - topic := pubsub.NewTopic(dt, nil) - defer topic.Shutdown(ctx) - sub := pubsub.NewSubscription(ds, recvBatcherOpts, nil) - defer sub.Shutdown(ctx) - - drivertest.RunBenchmarks(b, topic, sub) } func fakeConnectionStringInEnv() func() { @@ -462,6 +570,15 @@ func TestOpenSubscriptionFromURL(t *testing.T) { {"nats://mytopic?param=value", true}, // Queue URL Parameter for QueueSubscription. {"nats://mytopic?queue=queue1", false}, + // Multiple values for Queue URL Parameter for QueueSubscription. + {"nats://mytopic?queue=queue1&queue=queue2", true}, + // NATSV2 URL should be acceptable without values. + {"nats://mytopic?natsv2", false}, + // NATSV2 URL should be acceptable with boolean parsable values. + {"nats://mytopic?natsv2=true", false}, + {"nats://mytopic?natsv2=false", false}, + // NATSV2 URL should throw error with non-boolean parsable values. + {"nats://mytopic?natsv2=foo", true}, } for _, test := range tests { @@ -476,6 +593,7 @@ func TestOpenSubscriptionFromURL(t *testing.T) { } func TestCodec(t *testing.T) { + const sub = "foo" for _, dm := range []*driver.Message{ {Metadata: nil, Body: nil}, {Metadata: map[string]string{"a": "1"}, Body: nil}, @@ -484,19 +602,38 @@ func TestCodec(t *testing.T) { {Metadata: map[string]string{"a": "1"}, Body: []byte("hello"), AckID: "foo", AsFunc: func(interface{}) bool { return true }}, } { - bytes, err := encodeMessage(dm) - if err != nil { - t.Fatal(err) - } - var got driver.Message - if err := decodeMessage(bytes, &got); err != nil { - t.Fatal(err) - } - want := *dm - want.AckID = nil - want.AsFunc = nil - if diff := cmp.Diff(got, want); diff != "" { - t.Errorf("%+v:\n%s", want, diff) - } + t.Run("V1", func(t *testing.T) { + bytes, err := encodeMessage(dm) + if err != nil { + t.Fatal(err) + } + var got driver.Message + if err := decodeMessage(bytes, &got); err != nil { + t.Fatal(err) + } + want := *dm + want.AckID = nil + want.AsFunc = nil + if diff := cmp.Diff(got, want); diff != "" { + t.Errorf("%+v:\n%s", want, diff) + } + }) + t.Run("V2", func(t *testing.T) { + nm := encodeMessageV2(dm, sub) + got, err := decodeMessageV2(nm) + if err != nil { + t.Fatal(err) + } + + want := *dm + want.AckID = nil + want.AsFunc = nil + // AsFunc needs to be cleared as it cannot be comparable using Diff. + got.AsFunc = nil + if diff := cmp.Diff(*got, want); diff != "" { + t.Errorf("%+v:\n%s", want, diff) + } + }) + } } diff --git a/samples/go.mod b/samples/go.mod index 1e0ae9c5f6..d7b054f579 100644 --- a/samples/go.mod +++ b/samples/go.mod @@ -130,7 +130,7 @@ require ( github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/montanaflynn/stats v0.7.1 // indirect - github.com/nats-io/nats.go v1.27.1 // indirect + github.com/nats-io/nats.go v1.28.0 // indirect github.com/nats-io/nkeys v0.4.4 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect diff --git a/samples/go.sum b/samples/go.sum index 3a17307530..f9cc1cc997 100644 --- a/samples/go.sum +++ b/samples/go.sum @@ -1232,7 +1232,7 @@ github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S github.com/microsoft/go-mssqldb v1.3.0/go.mod h1:lmWsjHD8XX/Txr0f8ZqgbEZSC+BZjmEQy/Ms+rLrvho= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= -github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= @@ -1246,10 +1246,10 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJ github.com/montanaflynn/stats v0.7.0/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= -github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= -github.com/nats-io/nats-server/v2 v2.7.2 h1:+LEN8m0+jdCkiGc884WnDuxR+qj80/5arj+szKuRpRI= -github.com/nats-io/nats.go v1.27.1 h1:OuYnal9aKVSnOzLQIzf7554OXMCG7KbaTkCSBHRcSoo= -github.com/nats-io/nats.go v1.27.1/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= +github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= +github.com/nats-io/nats-server/v2 v2.9.20 h1:bt1dW6xsL1hWWwv7Hovm+EJt5L6iplyqlgEFkoEUk0k= +github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c= +github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=