Skip to content

Commit

Permalink
完删集群信息接口
Browse files Browse the repository at this point in the history
  • Loading branch information
yangjie727 committed Nov 17, 2023
1 parent 8365cb7 commit 6890380
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 39 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
ARCH ?= amd64

protos:
buf generate --template buf.gen.yaml https://github.com/PKUHPC/scow-scheduler-adapter-interface.git#subdir=protos,tag=v1.2.0
buf generate --template buf.gen.yaml https://github.com/PKUHPC/scow-scheduler-adapter-interface.git#subdir=protos,tag=v1.3.0

run:
go run *.go
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# slurm adapter for SCOW

当前实现的`scow-scheduluer-adapter-interface`版本:v1.2.0
当前实现的`scow-scheduluer-adapter-interface`版本:v1.3.0

## Build

Expand Down
261 changes: 224 additions & 37 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"bufio"
"context"
"database/sql"
"io"
Expand Down Expand Up @@ -63,38 +62,7 @@ func init() {

// version
func (s *serverVersion) GetVersion(ctx context.Context, in *pb.GetVersionRequest) (*pb.GetVersionResponse, error) {
var version string
// 记录日志
logger.Infof("Received request GetVersion: %v", in)
file, _ := os.Open("Makefile")
defer file.Close()
// 创建一个 bufio 读取器
reader := bufio.NewReader(file)
// 逐行读取文件内容
for {
line, err := reader.ReadString('\n')
if err != nil {
break // 文件读取完毕或出现错误
}

// 在这里对每一行进行解析
// 这里只是简单地打印每一行的内容,你可以根据实际需求进行解析处理
// fmt.Println("Line:", line)
// 匹配字符串
tagPresent := strings.Contains(line, "tag=")
if tagPresent {
version = line[len(line)-6:]
break
}
}
if version == "" {
return &pb.GetVersionResponse{Major: 1, Minor: 2, Patch: 0}, nil
}
list := strings.Split(version, ".")
major, _ := strconv.Atoi(list[0])
minor, _ := strconv.Atoi(list[1])
patch, _ := strconv.Atoi(list[2])
return &pb.GetVersionResponse{Major: uint32(major), Minor: uint32(minor), Patch: uint32(patch)}, nil
return &pb.GetVersionResponse{Major: 1, Minor: 2, Patch: 0}, nil
}

// UserService
Expand Down Expand Up @@ -1538,6 +1506,218 @@ func (s *serverConfig) GetAvailablePartitions(ctx context.Context, in *pb.GetAva
return &pb.GetAvailablePartitionsResponse{Partitions: parts}, nil
}

func (s *serverConfig) GetClusterInfo(ctx context.Context, in *pb.GetClusterInfoRequest) (*pb.GetClusterInfoResponse, error) {

Check failure on line 1509 in main.go

View workflow job for this annotation

GitHub Actions / build

undefined: pb.GetClusterInfoRequest

Check failure on line 1509 in main.go

View workflow job for this annotation

GitHub Actions / build

undefined: pb.GetClusterInfoResponse
var (
gpuIdList []int
gpuId int
parts []*pb.PartitionInfo // 定义返回的类型

Check failure on line 1513 in main.go

View workflow job for this annotation

GitHub Actions / build

undefined: pb.PartitionInfo
)
clusterName := configValue.MySQLConfig.ClusterName // 集群的名字
partitions, _ := utils.GetPatitionInfo()
// 查询gpu对应的id信息
gpuSqlConfig := "SELECT id FROM tres_table WHERE type = 'gres' AND deleted = 0"
rows, err := db.Query(gpuSqlConfig)
if err != nil {
errInfo := &errdetails.ErrorInfo{
Reason: "SQL_QUERY_FAILED",
}
st := status.New(codes.Internal, err.Error())
st, _ = st.WithDetails(errInfo)
return nil, st.Err()
}
defer rows.Close()
for rows.Next() {
err := rows.Scan(&gpuId)
if err != nil {
errInfo := &errdetails.ErrorInfo{
Reason: "SQL_QUERY_FAILED",
}
st := status.New(codes.Internal, err.Error())
st, _ = st.WithDetails(errInfo)
return nil, st.Err()
}
gpuIdList = append(gpuIdList, gpuId)
}
err = rows.Err()
if err != nil {
errInfo := &errdetails.ErrorInfo{
Reason: "SQL_QUERY_FAILED",
}
st := status.New(codes.Internal, err.Error())
st, _ = st.WithDetails(errInfo)
return nil, st.Err()
}
for _, v := range partitions {
var tresAlloc string
var runningGpus int
var idleGpus int
var noAvailableGpus int
var totalGpus int
var totalCores int
var idleCores int
var runningCores int
var noAvailableCores int
var pdJobNum int
var runningJobNum int
var tresAllocList []string
getPartitionStatusCmd := fmt.Sprintf("sinfo -p %s --noheader", v)
fullCmd := getPartitionStatusCmd + " --format='%P %c %C %G %a %D %F'"
result, _ := utils.RunCommand(fullCmd) // 状态
resultList := strings.Split(result, " ")
state := resultList[4]
gpuInfo := resultList[3]
nodeInfo := strings.Split(resultList[6], "/")
coresInfo := strings.Split(resultList[2], "/")
runningNodes, _ := strconv.Atoi(nodeInfo[0])
idleNodes, _ := strconv.Atoi(nodeInfo[1])
noAvailableNodes, _ := strconv.Atoi(nodeInfo[2])
totalNodes, _ := strconv.Atoi(nodeInfo[3])
totalCores, _ = strconv.Atoi(coresInfo[3])
runningCores, _ = strconv.Atoi(coresInfo[0])
idleCores, _ = strconv.Atoi(coresInfo[1])
noAvailableCores, _ = strconv.Atoi(coresInfo[2])
if resultList[3] == "(null)" {
// 没有GPU卡
runningGpus = 0
idleGpus = 0
noAvailableGpus = 0
totalGpus = 0
// 获取作业信息
pdJobNumSqlConfig := fmt.Sprintf("SELECT count(*) FROM %s_job_table WHERE state=0 AND partition = ?", clusterName)
err := db.QueryRow(pdJobNumSqlConfig, v).Scan(&pdJobNum)
if err != nil {
errInfo := &errdetails.ErrorInfo{
Reason: err.Error(),
}
st := status.New(codes.NotFound, err.Error())
st, _ = st.WithDetails(errInfo)
return nil, st.Err()
}
runningJobNumSqlConfig := fmt.Sprintf("SELECT count(*) FROM %s_job_table WHERE state=1 AND partition = ?", clusterName)
err = db.QueryRow(runningJobNumSqlConfig, v).Scan(&runningJobNum)
if err != nil {
errInfo := &errdetails.ErrorInfo{
Reason: err.Error(),
}
st := status.New(codes.NotFound, err.Error())
st, _ = st.WithDetails(errInfo)
return nil, st.Err()
}
} else {
// 第三列是gpu卡的信息
singerNodeGpusInfo := strings.Split(gpuInfo, ":")
singerNodeGpus := singerNodeGpusInfo[len(singerNodeGpusInfo)-1] // 获取最后一个元素
singerNodeGpusInt, _ := strconv.Atoi(singerNodeGpus)
totalGpus = singerNodeGpusInt * totalNodes // 总的GPU卡数
// 排队作业统计
pdJobNumSqlConfig := fmt.Sprintf("SELECT count(*) FROM %s_job_table WHERE state=0 AND partition = ?", clusterName)
err := db.QueryRow(pdJobNumSqlConfig, v).Scan(&pdJobNum)
if err != nil {
errInfo := &errdetails.ErrorInfo{
Reason: err.Error(),
}
st := status.New(codes.NotFound, err.Error())
st, _ = st.WithDetails(errInfo)
return nil, st.Err()
}
// 正在运行的作业不只要统计数量,还要统计GPU卡的使用情况
runningJobNumSqlConfig := fmt.Sprintf("SELECT tres_alloc FROM %s_job_table WHERE state=1 AND partition = ?", clusterName)
err = db.QueryRow(runningJobNumSqlConfig, v).Scan(&tresAlloc) // 查询到多个数据
if err != nil {
errInfo := &errdetails.ErrorInfo{
Reason: "SQL_QUERY_FAILED",
}
st := status.New(codes.Internal, err.Error())
st, _ = st.WithDetails(errInfo)
return nil, st.Err()
}
defer rows.Close()
for rows.Next() {
err := rows.Scan(&gpuId)
if err != nil {
errInfo := &errdetails.ErrorInfo{
Reason: "SQL_QUERY_FAILED",
}
st := status.New(codes.Internal, err.Error())
st, _ = st.WithDetails(errInfo)
return nil, st.Err()
}
tresAllocList = append(tresAllocList, tresAlloc) // 记录了所有的gpu作业的信息
}
err = rows.Err()
if err != nil {
errInfo := &errdetails.ErrorInfo{
Reason: "SQL_QUERY_FAILED",
}
st := status.New(codes.Internal, err.Error())
st, _ = st.WithDetails(errInfo)
return nil, st.Err()
}
if len(tresAllocList) == 0 {
// 没有运行中的GPU作业
idleGpus = idleNodes * singerNodeGpusInt // 空闲的GPU卡
noAvailableGpus = noAvailableNodes * singerNodeGpusInt
runningGpus = 0
runningJobNum = 0 // 正在运行中的作业
} else {
// 有运行中的GPU作业
runningJobNum = len(tresAllocList) // 运行中GPU的作业数
for _, resValue := range tresAllocList {
runningGpus += int(utils.GetGpuAllocsFromGpuIdList(resValue, gpuIdList))
}
noAvailableGpus = noAvailableNodes * singerNodeGpusInt
idleGpus = totalGpus - runningGpus - noAvailableGpus
}
resultRatio := float64(runningNodes) / float64(totalNodes)
percentage := int(resultRatio * 100) // 保留整数
if state == "up" {
parts = append(parts, &pb.PartitionInfo{

Check failure on line 1674 in main.go

View workflow job for this annotation

GitHub Actions / build

undefined: pb.PartitionInfo
PartitionName: v,
NodeCount: uint32(totalNodes),
RunningNodeCount: uint32(runningNodes),
IdleNodeCount: uint32(idleNodes),
NotAvailableNodeCount: uint32(noAvailableNodes),
CpuCoreCount: uint32(totalCores),
RunningCpuCount: uint32(runningCores),
IdleCpuCount: uint32(idleCores),
NotAvailableCpuCount: uint32(noAvailableCores),
GpuCoreCount: uint32(totalGpus),
RunningGpuCount: uint32(runningGpus),
IdleGpuCount: uint32(idleGpus),
NotAvailableGpuCount: uint32(noAvailableGpus),
JobCount: uint32(pdJobNum) + uint32(runningJobNum),
RunningJobCount: uint32(runningJobNum),
PendingJobCount: uint32(pdJobNum),
UsageRatePercentage: uint32(percentage),
PartitionStatus: pb.PartitionInfo_AVAILABLE,

Check failure on line 1692 in main.go

View workflow job for this annotation

GitHub Actions / build

undefined: pb.PartitionInfo_AVAILABLE
})
} else {
parts = append(parts, &pb.PartitionInfo{

Check failure on line 1695 in main.go

View workflow job for this annotation

GitHub Actions / build

undefined: pb.PartitionInfo
PartitionName: v,
NodeCount: uint32(totalNodes),
RunningNodeCount: uint32(runningNodes),
IdleNodeCount: uint32(idleNodes),
NotAvailableNodeCount: uint32(noAvailableNodes),
CpuCoreCount: uint32(totalCores),
RunningCpuCount: uint32(runningCores),
IdleCpuCount: uint32(idleCores),
NotAvailableCpuCount: uint32(noAvailableCores),
GpuCoreCount: uint32(totalGpus),
RunningGpuCount: uint32(runningGpus),
IdleGpuCount: uint32(idleGpus),
NotAvailableGpuCount: uint32(noAvailableGpus),
JobCount: uint32(pdJobNum) + uint32(runningJobNum),
RunningJobCount: uint32(runningJobNum),
PendingJobCount: uint32(pdJobNum),
UsageRatePercentage: uint32(percentage),
PartitionStatus: pb.PartitionInfo_NOT_AVAILABLE,

Check failure on line 1713 in main.go

View workflow job for this annotation

GitHub Actions / build

undefined: pb.PartitionInfo_NOT_AVAILABLE
})
}
}
}
return &pb.GetClusterInfoResponse{ClusterName: clusterName, Partitions: parts}, nil

Check failure on line 1718 in main.go

View workflow job for this annotation

GitHub Actions / build

undefined: pb.GetClusterInfoResponse
}

// job service
func (s *serverJob) CancelJob(ctx context.Context, in *pb.CancelJobRequest) (*pb.CancelJobResponse, error) {
var (
Expand Down Expand Up @@ -1991,6 +2171,7 @@ func (s *serverJob) GetJobs(ctx context.Context, in *pb.GetJobsRequest) (*pb.Get
var filterStates = in.Filter.States // 这个是筛选的
var baseStates = []string{"RUNNING", "PENDING", "SUSPEND"}
var submitUser = in.Filter.Users
var getJobInfoCmdLine string // 1117 新加
setBool := utils.IsSubSet(baseStates, filterStates)

pendingUserCmdTemp := fmt.Sprintf("squeue -t pending -u %s", strings.Join(submitUser, ","))
Expand All @@ -2001,7 +2182,13 @@ func (s *serverJob) GetJobs(ctx context.Context, in *pb.GetJobsRequest) (*pb.Get
}

if setBool && len(filterStates) != 0 && len(submitUser) != 0 {
getJobInfoCmdLine := fmt.Sprintf("squeue -u %s --noheader", strings.Join(submitUser, ","))
// 新增判断逻辑 1117
if len(in.Filter.Accounts) == 0 {
getJobInfoCmdLine = fmt.Sprintf("squeue -u %s --noheader", strings.Join(submitUser, ","))
} else {
getJobInfoCmdLine = fmt.Sprintf("squeue -u %s -A %s --noheader", strings.Join(submitUser, ","), strings.Join(in.Filter.Accounts, ","))
}
// getJobInfoCmdLine := fmt.Sprintf("squeue -u %s --noheader", strings.Join(submitUser, ","))
// getFullCmdLine := getJobInfoCmdLine + " " + "--format='%a %A %C %D %j %l %m %M %P %q %S %T %u %V %Z %n %N' | tr '\n' ','"
getFullCmdLine := getJobInfoCmdLine + " " + "--format='%b %a %A %C %D %j %l %m %M %P %q %S %T %u %V %Z %n %N' | tr '\n' ','"
runningjobInfo, _ := utils.RunCommand(getFullCmdLine)
Expand Down Expand Up @@ -2661,9 +2848,9 @@ func (s *serverJob) SubmitJob(ctx context.Context, in *pb.SubmitJobRequest) (*pb
homedirTemp, _ := utils.GetUserHomedir(in.UserId)
homedir = homedirTemp + "/" + in.WorkingDirectory
} else {
// homedir = in.WorkingDirectory
homedirTemp, _ := utils.GetUserHomedir(in.UserId)
homedir = homedirTemp + "/" + in.WorkingDirectory
homedir = in.WorkingDirectory
// homedirTemp, _ := utils.GetUserHomedir(in.UserId)
// homedir = homedirTemp + "/" + in.WorkingDirectory
}

// scriptString += "#SBATCH " + "--chdir=" + in.WorkingDirectory + "\n"
Expand Down
30 changes: 30 additions & 0 deletions tests/config/GetClusterInfo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package main

import (
"context"
pb "scow-slurm-adapter/gen/go"
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)

func TestGetClusterInfo(t *testing.T) {
// Set up a connection to the server
conn, err := grpc.Dial("localhost:8972", grpc.WithInsecure())
if err != nil {
t.Fatalf("did not connect: %v", err)
}
defer conn.Close()
client := pb.NewConfigServiceClient(conn)

// Call the Add RPC with test data
req := &pb.GetClusterInfoRequest{}
res, err := client.GetClusterInfo(context.Background(), req)
if err != nil {
t.Fatalf("GetClusterConfig failed: %v", err)
}

// Check the result
assert.IsType(t, []*pb.PartitionInfo{}, res.Partitions)
}

0 comments on commit 6890380

Please sign in to comment.