Skip to content

Commit

Permalink
feat: native nats eventbus metrics and template customization (#745)
Browse files Browse the repository at this point in the history
  • Loading branch information
whynowy authored Jul 16, 2020
1 parent 1651bb5 commit 210aefb
Show file tree
Hide file tree
Showing 19 changed files with 827 additions and 115 deletions.
59 changes: 59 additions & 0 deletions api/event-bus.html
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,37 @@ <h3 id="argoproj.io/v1alpha1.BusConfig">BusConfig
</tr>
</tbody>
</table>
<h3 id="argoproj.io/v1alpha1.ContainerTemplate">ContainerTemplate
</h3>
<p>
(<em>Appears on:</em>
<a href="#argoproj.io/v1alpha1.NativeStrategy">NativeStrategy</a>)
</p>
<p>
<p>ContainerTemplate defines customized spec for a container</p>
</p>
<table>
<thead>
<tr>
<th>Field</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<code>resources</code></br>
<em>
<a href="https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.13/#resourcerequirements-v1-core">
Kubernetes core/v1.ResourceRequirements
</a>
</em>
</td>
<td>
</td>
</tr>
</tbody>
</table>
<h3 id="argoproj.io/v1alpha1.EventBus">EventBus
</h3>
<p>
Expand Down Expand Up @@ -377,6 +408,34 @@ <h3 id="argoproj.io/v1alpha1.NativeStrategy">NativeStrategy
<em>(Optional)</em>
</td>
</tr>
<tr>
<td>
<code>containerTemplate</code></br>
<em>
<a href="#argoproj.io/v1alpha1.ContainerTemplate">
ContainerTemplate
</a>
</em>
</td>
<td>
<em>(Optional)</em>
<p>ContainerTemplate contains customized spec for NATS container</p>
</td>
</tr>
<tr>
<td>
<code>metricsContainerTemplate</code></br>
<em>
<a href="#argoproj.io/v1alpha1.ContainerTemplate">
ContainerTemplate
</a>
</em>
</td>
<td>
<em>(Optional)</em>
<p>MetricsContainerTemplate contains customized spec for metrics container</p>
</td>
</tr>
</tbody>
</table>
<h3 id="argoproj.io/v1alpha1.PersistenceStrategy">PersistenceStrategy
Expand Down
115 changes: 115 additions & 0 deletions api/event-bus.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,73 @@ NATSConfig </a> </em>

</table>

<h3 id="argoproj.io/v1alpha1.ContainerTemplate">

ContainerTemplate

</h3>

<p>

(<em>Appears on:</em>
<a href="#argoproj.io/v1alpha1.NativeStrategy">NativeStrategy</a>)

</p>

<p>

<p>

ContainerTemplate defines customized spec for a container

</p>

</p>

<table>

<thead>

<tr>

<th>

Field

</th>

<th>

Description

</th>

</tr>

</thead>

<tbody>

<tr>

<td>

<code>resources</code></br> <em>
<a href="https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.13/#resourcerequirements-v1-core">
Kubernetes core/v1.ResourceRequirements </a> </em>

</td>

<td>

</td>

</tr>

</tbody>

</table>

<h3 id="argoproj.io/v1alpha1.EventBus">

EventBus
Expand Down Expand Up @@ -763,6 +830,54 @@ Size is the NATS StatefulSet size

</tr>

<tr>

<td>

<code>containerTemplate</code></br> <em>
<a href="#argoproj.io/v1alpha1.ContainerTemplate"> ContainerTemplate
</a> </em>

</td>

<td>

<em>(Optional)</em>

<p>

ContainerTemplate contains customized spec for NATS container

</p>

</td>

</tr>

<tr>

<td>

<code>metricsContainerTemplate</code></br> <em>
<a href="#argoproj.io/v1alpha1.ContainerTemplate"> ContainerTemplate
</a> </em>

</td>

<td>

<em>(Optional)</em>

<p>

MetricsContainerTemplate contains customized spec for metrics container

</p>

</td>

</tr>

</tbody>

</table>
Expand Down
17 changes: 17 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,15 @@
}
}
},
"io.argoproj.eventbus.v1alpha1.ContainerTemplate": {
"description": "ContainerTemplate defines customized spec for a container",
"type": "object",
"properties": {
"resources": {
"$ref": "#/definitions/io.k8s.api.core.v1.ResourceRequirements"
}
}
},
"io.argoproj.eventbus.v1alpha1.EventBus": {
"description": "EventBus is the definition of a eventbus resource",
"type": "object",
Expand Down Expand Up @@ -296,6 +305,14 @@
"auth": {
"type": "string"
},
"containerTemplate": {
"description": "ContainerTemplate contains customized spec for NATS container",
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.ContainerTemplate"
},
"metricsContainerTemplate": {
"description": "MetricsContainerTemplate contains customized spec for metrics container",
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.ContainerTemplate"
},
"persistence": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.PersistenceStrategy"
},
Expand Down
9 changes: 7 additions & 2 deletions controllers/eventbus/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
)

