From fc8ef3bf3ac8bdc0a3f70b572b15d3b0ac050f74 Mon Sep 17 00:00:00 2001 From: Alvaro Morales Date: Tue, 4 Aug 2015 14:48:13 -0700 Subject: [PATCH 1/2] Add httpjson plugin --- plugins/all/all.go | 1 + plugins/httpjson/httpjson.go | 110 +++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+) create mode 100644 plugins/httpjson/httpjson.go diff --git a/plugins/all/all.go b/plugins/all/all.go index bc610c39c4621..8670c3d8ba6fa 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -5,6 +5,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/elasticsearch" _ "github.com/influxdb/telegraf/plugins/exec" _ "github.com/influxdb/telegraf/plugins/haproxy" + _ "github.com/influxdb/telegraf/plugins/httpjson" _ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/leofs" _ "github.com/influxdb/telegraf/plugins/lustre2" diff --git a/plugins/httpjson/httpjson.go b/plugins/httpjson/httpjson.go new file mode 100644 index 0000000000000..90af0c1a79e46 --- /dev/null +++ b/plugins/httpjson/httpjson.go @@ -0,0 +1,110 @@ +package httpjson + +import ( + "encoding/json" + "fmt" + "github.com/bitly/go-simplejson" + "github.com/influxdb/telegraf/plugins" + "net/http" + "sync" +) + +type HttpJson struct { + Servers []string + Measurements map[string]string + Method string + Foo string + client *http.Client +} + +var sampleConfig = ` +# stats url endpoint +servers = ["http://localhost:5000"] + +# a name for server(s) +foo = "mycluster" + +# HTTP method (GET or POST) +method = "GET" + +# Map of key transforms # TODO describe +[httpjson.measurements] +stats_measurements_measurement = "my_measurement" +` + +func (h *HttpJson) SampleConfig() string { + return sampleConfig +} + +func (h *HttpJson) Description() string { + return "Read flattened metrics from one or more JSON HTTP endpoints" +} + +func (h *HttpJson) Gather(acc plugins.Accumulator) error { + var wg sync.WaitGroup + + var outerr error + + for _, server := range h.Servers { + wg.Add(1) + go func(server string) { + defer wg.Done() + outerr = h.gatherServer(server, acc) + }(server) + } + + wg.Wait() + + return outerr +} + +func (h *HttpJson) gatherServer(url string, acc plugins.Accumulator) error { + r, err := h.client.Get(url) + if err != nil { + return err + } + + if r.StatusCode != http.StatusOK { + return fmt.Errorf("httpjson: server '%s' responded with status-code %d, expected %d", r.StatusCode, http.StatusOK) + } + + response, err := simplejson.NewFromReader(r.Body) + + if err != nil { + return err + } + + tags := map[string]string{ + "server": url, + } + + return parseResponse(acc, h.Foo, tags, response.Interface(), h.Measurements) +} + +func parseResponse(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}, measurements map[string]string) error { + switch t := v.(type) { + case map[string]interface{}: + for k, v := range t { + if err := parseResponse(acc, prefix+"_"+k, tags, v, measurements); err != nil { + return err + } + } + case json.Number: + if transform, ok := measurements[prefix]; ok { + prefix = transform + } + acc.Add(prefix, t, tags) + case bool, string, []interface{}: + // ignored types + return nil + default: + return fmt.Errorf("httpjson: got unexpected type %T with value %v (%s)", t, v, prefix) + } + return nil +} + +func init() { + plugins.Add("httpjson", func() plugins.Plugin { + return &HttpJson{client: http.DefaultClient} + }) +} From 52b8b1ec9699cf52133b49441583b767425d7ca0 Mon Sep 17 00:00:00 2001 From: Josh Palay Date: Fri, 14 Aug 2015 14:33:51 -0700 Subject: [PATCH 2/2] Modifications to httpjson plugin --- plugins/httpjson/httpjson.go | 204 ++++++++++++++++++++++-------- plugins/httpjson/httpjson_test.go | 187 +++++++++++++++++++++++++++ 2 files changed, 341 insertions(+), 50 deletions(-) create mode 100644 plugins/httpjson/httpjson_test.go diff --git a/plugins/httpjson/httpjson.go b/plugins/httpjson/httpjson.go index 90af0c1a79e46..16b232f890c7b 100644 --- a/plugins/httpjson/httpjson.go +++ b/plugins/httpjson/httpjson.go @@ -2,34 +2,69 @@ package httpjson import ( "encoding/json" + "errors" "fmt" - "github.com/bitly/go-simplejson" - "github.com/influxdb/telegraf/plugins" + "io/ioutil" "net/http" + "net/url" + "strings" "sync" + + "github.com/influxdb/telegraf/plugins" ) type HttpJson struct { - Servers []string - Measurements map[string]string - Method string - Foo string - client *http.Client + Services []Service + client HTTPClient +} + +type Service struct { + Name string + Servers []string + Method string + Parameters map[string]string +} + +type HTTPClient interface { + // Returns the result of an http request + // + // Parameters: + // req: HTTP request object + // + // Returns: + // http.Response: HTTP respons object + // error : Any error that may have occurred + MakeRequest(req *http.Request) (*http.Response, error) +} + +type RealHTTPClient struct { + client *http.Client +} + +func (c RealHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) { + return c.client.Do(req) } var sampleConfig = ` -# stats url endpoint -servers = ["http://localhost:5000"] +# Specify services via an array of tables +[[httpjson.services]] + + # a name for the service being polled + name = "webserver_stats" -# a name for server(s) -foo = "mycluster" + # URL of each server in the service's cluster + servers = [ + "http://localhost:9999/stats/", + "http://localhost:9998/stats/", + ] -# HTTP method (GET or POST) -method = "GET" + # HTTP method to use (case-sensitive) + method = "GET" -# Map of key transforms # TODO describe -[httpjson.measurements] -stats_measurements_measurement = "my_measurement" + # HTTP parameters (all values must be strings) + [httpjson.services.parameters] + event_type = "cpu_spike" + threshold = "0.75" ` func (h *HttpJson) SampleConfig() string { @@ -40,71 +75,140 @@ func (h *HttpJson) Description() string { return "Read flattened metrics from one or more JSON HTTP endpoints" } +// Gathers data for all servers. func (h *HttpJson) Gather(acc plugins.Accumulator) error { var wg sync.WaitGroup - var outerr error - - for _, server := range h.Servers { - wg.Add(1) - go func(server string) { - defer wg.Done() - outerr = h.gatherServer(server, acc) - }(server) + totalServers := 0 + for _, service := range h.Services { + totalServers += len(service.Servers) + } + errorChannel := make(chan error, totalServers) + + for _, service := range h.Services { + for _, server := range service.Servers { + wg.Add(1) + go func(service Service, server string) { + defer wg.Done() + if err := h.gatherServer(acc, service, server); err != nil { + errorChannel <- err + } + }(service, server) + } } wg.Wait() + close(errorChannel) - return outerr + // Get all errors and return them as one giant error + errorStrings := []string{} + for err := range errorChannel { + errorStrings = append(errorStrings, err.Error()) + } + + if len(errorStrings) == 0 { + return nil + } + return errors.New(strings.Join(errorStrings, "\n")) } -func (h *HttpJson) gatherServer(url string, acc plugins.Accumulator) error { - r, err := h.client.Get(url) +// Gathers data from a particular server +// Parameters: +// acc : The telegraf Accumulator to use +// serverURL: endpoint to send request to +// service : the service being queried +// +// Returns: +// error: Any error that may have occurred +func (h *HttpJson) gatherServer(acc plugins.Accumulator, service Service, serverURL string) error { + resp, err := h.sendRequest(service, serverURL) if err != nil { return err } - if r.StatusCode != http.StatusOK { - return fmt.Errorf("httpjson: server '%s' responded with status-code %d, expected %d", r.StatusCode, http.StatusOK) + var jsonOut interface{} + if err = json.Unmarshal([]byte(resp), &jsonOut); err != nil { + return errors.New("Error decoding JSON response") + } + + tags := map[string]string{ + "server": serverURL, } - response, err := simplejson.NewFromReader(r.Body) + processResponse(acc, service.Name, tags, jsonOut) + return nil +} +// Sends an HTTP request to the server using the HttpJson object's HTTPClient +// Parameters: +// serverURL: endpoint to send request to +// +// Returns: +// string: body of the response +// error : Any error that may have occurred +func (h *HttpJson) sendRequest(service Service, serverURL string) (string, error) { + // Prepare URL + requestURL, err := url.Parse(serverURL) if err != nil { - return err + return "", fmt.Errorf("Invalid server URL \"%s\"", serverURL) } - tags := map[string]string{ - "server": url, + params := url.Values{} + for k, v := range service.Parameters { + params.Add(k, v) + } + requestURL.RawQuery = params.Encode() + + // Create + send request + req, err := http.NewRequest(service.Method, requestURL.String(), nil) + if err != nil { + return "", err + } + + resp, err := h.client.MakeRequest(req) + if err != nil { + return "", err + } + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return string(body), err } - return parseResponse(acc, h.Foo, tags, response.Interface(), h.Measurements) + // Process response + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", + requestURL.String(), + resp.StatusCode, + http.StatusText(resp.StatusCode), + http.StatusOK, + http.StatusText(http.StatusOK)) + return string(body), err + } + + return string(body), err } -func parseResponse(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}, measurements map[string]string) error { +// Flattens the map generated from the JSON object and stores its float values using a +// plugins.Accumulator. It ignores any non-float values. +// Parameters: +// acc: the Accumulator to use +// prefix: What the name of the measurement name should be prefixed by. +// tags: telegraf tags to +func processResponse(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) { switch t := v.(type) { case map[string]interface{}: for k, v := range t { - if err := parseResponse(acc, prefix+"_"+k, tags, v, measurements); err != nil { - return err - } - } - case json.Number: - if transform, ok := measurements[prefix]; ok { - prefix = transform + processResponse(acc, prefix+"_"+k, tags, v) } - acc.Add(prefix, t, tags) - case bool, string, []interface{}: - // ignored types - return nil - default: - return fmt.Errorf("httpjson: got unexpected type %T with value %v (%s)", t, v, prefix) + case float64: + acc.Add(prefix, v, tags) } - return nil } func init() { plugins.Add("httpjson", func() plugins.Plugin { - return &HttpJson{client: http.DefaultClient} + return &HttpJson{client: RealHTTPClient{client: &http.Client{}}} }) } diff --git a/plugins/httpjson/httpjson_test.go b/plugins/httpjson/httpjson_test.go new file mode 100644 index 0000000000000..9e9b7d2b7ca92 --- /dev/null +++ b/plugins/httpjson/httpjson_test.go @@ -0,0 +1,187 @@ +package httpjson + +import ( + "fmt" + "io/ioutil" + "net/http" + "strings" + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const validJSON = ` + { + "parent": { + "child": 3, + "ignored_child": "hi" + }, + "ignored_null": null, + "integer": 4, + "ignored_list": [3, 4], + "ignored_parent": { + "another_ignored_list": [4], + "another_ignored_null": null, + "ignored_string": "hello, world!" + } + }` + +const invalidJSON = "I don't think this is JSON" + +const empty = "" + +type mockHTTPClient struct { + responseBody string + statusCode int +} + +// Mock implementation of MakeRequest. Usually returns an http.Response with +// hard-coded responseBody and statusCode. However, if the request uses a +// nonstandard method, it uses status code 405 (method not allowed) +func (c mockHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) { + resp := http.Response{} + resp.StatusCode = c.statusCode + + // basic error checking on request method + allowedMethods := []string{"GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT"} + methodValid := false + for _, method := range allowedMethods { + if req.Method == method { + methodValid = true + break + } + } + + if !methodValid { + resp.StatusCode = 405 // Method not allowed + } + + resp.Body = ioutil.NopCloser(strings.NewReader(c.responseBody)) + return &resp, nil +} + +// Generates a pointer to an HttpJson object that uses a mock HTTP client. +// Parameters: +// response : Body of the response that the mock HTTP client should return +// statusCode: HTTP status code the mock HTTP client should return +// +// Returns: +// *HttpJson: Pointer to an HttpJson object that uses the generated mock HTTP client +func genMockHttpJson(response string, statusCode int) *HttpJson { + return &HttpJson{ + client: mockHTTPClient{responseBody: response, statusCode: statusCode}, + Services: []Service{ + Service{ + Servers: []string{ + "http://server1.example.com/metrics/", + "http://server2.example.com/metrics/", + }, + Name: "my_webapp", + Method: "GET", + Parameters: map[string]string{ + "httpParam1": "12", + "httpParam2": "the second parameter", + }, + }, + Service{ + Servers: []string{ + "http://server1.example.com/metrics/", + "http://server2.example.com/metrics/", + }, + Name: "other_webapp", + Method: "POST", + Parameters: map[string]string{ + "httpParam1": "12", + "httpParam2": "the second parameter", + }, + }, + }, + } +} + +// Test that the proper values are ignored or collected +func TestHttpJson200(t *testing.T) { + httpjson := genMockHttpJson(validJSON, 200) + + var acc testutil.Accumulator + err := httpjson.Gather(&acc) + require.NoError(t, err) + + assert.Equal(t, 8, len(acc.Points)) + + for _, service := range httpjson.Services { + for _, srv := range service.Servers { + require.NoError(t, + acc.ValidateTaggedValue( + fmt.Sprintf("%s_parent_child", service.Name), + 3.0, + map[string]string{"server": srv}, + ), + ) + require.NoError(t, + acc.ValidateTaggedValue( + fmt.Sprintf("%s_integer", service.Name), + 4.0, + map[string]string{"server": srv}, + ), + ) + } + } +} + +// Test response to HTTP 500 +func TestHttpJson500(t *testing.T) { + httpjson := genMockHttpJson(validJSON, 500) + + var acc testutil.Accumulator + err := httpjson.Gather(&acc) + + assert.NotNil(t, err) + // 4 error lines for (2 urls) * (2 services) + assert.Equal(t, len(strings.Split(err.Error(), "\n")), 4) + assert.Equal(t, 0, len(acc.Points)) +} + +// Test response to HTTP 405 +func TestHttpJsonBadMethod(t *testing.T) { + httpjson := genMockHttpJson(validJSON, 200) + httpjson.Services[0].Method = "NOT_A_REAL_METHOD" + + var acc testutil.Accumulator + err := httpjson.Gather(&acc) + + assert.NotNil(t, err) + // 2 error lines for (2 urls) * (1 falied service) + assert.Equal(t, len(strings.Split(err.Error(), "\n")), 2) + + // (2 measurements) * (2 servers) * (1 successful service) + assert.Equal(t, 4, len(acc.Points)) +} + +// Test response to malformed JSON +func TestHttpJsonBadJson(t *testing.T) { + httpjson := genMockHttpJson(invalidJSON, 200) + + var acc testutil.Accumulator + err := httpjson.Gather(&acc) + + assert.NotNil(t, err) + // 4 error lines for (2 urls) * (2 services) + assert.Equal(t, len(strings.Split(err.Error(), "\n")), 4) + assert.Equal(t, 0, len(acc.Points)) +} + +// Test response to empty string as response objectgT +func TestHttpJsonEmptyResponse(t *testing.T) { + httpjson := genMockHttpJson(empty, 200) + + var acc testutil.Accumulator + err := httpjson.Gather(&acc) + + assert.NotNil(t, err) + // 4 error lines for (2 urls) * (2 services) + assert.Equal(t, len(strings.Split(err.Error(), "\n")), 4) + assert.Equal(t, 0, len(acc.Points)) +}