diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 86ac40c3a18..553009be902 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -272,6 +272,7 @@ https://github.com/elastic/beats/compare/v5.0.2...v5.1.1[View commits] - Add add_cloud_metadata processor for collecting cloud provider metadata. {pull}2728[2728] - Added decode_json_fields processor for decoding fields containing JSON strings. {pull}2605[2605] +- Add Tencent Cloud provider for add_cloud_metadata processor. {pull}4023[4023] *Metricbeat* diff --git a/libbeat/docs/processors-config.asciidoc b/libbeat/docs/processors-config.asciidoc index 277bd7ec685..4d005162745 100644 --- a/libbeat/docs/processors-config.asciidoc +++ b/libbeat/docs/processors-config.asciidoc @@ -266,11 +266,12 @@ The `add_cloud_metadata` processor enriches each event with instance metadata from the machine's hosting provider. At startup it will detect the hosting provider and cache the instance metadata. -Three cloud providers are supported. +The following cloud providers are supported: - Amazon Elastic Compute Cloud (EC2) - Digital Ocean - Google Compute Engine (GCE) +- https://www.qcloud.com/?lang=en[Tencent Cloud] (QCloud) The simple configuration below enables the processor. @@ -341,6 +342,22 @@ _GCE_ } ------------------------------------------------------------------------------- +_Tencent Clound_ + +[source,json] +------------------------------------------------------------------------------- +{ + "meta": { + "cloud": { + "availability_zone": "gz-azone2", + "instance_id": "ins-qcloudv5", + "provider": "qcloud", + "region": "china-south-gz" + } + } +} +------------------------------------------------------------------------------- + [[add-locale]] === add_locale diff --git a/libbeat/processors/add_cloud_metadata/add_cloud_metadata.go b/libbeat/processors/add_cloud_metadata/add_cloud_metadata.go index 713750cc650..08b8645276c 100644 --- a/libbeat/processors/add_cloud_metadata/add_cloud_metadata.go +++ b/libbeat/processors/add_cloud_metadata/add_cloud_metadata.go @@ -31,10 +31,20 @@ const ( // Google GCE Metadata Service gceMetadataURI = "/computeMetadata/v1/?recursive=true&alt=json" + + // Tencent Clound Metadata Service + qcloudMetadataHost = "metadata.tencentyun.com" + qcloudMetadataInstanceIDURI = "/meta-data/instance-id" + qcloudMetadataRegionURI = "/meta-data/placement/region" + qcloudMetadataZoneURI = "/meta-data/placement/zone" + + // Default config + defaultTimeOut = 3 * time.Second ) var debugf = logp.MakeDebug("filters") +// metadata schemas for all prividers. var ( ec2Schema = func(m map[string]interface{}) common.MapStr { out, _ := s.Schema{ @@ -74,6 +84,10 @@ var ( return out } + + qcloudSchema = func(m map[string]interface{}) common.MapStr { + return common.MapStr(m) + } ) // init registers the add_cloud_metadata processor. @@ -81,71 +95,94 @@ func init() { processors.RegisterPlugin("add_cloud_metadata", newCloudMetadata) } -// result is the result of a query for a specific hosting provider's metadata. -type result struct { - provider string // Hosting provider type. - err error // Error that occurred while fetching (if any). - metadata common.MapStr // A specific subset of the metadata received from the hosting provider. -} +type schemaConv func(m map[string]interface{}) common.MapStr -func (r result) String() string { - return fmt.Sprintf("result=[provider:%v, error=%v, metadata=%v]", - r.provider, r.err, r.metadata) +// responseHandler is the callback function that used to write something +// to the result according the HTTP response. +type responseHandler func(all []byte, res *result) error + +type metadataFetcher struct { + provider string + headers map[string]string + responseHandlers map[string]responseHandler + conv schemaConv } -// fetchJSON query metadata from a hosting provider's metadata service. -func fetchJSON( +// fetchRaw queries raw metadata from a hosting provider's metadata service. +func (f *metadataFetcher) fetchRaw( ctx context.Context, - provider string, - headers map[string]string, - url string, - conv func(map[string]interface{}) common.MapStr, client http.Client, -) result { - result := result{provider: provider} - + url string, + responseHandler responseHandler, + result *result, +) { req, err := http.NewRequest("GET", url, nil) if err != nil { - result.err = errors.Wrapf(err, "failed to create http request for %v", provider) - return result + result.err = errors.Wrapf(err, "failed to create http request for %v", f.provider) + return } - for k, v := range headers { + for k, v := range f.headers { req.Header.Add(k, v) } req = req.WithContext(ctx) rsp, err := client.Do(req) if err != nil { - result.err = errors.Wrapf(err, "failed requesting %v metadata", provider) - return result + result.err = errors.Wrapf(err, "failed requesting %v metadata", f.provider) + return } defer rsp.Body.Close() if rsp.StatusCode != http.StatusOK { result.err = errors.Errorf("failed with http status code %v", rsp.StatusCode) - return result + return } all, err := ioutil.ReadAll(rsp.Body) if err != nil { - result.err = errors.Wrapf(err, "failed requesting %v metadata", provider) - return result + result.err = errors.Wrapf(err, "failed requesting %v metadata", f.provider) + return } // Decode JSON. - dec := json.NewDecoder(bytes.NewReader(all)) - dec.UseNumber() - err = dec.Decode(&result.metadata) + err = responseHandler(all, result) if err != nil { - result.err = errors.Wrapf(err, "failed to unmarshal %v JSON of '%v'", provider, string(all)) - return result + result.err = err + return + } + + return +} + +// fetchMetadata queries metadata from a hosting provider's metadata service. +// Some providers require multiple HTTP requests to gather the whole metadata, +// len(f.responseHandlers) > 1 indicates that multiple requests are needed. +func (f *metadataFetcher) fetchMetadata(ctx context.Context, client http.Client) result { + res := result{provider: f.provider, metadata: common.MapStr{}} + for url, responseHandler := range f.responseHandlers { + f.fetchRaw(ctx, client, url, responseHandler, &res) + if res.err != nil { + return res + } } // Apply schema. - result.metadata = conv(result.metadata) - result.metadata["provider"] = provider + res.metadata = f.conv(res.metadata) + res.metadata["provider"] = f.provider + + return res +} - return result +// result is the result of a query for a specific hosting provider's metadata. +type result struct { + provider string // Hosting provider type. + err error // Error that occurred while fetching (if any). + metadata common.MapStr // A specific subset of the metadata received from the hosting provider. +} + +func (r result) String() string { + return fmt.Sprintf("result=[provider:%v, error=%v, metadata=%v]", + r.provider, r.err, r.metadata) } // writeResult blocks until it can write the result r to the channel c or until @@ -163,7 +200,7 @@ func writeResult(ctx context.Context, c chan result, r result) error { // hosting providers supported by this processor. It wait for the results to // be returned or for a timeout to occur then returns the results that // completed in time. -func fetchMetadata(doURL, ec2URL, gceURL string, timeout time.Duration) *result { +func fetchMetadata(metadataFetchers []*metadataFetcher, timeout time.Duration) *result { debugf("add_cloud_metadata: starting to fetch metadata, timeout=%v", timeout) start := time.Now() defer func() { @@ -187,11 +224,13 @@ func fetchMetadata(doURL, ec2URL, gceURL string, timeout time.Duration) *result defer cancel() c := make(chan result) - go func() { writeResult(ctx, c, fetchJSON(ctx, "digitalocean", nil, doURL, doSchema, client)) }() - go func() { writeResult(ctx, c, fetchJSON(ctx, "ec2", nil, ec2URL, ec2Schema, client)) }() - go func() { writeResult(ctx, c, fetchJSON(ctx, "gce", gceHeaders, gceURL, gceSchema, client)) }() + for _, fetcher := range metadataFetchers { + go func(fetcher *metadataFetcher) { + writeResult(ctx, c, fetcher.fetchMetadata(ctx, client)) + }(fetcher) + } - for i := 0; i < 3; i++ { + for i := 0; i < len(metadataFetchers); i++ { select { case result := <-c: debugf("add_cloud_metadata: received disposition for %v after %v. %v", @@ -209,30 +248,146 @@ func fetchMetadata(doURL, ec2URL, gceURL string, timeout time.Duration) *result return nil } -type addCloudMetadata struct { - metadata common.MapStr +// getMetadataURLs loads config and generates the metadata URLs. +func getMetadataURLs(c common.Config, defaultHost string, metadataURIs []string) ([]string, error) { + var urls []string + config := struct { + MetadataHostAndPort string `config:"host"` // Specifies the host and port of the metadata service (for testing purposes only). + }{ + MetadataHostAndPort: defaultHost, + } + err := c.Unpack(&config) + if err != nil { + return urls, errors.Wrap(err, "failed to unpack add_cloud_metadata config") + } + for _, uri := range metadataURIs { + urls = append(urls, "http://"+config.MetadataHostAndPort+uri) + } + return urls, nil +} + +// makeJSONPicker returns a responseHandler function that unmarshals JSON +// from a hosting provider's HTTP response and writes it to the result. +func makeJSONPicker(provider string) responseHandler { + return func(all []byte, res *result) error { + dec := json.NewDecoder(bytes.NewReader(all)) + dec.UseNumber() + err := dec.Decode(&res.metadata) + if err != nil { + err = errors.Wrapf(err, "failed to unmarshal %v JSON of '%v'", provider, string(all)) + return err + } + return nil + } +} + +// newMetadataFetcher return metadataFetcher with one pass JSON responseHandler. +func newMetadataFetcher( + c common.Config, + provider string, + headers map[string]string, + host string, + conv schemaConv, + uri string, +) (*metadataFetcher, error) { + urls, err := getMetadataURLs(c, host, []string{uri}) + if err != nil { + return nil, err + } + responseHandlers := map[string]responseHandler{urls[0]: makeJSONPicker(provider)} + fetcher := &metadataFetcher{provider, headers, responseHandlers, conv} + return fetcher, nil +} + +func newDoMetadataFetcher(c common.Config) (*metadataFetcher, error) { + fetcher, err := newMetadataFetcher(c, "digitalocean", nil, metadataHost, doSchema, doMetadataURI) + return fetcher, err +} + +func newEc2MetadataFetcher(c common.Config) (*metadataFetcher, error) { + fetcher, err := newMetadataFetcher(c, "ec2", nil, metadataHost, ec2Schema, ec2InstanceIdentityURI) + return fetcher, err +} + +func newGceMetadataFetcher(c common.Config) (*metadataFetcher, error) { + fetcher, err := newMetadataFetcher(c, "gce", gceHeaders, metadataHost, gceSchema, gceMetadataURI) + return fetcher, err +} + +// newQcloudMetadataFetcher return the concrete metadata fetcher for qcloud provider +// which requires more than one way to assemble the metadata. +func newQcloudMetadataFetcher(c common.Config) (*metadataFetcher, error) { + urls, err := getMetadataURLs(c, qcloudMetadataHost, []string{ + qcloudMetadataInstanceIDURI, + qcloudMetadataRegionURI, + qcloudMetadataZoneURI, + }) + if err != nil { + return nil, err + } + responseHandlers := map[string]responseHandler{ + urls[0]: func(all []byte, result *result) error { + result.metadata["instance_id"] = string(all) + return nil + }, + urls[1]: func(all []byte, result *result) error { + result.metadata["region"] = string(all) + return nil + }, + urls[2]: func(all []byte, result *result) error { + result.metadata["availability_zone"] = string(all) + return nil + }, + } + fetcher := &metadataFetcher{"qcloud", nil, responseHandlers, qcloudSchema} + return fetcher, nil +} + +func setupFetchers(c common.Config) ([]*metadataFetcher, error) { + var fetchers []*metadataFetcher + doFetcher, err := newDoMetadataFetcher(c) + if err != nil { + return fetchers, err + } + ec2Fetcher, err := newEc2MetadataFetcher(c) + if err != nil { + return fetchers, err + } + gceFetcher, err := newGceMetadataFetcher(c) + if err != nil { + return fetchers, err + } + qcloudFetcher, err := newQcloudMetadataFetcher(c) + if err != nil { + return fetchers, err + } + + fetchers = []*metadataFetcher{ + doFetcher, + ec2Fetcher, + gceFetcher, + qcloudFetcher, + } + return fetchers, nil } func newCloudMetadata(c common.Config) (processors.Processor, error) { config := struct { - MetadataHostAndPort string `config:"host"` // Specifies the host and port of the metadata service (for testing purposes only). - Timeout time.Duration `config:"timeout"` // Amount of time to wait for responses from the metadata services. + Timeout time.Duration `config:"timeout"` // Amount of time to wait for responses from the metadata services. }{ - MetadataHostAndPort: metadataHost, - Timeout: 3 * time.Second, + Timeout: defaultTimeOut, } err := c.Unpack(&config) if err != nil { return nil, errors.Wrap(err, "failed to unpack add_cloud_metadata config") } - var ( - doURL = "http://" + config.MetadataHostAndPort + doMetadataURI - ec2URL = "http://" + config.MetadataHostAndPort + ec2InstanceIdentityURI - gceURL = "http://" + config.MetadataHostAndPort + gceMetadataURI - ) + fetchers, err := setupFetchers(c) + if err != nil { + return nil, err + } - result := fetchMetadata(doURL, ec2URL, gceURL, config.Timeout) + result := fetchMetadata(fetchers, config.Timeout) if result == nil { logp.Info("add_cloud_metadata: hosting provider type not detected.") return addCloudMetadata{}, nil @@ -244,6 +399,10 @@ func newCloudMetadata(c common.Config) (processors.Processor, error) { return addCloudMetadata{metadata: result.metadata}, nil } +type addCloudMetadata struct { + metadata common.MapStr +} + func (p addCloudMetadata) Run(event common.MapStr) (common.MapStr, error) { if len(p.metadata) == 0 { return event, nil diff --git a/libbeat/processors/add_cloud_metadata/add_cloud_metadata_test.go b/libbeat/processors/add_cloud_metadata/add_cloud_metadata_test.go index c4362d9ba42..b79e879abdb 100644 --- a/libbeat/processors/add_cloud_metadata/add_cloud_metadata_test.go +++ b/libbeat/processors/add_cloud_metadata/add_cloud_metadata_test.go @@ -194,6 +194,25 @@ func initGCETestServer() *httptest.Server { })) } +func initQCloundTestServer() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.RequestURI == "/meta-data/instance-id" { + w.Write([]byte("ins-qcloudv5")) + return + } + if r.RequestURI == "/meta-data/placement/region" { + w.Write([]byte("china-south-gz")) + return + } + if r.RequestURI == "/meta-data/placement/zone" { + w.Write([]byte("gz-azone2")) + return + } + + http.Error(w, "not found", http.StatusNotFound) + })) +} + func TestRetrieveAWSMetadata(t *testing.T) { if testing.Verbose() { logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) @@ -308,3 +327,42 @@ func TestRetrieveGCEMetadata(t *testing.T) { } assert.Equal(t, expected, actual) } + +func TestRetrieveQCloudMetadata(t *testing.T) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + } + + server := initQCloundTestServer() + defer server.Close() + + config, err := common.NewConfigFrom(map[string]interface{}{ + "host": server.Listener.Addr().String(), + }) + + if err != nil { + t.Fatal(err) + } + + p, err := newCloudMetadata(*config) + if err != nil { + t.Fatal(err) + } + + actual, err := p.Run(common.MapStr{}) + if err != nil { + t.Fatal(err) + } + + expected := common.MapStr{ + "meta": common.MapStr{ + "cloud": common.MapStr{ + "provider": "qcloud", + "instance_id": "ins-qcloudv5", + "region": "china-south-gz", + "availability_zone": "gz-azone2", + }, + }, + } + assert.Equal(t, expected, actual) +}