From f326cfb873d9437e989364f4c631a0dea1ae4d17 Mon Sep 17 00:00:00 2001 From: yangjie727 <78058281@qq.com> Date: Wed, 22 Nov 2023 13:38:48 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=94=A8=E6=88=B7=E5=B0=81?= =?UTF-8?q?=E9=94=81=E8=A7=A3=E5=B0=81=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 182 ++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 117 insertions(+), 65 deletions(-) diff --git a/main.go b/main.go index 26407a7..3ce13e6 100644 --- a/main.go +++ b/main.go @@ -456,7 +456,7 @@ func (s *serverUser) BlockUserInAccount(ctx context.Context, in *pb.BlockUserInA return nil, st.Err() } // 关联存在的情况下直接封锁账户 - blockUserCmd := fmt.Sprintf("sacctmgr -i -Q modify user where name=%s account=%s set MaxSubmitJobs=0 MaxJobs=0 MaxWall=00:00:00 GrpJobs=0 GrpSubmit=0 GrpSubmitJobs=0 MaxSubmitJobs=0 GrpWall=00:00:00", in.UserId, in.AccountName) + blockUserCmd := fmt.Sprintf("sacctmgr -i -Q modify user where name=%s account=%s set MaxSubmitJobs=0 MaxJobs=0 GrpJobs=0 GrpSubmit=0 GrpSubmitJobs=0 MaxSubmitJobs=0", in.UserId, in.AccountName) res := utils.ExecuteShellCommand(blockUserCmd) if res == 0 { return &pb.BlockUserInAccountResponse{}, nil @@ -535,7 +535,7 @@ func (s *serverUser) UnblockUserInAccount(ctx context.Context, in *pb.UnblockUse return &pb.UnblockUserInAccountResponse{}, nil } // 用户从账户中解封的操作 - unblockUserCmd := fmt.Sprintf("sacctmgr -i -Q modify user where name='%s' account='%s' set MaxSubmitJobs=-1 MaxJobs=-1 MaxWall=-1 GrpJobs=-1 GrpSubmit=-1 GrpSubmitJobs=-1 MaxSubmitJobs=-1 GrpWall=-1", in.UserId, in.AccountName) + unblockUserCmd := fmt.Sprintf("sacctmgr -i -Q modify user where name='%s' account='%s' set MaxSubmitJobs=-1 MaxJobs=-1 GrpJobs=-1 GrpSubmit=-1 GrpSubmitJobs=-1 MaxSubmitJobs=-1", in.UserId, in.AccountName) res := utils.ExecuteShellCommand(unblockUserCmd) if res == 0 { return &pb.UnblockUserInAccountResponse{}, nil @@ -1508,49 +1508,47 @@ func (s *serverConfig) GetAvailablePartitions(ctx context.Context, in *pb.GetAva func (s *serverConfig) GetClusterInfo(ctx context.Context, in *pb.GetClusterInfoRequest) (*pb.GetClusterInfoResponse, error) { var ( - gpuIdList []int - gpuId int - parts []*pb.PartitionInfo // 定义返回的类型 + gpuId int + parts []*pb.PartitionInfo // 定义返回的类型 ) // 记录日志 logger.Infof("Received request GetClusterInfo: %v", in) clusterName := configValue.MySQLConfig.ClusterName // 集群的名字 partitions, _ := utils.GetPatitionInfo() - // 查询gpu对应的id信息 - gpuSqlConfig := "SELECT id FROM tres_table WHERE type = 'gres' AND deleted = 0" - rows, err := db.Query(gpuSqlConfig) - if err != nil { - errInfo := &errdetails.ErrorInfo{ - Reason: "SQL_QUERY_FAILED", - } - st := status.New(codes.Internal, err.Error()) - st, _ = st.WithDetails(errInfo) - return nil, st.Err() - } - defer rows.Close() - for rows.Next() { - err := rows.Scan(&gpuId) - if err != nil { - errInfo := &errdetails.ErrorInfo{ - Reason: "SQL_QUERY_FAILED", - } - st := status.New(codes.Internal, err.Error()) - st, _ = st.WithDetails(errInfo) - return nil, st.Err() - } - gpuIdList = append(gpuIdList, gpuId) - } - err = rows.Err() - if err != nil { - errInfo := &errdetails.ErrorInfo{ - Reason: "SQL_QUERY_FAILED", - } - st := status.New(codes.Internal, err.Error()) - st, _ = st.WithDetails(errInfo) - return nil, st.Err() - } + // // 查询gpu对应的id信息 + // gpuSqlConfig := "SELECT id FROM tres_table WHERE type = 'gres' AND deleted = 0" + // rows, err := db.Query(gpuSqlConfig) + // if err != nil { + // errInfo := &errdetails.ErrorInfo{ + // Reason: "SQL_QUERY_FAILED", + // } + // st := status.New(codes.Internal, err.Error()) + // st, _ = st.WithDetails(errInfo) + // return nil, st.Err() + // } + // defer rows.Close() + // for rows.Next() { + // err := rows.Scan(&gpuId) + // if err != nil { + // errInfo := &errdetails.ErrorInfo{ + // Reason: "SQL_QUERY_FAILED", + // } + // st := status.New(codes.Internal, err.Error()) + // st, _ = st.WithDetails(errInfo) + // return nil, st.Err() + // } + // gpuIdList = append(gpuIdList, gpuId) + // } + // err = rows.Err() + // if err != nil { + // errInfo := &errdetails.ErrorInfo{ + // Reason: "SQL_QUERY_FAILED", + // } + // st := status.New(codes.Internal, err.Error()) + // st, _ = st.WithDetails(errInfo) + // return nil, st.Err() + // } for _, v := range partitions { - fmt.Print(99999) var tresAlloc string var runningGpus int var idleGpus int @@ -1652,6 +1650,49 @@ func (s *serverConfig) GetClusterInfo(ctx context.Context, in *pb.GetClusterInfo }) } } else { + var gpuIdList []int + // 查询gpu对应的id信息 + gpuSqlConfig := "SELECT id FROM tres_table WHERE type = 'gres' AND deleted = 0" + rows, err := db.Query(gpuSqlConfig) + if err != nil { + errInfo := &errdetails.ErrorInfo{ + Reason: "SQL_QUERY_FAILED", + } + st := status.New(codes.Internal, err.Error()) + st, _ = st.WithDetails(errInfo) + return nil, st.Err() + } + defer rows.Close() + for rows.Next() { + err := rows.Scan(&gpuId) + if err != nil { + errInfo := &errdetails.ErrorInfo{ + Reason: "SQL_QUERY_FAILED", + } + st := status.New(codes.Internal, err.Error()) + st, _ = st.WithDetails(errInfo) + return nil, st.Err() + } + gpuIdList = append(gpuIdList, gpuId) + } + err = rows.Err() + if err != nil { + errInfo := &errdetails.ErrorInfo{ + Reason: "SQL_QUERY_FAILED", + } + st := status.New(codes.Internal, err.Error()) + st, _ = st.WithDetails(errInfo) + return nil, st.Err() + } + if len(gpuIdList) == 0 { + // 没有设置gpu配置 + errInfo := &errdetails.ErrorInfo{ + Reason: "GRES_NOT_FOUND", + } + st := status.New(codes.NotFound, "The gres set error.") + st, _ = st.WithDetails(errInfo) + return nil, st.Err() + } // 第三列是gpu卡的信息 singerNodeGpusInfo := strings.Split(gpuInfo, ":") singerNodeGpus := singerNodeGpusInfo[len(singerNodeGpusInfo)-1] // 获取最后一个元素 @@ -1659,7 +1700,8 @@ func (s *serverConfig) GetClusterInfo(ctx context.Context, in *pb.GetClusterInfo totalGpus = singerNodeGpusInt * totalNodes // 总的GPU卡数 // 排队作业统计 pdJobNumSqlConfig := fmt.Sprintf("SELECT count(*) FROM %s_job_table WHERE state=0 AND `partition` = ?", clusterName) - err := db.QueryRow(pdJobNumSqlConfig, v).Scan(&pdJobNum) + // err := db.QueryRow(pdJobNumSqlConfig, v).Scan(&pdJobNum) + err = db.QueryRow(pdJobNumSqlConfig, v).Scan(&pdJobNum) if err != nil { errInfo := &errdetails.ErrorInfo{ Reason: err.Error(), @@ -1669,19 +1711,19 @@ func (s *serverConfig) GetClusterInfo(ctx context.Context, in *pb.GetClusterInfo return nil, st.Err() } // 正在运行的作业不只要统计数量,还要统计GPU卡的使用情况 - runningJobNumSqlConfig := fmt.Sprintf("SELECT tres_alloc FROM %s_job_table WHERE state=1 AND `partition` = ?", clusterName) - err = db.QueryRow(runningJobNumSqlConfig, v).Scan(&tresAlloc) // 查询到多个数据 + runningJobNumSql := fmt.Sprintf("SELECT count(*) FROM %s_job_table WHERE state=1 AND `partition` = ?", clusterName) + err = db.QueryRow(runningJobNumSql, v).Scan(&runningJobNum) if err != nil { errInfo := &errdetails.ErrorInfo{ - Reason: "SQL_QUERY_FAILED", + Reason: err.Error(), } - st := status.New(codes.Internal, err.Error()) + st := status.New(codes.NotFound, err.Error()) st, _ = st.WithDetails(errInfo) return nil, st.Err() } - defer rows.Close() - for rows.Next() { - err := rows.Scan(&gpuId) + if runningJobNum != 0 { + runningJobNumSqlConfig := fmt.Sprintf("SELECT tres_alloc FROM %s_job_table WHERE state=1 AND `partition` = ?", clusterName) + rows, err = db.Query(runningJobNumSqlConfig, v) // 查询到多个数据 if err != nil { errInfo := &errdetails.ErrorInfo{ Reason: "SQL_QUERY_FAILED", @@ -1690,31 +1732,41 @@ func (s *serverConfig) GetClusterInfo(ctx context.Context, in *pb.GetClusterInfo st, _ = st.WithDetails(errInfo) return nil, st.Err() } - tresAllocList = append(tresAllocList, tresAlloc) // 记录了所有的gpu作业的信息 - } - err = rows.Err() - if err != nil { - errInfo := &errdetails.ErrorInfo{ - Reason: "SQL_QUERY_FAILED", + defer rows.Close() + for rows.Next() { + err := rows.Scan(&tresAlloc) + if err != nil { + errInfo := &errdetails.ErrorInfo{ + Reason: "SQL_QUERY_FAILED", + } + st := status.New(codes.Internal, err.Error()) + st, _ = st.WithDetails(errInfo) + return nil, st.Err() + } + tresAllocList = append(tresAllocList, tresAlloc) // 记录了所有的gpu作业的信息 + } + err = rows.Err() + if err != nil { + errInfo := &errdetails.ErrorInfo{ + Reason: "SQL_QUERY_FAILED", + } + st := status.New(codes.Internal, err.Error()) + st, _ = st.WithDetails(errInfo) + return nil, st.Err() } - st := status.New(codes.Internal, err.Error()) - st, _ = st.WithDetails(errInfo) - return nil, st.Err() - } - if len(tresAllocList) == 0 { - // 没有运行中的GPU作业 - idleGpus = idleNodes * singerNodeGpusInt // 空闲的GPU卡 - noAvailableGpus = noAvailableNodes * singerNodeGpusInt - runningGpus = 0 - runningJobNum = 0 // 正在运行中的作业 - } else { // 有运行中的GPU作业 - runningJobNum = len(tresAllocList) // 运行中GPU的作业数 + logger.Infof("%v %v xxx", tresAllocList, v) for _, resValue := range tresAllocList { + logger.Infof("%v", resValue) runningGpus += int(utils.GetGpuAllocsFromGpuIdList(resValue, gpuIdList)) } + logger.Infof("runningGpus %v ", runningGpus) noAvailableGpus = noAvailableNodes * singerNodeGpusInt idleGpus = totalGpus - runningGpus - noAvailableGpus + } else { + runningGpus = 0 + noAvailableGpus = noAvailableNodes * singerNodeGpusInt + idleGpus = idleNodes * singerNodeGpusInt } resultRatio := float64(runningNodes) / float64(totalNodes) percentage := int(resultRatio * 100) // 保留整数