From 981473505eab1f14e92c35a80b2a0398e2aa9b7c Mon Sep 17 00:00:00 2001 From: kakj <18579115540@163.com> Date: Thu, 28 Oct 2021 17:23:46 +0800 Subject: [PATCH 1/8] autotest plan action filter archived (#2663) * update projectID to projectId (#2534) * Automated test plan execution filters out archived plans Co-authored-by: littlejian <17816869670@163.com> --- .../components/actionForm/testplan-run.go | 7 ++- .../actionForm/testplan-run_test.go | 61 +++++++++++++++++++ 2 files changed, 65 insertions(+), 3 deletions(-) diff --git a/modules/openapi/component-protocol/scenarios/action/components/actionForm/testplan-run.go b/modules/openapi/component-protocol/scenarios/action/components/actionForm/testplan-run.go index 912a24fd11c..5070ea4a485 100644 --- a/modules/openapi/component-protocol/scenarios/action/components/actionForm/testplan-run.go +++ b/modules/openapi/component-protocol/scenarios/action/components/actionForm/testplan-run.go @@ -46,9 +46,10 @@ func testPlanRun(ctx context.Context, c *apistructs.Component, scenario apistruc // get testplan testPlanRequest := apistructs.TestPlanV2PagingRequest{ - ProjectID: uint64(projectId), - PageNo: 1, - PageSize: 999, + ProjectID: uint64(projectId), + PageNo: 1, + PageSize: 999, + IsArchived: &[]bool{false}[0], } testPlanRequest.UserID = bdl.Identity.UserID plans, err := bdl.Bdl.PagingTestPlansV2(testPlanRequest) diff --git a/modules/openapi/component-protocol/scenarios/action/components/actionForm/testplan-run_test.go b/modules/openapi/component-protocol/scenarios/action/components/actionForm/testplan-run_test.go index 02b6a3376e5..cbdadea79d5 100644 --- a/modules/openapi/component-protocol/scenarios/action/components/actionForm/testplan-run_test.go +++ b/modules/openapi/component-protocol/scenarios/action/components/actionForm/testplan-run_test.go @@ -15,11 +15,16 @@ package action import ( + "context" "fmt" "reflect" "testing" + "bou.ke/monkey" + "github.com/erda-project/erda/apistructs" + "github.com/erda-project/erda/bundle" + protocol "github.com/erda-project/erda/modules/openapi/component-protocol" ) func Test_fillTestPlanFields(t *testing.T) { @@ -139,3 +144,59 @@ func Test_fillTestPlanFields(t *testing.T) { }) } } + +func Test_testPlanRun(t *testing.T) { + type args struct { + ctx context.Context + c *apistructs.Component + scenario apistructs.ComponentProtocolScenario + event apistructs.ComponentEvent + globalStateData *apistructs.GlobalStateData + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "test empty plan", + wantErr: false, + args: args{ + c: &apistructs.Component{ + Props: map[string]interface{}{ + "fields": []apistructs.FormPropItem{}, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var contextBundle = protocol.ContextBundle{} + var bdl = &bundle.Bundle{} + patch1 := monkey.PatchInstanceMethod(reflect.TypeOf(bdl), "PagingTestPlansV2", func(bdl *bundle.Bundle, req apistructs.TestPlanV2PagingRequest) (*apistructs.TestPlanV2PagingResponseData, error) { + return &apistructs.TestPlanV2PagingResponseData{ + Total: 0, + List: []*apistructs.TestPlanV2{}, + }, nil + }) + defer patch1.Unpatch() + + patch2 := monkey.PatchInstanceMethod(reflect.TypeOf(bdl), "ListAutoTestGlobalConfig", func(bdl *bundle.Bundle, req apistructs.AutoTestGlobalConfigListRequest) ([]apistructs.AutoTestGlobalConfig, error) { + return []apistructs.AutoTestGlobalConfig{}, nil + }) + defer patch2.Unpatch() + + contextBundle.Bdl = bdl + contextBundle.InParams = map[string]interface{}{ + "projectId": "1", + } + + tt.args.ctx = context.WithValue(context.Background(), protocol.GlobalInnerKeyCtxBundle.String(), contextBundle) + + if err := testPlanRun(tt.args.ctx, tt.args.c, tt.args.scenario, tt.args.event, tt.args.globalStateData); (err != nil) != tt.wantErr { + t.Errorf("testPlanRun() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} From 747bd98c2c5bebefcd9496f5856f0c08dfad45ae Mon Sep 17 00:00:00 2001 From: littlejian <17816869670@163.com> Date: Thu, 28 Oct 2021 17:43:50 +0800 Subject: [PATCH 2/8] fix: limit the labels' length of release (#2662) * limit the labels' length of release * polish the code --- modules/dicehub/service/release/release.go | 25 ++++++++ .../dicehub/service/release/release_test.go | 60 +++++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100644 modules/dicehub/service/release/release_test.go diff --git a/modules/dicehub/service/release/release.go b/modules/dicehub/service/release/release.go index 059474a710f..f6a0345c222 100644 --- a/modules/dicehub/service/release/release.go +++ b/modules/dicehub/service/release/release.go @@ -83,6 +83,10 @@ func WithBundle(bdl *bundle.Bundle) Option { // Create 创建 Release func (r *Release) Create(req *apistructs.ReleaseCreateRequest) (string, error) { + if err := limitLabelsLength(req); err != nil { + return "", err + } + // 确保Version在应用层面唯一,若存在,则更新 if req.Version != "" && req.ApplicationID > 0 { releases, err := r.db.GetReleasesByAppAndVersion(req.OrgID, req.ProjectID, req.ApplicationID, req.Version) @@ -134,6 +138,27 @@ func (r *Release) Create(req *apistructs.ReleaseCreateRequest) (string, error) { return release.ReleaseID, nil } +func limitLabelsLength(req *apistructs.ReleaseCreateRequest) error { + if len(req.Labels) == 0 { + return nil + } + labelBytes, err := json.Marshal(req.Labels) + if err != nil { + return err + } + if len([]rune(string(labelBytes))) <= 1000 { + return nil + } + + for k, v := range req.Labels { + runes := []rune(v) + if len(runes) > 100 { + req.Labels[k] = string(runes[:100]) + "..." + } + } + return nil +} + // Update 更新 Release func (r *Release) Update(orgID int64, releaseID string, req *apistructs.ReleaseUpdateRequestData) error { release, err := r.db.GetRelease(releaseID) diff --git a/modules/dicehub/service/release/release_test.go b/modules/dicehub/service/release/release_test.go new file mode 100644 index 00000000000..f13dc706a2f --- /dev/null +++ b/modules/dicehub/service/release/release_test.go @@ -0,0 +1,60 @@ +// Copyright (c) 2021 Terminus, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package release + +import ( + "testing" + + "k8s.io/apimachinery/pkg/util/rand" + + "github.com/erda-project/erda/apistructs" +) + +func TestLimitLabelsLength(t *testing.T) { + req1 := &apistructs.ReleaseCreateRequest{ + Labels: nil, + } + if err := limitLabelsLength(req1); err != nil { + t.Error(err) + } + + req2 := &apistructs.ReleaseCreateRequest{ + Labels: map[string]string{ + "a": rand.String(100), + "b": rand.String(101), + "c": rand.String(98) + "中文的", + }, + } + if err := limitLabelsLength(req2); err != nil { + t.Error(err) + } + + req3 := &apistructs.ReleaseCreateRequest{ + Labels: map[string]string{ + "a": rand.String(1000), + "b": rand.String(100), + "c": rand.String(98) + "中文的", + }, + } + if err := limitLabelsLength(req3); err != nil { + t.Error(err) + } + for _, v := range req3.Labels { + // end with ... + if len([]rune(v)) > 100+3 { + t.Error("fail") + } + } +} From 5aeaf8f27a88d075c140767443366971e11b4fa4 Mon Sep 17 00:00:00 2001 From: chengjoey <30427474+chengjoey@users.noreply.github.com> Date: Thu, 28 Oct 2021 19:03:57 +0800 Subject: [PATCH 3/8] Fix/autotest step value replace (#2670) * replace step value again after created * modify file record table --- .../scenes-import-record/table/render.go | 4 ++-- modules/dop/services/autotest_v2/export.go | 12 +++++++----- modules/dop/services/autotest_v2/space_data.go | 17 +++++++++++++++++ .../components/recordTable/render.go | 4 ++-- 4 files changed, 28 insertions(+), 9 deletions(-) diff --git a/modules/dop/component-protocol/components/scenes-import-record/table/render.go b/modules/dop/component-protocol/components/scenes-import-record/table/render.go index b699d47f279..a350913935c 100644 --- a/modules/dop/component-protocol/components/scenes-import-record/table/render.go +++ b/modules/dop/component-protocol/components/scenes-import-record/table/render.go @@ -183,7 +183,6 @@ func (ca *ComponentAction) setData() error { Result: Result{ RenderType: "downloadUrl", URL: fmt.Sprintf("%s/api/files/%s", conf.RootDomain(), fileRecord.ApiFileUUID), - Value: fileRecord.FileName, }, }) } @@ -216,10 +215,11 @@ func (ca *ComponentAction) Render(ctx context.Context, c *cptype.Component, scen }, Column{ DataIndex: "time", Title: ca.sdk.I18n("time"), - Width: 150, + Width: 170, }, Column{ DataIndex: "desc", Title: ca.sdk.I18n("desc"), + Width: 200, }, Column{ DataIndex: "status", Title: ca.sdk.I18n("status"), diff --git a/modules/dop/services/autotest_v2/export.go b/modules/dop/services/autotest_v2/export.go index e3ad52124e3..ce6fdfa498a 100644 --- a/modules/dop/services/autotest_v2/export.go +++ b/modules/dop/services/autotest_v2/export.go @@ -19,13 +19,13 @@ import ( "fmt" "io/ioutil" "strconv" + "time" "github.com/sirupsen/logrus" "github.com/erda-project/erda/apistructs" "github.com/erda-project/erda/modules/dop/dao" "github.com/erda-project/erda/modules/dop/services/apierrors" - "github.com/erda-project/erda/modules/dop/services/i18n" ) type AutoTestSpaceDB struct { @@ -159,8 +159,7 @@ func (svc *Service) Export(req apistructs.AutoTestSpaceExportRequest) (uint64, e return 0, apierrors.ErrExportAutoTestSpace.InvalidParameter("fileType") } - l := svc.bdl.GetLocale(req.Locale) - fileName := l.Get(i18n.I18nKeySpaceSheetName) + fileName := svc.MakeAutotestFileName(req.SpaceName) if req.FileType == apistructs.TestSpaceFileTypeExcel { fileName += ".xlsx" } @@ -256,8 +255,7 @@ func (svc *Service) ExportSceneSet(req apistructs.AutoTestSceneSetExportRequest) return 0, apierrors.ErrExportAutoTestSceneSet.InvalidParameter("fileType") } - l := svc.bdl.GetLocale(req.Locale) - fileName := fmt.Sprintf("%s-%s", l.Get(i18n.I18nKeySceneSetSheetName), req.SceneSetName) + fileName := svc.MakeAutotestFileName(req.SceneSetName) if req.FileType == apistructs.TestSceneSetFileTypeExcel { fileName += ".xlsx" } @@ -345,3 +343,7 @@ func (svc *Service) ExportSceneSetFile(record *dao.TestFileRecord) { return } } + +func (svc *Service) MakeAutotestFileName(origin string) string { + return fmt.Sprintf("%s_%s", origin, time.Now().Format("20060102150405")) +} diff --git a/modules/dop/services/autotest_v2/space_data.go b/modules/dop/services/autotest_v2/space_data.go index 93b1c088715..64560a3cf5c 100644 --- a/modules/dop/services/autotest_v2/space_data.go +++ b/modules/dop/services/autotest_v2/space_data.go @@ -705,6 +705,15 @@ func (a *AutoTestSpaceData) CopySceneSteps() error { return err } a.stepIDAssociationMap[each.ID] = newStep.ID + // replace value again because loop strategy may use self id + tmpValue := replacePreStepValue(newStep.Value, a.stepIDAssociationMap) + if tmpValue != newStep.Value { + newStep.Value = tmpValue + if err = a.svc.db.UpdateAutotestSceneStep(newStep); err != nil { + return err + } + } + head = newStep.ID pHead := newStep.ID @@ -728,6 +737,14 @@ func (a *AutoTestSpaceData) CopySceneSteps() error { } pHead = newPStep.ID a.stepIDAssociationMap[pv.ID] = newPStep.ID + // replace value again because loop strategy may use self id + tmpValue = replacePreStepValue(newPStep.Value, a.stepIDAssociationMap) + if tmpValue != newPStep.Value { + newPStep.Value = tmpValue + if err = a.svc.db.UpdateAutotestSceneStep(newPStep); err != nil { + return err + } + } } } } diff --git a/modules/openapi/component-protocol/scenarios/auto-test-space-list/components/recordTable/render.go b/modules/openapi/component-protocol/scenarios/auto-test-space-list/components/recordTable/render.go index 17d30fff007..9c166d9e5bb 100644 --- a/modules/openapi/component-protocol/scenarios/auto-test-space-list/components/recordTable/render.go +++ b/modules/openapi/component-protocol/scenarios/auto-test-space-list/components/recordTable/render.go @@ -122,7 +122,7 @@ func (r *RecordTable) setProps() { }, Column{ DataIndex: "operator", Title: i18nLocale.Get(i18n.I18nKeyTableOperator), - Width: 150, + Width: 170, }, Column{ DataIndex: "time", Title: i18nLocale.Get(i18n.I18nKeyTableTime), @@ -130,6 +130,7 @@ func (r *RecordTable) setProps() { }, Column{ DataIndex: "desc", Title: i18nLocale.Get(i18n.I18nKeyTableDesc), + Width: 200, }, Column{ DataIndex: "status", Title: i18nLocale.Get(i18n.I18nKeyTableStatus), @@ -199,7 +200,6 @@ func (r *RecordTable) setData() error { Result: Result{ RenderType: "downloadUrl", URL: fmt.Sprintf("%s/api/files/%s", conf.RootDomain(), fileRecord.ApiFileUUID), - Value: fileRecord.FileName, }, }) } From 6b117e67126704e692b5a00966555d6b2648ab1c Mon Sep 17 00:00:00 2001 From: erda-bot <81558540+erda-bot@users.noreply.github.com> Date: Thu, 28 Oct 2021 19:26:15 +0800 Subject: [PATCH 4/8] fix(scheduler): compose deployment with imagepullsecrets (#2456) (#2673) Co-authored-by: Muzry --- .../executor/plugins/k8s/deployment.go | 24 ++++- .../executor/plugins/k8s/deployment_test.go | 98 +++++++++++++++++++ 2 files changed, 121 insertions(+), 1 deletion(-) create mode 100644 modules/scheduler/executor/plugins/k8s/deployment_test.go diff --git a/modules/scheduler/executor/plugins/k8s/deployment.go b/modules/scheduler/executor/plugins/k8s/deployment.go index 334a0224d57..afe5f63ebcd 100644 --- a/modules/scheduler/executor/plugins/k8s/deployment.go +++ b/modules/scheduler/executor/plugins/k8s/deployment.go @@ -17,6 +17,7 @@ package k8s import ( "context" "fmt" + "os" "strconv" "strings" @@ -29,6 +30,7 @@ import ( "github.com/erda-project/erda/apistructs" "github.com/erda-project/erda/modules/scheduler/executor/plugins/k8s/k8sapi" + "github.com/erda-project/erda/modules/scheduler/executor/plugins/k8s/k8serror" "github.com/erda-project/erda/modules/scheduler/executor/plugins/k8s/toleration" "github.com/erda-project/erda/pkg/parser/diceyml" "github.com/erda-project/erda/pkg/schedule/schedulepolicy/constraintbuilders" @@ -42,6 +44,7 @@ const ( shardDirSuffix = "-shard-dir" sidecarNamePrefix = "sidecar-" EnableServiceLinks = "ENABLE_SERVICE_LINKS" + RegistrySecretName = "REGISTRY_SECRET_NAME" ) func (k *Kubernetes) createDeployment(ctx context.Context, service *apistructs.Service, sg *apistructs.ServiceGroup) error { @@ -478,6 +481,25 @@ func (k *Kubernetes) newDeployment(service *apistructs.Service, sg *apistructs.S }, } + // need to set the secret in default namespace which named with REGISTRY_SECRET_NAME env + registryName := os.Getenv(RegistrySecretName) + if registryName == "" { + registryName = AliyunRegistry + } + + _, err := k.secret.Get(service.Namespace, registryName) + if err == nil { + deployment.Spec.Template.Spec.ImagePullSecrets = []apiv1.LocalObjectReference{ + { + Name: registryName, + }, + } + } else { + if !k8serror.NotFound(err) { + return nil, fmt.Errorf("get secret %s in namespace %s err: %v", registryName, service.Namespace, err) + } + } + if v := k.options["FORCE_BLUE_GREEN_DEPLOY"]; v != "true" && (strutil.ToUpper(service.Env[DiceWorkSpace]) == apistructs.DevWorkspace.String() || strutil.ToUpper(service.Env[DiceWorkSpace]) == apistructs.TestWorkspace.String()) { @@ -503,7 +525,7 @@ func (k *Kubernetes) newDeployment(service *apistructs.Service, sg *apistructs.S Image: service.Image, } - err := k.setContainerResources(*service, &container) + err = k.setContainerResources(*service, &container) if err != nil { errMsg := fmt.Sprintf("set container resource err: %v", err) logrus.Errorf(errMsg) diff --git a/modules/scheduler/executor/plugins/k8s/deployment_test.go b/modules/scheduler/executor/plugins/k8s/deployment_test.go new file mode 100644 index 00000000000..b6324fe9ae5 --- /dev/null +++ b/modules/scheduler/executor/plugins/k8s/deployment_test.go @@ -0,0 +1,98 @@ +// Copyright (c) 2021 Terminus, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package k8s + +import ( + "reflect" + "testing" + + "bou.ke/monkey" + "github.com/stretchr/testify/assert" + apiv1 "k8s.io/api/core/v1" + + "github.com/erda-project/erda/apistructs" + "github.com/erda-project/erda/modules/scheduler/executor/plugins/k8s/secret" + "github.com/erda-project/erda/pkg/parser/diceyml" +) + +func TestNewDeployment(t *testing.T) { + service := &apistructs.Service{ + Name: "test-service", + Namespace: "test", + Image: "test", + ImageUsername: "", + ImagePassword: "", + Cmd: "", + Ports: nil, + ProxyPorts: nil, + Vip: "", + ShortVIP: "", + ProxyIp: "", + PublicIp: "", + Scale: 0, + Resources: apistructs.Resources{ + Cpu: 0.1, + Mem: 512, + }, + Depends: nil, + Env: nil, + Labels: nil, + DeploymentLabels: nil, + Selectors: nil, + Binds: nil, + Volumes: nil, + Hosts: nil, + HealthCheck: nil, + NewHealthCheck: nil, + SideCars: nil, + InitContainer: nil, + InstanceInfos: nil, + MeshEnable: nil, + TrafficSecurity: diceyml.TrafficSecurity{}, + WorkLoad: "", + ProjectServiceName: "", + K8SSnippet: nil, + StatusDesc: apistructs.StatusDesc{}, + } + + servicegroup := &apistructs.ServiceGroup{ + Dice: apistructs.Dice{ + ID: "test", + Type: "service", + Labels: map[string]string{}, + Services: []apistructs.Service{*service}, + ServiceDiscoveryKind: "", + ServiceDiscoveryMode: "", + ProjectNamespace: "", + }, + } + + k := &Kubernetes{ + secret: &secret.Secret{}, + } + + monkey.PatchInstanceMethod(reflect.TypeOf(k.secret), "Get", func(sec *secret.Secret, namespace, name string) (*apiv1.Secret, error) { + return &apiv1.Secret{}, nil + }) + monkey.PatchInstanceMethod(reflect.TypeOf(k), "CopyErdaSecrets", func(kube *Kubernetes, originns, dstns string) ([]apiv1.Secret, error) { + return []apiv1.Secret{}, nil + }) + monkey.PatchInstanceMethod(reflect.TypeOf(k), "AddPodMountVolume", func(kube *Kubernetes, service *apistructs.Service, podSpec *apiv1.PodSpec, + secretvolmounts []apiv1.VolumeMount, secretvolumes []apiv1.Volume) error { + return nil + }) + _, err := k.newDeployment(service, servicegroup) + assert.Equal(t, err, nil) +} From 59c4a7061e7431ec6389412c288ab5ba43c3fab3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=82=9F=E7=A9=BA?= Date: Fri, 29 Oct 2021 09:49:23 +0800 Subject: [PATCH 5/8] =?UTF-8?q?fix:=20query=20project=20request=20resource?= =?UTF-8?q?:=20from=20s=5Fpod=5Finfo=20not=20from=20cmp;=20pr=E2=80=A6=20(?= =?UTF-8?q?#2677)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: query project request resource: from s_pod_info not from cmp; project quota migration; not allowed to udpate the project with quota to empty quota * feature: add ut * feature: allows delete stmt * feature: fix ut --- .../20211028-delete-quota-without-project.sql | 1 + .erda/migrations/config.yml | 2 +- modules/cmp/resource/report_table.go | 16 +- modules/cmp/tasks/collect_daily_quota.go | 20 +- modules/cmp/tasks/ticker.go | 26 +- modules/cmp/tasks/ticker_test.go | 4 +- .../core-services/services/project/project.go | 275 +++++++++--------- .../services/project/project_test.go | 28 ++ pkg/resourcecalculator/calculator.go | 110 +++---- 9 files changed, 248 insertions(+), 234 deletions(-) create mode 100644 .erda/migrations/cmdb/20211028-delete-quota-without-project.sql diff --git a/.erda/migrations/cmdb/20211028-delete-quota-without-project.sql b/.erda/migrations/cmdb/20211028-delete-quota-without-project.sql new file mode 100644 index 00000000000..3d008596283 --- /dev/null +++ b/.erda/migrations/cmdb/20211028-delete-quota-without-project.sql @@ -0,0 +1 @@ +DELETE FROM ps_group_projects_quota WHERE project_id NOT IN (SELECT id FROM ps_group_projects); diff --git a/.erda/migrations/config.yml b/.erda/migrations/config.yml index 408435d2dd2..8f0a9d61fd0 100644 --- a/.erda/migrations/config.yml +++ b/.erda/migrations/config.yml @@ -65,7 +65,7 @@ allowed_dml: union_stmt: true load_data_stmt: false insert_stmt: true - delete_stmt: false + delete_stmt: true update_stmt: true show_stmt: true split_region_stmt: false diff --git a/modules/cmp/resource/report_table.go b/modules/cmp/resource/report_table.go index 39437b8a683..55226a2feb1 100644 --- a/modules/cmp/resource/report_table.go +++ b/modules/cmp/resource/report_table.go @@ -39,11 +39,11 @@ type ReportTable struct { } func NewReportTable(opts ...ReportTableOption) *ReportTable { - var t ReportTable - for _, f := range opts { - f(&t) + var table ReportTable + for _, opt := range opts { + opt(&table) } - return &t + return &table } func (rt *ReportTable) GetResourceOverviewReport(ctx context.Context, orgID int64, clusterNames []string, @@ -193,13 +193,13 @@ func ReportTableWithCMP(cmp interface { ListSteveResource(ctx context.Context, req *apistructs.SteveRequest) ([]types.APIObject, error) GetNamespacesResources(ctx context.Context, nReq *pb.GetNamespacesResourcesRequest) (*pb.GetNamespacesResourcesResponse, error) }) ReportTableOption { - return func(theTable *ReportTable) { - theTable.cmp = cmp + return func(table *ReportTable) { + table.cmp = cmp } } func ReportTableWithTrans(trans i18n.Translator) ReportTableOption { - return func(theTable *ReportTable) { - theTable.trans = trans + return func(table *ReportTable) { + table.trans = trans } } diff --git a/modules/cmp/tasks/collect_daily_quota.go b/modules/cmp/tasks/collect_daily_quota.go index 32ef6e78a9e..2ee851c5dc3 100644 --- a/modules/cmp/tasks/collect_daily_quota.go +++ b/modules/cmp/tasks/collect_daily_quota.go @@ -43,11 +43,11 @@ type DailyQuotaCollector struct { } func NewDailyQuotaCollector(opts ...DailyQuotaCollectorOption) *DailyQuotaCollector { - var d DailyQuotaCollector - for _, opt := range opts { - opt(&d) + var collector DailyQuotaCollector + for _, f := range opts { + f(&collector) } - return &d + return &collector } func (d *DailyQuotaCollector) Task() (bool, error) { @@ -261,19 +261,19 @@ func DailyQuotaCollectorWithDBClient(client *dbclient.DBClient) DailyQuotaCollec } } -func DailyQuotaCollectorWithBundle(bdl *bundle.Bundle) DailyQuotaCollectorOption { - return func(c *DailyQuotaCollector) { - c.bdl = bdl +func DailyQuotaCollectorWithBundle(bndl *bundle.Bundle) DailyQuotaCollectorOption { + return func(collector *DailyQuotaCollector) { + collector.bdl = bndl } } -func DailyQuotaCollectorWithCMPAPI(cmp interface { +func DailyQuotaCollectorWithCMPAPI(cmpCli interface { ListSteveResource(ctx context.Context, req *apistructs.SteveRequest) ([]types.APIObject, error) GetNamespacesResources(ctx context.Context, nReq *pb.GetNamespacesResourcesRequest) (*pb.GetNamespacesResourcesResponse, error) GetClustersResources(ctx context.Context, cReq *pb.GetClustersResourcesRequest) (*pb.GetClusterResourcesResponse, error) GetAllClusters() []string }) DailyQuotaCollectorOption { - return func(c *DailyQuotaCollector) { - c.cmp = cmp + return func(collector *DailyQuotaCollector) { + collector.cmp = cmpCli } } diff --git a/modules/cmp/tasks/ticker.go b/modules/cmp/tasks/ticker.go index 72c810f542d..184c1a287c5 100644 --- a/modules/cmp/tasks/ticker.go +++ b/modules/cmp/tasks/ticker.go @@ -20,11 +20,11 @@ import ( ) type ExitError struct { - Msg string + Message string } func (e ExitError) Error() string { - return e.Msg + return e.Message } type Task func() error @@ -36,26 +36,26 @@ type Ticker struct { done chan bool } -func New(interval time.Duration, task func() (bool, error)) *Ticker { +func New(intervals time.Duration, f func() (bool, error)) *Ticker { return &Ticker{ - Interval: interval, - Task: task, + Interval: intervals, + Task: f, done: make(chan bool), } } func (d *Ticker) Run() error { - t := time.NewTicker(d.Interval) - defer t.Stop() + tick := time.NewTicker(d.Interval) + defer tick.Stop() var ( err error - over bool + done bool ) fmt.Printf("the interval task %s is running right now: %s\n", d.Name, time.Now().Format(time.RFC3339)) - over, err = d.Task() + done, err = d.Task() fmt.Printf("the interval task %s is complete this time, err: %v\n", d.Name, err) - if over { + if done { d.Close() return err } @@ -65,11 +65,11 @@ func (d *Ticker) Run() error { case <-d.done: fmt.Printf("the interval task %s is done!\n", d.Name) return err - case t := <-t.C: + case t := <-tick.C: fmt.Printf("the interval task %s is running at: %s\n", d.Name, t.Format(time.RFC3339)) - over, err = d.Task() + done, err = d.Task() fmt.Printf("the interval task %s is complete this time, err: %v\n", d.Name, err) - if over { + if done { d.Close() } } diff --git a/modules/cmp/tasks/ticker_test.go b/modules/cmp/tasks/ticker_test.go index c1cb4b2eb9d..7524f6d4b13 100644 --- a/modules/cmp/tasks/ticker_test.go +++ b/modules/cmp/tasks/ticker_test.go @@ -25,7 +25,7 @@ import ( ) func TestExitError_Error(t *testing.T) { - var e = tasks.ExitError{Msg: "something wrong"} + var e = tasks.ExitError{Message: "something wrong"} t.Log(e.Error()) } @@ -35,7 +35,7 @@ func TestTicker_Close(t *testing.T) { times++ fmt.Println("times:", times) if times > 5 { - return true, &tasks.ExitError{Msg: "time over"} + return true, &tasks.ExitError{Message: "time over"} } if times > 3 { return false, errors.New("normal error") diff --git a/modules/core-services/services/project/project.go b/modules/core-services/services/project/project.go index cce74b158b8..64a1823a1c3 100644 --- a/modules/core-services/services/project/project.go +++ b/modules/core-services/services/project/project.go @@ -59,45 +59,45 @@ type Option func(*Project) // New 新建 Project 实例,通过 Project 实例操作企业资源 func New(options ...Option) *Project { - p := &Project{} - for _, op := range options { - op(p) + project := &Project{} + for _, f := range options { + f(project) } - return p + return project } // WithDBClient 配置 db client func WithDBClient(db *dao.DBClient) Option { - return func(p *Project) { - p.db = db + return func(project *Project) { + project.db = db } } // WithUCClient 配置 uc client func WithUCClient(uc *ucauth.UCClient) Option { - return func(p *Project) { - p.uc = uc + return func(project *Project) { + project.uc = uc } } // WithBundle 配置 bundle func WithBundle(bdl *bundle.Bundle) Option { - return func(p *Project) { - p.bdl = bdl + return func(project *Project) { + project.bdl = bdl } } // WithClusterResourceClient set the gRPC client of CMP cluster resource func WithClusterResourceClient(cli dashboardPb.ClusterResourceServer) Option { - return func(p *Project) { - p.clusterResourceClient = cli + return func(project *Project) { + project.clusterResourceClient = cli } } // WithI18n set the translator func WithI18n(translator i18n.Translator) Option { - return func(p *Project) { - p.trans = translator + return func(project *Project) { + project.trans = translator } } @@ -140,6 +140,10 @@ func (p *Project) Create(userID string, createReq *apistructs.ProjectCreateReque if createReq.OrgID == 0 { return nil, errors.Errorf("failed to create project(org id is empty)") } + // 只有 DevOps 类型的项目,才能配置 quota + if createReq.Template != apistructs.DevopsTemplate { + createReq.ResourceConfigs = nil + } var clusterConfig []byte if createReq.ResourceConfigs != nil { if err := createReq.ResourceConfigs.Check(); err != nil { @@ -349,34 +353,38 @@ func (p *Project) Update(orgID, projectID int64, userID string, updateReq *apist return nil, errors.Errorf("failed to update project") } + var oldQuota = new(model.ProjectQuota) + err = p.db.First(oldQuota, map[string]interface{}{"project_id": projectID}).Error + hasOldQuota := err == nil + if updateReq.ResourceConfigs == nil { + if hasOldQuota { + return nil, errors.Errorf("cant not update project quota to empty") + } tx.Commit() return &project, nil } // create or update quota - var ( - oldQuota = new(model.ProjectQuota) - quota = model.ProjectQuota{ - ProjectID: uint64(projectID), - ProjectName: updateReq.Name, - ProdClusterName: updateReq.ResourceConfigs.PROD.ClusterName, - StagingClusterName: updateReq.ResourceConfigs.STAGING.ClusterName, - TestClusterName: updateReq.ResourceConfigs.TEST.ClusterName, - DevClusterName: updateReq.ResourceConfigs.DEV.ClusterName, - ProdCPUQuota: calcu.CoreToMillcore(updateReq.ResourceConfigs.PROD.CPUQuota), - ProdMemQuota: calcu.GibibyteToByte(updateReq.ResourceConfigs.PROD.MemQuota), - StagingCPUQuota: calcu.CoreToMillcore(updateReq.ResourceConfigs.PROD.CPUQuota), - StagingMemQuota: calcu.GibibyteToByte(updateReq.ResourceConfigs.STAGING.MemQuota), - TestCPUQuota: calcu.CoreToMillcore(updateReq.ResourceConfigs.TEST.CPUQuota), - TestMemQuota: calcu.GibibyteToByte(updateReq.ResourceConfigs.TEST.MemQuota), - DevCPUQuota: calcu.CoreToMillcore(updateReq.ResourceConfigs.DEV.CPUQuota), - DevMemQuota: calcu.GibibyteToByte(updateReq.ResourceConfigs.DEV.MemQuota), - CreatorID: userID, - UpdaterID: userID, - } - ) - if err = p.db.First(oldQuota, map[string]interface{}{"project_id": projectID}).Error; err == nil { + var quota = model.ProjectQuota{ + ProjectID: uint64(projectID), + ProjectName: updateReq.Name, + ProdClusterName: updateReq.ResourceConfigs.PROD.ClusterName, + StagingClusterName: updateReq.ResourceConfigs.STAGING.ClusterName, + TestClusterName: updateReq.ResourceConfigs.TEST.ClusterName, + DevClusterName: updateReq.ResourceConfigs.DEV.ClusterName, + ProdCPUQuota: calcu.CoreToMillcore(updateReq.ResourceConfigs.PROD.CPUQuota), + ProdMemQuota: calcu.GibibyteToByte(updateReq.ResourceConfigs.PROD.MemQuota), + StagingCPUQuota: calcu.CoreToMillcore(updateReq.ResourceConfigs.PROD.CPUQuota), + StagingMemQuota: calcu.GibibyteToByte(updateReq.ResourceConfigs.STAGING.MemQuota), + TestCPUQuota: calcu.CoreToMillcore(updateReq.ResourceConfigs.TEST.CPUQuota), + TestMemQuota: calcu.GibibyteToByte(updateReq.ResourceConfigs.TEST.MemQuota), + DevCPUQuota: calcu.CoreToMillcore(updateReq.ResourceConfigs.DEV.CPUQuota), + DevMemQuota: calcu.GibibyteToByte(updateReq.ResourceConfigs.DEV.MemQuota), + CreatorID: userID, + UpdaterID: userID, + } + if hasOldQuota { quota.ID = oldQuota.ID quota.CreatorID = oldQuota.CreatorID err = tx.Debug().Save("a).Error @@ -614,134 +622,101 @@ func (p *Project) Get(ctx context.Context, projectID int64) (*apistructs.Project projectDTO.Owners = append(projectDTO.Owners, v.UserID) } + // 查询项目 quota + p.fetchQuota(&projectDTO) + // 查询项目下的 pod 的 request 数据 + p.fetchPodInfo(&projectDTO) + // 根据已有统计值计算比率 + p.calcuRequestRate(&projectDTO) + // 查询对应的集群实际可用资源 + p.fetchAvailable(ctx, &projectDTO) + // 对比 quota 和实际可用资源,打 tips + p.makeProjectDtoTips(&projectDTO, langCodes) + + return &projectDTO, nil +} + +func (p *Project) fetchQuota(dto *apistructs.ProjectDTO) { var projectQuota model.ProjectQuota - if err := p.db.First(&projectQuota, map[string]interface{}{"project_id": projectID}).Error; err != nil { - logrus.WithError(err).WithField("project_id", projectID). + if err := p.db.First(&projectQuota, map[string]interface{}{"project_id": dto.ID}).Error; err != nil { + logrus.WithError(err).WithField("project_id", dto.ID). Warnln("failed to select the quota record of the project") - return &projectDTO, nil - } - projectDTO.ClusterConfig = make(map[string]string) - projectDTO.ResourceConfig = apistructs.NewResourceConfig() - projectDTO.ClusterConfig["PROD"] = projectQuota.ProdClusterName - projectDTO.ClusterConfig["STAGING"] = projectQuota.StagingClusterName - projectDTO.ClusterConfig["TEST"] = projectQuota.TestClusterName - projectDTO.ClusterConfig["DEV"] = projectQuota.DevClusterName - projectDTO.ResourceConfig.PROD.ClusterName = projectQuota.ProdClusterName - projectDTO.ResourceConfig.STAGING.ClusterName = projectQuota.StagingClusterName - projectDTO.ResourceConfig.TEST.ClusterName = projectQuota.TestClusterName - projectDTO.ResourceConfig.DEV.ClusterName = projectQuota.DevClusterName - projectDTO.ResourceConfig.PROD.CPUQuota = calcu.MillcoreToCore(projectQuota.ProdCPUQuota, 3) - projectDTO.ResourceConfig.STAGING.CPUQuota = calcu.MillcoreToCore(projectQuota.StagingCPUQuota, 3) - projectDTO.ResourceConfig.TEST.CPUQuota = calcu.MillcoreToCore(projectQuota.TestCPUQuota, 3) - projectDTO.ResourceConfig.DEV.CPUQuota = calcu.MillcoreToCore(projectQuota.DevCPUQuota, 3) - projectDTO.ResourceConfig.PROD.MemQuota = calcu.ByteToGibibyte(projectQuota.ProdMemQuota, 3) - projectDTO.ResourceConfig.STAGING.MemQuota = calcu.ByteToGibibyte(projectQuota.StagingMemQuota, 3) - projectDTO.ResourceConfig.TEST.MemQuota = calcu.ByteToGibibyte(projectQuota.TestMemQuota, 3) - projectDTO.ResourceConfig.DEV.MemQuota = calcu.ByteToGibibyte(projectQuota.DevMemQuota, 3) - projectDTO.CpuQuota = calcu.MillcoreToCore(projectQuota.ProdCPUQuota+projectQuota.StagingCPUQuota+projectQuota.TestCPUQuota+projectQuota.DevCPUQuota, 3) - projectDTO.MemQuota = calcu.ByteToGibibyte(projectQuota.ProdMemQuota+projectQuota.StagingMemQuota+projectQuota.TestMemQuota+projectQuota.DevMemQuota, 3) + return + } + dto.ClusterConfig = make(map[string]string) + dto.ResourceConfig = apistructs.NewResourceConfig() + dto.ClusterConfig["PROD"] = projectQuota.ProdClusterName + dto.ClusterConfig["STAGING"] = projectQuota.StagingClusterName + dto.ClusterConfig["TEST"] = projectQuota.TestClusterName + dto.ClusterConfig["DEV"] = projectQuota.DevClusterName + dto.ResourceConfig.PROD.ClusterName = projectQuota.ProdClusterName + dto.ResourceConfig.STAGING.ClusterName = projectQuota.StagingClusterName + dto.ResourceConfig.TEST.ClusterName = projectQuota.TestClusterName + dto.ResourceConfig.DEV.ClusterName = projectQuota.DevClusterName + dto.ResourceConfig.PROD.CPUQuota = calcu.MillcoreToCore(projectQuota.ProdCPUQuota, 3) + dto.ResourceConfig.STAGING.CPUQuota = calcu.MillcoreToCore(projectQuota.StagingCPUQuota, 3) + dto.ResourceConfig.TEST.CPUQuota = calcu.MillcoreToCore(projectQuota.TestCPUQuota, 3) + dto.ResourceConfig.DEV.CPUQuota = calcu.MillcoreToCore(projectQuota.DevCPUQuota, 3) + dto.ResourceConfig.PROD.MemQuota = calcu.ByteToGibibyte(projectQuota.ProdMemQuota, 3) + dto.ResourceConfig.STAGING.MemQuota = calcu.ByteToGibibyte(projectQuota.StagingMemQuota, 3) + dto.ResourceConfig.TEST.MemQuota = calcu.ByteToGibibyte(projectQuota.TestMemQuota, 3) + dto.ResourceConfig.DEV.MemQuota = calcu.ByteToGibibyte(projectQuota.DevMemQuota, 3) + dto.CpuQuota = calcu.MillcoreToCore(projectQuota.ProdCPUQuota+projectQuota.StagingCPUQuota+projectQuota.TestCPUQuota+projectQuota.DevCPUQuota, 3) + dto.MemQuota = calcu.ByteToGibibyte(projectQuota.ProdMemQuota+projectQuota.StagingMemQuota+projectQuota.TestMemQuota+projectQuota.DevMemQuota, 3) +} +func (p *Project) fetchPodInfo(dto *apistructs.ProjectDTO) { var podInfos []apistructs.PodInfo - if err := p.db.Find(&podInfos, map[string]interface{}{"project_id": projectID}).Error; err != nil { - logrus.WithError(err).WithField("project_id", projectID). + if err := p.db.Find(&podInfos, map[string]interface{}{"project_id": dto.ID}).Error; err != nil { + logrus.WithError(err).WithField("project_id", dto.ID). Warnln("failed to Find the namespaces info in the project") - return &projectDTO, nil + return } - var ( - // {"cluster_name": {"namespace": "workspace"}} - namespaces = make(map[string]map[string]string) - addonNamespaces = make(map[string]bool) // key: k8s_namespace - serviceNamespaces = make(map[string]bool) // key: k8s_namespace - ) for _, podInfo := range podInfos { - if _, ok := namespaces[podInfo.Cluster]; ok { - namespaces[podInfo.Cluster][podInfo.K8sNamespace] = podInfo.Workspace - } else { - namespaces[podInfo.Cluster] = map[string]string{podInfo.K8sNamespace: podInfo.Workspace} - } - - switch podInfo.ServiceType { - case "addon": - addonNamespaces[podInfo.K8sNamespace] = true - case "stateless-service": - serviceNamespaces[podInfo.K8sNamespace] = true - } - } - var resourceRequest dashboardPb.GetNamespacesResourcesRequest - for clusterName, v := range namespaces { - if len(v) == 0 { - continue - } - for namespace := range v { - resourceRequest.Namespaces = append(resourceRequest.Namespaces, &dashboardPb.ClusterNamespacePair{ - ClusterName: clusterName, - Namespace: namespace, - }) - } - } - - resources, err := p.clusterResourceClient.GetNamespacesResources(ctx, &resourceRequest) - if err != nil { - logrus.WithError(err).Errorln("failed to GetNamespacesResources from CMP") - return nil, errors.Wrap(err, "failed to GetNamespacesResources from CMP") - } - - for _, clusterItem := range resources.List { - if !clusterItem.GetSuccess() { - logrus.WithField("cluster_name", clusterItem.GetClusterName()).WithField("err", clusterItem.GetErr()). - Warnln("the cluster is not valid now") - continue - } - - for workspace, source := range map[string]*apistructs.ResourceConfigInfo{ - "prod": projectDTO.ResourceConfig.PROD, - "staging": projectDTO.ResourceConfig.STAGING, - "test": projectDTO.ResourceConfig.TEST, - "dev": projectDTO.ResourceConfig.DEV, + for workspace, resourceConfig := range map[string]*apistructs.ResourceConfigInfo{ + "prod": dto.ResourceConfig.PROD, + "staging": dto.ResourceConfig.STAGING, + "test": dto.ResourceConfig.TEST, + "dev": dto.ResourceConfig.DEV, } { - if clusterItem.GetClusterName() != source.ClusterName { + if !strings.EqualFold(podInfo.Workspace, workspace) { continue } - if _, ok := namespaces[clusterItem.GetClusterName()]; !ok { + if podInfo.Cluster != resourceConfig.ClusterName { continue } - for _, namespaceItem := range clusterItem.List { - // 如果 namespace 不是要求的 workspace 下的,忽略 - if w := namespaces[clusterItem.GetClusterName()][namespaceItem.GetNamespace()]; w != workspace { - continue - } - - cpuRequest := calcu.MillcoreToCore(namespaceItem.GetCpuRequest(), 3) - memRequest := calcu.ByteToGibibyte(namespaceItem.GetMemRequest(), 3) - source.CPURequest += cpuRequest - source.MemRequest += memRequest - if _, ok := addonNamespaces[namespaceItem.GetNamespace()]; ok { - source.CPURequestByAddon += cpuRequest - source.MemRequestByAddon += memRequest - } - if _, ok := serviceNamespaces[namespaceItem.GetNamespace()]; ok { - source.CPURequestByService += cpuRequest - source.MemRequestByService += memRequest - } + resourceConfig.CPURequest += podInfo.CPURequest + resourceConfig.MemRequest += podInfo.MemRequest / 1024 + switch podInfo.ServiceType { + case "addon": + resourceConfig.CPURequestByAddon += podInfo.CPURequest + resourceConfig.MemRequestByAddon += podInfo.MemRequest / 1024 + case "stateless-service": + resourceConfig.CPURequestByService += podInfo.CPURequest + resourceConfig.MemRequestByService += podInfo.MemRequest / 1024 } } } - - p.fetchAvailable(ctx, &projectDTO, projectQuota.ClustersNames()) - - // 根据已有统计值计算其他统计值 - p.patchProjectDto(&projectDTO, langCodes) - - return &projectDTO, nil } // 查出各环境的实际可用资源 // 各环境的实际可用资源 = 有该环境标签的所有集群的可用资源之和 // 每台机器的可用资源 = 该机器的 allocatable - 该机器的 request -func (p *Project) fetchAvailable(ctx context.Context, dto *apistructs.ProjectDTO, clusterNames []string) { - clustersResources, err := p.clusterResourceClient.GetClustersResources(ctx, - &dashboardPb.GetClustersResourcesRequest{ClusterNames: strutil.DedupSlice(clusterNames)}) +func (p *Project) fetchAvailable(ctx context.Context, dto *apistructs.ProjectDTO) { + if dto.ResourceConfig == nil { + return + } + clusterNames := strutil.DedupSlice([]string{ + dto.ResourceConfig.PROD.ClusterName, + dto.ResourceConfig.STAGING.ClusterName, + dto.ResourceConfig.TEST.ClusterName, + dto.ResourceConfig.DEV.ClusterName, + }) + req := &dashboardPb.GetClustersResourcesRequest{ClusterNames: clusterNames} + ctx, cancel := context.WithTimeout(ctx, time.Second*15) + defer cancel() + clustersResources, err := p.clusterResourceClient.GetClustersResources(ctx, req) if err != nil { logrus.WithError(err).WithField("func", "fetchAvailable"). Errorf("failed to GetClustersResources, clusterNames: %v", clusterNames) @@ -775,8 +750,8 @@ func (p *Project) fetchAvailable(ctx context.Context, dto *apistructs.ProjectDTO } } -// 根据已有统计值计算其他统计值 -func (p *Project) patchProjectDto(dto *apistructs.ProjectDTO, langCodes i18n.LanguageCodes) { +// 根据已有统计值计算比率 +func (p *Project) calcuRequestRate(dto *apistructs.ProjectDTO) { for _, source := range []*apistructs.ResourceConfigInfo{ dto.ResourceConfig.PROD, dto.ResourceConfig.STAGING, @@ -793,6 +768,16 @@ func (p *Project) patchProjectDto(dto *apistructs.ProjectDTO, langCodes i18n.Lan source.MemRequestByAddonRate = source.MemRequestByAddon / source.MemQuota source.MemRequestByServiceRate = source.MemRequestByService / source.MemQuota } + } +} + +func (p *Project) makeProjectDtoTips(dto *apistructs.ProjectDTO, langCodes i18n.LanguageCodes) { + for _, source := range []*apistructs.ResourceConfigInfo{ + dto.ResourceConfig.PROD, + dto.ResourceConfig.STAGING, + dto.ResourceConfig.TEST, + dto.ResourceConfig.DEV, + } { if source.CPUAvailable < source.CPUQuota || source.MemAvailable < source.MemQuota { source.Tips = p.trans.Text(langCodes, "AvailableIsLessThanQuota") } diff --git a/modules/core-services/services/project/project_test.go b/modules/core-services/services/project/project_test.go index 4d6d7601d4f..aeb0037f272 100644 --- a/modules/core-services/services/project/project_test.go +++ b/modules/core-services/services/project/project_test.go @@ -196,6 +196,34 @@ func Test_getMemberFromMembers(t *testing.T) { } } +func Test_calcuRequestRate(t *testing.T) { + var ( + prod = apistructs.ResourceConfigInfo{ + CPUQuota: 100, + CPURequest: 50, + CPURequestByAddon: 30, + CPURequestByService: 10, + MemQuota: 100, + MemRequest: 500, + MemRequestByAddon: 30, + MemRequestByService: 10, + } + staging = prod + test = prod + dev = prod + ) + var dto = &apistructs.ProjectDTO{ + ResourceConfig: &apistructs.ResourceConfigsInfo{ + PROD: &prod, + STAGING: &staging, + TEST: &test, + DEV: &dev, + }, + } + p := new(Project) + p.calcuRequestRate(dto) +} + // TODO We need to turn this ut on after adding the delete portal to the UI // func TestDeleteProjectWhenAddonExists(t *testing.T) { // db := &dao.DBClient{} diff --git a/pkg/resourcecalculator/calculator.go b/pkg/resourcecalculator/calculator.go index f018cf36951..3ec70048586 100644 --- a/pkg/resourcecalculator/calculator.go +++ b/pkg/resourcecalculator/calculator.go @@ -36,69 +36,69 @@ var Workspaces = []Workspace{Prod, Staging, Test, Dev} type Workspace int type Calculator struct { - ClusterName string - allocatableCpu *ResourceCalculator - allocatableMemory *ResourceCalculator - availableCpu *ResourceCalculator - availableMemory *ResourceCalculator + ClusterName string + allocatableCPU *ResourceCalculator + allocatableMem *ResourceCalculator + availableCPU *ResourceCalculator + availableMem *ResourceCalculator } func New(clusterName string) *Calculator { return &Calculator{ ClusterName: clusterName, - allocatableCpu: &ResourceCalculator{ - Type: "CPU", - Container: make(map[string]uint64), - tackUpMmap: make(map[Workspace]uint64), + allocatableCPU: &ResourceCalculator{ + Type: "CPU", + WorkspacesValues: make(map[string]uint64), + tackUpM: make(map[Workspace]uint64), }, - availableCpu: &ResourceCalculator{ - Type: "CPU", - Container: make(map[string]uint64), - tackUpMmap: make(map[Workspace]uint64), + availableCPU: &ResourceCalculator{ + Type: "CPU", + WorkspacesValues: make(map[string]uint64), + tackUpM: make(map[Workspace]uint64), }, - allocatableMemory: &ResourceCalculator{ - Type: "Memory", - Container: make(map[string]uint64), - tackUpMmap: make(map[Workspace]uint64), + allocatableMem: &ResourceCalculator{ + Type: "Memory", + WorkspacesValues: make(map[string]uint64), + tackUpM: make(map[Workspace]uint64), }, - availableMemory: &ResourceCalculator{ - Type: "Memory", - Container: make(map[string]uint64), - tackUpMmap: make(map[Workspace]uint64), + availableMem: &ResourceCalculator{ + Type: "Memory", + WorkspacesValues: make(map[string]uint64), + tackUpM: make(map[Workspace]uint64), }, } } func (c *Calculator) AddValue(cpu, mem uint64, workspace ...Workspace) { - c.allocatableCpu.addValue(cpu, workspace...) - c.availableCpu.addValue(cpu, workspace...) - c.allocatableMemory.addValue(mem, workspace...) - c.availableMemory.addValue(mem, workspace...) + c.allocatableCPU.addValue(cpu, workspace...) + c.availableCPU.addValue(cpu, workspace...) + c.allocatableMem.addValue(mem, workspace...) + c.availableMem.addValue(mem, workspace...) } func (c *Calculator) DeductionQuota(workspace Workspace, cpu, mem uint64) { - c.availableCpu.deductionQuota(workspace, cpu) - c.availableMemory.deductionQuota(workspace, mem) + c.availableCPU.deductionQuota(workspace, cpu) + c.availableMem.deductionQuota(workspace, mem) } func (c *Calculator) AllocatableCPU(workspace Workspace) uint64 { - return c.allocatableCpu.totalForWorkspace(workspace) + return c.allocatableCPU.totalForWorkspace(workspace) } func (c *Calculator) AllocatableMem(workspace Workspace) uint64 { - return c.allocatableMemory.totalForWorkspace(workspace) + return c.allocatableMem.totalForWorkspace(workspace) } func (c *Calculator) AlreadyTookUpCPU(workspace Workspace) uint64 { - return c.availableCpu.alreadyTookUp(workspace) + return c.availableCPU.alreadyTookUp(workspace) } func (c *Calculator) AlreadyTookUpMem(workspace Workspace) uint64 { - return c.availableMemory.alreadyTookUp(workspace) + return c.availableMem.alreadyTookUp(workspace) } func (c *Calculator) TotalQuotableCPU() uint64 { - quotable := int(c.allocatableCpu.total) - int(c.availableCpu.deduction) + quotable := int(c.allocatableCPU.total) - int(c.availableCPU.deduction) if quotable < 0 { quotable = 0 } @@ -106,7 +106,7 @@ func (c *Calculator) TotalQuotableCPU() uint64 { } func (c *Calculator) TotalQuotableMem() uint64 { - quotable := int(c.allocatableMemory.total) - int(c.availableMemory.deduction) + quotable := int(c.allocatableMem.total) - int(c.availableMem.deduction) if quotable < 0 { quotable = 0 } @@ -114,19 +114,19 @@ func (c *Calculator) TotalQuotableMem() uint64 { } func (c *Calculator) QuotableCPUForWorkspace(workspace Workspace) uint64 { - return c.availableCpu.totalForWorkspace(workspace) + return c.availableCPU.totalForWorkspace(workspace) } func (c *Calculator) QuotableMemForWorkspace(workspace Workspace) uint64 { - return c.availableMemory.totalForWorkspace(workspace) + return c.availableMem.totalForWorkspace(workspace) } type ResourceCalculator struct { - Type string - Container map[string]uint64 - tackUpMmap map[Workspace]uint64 - deduction uint64 - total uint64 + Type string + WorkspacesValues map[string]uint64 + tackUpM map[Workspace]uint64 + deduction uint64 + total uint64 } func (q *ResourceCalculator) addValue(value uint64, workspace ...Workspace) { @@ -136,7 +136,7 @@ func (q *ResourceCalculator) addValue(value uint64, workspace ...Workspace) { return } w := strings.Join(workspaces, ":") - q.Container[w] += value + q.WorkspacesValues[w] += value } func (q *ResourceCalculator) totalForWorkspace(workspace Workspace) uint64 { @@ -147,7 +147,7 @@ func (q *ResourceCalculator) totalForWorkspace(workspace Workspace) uint64 { if w == "" { return 0 } - for k, v := range q.Container { + for k, v := range q.WorkspacesValues { if strings.Contains(k, w) { sum += v } @@ -160,14 +160,14 @@ func (q *ResourceCalculator) deductionQuota(workspace Workspace, quota uint64) { // 按优先级减扣 p := priority(workspace) for _, workspaces := range p { - if q.Container[workspaces] >= quota { - q.Container[workspaces] -= quota + if q.WorkspacesValues[workspaces] >= quota { + q.WorkspacesValues[workspaces] -= quota q.takeUp(workspaces, quota) return } - quota -= q.Container[workspaces] - q.takeUp(workspaces, q.Container[workspaces]) - q.Container[workspaces] = 0 + quota -= q.WorkspacesValues[workspaces] + q.takeUp(workspaces, q.WorkspacesValues[workspaces]) + q.WorkspacesValues[workspaces] = 0 } q.takeUp(WorkspaceString(workspace), quota) @@ -175,21 +175,21 @@ func (q *ResourceCalculator) deductionQuota(workspace Workspace, quota uint64) { func (q *ResourceCalculator) takeUp(workspaces string, value uint64) { if strings.Contains(workspaces, "prod") { - q.tackUpMmap[Prod] += value + q.tackUpM[Prod] += value } if strings.Contains(workspaces, "staging") { - q.tackUpMmap[Staging] += value + q.tackUpM[Staging] += value } if strings.Contains(workspaces, "test") { - q.tackUpMmap[Test] += value + q.tackUpM[Test] += value } if strings.Contains(workspaces, "dev") { - q.tackUpMmap[Dev] += value + q.tackUpM[Dev] += value } } func (q *ResourceCalculator) alreadyTookUp(workspace Workspace) uint64 { - return q.tackUpMmap[workspace] + return q.tackUpM[workspace] } func WorkspaceString(workspace Workspace) string { @@ -292,9 +292,9 @@ func ResourceToString(res float64, typ string) string { } } -func Accuracy(v float64, accuracy int32) float64 { - v, _ = decimal.NewFromFloat(v).Round(accuracy).Float64() - return v +func Accuracy(value float64, accuracy int32) float64 { + value, _ = decimal.NewFromFloat(value).Round(accuracy).Float64() + return value } func setPrec(f float64, prec int) float64 { From e1ee23b23acc0db3419473542f7a370675c3c9eb Mon Sep 17 00:00:00 2001 From: CMC <49681321+CraigMChen@users.noreply.github.com> Date: Fri, 29 Oct 2021 09:52:49 +0800 Subject: [PATCH 6/8] fix: delta memory calculated wrongly (#2669) --- .../executor/plugins/k8s/check_quota.go | 80 +++++++++---------- .../executor/plugins/k8s/daemonset.go | 8 +- .../executor/plugins/k8s/deployment.go | 35 ++++---- .../executor/plugins/k8s/statefulset.go | 4 +- 4 files changed, 66 insertions(+), 61 deletions(-) diff --git a/modules/scheduler/executor/plugins/k8s/check_quota.go b/modules/scheduler/executor/plugins/k8s/check_quota.go index 18093d4aef9..12ee8c756de 100644 --- a/modules/scheduler/executor/plugins/k8s/check_quota.go +++ b/modules/scheduler/executor/plugins/k8s/check_quota.go @@ -36,7 +36,7 @@ func (k *Kubernetes) GetWorkspaceLeftQuota(ctx context.Context, projectID, works Workspace: workspace, }) if err != nil { - return 0, 0, err + return } logrus.Infof("get workspace %s of project %s quota: cpu: %d. mem: %d", workspace, projectID, cpuQuota, memQuota) @@ -83,21 +83,21 @@ func max(a, b int64) int64 { return b } -func (k *Kubernetes) CheckQuota(ctx context.Context, projectID, workspace, runtimeID string, requestsCPU, requestsMem int64, kind, serviceName string) (bool, error) { +func (k *Kubernetes) CheckQuota(ctx context.Context, projectID, workspace, runtimeID string, requestsCPU, requestsMem int64, kind, serviceName string) (bool, string, error) { if projectID == "" || workspace == "" { - return true, nil + return true, "", nil } if requestsCPU <= 0 && requestsMem <= 0 { - return true, nil + return true, "", nil } leftCPU, leftMem, err := k.GetWorkspaceLeftQuota(ctx, projectID, workspace) if err != nil { - return false, err + return false, "", err } if requestsCPU > leftCPU || requestsMem > leftMem { + humanLog, primevalLog := getLogContent(requestsCPU, requestsMem, leftCPU, leftMem, kind, serviceName) if runtimeID != "" { - humanLog, primevalLog := getLogContent(requestsCPU, requestsMem, leftCPU, leftMem, kind, serviceName) if err = k.bdl.CreateErrorLog(&apistructs.ErrorLogCreateRequest{ ErrorLog: apistructs.ErrorLog{ ResourceType: apistructs.RuntimeError, @@ -113,48 +113,48 @@ func (k *Kubernetes) CheckQuota(ctx context.Context, projectID, workspace, runti logrus.Infof("Create/Update quota error log for runtime %s succeeded", runtimeID) } } - return false, nil + return false, primevalLog, nil } - return true, nil + return true, "", nil } -func getLogContent(requestsCPU, requestsMem, leftCPU, leftMem int64, kind, serviceName string) (string, string) { +func getLogContent(deltaCPU, deltaMem, leftCPU, leftMem int64, kind, serviceName string) (string, string) { leftCPU = max(leftCPU, 0) leftMem = max(leftMem, 0) - reqCPUStr := resourceToString(float64(requestsCPU), "cpu") + reqCPUStr := resourceToString(float64(deltaCPU), "cpu") leftCPUStr := resourceToString(float64(leftCPU), "cpu") - reqMemStr := resourceToString(float64(requestsMem), "memory") + reqMemStr := resourceToString(float64(deltaMem), "memory") leftMemStr := resourceToString(float64(leftMem), "memory") logrus.Infof("Checking workspace quota, requests cpu:%s cores, left %s cores; requests memory: %s, left %s", reqCPUStr, leftCPUStr, reqMemStr, leftMemStr) - humanLog := []string{"当前环境资源配额不足"} - primevalLog := []string{"Resource quota is not enough in current workspace"} + humanLogs := []string{"当前环境资源配额不足"} + primevalLogs := []string{"Resource quota is not enough in current workspace"} switch kind { case "stateless": - humanLog = append(humanLog, fmt.Sprintf("服务 %s 部署失败", serviceName)) - primevalLog = append(primevalLog, fmt.Sprintf("failed to deploy service %s", serviceName)) + humanLogs = append(humanLogs, fmt.Sprintf("服务 %s 部署失败", serviceName)) + primevalLogs = append(primevalLogs, fmt.Sprintf("failed to deploy service %s", serviceName)) case "stateful": - humanLog = append(humanLog, fmt.Sprintf("addon %s 部署失败", serviceName)) - primevalLog = append(primevalLog, fmt.Sprintf("failed to deploy addon %s.", serviceName)) + humanLogs = append(humanLogs, fmt.Sprintf("addon %s 部署失败", serviceName)) + primevalLogs = append(primevalLogs, fmt.Sprintf("failed to deploy addon %s.", serviceName)) case "update": - humanLog = append(humanLog, fmt.Sprintf("服务 %s 更新失败", serviceName)) - primevalLog = append(primevalLog, fmt.Sprintf("failed to update service %s", serviceName)) + humanLogs = append(humanLogs, fmt.Sprintf("服务 %s 更新失败", serviceName)) + primevalLogs = append(primevalLogs, fmt.Sprintf("failed to update service %s", serviceName)) case "scale": - humanLog = append(humanLog, fmt.Sprintf("服务 %s 扩容失败", serviceName)) - primevalLog = append(primevalLog, fmt.Sprintf("failed to scale service %s", serviceName)) + humanLogs = append(humanLogs, fmt.Sprintf("服务 %s 扩容失败", serviceName)) + primevalLogs = append(primevalLogs, fmt.Sprintf("failed to scale service %s", serviceName)) } - if requestsCPU > leftCPU { - humanLog = append(humanLog, fmt.Sprintf("请求 CPU 新增 %s 核,大于当前剩余 CPU %s 核", reqCPUStr, leftCPUStr)) - primevalLog = append(primevalLog, fmt.Sprintf("Requests CPU added %s core(s), which is greater than the current remaining CPU %s core(s)", reqCPUStr, leftCPUStr)) + if deltaCPU > leftCPU { + humanLogs = append(humanLogs, fmt.Sprintf("请求 CPU 新增 %s 核,大于当前剩余 CPU %s 核", reqCPUStr, leftCPUStr)) + primevalLogs = append(primevalLogs, fmt.Sprintf("Requests CPU added %s core(s), which is greater than the current remaining CPU %s core(s)", reqCPUStr, leftCPUStr)) } - if requestsMem > leftMem { - humanLog = append(humanLog, fmt.Sprintf("请求内存新增 %s,大于当前环境剩余内存 %s", reqMemStr, leftMemStr)) - primevalLog = append(primevalLog, fmt.Sprintf("Requests memory added %s, which is greater than the current remaining %s", reqMemStr, leftMemStr)) + if deltaMem > leftMem { + humanLogs = append(humanLogs, fmt.Sprintf("请求内存新增 %s,大于当前环境剩余内存 %s", reqMemStr, leftMemStr)) + primevalLogs = append(primevalLogs, fmt.Sprintf("Requests memory added %s, which is greater than the current remaining %s", reqMemStr, leftMemStr)) } - return strings.Join(humanLog, ","), strings.Join(primevalLog, ". ") + return strings.Join(humanLogs, ","), strings.Join(primevalLogs, ". ") } func getRequestsResources(containers []corev1.Container) (cpu, mem int64) { @@ -170,30 +170,30 @@ func getRequestsResources(containers []corev1.Container) (cpu, mem int64) { return cpuQuantity.MilliValue(), memQuantity.Value() } -func resourceToString(resource float64, tp string) string { +func resourceToString(res float64, tp string) string { switch tp { case "cpu": - return strconv.FormatFloat(setPrec(resource/1000, 3), 'f', -1, 64) + return strconv.FormatFloat(setPrec(res/1000, 3), 'f', -1, 64) case "memory": isNegative := 1.0 - if resource < 0 { - resource = -resource + if res < 0 { + res = -res isNegative = -1 } units := []string{"B", "K", "M", "G", "T"} i := 0 - for resource >= 1<<10 && i < len(units)-1 { - resource /= 1 << 10 + for res >= 1<<10 && i < len(units)-1 { + res /= 1 << 10 i++ } - return fmt.Sprintf("%s%s", strconv.FormatFloat(setPrec(resource*isNegative, 3), 'f', -1, 64), units[i]) + return fmt.Sprintf("%s%s", strconv.FormatFloat(setPrec(res*isNegative, 3), 'f', -1, 64), units[i]) default: - return fmt.Sprintf("%.f", resource) + return fmt.Sprintf("%.f", res) } } -func setPrec(f float64, prec int) float64 { - pow := math.Pow10(prec) - f = float64(int64(f*pow)) / pow - return f +func setPrec(v float64, p int) float64 { + pow := math.Pow10(p) + v = float64(int64(v*pow)) / pow + return v } diff --git a/modules/scheduler/executor/plugins/k8s/daemonset.go b/modules/scheduler/executor/plugins/k8s/daemonset.go index e1dfe01e6ce..87b4f33c373 100644 --- a/modules/scheduler/executor/plugins/k8s/daemonset.go +++ b/modules/scheduler/executor/plugins/k8s/daemonset.go @@ -40,12 +40,12 @@ func (k *Kubernetes) createDaemonSet(ctx context.Context, service *apistructs.Se _, projectID, workspace, runtimeID := extractContainerEnvs(daemonset.Spec.Template.Spec.Containers) cpu, mem := getRequestsResources(daemonset.Spec.Template.Spec.Containers) - ok, err := k.CheckQuota(ctx, projectID, workspace, runtimeID, cpu, mem, "stateless", service.Name) + ok, reason, err := k.CheckQuota(ctx, projectID, workspace, runtimeID, cpu, mem, "stateless", service.Name) if err != nil { return err } if !ok { - return errors.New("workspace quota is not enough") + return errors.New(reason) } return k.ds.Create(daemonset) @@ -83,12 +83,12 @@ func (k *Kubernetes) updateDaemonSet(ctx context.Context, ds *appsv1.DaemonSet, if err != nil { logrus.Errorf("faield to get delta resource for daemonSet %s, %v", ds.Name, err) } else { - ok, err := k.CheckQuota(ctx, projectID, workspace, runtimeID, deltaCPU, deltaMem, "update", service.Name) + ok, reason, err := k.CheckQuota(ctx, projectID, workspace, runtimeID, deltaCPU, deltaMem, "update", service.Name) if err != nil { return err } if !ok { - return errors.New("workspace quota is not enough") + return errors.New(reason) } } return k.ds.Update(ds) diff --git a/modules/scheduler/executor/plugins/k8s/deployment.go b/modules/scheduler/executor/plugins/k8s/deployment.go index afe5f63ebcd..e458995039d 100644 --- a/modules/scheduler/executor/plugins/k8s/deployment.go +++ b/modules/scheduler/executor/plugins/k8s/deployment.go @@ -59,12 +59,12 @@ func (k *Kubernetes) createDeployment(ctx context.Context, service *apistructs.S cpu *= int64(*deployment.Spec.Replicas) mem *= int64(*deployment.Spec.Replicas) } - ok, err := k.CheckQuota(ctx, projectID, workspace, runtimeID, cpu, mem, "stateless", service.Name) + ok, reason, err := k.CheckQuota(ctx, projectID, workspace, runtimeID, cpu, mem, "stateless", service.Name) if err != nil { return err } if !ok { - return errors.New("workspace quota is not enough") + return errors.New(reason) } err = k.deploy.Create(deployment) @@ -140,12 +140,12 @@ func (k *Kubernetes) putDeployment(ctx context.Context, deployment *appsv1.Deplo if err != nil { logrus.Errorf("faield to get delta resource for deployment %s, %v", deployment.Name, err) } else { - ok, err := k.CheckQuota(ctx, projectID, workspace, runtimeID, deltaCPU, deltaMem, "update", service.Name) + ok, reason, err := k.CheckQuota(ctx, projectID, workspace, runtimeID, deltaCPU, deltaMem, "update", service.Name) if err != nil { return err } if !ok { - return errors.New("workspace quota is not enough") + return errors.New(reason) } } @@ -176,7 +176,7 @@ func (k *Kubernetes) getDeploymentDeltaResource(ctx context.Context, deploy *app newCPU, newMem := getRequestsResources(deploy.Spec.Template.Spec.Containers) if deploy.Spec.Replicas != nil { newCPU *= int64(*deploy.Spec.Replicas) - oldMem *= int64(*deploy.Spec.Replicas) + newMem *= int64(*deploy.Spec.Replicas) } deltaCPU = newCPU - oldCPU @@ -888,6 +888,12 @@ func (k *Kubernetes) scaleDeployment(ctx context.Context, sg *apistructs.Service return getErr } + oldCPU, oldMem := getRequestsResources(deploy.Spec.Template.Spec.Containers) + if deploy.Spec.Replicas != nil { + oldCPU *= int64(*deploy.Spec.Replicas) + oldMem *= int64(*deploy.Spec.Replicas) + } + deploy.Spec.Replicas = func(i int32) *int32 { return &i }(int32(scalingService.Scale)) // only support one container on Erda currently @@ -901,18 +907,17 @@ func (k *Kubernetes) scaleDeployment(ctx context.Context, sg *apistructs.Service deploy.Spec.Template.Spec.Containers[0] = container + newCPU, newMem := getRequestsResources(deploy.Spec.Template.Spec.Containers) + newCPU *= int64(*deploy.Spec.Replicas) + newMem *= int64(*deploy.Spec.Replicas) + _, projectID, workspace, runtimeID := extractContainerEnvs(deploy.Spec.Template.Spec.Containers) - deltaCPU, deltaMem, err := k.getDeploymentDeltaResource(ctx, deploy) + ok, reason, err := k.CheckQuota(ctx, projectID, workspace, runtimeID, newCPU-oldCPU, newMem-oldMem, "scale", scalingService.Name) if err != nil { - logrus.Errorf("failed to get delta resource for deployment %s, %v", deploy.Name, err) - } else { - ok, err := k.CheckQuota(ctx, projectID, workspace, runtimeID, deltaCPU, deltaMem, "scale", scalingService.Name) - if err != nil { - return err - } - if !ok { - return errors.New("workspace quota is not enough") - } + return err + } + if !ok { + return errors.New(reason) } err = k.deploy.Put(deploy) diff --git a/modules/scheduler/executor/plugins/k8s/statefulset.go b/modules/scheduler/executor/plugins/k8s/statefulset.go index 55117f115c4..5fe5ae32d10 100644 --- a/modules/scheduler/executor/plugins/k8s/statefulset.go +++ b/modules/scheduler/executor/plugins/k8s/statefulset.go @@ -171,12 +171,12 @@ func (k *Kubernetes) createStatefulSet(ctx context.Context, info StatefulsetInfo reqCPU *= int64(*set.Spec.Replicas) reqMem *= int64(*set.Spec.Replicas) } - ok, err := k.CheckQuota(ctx, projectID, workspace, runtimeID, reqCPU, reqMem, "stateful", service.Name) + ok, reason, err := k.CheckQuota(ctx, projectID, workspace, runtimeID, reqCPU, reqMem, "stateful", service.Name) if err != nil { return err } if !ok { - return errors.New("workspace quota is not enough") + return errors.New(reason) } return k.sts.Create(set) } From 1f524ab8102bebf00becb1a091c74ba204948bca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=83=AD=E5=88=9A=E5=B9=B3?= <512979011@qq.com> Date: Fri, 29 Oct 2021 10:09:17 +0800 Subject: [PATCH 7/8] feature: add msp event query grpc api (#2526) * feature: add msp event query grpc api * optimize: add copyright * optimize: goimports * bugfix: relations.res_id * optimize: change proto file name. * optimize: rename unit test func * optimize: make es reader dependency optional --- api/proto-go/all.go | 1 + .../core/monitor/event/client/client.go | 42 ++ .../core/monitor/event/client/provider.go | 93 +++++ .../monitor/event/pb/event_query.form.pb.go | 86 ++++ .../monitor/event/pb/event_query.http.pb.go | 85 ++++ .../monitor/event/pb/event_query.json.pb.go | 75 ++++ .../core/monitor/event/pb/event_query.pb.go | 373 ++++++++++++++++++ .../event/pb/event_query.validator.pb.go | 41 ++ .../monitor/event/pb/event_query_grpc.pb.go | 113 ++++++ .../monitor/event/pb/register.services.pb.go | 53 +++ .../core/monitor/event/event_query.proto | 33 ++ cmd/monitor/monitor/main.go | 1 + conf/monitor/monitor/monitor.yaml | 1 + .../event/query/event.query.service.go | 88 +++++ .../event/query/event.query.service_test.go | 131 ++++++ .../core/monitor/event/query/mock_storage.go | 69 ++++ modules/core/monitor/event/query/provider.go | 69 ++++ 17 files changed, 1354 insertions(+) create mode 100644 api/proto-go/core/monitor/event/client/client.go create mode 100644 api/proto-go/core/monitor/event/client/provider.go create mode 100644 api/proto-go/core/monitor/event/pb/event_query.form.pb.go create mode 100644 api/proto-go/core/monitor/event/pb/event_query.http.pb.go create mode 100644 api/proto-go/core/monitor/event/pb/event_query.json.pb.go create mode 100644 api/proto-go/core/monitor/event/pb/event_query.pb.go create mode 100644 api/proto-go/core/monitor/event/pb/event_query.validator.pb.go create mode 100644 api/proto-go/core/monitor/event/pb/event_query_grpc.pb.go create mode 100644 api/proto-go/core/monitor/event/pb/register.services.pb.go create mode 100644 api/proto/core/monitor/event/event_query.proto create mode 100644 modules/core/monitor/event/query/event.query.service.go create mode 100644 modules/core/monitor/event/query/event.query.service_test.go create mode 100644 modules/core/monitor/event/query/mock_storage.go create mode 100644 modules/core/monitor/event/query/provider.go diff --git a/api/proto-go/all.go b/api/proto-go/all.go index 522ca84c655..e2bc52f98e5 100644 --- a/api/proto-go/all.go +++ b/api/proto-go/all.go @@ -28,6 +28,7 @@ import ( _ "github.com/erda-project/erda-proto-go/core/monitor/alertdetail/pb" _ "github.com/erda-project/erda-proto-go/core/monitor/collector/pb" _ "github.com/erda-project/erda-proto-go/core/monitor/dataview/pb" + _ "github.com/erda-project/erda-proto-go/core/monitor/event/pb" _ "github.com/erda-project/erda-proto-go/core/monitor/log/query/pb" _ "github.com/erda-project/erda-proto-go/core/monitor/metric/pb" _ "github.com/erda-project/erda-proto-go/core/monitor/settings/pb" diff --git a/api/proto-go/core/monitor/event/client/client.go b/api/proto-go/core/monitor/event/client/client.go new file mode 100644 index 00000000000..a7f2e5d539d --- /dev/null +++ b/api/proto-go/core/monitor/event/client/client.go @@ -0,0 +1,42 @@ +// Code generated by protoc-gen-go-client. DO NOT EDIT. +// Sources: event_query.proto + +package client + +import ( + context "context" + + grpc "github.com/erda-project/erda-infra/pkg/transport/grpc" + pb "github.com/erda-project/erda-proto-go/core/monitor/event/pb" + grpc1 "google.golang.org/grpc" +) + +// Client provide all service clients. +type Client interface { + // EventQueryService event_query.proto + EventQueryService() pb.EventQueryServiceClient +} + +// New create client +func New(cc grpc.ClientConnInterface) Client { + return &serviceClients{ + eventQueryService: pb.NewEventQueryServiceClient(cc), + } +} + +type serviceClients struct { + eventQueryService pb.EventQueryServiceClient +} + +func (c *serviceClients) EventQueryService() pb.EventQueryServiceClient { + return c.eventQueryService +} + +type eventQueryServiceWrapper struct { + client pb.EventQueryServiceClient + opts []grpc1.CallOption +} + +func (s *eventQueryServiceWrapper) GetEvents(ctx context.Context, req *pb.GetEventsRequest) (*pb.GetEventsResponse, error) { + return s.client.GetEvents(ctx, req, append(grpc.CallOptionFromContext(ctx), s.opts...)...) +} diff --git a/api/proto-go/core/monitor/event/client/provider.go b/api/proto-go/core/monitor/event/client/provider.go new file mode 100644 index 00000000000..ea3ea1b9a89 --- /dev/null +++ b/api/proto-go/core/monitor/event/client/provider.go @@ -0,0 +1,93 @@ +// Code generated by protoc-gen-go-client. DO NOT EDIT. +// Sources: event_query.proto + +package client + +import ( + fmt "fmt" + reflect "reflect" + strings "strings" + + servicehub "github.com/erda-project/erda-infra/base/servicehub" + grpc "github.com/erda-project/erda-infra/pkg/transport/grpc" + pb "github.com/erda-project/erda-proto-go/core/monitor/event/pb" + grpc1 "google.golang.org/grpc" +) + +var dependencies = []string{ + "grpc-client@erda.core.monitor.event", + "grpc-client", +} + +// +provider +type provider struct { + client Client +} + +func (p *provider) Init(ctx servicehub.Context) error { + var conn grpc.ClientConnInterface + for _, dep := range dependencies { + c, ok := ctx.Service(dep).(grpc.ClientConnInterface) + if ok { + conn = c + break + } + } + if conn == nil { + return fmt.Errorf("not found connector in (%s)", strings.Join(dependencies, ", ")) + } + p.client = New(conn) + return nil +} + +var ( + clientsType = reflect.TypeOf((*Client)(nil)).Elem() + eventQueryServiceClientType = reflect.TypeOf((*pb.EventQueryServiceClient)(nil)).Elem() + eventQueryServiceServerType = reflect.TypeOf((*pb.EventQueryServiceServer)(nil)).Elem() +) + +func (p *provider) Provide(ctx servicehub.DependencyContext, args ...interface{}) interface{} { + var opts []grpc1.CallOption + for _, arg := range args { + if opt, ok := arg.(grpc1.CallOption); ok { + opts = append(opts, opt) + } + } + switch ctx.Service() { + case "erda.core.monitor.event-client": + return p.client + case "erda.core.monitor.event.EventQueryService": + return &eventQueryServiceWrapper{client: p.client.EventQueryService(), opts: opts} + case "erda.core.monitor.event.EventQueryService.client": + return p.client.EventQueryService() + } + switch ctx.Type() { + case clientsType: + return p.client + case eventQueryServiceClientType: + return p.client.EventQueryService() + case eventQueryServiceServerType: + return &eventQueryServiceWrapper{client: p.client.EventQueryService(), opts: opts} + } + return p +} + +func init() { + servicehub.Register("erda.core.monitor.event-client", &servicehub.Spec{ + Services: []string{ + "erda.core.monitor.event.EventQueryService", + "erda.core.monitor.event-client", + }, + Types: []reflect.Type{ + clientsType, + // client types + eventQueryServiceClientType, + // server types + eventQueryServiceServerType, + }, + OptionalDependencies: dependencies, + Creator: func() servicehub.Provider { + return &provider{} + }, + }) +} diff --git a/api/proto-go/core/monitor/event/pb/event_query.form.pb.go b/api/proto-go/core/monitor/event/pb/event_query.form.pb.go new file mode 100644 index 00000000000..b20490b8dd7 --- /dev/null +++ b/api/proto-go/core/monitor/event/pb/event_query.form.pb.go @@ -0,0 +1,86 @@ +// Code generated by protoc-gen-go-form. DO NOT EDIT. +// Source: event_query.proto + +package pb + +import ( + url "net/url" + strconv "strconv" + + urlenc "github.com/erda-project/erda-infra/pkg/urlenc" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the "github.com/erda-project/erda-infra/pkg/urlenc" package it is being compiled against. +var _ urlenc.URLValuesUnmarshaler = (*GetEventsRequest)(nil) +var _ urlenc.URLValuesUnmarshaler = (*GetEventsResponse)(nil) +var _ urlenc.URLValuesUnmarshaler = (*GetEventsResult)(nil) + +// GetEventsRequest implement urlenc.URLValuesUnmarshaler. +func (m *GetEventsRequest) UnmarshalURLValues(prefix string, values url.Values) error { + for key, vals := range values { + if len(vals) > 0 { + switch prefix + key { + case "eventId": + m.EventId = vals[0] + case "traceId": + m.TraceId = vals[0] + case "relationId": + m.RelationId = vals[0] + case "relationType": + m.RelationType = vals[0] + case "start": + val, err := strconv.ParseInt(vals[0], 10, 64) + if err != nil { + return err + } + m.Start = val + case "end": + val, err := strconv.ParseInt(vals[0], 10, 64) + if err != nil { + return err + } + m.End = val + case "pageNo": + val, err := strconv.ParseInt(vals[0], 10, 64) + if err != nil { + return err + } + m.PageNo = val + case "pageSize": + val, err := strconv.ParseInt(vals[0], 10, 64) + if err != nil { + return err + } + m.PageSize = val + case "debug": + val, err := strconv.ParseBool(vals[0]) + if err != nil { + return err + } + m.Debug = val + } + } + } + return nil +} + +// GetEventsResponse implement urlenc.URLValuesUnmarshaler. +func (m *GetEventsResponse) UnmarshalURLValues(prefix string, values url.Values) error { + for key, vals := range values { + if len(vals) > 0 { + switch prefix + key { + case "data": + if m.Data == nil { + m.Data = &GetEventsResult{} + } + } + } + } + return nil +} + +// GetEventsResult implement urlenc.URLValuesUnmarshaler. +func (m *GetEventsResult) UnmarshalURLValues(prefix string, values url.Values) error { + return nil +} diff --git a/api/proto-go/core/monitor/event/pb/event_query.http.pb.go b/api/proto-go/core/monitor/event/pb/event_query.http.pb.go new file mode 100644 index 00000000000..7d3c01caaf0 --- /dev/null +++ b/api/proto-go/core/monitor/event/pb/event_query.http.pb.go @@ -0,0 +1,85 @@ +// Code generated by protoc-gen-go-http. DO NOT EDIT. +// Source: event_query.proto + +package pb + +import ( + context "context" + http1 "net/http" + + transport "github.com/erda-project/erda-infra/pkg/transport" + http "github.com/erda-project/erda-infra/pkg/transport/http" + urlenc "github.com/erda-project/erda-infra/pkg/urlenc" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the "github.com/erda-project/erda-infra/pkg/transport/http" package it is being compiled against. +const _ = http.SupportPackageIsVersion1 + +// EventQueryServiceHandler is the server API for EventQueryService service. +type EventQueryServiceHandler interface { + // GET /api/events + GetEvents(context.Context, *GetEventsRequest) (*GetEventsResponse, error) +} + +// RegisterEventQueryServiceHandler register EventQueryServiceHandler to http.Router. +func RegisterEventQueryServiceHandler(r http.Router, srv EventQueryServiceHandler, opts ...http.HandleOption) { + h := http.DefaultHandleOptions() + for _, op := range opts { + op(h) + } + encodeFunc := func(fn func(http1.ResponseWriter, *http1.Request) (interface{}, error)) http.HandlerFunc { + handler := func(w http1.ResponseWriter, r *http1.Request) { + out, err := fn(w, r) + if err != nil { + h.Error(w, r, err) + return + } + if err := h.Encode(w, r, out); err != nil { + h.Error(w, r, err) + } + } + if h.HTTPInterceptor != nil { + handler = h.HTTPInterceptor(handler) + } + return handler + } + + add_GetEvents := func(method, path string, fn func(context.Context, *GetEventsRequest) (*GetEventsResponse, error)) { + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return fn(ctx, req.(*GetEventsRequest)) + } + var GetEvents_info transport.ServiceInfo + if h.Interceptor != nil { + GetEvents_info = transport.NewServiceInfo("erda.core.monitor.event.EventQueryService", "GetEvents", srv) + handler = h.Interceptor(handler) + } + r.Add(method, path, encodeFunc( + func(w http1.ResponseWriter, r *http1.Request) (interface{}, error) { + ctx := http.WithRequest(r.Context(), r) + ctx = transport.WithHTTPHeaderForServer(ctx, r.Header) + if h.Interceptor != nil { + ctx = context.WithValue(ctx, transport.ServiceInfoContextKey, GetEvents_info) + } + r = r.WithContext(ctx) + var in GetEventsRequest + if err := h.Decode(r, &in); err != nil { + return nil, err + } + var input interface{} = &in + if u, ok := (input).(urlenc.URLValuesUnmarshaler); ok { + if err := u.UnmarshalURLValues("", r.URL.Query()); err != nil { + return nil, err + } + } + out, err := handler(ctx, &in) + if err != nil { + return out, err + } + return out, nil + }), + ) + } + + add_GetEvents("GET", "/api/events", srv.GetEvents) +} diff --git a/api/proto-go/core/monitor/event/pb/event_query.json.pb.go b/api/proto-go/core/monitor/event/pb/event_query.json.pb.go new file mode 100644 index 00000000000..3652daa93ca --- /dev/null +++ b/api/proto-go/core/monitor/event/pb/event_query.json.pb.go @@ -0,0 +1,75 @@ +// Code generated by protoc-gen-go-json. DO NOT EDIT. +// Source: event_query.proto + +package pb + +import ( + bytes "bytes" + json "encoding/json" + + jsonpb "github.com/erda-project/erda-infra/pkg/transport/http/encoding/jsonpb" + protojson "google.golang.org/protobuf/encoding/protojson" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the "encoding/json" package it is being compiled against. +var _ json.Marshaler = (*GetEventsRequest)(nil) +var _ json.Unmarshaler = (*GetEventsRequest)(nil) +var _ json.Marshaler = (*GetEventsResponse)(nil) +var _ json.Unmarshaler = (*GetEventsResponse)(nil) +var _ json.Marshaler = (*GetEventsResult)(nil) +var _ json.Unmarshaler = (*GetEventsResult)(nil) + +// GetEventsRequest implement json.Marshaler. +func (m *GetEventsRequest) MarshalJSON() ([]byte, error) { + buf := &bytes.Buffer{} + err := (&jsonpb.Marshaler{ + OrigName: false, + EnumsAsInts: false, + EmitDefaults: true, + }).Marshal(buf, m) + return buf.Bytes(), err +} + +// GetEventsRequest implement json.Marshaler. +func (m *GetEventsRequest) UnmarshalJSON(b []byte) error { + return (&protojson.UnmarshalOptions{ + DiscardUnknown: true, + }).Unmarshal(b, m) +} + +// GetEventsResponse implement json.Marshaler. +func (m *GetEventsResponse) MarshalJSON() ([]byte, error) { + buf := &bytes.Buffer{} + err := (&jsonpb.Marshaler{ + OrigName: false, + EnumsAsInts: false, + EmitDefaults: true, + }).Marshal(buf, m) + return buf.Bytes(), err +} + +// GetEventsResponse implement json.Marshaler. +func (m *GetEventsResponse) UnmarshalJSON(b []byte) error { + return (&protojson.UnmarshalOptions{ + DiscardUnknown: true, + }).Unmarshal(b, m) +} + +// GetEventsResult implement json.Marshaler. +func (m *GetEventsResult) MarshalJSON() ([]byte, error) { + buf := &bytes.Buffer{} + err := (&jsonpb.Marshaler{ + OrigName: false, + EnumsAsInts: false, + EmitDefaults: true, + }).Marshal(buf, m) + return buf.Bytes(), err +} + +// GetEventsResult implement json.Marshaler. +func (m *GetEventsResult) UnmarshalJSON(b []byte) error { + return (&protojson.UnmarshalOptions{ + DiscardUnknown: true, + }).Unmarshal(b, m) +} diff --git a/api/proto-go/core/monitor/event/pb/event_query.pb.go b/api/proto-go/core/monitor/event/pb/event_query.pb.go new file mode 100644 index 00000000000..3a73c9b1fe1 --- /dev/null +++ b/api/proto-go/core/monitor/event/pb/event_query.pb.go @@ -0,0 +1,373 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.15.8 +// source: event_query.proto + +package pb + +import ( + reflect "reflect" + sync "sync" + + pb "github.com/erda-project/erda-proto-go/oap/event/pb" + _ "google.golang.org/genproto/googleapis/api/annotations" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type GetEventsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + EventId string `protobuf:"bytes,1,opt,name=eventId,proto3" json:"eventId,omitempty"` + TraceId string `protobuf:"bytes,2,opt,name=traceId,proto3" json:"traceId,omitempty"` + RelationId string `protobuf:"bytes,3,opt,name=relationId,proto3" json:"relationId,omitempty"` + RelationType string `protobuf:"bytes,4,opt,name=relationType,proto3" json:"relationType,omitempty"` + Start int64 `protobuf:"varint,5,opt,name=start,proto3" json:"start,omitempty"` + End int64 `protobuf:"varint,6,opt,name=end,proto3" json:"end,omitempty"` + PageNo int64 `protobuf:"varint,7,opt,name=pageNo,proto3" json:"pageNo,omitempty"` + PageSize int64 `protobuf:"varint,8,opt,name=pageSize,proto3" json:"pageSize,omitempty"` + Debug bool `protobuf:"varint,9,opt,name=debug,proto3" json:"debug,omitempty"` +} + +func (x *GetEventsRequest) Reset() { + *x = GetEventsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_event_query_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetEventsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetEventsRequest) ProtoMessage() {} + +func (x *GetEventsRequest) ProtoReflect() protoreflect.Message { + mi := &file_event_query_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetEventsRequest.ProtoReflect.Descriptor instead. +func (*GetEventsRequest) Descriptor() ([]byte, []int) { + return file_event_query_proto_rawDescGZIP(), []int{0} +} + +func (x *GetEventsRequest) GetEventId() string { + if x != nil { + return x.EventId + } + return "" +} + +func (x *GetEventsRequest) GetTraceId() string { + if x != nil { + return x.TraceId + } + return "" +} + +func (x *GetEventsRequest) GetRelationId() string { + if x != nil { + return x.RelationId + } + return "" +} + +func (x *GetEventsRequest) GetRelationType() string { + if x != nil { + return x.RelationType + } + return "" +} + +func (x *GetEventsRequest) GetStart() int64 { + if x != nil { + return x.Start + } + return 0 +} + +func (x *GetEventsRequest) GetEnd() int64 { + if x != nil { + return x.End + } + return 0 +} + +func (x *GetEventsRequest) GetPageNo() int64 { + if x != nil { + return x.PageNo + } + return 0 +} + +func (x *GetEventsRequest) GetPageSize() int64 { + if x != nil { + return x.PageSize + } + return 0 +} + +func (x *GetEventsRequest) GetDebug() bool { + if x != nil { + return x.Debug + } + return false +} + +type GetEventsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Data *GetEventsResult `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *GetEventsResponse) Reset() { + *x = GetEventsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_event_query_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetEventsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetEventsResponse) ProtoMessage() {} + +func (x *GetEventsResponse) ProtoReflect() protoreflect.Message { + mi := &file_event_query_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetEventsResponse.ProtoReflect.Descriptor instead. +func (*GetEventsResponse) Descriptor() ([]byte, []int) { + return file_event_query_proto_rawDescGZIP(), []int{1} +} + +func (x *GetEventsResponse) GetData() *GetEventsResult { + if x != nil { + return x.Data + } + return nil +} + +type GetEventsResult struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Items []*pb.Event `protobuf:"bytes,1,rep,name=items,proto3" json:"items,omitempty"` +} + +func (x *GetEventsResult) Reset() { + *x = GetEventsResult{} + if protoimpl.UnsafeEnabled { + mi := &file_event_query_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetEventsResult) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetEventsResult) ProtoMessage() {} + +func (x *GetEventsResult) ProtoReflect() protoreflect.Message { + mi := &file_event_query_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetEventsResult.ProtoReflect.Descriptor instead. +func (*GetEventsResult) Descriptor() ([]byte, []int) { + return file_event_query_proto_rawDescGZIP(), []int{2} +} + +func (x *GetEventsResult) GetItems() []*pb.Event { + if x != nil { + return x.Items + } + return nil +} + +var File_event_query_proto protoreflect.FileDescriptor + +var file_event_query_proto_rawDesc = []byte{ + 0x0a, 0x11, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x12, 0x17, 0x65, 0x72, 0x64, 0x61, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x6d, + 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x1a, 0x1c, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x15, 0x6f, 0x61, 0x70, 0x2f, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x22, 0xfc, 0x01, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, + 0x12, 0x18, 0x0a, 0x07, 0x74, 0x72, 0x61, 0x63, 0x65, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x07, 0x74, 0x72, 0x61, 0x63, 0x65, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x72, 0x65, + 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, + 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x22, 0x0a, 0x0c, 0x72, 0x65, + 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0c, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x14, + 0x0a, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x73, + 0x74, 0x61, 0x72, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x65, 0x6e, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x03, 0x65, 0x6e, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x61, 0x67, 0x65, 0x4e, 0x6f, + 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x70, 0x61, 0x67, 0x65, 0x4e, 0x6f, 0x12, 0x1a, + 0x0a, 0x08, 0x70, 0x61, 0x67, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, + 0x52, 0x08, 0x70, 0x61, 0x67, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, + 0x62, 0x75, 0x67, 0x18, 0x09, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x64, 0x65, 0x62, 0x75, 0x67, + 0x22, 0x51, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3c, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x65, 0x72, 0x64, 0x61, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, + 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x47, 0x65, + 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x04, 0x64, + 0x61, 0x74, 0x61, 0x22, 0x3e, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, + 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x2b, 0x0a, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x18, + 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x65, 0x72, 0x64, 0x61, 0x2e, 0x6f, 0x61, 0x70, + 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x05, 0x69, 0x74, + 0x65, 0x6d, 0x73, 0x32, 0x8c, 0x01, 0x0a, 0x11, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x51, 0x75, 0x65, + 0x72, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x77, 0x0a, 0x09, 0x47, 0x65, 0x74, + 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x29, 0x2e, 0x65, 0x72, 0x64, 0x61, 0x2e, 0x63, 0x6f, + 0x72, 0x65, 0x2e, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, + 0x2e, 0x47, 0x65, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x2a, 0x2e, 0x65, 0x72, 0x64, 0x61, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x6d, 0x6f, + 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x47, 0x65, 0x74, 0x45, + 0x76, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x13, 0x82, + 0xd3, 0xe4, 0x93, 0x02, 0x0d, 0x12, 0x0b, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x73, 0x42, 0x3d, 0x5a, 0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x65, 0x72, 0x64, 0x61, 0x2d, 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x2f, 0x65, 0x72, + 0x64, 0x61, 0x2d, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2d, 0x67, 0x6f, 0x2f, 0x63, 0x6f, 0x72, 0x65, + 0x2f, 0x6d, 0x6f, 0x6e, 0x69, 0x74, 0x6f, 0x72, 0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2f, 0x70, + 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_event_query_proto_rawDescOnce sync.Once + file_event_query_proto_rawDescData = file_event_query_proto_rawDesc +) + +func file_event_query_proto_rawDescGZIP() []byte { + file_event_query_proto_rawDescOnce.Do(func() { + file_event_query_proto_rawDescData = protoimpl.X.CompressGZIP(file_event_query_proto_rawDescData) + }) + return file_event_query_proto_rawDescData +} + +var file_event_query_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_event_query_proto_goTypes = []interface{}{ + (*GetEventsRequest)(nil), // 0: erda.core.monitor.event.GetEventsRequest + (*GetEventsResponse)(nil), // 1: erda.core.monitor.event.GetEventsResponse + (*GetEventsResult)(nil), // 2: erda.core.monitor.event.GetEventsResult + (*pb.Event)(nil), // 3: erda.oap.event.Event +} +var file_event_query_proto_depIdxs = []int32{ + 2, // 0: erda.core.monitor.event.GetEventsResponse.data:type_name -> erda.core.monitor.event.GetEventsResult + 3, // 1: erda.core.monitor.event.GetEventsResult.items:type_name -> erda.oap.event.Event + 0, // 2: erda.core.monitor.event.EventQueryService.GetEvents:input_type -> erda.core.monitor.event.GetEventsRequest + 1, // 3: erda.core.monitor.event.EventQueryService.GetEvents:output_type -> erda.core.monitor.event.GetEventsResponse + 3, // [3:4] is the sub-list for method output_type + 2, // [2:3] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_event_query_proto_init() } +func file_event_query_proto_init() { + if File_event_query_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_event_query_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetEventsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_event_query_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetEventsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_event_query_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetEventsResult); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_event_query_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_event_query_proto_goTypes, + DependencyIndexes: file_event_query_proto_depIdxs, + MessageInfos: file_event_query_proto_msgTypes, + }.Build() + File_event_query_proto = out.File + file_event_query_proto_rawDesc = nil + file_event_query_proto_goTypes = nil + file_event_query_proto_depIdxs = nil +} diff --git a/api/proto-go/core/monitor/event/pb/event_query.validator.pb.go b/api/proto-go/core/monitor/event/pb/event_query.validator.pb.go new file mode 100644 index 00000000000..7b52e4e36f8 --- /dev/null +++ b/api/proto-go/core/monitor/event/pb/event_query.validator.pb.go @@ -0,0 +1,41 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: event_query.proto + +package pb + +import ( + fmt "fmt" + math "math" + + _ "github.com/erda-project/erda-proto-go/oap/event/pb" + proto "github.com/golang/protobuf/proto" + github_com_mwitkow_go_proto_validators "github.com/mwitkow/go-proto-validators" + _ "google.golang.org/genproto/googleapis/api/annotations" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +func (this *GetEventsRequest) Validate() error { + return nil +} +func (this *GetEventsResponse) Validate() error { + if this.Data != nil { + if err := github_com_mwitkow_go_proto_validators.CallValidatorIfExists(this.Data); err != nil { + return github_com_mwitkow_go_proto_validators.FieldError("Data", err) + } + } + return nil +} +func (this *GetEventsResult) Validate() error { + for _, item := range this.Items { + if item != nil { + if err := github_com_mwitkow_go_proto_validators.CallValidatorIfExists(item); err != nil { + return github_com_mwitkow_go_proto_validators.FieldError("Items", err) + } + } + } + return nil +} diff --git a/api/proto-go/core/monitor/event/pb/event_query_grpc.pb.go b/api/proto-go/core/monitor/event/pb/event_query_grpc.pb.go new file mode 100644 index 00000000000..1b910f3a1eb --- /dev/null +++ b/api/proto-go/core/monitor/event/pb/event_query_grpc.pb.go @@ -0,0 +1,113 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// Source: event_query.proto + +package pb + +import ( + context "context" + + transport "github.com/erda-project/erda-infra/pkg/transport" + grpc1 "github.com/erda-project/erda-infra/pkg/transport/grpc" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion5 + +// EventQueryServiceClient is the client API for EventQueryService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type EventQueryServiceClient interface { + GetEvents(ctx context.Context, in *GetEventsRequest, opts ...grpc.CallOption) (*GetEventsResponse, error) +} + +type eventQueryServiceClient struct { + cc grpc1.ClientConnInterface +} + +func NewEventQueryServiceClient(cc grpc1.ClientConnInterface) EventQueryServiceClient { + return &eventQueryServiceClient{cc} +} + +func (c *eventQueryServiceClient) GetEvents(ctx context.Context, in *GetEventsRequest, opts ...grpc.CallOption) (*GetEventsResponse, error) { + out := new(GetEventsResponse) + err := c.cc.Invoke(ctx, "/erda.core.monitor.event.EventQueryService/GetEvents", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// EventQueryServiceServer is the server API for EventQueryService service. +// All implementations should embed UnimplementedEventQueryServiceServer +// for forward compatibility +type EventQueryServiceServer interface { + GetEvents(context.Context, *GetEventsRequest) (*GetEventsResponse, error) +} + +// UnimplementedEventQueryServiceServer should be embedded to have forward compatible implementations. +type UnimplementedEventQueryServiceServer struct { +} + +func (*UnimplementedEventQueryServiceServer) GetEvents(context.Context, *GetEventsRequest) (*GetEventsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetEvents not implemented") +} + +func RegisterEventQueryServiceServer(s grpc1.ServiceRegistrar, srv EventQueryServiceServer, opts ...grpc1.HandleOption) { + s.RegisterService(_get_EventQueryService_serviceDesc(srv, opts...), srv) +} + +var _EventQueryService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "erda.core.monitor.event.EventQueryService", + HandlerType: (*EventQueryServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{}, + Metadata: "event_query.proto", +} + +func _get_EventQueryService_serviceDesc(srv EventQueryServiceServer, opts ...grpc1.HandleOption) *grpc.ServiceDesc { + h := grpc1.DefaultHandleOptions() + for _, op := range opts { + op(h) + } + + _EventQueryService_GetEvents_Handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.GetEvents(ctx, req.(*GetEventsRequest)) + } + var _EventQueryService_GetEvents_info transport.ServiceInfo + if h.Interceptor != nil { + _EventQueryService_GetEvents_info = transport.NewServiceInfo("erda.core.monitor.event.EventQueryService", "GetEvents", srv) + _EventQueryService_GetEvents_Handler = h.Interceptor(_EventQueryService_GetEvents_Handler) + } + + var serviceDesc = _EventQueryService_serviceDesc + serviceDesc.Methods = []grpc.MethodDesc{ + { + MethodName: "GetEvents", + Handler: func(_ interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetEventsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil && h.Interceptor == nil { + return srv.(EventQueryServiceServer).GetEvents(ctx, in) + } + if h.Interceptor != nil { + ctx = context.WithValue(ctx, transport.ServiceInfoContextKey, _EventQueryService_GetEvents_info) + } + if interceptor == nil { + return _EventQueryService_GetEvents_Handler(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/erda.core.monitor.event.EventQueryService/GetEvents", + } + return interceptor(ctx, in, info, _EventQueryService_GetEvents_Handler) + }, + }, + } + return &serviceDesc +} diff --git a/api/proto-go/core/monitor/event/pb/register.services.pb.go b/api/proto-go/core/monitor/event/pb/register.services.pb.go new file mode 100644 index 00000000000..5e37ab5ead9 --- /dev/null +++ b/api/proto-go/core/monitor/event/pb/register.services.pb.go @@ -0,0 +1,53 @@ +// Code generated by protoc-gen-go-register. DO NOT EDIT. +// Sources: event_query.proto + +package pb + +import ( + reflect "reflect" + + transport "github.com/erda-project/erda-infra/pkg/transport" +) + +// RegisterEventQueryServiceImp event_query.proto +func RegisterEventQueryServiceImp(regester transport.Register, srv EventQueryServiceServer, opts ...transport.ServiceOption) { + _ops := transport.DefaultServiceOptions() + for _, op := range opts { + op(_ops) + } + RegisterEventQueryServiceHandler(regester, EventQueryServiceHandler(srv), _ops.HTTP...) + RegisterEventQueryServiceServer(regester, srv, _ops.GRPC...) +} + +// ServiceNames return all service names +func ServiceNames(svr ...string) []string { + return append(svr, + "erda.core.monitor.event.EventQueryService", + ) +} + +var ( + eventQueryServiceClientType = reflect.TypeOf((*EventQueryServiceClient)(nil)).Elem() + eventQueryServiceServerType = reflect.TypeOf((*EventQueryServiceServer)(nil)).Elem() + eventQueryServiceHandlerType = reflect.TypeOf((*EventQueryServiceHandler)(nil)).Elem() +) + +// EventQueryServiceClientType . +func EventQueryServiceClientType() reflect.Type { return eventQueryServiceClientType } + +// EventQueryServiceServerType . +func EventQueryServiceServerType() reflect.Type { return eventQueryServiceServerType } + +// EventQueryServiceHandlerType . +func EventQueryServiceHandlerType() reflect.Type { return eventQueryServiceHandlerType } + +func Types() []reflect.Type { + return []reflect.Type{ + // client types + eventQueryServiceClientType, + // server types + eventQueryServiceServerType, + // handler types + eventQueryServiceHandlerType, + } +} diff --git a/api/proto/core/monitor/event/event_query.proto b/api/proto/core/monitor/event/event_query.proto new file mode 100644 index 00000000000..792a8abbcc0 --- /dev/null +++ b/api/proto/core/monitor/event/event_query.proto @@ -0,0 +1,33 @@ +syntax = "proto3"; +package erda.core.monitor.event; +option go_package = "github.com/erda-project/erda-proto-go/core/monitor/event/pb"; +import "google/api/annotations.proto"; +import "oap/event/event.proto"; + +service EventQueryService { + rpc GetEvents (GetEventsRequest) returns (GetEventsResponse) { + option (google.api.http) = { + get: "/api/events", + }; + } +} + +message GetEventsRequest { + string eventId = 1; + string traceId = 2; + string relationId = 3; + string relationType = 4; + int64 start = 5; + int64 end = 6; + int64 pageNo = 7; + int64 pageSize = 8; + bool debug = 9; +} + +message GetEventsResponse { + GetEventsResult data = 1; +} + +message GetEventsResult { + repeated oap.event.Event items = 1; +} \ No newline at end of file diff --git a/cmd/monitor/monitor/main.go b/cmd/monitor/monitor/main.go index e5545c8dd82..efbd2529ff0 100644 --- a/cmd/monitor/monitor/main.go +++ b/cmd/monitor/monitor/main.go @@ -25,6 +25,7 @@ import ( _ "github.com/erda-project/erda/modules/core/monitor/alert/details-apis" _ "github.com/erda-project/erda/modules/core/monitor/dataview" _ "github.com/erda-project/erda/modules/core/monitor/dataview/v1-chart-block" + _ "github.com/erda-project/erda/modules/core/monitor/event/query" _ "github.com/erda-project/erda/modules/core/monitor/event/storage/elasticsearch" _ "github.com/erda-project/erda/modules/core/monitor/log/query" _ "github.com/erda-project/erda/modules/core/monitor/log/storage/cassandra" diff --git a/conf/monitor/monitor/monitor.yaml b/conf/monitor/monitor/monitor.yaml index db1232994aa..1239c8a22c7 100644 --- a/conf/monitor/monitor/monitor.yaml +++ b/conf/monitor/monitor/monitor.yaml @@ -166,6 +166,7 @@ event-storage-elasticsearch: _enable: ${QUERY_EVENT_FROM_ES_ENABLE:false} query_timeout: "1m" read_page_size: 50 +erda.core.monitor.event.query: # metric elasticsearch.index.loader@metric: diff --git a/modules/core/monitor/event/query/event.query.service.go b/modules/core/monitor/event/query/event.query.service.go new file mode 100644 index 00000000000..a60b78a53ed --- /dev/null +++ b/modules/core/monitor/event/query/event.query.service.go @@ -0,0 +1,88 @@ +// Copyright (c) 2021 Terminus, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package query + +import ( + context "context" + "fmt" + + pb "github.com/erda-project/erda-proto-go/core/monitor/event/pb" + commonPb "github.com/erda-project/erda-proto-go/oap/common/pb" + oapPb "github.com/erda-project/erda-proto-go/oap/event/pb" + "github.com/erda-project/erda/modules/core/monitor/event/storage" +) + +type eventQueryService struct { + p *provider + storageReader storage.Storage +} + +func (s *eventQueryService) GetEvents(ctx context.Context, req *pb.GetEventsRequest) (*pb.GetEventsResponse, error) { + if s.storageReader == nil { + return nil, fmt.Errorf("storage service is nil") + } + sel := &storage.Selector{ + Start: req.Start, + End: req.End, + Debug: req.Debug, + } + if len(req.EventId) > 0 { + sel.Filters = append(sel.Filters, &storage.Filter{ + Key: "event_id", + Op: storage.EQ, + Value: req.EventId, + }) + } + if len(req.RelationType) > 0 { + sel.Filters = append(sel.Filters, &storage.Filter{ + Key: "relations.res_type", + Op: storage.EQ, + Value: req.RelationType, + }) + } + if len(req.RelationId) > 0 { + sel.Filters = append(sel.Filters, &storage.Filter{ + Key: "relations.res_id", + Op: storage.EQ, + Value: req.RelationId, + }) + } + list, err := s.storageReader.QueryPaged(ctx, sel, int(req.PageNo), int(req.PageSize)) + if err != nil { + return nil, err + } + + resp := &pb.GetEventsResponse{Data: &pb.GetEventsResult{}} + for _, item := range list { + data := &oapPb.Event{ + EventID: item.EventID, + Name: item.Name, + Kind: oapPb.Event_EventKind(oapPb.Event_EventKind_value[item.Kind]), + TimeUnixNano: uint64(item.Timestamp), + Attributes: item.Tags, + Message: item.Content, + } + if item.Relations != nil { + data.Relations = &commonPb.Relation{ + ResID: item.Relations.ResID, + ResType: item.Relations.ResType, + TraceID: item.Relations.TraceID, + } + } + resp.Data.Items = append(resp.Data.Items, data) + } + + return resp, nil +} diff --git a/modules/core/monitor/event/query/event.query.service_test.go b/modules/core/monitor/event/query/event.query.service_test.go new file mode 100644 index 00000000000..cecf81a42f0 --- /dev/null +++ b/modules/core/monitor/event/query/event.query.service_test.go @@ -0,0 +1,131 @@ +// Copyright (c) 2021 Terminus, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package query + +import ( + context "context" + reflect "reflect" + testing "testing" + + "github.com/golang/mock/gomock" + + servicehub "github.com/erda-project/erda-infra/base/servicehub" + pb "github.com/erda-project/erda-proto-go/core/monitor/event/pb" + "github.com/erda-project/erda/modules/core/monitor/event" +) + +// -go:generate mockgen -destination=./mock_storage.go -package query -source=../storage/storage.go Storage +func Test_eventQueryService_GetEvents(t *testing.T) { + + type args struct { + ctx context.Context + req *pb.GetEventsRequest + } + tests := []struct { + name string + service string + config string + args args + wantResp *pb.GetEventsResponse + wantErr bool + }{ + // TODO: Add test cases. + //{ + // "case 1", + // "erda.core.monitor.event.EventQueryService", + // ` + // erda.core.monitor.event.query: + // `, + // args{ + // context.TODO(), + // &pb.GetEventsRequest{ + // // TODO: setup fields + // }, + // }, + // &pb.GetEventsResponse{ + // // TODO: setup fields. + // }, + // false, + //}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hub := servicehub.New() + events := hub.Events() + go func() { + hub.RunWithOptions(&servicehub.RunOptions{Content: tt.config}) + }() + err := <-events.Started() + if err != nil { + t.Error(err) + return + } + srv := hub.Service(tt.service).(pb.EventQueryServiceServer) + got, err := srv.GetEvents(tt.args.ctx, tt.args.req) + if (err != nil) != tt.wantErr { + t.Errorf("eventQueryService.GetEvents() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.wantResp) { + t.Errorf("eventQueryService.GetEvents() = %v, want %v", got, tt.wantResp) + } + }) + } +} + +func Test_eventQueryService_GetEvents_WithValidParams_Should_Return_NonEmptyList(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + storage := NewMockStorage(ctrl) + storage.EXPECT(). + QueryPaged(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return([]*event.Event{ + {EventID: "event-id-1"}, + }, nil) + + querySvc := &eventQueryService{ + storageReader: storage, + } + result, err := querySvc.GetEvents(context.Background(), &pb.GetEventsRequest{ + Start: 1, + End: 2, + TraceId: "trace-id", + RelationId: "res-id", + RelationType: "res-type", + }) + if err != nil { + t.Errorf("should not throw error") + } + if result == nil || len(result.Data.Items) != 1 { + t.Errorf("assert result failed") + } + +} + +func Test_eventQueryService_GetEvents_With_NilStorage_Should_Return_Error(t *testing.T) { + querySvc := &eventQueryService{ + storageReader: nil, + } + result, err := querySvc.GetEvents(context.Background(), &pb.GetEventsRequest{ + Start: 1, + End: 2, + TraceId: "trace-id", + RelationId: "res-id", + RelationType: "res-type", + }) + if result != nil || err == nil { + t.Errorf("should throw error") + } +} diff --git a/modules/core/monitor/event/query/mock_storage.go b/modules/core/monitor/event/query/mock_storage.go new file mode 100644 index 00000000000..821c41c5eda --- /dev/null +++ b/modules/core/monitor/event/query/mock_storage.go @@ -0,0 +1,69 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ../storage/storage.go + +// Package query is a generated GoMock package. +package query + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + + event "github.com/erda-project/erda/modules/core/monitor/event" + storage "github.com/erda-project/erda/modules/core/monitor/event/storage" + storekit "github.com/erda-project/erda/modules/core/monitor/storekit" +) + +// MockStorage is a mock of Storage interface. +type MockStorage struct { + ctrl *gomock.Controller + recorder *MockStorageMockRecorder +} + +// MockStorageMockRecorder is the mock recorder for MockStorage. +type MockStorageMockRecorder struct { + mock *MockStorage +} + +// NewMockStorage creates a new mock instance. +func NewMockStorage(ctrl *gomock.Controller) *MockStorage { + mock := &MockStorage{ctrl: ctrl} + mock.recorder = &MockStorageMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorage) EXPECT() *MockStorageMockRecorder { + return m.recorder +} + +// NewWriter mocks base method. +func (m *MockStorage) NewWriter(ctx context.Context) (storekit.BatchWriter, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewWriter", ctx) + ret0, _ := ret[0].(storekit.BatchWriter) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewWriter indicates an expected call of NewWriter. +func (mr *MockStorageMockRecorder) NewWriter(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewWriter", reflect.TypeOf((*MockStorage)(nil).NewWriter), ctx) +} + +// QueryPaged mocks base method. +func (m *MockStorage) QueryPaged(ctx context.Context, sel *storage.Selector, pageNo, pageSize int) ([]*event.Event, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "QueryPaged", ctx, sel, pageNo, pageSize) + ret0, _ := ret[0].([]*event.Event) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// QueryPaged indicates an expected call of QueryPaged. +func (mr *MockStorageMockRecorder) QueryPaged(ctx, sel, pageNo, pageSize interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryPaged", reflect.TypeOf((*MockStorage)(nil).QueryPaged), ctx, sel, pageNo, pageSize) +} diff --git a/modules/core/monitor/event/query/provider.go b/modules/core/monitor/event/query/provider.go new file mode 100644 index 00000000000..f2817b2c83c --- /dev/null +++ b/modules/core/monitor/event/query/provider.go @@ -0,0 +1,69 @@ +// Copyright (c) 2021 Terminus, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package query + +import ( + logs "github.com/erda-project/erda-infra/base/logs" + servicehub "github.com/erda-project/erda-infra/base/servicehub" + transport "github.com/erda-project/erda-infra/pkg/transport" + pb "github.com/erda-project/erda-proto-go/core/monitor/event/pb" + "github.com/erda-project/erda/modules/core/monitor/event/storage" + "github.com/erda-project/erda/pkg/common/apis" +) + +type config struct { +} + +// +provider +type provider struct { + Cfg *config + Log logs.Logger + Register transport.Register + StorageReader storage.Storage `autowired:"event-storage-elasticsearch-reader" optional:"true"` + eventQueryService *eventQueryService +} + +func (p *provider) Init(ctx servicehub.Context) error { + // TODO initialize something ... + + p.eventQueryService = &eventQueryService{p, p.StorageReader} + if p.Register != nil { + pb.RegisterEventQueryServiceImp(p.Register, p.eventQueryService, apis.Options()) + } + return nil +} + +func (p *provider) Provide(ctx servicehub.DependencyContext, args ...interface{}) interface{} { + switch { + case ctx.Service() == "erda.core.monitor.event.EventQueryService" || ctx.Type() == pb.EventQueryServiceServerType() || ctx.Type() == pb.EventQueryServiceHandlerType(): + return p.eventQueryService + } + return p +} + +func init() { + servicehub.Register("erda.core.monitor.event.query", &servicehub.Spec{ + Services: pb.ServiceNames(), + Types: pb.Types(), + OptionalDependencies: []string{"service-register"}, + Description: "", + ConfigFunc: func() interface{} { + return &config{} + }, + Creator: func() servicehub.Provider { + return &provider{} + }, + }) +} From da0feca502caf5293430a690f14f764a2cf3378a Mon Sep 17 00:00:00 2001 From: RecallSong <13607438+recallsong@users.noreply.github.com> Date: Fri, 29 Oct 2021 10:16:14 +0800 Subject: [PATCH 8/8] use search query logs (#2671) * use search query * use search query * use search after --- .../log/storage/elasticsearch/iterator.go | 217 +++++++++++++++--- 1 file changed, 191 insertions(+), 26 deletions(-) diff --git a/modules/core/monitor/log/storage/elasticsearch/iterator.go b/modules/core/monitor/log/storage/elasticsearch/iterator.go index 5bdafae2105..e7dec5542db 100644 --- a/modules/core/monitor/log/storage/elasticsearch/iterator.go +++ b/modules/core/monitor/log/storage/elasticsearch/iterator.go @@ -34,9 +34,9 @@ import ( "github.com/erda-project/erda/modules/core/monitor/storekit/elasticsearch/index/loader" ) -func (p *provider) getSearchSource(sel *storage.Selector) *elastic.SearchSource { +func getSearchSource(start, end int64, sel *storage.Selector) *elastic.SearchSource { searchSource := elastic.NewSearchSource() - query := elastic.NewBoolQuery().Filter(elastic.NewRangeQuery("timestamp").Gte(sel.Start).Lt(sel.End)) + query := elastic.NewBoolQuery().Filter(elastic.NewRangeQuery("timestamp").Gte(start).Lt(end)) for _, filter := range sel.Filters { val, ok := filter.Value.(string) if !ok { @@ -52,24 +52,36 @@ func (p *provider) getSearchSource(sel *storage.Selector) *elastic.SearchSource return searchSource.Query(query) } +const useScrollQuery = false + func (p *provider) Iterator(ctx context.Context, sel *storage.Selector) (storekit.Iterator, error) { // TODO check org indices := p.Loader.Indices(ctx, sel.Start, sel.End, loader.KeyPath{ Recursive: true, }) - searchSource := p.getSearchSource(sel) - if sel.Debug { - source, _ := searchSource.Source() - fmt.Printf("indices: %v\nsearchSource: %s\n", strings.Join(indices, ","), jsonx.MarshalAndIndent(source)) - } - return &scrollIterator{ - ctx: ctx, - sel: sel, - searchSource: searchSource, - client: p.client, - timeout: p.Cfg.QueryTimeout, - pageSize: p.Cfg.ReadPageSize, - indices: indices, + if useScrollQuery { + searchSource := getSearchSource(sel.Start, sel.End, sel) + if sel.Debug { + source, _ := searchSource.Source() + fmt.Printf("indices: %v\nsearchSource: %s\n", strings.Join(indices, ","), jsonx.MarshalAndIndent(source)) + } + return &scrollIterator{ + ctx: ctx, + sel: sel, + searchSource: searchSource, + client: p.client, + timeout: p.Cfg.QueryTimeout, + pageSize: p.Cfg.ReadPageSize, + indices: indices, + }, nil + } + return &searchIterator{ + ctx: ctx, + sel: sel, + client: p.client, + timeout: p.Cfg.QueryTimeout, + pageSize: p.Cfg.ReadPageSize, + indices: indices, }, nil } @@ -199,20 +211,16 @@ func (it *scrollIterator) fetch(dir iteratorDir) error { if it.dir != iteratorBackward { ascending = true } - resp, it.err = it.client.Scroll(it.indices...).KeepAlive(keepalive). IgnoreUnavailable(true).AllowNoIndices(true). SearchSource(it.searchSource).Size(it.pageSize).Sort("timestamp", ascending).Sort("offset", ascending).Do(ctx) - if it.err != nil { - return it.err - } } else { resp, it.err = it.client.Scroll(it.indices...).ScrollId(it.lastScrollID).KeepAlive(keepalive). IgnoreUnavailable(true).AllowNoIndices(true). Size(it.pageSize).Do(ctx) - if it.err != nil { - return it.err - } + } + if it.err != nil { + return it.err } // save scrollID @@ -229,7 +237,7 @@ func (it *scrollIterator) fetch(dir iteratorDir) error { } // parse result - it.buffer = parseHits(resp.Hits.Hits, it.sel.Start, it.sel.End) + it.buffer, _, _ = parseHits(resp.Hits.Hits, it.sel.Start, it.sel.End, "") return nil }() } @@ -269,9 +277,164 @@ func (it *scrollIterator) checkClosed() bool { return false } -func parseHits(hits []*elastic.SearchHit, start, end int64) (list []*pb.LogItem) { +type searchIterator struct { + log logs.Logger + ctx context.Context + sel *storage.Selector + + lastID string + lastSortValues []interface{} + + client *elastic.Client + timeout time.Duration + pageSize int + indices []string + + dir iteratorDir + buffer []*pb.LogItem + value *pb.LogItem + err error + closed bool +} + +func (it *searchIterator) First() bool { + if it.checkClosed() { + return false + } + it.lastSortValues, it.lastID = nil, "" + it.fetch(iteratorForward) + return it.yield() +} + +func (it *searchIterator) Last() bool { + if it.checkClosed() { + return false + } + it.lastSortValues, it.lastID = nil, "" + it.fetch(iteratorBackward) + return it.yield() +} + +func (it *searchIterator) Next() bool { + if it.checkClosed() { + return false + } + if it.dir == iteratorBackward { + it.err = storekit.ErrOpNotSupported + return false + } + if it.yield() { + return true + } + it.fetch(iteratorForward) + return it.yield() +} + +func (it *searchIterator) Prev() bool { + if it.checkClosed() { + return false + } + if it.dir == iteratorForward { + it.err = storekit.ErrOpNotSupported + return false + } + if it.yield() { + return true + } + it.fetch(iteratorBackward) + return it.yield() +} + +func (it *searchIterator) Value() storekit.Data { return it.value } +func (it *searchIterator) Error() error { + if it.err == io.EOF { + return nil + } + return it.err +} + +func (it *searchIterator) fetch(dir iteratorDir) error { + if len(it.indices) <= 0 { + it.err = io.EOF + return it.err + } + ms := int64(it.timeout.Milliseconds()) + if ms < 1 { + ms = 1 + } + timeout := strconv.FormatInt(ms, 10) + "ms" + + var ascending bool + if dir != iteratorBackward { + ascending = true + } + + it.dir = dir + it.buffer = nil + for it.err == nil && len(it.buffer) <= 0 { + func() error { + ctx, cancel := context.WithTimeout(it.ctx, it.timeout) + defer cancel() + var resp *elastic.SearchResult + searchSource := getSearchSource(it.sel.Start, it.sel.End, it.sel) + resp, it.err = it.client.Search(it.indices...).IgnoreUnavailable(true).AllowNoIndices(true).Timeout(timeout). + SearchSource(searchSource).Size(it.pageSize). + Sort("timestamp", ascending).Sort("offset", ascending).SearchAfter(it.lastSortValues...). + Do(ctx) + if it.err != nil { + return it.err + } + if resp == nil || resp.Hits == nil || len(resp.Hits.Hits) <= 0 { + it.err = io.EOF + return it.err + } + it.buffer, it.lastSortValues, it.lastID = parseHits(resp.Hits.Hits, it.sel.Start, it.sel.End, it.lastID) + if len(resp.Hits.Hits) < it.pageSize { + it.err = io.EOF + return it.err + } + return nil + }() + } + return nil +} + +func (it *searchIterator) yield() bool { + if len(it.buffer) > 0 { + it.value = it.buffer[0] + it.buffer = it.buffer[1:] + return true + } + return false +} + +func (it *searchIterator) Close() error { + it.closed = true + return nil +} + +func (it *searchIterator) checkClosed() bool { + if it.closed { + if it.err == nil { + it.err = storekit.ErrIteratorClosed + } + return true + } + select { + case <-it.ctx.Done(): + if it.err == nil { + it.err = storekit.ErrIteratorClosed + } + return true + default: + } + return false +} + +func parseHits(hits []*elastic.SearchHit, start, end int64, removeID string) (list []*pb.LogItem, lastSortValue []interface{}, lastID string) { + checkID := len(removeID) > 0 for _, hit := range hits { - if hit.Source == nil { + if hit.Source == nil || (checkID && hit.Id == removeID) { continue } data, err := parseData(*hit.Source) @@ -281,8 +444,10 @@ func parseHits(hits []*elastic.SearchHit, start, end int64) (list []*pb.LogItem) if start <= data.Timestamp && data.Timestamp < end { list = append(list, data) } + lastSortValue = hit.Sort + lastID = hit.Id } - return list + return list, lastSortValue, lastID } func parseData(byts []byte) (*pb.LogItem, error) {