const (
natsStreamingEnvVar = "NATS_STREAMING_IMAGE"
natsStreamingEnvVar = "NATS_STREAMING_IMAGE"
natsMetricsExporterEnvVar = "NATS_METRICS_EXPORTER_IMAGE"
)

var (
Expand All @@ -47,6 +48,10 @@ func main() {
if !defined {
panic(fmt.Errorf("required environment variable '%s' not defined", natsStreamingEnvVar))
}
natsMetricsImage, defined := os.LookupEnv(natsMetricsExporterEnvVar)
if !defined {
panic(fmt.Errorf("required environment variable '%s' not defined", natsMetricsExporterEnvVar))
}
opts := ctrl.Options{}
if namespaced {
opts.Namespace = managedNamespace
Expand All @@ -62,7 +67,7 @@ func main() {
}
// A controller with DefaultControllerRateLimiter
c, err := controller.New(eventbus.ControllerName, mgr, controller.Options{
Reconciler: eventbus.NewReconciler(mgr.GetClient(), mgr.GetScheme(), natsStreamingImage, log.WithName("reconciler")),
Reconciler: eventbus.NewReconciler(mgr.GetClient(), mgr.GetScheme(), natsStreamingImage, natsMetricsImage, log.WithName("reconciler")),
})
if err != nil {
mainLog.Error(err, "unable to set up individual controller")
Expand Down
9 changes: 5 additions & 4 deletions controllers/eventbus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ type reconciler struct {
scheme *runtime.Scheme

natsStreamingImage string
natsMetricsImage string
logger logr.Logger
}

// NewReconciler returns a new reconciler
func NewReconciler(client client.Client, scheme *runtime.Scheme, natsStreamingImage string, logger logr.Logger) reconcile.Reconciler {
return &reconciler{client: client, scheme: scheme, natsStreamingImage: natsStreamingImage, logger: logger}
func NewReconciler(client client.Client, scheme *runtime.Scheme, natsStreamingImage, natsMetricsImage string, logger logr.Logger) reconcile.Reconciler {
return &reconciler{client: client, scheme: scheme, natsStreamingImage: natsStreamingImage, natsMetricsImage: natsMetricsImage, logger: logger}
}

func (r *reconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
Expand Down Expand Up @@ -68,7 +69,7 @@ func (r *reconciler) reconcile(ctx context.Context, eventBus *v1alpha1.EventBus)
if !eventBus.DeletionTimestamp.IsZero() {
log.Info("deleting eventbus")
// Finalizer logic should be added here.
err := installer.Uninstall(eventBus, r.client, r.natsStreamingImage, log)
err := installer.Uninstall(eventBus, r.client, r.natsStreamingImage, r.natsMetricsImage, log)
if err != nil {
log.Error(err, "failed to uninstall")
return nil
Expand All @@ -79,7 +80,7 @@ func (r *reconciler) reconcile(ctx context.Context, eventBus *v1alpha1.EventBus)
r.addFinalizer(eventBus)

eventBus.Status.InitConditions()
return installer.Install(eventBus, r.client, r.natsStreamingImage, log)
return installer.Install(eventBus, r.client, r.natsStreamingImage, r.natsMetricsImage, log)
}

func (r *reconciler) addFinalizer(s *v1alpha1.EventBus) {
Expand Down
12 changes: 6 additions & 6 deletions controllers/eventbus/installer/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type Installer interface {
}

// Install function installs the event bus
func Install(eventBus *v1alpha1.EventBus, client client.Client, natsStreamingImage string, logger logr.Logger) error {
installer, err := getInstaller(eventBus, client, natsStreamingImage, logger)
func Install(eventBus *v1alpha1.EventBus, client client.Client, natsStreamingImage, natsMetricsImage string, logger logr.Logger) error {
installer, err := getInstaller(eventBus, client, natsStreamingImage, natsMetricsImage, logger)
if err != nil {
logger.Error(err, "failed to an installer")
}
Expand All @@ -33,12 +33,12 @@ func Install(eventBus *v1alpha1.EventBus, client client.Client, natsStreamingIma
}

// GetInstaller returns Installer implementation
func getInstaller(eventBus *v1alpha1.EventBus, client client.Client, natsStreamingImage string, logger logr.Logger) (Installer, error) {
func getInstaller(eventBus *v1alpha1.EventBus, client client.Client, natsStreamingImage, natsMetricsImage string, logger logr.Logger) (Installer, error) {
if nats := eventBus.Spec.NATS; nats != nil {
if nats.Exotic != nil {
return NewExoticNATSInstaller(eventBus, logger), nil
} else if nats.Native != nil {
return NewNATSInstaller(client, eventBus, natsStreamingImage, getLabels(eventBus), logger), nil
return NewNATSInstaller(client, eventBus, natsStreamingImage, natsMetricsImage, getLabels(eventBus), logger), nil
}
}
return nil, errors.New("invalid eventbus spec")
Expand All @@ -56,8 +56,8 @@ func getLabels(bus *v1alpha1.EventBus) map[string]string {
// the dependency resources should have been deleted by owner references cascade
// deletion, but things like PVC created by StatefulSet need to be cleaned up
// separately.
func Uninstall(eventBus *v1alpha1.EventBus, client client.Client, natsStreamingImage string, logger logr.Logger) error {
installer, err := getInstaller(eventBus, client, natsStreamingImage, logger)
func Uninstall(eventBus *v1alpha1.EventBus, client client.Client, natsStreamingImage, natsMetricsImage string, logger logr.Logger) error {
installer, err := getInstaller(eventBus, client, natsStreamingImage, natsMetricsImage, logger)
if err != nil {
logger.Error(err, "failed to get an installer")
}
Expand Down
4 changes: 2 additions & 2 deletions controllers/eventbus/installer/installer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (

func TestGetInstaller(t *testing.T) {
t.Run("get installer", func(t *testing.T) {
installer, err := getInstaller(testEventBus, nil, "", ctrl.Log.WithName("test"))
installer, err := getInstaller(testEventBus, nil, "", "", ctrl.Log.WithName("test"))
assert.NoError(t, err)
assert.NotNil(t, installer)
_, ok := installer.(*natsInstaller)
assert.True(t, ok)

installer, err = getInstaller(testExoticBus, nil, "", ctrl.Log.WithName("test"))
installer, err = getInstaller(testExoticBus, nil, "", "", ctrl.Log.WithName("test"))
assert.NoError(t, err)
assert.NotNil(t, installer)
_, ok = installer.(*exoticNATSInstaller)
Expand Down
Loading

0 comments on commit 210aefb

Please sign in to comment.