Skip to content

Commit

Permalink
Add support for adding aliases to index and shard stats (#563)
Browse files Browse the repository at this point in the history
* Add support for adding aliases to index and shard stats

Optionally will include aliases in a label to index and shard stats

Signed-off-by: Steven Cipriano <cipriano@squareup.com>

* move aliases to separate informational metrics

Signed-off-by: Steven Cipriano <cipriano@squareup.com>

* address review

- switch boolean flag
- fix linting errors

Signed-off-by: Steven Cipriano <cipriano@squareup.com>
  • Loading branch information
bobo333 authored May 19, 2022
1 parent 9ece896 commit b536294
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 18 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ elasticsearch_exporter --help
| es.indices | 1.0.2 | If true, query stats for all indices in the cluster. | false |
| es.indices_settings | 1.0.4rc1 | If true, query settings stats for all indices in the cluster. | false |
| es.indices_mappings | 1.2.0 | If true, query stats for mappings of all indices of the cluster. | false |
| es.aliases | 1.0.4rc1 | If true, include informational aliases metrics. | true |
| es.shards | 1.0.3rc1 | If true, query stats for all indices in the cluster, including shard-level stats (implies `es.indices=true`). | false |
| es.snapshots | 1.0.4rc1 | If true, query stats for the cluster snapshots. | false |
| es.slm | | If true, query stats for SLM. | false |
Expand Down
137 changes: 121 additions & 16 deletions collector/indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ package collector
import (
"encoding/json"
"fmt"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
"github.com/prometheus/client_golang/prometheus"
"io/ioutil"
"net/http"
"net/url"
"path"
"sort"
"strconv"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
"github.com/prometheus/client_golang/prometheus"
)

type labels struct {
Expand All @@ -47,12 +47,20 @@ type shardMetric struct {
Labels labels
}

type aliasMetric struct {
Type prometheus.ValueType
Desc *prometheus.Desc
Value func() float64
Labels labels
}

// Indices information struct
type Indices struct {
logger log.Logger
client *http.Client
url *url.URL
shards bool
aliases bool
clusterInfoCh chan *clusterinfo.Response
lastClusterInfo *clusterinfo.Response

Expand All @@ -62,10 +70,11 @@ type Indices struct {

indexMetrics []*indexMetric
shardMetrics []*shardMetric
aliasMetrics []*aliasMetric
}

// NewIndices defines Indices Prometheus metrics
func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards bool) *Indices {
func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards bool, includeAliases bool) *Indices {

indexLabels := labels{
keys: func(...string) []string {
Expand Down Expand Up @@ -95,11 +104,26 @@ func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards boo
},
}

aliasLabels := labels{
keys: func(...string) []string {
return []string{"index", "alias", "cluster"}
},
values: func(lastClusterinfo *clusterinfo.Response, s ...string) []string {
if lastClusterinfo != nil {
return append(s, lastClusterinfo.ClusterName)
}
// this shouldn't happen, as the clusterinfo Retriever has a blocking
// Run method. It blocks until the first clusterinfo call has succeeded
return append(s, "unknown_cluster")
},
}

indices := &Indices{
logger: logger,
client: client,
url: url,
shards: shards,
aliases: includeAliases,
clusterInfoCh: make(chan *clusterinfo.Response),
lastClusterInfo: &clusterinfo.Response{
ClusterName: "unknown_cluster",
Expand Down Expand Up @@ -1022,6 +1046,21 @@ func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards boo
Labels: shardLabels,
},
},

aliasMetrics: []*aliasMetric{
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indices", "aliases"),
"Record aliases associated with an index",
aliasLabels.keys(), nil,
),
Value: func() float64 {
return float64(1)
},
Labels: aliasLabels,
},
},
}

// start go routine to fetch clusterinfo updates and save them to lastClusterinfo
Expand Down Expand Up @@ -1070,9 +1109,63 @@ func (i *Indices) fetchAndDecodeIndexStats() (indexStatsResponse, error) {
u.RawQuery = "ignore_unavailable=true"
}

bts, err := i.queryURL(&u)
if err != nil {
return isr, err
}

if err := json.Unmarshal(bts, &isr); err != nil {
i.jsonParseFailures.Inc()
return isr, err
}

if i.aliases {
isr.Aliases = map[string][]string{}
asr, err := i.fetchAndDecodeAliases()
if err != nil {
_ = level.Error(i.logger).Log("err", err.Error())
return isr, err
}

for indexName, aliases := range asr {
var aliasList []string
for aliasName := range aliases.Aliases {
aliasList = append(aliasList, aliasName)
}

if len(aliasList) > 0 {
sort.Strings(aliasList)
isr.Aliases[indexName] = aliasList
}
}
}

return isr, nil
}

func (i *Indices) fetchAndDecodeAliases() (aliasesResponse, error) {
var asr aliasesResponse

u := *i.url
u.Path = path.Join(u.Path, "/_alias")

bts, err := i.queryURL(&u)
if err != nil {
return asr, err
}

if err := json.Unmarshal(bts, &asr); err != nil {
i.jsonParseFailures.Inc()
return asr, err
}

return asr, nil
}

func (i *Indices) queryURL(u *url.URL) ([]byte, error) {
res, err := i.client.Get(u.String())
if err != nil {
return isr, fmt.Errorf("failed to get index stats from %s://%s:%s%s: %s",
return []byte{}, fmt.Errorf("failed to get resource from %s://%s:%s%s: %s",
u.Scheme, u.Hostname(), u.Port(), u.Path, err)
}

Expand All @@ -1087,21 +1180,15 @@ func (i *Indices) fetchAndDecodeIndexStats() (indexStatsResponse, error) {
}()

if res.StatusCode != http.StatusOK {
return isr, fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
return []byte{}, fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
}

bts, err := ioutil.ReadAll(res.Body)
if err != nil {
i.jsonParseFailures.Inc()
return isr, err
}

if err := json.Unmarshal(bts, &isr); err != nil {
i.jsonParseFailures.Inc()
return isr, err
return []byte{}, err
}

return isr, nil
return bts, nil
}

// Collect gets Indices metric values
Expand All @@ -1125,6 +1212,24 @@ func (i *Indices) Collect(ch chan<- prometheus.Metric) {
}
i.up.Set(1)

// Alias stats
if i.aliases {
for _, metric := range i.aliasMetrics {
for indexName, aliases := range indexStatsResp.Aliases {
for _, alias := range aliases {
labelValues := metric.Labels.values(i.lastClusterInfo, indexName, alias)

ch <- prometheus.MustNewConstMetric(
metric.Desc,
metric.Type,
metric.Value(),
labelValues...,
)
}
}
}
}

// Index stats
for indexName, indexStats := range indexStatsResp.Indices {
for _, metric := range i.indexMetrics {
Expand Down
9 changes: 9 additions & 0 deletions collector/indices_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ type indexStatsResponse struct {
Shards IndexStatsShardsResponse `json:"_shards"`
All IndexStatsIndexResponse `json:"_all"`
Indices map[string]IndexStatsIndexResponse `json:"indices"`
Aliases map[string][]string
}

// aliasesResponse is a representation of a Elasticsearch Alias Query
type aliasesResponse map[string]aliasMapping

// aliasMapping is a mapping of index names to a map of associated alias names where the alias names are keys
type aliasMapping struct {
Aliases map[string]map[string]interface{} `json:"aliases"`
}

// IndexStatsShardsResponse defines index stats shards information structure
Expand Down
Loading

0 comments on commit b536294

Please sign in to comment.