From 51b01f447565628cedeeaf78bda869347cb139e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=83=AD=E5=88=9A=E5=B9=B3?= <512979011@qq.com> Date: Fri, 26 Nov 2021 17:27:52 +0800 Subject: [PATCH] bugfix: microservice projects statistics error (#3196) * bugfix: microservice projects statistics * bugfix: fix unit test --- modules/msp/tenant/project/project.service.go | 149 +++---------- .../tenant/project/project.service_test.go | 2 + .../msp/tenant/project/project_statistics.go | 207 ++++++++++++++++++ 3 files changed, 239 insertions(+), 119 deletions(-) create mode 100644 modules/msp/tenant/project/project_statistics.go diff --git a/modules/msp/tenant/project/project.service.go b/modules/msp/tenant/project/project.service.go index 4177b32c5b4..6a62b180f77 100644 --- a/modules/msp/tenant/project/project.service.go +++ b/modules/msp/tenant/project/project.service.go @@ -16,14 +16,11 @@ package project import ( context "context" - "fmt" "net/url" "sort" "strconv" "time" - "google.golang.org/protobuf/types/known/structpb" - "github.com/erda-project/erda-infra/providers/i18n" metricpb "github.com/erda-project/erda-proto-go/core/monitor/metric/pb" tenantpb "github.com/erda-project/erda-proto-go/msp/tenant/pb" @@ -96,6 +93,36 @@ func (p Projects) Less(i, j int) bool { return activeTime1 > activeTime2 } +func (p Projects) Select(selector func(item *pb.Project) interface{}) []interface{} { + var result []interface{} + for _, project := range p { + result = append(result, selector(project)) + } + return result +} + +func (p Projects) SelectMany(selector func(item *pb.Project) []interface{}) []interface{} { + var result []interface{} + for _, project := range p { + list := selector(project) + for _, item := range list { + result = append(result, item) + } + } + return result +} + +func (p Projects) Where(filter func(item *pb.Project) bool) Projects { + result := Projects{} + for _, project := range p { + if !filter(project) { + continue + } + result = append(result, project) + } + return result +} + func (s *projectService) GetProjects(ctx context.Context, req *pb.GetProjectsRequest) (*pb.GetProjectsResponse, error) { projects, err := s.GetProjectList(ctx, req.ProjectId, req.WithStats) if err != nil { @@ -167,122 +194,6 @@ func (s *projectService) GetProjectList(ctx context.Context, projectIDs []string return projects, nil } -func (s *projectService) getProjectsStatistics(projects Projects) error { - if len(projects) == 0 { - return nil - } - endMillSeconds := time.Now().UnixNano() / int64(time.Millisecond) - oneDayAgoMillSeconds := endMillSeconds - int64(24*time.Hour/time.Millisecond) - sevenDayAgoMillSeconds := endMillSeconds - int64(7*24*time.Hour/time.Millisecond) - var projectIds []interface{} - var terminusKeys []interface{} - for _, project := range projects { - projectIds = append(projectIds, project.Id) - for _, workspace := range project.Relationship { - terminusKeys = append(terminusKeys, workspace.TenantID) - } - } - projectIdList, err := structpb.NewList(projectIds) - terminusKeyList, err := structpb.NewList(terminusKeys) - if err != nil { - return fmt.Errorf("failed to generate pb valuelist") - } - - servicesCountMap := map[string]int64{} - activeTimeMap := map[string]int64{} - alertCountMap := map[string]int64{} - - // get services count - req := &metricpb.QueryWithInfluxFormatRequest{ - Start: strconv.FormatInt(sevenDayAgoMillSeconds, 10), - End: strconv.FormatInt(endMillSeconds, 10), - Filters: []*metricpb.Filter{ - { - Key: "tags.terminus_key", - Op: "in", - Value: structpb.NewListValue(terminusKeyList), - }, - }, - Statement: ` - SELECT terminus_key::tag, distinct(service_id::tag), max(timestamp) - FROM application_service_node - WHERE _metric_scope::tag = 'micro_service' - GROUP BY terminus_key::tag - `, - } - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - resp, err := s.metricq.QueryWithInfluxFormat(ctx, req) - if err != nil { - return fmt.Errorf("failed to do metrics: %s", err) - } - if len(resp.Results) > 0 && - len(resp.Results[0].Series) > 0 && - len(resp.Results[0].Series[0].Rows) > 0 { - - for _, row := range resp.Results[0].Series[0].Rows { - terminusKey := row.Values[0].GetStringValue() - servicesCount := row.Values[1].GetNumberValue() - activeTime := row.Values[2].GetNumberValue() - - servicesCountMap[terminusKey] = int64(servicesCount) - activeTimeMap[terminusKey] = int64(activeTime) / int64(time.Millisecond) - } - } - - // get alert count - req = &metricpb.QueryWithInfluxFormatRequest{ - Start: strconv.FormatInt(oneDayAgoMillSeconds, 10), - End: strconv.FormatInt(endMillSeconds, 10), - Filters: []*metricpb.Filter{ - { - Key: "tags.project_id", - Op: "in", - Value: structpb.NewListValue(projectIdList), - }, - }, - Statement: ` - SELECT project_id::tag, count(project_id::tag) - FROM analyzer_alert - WHERE alert_scope::tag = 'micro_service' - GROUP BY project_id::tag - `, - } - ctx, cancel = context.WithTimeout(context.Background(), time.Minute) - defer cancel() - resp, err = s.metricq.QueryWithInfluxFormat(ctx, req) - if err != nil { - return fmt.Errorf("failed to do metrics: %s", err) - } - if len(resp.Results) > 0 && - len(resp.Results[0].Series) > 0 && - len(resp.Results[0].Series[0].Rows) > 0 { - - for _, row := range resp.Results[0].Series[0].Rows { - projectId := row.Values[0].GetStringValue() - alertCount := row.Values[1].GetNumberValue() - - alertCountMap[projectId] = int64(alertCount) - } - } - - for _, project := range projects { - for _, workspace := range project.Relationship { - if serviceCount, ok := servicesCountMap[workspace.TenantID]; ok { - project.ServiceCount += serviceCount - } - if activeTime, ok := activeTimeMap[workspace.TenantID]; ok && activeTime > project.LastActiveTime { - project.LastActiveTime = activeTime - } - } - if alertCount, ok := alertCountMap[project.Id]; ok { - project.Last24HAlertCount = alertCount - } - } - - return nil -} - func (s *projectService) GetHistoryProjects(ctx context.Context, projectIDs []string, projects Projects) ([]apistructs.MicroServiceProjectResponseData, error) { params := url.Values{} for _, id := range projectIDs { diff --git a/modules/msp/tenant/project/project.service_test.go b/modules/msp/tenant/project/project.service_test.go index 3bffe60b888..5d8eee53d76 100644 --- a/modules/msp/tenant/project/project.service_test.go +++ b/modules/msp/tenant/project/project.service_test.go @@ -659,6 +659,7 @@ func (m *mockInfluxQl) QueryWithInfluxFormat(context.Context, *metricpb.QueryWit Rows: []*metricpb.Row{ &metricpb.Row{ Values: []*structpb.Value{ + structpb.NewStringValue("1"), structpb.NewStringValue("1"), structpb.NewNumberValue(10), structpb.NewNumberValue(100), @@ -666,6 +667,7 @@ func (m *mockInfluxQl) QueryWithInfluxFormat(context.Context, *metricpb.QueryWit }, &metricpb.Row{ Values: []*structpb.Value{ + structpb.NewStringValue("2"), structpb.NewStringValue("2"), structpb.NewNumberValue(101), structpb.NewNumberValue(1001), diff --git a/modules/msp/tenant/project/project_statistics.go b/modules/msp/tenant/project/project_statistics.go new file mode 100644 index 00000000000..66498e986fd --- /dev/null +++ b/modules/msp/tenant/project/project_statistics.go @@ -0,0 +1,207 @@ +// Copyright (c) 2021 Terminus, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package project + +import ( + "context" + "fmt" + "strconv" + "time" + + "google.golang.org/protobuf/types/known/structpb" + + metricpb "github.com/erda-project/erda-proto-go/core/monitor/metric/pb" + "github.com/erda-project/erda-proto-go/msp/tenant/project/pb" +) + +type projectStats struct { + serviceCount int64 + lastActiveTime int64 + alertCount int64 +} + +func (p *projectStats) merge(data *projectStats) *projectStats { + p.serviceCount += data.serviceCount + p.alertCount += data.alertCount + if data.lastActiveTime > p.lastActiveTime { + p.lastActiveTime = data.lastActiveTime + } + return p +} + +type projectStatisticMap map[string]map[string]*projectStats + +func (ps projectStatisticMap) initOrUpdate(projectId, terminusKey string, updateFunc func(*projectStats)) { + if ps[projectId] == nil { + ps[projectId] = map[string]*projectStats{} + } + if ps[projectId][terminusKey] == nil { + ps[projectId][terminusKey] = &projectStats{serviceCount: 0, lastActiveTime: 0, alertCount: 0} + } + updateFunc(ps[projectId][terminusKey]) +} + +func (ps projectStatisticMap) statForProjectId(projectId string) (*projectStats, bool) { + tkMap, ok := ps[projectId] + if !ok { + return nil, false + } + result := &projectStats{} + for _, stats := range tkMap { + result.merge(stats) + } + return result, true +} + +func (ps projectStatisticMap) statForTerminusKeys(projectId string, terminusKeys ...string) (*projectStats, bool) { + tkMap, ok := ps[projectId] + if !ok { + return nil, false + } + result := &projectStats{} + for _, terminusKey := range terminusKeys { + if stats, ok := tkMap[terminusKey]; ok { + result.merge(stats) + } + } + return result, true +} + +func (s *projectService) getProjectsStatistics(projects Projects) error { + if len(projects) == 0 { + return nil + } + endMillSeconds := time.Now().UnixNano() / int64(time.Millisecond) + oneDayAgoMillSeconds := endMillSeconds - int64(24*time.Hour/time.Millisecond) + sevenDayAgoMillSeconds := endMillSeconds - int64(7*24*time.Hour/time.Millisecond) + + projectIdList, _ := structpb.NewList(projects. + Select(func(item *pb.Project) interface{} { return item.Id })) + terminusKeyList, _ := structpb.NewList(projects. + SelectMany(func(item *pb.Project) []interface{} { + var list []interface{} + for _, relationship := range item.Relationship { + list = append(list, relationship.TenantID) + } + return list + })) + statisticMap := projectStatisticMap{} + + // get services count and last active time + req := &metricpb.QueryWithInfluxFormatRequest{ + Start: strconv.FormatInt(sevenDayAgoMillSeconds, 10), + End: strconv.FormatInt(endMillSeconds, 10), + Filters: []*metricpb.Filter{ + { + Key: "tags.project_id", + Op: "or_in", + Value: structpb.NewListValue(projectIdList), + }, + { + Key: "tags.terminus_key", + Op: "or_in", + Value: structpb.NewListValue(terminusKeyList), + }, + }, + Statement: ` + SELECT project_id::tag, terminus_key::tag, distinct(service_id::tag), max(timestamp) + FROM application_service_node + WHERE _metric_scope::tag = 'micro_service' + GROUP BY project_id::tag, terminus_key::tag + `, + } + if err := s.doInfluxQuery(req, func(row *metricpb.Row) { + projectId := row.Values[0].GetStringValue() + terminusKey := row.Values[1].GetStringValue() + servicesCount := row.Values[2].GetNumberValue() + activeTime := row.Values[3].GetNumberValue() + + statisticMap.initOrUpdate(projectId, terminusKey, func(stats *projectStats) { + stats.serviceCount = int64(servicesCount) + stats.lastActiveTime = int64(activeTime) / int64(time.Millisecond) + }) + }); err != nil { + return err + } + + // get alert count + req = &metricpb.QueryWithInfluxFormatRequest{ + Start: strconv.FormatInt(oneDayAgoMillSeconds, 10), + End: strconv.FormatInt(endMillSeconds, 10), + Filters: []*metricpb.Filter{ + { + Key: "tags.project_id", + Op: "in", + Value: structpb.NewListValue(projectIdList), + }, + }, + Statement: ` + SELECT project_id::tag, terminus_key::tag, count(project_id::tag) + FROM analyzer_alert + WHERE alert_scope::tag = 'micro_service' + GROUP BY project_id::tag, terminus_key::tag + `, + } + if err := s.doInfluxQuery(req, func(row *metricpb.Row) { + projectId := row.Values[0].GetStringValue() + terminusKey := row.Values[1].GetStringValue() + alertCount := row.Values[2].GetNumberValue() + + statisticMap.initOrUpdate(projectId, terminusKey, func(stats *projectStats) { + stats.alertCount = int64(alertCount) + }) + }); err != nil { + return err + } + + // merge results + for _, project := range projects { + stats, ok := statisticMap.statForProjectId(project.Id) + if !ok { + var tks []string + for _, relationship := range project.Relationship { + tks = append(tks, relationship.TenantID) + } + stats, ok = statisticMap.statForTerminusKeys("", tks...) + } + if !ok { + continue + } + + project.ServiceCount = stats.serviceCount + project.Last24HAlertCount = stats.alertCount + project.LastActiveTime = stats.lastActiveTime + } + + return nil +} + +func (s *projectService) doInfluxQuery(req *metricpb.QueryWithInfluxFormatRequest, rowCallback func(row *metricpb.Row)) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + resp, err := s.metricq.QueryWithInfluxFormat(ctx, req) + if err != nil { + return fmt.Errorf("failed to do metrics: %s", err) + } + if len(resp.Results) > 0 && + len(resp.Results[0].Series) > 0 && + len(resp.Results[0].Series[0].Rows) > 0 { + + for _, row := range resp.Results[0].Series[0].Rows { + rowCallback(row) + } + } + return nil +}