Skip to content

Commit

Permalink
bugfix: log index got overwrite when multi esurls exists in same k8s …
Browse files Browse the repository at this point in the history
…cluster (erda-project#2887)
  • Loading branch information
snakorse authored and erda-bot committed Nov 5, 2021
1 parent 1eaf0aa commit fbde888
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
3 changes: 2 additions & 1 deletion modules/extensions/loghub/index/query/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ func (p *provider) getESClientsFromLogAnalyticsByLogDeployment(addon string, log
clients = append(clients, c)

if p.C.IndexPreload && indices != nil && len(addons) > 0 {
if indexAddons, ok := indices[d.ClusterName]; ok {
cacheKey := p.calcIndexCacheBucketKey(c.Cluster, c.URLs)
if indexAddons, ok := indices[cacheKey]; ok {
for _, addon := range addons {
c.Entrys = append(c.Entrys, indexAddons[addon]...)
}
Expand Down
30 changes: 23 additions & 7 deletions modules/extensions/loghub/index/query/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ package query

import (
"context"
"crypto/md5"
"encoding/hex"
"fmt"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -44,22 +47,35 @@ type IndexEntry struct {

var requestTimeout = 120 * time.Second

func (p *provider) calcIndexCacheBucketKey(cluster, esUrl string) string {
return fmt.Sprintf("%s-%s", cluster, p.md5V(esUrl))
}

func (p *provider) md5V(str string) string {
h := md5.New() // #nosec G401
h.Write([]byte(str))
return hex.EncodeToString(h.Sum(nil))
}

func (p *provider) reloadAllIndices() {
clients := p.getAllESClients()
indices := make(map[string]map[string][]*IndexEntry)
clusters := make(map[string]bool)
for _, client := range clients {
clusters[client.Cluster] = true

cacheKey := p.calcIndexCacheBucketKey(client.Cluster, client.URLs)

clusters[cacheKey] = true
addons, err := p.reloadIndices(client.Client)
if err != nil {
p.L.Errorf("fail to load indices for cluster %s :", client.Cluster, err)
continue
}
// 查询索引时间范围
timeRanges := p.timeRanges[client.Cluster]
timeRanges := p.timeRanges[cacheKey]
if timeRanges == nil {
timeRanges = make(map[string]*timeRange)
p.timeRanges[client.Cluster] = timeRanges
p.timeRanges[cacheKey] = timeRanges
}
set := make(map[string]bool)
for _, list := range addons {
Expand All @@ -83,11 +99,11 @@ func (p *provider) reloadAllIndices() {
delete(timeRanges, index)
}
}
indices[client.Cluster] = addons
indices[cacheKey] = addons
}
for cluster := range p.timeRanges {
if !clusters[cluster] {
delete(p.timeRanges, cluster)
for cacheKey := range p.timeRanges {
if !clusters[cacheKey] {
delete(p.timeRanges, cacheKey)
}
}
for _, addons := range indices {
Expand Down

0 comments on commit fbde888

Please sign in to comment.