diff --git a/api/v1beta3/provider_types.go b/api/v1beta3/provider_types.go index 8bf360f78..f44963109 100644 --- a/api/v1beta3/provider_types.go +++ b/api/v1beta3/provider_types.go @@ -51,12 +51,13 @@ const ( AlertManagerProvider string = "alertmanager" PagerDutyProvider string = "pagerduty" DataDogProvider string = "datadog" + NATSProvider string = "nats" ) // ProviderSpec defines the desired state of the Provider. type ProviderSpec struct { // Type specifies which Provider implementation to use. - // +kubebuilder:validation:Enum=slack;discord;msteams;rocket;generic;generic-hmac;github;gitlab;gitea;bitbucketserver;bitbucket;azuredevops;googlechat;googlepubsub;webex;sentry;azureeventhub;telegram;lark;matrix;opsgenie;alertmanager;grafana;githubdispatch;pagerduty;datadog + // +kubebuilder:validation:Enum=slack;discord;msteams;rocket;generic;generic-hmac;github;gitlab;gitea;bitbucketserver;bitbucket;azuredevops;googlechat;googlepubsub;webex;sentry;azureeventhub;telegram;lark;matrix;opsgenie;alertmanager;grafana;githubdispatch;pagerduty;datadog;nats // +required Type string `json:"type"` diff --git a/config/crd/bases/notification.toolkit.fluxcd.io_providers.yaml b/config/crd/bases/notification.toolkit.fluxcd.io_providers.yaml index cc229fb64..49c21015c 100644 --- a/config/crd/bases/notification.toolkit.fluxcd.io_providers.yaml +++ b/config/crd/bases/notification.toolkit.fluxcd.io_providers.yaml @@ -509,6 +509,7 @@ spec: - githubdispatch - pagerduty - datadog + - nats type: string username: description: Username specifies the name under which events are posted. diff --git a/docs/spec/v1beta3/providers.md b/docs/spec/v1beta3/providers.md index bc273005f..4e53c5d3c 100644 --- a/docs/spec/v1beta3/providers.md +++ b/docs/spec/v1beta3/providers.md @@ -107,6 +107,7 @@ The supported alerting providers are: | [Slack](#slack) | `slack` | | [Telegram](#telegram) | `telegram` | | [WebEx](#webex) | `webex` | +| [NATS](#nats) | `nats` | The supported providers for [Git commit status updates](#git-commit-status-updates) are: @@ -973,6 +974,50 @@ stringData: token: ``` +##### NATS + +When `.spec.type` is set to `nats`, the controller will publish the payload of +an [Event](events.md#event-structure) on the [NATS Subject](https://docs.nats.io/nats-concepts/subjects) provided in the +[Channel](#channel) field, using the server specified in the [Address](#address) field. + +This Provider type can optionally use the [Secret reference](#secret-reference) to +authenticate to the NATS server using [Username/Password](https://docs.nats.io/using-nats/developer/connecting/userpass). +The credentials must be specified in [the `username`](#username-example) and `password` fields of the Secret. +Alternatively, NATS also supports passing the credentials with [the server URL](https://docs.nats.io/using-nats/developer/connecting/userpass#connecting-with-a-user-password-in-the-url). In this case the `address` should be provided through a +Secret reference. + +Additionally if using credentials, the User must have [authorization](https://docs.nats.io/running-a-nats-service/configuration/securing_nats/authorization) to publish on the Subject provided. + +###### NATS with Username/Password Credentials Example + +To configure a Provider for NATS authenticating with Username/Password, create a Secret with the +`username` and `password` fields set, and add a `nats` Provider with the associated +[Secret reference](#secret-reference). + +```yaml +--- +apiVersion: notification.toolkit.fluxcd.io/v1beta3 +kind: Provider +metadata: + name: nats-provider + namespace: desired-namespace +spec: + type: nats + address: + channel: + secretRef: + name: nats-provider-creds +--- +apiVersion: v1 +kind: Secret +metadata: + name: nats-provider-creds + namespace: desired-namespace +stringData: + username: + password: +``` + ### Address `.spec.address` is an optional field that specifies the endpoint where the events are posted. diff --git a/go.mod b/go.mod index bd1859ab9..25666ec6d 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.5 github.com/ktrysmt/go-bitbucket v0.9.66 github.com/microsoft/azure-devops-go-api/azuredevops/v6 v6.0.1 + github.com/nats-io/nats.go v1.31.0 github.com/onsi/gomega v1.30.0 github.com/sethvargo/go-limiter v0.7.2 github.com/slok/go-http-metrics v0.10.0 @@ -109,6 +110,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.0 // indirect github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect @@ -122,6 +124,8 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/nats-io/nkeys v0.4.5 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index 63ca8ae2d..99a6a391a 100644 --- a/go.sum +++ b/go.sum @@ -974,6 +974,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= +github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -1025,6 +1027,12 @@ github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 h1:n6/ github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00/go.mod h1:Pm3mSP3c5uWn86xMLZ5Sa7JB9GsEZySvHYXCTK4E9q4= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= +github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= +github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= +github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/onsi/ginkgo/v2 v2.13.1 h1:LNGfMbR2OVGBfXjvRZIZ2YCTQdGKtPLvuI1rMCCj3OU= github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8= github.com/onsi/gomega v1.30.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= diff --git a/internal/notifier/factory.go b/internal/notifier/factory.go index 4d9900606..3280951ea 100644 --- a/internal/notifier/factory.go +++ b/internal/notifier/factory.go @@ -117,6 +117,8 @@ func (f Factory) Notifier(provider string) (Interface, error) { n, err = NewPagerDuty(f.URL, f.ProxyURL, f.CertPool, f.Channel) case apiv1.DataDogProvider: n, err = NewDataDog(f.URL, f.ProxyURL, f.CertPool, f.Token) + case apiv1.NATSProvider: + n, err = NewNATS(f.URL, f.Channel, f.Username, f.Password) default: err = fmt.Errorf("provider %s not supported", provider) } diff --git a/internal/notifier/nats.go b/internal/notifier/nats.go new file mode 100644 index 000000000..a276e56a1 --- /dev/null +++ b/internal/notifier/nats.go @@ -0,0 +1,105 @@ +/* +Copyright 2023 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package notifier + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + "github.com/nats-io/nats.go" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +type ( + // NATS holds a NATS client and target subject. + NATS struct { + subject string + client interface { + publish(ctx context.Context, subject string, eventPayload []byte) (err error) + } + } + + natsClient struct { + server string + username string + password string + } +) + +func NewNATS(server string, subject string, username string, password string) (*NATS, error) { + if server == "" { + return nil, errors.New("NATS server (address) cannot be empty") + } + if subject == "" { + return nil, errors.New("NATS subject (channel) cannot be empty") + } + return &NATS{ + subject: subject, + client: &natsClient{ + server: server, + username: username, + password: password, + }, + }, nil +} + +// Post posts Flux events to a NATS subject. +func (n *NATS) Post(ctx context.Context, event eventv1.Event) error { + // Skip Git commit status update event. + if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) { + return nil + } + + eventPayload, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("error json-marshaling event: %w", err) + } + + err = n.client.publish(ctx, n.subject, eventPayload) + if err != nil { + return fmt.Errorf("error publishing event to subject %s: %w", n.subject, err) + } + + // debug log + log.FromContext(ctx).V(1).Info("Event published to NATS subject", "subject", n.subject) + + return nil +} + +func (n *natsClient) publish(ctx context.Context, subject string, eventPayload []byte) (err error) { + opts := []nats.Option{nats.Name("NATS Provider Publisher")} + if n.username != "" && n.password != "" { + opts = append(opts, nats.UserInfo(n.username, n.password)) + } + + nc, err := nats.Connect(n.server, opts...) + if err != nil { + return fmt.Errorf("error connecting to server: %w", err) + } + defer nc.Close() + + nc.Publish(subject, eventPayload) + nc.Flush() + if err = nc.LastError(); err != nil { + return fmt.Errorf("error publishing message to server: %w", err) + } + + return err +} diff --git a/internal/notifier/nats_test.go b/internal/notifier/nats_test.go new file mode 100644 index 000000000..2dc53a719 --- /dev/null +++ b/internal/notifier/nats_test.go @@ -0,0 +1,152 @@ +package notifier + +import ( + "context" + "errors" + "fmt" + "testing" + + eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + . "github.com/onsi/gomega" +) + +func TestNewNATS(t *testing.T) { + tests := []struct { + name string + subject string + server string + username string + password string + expectedErr error + expectedSubject string + expectedUsername string + expectedPassword string + }{ + { + name: "empty subject is not allowed", + subject: "", + server: "nats", + expectedErr: errors.New("NATS subject (channel) cannot be empty"), + }, + { + name: "empty server is not allowed", + subject: "test", + server: "", + expectedErr: errors.New("NATS server (address) cannot be empty"), + }, + { + name: "empty creds are stored properly", + subject: "test", + server: "nats", + username: "", + password: "", + expectedSubject: "test", + expectedUsername: "", + expectedPassword: "", + }, + { + name: "non-empty creds are stored properly", + subject: "test", + server: "nats", + username: "user", + password: "pass", + expectedSubject: "test", + expectedUsername: "user", + expectedPassword: "pass", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + provider, err := NewNATS(tt.server, tt.subject, tt.username, tt.password) + if tt.expectedErr != nil { + g.Expect(err).To(Equal(tt.expectedErr)) + g.Expect(provider).To(BeNil()) + } else { + g.Expect(err).To(BeNil()) + g.Expect(provider).NotTo(BeNil()) + + g.Expect(provider.subject).To(Equal(tt.expectedSubject)) + + g.Expect(provider.client).NotTo(BeNil()) + client := provider.client.(*natsClient) + g.Expect(client).NotTo(BeNil()) + + g.Expect(client.server).To(Equal(tt.server)) + g.Expect(client.username).To(Equal(tt.expectedUsername)) + g.Expect(client.password).To(Equal(tt.expectedPassword)) + } + }) + } +} + +type natsPostTestCase struct { + name string + subject string + event eventv1.Event + expectedEventPayload string + publishErr error + expectedErr error + publishShouldExecute bool + publishExecuted bool + + g *WithT +} + +func (tt *natsPostTestCase) publish(ctx context.Context, subject string, eventPayload []byte) (err error) { + tt.g.THelper() + tt.publishExecuted = true + tt.g.Expect(subject).To(Equal(tt.subject)) + tt.g.Expect(string(eventPayload)).To(Equal(tt.expectedEventPayload)) + return tt.publishErr +} + +func TestNATSPost(t *testing.T) { + tests := []*natsPostTestCase{ + { + name: "events are properly marshaled", + event: eventv1.Event{ + Metadata: map[string]string{"foo": "bar"}, + }, + expectedEventPayload: `{"involvedObject":{},"severity":"","timestamp":null,"message":"","reason":"","metadata":{"foo":"bar"},"reportingController":""}`, + publishShouldExecute: true, + }, + { + name: "commit status updates are dropped", + event: eventv1.Event{ + Metadata: map[string]string{"commit_status": "update"}, + }, + publishShouldExecute: false, + }, + { + name: "publish error is wrapped and relayed", + subject: "test", + expectedEventPayload: `{"involvedObject":{},"severity":"","timestamp":null,"message":"","reason":"","reportingController":""}`, + publishErr: errors.New("publish error"), + expectedErr: fmt.Errorf("error publishing event to subject test: %w", errors.New("publish error")), + publishShouldExecute: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + tt.g = g + + topic := &NATS{ + client: tt, + subject: tt.subject, + } + + err := topic.Post(context.Background(), tt.event) + if tt.expectedErr == nil { + g.Expect(err).To(BeNil()) + } else { + g.Expect(err).To(Equal(tt.expectedErr)) + } + g.Expect(tt.publishExecuted).To(Equal(tt.publishShouldExecute)) + }) + } +}