Skip to content

Commit

Permalink
feat/groundwork-output: improve plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavlo Sumkin committed Jul 19, 2022
1 parent 04536a8 commit ff72d34
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 98 deletions.
29 changes: 18 additions & 11 deletions plugins/outputs/groundwork/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,26 @@ GW8+

## List of tags used by the plugin

* group - to define the name of the group you want to monitor, can be changed
with config.
* host - to define the name of the host you want to monitor, can be changed with
config.
* service - to define the name of the service you want to monitor.
* status - to define the status of the service. Supported statuses:
* __group__ - to define the name of the group you want to monitor,
can be changed with config.
* __host__ - to define the name of the host you want to monitor,
can be changed with config.
* __service__ - to define the name of the service you want to monitor.
* __status__ - to define the status of the service. Supported statuses:
"SERVICE_OK", "SERVICE_WARNING", "SERVICE_UNSCHEDULED_CRITICAL",
"SERVICE_PENDING", "SERVICE_SCHEDULED_CRITICAL", "SERVICE_UNKNOWN".
* message - to provide any message you want, it overrides message field value.
* unitType - to use in monitoring contexts(subset of The Unified Code for Units
of Measure standard). Supported types: "1", "%cpu", "KB", "GB", "MB".
* warning - to define warning threshold value.
* critical - to define critical threshold value.
* __message__ - to provide any message you want;
it overrides the __message__ field value.
* __unitType__ - to use in monitoring contexts (subset of The Unified Code for
Units of Measure standard). Supported types: "1", "%cpu", "KB", "GB", "MB".
* __critical__ - to define the default critical threshold value;
it overrides the __value_cr__ field value.
* __warning__ - to define the default warning threshold value;
it overrides the __value_wn__ field value.
* __value_cr__ - to define the critical threshold value;
it overrides the __critical__ tag value and the __value_cr__ field value.
* __value_wn__ - to define the warning threshold value;
it overrides the __warning__ tag value and the __value_wn__ field value.

## NOTE

Expand Down
181 changes: 99 additions & 82 deletions plugins/outputs/groundwork/groundwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"

