Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Provider for NATS Subject #651

Merged
merged 2 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion api/v1beta3/provider_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,13 @@ const (
AlertManagerProvider string = "alertmanager"
PagerDutyProvider string = "pagerduty"
DataDogProvider string = "datadog"
NATSProvider string = "nats"
)

// ProviderSpec defines the desired state of the Provider.
type ProviderSpec struct {
// Type specifies which Provider implementation to use.
// +kubebuilder:validation:Enum=slack;discord;msteams;rocket;generic;generic-hmac;github;gitlab;gitea;bitbucketserver;bitbucket;azuredevops;googlechat;googlepubsub;webex;sentry;azureeventhub;telegram;lark;matrix;opsgenie;alertmanager;grafana;githubdispatch;pagerduty;datadog
// +kubebuilder:validation:Enum=slack;discord;msteams;rocket;generic;generic-hmac;github;gitlab;gitea;bitbucketserver;bitbucket;azuredevops;googlechat;googlepubsub;webex;sentry;azureeventhub;telegram;lark;matrix;opsgenie;alertmanager;grafana;githubdispatch;pagerduty;datadog;nats
// +required
Type string `json:"type"`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ spec:
- githubdispatch
- pagerduty
- datadog
- nats
type: string
username:
description: Username specifies the name under which events are posted.
Expand Down
45 changes: 45 additions & 0 deletions docs/spec/v1beta3/providers.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ The supported alerting providers are:
| [Slack](#slack) | `slack` |
| [Telegram](#telegram) | `telegram` |
| [WebEx](#webex) | `webex` |
| [NATS](#nats) | `nats` |

The supported providers for [Git commit status updates](#git-commit-status-updates) are:

Expand Down Expand Up @@ -973,6 +974,50 @@ stringData:
token: <bot-token>
```

##### NATS

When `.spec.type` is set to `nats`, the controller will publish the payload of
an [Event](events.md#event-structure) on the [NATS Subject](https://docs.nats.io/nats-concepts/subjects) provided in the
[Channel](#channel) field, using the server specified in the [Address](#address) field.

This Provider type can optionally use the [Secret reference](#secret-reference) to
authenticate to the NATS server using [Username/Password](https://docs.nats.io/using-nats/developer/connecting/userpass).
The credentials must be specified in [the `username`](#username-example) and `password` fields of the Secret.
Alternatively, NATS also supports passing the credentials with [the server URL](https://docs.nats.io/using-nats/developer/connecting/userpass#connecting-with-a-user-password-in-the-url). In this case the `address` should be provided through a
Secret reference.

Additionally if using credentials, the User must have [authorization](https://docs.nats.io/running-a-nats-service/configuration/securing_nats/authorization) to publish on the Subject provided.

###### NATS with Username/Password Credentials Example

To configure a Provider for NATS authenticating with Username/Password, create a Secret with the
`username` and `password` fields set, and add a `nats` Provider with the associated
[Secret reference](#secret-reference).

```yaml
---
apiVersion: notification.toolkit.fluxcd.io/v1beta3
kind: Provider
metadata:
name: nats-provider
namespace: desired-namespace
spec:
type: nats
address: <NATS Server URL>
channel: <Subject>
secretRef:
name: nats-provider-creds
---
apiVersion: v1
kind: Secret
metadata:
name: nats-provider-creds
namespace: desired-namespace
stringData:
username: <NATS Username>
password: <NATS Password>
```

### Address

`.spec.address` is an optional field that specifies the endpoint where the events are posted.
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/hashicorp/go-retryablehttp v0.7.5
github.com/ktrysmt/go-bitbucket v0.9.66
github.com/microsoft/azure-devops-go-api/azuredevops/v6 v6.0.1
github.com/nats-io/nats.go v1.31.0
github.com/onsi/gomega v1.30.0
github.com/sethvargo/go-limiter v0.7.2
github.com/slok/go-http-metrics v0.10.0
Expand Down Expand Up @@ -109,6 +110,7 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
Expand All @@ -122,6 +124,8 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
Expand Down Expand Up @@ -1025,6 +1027,12 @@ github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 h1:n6/
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00/go.mod h1:Pm3mSP3c5uWn86xMLZ5Sa7JB9GsEZySvHYXCTK4E9q4=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/onsi/ginkgo/v2 v2.13.1 h1:LNGfMbR2OVGBfXjvRZIZ2YCTQdGKtPLvuI1rMCCj3OU=
github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8=
github.com/onsi/gomega v1.30.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ=
Expand Down
2 changes: 2 additions & 0 deletions internal/notifier/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ func (f Factory) Notifier(provider string) (Interface, error) {
n, err = NewPagerDuty(f.URL, f.ProxyURL, f.CertPool, f.Channel)
case apiv1.DataDogProvider:
n, err = NewDataDog(f.URL, f.ProxyURL, f.CertPool, f.Token)
case apiv1.NATSProvider:
n, err = NewNATS(f.URL, f.Channel, f.Username, f.Password)
default:
err = fmt.Errorf("provider %s not supported", provider)
}
Expand Down
105 changes: 105 additions & 0 deletions internal/notifier/nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
Copyright 2023 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package notifier

import (
"context"
"encoding/json"
"errors"
"fmt"

eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/nats-io/nats.go"
"sigs.k8s.io/controller-runtime/pkg/log"
)

type (
// NATS holds a NATS client and target subject.
NATS struct {
subject string
client interface {
publish(ctx context.Context, subject string, eventPayload []byte) (err error)
}
}

natsClient struct {
server string
username string
password string
}
)

func NewNATS(server string, subject string, username string, password string) (*NATS, error) {
if server == "" {
return nil, errors.New("NATS server (address) cannot be empty")
}
if subject == "" {
return nil, errors.New("NATS subject (channel) cannot be empty")
}
return &NATS{
subject: subject,
client: &natsClient{
server: server,
username: username,
password: password,
},
}, nil
}

// Post posts Flux events to a NATS subject.
func (n *NATS) Post(ctx context.Context, event eventv1.Event) error {
// Skip Git commit status update event.
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) {
return nil
}

eventPayload, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("error json-marshaling event: %w", err)
}

err = n.client.publish(ctx, n.subject, eventPayload)
if err != nil {
return fmt.Errorf("error publishing event to subject %s: %w", n.subject, err)
}

// debug log
log.FromContext(ctx).V(1).Info("Event published to NATS subject", "subject", n.subject)

return nil
}

func (n *natsClient) publish(ctx context.Context, subject string, eventPayload []byte) (err error) {
opts := []nats.Option{nats.Name("NATS Provider Publisher")}
if n.username != "" && n.password != "" {
opts = append(opts, nats.UserInfo(n.username, n.password))
}

nc, err := nats.Connect(n.server, opts...)
if err != nil {
return fmt.Errorf("error connecting to server: %w", err)
}
defer nc.Close()

nc.Publish(subject, eventPayload)
nc.Flush()
if err = nc.LastError(); err != nil {
return fmt.Errorf("error publishing message to server: %w", err)
}

return err
}
152 changes: 152 additions & 0 deletions internal/notifier/nats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package notifier

import (
"context"
"errors"
"fmt"
"testing"

eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
. "github.com/onsi/gomega"
)

func TestNewNATS(t *testing.T) {
tests := []struct {
name string
subject string
server string
username string
password string
expectedErr error
expectedSubject string
expectedUsername string
expectedPassword string
}{
{
name: "empty subject is not allowed",
subject: "",
server: "nats",
expectedErr: errors.New("NATS subject (channel) cannot be empty"),
},
{
name: "empty server is not allowed",
subject: "test",
server: "",
expectedErr: errors.New("NATS server (address) cannot be empty"),
},
{
name: "empty creds are stored properly",
subject: "test",
server: "nats",
username: "",
password: "",
expectedSubject: "test",
expectedUsername: "",
expectedPassword: "",
},
{
name: "non-empty creds are stored properly",
subject: "test",
server: "nats",
username: "user",
password: "pass",
expectedSubject: "test",
expectedUsername: "user",
expectedPassword: "pass",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)

provider, err := NewNATS(tt.server, tt.subject, tt.username, tt.password)
if tt.expectedErr != nil {
g.Expect(err).To(Equal(tt.expectedErr))
g.Expect(provider).To(BeNil())
} else {
g.Expect(err).To(BeNil())
g.Expect(provider).NotTo(BeNil())

g.Expect(provider.subject).To(Equal(tt.expectedSubject))

g.Expect(provider.client).NotTo(BeNil())
client := provider.client.(*natsClient)
g.Expect(client).NotTo(BeNil())

g.Expect(client.server).To(Equal(tt.server))
g.Expect(client.username).To(Equal(tt.expectedUsername))
g.Expect(client.password).To(Equal(tt.expectedPassword))
}
})
}
}

type natsPostTestCase struct {
name string
subject string
event eventv1.Event
expectedEventPayload string
publishErr error
expectedErr error
publishShouldExecute bool
publishExecuted bool

g *WithT
}

func (tt *natsPostTestCase) publish(ctx context.Context, subject string, eventPayload []byte) (err error) {
tt.g.THelper()
tt.publishExecuted = true
tt.g.Expect(subject).To(Equal(tt.subject))
tt.g.Expect(string(eventPayload)).To(Equal(tt.expectedEventPayload))
return tt.publishErr
}

func TestNATSPost(t *testing.T) {
tests := []*natsPostTestCase{
{
name: "events are properly marshaled",
event: eventv1.Event{
Metadata: map[string]string{"foo": "bar"},
},
expectedEventPayload: `{"involvedObject":{},"severity":"","timestamp":null,"message":"","reason":"","metadata":{"foo":"bar"},"reportingController":""}`,
publishShouldExecute: true,
},
{
name: "commit status updates are dropped",
event: eventv1.Event{
Metadata: map[string]string{"commit_status": "update"},
},
publishShouldExecute: false,
},
{
name: "publish error is wrapped and relayed",
subject: "test",
expectedEventPayload: `{"involvedObject":{},"severity":"","timestamp":null,"message":"","reason":"","reportingController":""}`,
publishErr: errors.New("publish error"),
expectedErr: fmt.Errorf("error publishing event to subject test: %w", errors.New("publish error")),
publishShouldExecute: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
tt.g = g

topic := &NATS{
client: tt,
subject: tt.subject,
}

err := topic.Post(context.Background(), tt.event)
if tt.expectedErr == nil {
g.Expect(err).To(BeNil())
} else {
g.Expect(err).To(Equal(tt.expectedErr))
}
g.Expect(tt.publishExecuted).To(Equal(tt.publishShouldExecute))
})
}
}