From 1291be16d17b93fe4ba8420a04fa60a8ed868e9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=A2=E9=87=91=E8=99=8E?= <1050780355@qq.com> Date: Wed, 29 Nov 2023 11:58:09 +0800 Subject: [PATCH 1/3] update kafka monitor --- service/database/api/req.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/service/database/api/req.go b/service/database/api/req.go index 218b09f9488..efddb29dddc 100644 --- a/service/database/api/req.go +++ b/service/database/api/req.go @@ -120,10 +120,10 @@ var ( } Kafka = map[string]string{ - "cpu": "round(max by (pod) (rate(container_cpu_usage_seconds_total{namespace=~\"#\",pod=~\"@-kafka-\\\\d\" ,container=\"kafka\"}[5m])) / on (pod) (max by (pod) (container_spec_cpu_quota{namespace=~\"#\", pod=~\"@-kafka-\\\\d\",container=\"kafka\"} / 100000)) * 100,0.01)", - "memory": "round(max by (pod)(container_memory_usage_bytes{namespace=~\"#\",pod=~\"@-kafka-\\\\d\",container=\"kafka\" })/ on (pod) (max by (pod) (container_spec_memory_limit_bytes{namespace=~\"#\", pod=~\"@-kafka-\\\\d\",container=\"kafka\"})) * 100,0.01)", - "disk_capacity": "(max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes {namespace=~\"#\", persistentvolumeclaim=~\"data-@-kafka-\\\\d\"}))", - "disk": "round((max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes {namespace=~\"#\", persistentvolumeclaim=~\"data-@-kafka-\\\\d\"})) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes {namespace=~\"#\", persistentvolumeclaim=~\"data-@-kafka-\\\\d\"})) * 100, 0.01)", - "disk_used": "(max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes {namespace=~\"#\", persistentvolumeclaim=~\"data-@-kafka-\\\\d\"}))", + "cpu": "round(max by (pod) (rate(container_cpu_usage_seconds_total{namespace=~\"#\",pod=~\"@-(kafka-broker|kafka-server|controller)-\\\\d\" ,container=\"kafka\"}[5m])) / on (pod) (max by (pod) (container_spec_cpu_quota{namespace=~\"#\", pod=~\"@-(kafka-broker|kafka-server|controller)-\\\\d\",container=\"kafka\"} / 100000)) * 100,0.01)", + "memory": "round(max by (pod)(container_memory_usage_bytes{namespace=~\"#\",pod=~\"@-(kafka-broker|kafka-server|controller)-\\\\d\",container=\"kafka\" })/ on (pod) (max by (pod) (container_spec_memory_limit_bytes{namespace=~\"#\", pod=~\"@-(kafka-broker|kafka-server|controller)-\\\\d\",container=\"kafka\"})) * 100,0.01)", + "disk_capacity": "(max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes {namespace=~\"#\", persistentvolumeclaim=~\"data-@-(kafka-broker|kafka-server)-\\\\d\"}))", + "disk": "round((max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes {namespace=~\"#\", persistentvolumeclaim=~\"data-@-(kafka-broker|kafka-server)-\\\\d\"})) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes {namespace=~\"#\", persistentvolumeclaim=~\"data-@-(kafka-broker|kafka-server)-\\\\d\"})) * 100, 0.01)", + "disk_used": "(max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes {namespace=~\"#\", persistentvolumeclaim=~\"data-@-(kafka-broker|kafka-server)-\\\\d\"}))", } ) From 8d9d070c8c842797b7c276fa4515724e951cb186 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=A2=E9=87=91=E8=99=8E?= <1050780355@qq.com> Date: Tue, 23 Jul 2024 14:58:46 +0800 Subject: [PATCH 2/3] update feishu notification --- service/exceptionmonitor/api/api.go | 9 ++ .../deploy/manifests/deploy.yaml.tmpl | 4 + service/exceptionmonitor/go.mod | 1 + service/exceptionmonitor/go.sum | 8 ++ .../helper/monitor/database_backup_monitor.go | 10 +- .../helper/monitor/database_monitor.go | 23 ++-- .../monitor/database_performance_monitor.go | 4 +- .../helper/notification/feishu.go | 109 +++++++++++++----- service/exceptionmonitor/main.go | 3 + 9 files changed, 129 insertions(+), 42 deletions(-) diff --git a/service/exceptionmonitor/api/api.go b/service/exceptionmonitor/api/api.go index 818c0ee2e15..fa271716692 100644 --- a/service/exceptionmonitor/api/api.go +++ b/service/exceptionmonitor/api/api.go @@ -67,11 +67,20 @@ var ( DatabaseExceptionMonitorThreshold float64 DatabaseCPUMonitorThreshold float64 DatabaseMemMonitorThreshold float64 + APPID string + APPSECRET string + DatabaseStatusMessageIDMap = make(map[string]string) + DatabaseDiskMessageIDMap = make(map[string]string) + DatabaseCPUMessageIDMap = make(map[string]string) + DatabaseMemMessageIDMap = make(map[string]string) + DatabaseBackupMessageIDMap = make(map[string]string) ) func GetENV() error { var missingEnvVars []string + APPID = getEnvWithCheck("APPID", &missingEnvVars) + APPSECRET = getEnvWithCheck("APPSECRET", &missingEnvVars) BaseURL = getEnvWithCheck("BaseURL", &missingEnvVars) ClusterName = getEnvWithCheck("ClusterName", &missingEnvVars) MonitorType = getEnvWithCheck("MonitorType", &missingEnvVars) diff --git a/service/exceptionmonitor/deploy/manifests/deploy.yaml.tmpl b/service/exceptionmonitor/deploy/manifests/deploy.yaml.tmpl index 32246ee0e8c..b524281d152 100644 --- a/service/exceptionmonitor/deploy/manifests/deploy.yaml.tmpl +++ b/service/exceptionmonitor/deploy/manifests/deploy.yaml.tmpl @@ -94,6 +94,10 @@ spec: value: "" - name: DatabaseMemMonitorThreshold value: "" + - name: APPID + value: "" + - name: APPSECRET + value: "" volumeMounts: - name: kubeconfig mountPath: /home/nonroot/kubeconfig diff --git a/service/exceptionmonitor/go.mod b/service/exceptionmonitor/go.mod index 63e655056fd..c3b3589e9e7 100644 --- a/service/exceptionmonitor/go.mod +++ b/service/exceptionmonitor/go.mod @@ -40,6 +40,7 @@ require ( github.com/jinzhu/now v1.1.5 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/larksuite/oapi-sdk-go/v3 v3.2.9 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/matoous/go-nanoid/v2 v2.0.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/service/exceptionmonitor/go.sum b/service/exceptionmonitor/go.sum index 8db5571f777..90fef7e8a9f 100644 --- a/service/exceptionmonitor/go.sum +++ b/service/exceptionmonitor/go.sum @@ -63,8 +63,10 @@ github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU= github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= @@ -93,6 +95,10 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/larksuite/oapi-sdk-go v1.1.48 h1:RHRr5LW68AibBzXVRXObUpkbS6TXapl4TAyhITVvB4w= +github.com/larksuite/oapi-sdk-go v1.1.48/go.mod h1:7ybKAbVdKBjXuX0YrMTfnWUyCaIe/zeI1wqjNfN9XOk= +github.com/larksuite/oapi-sdk-go/v3 v3.2.9 h1:9zQAGrzhibNwdaGRkWUP1cAd2k2dJJDpbSffcfK0wPw= +github.com/larksuite/oapi-sdk-go/v3 v3.2.9/go.mod h1:ZEplY+kwuIrj/nqw5uSCINNATcH3KdxSN7y+UxYY5fI= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/matoous/go-nanoid v1.5.0/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U= @@ -175,6 +181,7 @@ golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.10.0 h1:zHCpF2Khkwy4mMB4bv0U37YtJdTGW8jI0glAApi0Kh8= golang.org/x/oauth2 v0.10.0/go.mod h1:kTpgurOux7LqtuxjuyZa4Gj2gdezIt/jQtGnNFfypQI= +golang.org/x/oauth2 v0.12.0/go.mod h1:A74bZ3aGXgCY0qaIC9Ahg6Lglin4AMAco8cIv9baba4= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -271,3 +278,4 @@ sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+s sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= +sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/service/exceptionmonitor/helper/monitor/database_backup_monitor.go b/service/exceptionmonitor/helper/monitor/database_backup_monitor.go index af43254bbb7..de7de2971e2 100644 --- a/service/exceptionmonitor/helper/monitor/database_backup_monitor.go +++ b/service/exceptionmonitor/helper/monitor/database_backup_monitor.go @@ -50,9 +50,17 @@ func processBackup(backup unstructured.Unstructured) { if !found || status != "Failed" { return } + notificationInfo := notification.Info{ + DatabaseClusterName: backupName, + Namespace: namespace, + Status: status, + ExceptionType: "备份", + PerformanceType: "Backup", + NotificationType: "exception", + } if _, ok := api.LastBackupStatusMap[backupName]; !ok { message := notification.GetBackupMessage("exception", namespace, backupName, status, startTime, "") - if err := notification.SendFeishuNotification(message, api.FeishuWebhookURLMap["FeishuWebhookURLBackup"]); err != nil { + if err := notification.SendFeishuNotification(notificationInfo, message, api.FeishuWebhookURLMap["FeishuWebhookURLBackup"]); err != nil { log.Printf("Error sending exception notification:%v", err) } } else { diff --git a/service/exceptionmonitor/helper/monitor/database_monitor.go b/service/exceptionmonitor/helper/monitor/database_monitor.go index 8e288499f84..de1d3ada2dc 100644 --- a/service/exceptionmonitor/helper/monitor/database_monitor.go +++ b/service/exceptionmonitor/helper/monitor/database_monitor.go @@ -90,8 +90,8 @@ func processCluster(cluster metav1unstructured.Unstructured) { if _, ok := api.LastDatabaseClusterStatus[databaseClusterName]; !ok { api.LastDatabaseClusterStatus[databaseClusterName] = status api.ExceptionDatabaseMap[databaseClusterName] = true - alertMessage, feishuWebHook := prepareAlertMessage(databaseClusterName, namespace, status, "", "status is empty", 0) - if err := sendAlert(alertMessage, feishuWebHook, databaseClusterName); err != nil { + alertMessage, feishuWebHook, notification := prepareAlertMessage(databaseClusterName, namespace, status, "", "status is empty", 0) + if err := sendAlert(alertMessage, feishuWebHook, notification); err != nil { log.Printf("Failed to send feishu %s in ns %s: %v", databaseClusterName, namespace, err) } } @@ -110,7 +110,7 @@ func handleClusterRecovery(databaseClusterName, namespace, status string) { NotificationType: "recovery", } recoveryMessage := notification.GetNotificationMessage(notificationInfo) - if err := notification.SendFeishuNotification(recoveryMessage, api.FeishuWebHookMap[databaseClusterName]); err != nil { + if err := notification.SendFeishuNotification(notificationInfo, recoveryMessage, api.FeishuWebHookMap[databaseClusterName]); err != nil { log.Printf("Error sending recovery notification: %v", err) } cleanClusterStatus(databaseClusterName) @@ -146,9 +146,8 @@ func processClusterException(databaseClusterName, namespace, databaseType, statu if err != nil { return err } - - alertMessage, feishuWebHook := prepareAlertMessage(databaseClusterName, namespace, status, debtLevel, databaseEvents, maxUsage) - if err := sendAlert(alertMessage, feishuWebHook, databaseClusterName); err != nil { + alertMessage, feishuWebHook, notification := prepareAlertMessage(databaseClusterName, namespace, status, debtLevel, databaseEvents, maxUsage) + if err := sendAlert(alertMessage, feishuWebHook, notification); err != nil { return err } } else { @@ -183,7 +182,7 @@ func databaseQuotaExceptionFilter(databaseEvents string) bool { return !strings.Contains(databaseEvents, api.ExceededQuotaException) } -func prepareAlertMessage(databaseClusterName, namespace, status, debtLevel, databaseEvents string, maxUsage float64) (string, string) { +func prepareAlertMessage(databaseClusterName, namespace, status, debtLevel, databaseEvents string, maxUsage float64) (string, string, notification.Info) { alertMessage, feishuWebHook := "", "" notificationInfo := notification.Info{ DatabaseClusterName: databaseClusterName, @@ -211,12 +210,12 @@ func prepareAlertMessage(databaseClusterName, namespace, status, debtLevel, data } api.DiskFullNamespaceMap[databaseClusterName] = true } - return alertMessage, feishuWebHook + return alertMessage, feishuWebHook, notificationInfo } -func sendAlert(alertMessage, feishuWebHook, databaseClusterName string) error { - api.FeishuWebHookMap[databaseClusterName] = feishuWebHook - return notification.SendFeishuNotification(alertMessage, feishuWebHook) +func sendAlert(alertMessage, feishuWebHook string, notificationInfo notification.Info) error { + api.FeishuWebHookMap[notificationInfo.DatabaseClusterName] = feishuWebHook + return notification.SendFeishuNotification(notificationInfo, alertMessage, feishuWebHook) } func notifyQuotaExceeded(databaseClusterName, namespace, status, debtLevel string) error { @@ -231,5 +230,5 @@ func notifyQuotaExceeded(databaseClusterName, namespace, status, debtLevel strin } alertMessage := notification.GetNotificationMessage(notificationInfo) notification.CreateNotification(namespace, databaseClusterName, status, api.ExceededQuotaException) - return notification.SendFeishuNotification(alertMessage, api.FeishuWebhookURLMap["FeishuWebhookURLOther"]) + return notification.SendFeishuNotification(notificationInfo, alertMessage, api.FeishuWebhookURLMap["FeishuWebhookURLOther"]) } diff --git a/service/exceptionmonitor/helper/monitor/database_performance_monitor.go b/service/exceptionmonitor/helper/monitor/database_performance_monitor.go index 7139c0c78aa..25523538d75 100644 --- a/service/exceptionmonitor/helper/monitor/database_performance_monitor.go +++ b/service/exceptionmonitor/helper/monitor/database_performance_monitor.go @@ -118,7 +118,7 @@ func processUsage(usage float64, threshold float64, performanceType, UID string, } if usage >= threshold && !monitorMap[UID] { alertMessage := notification.GetNotificationMessage(info) - if err := notification.SendFeishuNotification(alertMessage, api.FeishuWebhookURLMap["FeishuWebhookURLImportant"]); err != nil { + if err := notification.SendFeishuNotification(info, alertMessage, api.FeishuWebhookURLMap["FeishuWebhookURLImportant"]); err != nil { log.Printf("Failed to send notification: %v", err) } monitorMap[UID] = true @@ -132,7 +132,7 @@ func processUsage(usage float64, threshold float64, performanceType, UID string, } else if usage < threshold && monitorMap[UID] { info.NotificationType = "recovery" alertMessage := notification.GetNotificationMessage(info) - if err := notification.SendFeishuNotification(alertMessage, api.FeishuWebhookURLMap["FeishuWebhookURLImportant"]); err != nil { + if err := notification.SendFeishuNotification(info, alertMessage, api.FeishuWebhookURLMap["FeishuWebhookURLImportant"]); err != nil { log.Printf("Failed to send notification: %v", err) } delete(monitorMap, UID) diff --git a/service/exceptionmonitor/helper/notification/feishu.go b/service/exceptionmonitor/helper/notification/feishu.go index d4d8adc0e14..54c7322e744 100644 --- a/service/exceptionmonitor/helper/notification/feishu.go +++ b/service/exceptionmonitor/helper/notification/feishu.go @@ -1,16 +1,24 @@ package notification import ( - "bytes" + "context" "encoding/json" "fmt" - "net/http" + "log" + "regexp" "github.com/labring/sealos/service/exceptionmonitor/api" + lark "github.com/larksuite/oapi-sdk-go/v3" + larkcore "github.com/larksuite/oapi-sdk-go/v3/core" + larkim "github.com/larksuite/oapi-sdk-go/v3/service/im/v1" ) const ExceptionType = "exception" +var ( + feiShuClient *lark.Client +) + type Info struct { DatabaseClusterName string Namespace string @@ -26,6 +34,10 @@ type Info struct { ExceptionType string } +func InitFeishuClient() { + feiShuClient = lark.NewClient(api.APPID, api.APPSECRET) +} + func GetNotificationMessage(notificationInfo Info) string { headerTemplate := "red" titleContent := "数据库" + notificationInfo.ExceptionType + "告警" @@ -148,50 +160,93 @@ func GetNotificationMessage(notificationInfo Info) string { return string(databaseMessage) } -func SendFeishuNotification(message, feishuWebHook string) error { - //log.Print(message, feishuWebHook) +func SendFeishuNotification(notification Info, message, feishuWebHook string) error { if api.MonitorType != "all" { feishuWebHook = api.FeishuWebhookURLMap["FeishuWebhookURLImportant"] } - // Create a map to hold the POST request body - bodyMap := map[string]interface{}{ - "msg_type": "interactive", - "card": message, + messageIDMap := getMessageIDMap(notification.PerformanceType) + + if messageID, ok := messageIDMap[notification.DatabaseClusterName]; ok { + if err := updateFeishuNotification(messageID, message); err != nil { + return err + } + delete(messageIDMap, notification.DatabaseClusterName) + } else { + if err := createFeishuNotification(notification, message, feishuWebHook, messageIDMap); err != nil { + return err + } } + return nil +} - // Convert the map to a JSON byte slice - bodyBytes, err := json.Marshal(bodyMap) - if err != nil { - return err +func getMessageIDMap(performanceType string) map[string]string { + switch performanceType { + case "磁盘": + return api.DatabaseDiskMessageIDMap + case "内存": + return api.DatabaseMemMessageIDMap + case "CPU": + return api.DatabaseCPUMessageIDMap + case "Backup": + return api.DatabaseBackupMessageIDMap + default: + return api.DatabaseStatusMessageIDMap } +} - // Create a new HTTP request - req, err := http.NewRequest("POST", feishuWebHook, bytes.NewBuffer(bodyBytes)) - if err != nil { +func updateFeishuNotification(messageID, message string) error { + req := larkim.NewPatchMessageReqBuilder(). + MessageId(messageID). + Body(larkim.NewPatchMessageReqBodyBuilder(). + Content(message).Build()).Build() + + resp, err := feiShuClient.Im.Message.Patch(context.Background(), req) + if err != nil || !resp.Success() { + log.Println("Error:", resp.Code, resp.Msg, resp.RequestId()) return err } + return nil +} - // Set the request header - req.Header.Set("Content-Type", "application/json") +func createFeishuNotification(notification Info, message, feishuWebHook string, messageIDMap map[string]string) error { + req := larkim.NewCreateMessageReqBuilder(). + ReceiveIdType("chat_id"). + Body(larkim.NewCreateMessageReqBodyBuilder(). + ReceiveId(feishuWebHook). + MsgType("interactive"). + Content(message).Build()).Build() - // Send the request using the default client - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { + resp, err := feiShuClient.Im.Message.Create(context.Background(), req) + + if err != nil || !resp.Success() { + fmt.Println(111) + log.Println("Error:", resp.Code, resp.Msg, resp.RequestId()) return err } - defer resp.Body.Close() - // Print the status and response body - buf := new(bytes.Buffer) - _, err = buf.ReadFrom(resp.Body) - if err != nil { - return err + respStr := larkcore.Prettify(resp) + messageID := extractAndPrintMessageID(respStr) + if notification.DatabaseClusterName == "Backup" { + return nil + } + if messageID == "" { + log.Printf("send databaseName %s feishu notification, return no messageID", notification.DatabaseClusterName) + } else { + messageIDMap[notification.DatabaseClusterName] = messageID } return nil } +func extractAndPrintMessageID(str string) string { + re := regexp.MustCompile(`MessageId:\s*"([^"]+)"`) + match := re.FindStringSubmatch(str) + if len(match) > 1 { + return match[1] + } + return "" +} + func createCard(headerTemplate, headerTitle string, elements []map[string]string) map[string]interface{} { card := map[string]interface{}{ "config": map[string]bool{ diff --git a/service/exceptionmonitor/main.go b/service/exceptionmonitor/main.go index 0da7b443eae..00990fa736e 100644 --- a/service/exceptionmonitor/main.go +++ b/service/exceptionmonitor/main.go @@ -3,6 +3,8 @@ package main import ( "log" + "github.com/labring/sealos/service/exceptionmonitor/helper/notification" + "github.com/labring/sealos/service/exceptionmonitor/api" "github.com/labring/sealos/service/exceptionmonitor/dao" "github.com/labring/sealos/service/exceptionmonitor/helper/client" @@ -20,6 +22,7 @@ func main() { } func initialize() error { + notification.InitFeishuClient() if err := api.GetENV(); err != nil { return err } From 2d03042d3d1d2d7bde53f12037dc4ea023d89ecc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=A2=E9=87=91=E8=99=8E?= <1050780355@qq.com> Date: Wed, 24 Jul 2024 17:55:23 +0800 Subject: [PATCH 3/3] update feishu notification --- service/exceptionmonitor/api/api.go | 1 + .../helper/monitor/database_backup_monitor.go | 102 +++++------------- .../helper/monitor/database_monitor.go | 19 +++- .../helper/notification/desktop.go | 13 +-- .../helper/notification/feishu.go | 28 ++++- service/exceptionmonitor/main.go | 2 +- 6 files changed, 80 insertions(+), 85 deletions(-) diff --git a/service/exceptionmonitor/api/api.go b/service/exceptionmonitor/api/api.go index fa271716692..0a13961c5ff 100644 --- a/service/exceptionmonitor/api/api.go +++ b/service/exceptionmonitor/api/api.go @@ -48,6 +48,7 @@ var ( MemMonitorNamespaceMap = make(map[string]bool) LastBackupStatusMap = make(map[string]string) IsSendBackupStatusMap = make(map[string]string) + DatabaseNamespaceMap = make(map[string]string) ExceededQuotaException = "exceeded quota" DiskException = "Writing to log file failed" OwnerLabel = "user.sealos.io/owner" diff --git a/service/exceptionmonitor/helper/monitor/database_backup_monitor.go b/service/exceptionmonitor/helper/monitor/database_backup_monitor.go index de7de2971e2..b6d0ed33cec 100644 --- a/service/exceptionmonitor/helper/monitor/database_backup_monitor.go +++ b/service/exceptionmonitor/helper/monitor/database_backup_monitor.go @@ -2,9 +2,13 @@ package monitor import ( "context" + "fmt" "log" + "strings" "time" + "k8s.io/apimachinery/pkg/api/errors" + "github.com/labring/sealos/service/exceptionmonitor/api" "github.com/labring/sealos/service/exceptionmonitor/helper/notification" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -25,7 +29,7 @@ func DatabaseBackupMonitor() { if err := checkDatabaseBackups(); err != nil { log.Printf("Failed to check database backups: %v", err) } - time.Sleep(1 * time.Hour) + time.Sleep(1 * time.Minute) } } @@ -50,6 +54,23 @@ func processBackup(backup unstructured.Unstructured) { if !found || status != "Failed" { return } + fmt.Println(backupName, namespace) + debt, _, _ := checkDebt(namespace) + if !debt { + return + } + backupPolicyName, _, _ := unstructured.NestedString(backup.Object, "spec", "backupPolicyName") + databaseName := getPrefix(backupPolicyName) + fmt.Println(databaseName) + cluster, err := api.DynamicClient.Resource(databaseClusterGVR).Namespace(namespace).Get(context.Background(), databaseName, metav1.GetOptions{}) + if cluster != nil && errors.IsNotFound(err) { + return + } + dbStatus, _, _ := unstructured.NestedString(cluster.Object, "status", "phase") + if dbStatus == "Stopped" { + return + } + fmt.Println(dbStatus) notificationInfo := notification.Info{ DatabaseClusterName: backupName, Namespace: namespace, @@ -63,79 +84,14 @@ func processBackup(backup unstructured.Unstructured) { if err := notification.SendFeishuNotification(notificationInfo, message, api.FeishuWebhookURLMap["FeishuWebhookURLBackup"]); err != nil { log.Printf("Error sending exception notification:%v", err) } - } else { api.LastBackupStatusMap[backupName] = status } - //backupPolicyName, found, err := unstructured.NestedString(backup.Object, "spec", "backupPolicyName") - //if err != nil || !found { - // log.Printf("Unable to get %s backupPolicyName in ns %s:%v", backupName, namespace, err) - // return - //} - //handleBackupStatus(backupName, namespace, status, startTime, backupPolicyName) } -//func handleBackupStatus(backupName, namespace, status, startTime, backupPolicyName string) { -// //if status == "Completed" { -// // handleBackupCompletion(backupName, namespace, status, startTime) -// // return -// //} -// //if CheckackupFailure(backupName, namespace, status, backupPolicyName) { -// // err := api.DynamicClient.Resource(backupGVR).Namespace(namespace).Delete(context.Background(), backupName, metav1.DeleteOptions{}) -// // if err != nil { -// // log.Printf("Failed to delete%s in ns %s:%v", backupName, namespace, err) -// // } -// //} -// -//} - -//func handleBackupCompletion(backupName, namespace, status, startTime string) { -// if _, ok := api.LastBackupStatusMap[backupName]; ok { -// message := notification.GetBackupMessage("recovery", namespace, backupName, status, startTime, "") -// if err := notification.SendFeishuNotification(message, api.FeishuWebhookURLMap["FeishuWebhookURLBackup"]); err != nil { -// log.Printf("Error sending recovery notification:%v", err) -// } -// delete(api.LastBackupStatusMap, backupName) -// delete(api.IsSendBackupStatusMap, backupName) -// } -//} -// -//func CheckackupFailure(backupName, namespace, status, backupPolicyName string) bool { -// if _, ok := api.LastBackupStatusMap[backupName]; !ok { -// api.LastBackupStatusMap[backupName] = status -// return false -// } -// if _, ok := api.IsSendBackupStatusMap[backupName]; ok { -// return false -// } -// -// if ok, _ := checkFailedBackup(backupPolicyName, namespace); ok { -// message := notification.GetBackupMessage("exception", namespace, backupName, "Failed", "", "") -// if err := notification.SendFeishuNotification(message, api.FeishuWebhookURLMap["FeishuWebhookURLBackup"]); err != nil { -// log.Printf("Error sending exception notification:%v", err) -// } -// return true -// } -// return false -//} -// -//func checkFailedBackup(backupPolicyName, namespace string) (bool, error) { -// databaseName := getPrefix(backupPolicyName) -// podList, err := api.ClientSet.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{}) -// if err != nil { -// return false, err -// } -// for _, pod := range podList.Items { -// if strings.HasPrefix(pod.GetName(), databaseName) { -// return true, nil -// } -// } -// return false, nil -//} -// -//func getPrefix(backupPolicyName string) string { -// parts := strings.Split(backupPolicyName, "-") -// if len(parts) < 3 { -// return "" -// } -// return strings.Join(parts[:len(parts)-2], "-") -//} +func getPrefix(backupPolicyName string) string { + parts := strings.Split(backupPolicyName, "-") + if len(parts) < 3 { + return "" + } + return strings.Join(parts[:len(parts)-3], "-") +} diff --git a/service/exceptionmonitor/helper/monitor/database_monitor.go b/service/exceptionmonitor/helper/monitor/database_monitor.go index de1d3ada2dc..31b6f0aaa41 100644 --- a/service/exceptionmonitor/helper/monitor/database_monitor.go +++ b/service/exceptionmonitor/helper/monitor/database_monitor.go @@ -7,6 +7,8 @@ import ( "strings" "time" + "k8s.io/apimachinery/pkg/api/errors" + "github.com/labring/sealos/service/exceptionmonitor/api" "github.com/labring/sealos/service/exceptionmonitor/helper/notification" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -35,6 +37,7 @@ var ( func DatabaseExceptionMonitor() { for api.DatabaseMonitor { + checkDeletedDatabases() if err := checkDatabases(api.ClusterNS); err != nil { log.Printf("Failed to check databases: %v", err) } @@ -42,6 +45,15 @@ func DatabaseExceptionMonitor() { } } +func checkDeletedDatabases() { + for databaseName := range api.LastDatabaseClusterStatus { + cluster, err := api.DynamicClient.Resource(databaseClusterGVR).Namespace(api.DatabaseNamespaceMap[databaseName]).Get(context.Background(), databaseName, metav1.GetOptions{}) + if cluster == nil && errors.IsNotFound(err) { + handleClusterRecovery(databaseName, "", "Deleted") + } + } +} + func checkDatabases(namespaces []string) error { if api.MonitorType == api.MonitorTypeALL { if err := checkDatabasesInNamespace(""); err != nil { @@ -89,6 +101,7 @@ func processCluster(cluster metav1unstructured.Unstructured) { case api.StatusUnknown: if _, ok := api.LastDatabaseClusterStatus[databaseClusterName]; !ok { api.LastDatabaseClusterStatus[databaseClusterName] = status + api.DatabaseNamespaceMap[databaseClusterName] = namespace api.ExceptionDatabaseMap[databaseClusterName] = true alertMessage, feishuWebHook, notification := prepareAlertMessage(databaseClusterName, namespace, status, "", "status is empty", 0) if err := sendAlert(alertMessage, feishuWebHook, notification); err != nil { @@ -122,11 +135,13 @@ func cleanClusterStatus(databaseClusterName string) { delete(api.DiskFullNamespaceMap, databaseClusterName) delete(api.FeishuWebHookMap, databaseClusterName) delete(api.ExceptionDatabaseMap, databaseClusterName) + delete(api.DatabaseNamespaceMap, databaseClusterName) } func handleClusterException(databaseClusterName, namespace, databaseType, status string) { if _, ok := api.LastDatabaseClusterStatus[databaseClusterName]; !ok && !api.DebtNamespaceMap[namespace] { api.LastDatabaseClusterStatus[databaseClusterName] = status + api.DatabaseNamespaceMap[databaseClusterName] = namespace api.ExceptionDatabaseMap[databaseClusterName] = true if err := processClusterException(databaseClusterName, namespace, databaseType, status); err != nil { log.Printf("Failed to process cluster %s exception in ns %s: %v", databaseClusterName, namespace, err) @@ -206,7 +221,7 @@ func prepareAlertMessage(databaseClusterName, namespace, status, debtLevel, data feishuWebHook = api.FeishuWebhookURLMap["FeishuWebhookURLOther"] notificationInfo.Reason = "disk is full" alertMessage = notification.GetNotificationMessage(notificationInfo) - notification.CreateNotification(namespace, databaseClusterName, status, "disk is full") + notification.CreateNotification(namespace, databaseClusterName, status, "disk is full", "磁盘满了") } api.DiskFullNamespaceMap[databaseClusterName] = true } @@ -229,6 +244,6 @@ func notifyQuotaExceeded(databaseClusterName, namespace, status, debtLevel strin NotificationType: "exception", } alertMessage := notification.GetNotificationMessage(notificationInfo) - notification.CreateNotification(namespace, databaseClusterName, status, api.ExceededQuotaException) + notification.CreateNotification(namespace, databaseClusterName, status, api.ExceededQuotaException, "Quato满了") return notification.SendFeishuNotification(notificationInfo, alertMessage, api.FeishuWebhookURLMap["FeishuWebhookURLOther"]) } diff --git a/service/exceptionmonitor/helper/notification/desktop.go b/service/exceptionmonitor/helper/notification/desktop.go index f3d72bab560..bc2c626c4d9 100644 --- a/service/exceptionmonitor/helper/notification/desktop.go +++ b/service/exceptionmonitor/helper/notification/desktop.go @@ -31,7 +31,7 @@ func randString(n int) (string, error) { return string(b), nil } -func CreateNotification(namespace, name, status, notificationMessage string) { +func CreateNotification(namespace, name, status, notificationMessage, zhNotificationMessage string) { gvr := schema.GroupVersionResource{ Group: "notification.sealos.io", Version: "v1", @@ -40,7 +40,8 @@ func CreateNotification(namespace, name, status, notificationMessage string) { randomSuffix, _ := randString(5) now := time.Now().UTC().Unix() - message := fmt.Sprintf("database : %s is %s. Please check in time.", name, status) + message := fmt.Sprintf("Because %s , Database %s current status : %s , Please check in time.", notificationMessage, name, status) + zhMessage := fmt.Sprintf("因为 %s , 数据库 %s 当前状态 : %s , 请及时检查.", zhNotificationMessage, name, status) notification := &unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "notification.sealos.io/v1", @@ -56,10 +57,10 @@ func CreateNotification(namespace, name, status, notificationMessage string) { "importance": "High", "desktopPopup": true, "i18ns": map[string]interface{}{ - "en": map[string]interface{}{ - "title": "Database Exception", - "message": notificationMessage, - "from": "Database Exception", + "zh": map[string]interface{}{ + "title": "数据库异常告警", + "message": zhMessage, + "from": "数据库异常", }, }, }, diff --git a/service/exceptionmonitor/helper/notification/feishu.go b/service/exceptionmonitor/helper/notification/feishu.go index 54c7322e744..90d419af832 100644 --- a/service/exceptionmonitor/helper/notification/feishu.go +++ b/service/exceptionmonitor/helper/notification/feishu.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "regexp" + "time" "github.com/labring/sealos/service/exceptionmonitor/api" lark "github.com/larksuite/oapi-sdk-go/v3" @@ -137,6 +138,16 @@ func GetNotificationMessage(notificationInfo Info) string { } elements = append(elements, exceptionElements...) } + exceptionElements := []map[string]interface{}{ + { + "tag": "div", + "text": map[string]string{ + "content": fmt.Sprintf("数据库恢复时间:%s", time.Now().Add(8*time.Hour).Format("2006-01-02 15:04:05")), + "tag": "lark_md", + }, + }, + } + elements = append(elements, exceptionElements...) } card := map[string]interface{}{ "config": map[string]bool{ @@ -201,8 +212,14 @@ func updateFeishuNotification(messageID, message string) error { Body(larkim.NewPatchMessageReqBodyBuilder(). Content(message).Build()).Build() + fmt.Println(messageID) resp, err := feiShuClient.Im.Message.Patch(context.Background(), req) - if err != nil || !resp.Success() { + if err != nil { + log.Println("Error:", err) + return err + } + + if !resp.Success() { log.Println("Error:", resp.Code, resp.Msg, resp.RequestId()) return err } @@ -219,8 +236,12 @@ func createFeishuNotification(notification Info, message, feishuWebHook string, resp, err := feiShuClient.Im.Message.Create(context.Background(), req) - if err != nil || !resp.Success() { - fmt.Println(111) + if err != nil { + log.Println("Error:", err) + return err + } + + if !resp.Success() { log.Println("Error:", resp.Code, resp.Msg, resp.RequestId()) return err } @@ -235,6 +256,7 @@ func createFeishuNotification(notification Info, message, feishuWebHook string, } else { messageIDMap[notification.DatabaseClusterName] = messageID } + fmt.Println(messageIDMap) return nil } diff --git a/service/exceptionmonitor/main.go b/service/exceptionmonitor/main.go index 00990fa736e..943d49fdc12 100644 --- a/service/exceptionmonitor/main.go +++ b/service/exceptionmonitor/main.go @@ -22,12 +22,12 @@ func main() { } func initialize() error { - notification.InitFeishuClient() if err := api.GetENV(); err != nil { return err } if err := client.InitClient(); err != nil { return err } + notification.InitFeishuClient() return dao.InitCockroachDB() }