From cc0afa6e411ad7742dc40f134947667784e31af3 Mon Sep 17 00:00:00 2001 From: guogangping <512979011@qq.com> Date: Fri, 5 Nov 2021 14:27:42 +0800 Subject: [PATCH] bugfix: log index got overwrite when multi esurls exists in same k8s cluster --- .../extensions/loghub/index/query/clients.go | 3 +- .../extensions/loghub/index/query/index.go | 30 ++++++++++++++----- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/modules/extensions/loghub/index/query/clients.go b/modules/extensions/loghub/index/query/clients.go index 3b1e3d0b1ae..a7acd961164 100644 --- a/modules/extensions/loghub/index/query/clients.go +++ b/modules/extensions/loghub/index/query/clients.go @@ -220,7 +220,8 @@ func (p *provider) getESClientsFromLogAnalyticsByLogDeployment(addon string, log clients = append(clients, c) if p.C.IndexPreload && indices != nil && len(addons) > 0 { - if indexAddons, ok := indices[d.ClusterName]; ok { + cacheKey := p.calcIndexCacheBucketKey(c.Cluster, c.URLs) + if indexAddons, ok := indices[cacheKey]; ok { for _, addon := range addons { c.Entrys = append(c.Entrys, indexAddons[addon]...) } diff --git a/modules/extensions/loghub/index/query/index.go b/modules/extensions/loghub/index/query/index.go index 0bd6010de01..0e3c693c05e 100644 --- a/modules/extensions/loghub/index/query/index.go +++ b/modules/extensions/loghub/index/query/index.go @@ -16,6 +16,9 @@ package query import ( "context" + "crypto/md5" + "encoding/hex" + "fmt" "sort" "strconv" "strings" @@ -44,22 +47,35 @@ type IndexEntry struct { var requestTimeout = 120 * time.Second +func (p *provider) calcIndexCacheBucketKey(cluster, esUrl string) string { + return fmt.Sprintf("%s-%s", cluster, p.md5V(esUrl)) +} + +func (p *provider) md5V(str string) string { + h := md5.New() // #nosec G401 + h.Write([]byte(str)) + return hex.EncodeToString(h.Sum(nil)) +} + func (p *provider) reloadAllIndices() { clients := p.getAllESClients() indices := make(map[string]map[string][]*IndexEntry) clusters := make(map[string]bool) for _, client := range clients { - clusters[client.Cluster] = true + + cacheKey := p.calcIndexCacheBucketKey(client.Cluster, client.URLs) + + clusters[cacheKey] = true addons, err := p.reloadIndices(client.Client) if err != nil { p.L.Errorf("fail to load indices for cluster %s :", client.Cluster, err) continue } // 查询索引时间范围 - timeRanges := p.timeRanges[client.Cluster] + timeRanges := p.timeRanges[cacheKey] if timeRanges == nil { timeRanges = make(map[string]*timeRange) - p.timeRanges[client.Cluster] = timeRanges + p.timeRanges[cacheKey] = timeRanges } set := make(map[string]bool) for _, list := range addons { @@ -83,11 +99,11 @@ func (p *provider) reloadAllIndices() { delete(timeRanges, index) } } - indices[client.Cluster] = addons + indices[cacheKey] = addons } - for cluster := range p.timeRanges { - if !clusters[cluster] { - delete(p.timeRanges, cluster) + for cacheKey := range p.timeRanges { + if !clusters[cacheKey] { + delete(p.timeRanges, cacheKey) } } for _, addons := range indices {