Skip to content
This repository has been archived by the owner on Dec 1, 2018. It is now read-only.

Improve ES Sink: #1260

Merged
merged 5 commits into from
Sep 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ eventer
*.un~
Session.vim
.netrwhist
.idea
18 changes: 13 additions & 5 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 57 additions & 0 deletions common/elasticsearch/aws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package elasticsearch

import (
"fmt"
awsauth "github.com/smartystreets/go-aws-auth"
"net/http"
"os"
)

type AWSSigningTransport struct {
HTTPClient *http.Client
Credentials awsauth.Credentials
}

// RoundTrip implementation
func (a AWSSigningTransport) RoundTrip(req *http.Request) (*http.Response, error) {
return a.HTTPClient.Do(awsauth.Sign4(req, a.Credentials))
}

func createAWSClient() (*http.Client, error) {
id := os.Getenv("AWS_ACCESS_KEY_ID")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to deploy heapster, kubernetes or standalone?
And if using kubernetes to deploy, how to inject this environment variable?
Would you like to explain me the details of deployment?
Thanks so much.

Copy link
Contributor Author

@AlmogBaku AlmogBaku Aug 29, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi,
I personally recommend about deploying it into an external service rather than a k8s service to avoid a case of single-point-of-failure (especially when the ES used to help you track/resolving issues in the cluster...), but you can do both of course

Regarding the deployment, you can set the ENV by the k8s manifest.. for more details see what I wrote in the documentation of the ES Sink(https://github.com/AlmogBaku/heapster/blob/es_sink_improvments/docs/sink-configuration.md#aws)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, when deploying a pod in k8s, some authentication info can be read from serviceaccout (secret), equal to read from some files. I don't think putting secret information in deploy yaml file by ENVIRONMENT is a good way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to find some way to collect the AWS information from this secret, but couldn't find anything sufficient..
The only idea I had(for more secured access) is to do an HTTP request to AWS endpoint in order to get the temporary AWS token associated with an EC2's role.. however that's won't work in cases of a server that doesn't uses AWS(for federation cases for instance, or for using AWS as a secondary cloud sink). Also, this method will require maintaining some mechanism of leasing, and renewing from the AWS token's endpoint, inside the heapster(=very big feature).

Do you have a different idea how to store that?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can refer to this link http://kubernetes.io/docs/user-guide/service-accounts/
with a conventional process, we can put these parameters like "AWS_ACCESS_KEY_ID" in secret, and use these by service-account.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still not sure what the difference between exposing it via ServiceAccount's secret, and a regular secret.. also I'm not sure how I access the ServiceAccount's secret.. via reading the file /var/run/secrets/kubernetes.io/serviceaccount/aws_access_key?*

Also, these ENVs are the "common practice" by AWS GO SDK, I tried to align with them here(both with names and recommendations)

*The documentation of k8s for SA is pretty lack of info..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is a controversial thing I'm ok with creating an issue and postponing the discussion there.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue will not block this PR, so I think it is ready to be merged.
LGTM.
@piosz Please merge this.

if id == "" {
id = os.Getenv("AWS_ACCESS_KEY")
}

secret := os.Getenv("AWS_SECRET_ACCESS_KEY")
if secret == "" {
secret = os.Getenv("AWS_SECRET_KEY")
}

if id == "" || secret == "" {
return nil, fmt.Errorf("Failed to configure AWS authentication. Both `AWS_ACCESS_KEY_ID` and " +
"`AWS_SECRET_ACCESS_KEY` environment veriables required")
}

signingTransport := AWSSigningTransport{
Credentials: awsauth.Credentials{
AccessKeyID: id,
SecretAccessKey: secret,
},
HTTPClient: http.DefaultClient,
}
return &http.Client{Transport: http.RoundTripper(signingTransport)}, nil
}
51 changes: 34 additions & 17 deletions common/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"time"

"github.com/golang/glog"
"github.com/olivere/elastic"
"github.com/pborman/uuid"

"gopkg.in/olivere/elastic.v3"
"os"
)