"github.com/gwos/tcg/sdk/clients"
"github.com/gwos/tcg/sdk/logper"
Expand Down Expand Up @@ -48,28 +49,28 @@ func (*Groundwork) SampleConfig() string {

func (g *Groundwork) Init() error {
if g.Server == "" {
return errors.New("no 'url' provided")
return errors.New(`no "url" provided`)
}
if g.AgentID == "" {
return errors.New("no 'agent_id' provided")
return errors.New(`no "agent_id" provided`)
}
if g.Username == "" {
return errors.New("no 'username' provided")
return errors.New(`no "username" provided`)
}
if g.Password == "" {
return errors.New("no 'password' provided")
return errors.New(`no "password" provided`)
}
if g.DefaultAppType == "" {
return errors.New("no 'default_app_type' provided")
return errors.New(`no "default_app_type" provided`)
}
if g.DefaultHost == "" {
return errors.New("no 'default_host' provided")
return errors.New(`no "default_host" provided`)
}
if g.ResourceTag == "" {
return errors.New("no 'resource_tag' provided")
return errors.New(`no "resource_tag" provided`)
}
if !validStatus(g.DefaultServiceState) {
return errors.New("invalid 'default_service_state' provided")
return errors.New(`invalid "default_service_state" provided`)
}

g.client = clients.GWClient{
Expand Down Expand Up @@ -214,40 +215,18 @@ func (g *Groundwork) parseMetric(metric telegraf.Metric) (metricMeta, *transit.M
group, _ := metric.GetTag(g.GroupTag)

resource := g.DefaultHost
if value, present := metric.GetTag(g.ResourceTag); present {
resource = value
if v, ok := metric.GetTag(g.ResourceTag); ok {
resource = v
}

service := metric.Name()
if value, present := metric.GetTag("service"); present {
service = value
}

status := g.DefaultServiceState
value, statusPresent := metric.GetTag("status")
if validStatus(value) {
status = value
if v, ok := metric.GetTag("service"); ok {
service = v
}

unitType := string(transit.UnitCounter)
if value, present := metric.GetTag("unitType"); present {
unitType = value
}

var critical float64
value, criticalPresent := metric.GetTag("critical")
if criticalPresent {
if s, err := strconv.ParseFloat(value, 64); err == nil {
critical = s
}
}

var warning float64
value, warningPresent := metric.GetTag("warning")
if warningPresent {
if s, err := strconv.ParseFloat(value, 64); err == nil {
warning = s
}
if v, ok := metric.GetTag("unitType"); ok {
unitType = v
}

lastCheckTime := transit.NewTimestamp()
Expand All @@ -259,85 +238,123 @@ func (g *Groundwork) parseMetric(metric telegraf.Metric) (metricMeta, *transit.M
Owner: resource,
},
MonitoredInfo: transit.MonitoredInfo{
Status: transit.MonitorStatus(status),
Status: transit.MonitorStatus(g.DefaultServiceState),
LastCheckTime: lastCheckTime,
NextCheckTime: lastCheckTime, // if not added, GW will make this as LastCheckTime + 5 mins
},
Metrics: nil,
}

for _, value := range metric.FieldList() {
if value.Key == "message" {
switch m := value.Value.(type) {
case string:
serviceObject.LastPluginOutput = m
case []byte:
serviceObject.LastPluginOutput = string(m)
default:
serviceObject.LastPluginOutput = fmt.Sprintf("%v", m)
}
for _, field := range metric.FieldList() {
if field.Key == "message" ||
strings.HasSuffix(field.Key, "_cr") ||
strings.HasSuffix(field.Key, "_wn") {
continue
}

switch value.Value.(type) {
switch field.Value.(type) {
case string, []byte:
g.Log.Warnf("string values are not supported, skipping field %s: %q", value.Key, value.Value)
g.Log.Warnf("string values are not supported, skipping field %s: %q", field.Key, field.Value)
continue
}

typedValue := transit.NewTypedValue(value.Value)
typedValue := transit.NewTypedValue(field.Value)
if typedValue == nil {
g.Log.Warnf("could not convert type %T, skipping field %s: %v", value.Value, value.Key, value.Value)
g.Log.Warnf("could not convert type %T, skipping field %s: %v", field.Value, field.Key, field.Value)
continue
}

var thresholds []transit.ThresholdValue
if warningPresent {
thresholds = append(thresholds, transit.ThresholdValue{
SampleType: transit.Warning,
Label: value.Key + "_wn",
Value: &transit.TypedValue{
ValueType: transit.DoubleType,
DoubleValue: &warning,
},
})
addCriticalThreshold := func(v interface{}) {
if tv := transit.NewTypedValue(v); tv != nil {
thresholds = append(thresholds, transit.ThresholdValue{
SampleType: transit.Critical,
Label: field.Key + "_cr",
Value: tv,
})
}
}
if criticalPresent {
thresholds = append(thresholds, transit.ThresholdValue{
SampleType: transit.Critical,
Label: value.Key + "_cr",
Value: &transit.TypedValue{
ValueType: transit.DoubleType,
DoubleValue: &critical,
},
})
addWarningThreshold := func(v interface{}) {
if tv := transit.NewTypedValue(v); tv != nil {
thresholds = append(thresholds, transit.ThresholdValue{
SampleType: transit.Warning,
Label: field.Key + "_wn",
Value: tv,
})
}
}
if v, ok := metric.GetTag(field.Key + "_cr"); ok {
if v, err := strconv.ParseFloat(v, 64); err == nil {
addCriticalThreshold(v)
}
} else if v, ok := metric.GetTag("critical"); ok {
if v, err := strconv.ParseFloat(v, 64); err == nil {
addCriticalThreshold(v)
}
} else if v, ok := metric.GetField(field.Key + "_cr"); ok {
addCriticalThreshold(v)
}
if v, ok := metric.GetTag(field.Key + "_wn"); ok {
if v, err := strconv.ParseFloat(v, 64); err == nil {
addWarningThreshold(v)
}
} else if v, ok := metric.GetTag("warning"); ok {
if v, err := strconv.ParseFloat(v, 64); err == nil {
addWarningThreshold(v)
}
} else if v, ok := metric.GetField(field.Key + "_wn"); ok {
addWarningThreshold(v)
}

serviceObject.Metrics = append(serviceObject.Metrics, transit.TimeSeries{
MetricName: value.Key,
MetricName: field.Key,
SampleType: transit.Value,
Interval: &transit.TimeInterval{
EndTime: lastCheckTime,
},
Interval: &transit.TimeInterval{EndTime: lastCheckTime},
Value: typedValue,
Unit: transit.UnitType(unitType),
Thresholds: thresholds,
})
}

if !statusPresent {
serviceStatus, err := transit.CalculateServiceStatus(&serviceObject.Metrics)
if err != nil {
g.Log.Infof("could not calculate service status, reverting to default_service_state: %v", err)
serviceObject.Status = transit.MonitorStatus(g.DefaultServiceState)
}
serviceObject.Status = serviceStatus
}

if m, ok := metric.GetTag("message"); ok {
serviceObject.LastPluginOutput = m
} else if m, ok := metric.GetField("message"); ok {
switch m := m.(type) {
case string:
serviceObject.LastPluginOutput = m
case []byte:
serviceObject.LastPluginOutput = string(m)
default:
serviceObject.LastPluginOutput = fmt.Sprintf("%v", m)
}
}

func() {
if s, ok := metric.GetTag("status"); ok && validStatus(s) {
serviceObject.Status = transit.MonitorStatus(s)
return
}
if s, ok := metric.GetField("status"); ok {
status := g.DefaultServiceState
switch s := s.(type) {
case string:
status = s
case []byte:
status = string(s)
}
if validStatus(status) {
serviceObject.Status = transit.MonitorStatus(status)
return
}
}
status, err := transit.CalculateServiceStatus(&serviceObject.Metrics)
if err != nil {
g.Log.Infof("could not calculate service status, reverting to default_service_state: %v", err)
status = transit.MonitorStatus(g.DefaultServiceState)
}
serviceObject.Status = status
}()

return metricMeta{resource: resource, group: group}, &serviceObject, nil
}

Expand Down
28 changes: 23 additions & 5 deletions plugins/outputs/groundwork/groundwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func TestWriteWithDefaults(t *testing.T) {
require.Equal(t, defaultTestAgentID, obj.Context.AgentID)
require.Equal(t, customAppType, obj.Context.AppType)
require.Equal(t, defaultHost, obj.Resources[0].Name)
require.Equal(t, transit.MonitorStatus("SERVICE_OK"), obj.Resources[0].Services[0].Status)
require.Equal(t, "IntMetric", obj.Resources[0].Services[0].Name)
require.Equal(t, int64(42), *obj.Resources[0].Services[0].Metrics[0].Value.IntegerValue)
require.Equal(t, 0, len(obj.Groups))
Expand Down Expand Up @@ -73,7 +74,10 @@ func TestWriteWithDefaults(t *testing.T) {
func TestWriteWithFields(t *testing.T) {
// Generate test metric with fields to test Write logic
floatMetric := testutil.TestMetric(1.0, "FloatMetric")
floatMetric.AddField("value_cr", 3.0)
floatMetric.AddField("value_wn", 2.0)
floatMetric.AddField("message", "Test Message")
floatMetric.AddField("status", "SERVICE_WARNING")

// Simulate Groundwork server that should receive custom metrics
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -86,7 +90,11 @@ func TestWriteWithFields(t *testing.T) {
require.NoError(t, err)

// Check if server gets proper data
require.Equal(t, "Test Message", obj.Resources[0].Services[0].MonitoredInfo.LastPluginOutput)
require.Equal(t, "Test Message", obj.Resources[0].Services[0].LastPluginOutput)
require.Equal(t, transit.MonitorStatus("SERVICE_WARNING"), obj.Resources[0].Services[0].Status)
require.Equal(t, float64(1.0), *obj.Resources[0].Services[0].Metrics[0].Value.DoubleValue)
require.Equal(t, float64(3.0), *obj.Resources[0].Services[0].Metrics[0].Thresholds[0].Value.DoubleValue)
require.Equal(t, float64(2.0), *obj.Resources[0].Services[0].Metrics[0].Thresholds[1].Value.DoubleValue)

_, err = fmt.Fprintln(w, "OK")
require.NoError(t, err)
Expand Down Expand Up @@ -118,10 +126,17 @@ func TestWriteWithFields(t *testing.T) {
func TestWriteWithTags(t *testing.T) {
// Generate test metric with tags to test Write logic
floatMetric := testutil.TestMetric(1.0, "FloatMetric")
floatMetric.AddField("value_cr", 3.0)
floatMetric.AddField("value_wn", 2.0)
floatMetric.AddField("message", "Test Message")
floatMetric.AddField("status", "SERVICE_WARNING")
floatMetric.AddTag("value_cr", "9.0")
floatMetric.AddTag("value_wn", "6.0")
floatMetric.AddTag("message", "Test Tag")
floatMetric.AddTag("host", "Host01")
floatMetric.AddTag("status", "SERVICE_PENDING")
floatMetric.AddTag("group", "Group01")
floatMetric.AddTag("host", "Host01")
floatMetric.AddTag("service", "Service01")

// Simulate Groundwork server that should receive custom metrics
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -137,11 +152,14 @@ func TestWriteWithTags(t *testing.T) {
require.Equal(t, defaultTestAgentID, obj.Context.AgentID)
require.Equal(t, defaultAppType, obj.Context.AppType)
require.Equal(t, "Host01", obj.Resources[0].Name)
require.Equal(t, "FloatMetric", obj.Resources[0].Services[0].Name)
require.Equal(t, 1.0, *obj.Resources[0].Services[0].Metrics[0].Value.DoubleValue)
require.Equal(t, "Service01", obj.Resources[0].Services[0].Name)
require.Equal(t, "Group01", obj.Groups[0].GroupName)
require.Equal(t, "Host01", obj.Groups[0].Resources[0].Name)
require.Equal(t, "Test Tag", obj.Resources[0].Services[0].MonitoredInfo.LastPluginOutput)
require.Equal(t, "Test Tag", obj.Resources[0].Services[0].LastPluginOutput)
require.Equal(t, transit.MonitorStatus("SERVICE_PENDING"), obj.Resources[0].Services[0].Status)
require.Equal(t, float64(1.0), *obj.Resources[0].Services[0].Metrics[0].Value.DoubleValue)
require.Equal(t, float64(9.0), *obj.Resources[0].Services[0].Metrics[0].Thresholds[0].Value.DoubleValue)
require.Equal(t, float64(6.0), *obj.Resources[0].Services[0].Metrics[0].Thresholds[1].Value.DoubleValue)

_, err = fmt.Fprintln(w, "OK")
require.NoError(t, err)
Expand Down

0 comments on commit ff72d34

Please sign in to comment.