Skip to content

Commit

Permalink
feat: TLS for Jetstream (#1815)
Browse files Browse the repository at this point in the history
* starting to incorporate TLS into Jetstream

Signed-off-by: Julie Vogelman <julie_vogelman@intuit.com>
  • Loading branch information
juliev0 authored Apr 6, 2022
1 parent f869f6b commit 29616ec
Show file tree
Hide file tree
Showing 13 changed files with 220 additions and 106 deletions.
12 changes: 12 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ const (
JetStreamServerSecretEncryptionKey = "encryption"
// key of client auth secret
JetStreamClientAuthSecretKey = "client-auth"
// key for server private key
JetStreamServerPrivateKeyKey = "private-key"
// key for server TLS certificate
JetStreamServerCertKey = "cert"
// key for server CA certificate
JetStreamServerCACertKey = "ca-cert"
// key for server private key
JetStreamClusterPrivateKeyKey = "cluster-private-key"
// key for server TLS certificate
JetStreamClusterCertKey = "cluster-cert"
// key for server CA certificate
JetStreamClusterCACertKey = "cluster-ca-cert"
// key of nats-js.conf in the configmap
JetStreamConfigMapKey = "nats-js"
// Jetstream Stream name
Expand Down
19 changes: 14 additions & 5 deletions common/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package leaderelection

import (
"context"
"crypto/tls"

"github.com/fsnotify/fsnotify"
"github.com/nats-io/graft"
Expand All @@ -28,6 +29,7 @@ type LeaderCallbacks struct {

func NewEventBusElector(ctx context.Context, eventBusConfig eventbusv1alpha1.BusConfig, clusterName string, clusterSize int) (Elector, error) {
logger := logging.FromContext(ctx)

var eventBusType apicommon.EventBusType
var eventBusAuth *eventbusv1alpha1.AuthStrategy
switch {
Expand All @@ -40,6 +42,7 @@ func NewEventBusElector(ctx context.Context, eventBusConfig eventbusv1alpha1.Bus
default:
return nil, errors.New("invalid event bus")
}

var auth *eventbuscommon.Auth
cred := &eventbuscommon.AuthCredential{}
if eventBusAuth == nil || *eventBusAuth == eventbusv1alpha1.AuthStrategyNone {
Expand All @@ -66,10 +69,11 @@ func NewEventBusElector(ctx context.Context, eventBusConfig eventbusv1alpha1.Bus
logger.Fatal("Eventbus auth config file changed, exiting..")
})
auth = &eventbuscommon.Auth{
Strategy: *eventBusAuth,
Crendential: cred,
Strategy: *eventBusAuth,
Credential: cred,
}
}

var elector Elector
switch eventBusType {
case apicommon.EventBusNATS:
Expand Down Expand Up @@ -107,11 +111,16 @@ func (e *natsEventBusElector) RunOrDie(ctx context.Context, callbacks LeaderCall
opts.MaxReconnect = -1
opts.Url = e.url
if e.auth.Strategy == eventbusv1alpha1.AuthStrategyToken {
opts.Token = e.auth.Crendential.Token
opts.Token = e.auth.Credential.Token
} else if e.auth.Strategy == eventbusv1alpha1.AuthStrategyBasic {
opts.User = e.auth.Crendential.Username
opts.Password = e.auth.Crendential.Password
opts.User = e.auth.Credential.Username
opts.Password = e.auth.Credential.Password
}

opts.TLSConfig = &tls.Config{ // seems fine to pass this in even when we're not using TLS
InsecureSkipVerify: true,
}

rpc, err := graft.NewNatsRpc(opts)
if err != nil {
log.Fatalw("failed to new Nats Rpc", zap.Error(err))
Expand Down
45 changes: 25 additions & 20 deletions common/tls/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,6 @@ func createCACertTemplate(org string, hosts []string, notAfter time.Time) (*x509
return rootCert, nil
}

func createServerCertTemplate(org string, hosts []string, notAfter time.Time) (*x509.Certificate, error) {
serverCert, err := certTemplate(org, hosts, notAfter)
if err != nil {
return nil, err
}
serverCert.KeyUsage = x509.KeyUsageDigitalSignature
serverCert.ExtKeyUsage = []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}
return serverCert, err
}

// Sign the cert
func createCert(template, parent *x509.Certificate, pub, parentPriv interface{}) (
cert *x509.Certificate, certPEM []byte, err error) {
Expand Down Expand Up @@ -86,32 +76,47 @@ func createCA(org string, hosts []string, notAfter time.Time) (*rsa.PrivateKey,
return rootKey, rootCert, rootCertPEM, nil
}

// CreateCerts creates and returns a CA certificate and certificate and
// key for the server
func CreateCerts(org string, hosts []string, notAfter time.Time) (serverKey, serverCert, caCert []byte, err error) {
// CreateCerts creates and returns a CA certificate and certificate and key
// if server==true, generate these for a server
// if client==true, generate these for a client
// can generate for both server and client but at least one must be specified
func CreateCerts(org string, hosts []string, notAfter time.Time, server bool, client bool) (serverKey, serverCert, caCert []byte, err error) {
if !server && !client {
return nil, nil, nil, errors.Wrap(err, "CreateCerts() must specify either server or client")
}

// Create a CA certificate and private key
caKey, caCertificate, caCertificatePEM, err := createCA(org, hosts, notAfter)
if err != nil {
return nil, nil, nil, err
}

// Create the private key
servKey, err := rsa.GenerateKey(rand.Reader, 2048)
privateKey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "failed to generate random key")
}
servCertTemplate, err := createServerCertTemplate(org, hosts, notAfter)
var cert *x509.Certificate

cert, err = certTemplate(org, hosts, notAfter)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "failed to create server cert template")
return nil, nil, nil, err
}
cert.KeyUsage = x509.KeyUsageDigitalSignature
if server {
cert.ExtKeyUsage = append(cert.ExtKeyUsage, x509.ExtKeyUsageServerAuth)
}
if client {
cert.ExtKeyUsage = append(cert.ExtKeyUsage, x509.ExtKeyUsageClientAuth)
}

// create a certificate wrapping the public key, sign it with the CA private key
_, servCertPEM, err := createCert(servCertTemplate, caCertificate, &servKey.PublicKey, caKey)
_, certPEM, err := createCert(cert, caCertificate, &privateKey.PublicKey, caKey)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "failed to sign server cert")
}
servKeyPEM := pem.EncodeToMemory(&pem.Block{
Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(servKey),
privateKeyPEM := pem.EncodeToMemory(&pem.Block{
Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(privateKey),
})
return servKeyPEM, servCertPEM, caCertificatePEM, nil
return privateKeyPEM, certPEM, caCertificatePEM, nil
}
2 changes: 1 addition & 1 deletion common/tls/tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func TestCreateCerts(t *testing.T) {
t.Run("test create certs", func(t *testing.T) {
sKey, serverCertPEM, caCertBytes, err := CreateCerts("test-org", []string{"test-host"}, time.Now().AddDate(1, 0, 0))
sKey, serverCertPEM, caCertBytes, err := CreateCerts("test-org", []string{"test-host"}, time.Now().AddDate(1, 0, 0), true, false)
assert.NoError(t, err)
p, _ := pem.Decode(sKey)
assert.Equal(t, "RSA PRIVATE KEY", p.Type)
Expand Down
6 changes: 6 additions & 0 deletions controllers/eventbus/installer/assets/jetstream/nats.conf
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ cluster {
routes: [{{.Routes}}]
cluster_advertise: $CLUSTER_ADVERTISE
connect_retries: 120

tls {
cert_file: "/etc/nats-config/cluster-server-cert.pem"
key_file: "/etc/nats-config/cluster-server-key.pem"
ca_file: "/etc/nats-config/cluster-ca-cert.pem"
}
}
lame_duck_duration: 120s
##################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,9 @@ accounts: {
]
}
}

tls {
cert_file: "/etc/nats-config/server-cert.pem"
key_file: "/etc/nats-config/server-key.pem"
ca_file: "/etc/nats-config/ca-cert.pem"
}
Loading

0 comments on commit 29616ec

Please sign in to comment.