Skip to content

Commit

Permalink
Add support for adding aliases to index and shard stats
Browse files Browse the repository at this point in the history
Optionally will include aliases in a label to index and shard stats

Signed-off-by: Steven Cipriano <cipriano@squareup.com>
  • Loading branch information
bobo333 committed May 6, 2022
1 parent e5fb0a3 commit e3db0ad
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 15 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ elasticsearch_exporter --help
| es.indices | 1.0.2 | If true, query stats for all indices in the cluster. | false |
| es.indices_settings | 1.0.4rc1 | If true, query settings stats for all indices in the cluster. | false |
| es.indices_mappings | 1.2.0 | If true, query stats for mappings of all indices of the cluster. | false |
| es.aliases | 1.0.4rc1 | If true, include aliases in label for all index and shard stats. | false |
| es.shards | 1.0.3rc1 | If true, query stats for all indices in the cluster, including shard-level stats (implies `es.indices=true`). | false |
| es.snapshots | 1.0.4rc1 | If true, query stats for the cluster snapshots. | false |
| es.timeout | 1.0.2 | Timeout for trying to get stats from Elasticsearch. (ex: 20s) | 5s |
Expand Down
104 changes: 91 additions & 13 deletions collector/indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"net/http"
"net/url"
"path"
"sort"
"strconv"
"strings"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -53,6 +55,7 @@ type Indices struct {
client *http.Client
url *url.URL
shards bool
aliases bool
clusterInfoCh chan *clusterinfo.Response
lastClusterInfo *clusterinfo.Response

Expand All @@ -65,10 +68,13 @@ type Indices struct {
}

// NewIndices defines Indices Prometheus metrics
func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards bool) *Indices {
func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards bool, aliases bool) *Indices {

indexLabels := labels{
keys: func(...string) []string {
if aliases {
return []string{"index", "aliases", "cluster"}
}
return []string{"index", "cluster"}
},
values: func(lastClusterinfo *clusterinfo.Response, s ...string) []string {
Expand All @@ -83,6 +89,9 @@ func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards boo

shardLabels := labels{
keys: func(...string) []string {
if aliases {
return []string{"index", "shard", "node", "primary", "aliases", "cluster"}
}
return []string{"index", "shard", "node", "primary", "cluster"}
},
values: func(lastClusterinfo *clusterinfo.Response, s ...string) []string {
Expand All @@ -100,6 +109,7 @@ func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards boo
client: client,
url: url,
shards: shards,
aliases: aliases,
clusterInfoCh: make(chan *clusterinfo.Response),
lastClusterInfo: &clusterinfo.Response{
ClusterName: "unknown_cluster",
Expand Down Expand Up @@ -1070,9 +1080,63 @@ func (i *Indices) fetchAndDecodeIndexStats() (indexStatsResponse, error) {
u.RawQuery = "ignore_unavailable=true"
}

bts, err := i.queryUrl(&u)
if err != nil {
return isr, err
}

if err := json.Unmarshal(bts, &isr); err != nil {
i.jsonParseFailures.Inc()
return isr, err
}

if i.aliases {
isr.Aliases = map[string][]string{}
asr, err := i.fetchAndDecodeAliases()
if err != nil {
_ = level.Error(i.logger).Log("err", err.Error())
return isr, err
}

for indexName, aliases := range asr {
var aliasList []string
for aliasName, _ := range aliases.Aliases {
aliasList = append(aliasList, aliasName)
}

if len(aliasList) > 0 {
sort.Strings(aliasList)
isr.Aliases[indexName] = aliasList
}
}
}

return isr, nil
}

func (i *Indices) fetchAndDecodeAliases() (aliasesResponse, error) {
var asr aliasesResponse

u := *i.url
u.Path = path.Join(u.Path, "/_alias")

bts, err := i.queryUrl(&u)
if err != nil {
return asr, err
}

if err := json.Unmarshal(bts, &asr); err != nil {
i.jsonParseFailures.Inc()
return asr, err
}

return asr, nil
}

func (i *Indices) queryUrl(u *url.URL) ([]byte, error) {
res, err := i.client.Get(u.String())
if err != nil {
return isr, fmt.Errorf("failed to get index stats from %s://%s:%s%s: %s",
return []byte{}, fmt.Errorf("failed to get resource from %s://%s:%s%s: %s",
u.Scheme, u.Hostname(), u.Port(), u.Path, err)
}

Expand All @@ -1087,21 +1151,15 @@ func (i *Indices) fetchAndDecodeIndexStats() (indexStatsResponse, error) {
}()

if res.StatusCode != http.StatusOK {
return isr, fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
return []byte{}, fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
}

bts, err := ioutil.ReadAll(res.Body)
if err != nil {
i.jsonParseFailures.Inc()
return isr, err
return []byte{}, err
}

if err := json.Unmarshal(bts, &isr); err != nil {
i.jsonParseFailures.Inc()
return isr, err
}

return isr, nil
return bts, nil
}

// Collect gets Indices metric values
Expand All @@ -1127,12 +1185,26 @@ func (i *Indices) Collect(ch chan<- prometheus.Metric) {

// Index stats
for indexName, indexStats := range indexStatsResp.Indices {
var labelValues []string
var aliasesString string
if i.aliases {
if aliasNames, ok := indexStatsResp.Aliases[indexName]; ok {
aliasesString = strings.Join(aliasNames, ",")
}
}

for _, metric := range i.indexMetrics {
if i.aliases {
labelValues = metric.Labels.values(i.lastClusterInfo, indexName, aliasesString)
} else {
labelValues = metric.Labels.values(i.lastClusterInfo, indexName)
}

ch <- prometheus.MustNewConstMetric(
metric.Desc,
metric.Type,
metric.Value(indexStats),
metric.Labels.values(i.lastClusterInfo, indexName)...,
labelValues...,
)

}
Expand All @@ -1141,11 +1213,17 @@ func (i *Indices) Collect(ch chan<- prometheus.Metric) {
// gaugeVec := prometheus.NewGaugeVec(metric.Opts, metric.Labels)
for shardNumber, shards := range indexStats.Shards {
for _, shard := range shards {
if i.aliases {
labelValues = metric.Labels.values(i.lastClusterInfo, indexName, shardNumber, shard.Routing.Node, strconv.FormatBool(shard.Routing.Primary), aliasesString)
} else {
labelValues = metric.Labels.values(i.lastClusterInfo, indexName, shardNumber, shard.Routing.Node, strconv.FormatBool(shard.Routing.Primary))
}

ch <- prometheus.MustNewConstMetric(
metric.Desc,
metric.Type,
metric.Value(shard),
metric.Labels.values(i.lastClusterInfo, indexName, shardNumber, shard.Routing.Node, strconv.FormatBool(shard.Routing.Primary))...,
labelValues...,
)
}
}
Expand Down
9 changes: 9 additions & 0 deletions collector/indices_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ type indexStatsResponse struct {
Shards IndexStatsShardsResponse `json:"_shards"`
All IndexStatsIndexResponse `json:"_all"`
Indices map[string]IndexStatsIndexResponse `json:"indices"`
Aliases map[string][]string
}

// aliasesResponse is a representation of a Elasticsearch Alias Query
type aliasesResponse map[string]aliasMapping

// aliasMapping is a mapping of index names to a map of associated alias names where the alias names are keys
type aliasMapping struct {
Aliases map[string]map[string]interface{} `json:"aliases"`
}

// IndexStatsShardsResponse defines index stats shards information structure
Expand Down
Loading

0 comments on commit e3db0ad

Please sign in to comment.