Skip to content

Commit

Permalink
update feishu notification
Browse files Browse the repository at this point in the history
  • Loading branch information
wallyxjh committed Jul 31, 2024
1 parent 8d9d070 commit 945c314
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 84 deletions.
1 change: 1 addition & 0 deletions service/exceptionmonitor/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var (
MemMonitorNamespaceMap = make(map[string]bool)
LastBackupStatusMap = make(map[string]string)
IsSendBackupStatusMap = make(map[string]string)
DatabaseNamespaceMap = make(map[string]string)
ExceededQuotaException = "exceeded quota"
DiskException = "Writing to log file failed"
OwnerLabel = "user.sealos.io/owner"
Expand Down
102 changes: 30 additions & 72 deletions service/exceptionmonitor/helper/monitor/database_backup_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package monitor

import (
"context"
"fmt"
"log"
"strings"
"time"

"k8s.io/apimachinery/pkg/api/errors"

"github.com/labring/sealos/service/exceptionmonitor/api"
"github.com/labring/sealos/service/exceptionmonitor/helper/notification"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -25,7 +29,7 @@ func DatabaseBackupMonitor() {
if err := checkDatabaseBackups(); err != nil {
log.Printf("Failed to check database backups: %v", err)
}
time.Sleep(1 * time.Hour)
time.Sleep(1 * time.Minute)
}
}

Expand All @@ -50,6 +54,23 @@ func processBackup(backup unstructured.Unstructured) {
if !found || status != "Failed" {
return
}
fmt.Println(backupName, namespace)
debt, _, _ := checkDebt(namespace)
if !debt {
return
}
backupPolicyName, _, _ := unstructured.NestedString(backup.Object, "spec", "backupPolicyName")
databaseName := getPrefix(backupPolicyName)
fmt.Println(databaseName)
cluster, err := api.DynamicClient.Resource(databaseClusterGVR).Namespace(namespace).Get(context.Background(), databaseName, metav1.GetOptions{})
if cluster != nil && errors.IsNotFound(err) {
return
}
dbStatus, _, _ := unstructured.NestedString(cluster.Object, "status", "phase")
if dbStatus == "Stopped" {
return
}
fmt.Println(dbStatus)
notificationInfo := notification.Info{
DatabaseClusterName: backupName,
Namespace: namespace,
Expand All @@ -60,82 +81,19 @@ func processBackup(backup unstructured.Unstructured) {
}
if _, ok := api.LastBackupStatusMap[backupName]; !ok {
message := notification.GetBackupMessage("exception", namespace, backupName, status, startTime, "")
fmt.Println(message)
if err := notification.SendFeishuNotification(notificationInfo, message, api.FeishuWebhookURLMap["FeishuWebhookURLBackup"]); err != nil {
log.Printf("Error sending exception notification:%v", err)
}
} else {
api.LastBackupStatusMap[backupName] = status
}
//backupPolicyName, found, err := unstructured.NestedString(backup.Object, "spec", "backupPolicyName")
//if err != nil || !found {
// log.Printf("Unable to get %s backupPolicyName in ns %s:%v", backupName, namespace, err)
// return
//}
//handleBackupStatus(backupName, namespace, status, startTime, backupPolicyName)
}

//func handleBackupStatus(backupName, namespace, status, startTime, backupPolicyName string) {
// //if status == "Completed" {
// // handleBackupCompletion(backupName, namespace, status, startTime)
// // return
// //}
// //if CheckackupFailure(backupName, namespace, status, backupPolicyName) {
// // err := api.DynamicClient.Resource(backupGVR).Namespace(namespace).Delete(context.Background(), backupName, metav1.DeleteOptions{})
// // if err != nil {
// // log.Printf("Failed to delete%s in ns %s:%v", backupName, namespace, err)
// // }
// //}
//
//}

//func handleBackupCompletion(backupName, namespace, status, startTime string) {
// if _, ok := api.LastBackupStatusMap[backupName]; ok {
// message := notification.GetBackupMessage("recovery", namespace, backupName, status, startTime, "")
// if err := notification.SendFeishuNotification(message, api.FeishuWebhookURLMap["FeishuWebhookURLBackup"]); err != nil {
// log.Printf("Error sending recovery notification:%v", err)
// }
// delete(api.LastBackupStatusMap, backupName)
// delete(api.IsSendBackupStatusMap, backupName)
// }
//}
//
//func CheckackupFailure(backupName, namespace, status, backupPolicyName string) bool {
// if _, ok := api.LastBackupStatusMap[backupName]; !ok {
// api.LastBackupStatusMap[backupName] = status
// return false
// }
// if _, ok := api.IsSendBackupStatusMap[backupName]; ok {
// return false
// }
//
// if ok, _ := checkFailedBackup(backupPolicyName, namespace); ok {
// message := notification.GetBackupMessage("exception", namespace, backupName, "Failed", "", "")
// if err := notification.SendFeishuNotification(message, api.FeishuWebhookURLMap["FeishuWebhookURLBackup"]); err != nil {
// log.Printf("Error sending exception notification:%v", err)
// }
// return true
// }
// return false
//}
//
//func checkFailedBackup(backupPolicyName, namespace string) (bool, error) {
// databaseName := getPrefix(backupPolicyName)
// podList, err := api.ClientSet.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{})
// if err != nil {
// return false, err
// }
// for _, pod := range podList.Items {
// if strings.HasPrefix(pod.GetName(), databaseName) {
// return true, nil
// }
// }
// return false, nil
//}
//
//func getPrefix(backupPolicyName string) string {
// parts := strings.Split(backupPolicyName, "-")
// if len(parts) < 3 {
// return ""
// }
// return strings.Join(parts[:len(parts)-2], "-")
//}
// getPrefix strips the last three "-"-separated segments from
// backupPolicyName and returns what remains, re-joined with "-".
// It returns "" when the name has fewer than three segments (and also when
// it has exactly three, since nothing is left after trimming).
func getPrefix(backupPolicyName string) string {
	const trailingSegments = 3

	segments := strings.Split(backupPolicyName, "-")
	keep := len(segments) - trailingSegments
	if keep < 0 {
		return ""
	}
	return strings.Join(segments[:keep], "-")
}
19 changes: 17 additions & 2 deletions service/exceptionmonitor/helper/monitor/database_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"
"time"

"k8s.io/apimachinery/pkg/api/errors"

"github.com/labring/sealos/service/exceptionmonitor/api"
"github.com/labring/sealos/service/exceptionmonitor/helper/notification"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -35,13 +37,23 @@ var (

// DatabaseExceptionMonitor runs the database monitoring loop for as long as
// api.DatabaseMonitor is true. Each round first calls checkDeletedDatabases,
// then checkDatabases for api.ClusterNS (logging any error it returns), and
// finally sleeps for five minutes before the flag is re-evaluated.
func DatabaseExceptionMonitor() {
	for {
		if !api.DatabaseMonitor {
			return
		}

		checkDeletedDatabases()

		err := checkDatabases(api.ClusterNS)
		if err != nil {
			log.Printf("Failed to check databases: %v", err)
		}

		time.Sleep(5 * time.Minute)
	}
}

// checkDeletedDatabases walks every database cluster name tracked in
// api.LastDatabaseClusterStatus and fetches the cluster from the namespace
// recorded for it in api.DatabaseNamespaceMap. When the lookup reports
// NotFound, handleClusterRecovery is called with the status "Deleted".
// Any other lookup error is logged instead of being silently dropped, so a
// failing check (API outage, permissions, missing namespace entry) is visible.
func checkDeletedDatabases() {
	for databaseName := range api.LastDatabaseClusterStatus {
		namespace := api.DatabaseNamespaceMap[databaseName]
		cluster, err := api.DynamicClient.Resource(databaseClusterGVR).Namespace(namespace).Get(context.Background(), databaseName, metav1.GetOptions{})
		switch {
		case err == nil:
			// Cluster still exists; nothing to do here.
		case cluster == nil && errors.IsNotFound(err):
			// NOTE(review): presumably this closes the outstanding alert for
			// the removed cluster — confirm against handleClusterRecovery.
			handleClusterRecovery(databaseName, "", "Deleted")
		default:
			log.Printf("Failed to get database cluster %s in ns %s: %v", databaseName, namespace, err)
		}
	}
}

func checkDatabases(namespaces []string) error {
if api.MonitorType == api.MonitorTypeALL {
if err := checkDatabasesInNamespace(""); err != nil {
Expand Down Expand Up @@ -89,6 +101,7 @@ func processCluster(cluster metav1unstructured.Unstructured) {
case api.StatusUnknown:
if _, ok := api.LastDatabaseClusterStatus[databaseClusterName]; !ok {
api.LastDatabaseClusterStatus[databaseClusterName] = status
api.DatabaseNamespaceMap[databaseClusterName] = namespace
api.ExceptionDatabaseMap[databaseClusterName] = true
alertMessage, feishuWebHook, notification := prepareAlertMessage(databaseClusterName, namespace, status, "", "status is empty", 0)
if err := sendAlert(alertMessage, feishuWebHook, notification); err != nil {
Expand Down Expand Up @@ -122,11 +135,13 @@ func cleanClusterStatus(databaseClusterName string) {
delete(api.DiskFullNamespaceMap, databaseClusterName)
delete(api.FeishuWebHookMap, databaseClusterName)
delete(api.ExceptionDatabaseMap, databaseClusterName)
delete(api.DatabaseNamespaceMap, databaseClusterName)
}

func handleClusterException(databaseClusterName, namespace, databaseType, status string) {
if _, ok := api.LastDatabaseClusterStatus[databaseClusterName]; !ok && !api.DebtNamespaceMap[namespace] {
api.LastDatabaseClusterStatus[databaseClusterName] = status
api.DatabaseNamespaceMap[databaseClusterName] = namespace
api.ExceptionDatabaseMap[databaseClusterName] = true
if err := processClusterException(databaseClusterName, namespace, databaseType, status); err != nil {
log.Printf("Failed to process cluster %s exception in ns %s: %v", databaseClusterName, namespace, err)
Expand Down Expand Up @@ -206,7 +221,7 @@ func prepareAlertMessage(databaseClusterName, namespace, status, debtLevel, data
feishuWebHook = api.FeishuWebhookURLMap["FeishuWebhookURLOther"]
notificationInfo.Reason = "disk is full"
alertMessage = notification.GetNotificationMessage(notificationInfo)
notification.CreateNotification(namespace, databaseClusterName, status, "disk is full")
notification.CreateNotification(namespace, databaseClusterName, status, "disk is full", "磁盘满了")
}
api.DiskFullNamespaceMap[databaseClusterName] = true
}
Expand All @@ -229,6 +244,6 @@ func notifyQuotaExceeded(databaseClusterName, namespace, status, debtLevel strin
NotificationType: "exception",
}
alertMessage := notification.GetNotificationMessage(notificationInfo)
notification.CreateNotification(namespace, databaseClusterName, status, api.ExceededQuotaException)
notification.CreateNotification(namespace, databaseClusterName, status, api.ExceededQuotaException, "Quato满了")
return notification.SendFeishuNotification(notificationInfo, alertMessage, api.FeishuWebhookURLMap["FeishuWebhookURLOther"])
}
13 changes: 7 additions & 6 deletions service/exceptionmonitor/helper/notification/desktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func randString(n int) (string, error) {
return string(b), nil
}

func CreateNotification(namespace, name, status, notificationMessage string) {
func CreateNotification(namespace, name, status, notificationMessage, zhNotificationMessage string) {
gvr := schema.GroupVersionResource{
Group: "notification.sealos.io",
Version: "v1",
Expand All @@ -40,7 +40,8 @@ func CreateNotification(namespace, name, status, notificationMessage string) {

randomSuffix, _ := randString(5)
now := time.Now().UTC().Unix()
message := fmt.Sprintf("database : %s is %s. Please check in time.", name, status)
message := fmt.Sprintf("Because %s , Database %s current status : %s , Please check in time.", notificationMessage, name, status)
zhMessage := fmt.Sprintf("因为 %s , 数据库 %s 当前状态 : %s , 请及时检查.", zhNotificationMessage, name, status)
notification := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "notification.sealos.io/v1",
Expand All @@ -56,10 +57,10 @@ func CreateNotification(namespace, name, status, notificationMessage string) {
"importance": "High",
"desktopPopup": true,
"i18ns": map[string]interface{}{
"en": map[string]interface{}{
"title": "Database Exception",
"message": notificationMessage,
"from": "Database Exception",
"zh": map[string]interface{}{
"title": "数据库异常告警",
"message": zhMessage,
"from": "数据库异常",
},
},
},
Expand Down
28 changes: 25 additions & 3 deletions service/exceptionmonitor/helper/notification/feishu.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log"
"regexp"
"time"

"github.com/labring/sealos/service/exceptionmonitor/api"
lark "github.com/larksuite/oapi-sdk-go/v3"
Expand Down Expand Up @@ -137,6 +138,16 @@ func GetNotificationMessage(notificationInfo Info) string {
}
elements = append(elements, exceptionElements...)
}
exceptionElements := []map[string]interface{}{
{
"tag": "div",
"text": map[string]string{
"content": fmt.Sprintf("数据库恢复时间:%s", time.Now().Add(8*time.Hour).Format("2006-01-02 15:04:05")),
"tag": "lark_md",
},
},
}
elements = append(elements, exceptionElements...)
}
card := map[string]interface{}{
"config": map[string]bool{
Expand Down Expand Up @@ -201,8 +212,14 @@ func updateFeishuNotification(messageID, message string) error {
Body(larkim.NewPatchMessageReqBodyBuilder().
Content(message).Build()).Build()

fmt.Println(messageID)
resp, err := feiShuClient.Im.Message.Patch(context.Background(), req)
if err != nil || !resp.Success() {
if err != nil {
log.Println("Error:", err)
return err
}

if !resp.Success() {
log.Println("Error:", resp.Code, resp.Msg, resp.RequestId())
return err
}
Expand All @@ -219,8 +236,12 @@ func createFeishuNotification(notification Info, message, feishuWebHook string,

resp, err := feiShuClient.Im.Message.Create(context.Background(), req)

if err != nil || !resp.Success() {
fmt.Println(111)
if err != nil {
log.Println("Error:", err)
return err
}

if !resp.Success() {
log.Println("Error:", resp.Code, resp.Msg, resp.RequestId())
return err
}
Expand All @@ -235,6 +256,7 @@ func createFeishuNotification(notification Info, message, feishuWebHook string,
} else {
messageIDMap[notification.DatabaseClusterName] = messageID
}
fmt.Println(messageIDMap)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion service/exceptionmonitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ func main() {
}

func initialize() error {
notification.InitFeishuClient()
if err := api.GetENV(); err != nil {
return err
}
if err := client.InitClient(); err != nil {
return err
}
notification.InitFeishuClient()
return dao.InitCockroachDB()
}

0 comments on commit 945c314

Please sign in to comment.