Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

draft PR #7

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
*.log
bin/*
.vscode
external/curve-go-rpc/*
external/website/*
docker/pigeon
docker/website
Expand Down
3 changes: 0 additions & 3 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
[submodule "external/curve-go-rpc"]
path = external/curve-go-rpc
url = git@github.com:SeanHai/curve-go-rpc.git
[submodule "external/website"]
path = external/website
url = git@github.com:opencurve/curve-dashboard.git
6 changes: 3 additions & 3 deletions api/curvebs/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
package agent

import (
bshttp "github.com/opencurve/curve-manager/internal/http/curvebs"
metrics "github.com/opencurve/curve-manager/internal/metrics/core"
bsrpc "github.com/opencurve/curve-manager/internal/rpc/curvebs"
"github.com/opencurve/curve-manager/internal/snapshotclone"
"github.com/opencurve/curve-manager/internal/storage"
"github.com/opencurve/pigeon"
Expand Down Expand Up @@ -75,7 +75,7 @@ func InitClients(logger *pigeon.Logger) error {
}

// init mds rpc client
bsrpc.Init(clusterAddrs.Addrs)
bshttp.Init(clusterAddrs.Addrs)

// init metric client
metrics.Init(clusterAddrs.Addrs)
Expand All @@ -86,7 +86,7 @@ func InitClients(logger *pigeon.Logger) error {
if currentClusterId <= 0 && clusterAddrs.ClusterId > 0 {
currentClusterId = clusterAddrs.ClusterId
return initAlerts(alertExpirationDays, logger)
}
}
if currentClusterId > 0 {
stopAlertTasks()
currentClusterId = clusterAddrs.ClusterId
Expand Down
6 changes: 3 additions & 3 deletions api/curvebs/agent/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
comm "github.com/opencurve/curve-manager/api/common"
"github.com/opencurve/curve-manager/internal/common"
"github.com/opencurve/curve-manager/internal/errno"
bshttp "github.com/opencurve/curve-manager/internal/http/curvebs"
"github.com/opencurve/curve-manager/internal/metrics/bsmetric"
bsrpc "github.com/opencurve/curve-manager/internal/rpc/curvebs"
"github.com/opencurve/pigeon"
)

Expand All @@ -47,7 +47,7 @@ type ClusterStatus struct {
func GetClusterSpace(l *pigeon.Logger, rId string) (interface{}, errno.Errno) {
result := Space{}
// get logical pools form mds
pools, err := bsrpc.GMdsClient.ListLogicalPool()
pools, err := bshttp.GMdsClient.ListLogicalPool()
if err != nil {
l.Error("GetClusterSpace bsrpc.ListLogicalPool failed",
pigeon.Field("error", err),
Expand Down Expand Up @@ -111,7 +111,7 @@ func GetClusterPerformance(r *pigeon.Request, start, end, interval uint64) (inte
func GetClusterStatus(l *pigeon.Logger, rId string) interface{} {
clusterStatus := ClusterStatus{}
// 1. get pool numbers in cluster
pools, err := bsrpc.GMdsClient.ListLogicalPool()
pools, err := bshttp.GMdsClient.ListLogicalPool()
if err != nil {
clusterStatus.Healthy = false
clusterStatus.PoolNum = 0
Expand Down
12 changes: 6 additions & 6 deletions api/curvebs/agent/copyset.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (

set "github.com/deckarep/golang-set/v2"
"github.com/opencurve/curve-manager/internal/common"
bshttp "github.com/opencurve/curve-manager/internal/http/curvebs"
"github.com/opencurve/curve-manager/internal/metrics/bsmetric"
bsrpc "github.com/opencurve/curve-manager/internal/rpc/curvebs"
)

const (
Expand Down Expand Up @@ -121,7 +121,7 @@ func (cs *Copyset) updatePeerOfflineCopysets(csAddr string) error {
if err != nil {
return err
}
copysets, err := bsrpc.GMdsClient.GetCopySetsInChunkServer(item[0], uint32(port))
copysets, err := bshttp.GMdsClient.GetCopySetsInChunkServer(item[0], uint32(port))
if err != nil {
return fmt.Errorf("GetCopySetsInChunkServer failed, %s", err)
}
Expand All @@ -134,7 +134,7 @@ func (cs *Copyset) updatePeerOfflineCopysets(csAddr string) error {
if len(copysets) > 0 {
logicalPoolId = copysets[0].LogicalPoolId
}
memberInfo, err := bsrpc.GMdsClient.GetChunkServerListInCopySets(logicalPoolId, copysetIds)
memberInfo, err := bshttp.GMdsClient.GetChunkServerListInCopySets(logicalPoolId, copysetIds)
if err != nil {
return fmt.Errorf("GetChunkServerListInCopySets failed, %s", err)
}
Expand Down Expand Up @@ -184,7 +184,7 @@ func (cs *Copyset) ifChunkServerInCopysets(csAddr string, groupIds *set.Set[stri
logicalPoolId = getPoolIdFormGroupId(ngid)
copysetIds = append(copysetIds, getCopysetIdFromGroupId(ngid))
}
memberInfo, err := bsrpc.GMdsClient.GetChunkServerListInCopySets(logicalPoolId, copysetIds)
memberInfo, err := bshttp.GMdsClient.GetChunkServerListInCopySets(logicalPoolId, copysetIds)
if err != nil {
return nil, fmt.Errorf("GetChunkServerListInCopySets failed, %s", err)
}
Expand Down Expand Up @@ -365,7 +365,7 @@ func (cs *Copyset) checkCopysetsOnChunkServer(csAddr string, status []map[string

func (cs *Copyset) checkCopysetsWithMds() (bool, error) {
// get copysets in cluster
csInfos, err := bsrpc.GMdsClient.GetCopySetsInCluster()
csInfos, err := bshttp.GMdsClient.GetCopySetsInCluster()
if err != nil {
return false, fmt.Errorf("GetCopySetsInCluster failed, %s", err)
}
Expand Down Expand Up @@ -410,7 +410,7 @@ func (cs *Copyset) checkCopysetsWithMds() (bool, error) {
func (cs *Copyset) checkCopysetsInCluster() (bool, error) {
healthy := true
// 2.1 get chunkservers in cluster
chunkservers, err := bsrpc.GMdsClient.GetChunkServerInCluster()
chunkservers, err := bshttp.GMdsClient.GetChunkServerInCluster()
if err != nil {
return false, fmt.Errorf("GetChunkServerInCluster failed, %s", err)
}
Expand Down
7 changes: 3 additions & 4 deletions api/curvebs/agent/service_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ package agent

import (
"fmt"
bshttp "github.com/opencurve/curve-manager/internal/http/curvebs"

"github.com/SeanHai/curve-go-rpc/rpc/curvebs"
comm "github.com/opencurve/curve-manager/api/common"
"github.com/opencurve/curve-manager/internal/common"
"github.com/opencurve/curve-manager/internal/errno"
"github.com/opencurve/curve-manager/internal/metrics/bsmetric"
bsrpc "github.com/opencurve/curve-manager/internal/rpc/curvebs"
"github.com/opencurve/pigeon"
)

Expand Down Expand Up @@ -91,7 +90,7 @@ func GetSnapShotCloneServerStatus(r *pigeon.Request) (interface{}, errno.Errno)
func GetChunkServerStatus(l *pigeon.Logger, rId string) (interface{}, errno.Errno) {
var result ChunkServerStatus
// get chunkserver form mds
chunkservers, err := bsrpc.GMdsClient.GetChunkServerInCluster()
chunkservers, err := bshttp.GMdsClient.GetChunkServerInCluster()
if err != nil {
l.Error("GetChunkServerStatus bsrpc.GetChunkServerInCluster failed",
pigeon.Field("error", err),
Expand All @@ -103,7 +102,7 @@ func GetChunkServerStatus(l *pigeon.Logger, rId string) (interface{}, errno.Errn
var endponits []string
for _, cs := range chunkservers {
endpoint := fmt.Sprintf("%s:%d", cs.HostIp, cs.Port)
if cs.OnlineStatus == curvebs.ONLINE_STATUS {
if cs.OnlineStatus == bshttp.ONLINE_STATUS {
online += 1
endponits = append(endponits, endpoint)
} else {
Expand Down
40 changes: 40 additions & 0 deletions api/curvebs/agent/test_mdsLeader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package agent

import (
"encoding/json"
"fmt"
"github.com/go-resty/resty/v2"
"github.com/opencurve/curve-manager/internal/common"
"net/url"
"testing"
)

func TestMdsClient_ListPhysicalPool_http(t *testing.T) {

}
func TestGetCurrentClusterServicesAddr() (clusterServicesAddr, error) {
ret := clusterServicesAddr{}
httpClient := common.GetHttpClient()
url := (&url.URL{
Scheme: "http",
Host: "127.0.0.1:11000",
Path: "/",
RawQuery: fmt.Sprintf("%s=%s", "method", CLUSTER_SERVICES_ADDRESS),
}).String()

resp, err := resty.NewWithClient(httpClient).R().
SetHeader("Connection", "Keep-Alive").
SetHeader("Content-Type", "application/json").
SetHeader("User-Agent", "Curve-Manager").
Execute("GET", url)
if err != nil {
return ret, fmt.Errorf("getClusterServicesAddr failed: %v", err)
}

respStruct := admHttpResponse{}
err = json.Unmarshal([]byte(resp.String()), &respStruct)
if err != nil {
return ret, fmt.Errorf("Unmarshal getClusterServicesAddr response failed, resp = %s, err = %v", resp.String(), err)
}
return respStruct.Data, nil
}
18 changes: 9 additions & 9 deletions api/curvebs/agent/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
package agent

import (
"github.com/opencurve/curve-manager/internal/http/curvebs"
"sort"

"github.com/SeanHai/curve-go-rpc/rpc/curvebs"
comm "github.com/opencurve/curve-manager/api/common"
"github.com/opencurve/curve-manager/internal/common"
"github.com/opencurve/curve-manager/internal/errno"
bshttp "github.com/opencurve/curve-manager/internal/http/curvebs"
"github.com/opencurve/curve-manager/internal/metrics/bsmetric"
metricomm "github.com/opencurve/curve-manager/internal/metrics/common"
bsrpc "github.com/opencurve/curve-manager/internal/rpc/curvebs"
"github.com/opencurve/pigeon"
)

Expand Down Expand Up @@ -92,7 +92,7 @@ func listChunkServer(pools *[]Pool, size int) error {
for zIndex, zone := range pool.Zones {
for sIndex, server := range zone.Servers {
go func(id uint32, addr *Server) {
chunkservers, err := bsrpc.GMdsClient.ListChunkServer(id)
chunkservers, err := bshttp.GMdsClient.ListChunkServer(id)
ret <- common.QueryResult{
Key: addr,
Result: chunkservers,
Expand Down Expand Up @@ -125,7 +125,7 @@ func listZoneServer(pools *[]Pool, size int) error {
for pIndex, pool := range *pools {
for zIndex, zone := range pool.Zones {
go func(id uint32, addr *Zone) {
servers, err := bsrpc.GMdsClient.ListZoneServer(id)
servers, err := bshttp.GMdsClient.ListZoneServer(id)
ret <- common.QueryResult{
Key: addr,
Result: servers,
Expand Down Expand Up @@ -165,7 +165,7 @@ func listPoolZone(pools *[]Pool) error {
number := 0
for index, pool := range *pools {
go func(id uint32, addr *Pool) {
zones, err := bsrpc.GMdsClient.ListPoolZone(id)
zones, err := bshttp.GMdsClient.ListPoolZone(id)
ret <- common.QueryResult{
Key: addr,
Result: zones,
Expand Down Expand Up @@ -194,7 +194,7 @@ func listPoolZone(pools *[]Pool) error {

func getPoolSpace(pools *[]PoolInfo) error {
// get can be recycled space
_, recycledSize, err := bsrpc.GMdsClient.GetFileAllocatedSize(RECYCLEBIN_DIR)
_, recycledSize, err := bshttp.GMdsClient.GetFileAllocatedSize(RECYCLEBIN_DIR)
if err != nil {
return err
}
Expand Down Expand Up @@ -312,7 +312,7 @@ func sortTopology(pools []Pool) {
func ListLogicalPool(r *pigeon.Request) (interface{}, errno.Errno) {
result := []PoolInfo{}
// get info from mds
pools, err := bsrpc.GMdsClient.ListLogicalPool()
pools, err := bshttp.GMdsClient.ListLogicalPool()
if err != nil {
r.Logger().Error("ListLogicalPool bsrpc.ListLogicalPool failed",
pigeon.Field("error", err),
Expand Down Expand Up @@ -353,7 +353,7 @@ func ListLogicalPool(r *pigeon.Request) (interface{}, errno.Errno) {
}

func GetLogicalPool(r *pigeon.Request, poolId uint32, start, end, interval uint64) (interface{}, errno.Errno) {
pool, err := bsrpc.GMdsClient.GetLogicalPool(poolId)
pool, err := bshttp.GMdsClient.GetLogicalPool(poolId)
if err != nil {
r.Logger().Error("GetLogicalPool bsrpc.GetLogicalPool failed",
pigeon.Field("poolId", poolId),
Expand Down Expand Up @@ -411,7 +411,7 @@ func GetLogicalPool(r *pigeon.Request, poolId uint32, start, end, interval uint6

func ListTopology(r *pigeon.Request) (interface{}, errno.Errno) {
result := []Pool{}
logicalPools, err := bsrpc.GMdsClient.ListLogicalPool()
logicalPools, err := bshttp.GMdsClient.ListLogicalPool()
if err != nil {
r.Logger().Error("ListTopology bsrpc.ListLogicalPool failed",
pigeon.Field("error", err),
Expand Down
Loading