add index metrics #85

Merged
172 changes: 172 additions & 0 deletions collector/indices.go
@@ -0,0 +1,172 @@
package collector

import (
"encoding/json"
"fmt"
"net/http"
"net/url"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
)

var (
defaultIndexLabels = []string{"cluster", "index"}
defaultIndexLabelValues = func(clusterName string, indexName string) []string {
return []string{clusterName, indexName}
}
)

type indexMetric struct {
Type prometheus.ValueType
Desc *prometheus.Desc
Value func(indexStats IndexStatsIndexResponse) float64
Labels func(clusterName string, indexName string) []string
}

type Indices struct {
logger log.Logger
client *http.Client
url *url.URL

up prometheus.Gauge
totalScrapes prometheus.Counter
jsonParseFailures prometheus.Counter

indexMetrics []*indexMetric
}

func NewIndices(logger log.Logger, client *http.Client, url *url.URL) *Indices {
return &Indices{
logger: logger,
client: client,
url: url,

up: prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName(namespace, "index_stats", "up"),
Help: "Was the last scrape of the ElasticSearch index endpoint successful.",
}),
totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
Name: prometheus.BuildFQName(namespace, "index_stats", "total_scrapes"),
Help: "Current total ElasticSearch index scrapes.",
}),
jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
Name: prometheus.BuildFQName(namespace, "index_stats", "json_parse_failures"),
Help: "Number of errors while parsing JSON.",
}),

indexMetrics: []*indexMetric{
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indices", "docs_primary"),
"Count of documents which only primary shards",
defaultIndexLabels, nil,
),
Value: func(indexStats IndexStatsIndexResponse) float64 {
return float64(indexStats.Primaries.Docs.Count)
},
Labels: defaultIndexLabelValues,
},
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indices", "store_size_bytes_primary"),
"Current total size of stored index data in bytes which only primary shards on all nodes",
defaultIndexLabels, nil,
),
Value: func(indexStats IndexStatsIndexResponse) float64 {
return float64(indexStats.Primaries.Store.SizeInBytes)
},
Labels: defaultIndexLabelValues,
},
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indices", "store_size_bytes_total"),
"Current total size of stored index data in bytes which all shards on all nodes",
defaultIndexLabels, nil,
),
Value: func(indexStats IndexStatsIndexResponse) float64 {
return float64(indexStats.Total.Store.SizeInBytes)
},
Labels: defaultIndexLabelValues,
},
},
}
}

func (i *Indices) Describe(ch chan<- *prometheus.Desc) {
for _, metric := range i.indexMetrics {
ch <- metric.Desc
}
ch <- i.up.Desc()
ch <- i.totalScrapes.Desc()
ch <- i.jsonParseFailures.Desc()
}

func (c *Indices) fetchAndDecodeStats() (indexStatsResponse, error) {
var isr indexStatsResponse

u := *c.url
u.Path = "/_all/_stats"

res, err := c.client.Get(u.String())
if err != nil {
return isr, fmt.Errorf("failed to get index stats from %s://%s:%s/%s: %s",
u.Scheme, u.Hostname(), u.Port(), u.Path, err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return isr, fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
}

if err := json.NewDecoder(res.Body).Decode(&isr); err != nil {
c.jsonParseFailures.Inc()
return isr, err
}
return isr, nil
}

func (i *Indices) Collect(ch chan<- prometheus.Metric) {
i.totalScrapes.Inc()
defer func() {
ch <- i.up
ch <- i.totalScrapes
ch <- i.jsonParseFailures
}()

clusterHealth := NewClusterHealth(i.logger, i.client, i.url)
clusterHealthResponse, err := clusterHealth.fetchAndDecodeClusterHealth()
if err != nil {
i.up.Set(0)
level.Warn(i.logger).Log(
"msg", "failed to fetch and decode cluster health",
"err", err,
)
return
}
Member (review comment):
this clusterHealth block in the indices collector is probably a copy'n'paste error, isn't it?!


indexStatsResponse, err := i.fetchAndDecodeStats()
if err != nil {
i.up.Set(0)
level.Warn(i.logger).Log(
"msg", "failed to fetch and decode index stats",
"err", err,
)
return
}
i.up.Set(1)

for indexName, indexStats := range indexStatsResponse.Indices {
for _, metric := range i.indexMetrics {
ch <- prometheus.MustNewConstMetric(
metric.Desc,
metric.Type,
metric.Value(indexStats),
metric.Labels(clusterHealthResponse.ClusterName, indexName)...,
Member (review comment):

copy'n'paste error

Member (review comment):

please review the labels you want to attach to the indices metrics. It seems there is no cluster name available.

)
}
}
}
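
The two review comments above point out that the cluster-health request in Collect appears to exist only to obtain a cluster name for the metric labels, and that the index-stats response itself carries no cluster name. Below is a minimal sketch of one possible alternative, not part of this PR: label the index metrics by index name only, so the collector no longer depends on the cluster-health endpoint. The names indexOnlyLabels, indexOnlyLabelValues and docsPrimaryDesc are illustrative only.

package collector

import "github.com/prometheus/client_golang/prometheus"

var (
	// Only the index name is known from /_all/_stats, so label on it alone.
	indexOnlyLabels      = []string{"index"}
	indexOnlyLabelValues = func(indexName string) []string {
		return []string{indexName}
	}
)

// Each Desc would then be declared without the cluster label, for example:
var docsPrimaryDesc = prometheus.NewDesc(
	prometheus.BuildFQName(namespace, "indices", "docs_primary"),
	"Count of documents with only primary shards",
	indexOnlyLabels, nil,
)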
169 changes: 169 additions & 0 deletions collector/indices_response.go
@@ -0,0 +1,169 @@
package collector

// indexStatsResponse is a representation of an Elasticsearch Index Stats response
type indexStatsResponse struct {
Shards IndexStatsShardsResponse `json:"_shards"`
All IndexStatsIndexResponse `json:"_all"`
Indices map[string]IndexStatsIndexResponse `json:"indices"`
}

type IndexStatsShardsResponse struct {
Total int64 `json:"total"`
Successful int64 `json:"successful"`
Failed int64 `json:"failed"`
}

type IndexStatsIndexResponse struct {
Primaries IndexStatsIndexDetailResponse `json:"primaries"`
Total IndexStatsIndexDetailResponse `json:"total"`
}

type IndexStatsIndexDetailResponse struct {
Docs IndexStatsIndexDocsResponse `json:"docs"`
Store IndexStatsIndexStoreResponse `json:"store"`
Indexing IndexStatsIndexIndexingResponse `json:"indexing"`
Get IndexStatsIndexGetResponse `json:"get"`
Search IndexStatsIndexSearchResponse `json:"search"`
Merges IndexStatsIndexMergesResponse `json:"merges"`
Refresh IndexStatsIndexRefreshResponse `json:"refresh"`
Flush IndexStatsIndexFlushResponse `json:"flush"`
Warmer IndexStatsIndexWarmerResponse `json:"warmer"`
QueryCache IndexStatsIndexQueryCacheResponse `json:"query_cache"`
Fielddata IndexStatsIndexFielddataResponse `json:"fielddata"`
Completion IndexStatsIndexCompletionResponse `json:"completion"`
Segments IndexStatsIndexSegmentsResponse `json:"segments"`
Translog IndexStatsIndexTranslogResponse `json:"translog"`
RequestCache IndexStatsIndexRequestCacheResponse `json:"request_cache"`
Recovery IndexStatsIndexRecoveryResponse `json:"recovery"`
}

type IndexStatsIndexDocsResponse struct {
Count int64 `json:"count"`
Deleted int64 `json:"deleted"`
}

type IndexStatsIndexStoreResponse struct {
SizeInBytes int64 `json:"size_in_bytes"`
ThrottleTimeInMillis int64 `json:"throttle_time_in_millis"`
}

type IndexStatsIndexIndexingResponse struct {
IndexTotal int64 `json:"index_total"`
IndexTimeInMillis int64 `json:"index_time_in_millis"`
IndexCurrent int64 `json:"index_current"`
IndexFailed int64 `json:"index_failed"`
DeleteTotal int64 `json:"delete_total"`
DeleteTimeInMillis int64 `json:"delete_time_in_millis"`
DeleteCurrent int64 `json:"delete_current"`
NoopUpdateTotal int64 `json:"noop_update_total"`
IsThrottled bool `json:"is_throttled"`
ThrottleTimeInMillis int64 `json:"throttle_time_in_millis"`
}

type IndexStatsIndexGetResponse struct {
Total int64 `json:"total"`
TimeInMillis int64 `json:"time_in_millis"`
ExistsTotal int64 `json:"exists_total"`
ExistsTimeInMillis int64 `json:"exists_time_in_millis"`
MissingTotal int64 `json:"missing_total"`
MissingTimeInMillis int64 `json:"missing_time_in_millis"`
Current int64 `json:"current"`
}

type IndexStatsIndexSearchResponse struct {
OpenContexts int64 `json:"open_contexts"`
QueryTotal int64 `json:"query_total"`
QueryTimeInMillis int64 `json:"query_time_in_millis"`
QueryCurrent int64 `json:"query_current"`
FetchTotal int64 `json:"fetch_total"`
FetchTimeInMillis int64 `json:"fetch_time_in_millis"`
FetchCurrent int64 `json:"fetch_current"`
ScrollTotal int64 `json:"scroll_total"`
ScrollTimeInMillis int64 `json:"scroll_time_in_millis"`
ScrollCurrent int64 `json:"scroll_current"`
SuggestTotal int64 `json:"suggest_total"`
SuggestTimeInMillis int64 `json:"suggest_time_in_millis"`
SuggestCurrent int64 `json:"suggest_current"`
}

type IndexStatsIndexMergesResponse struct {
Current int64 `json:"current"`
CurrentDocs int64 `json:"current_docs"`
CurrentSizeInBytes int64 `json:"current_size_in_bytes"`
Total int64 `json:"total"`
TotalTimeInMillis int64 `json:"total_time_in_millis"`
TotalDocs int64 `json:"total_docs"`
TotalSizeInBytes int64 `json:"total_size_in_bytes"`
TotalStoppedTimeInMillis int64 `json:"total_stopped_time_in_millis"`
TotalThrottledTimeInMillis int64 `json:"total_throttled_time_in_millis"`
TotalAutoThrottleInBytes int64 `json:"total_auto_throttle_in_bytes"`
}

type IndexStatsIndexRefreshResponse struct {
Total int64 `json:"total"`
TotalTimeInMillis int64 `json:"total_time_in_millis"`
Listeners int64 `json:"listeners"`
}

type IndexStatsIndexFlushResponse struct {
Total int64 `json:"total"`
TotalTimeInMillis int64 `json:"total_time_in_millis"`
}

type IndexStatsIndexWarmerResponse struct {
Current int64 `json:"current"`
Total int64 `json:"total"`
TotalTimeInMillis int64 `json:"total_time_in_millis"`
}

type IndexStatsIndexQueryCacheResponse struct {
MemorySizeInBytes int64 `json:"memory_size_in_bytes"`
TotalCount int64 `json:"total_count"`
HitCount int64 `json:"hit_count"`
MissCount int64 `json:"miss_count"`
CacheSize int64 `json:"cache_size"`
CacheCount int64 `json:"cache_count"`
Evictions int64 `json:"evictions"`
}

type IndexStatsIndexFielddataResponse struct {
MemorySizeInBytes int64 `json:"memory_size_in_bytes"`
Evictions int64 `json:"evictions"`
}

type IndexStatsIndexCompletionResponse struct {
SizeInBytes int64 `json:"size_in_bytes"`
}

type IndexStatsIndexSegmentsResponse struct {
Count int64 `json:"count"`
MemoryInBytes int64 `json:"memory_in_bytes"`
TermsMemoryInBytes int64 `json:"terms_memory_in_bytes"`
StoredFieldsMemoryInBytes int64 `json:"stored_fields_memory_in_bytes"`
TermVectorsMemoryInBytes int64 `json:"term_vectors_memory_in_bytes"`
NormsMemoryInBytes int64 `json:"norms_memory_in_bytes"`
PointsMemoryInBytes int64 `json:"points_memory_in_bytes"`
DocValuesMemoryInBytes int64 `json:"doc_values_memory_in_bytes"`
IndexWriterMemoryInBytes int64 `json:"index_writer_memory_in_bytes"`
VersionMapMemoryInBytes int64 `json:"version_map_memory_in_bytes"`
FixedBitSetMemoryInBytes int64 `json:"fixed_bit_set_memory_in_bytes"`
MaxUnsafeAutoIdTimestamp int64 `json:"max_unsafe_auto_id_timestamp"`
}

type IndexStatsIndexTranslogResponse struct {
Operations int64 `json:"operations"`
SizeInBytes int64 `json:"size_in_bytes"`
}

type IndexStatsIndexRequestCacheResponse struct {
MemorySizeInBytes int64 `json:"memory_size_in_bytes"`
Evictions int64 `json:"evictions"`
HitCount int64 `json:"hit_count"`
MissCount int64 `json:"miss_count"`
}

type IndexStatsIndexRecoveryResponse struct {
CurrentAsSource int64 `json:"current_as_source"`
CurrentAsTarget int64 `json:"current_as_target"`
ThrottleTimeInMillis int64 `json:"throttle_time_in_millis"`
}
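
To make the mapping between these response types and the /_all/_stats JSON concrete, here is a small self-contained sketch (not part of the PR) that decodes a trimmed, hand-written response body. The cut-down example types and the JSON values are illustrative only; the field access at the end mirrors what the collector's Value functions do.

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Trimmed copies of the response types above, just enough to show how the
// struct tags line up with the index-stats JSON.
type exampleStatsResponse struct {
	Indices map[string]exampleIndexStats `json:"indices"`
}

type exampleIndexStats struct {
	Primaries exampleDetail `json:"primaries"`
	Total     exampleDetail `json:"total"`
}

type exampleDetail struct {
	Docs  exampleDocs  `json:"docs"`
	Store exampleStore `json:"store"`
}

type exampleDocs struct {
	Count int64 `json:"count"`
}

type exampleStore struct {
	SizeInBytes int64 `json:"size_in_bytes"`
}

func main() {
	// Hand-written, trimmed body for illustration; not real Elasticsearch output.
	body := `{"indices": {"twitter": {
		"primaries": {"docs": {"count": 5}, "store": {"size_in_bytes": 1024}},
		"total": {"docs": {"count": 10}, "store": {"size_in_bytes": 2048}}}}}`

	var r exampleStatsResponse
	if err := json.NewDecoder(strings.NewReader(body)).Decode(&r); err != nil {
		panic(err)
	}
	for name, stats := range r.Indices {
		// Same access pattern as the collector's Value funcs.
		fmt.Printf("%s: primary docs=%d, total store=%d bytes\n",
			name, stats.Primaries.Docs.Count, stats.Total.Store.SizeInBytes)
	}
}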
54 changes: 54 additions & 0 deletions collector/indices_test.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions main.go
@@ -55,6 +55,7 @@ func main() {

prometheus.MustRegister(collector.NewClusterHealth(logger, httpClient, esURL))
prometheus.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes))
prometheus.MustRegister(collector.NewIndices(logger, httpClient, esURL))

http.Handle(*metricsPath, prometheus.Handler())
http.HandleFunc("/", IndexHandler(*metricsPath))