const (
Expand Down Expand Up @@ -53,14 +55,14 @@ func SaveDataIntoES(esClient *elastic.Client, indexName string, typeName string,
return err
}
if !createIndex.Acknowledged {
return fmt.Errorf("failed to create Index in ES cluster: %s", err)
return fmt.Errorf("Failed to create Index in ES cluster: %s", err)
}
}
indexID := uuid.NewUUID()
_, err = esClient.Index().
Index(indexName).
Type(typeName).
Id(string(indexID)).
Id(indexID.String()).
BodyJson(sinkData).
Do()
if err != nil {
Expand All @@ -76,7 +78,7 @@ func CreateElasticSearchConfig(uri *url.URL) (*ElasticSearchConfig, error) {
var esConfig ElasticSearchConfig
opts, err := url.ParseQuery(uri.RawQuery)
if err != nil {
return nil, fmt.Errorf("failed to parser url's query string: %s", err)
return nil, fmt.Errorf("Failed to parser url's query string: %s", err)
}

// set the index for es,the default value is "heapster"
Expand All @@ -87,12 +89,15 @@ func CreateElasticSearchConfig(uri *url.URL) (*ElasticSearchConfig, error) {

// Set the URL endpoints of the ES's nodes. Notice that when sniffing is
// enabled, these URLs are used to initially sniff the cluster on startup.
if len(opts["nodes"]) < 1 {
var startupFns []elastic.ClientOptionFunc
if len(opts["nodes"]) > 0 {
startupFns = append(startupFns, elastic.SetURL(opts["nodes"]...))
} else if uri.Opaque != "" {
startupFns = append(startupFns, elastic.SetURL(uri.Opaque))
} else {
return nil, fmt.Errorf("There is no node assigned for connecting ES cluster")
}

startupFns := []elastic.ClientOptionFunc{elastic.SetURL(opts["nodes"]...)}

// If the ES cluster needs authentication, the username and secret
// should be set in sink config.Else, set the Authenticate flag to false
if len(opts["esUserName"]) > 0 && len(opts["esUserSecret"]) > 0 {
Expand All @@ -115,14 +120,6 @@ func CreateElasticSearchConfig(uri *url.URL) (*ElasticSearchConfig, error) {
startupFns = append(startupFns, elastic.SetHealthcheck(healthCheck))
}

if len(opts["sniff"]) > 0 {
sniff, err := strconv.ParseBool(opts["sniff"][0])
if err != nil {
return nil, fmt.Errorf("Failed to parse URL's sniff value into a bool")
}
startupFns = append(startupFns, elastic.SetSniff(sniff))
}

if len(opts["startupHealthcheckTimeout"]) > 0 {
timeout, err := time.ParseDuration(opts["startupHealthcheckTimeout"][0] + "s")
if err != nil {
Expand All @@ -131,12 +128,32 @@ func CreateElasticSearchConfig(uri *url.URL) (*ElasticSearchConfig, error) {
startupFns = append(startupFns, elastic.SetHealthcheckTimeoutStartup(timeout))
}

if os.Getenv("AWS_ACCESS_KEY_ID") != "" || os.Getenv("AWS_ACCESS_KEY") != "" ||
os.Getenv("AWS_SECRET_ACCESS_KEY") != "" || os.Getenv("AWS_SECRET_KEY") != "" {
glog.Info("Configuring with AWS credentials..")

awsClient, err := createAWSClient()
if err != nil {
return nil, err
}

startupFns = append(startupFns, elastic.SetHttpClient(awsClient), elastic.SetSniff(false))
} else {
if len(opts["sniff"]) > 0 {
sniff, err := strconv.ParseBool(opts["sniff"][0])
if err != nil {
return nil, fmt.Errorf("Failed to parse URL's sniff value into a bool")
}
startupFns = append(startupFns, elastic.SetSniff(sniff))
}
}

esConfig.EsClient, err = elastic.NewClient(startupFns...)
if err != nil {
return nil, fmt.Errorf("failed to create ElasticSearch client: %v", err)
return nil, fmt.Errorf("Failed to create ElasticSearch client: %v", err)
}

glog.V(2).Infof("elasticsearch sink configure successfully")
glog.V(2).Infof("ElasticSearch sink configure successfully")

return &esConfig, nil
}
2 changes: 1 addition & 1 deletion common/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"testing"
"time"

"github.com/olivere/elastic"
"gopkg.in/olivere/elastic.v3"
)

func TestCreateElasticSearchConfig(t *testing.T) {
Expand Down
35 changes: 32 additions & 3 deletions docs/sink-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,18 @@ The following options are available:
### Elasticsearch
This sink supports monitoring metrics and events. To use the ElasticSearch
sink add the following flag:

```
--sink=elasticsearch:<ES_SERVER_URL>[?<OPTIONS>]

```
Normally an ElasticSearch cluster has multiple nodes or a proxy, so these need
to be configured for the ElasticSearch sink. To do this, you can set
`ES_SERVER_URL` to a dummy value, and use the `?nodes=` query value for each
additional node in the cluster. For example:

```
--sink=elasticsearch:?nodes=foo.com:9200&nodes=bar.com:9200
```
(*) Notice that using the `?nodes` notation will override the `ES_SERVER_URL`


Besides this, the following options can be set in query string:

Expand All @@ -189,6 +192,32 @@ Like this:

--sink="elasticsearch:?nodes=0.0.0.0:9200&Index=testEvent"

#### AWS Integration
In order to use AWS Managed Elastic we need to use one of the following methods:

1. Making sure the public IPs of the Heapster are allowed on the ElasticSearch's Access Policy

-OR-

2. Configuring an Access Policy with IAM
1. Configure the ElasticSearch cluster policy with IAM User
2. Create a secret that stores the IAM credentials
3. Expose the credentials to the environment variables: `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`

```
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: aws-heapster
key: aws.id
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: aws-heapster
key: aws.secret
```

## Using multiple sinks

Heapster can be configured to send k8s metrics and events to multiple sinks by specifying the`--sink=...` flag multiple times.
Expand Down
11 changes: 7 additions & 4 deletions events/sinks/elasticsearch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"encoding/json"

"github.com/golang/glog"
"github.com/olivere/elastic"
"gopkg.in/olivere/elastic.v3"
esCommon "k8s.io/heapster/common/elasticsearch"
event_core "k8s.io/heapster/events/core"
"k8s.io/heapster/metrics/core"
Expand Down Expand Up @@ -86,7 +86,10 @@ func (sink *elasticSearchSink) ExportEvents(eventBatch *event_core.EventBatch) {
if err != nil {
glog.Warningf("Failed to convert event to point: %v", err)
}
sink.saveDataFunc(sink.esConfig.EsClient, sink.esConfig.Index, typeName, point)
err = sink.saveDataFunc(sink.esConfig.EsClient, sink.esConfig.Index, typeName, point)
if err != nil {
glog.Warningf("Failed to export data to ElasticSearch sink: %v", err)
}
}
}

Expand All @@ -102,12 +105,12 @@ func NewElasticSearchSink(uri *url.URL) (event_core.EventSink, error) {
var esSink elasticSearchSink
elasticsearchConfig, err := esCommon.CreateElasticSearchConfig(uri)
if err != nil {
glog.V(2).Infof("failed to config elasticsearch")
glog.Warning("Failed to config ElasticSearch")
return nil, err
}

esSink.esConfig = *elasticsearchConfig
esSink.saveDataFunc = esCommon.SaveDataIntoES
glog.V(2).Infof("elasticsearch sink setup successfully")
glog.V(2).Info("ElasticSearch sink setup successfully")
return &esSink, nil
}
2 changes: 1 addition & 1 deletion events/sinks/elasticsearch/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"testing"
"time"

"github.com/olivere/elastic"
"github.com/stretchr/testify/assert"
"gopkg.in/olivere/elastic.v3"
esCommon "k8s.io/heapster/common/elasticsearch"
"k8s.io/heapster/events/core"
kube_api "k8s.io/kubernetes/pkg/api"
Expand Down
16 changes: 11 additions & 5 deletions metrics/sinks/elasticsearch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"time"

"github.com/golang/glog"
"github.com/olivere/elastic"
"gopkg.in/olivere/elastic.v3"
esCommon "k8s.io/heapster/common/elasticsearch"
"k8s.io/heapster/metrics/core"
)
Expand Down Expand Up @@ -58,7 +58,10 @@ func (sink *elasticSearchSink) ExportData(dataBatch *core.DataBatch) {
},
MetricsTimestamp: dataBatch.Timestamp.UTC(),
}
sink.saveDataFunc(sink.esConfig.EsClient, sink.esConfig.Index, typeName, point)
err := sink.saveDataFunc(sink.esConfig.EsClient, sink.esConfig.Index, typeName, point)
if err != nil {
glog.Warningf("Failed to export data to ElasticSearch sink: %v", err)
}
}
for _, metric := range metricSet.LabeledMetrics {
labels := make(map[string]string)
Expand All @@ -76,7 +79,10 @@ func (sink *elasticSearchSink) ExportData(dataBatch *core.DataBatch) {
},
MetricsTimestamp: dataBatch.Timestamp.UTC(),
}
sink.saveDataFunc(sink.esConfig.EsClient, sink.esConfig.Index, typeName, point)
err := sink.saveDataFunc(sink.esConfig.EsClient, sink.esConfig.Index, typeName, point)
if err != nil {
glog.Warningf("Failed to export data to ElasticSearch sink: %v", err)
}
}
}
}
Expand All @@ -93,12 +99,12 @@ func NewElasticSearchSink(uri *url.URL) (core.DataSink, error) {
var esSink elasticSearchSink
elasticsearchConfig, err := esCommon.CreateElasticSearchConfig(uri)
if err != nil {
glog.V(2).Infof("failed to config elasticsearch")
glog.Warningf("Failed to config ElasticSearch: %v", err)
return nil, err
}

esSink.esConfig = *elasticsearchConfig
esSink.saveDataFunc = esCommon.SaveDataIntoES
glog.V(2).Infof("elasticsearch sink setup successfully")
glog.V(2).Info("ElasticSearch sink setup successfully")
return &esSink, nil
}
2 changes: 1 addition & 1 deletion metrics/sinks/elasticsearch/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"testing"
"time"

"github.com/olivere/elastic"
"github.com/stretchr/testify/assert"
"gopkg.in/olivere/elastic.v3"
esCommon "k8s.io/heapster/common/elasticsearch"
"k8s.io/heapster/metrics/core"
)
Expand Down
3 changes: 3 additions & 0 deletions vendor/github.com/gedex/inflector/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading