Skip to content

Commit

Permalink
[Added] support for service export response threshold (#342)
Browse files Browse the repository at this point in the history
* [Added] support for service export response threshold

Signed-off-by: Matthias Hanel <mh@synadia.com>
  • Loading branch information
matthiashanel authored Jan 8, 2021
1 parent 4a80e87 commit b5998ae
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 9 deletions.
17 changes: 13 additions & 4 deletions cmd/addexport.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

cli "github.com/nats-io/cliprompts/v2"
"github.com/nats-io/jwt/v2"
Expand All @@ -46,6 +47,7 @@ func createAddExportCmd() *cobra.Command {
cmd.Flags().BoolVarP(&params.private, "private", "p", false, "private export - requires an activation to access")
cmd.Flags().StringVarP(&params.latSubject, "latency", "", "", "latency metrics subject (services only)")
cmd.Flags().StringVarP(&params.latSampling, "sampling", "", "", "latency sampling percentage [1-100] or `header` (services only)")
cmd.Flags().DurationVarP(&params.responseThreshold, "response-threshold", "", 0, "response threshold duration (units ms/s/m/h) (services only)")
hm := fmt.Sprintf("response type for the service [%s | %s | %s] (services only)", jwt.ResponseTypeSingleton, jwt.ResponseTypeStream, jwt.ResponseTypeChunked)
cmd.Flags().StringVarP(&params.responseType, "response-type", "", jwt.ResponseTypeSingleton, hm)
params.AccountContextParams.BindFlags(cmd)
Expand All @@ -65,10 +67,11 @@ type AddExportParams struct {
private bool
service bool
SignerParams
subject string
latSubject string
latSampling string
responseType string
subject string
latSubject string
latSampling string
responseType string
responseThreshold time.Duration
}

func (p *AddExportParams) longHelp() string {
Expand Down Expand Up @@ -170,6 +173,11 @@ func (p *AddExportParams) PreInteractive(ctx ActionCtx) error {
return err
}
p.export.ResponseType = jwt.ResponseType(choices[s])

p.export.ResponseThreshold, err = promptDuration("response threshold (0 disabled)", p.responseThreshold)
if err != nil {
return err
}
}

if err := p.SignerParams.Edit(ctx); err != nil {
Expand Down Expand Up @@ -297,6 +305,7 @@ func (p *AddExportParams) Validate(ctx ActionCtx) error {
return fmt.Errorf("unknown response type %q", p.responseType)
}
p.export.ResponseType = rt
p.export.ResponseThreshold = p.responseThreshold
}

if err = p.SignerParams.Resolve(ctx); err != nil {
Expand Down
33 changes: 31 additions & 2 deletions cmd/addexport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cmd

import (
"testing"
"time"

"github.com/nats-io/jwt/v2"
"github.com/stretchr/testify/require"
Expand All @@ -39,6 +40,8 @@ func Test_AddExport(t *testing.T) {
{createAddExportCmd(), []string{"add", "export", "--subject", "mar", "--name", "ar", "--service"}, nil, []string{"added public service export \"ar\""}, false},
{createAddExportCmd(), []string{"add", "export", "--subject", "pubstream", "--private"}, nil, []string{"added private stream export \"pubstream\""}, false},
{createAddExportCmd(), []string{"add", "export", "--subject", "pubservice", "--private", "--service"}, nil, []string{"added private service export \"pubservice\""}, false},
{createAddExportCmd(), []string{"add", "export", "--subject", "th.1", "--service", "--response-threshold", "1s"}, nil, []string{"added public service export \"th.1\""}, false},
{createAddExportCmd(), []string{"add", "export", "--subject", "th.2", "--response-threshold", "1whatever"}, nil, []string{`time: unknown unit`}, true},
}

tests.Run(t, "root", "add")
Expand Down Expand Up @@ -133,7 +136,7 @@ func Test_AddExportAccountNameRequired(t *testing.T) {
require.Equal(t, "bbbb", ac.Exports[0].Name)
}

func TestAddExportInteractive(t *testing.T) {
func TestAddStreamExportInteractive(t *testing.T) {
ts := NewTestStore(t, "test")
defer ts.Done(t)

Expand All @@ -151,6 +154,31 @@ func TestAddExportInteractive(t *testing.T) {
require.Len(t, ac.Exports, 1)
require.Equal(t, "Foo Stream", ac.Exports[0].Name)
require.Equal(t, "foo.>", string(ac.Exports[0].Subject))
require.Equal(t, jwt.Stream, ac.Exports[0].Type)

}
func TestAddServiceExportInteractive(t *testing.T) {
ts := NewTestStore(t, "test")
defer ts.Done(t)

ts.AddAccount(t, "A")
ts.AddAccount(t, "B")

input := []interface{}{0, 1, "bar.>", "Bar Stream", true, true, "header", "foo", 0, "1s", 0}
cmd := createAddExportCmd()
HoistRootFlags(cmd)
_, _, err := ExecuteInteractiveCmd(cmd, input, "-i")
require.NoError(t, err)

ac, err := ts.Store.ReadAccountClaim("A")
require.NoError(t, err)
require.Len(t, ac.Exports, 1)
require.Equal(t, "Bar Stream", ac.Exports[0].Name)
require.Equal(t, "bar.>", string(ac.Exports[0].Subject))
require.Equal(t, jwt.Service, ac.Exports[0].Type)
require.Equal(t, time.Second, ac.Exports[0].ResponseThreshold)
require.Equal(t, jwt.Headers, ac.Exports[0].Latency.Sampling)

}

func TestAddExportNonInteractive(t *testing.T) {
Expand Down Expand Up @@ -213,7 +241,7 @@ func TestAddServiceLatencyInteractive(t *testing.T) {
cmd := createAddExportCmd()

// service, subject, name, private, track, freq
args := []interface{}{1, "q", "q", false, true, "100", "q.lat", 1}
args := []interface{}{1, "q", "q", false, true, "100", "q.lat", 1, "0"}
_, _, err := ExecuteInteractiveCmd(cmd, args)
require.NoError(t, err)

Expand All @@ -225,4 +253,5 @@ func TestAddServiceLatencyInteractive(t *testing.T) {
require.Equal(t, "q.lat", string(ac.Exports[0].Latency.Results))
require.Equal(t, jwt.SamplingRate(100), ac.Exports[0].Latency.Sampling)
require.EqualValues(t, jwt.ResponseTypeStream, ac.Exports[0].ResponseType)
require.Equal(t, time.Duration(0), ac.Exports[0].ResponseThreshold)
}
11 changes: 11 additions & 0 deletions cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,3 +569,14 @@ func StoreAccountAndUpdateStatus(ctx ActionCtx, token string, status *store.Repo
status.AddFromError(err)
}
}

func promptDuration(label string, defaultValue time.Duration) (time.Duration, error) {
value, err := cli.Prompt(label, defaultValue.String())
if err != nil {
return time.Duration(0), err
}
if value == "" {
return time.Duration(0), nil
}
return time.ParseDuration(value)
}
15 changes: 15 additions & 0 deletions cmd/editexport.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cmd
import (
"errors"
"fmt"
"time"

cli "github.com/nats-io/cliprompts/v2"
"github.com/nats-io/jwt/v2"
Expand Down Expand Up @@ -47,6 +48,7 @@ func createEditExportCmd() *cobra.Command {
cmd.Flags().BoolVarP(&params.rmLatencySampling, "rm-latency-sampling", "", false, "remove latency sampling")
cmd.Flags().StringVarP(&params.description, "description", "", "", "Description for this export")
cmd.Flags().StringVarP(&params.infoUrl, "info-url", "", "", "Link for more info on this export")
cmd.Flags().DurationVarP(&params.responseThreshold, "response-threshold", "", 0, "response threshold duration (units ms/s/m/h) (services only)")

hm := fmt.Sprintf("response type for the service [%s | %s | %s] (services only)", jwt.ResponseTypeSingleton, jwt.ResponseTypeStream, jwt.ResponseTypeChunked)
cmd.Flags().StringVarP(&params.responseType, "response-type", "", jwt.ResponseTypeSingleton, hm)
Expand Down Expand Up @@ -75,6 +77,7 @@ type EditExportParams struct {
rmLatencySampling bool
infoUrl string
description string
responseThreshold time.Duration
}

func (p *EditExportParams) SetDefaults(ctx ActionCtx) error {
Expand Down Expand Up @@ -231,6 +234,10 @@ func (p *EditExportParams) PostInteractive(ctx ActionCtx) error {
return err
}
p.responseType = choices[s]
p.responseThreshold, err = promptDuration("response threshold (0 disabled)", p.responseThreshold)
if err != nil {
return err
}
}

if err = p.SignerParams.Edit(ctx); err != nil {
Expand Down Expand Up @@ -265,6 +272,8 @@ func (p *EditExportParams) Validate(ctx ActionCtx) error {
rt != jwt.ResponseTypeChunked {
return fmt.Errorf("unknown response type %q", p.responseType)
}
} else if p.responseThreshold != time.Duration(0) {
return errors.New("response threshold is only applicable to services")
}

if err = p.SignerParams.Resolve(ctx); err != nil {
Expand Down Expand Up @@ -313,6 +322,10 @@ func (p *EditExportParams) syncOptions(ctx ActionCtx) {
p.responseType = string(old.ResponseType)
}

if !(cmd.Flag("response-threshold").Changed) {
p.responseThreshold = old.ResponseThreshold
}

if !(cmd.Flag("description").Changed) {
p.description = old.Description
}
Expand Down Expand Up @@ -385,6 +398,8 @@ func (p *EditExportParams) Run(ctx ActionCtx) (store.Status, error) {
}
}

export.ResponseThreshold = p.responseThreshold

rt := jwt.ResponseType(p.responseType)
if old.ResponseType != rt {
export.ResponseType = rt
Expand Down
9 changes: 6 additions & 3 deletions cmd/editexport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cmd

import (
"testing"
"time"

"github.com/nats-io/jwt/v2"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -48,14 +49,15 @@ func Test_EditExport_Latency(t *testing.T) {
require.NoError(t, err)
require.Nil(t, ac.Exports[0].Latency)

_, _, err = ExecuteCmd(createEditExportCmd(), "--subject", "a", "--sampling", "100", "--latency", "lat")
_, _, err = ExecuteCmd(createEditExportCmd(), "--subject", "a", "--sampling", "100", "--latency", "lat", "--response-threshold", "1s")
require.NoError(t, err)

ac, err = ts.Store.ReadAccountClaim("A")
require.NoError(t, err)
require.NotNil(t, ac.Exports[0].Latency)
require.Equal(t, jwt.SamplingRate(100), ac.Exports[0].Latency.Sampling)
require.Equal(t, jwt.Subject("lat"), ac.Exports[0].Latency.Results)
require.Equal(t, time.Second, ac.Exports[0].ResponseThreshold)
}

func Test_EditExportInteractive(t *testing.T) {
Expand All @@ -67,7 +69,7 @@ func Test_EditExportInteractive(t *testing.T) {
ts.AddExport(t, "A", jwt.Service, "b", true)

link := "http://foo/bar"
_, _, err := ExecuteInteractiveCmd(createEditExportCmd(), []interface{}{1, 1, "c", "c", false, false, 1, "desc", link})
_, _, err := ExecuteInteractiveCmd(createEditExportCmd(), []interface{}{1, 1, "c", "c", false, false, 1, "1s", "desc", link})
require.NoError(t, err)

ac, err := ts.Store.ReadAccountClaim("A")
Expand All @@ -77,6 +79,7 @@ func Test_EditExportInteractive(t *testing.T) {
require.EqualValues(t, jwt.ResponseTypeStream, ac.Exports[1].ResponseType)
require.Equal(t, ac.Exports[1].Description, "desc")
require.Equal(t, ac.Exports[1].InfoURL, link)
require.Equal(t, ac.Exports[1].ResponseThreshold, time.Second)
}

func Test_EditExportInteractiveLatency(t *testing.T) {
Expand All @@ -86,7 +89,7 @@ func Test_EditExportInteractiveLatency(t *testing.T) {
ts.AddAccount(t, "A")
ts.AddExport(t, "A", jwt.Service, "a", true)

_, _, err := ExecuteInteractiveCmd(createEditExportCmd(), []interface{}{0, 1, "c", "c", false, true, "header", "lat", 2, "", ""})
_, _, err := ExecuteInteractiveCmd(createEditExportCmd(), []interface{}{0, 1, "c", "c", false, true, "header", "lat", 2, "", "", ""})
require.NoError(t, err)

ac, err := ts.Store.ReadAccountClaim("A")
Expand Down

0 comments on commit b5998ae

Please sign in to comment.