-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support Tencent Cloud provider for add_cloud_metadata proccessor #4023
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -266,11 +266,12 @@ The `add_cloud_metadata` processor enriches each event with instance metadata | |
from the machine's hosting provider. At startup it will detect the hosting | ||
provider and cache the instance metadata. | ||
|
||
Three cloud providers are supported. | ||
Four cloud providers are supported. | ||
|
||
- Amazon Elastic Compute Cloud (EC2) | ||
- Digital Ocean | ||
- Google Compute Engine (GCE) | ||
- [Tencent Cloud](https://www.qcloud.com/?lang=en) (QCloud) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't believe this is the proper format for a hyperlink in asciidoc. I think the format is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤦♂️ a clumsy product placement 😢 |
||
|
||
The simple configuration below enables the processor. | ||
|
||
|
@@ -341,6 +342,22 @@ _GCE_ | |
} | ||
------------------------------------------------------------------------------- | ||
|
||
_Tencent Clound_ | ||
|
||
[source,json] | ||
------------------------------------------------------------------------------- | ||
{ | ||
"meta": { | ||
"cloud": { | ||
"availability_zone": "gz-azone2", | ||
"instance_id": "ins-qcloudv5", | ||
"provider": "qcloud", | ||
"region": "china-south-gz" | ||
} | ||
} | ||
} | ||
------------------------------------------------------------------------------- | ||
|
||
[[add-locale]] | ||
=== add_locale | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,10 +31,17 @@ const ( | |
|
||
// Google GCE Metadata Service | ||
gceMetadataURI = "/computeMetadata/v1/?recursive=true&alt=json" | ||
|
||
// Tencent Clound Metadata Service | ||
qcloudMetadataHost = "metadata.tencentyun.com" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now that we are starting to add more cloud providers it would be nice to have each provider in its own file. This makes it easier to spot which parts are specific to a provider and which is the shared code. Just a general note, could also be done in a follow up PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea, I will apply it when implement AliCloud metadata in next PR ;) |
||
qcloudMetadataInstanceIdURI = "/meta-data/instance-id" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [golint] reported by reviewdog 🐶 |
||
qcloudMetadataRegionURI = "/meta-data/placement/region" | ||
qcloudMetadataZoneURI = "/meta-data/placement/zone" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you know if they have versioned endpoints that we can point to (like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AFAIK, there is no plan for Tentcent Qcloud endpoints to support |
||
) | ||
|
||
var debugf = logp.MakeDebug("filters") | ||
|
||
// metadata schemas for all prividers | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
var ( | ||
ec2Schema = func(m map[string]interface{}) common.MapStr { | ||
out, _ := s.Schema{ | ||
|
@@ -74,78 +81,108 @@ var ( | |
|
||
return out | ||
} | ||
|
||
qcloudSchema = func(m map[string]interface{}) common.MapStr { | ||
out, _ := s.Schema{ | ||
"instance_id": c.Str("instance_id"), | ||
"region": c.Str("region"), | ||
"availability_zone": c.Str("zone"), | ||
}.Apply(m) | ||
return out | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems like this doesn't even need to use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
} | ||
) | ||
|
||
// init registers the add_cloud_metadata processor. | ||
func init() { | ||
processors.RegisterPlugin("add_cloud_metadata", newCloudMetadata) | ||
} | ||
|
||
// result is the result of a query for a specific hosting provider's metadata. | ||
type result struct { | ||
provider string // Hosting provider type. | ||
err error // Error that occurred while fetching (if any). | ||
metadata common.MapStr // A specific subset of the metadata received from the hosting provider. | ||
} | ||
type schemaConv func(m map[string]interface{}) common.MapStr | ||
|
||
func (r result) String() string { | ||
return fmt.Sprintf("result=[provider:%v, error=%v, metadata=%v]", | ||
r.provider, r.err, r.metadata) | ||
type pick func([]byte, *result) error | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a godoc comment describing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good point, change applied. |
||
|
||
type metadataFetcher struct { | ||
provider string | ||
headers map[string]string | ||
pickers map[string]pick | ||
conv schemaConv | ||
} | ||
|
||
// fetchJSON query metadata from a hosting provider's metadata service. | ||
func fetchJSON( | ||
// fetchRaw query raw metadata from a hosting provider's metadata service. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
func (f *metadataFetcher) fetchRaw( | ||
ctx context.Context, | ||
provider string, | ||
headers map[string]string, | ||
url string, | ||
conv func(map[string]interface{}) common.MapStr, | ||
client http.Client, | ||
) result { | ||
result := result{provider: provider} | ||
|
||
url string, | ||
pick pick, | ||
result *result, | ||
) { | ||
req, err := http.NewRequest("GET", url, nil) | ||
if err != nil { | ||
result.err = errors.Wrapf(err, "failed to create http request for %v", provider) | ||
return result | ||
result.err = errors.Wrapf(err, "failed to create http request for %v", f.provider) | ||
return | ||
} | ||
for k, v := range headers { | ||
for k, v := range f.headers { | ||
req.Header.Add(k, v) | ||
} | ||
req = req.WithContext(ctx) | ||
|
||
rsp, err := client.Do(req) | ||
if err != nil { | ||
result.err = errors.Wrapf(err, "failed requesting %v metadata", provider) | ||
return result | ||
result.err = errors.Wrapf(err, "failed requesting %v metadata", f.provider) | ||
return | ||
} | ||
defer rsp.Body.Close() | ||
|
||
if rsp.StatusCode != http.StatusOK { | ||
result.err = errors.Errorf("failed with http status code %v", rsp.StatusCode) | ||
return result | ||
return | ||
} | ||
|
||
all, err := ioutil.ReadAll(rsp.Body) | ||
if err != nil { | ||
result.err = errors.Wrapf(err, "failed requesting %v metadata", provider) | ||
return result | ||
result.err = errors.Wrapf(err, "failed requesting %v metadata", f.provider) | ||
return | ||
} | ||
|
||
// Decode JSON. | ||
dec := json.NewDecoder(bytes.NewReader(all)) | ||
dec.UseNumber() | ||
err = dec.Decode(&result.metadata) | ||
err = pick(all, result) | ||
if err != nil { | ||
result.err = errors.Wrapf(err, "failed to unmarshal %v JSON of '%v'", provider, string(all)) | ||
return result | ||
result.err = err | ||
return | ||
} | ||
|
||
return | ||
} | ||
|
||
// fetchMetadata query metadata from a hosting provider's metadata service. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
// some providers require multiple HTTP request to gather the whole metadata | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
// len(f.pickers) > 1 indicates that multiple requesting is needed | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
func (f *metadataFetcher) fetchMetadata(ctx context.Context, client http.Client) result { | ||
res := result{provider: f.provider, metadata: common.MapStr{}} | ||
for url, pick := range f.pickers { | ||
f.fetchRaw(ctx, client, url, pick, &res) | ||
if res.err != nil { | ||
return res | ||
} | ||
} | ||
|
||
// Apply schema. | ||
result.metadata = conv(result.metadata) | ||
result.metadata["provider"] = provider | ||
res.metadata = f.conv(res.metadata) | ||
res.metadata["provider"] = f.provider | ||
|
||
return res | ||
} | ||
|
||
return result | ||
// result is the result of a query for a specific hosting provider's metadata. | ||
type result struct { | ||
provider string // Hosting provider type. | ||
err error // Error that occurred while fetching (if any). | ||
metadata common.MapStr // A specific subset of the metadata received from the hosting provider. | ||
} | ||
|
||
func (r result) String() string { | ||
return fmt.Sprintf("result=[provider:%v, error=%v, metadata=%v]", | ||
r.provider, r.err, r.metadata) | ||
} | ||
|
||
// writeResult blocks until it can write the result r to the channel c or until | ||
|
@@ -163,7 +200,7 @@ func writeResult(ctx context.Context, c chan result, r result) error { | |
// hosting providers supported by this processor. It wait for the results to | ||
// be returned or for a timeout to occur then returns the results that | ||
// completed in time. | ||
func fetchMetadata(doURL, ec2URL, gceURL string, timeout time.Duration) *result { | ||
func fetchMetadata(metadataFetchers []*metadataFetcher, timeout time.Duration) *result { | ||
debugf("add_cloud_metadata: starting to fetch metadata, timeout=%v", timeout) | ||
start := time.Now() | ||
defer func() { | ||
|
@@ -187,11 +224,13 @@ func fetchMetadata(doURL, ec2URL, gceURL string, timeout time.Duration) *result | |
defer cancel() | ||
|
||
c := make(chan result) | ||
go func() { writeResult(ctx, c, fetchJSON(ctx, "digitalocean", nil, doURL, doSchema, client)) }() | ||
go func() { writeResult(ctx, c, fetchJSON(ctx, "ec2", nil, ec2URL, ec2Schema, client)) }() | ||
go func() { writeResult(ctx, c, fetchJSON(ctx, "gce", gceHeaders, gceURL, gceSchema, client)) }() | ||
for _, fetcher := range metadataFetchers { | ||
go func(fetcher *metadataFetcher) { | ||
writeResult(ctx, c, fetcher.fetchMetadata(ctx, client)) | ||
}(fetcher) | ||
} | ||
|
||
for i := 0; i < 3; i++ { | ||
for i := 0; i < len(metadataFetchers); i++ { | ||
select { | ||
case result := <-c: | ||
debugf("add_cloud_metadata: received disposition for %v after %v. %v", | ||
|
@@ -209,30 +248,137 @@ func fetchMetadata(doURL, ec2URL, gceURL string, timeout time.Duration) *result | |
return nil | ||
} | ||
|
||
type addCloudMetadata struct { | ||
metadata common.MapStr | ||
} | ||
|
||
func newCloudMetadata(c common.Config) (processors.Processor, error) { | ||
// getMetadataURL loads config and generate metadata url | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
func getMetadataURL(c common.Config, defaultHost string, timeout time.Duration, metadataURIs []string) ([]string, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This returns a slice of URLs so I'm thinking it should be |
||
var urls []string | ||
config := struct { | ||
MetadataHostAndPort string `config:"host"` // Specifies the host and port of the metadata service (for testing purposes only). | ||
Timeout time.Duration `config:"timeout"` // Amount of time to wait for responses from the metadata services. | ||
}{ | ||
MetadataHostAndPort: metadataHost, | ||
Timeout: 3 * time.Second, | ||
MetadataHostAndPort: defaultHost, | ||
Timeout: timeout, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. relocated. |
||
} | ||
err := c.Unpack(&config) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "failed to unpack add_cloud_metadata config") | ||
return urls, errors.Wrap(err, "failed to unpack add_cloud_metadata config") | ||
} | ||
for _, uri := range metadataURIs { | ||
urls = append(urls, "http://"+config.MetadataHostAndPort+uri) | ||
} | ||
return urls, nil | ||
} | ||
|
||
// makeCommomJSONPicker generate fetch function which query json metadata from a hosting provider's HTTP response | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you could drop
|
||
func makeCommomJSONPicker(provider string) pick { | ||
return func(all []byte, res *result) error { | ||
dec := json.NewDecoder(bytes.NewReader(all)) | ||
dec.UseNumber() | ||
err := dec.Decode(&res.metadata) | ||
if err != nil { | ||
err = errors.Wrapf(err, "failed to unmarshal %v JSON of '%v'", provider, string(all)) | ||
return err | ||
} | ||
return nil | ||
} | ||
} | ||
|
||
var ( | ||
doURL = "http://" + config.MetadataHostAndPort + doMetadataURI | ||
ec2URL = "http://" + config.MetadataHostAndPort + ec2InstanceIdentityURI | ||
gceURL = "http://" + config.MetadataHostAndPort + gceMetadataURI | ||
) | ||
// newMetadataFetcher return metadataFetcher with one pass json picker | ||
func newMetadataFetcher( | ||
c common.Config, | ||
timeout time.Duration, | ||
provider string, | ||
headers map[string]string, | ||
host string, | ||
conv schemaConv, | ||
uris []string, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why accept a list if only one is ever used? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good point, change to single string now. |
||
) (*metadataFetcher, error) { | ||
urls, err := getMetadataURL(c, host, timeout, uris) | ||
if err != nil { | ||
return nil, err | ||
} | ||
picker := map[string]pick{urls[0]: makeCommomJSONPicker(provider)} | ||
fetcher := &metadataFetcher{provider, headers, picker, conv} | ||
return fetcher, nil | ||
} | ||
|
||
result := fetchMetadata(doURL, ec2URL, gceURL, config.Timeout) | ||
func newDoMetadataFetcher(c common.Config, timeout time.Duration) (*metadataFetcher, error) { | ||
fetcher, err := newMetadataFetcher(c, timeout, "digitalocean", nil, metadataHost, doSchema, []string{ | ||
doMetadataURI, | ||
}) | ||
return fetcher, err | ||
} | ||
|
||
func newEc2MetadataFetcher(c common.Config, timeout time.Duration) (*metadataFetcher, error) { | ||
fetcher, err := newMetadataFetcher(c, timeout, "ec2", nil, metadataHost, ec2Schema, []string{ | ||
ec2InstanceIdentityURI, | ||
}) | ||
return fetcher, err | ||
} | ||
|
||
func newGceMetadataFetcher(c common.Config, timeout time.Duration) (*metadataFetcher, error) { | ||
fetcher, err := newMetadataFetcher(c, timeout, "gce", gceHeaders, metadataHost, gceSchema, []string{ | ||
gceMetadataURI, | ||
}) | ||
return fetcher, err | ||
} | ||
|
||
// newQcloudMetadataFetcher return the concrete metadata fetcher for qcloud provider | ||
// which requires more than one way to assemble the metadata | ||
func newQcloudMetadataFetcher(c common.Config, timeout time.Duration) (*metadataFetcher, error) { | ||
urls, err := getMetadataURL(c, qcloudMetadataHost, timeout, []string{ | ||
qcloudMetadataInstanceIdURI, | ||
qcloudMetadataRegionURI, | ||
qcloudMetadataZoneURI, | ||
}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
picker := map[string]pick{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
urls[0]: func(all []byte, result *result) error { | ||
result.metadata["instance_id"] = string(all) | ||
return nil | ||
}, | ||
urls[1]: func(all []byte, result *result) error { | ||
result.metadata["region"] = string(all) | ||
return nil | ||
}, | ||
urls[2]: func(all []byte, result *result) error { | ||
result.metadata["zone"] = string(all) | ||
return nil | ||
}, | ||
} | ||
fetcher := &metadataFetcher{"qcloud", nil, picker, qcloudSchema} | ||
return fetcher, nil | ||
} | ||
|
||
func newCloudMetadata(c common.Config) (processors.Processor, error) { | ||
timeout := 3 * time.Second | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we introduce a constant for that so it is easy to find? Something like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This timeout value should be unpacked from the configuration data. Otherwise it is hardcoded here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yep, it's hardcoded here, already move timeout configuration out of the getMetadataURLs and put it directly in newCloudMetadata. |
||
|
||
doFetcher, err := newDoMetadataFetcher(c, timeout) | ||
if err != nil { | ||
return nil, err | ||
} | ||
ec2Fetcher, err := newEc2MetadataFetcher(c, timeout) | ||
if err != nil { | ||
return nil, err | ||
} | ||
gceFetcher, err := newGceMetadataFetcher(c, timeout) | ||
if err != nil { | ||
return nil, err | ||
} | ||
qcloudFetcher, err := newQcloudMetadataFetcher(c, timeout) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
var fetchers = []*metadataFetcher{ | ||
doFetcher, | ||
ec2Fetcher, | ||
gceFetcher, | ||
qcloudFetcher, | ||
} | ||
|
||
result := fetchMetadata(fetchers, timeout) | ||
if result == nil { | ||
logp.Info("add_cloud_metadata: hosting provider type not detected.") | ||
return addCloudMetadata{}, nil | ||
|
@@ -244,6 +390,10 @@ func newCloudMetadata(c common.Config) (processors.Processor, error) { | |
return addCloudMetadata{metadata: result.metadata}, nil | ||
} | ||
|
||
type addCloudMetadata struct { | ||
metadata common.MapStr | ||
} | ||
|
||
func (p addCloudMetadata) Run(event common.MapStr) (common.MapStr, error) { | ||
if len(p.metadata) == 0 { | ||
return event, nil | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can probably drop the number here. More like
The following cloud providers are supported:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good suggestion