From 102d00eecdc4d963aa6c75fc9ce1eb9f9db22f32 Mon Sep 17 00:00:00 2001 From: singchia Date: Sat, 20 Apr 2024 23:53:40 +0800 Subject: [PATCH 1/4] frontier: add cluster service functions --- .../v1/service/service_cluster_end.go | 43 ++++++++++++++++++- .../cluster/service/frontier_service.go | 11 ++++- 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/api/dataplane/v1/service/service_cluster_end.go b/api/dataplane/v1/service/service_cluster_end.go index 55f221d..a2c6937 100644 --- a/api/dataplane/v1/service/service_cluster_end.go +++ b/api/dataplane/v1/service/service_cluster_end.go @@ -1,3 +1,44 @@ package service -type serviceClusterEnd struct{} +import ( + "context" + "sync" + + clusterv1 "github.com/singchia/frontier/api/controlplane/frontlas/v1" + "github.com/singchia/frontier/pkg/mapmap" + "github.com/singchia/geminio" +) + +type frontierNservice struct { + frontier *clusterv1.Frontier + service Service +} + +type serviceClusterEnd struct { + cc clusterv1.ClusterServiceClient + + bimap *mapmap.BiMap // bidirectional edgeID and frontierID + frontiers sync.Map // key: frontierID; value: frontierNservice + + // options + *serviceOption + rpcs map[string]geminio.RPC + rpcMtx sync.RWMutex + + // update + updating sync.RWMutex + + // fan-in channels + acceptStreamCh chan geminio.Stream + acceptMsgCh chan geminio.Message +} + +func (service *serviceClusterEnd) update() error { + rsp, err := service.cc.ListFrontiers(context.TODO(), &clusterv1.ListFrontiersRequest{}) + if err != nil { + service.logger.Errorf("list frontiers err: %s", err) + return err + } + + return nil +} diff --git a/pkg/frontlas/cluster/service/frontier_service.go b/pkg/frontlas/cluster/service/frontier_service.go index 60b8861..a491f23 100644 --- a/pkg/frontlas/cluster/service/frontier_service.go +++ b/pkg/frontlas/cluster/service/frontier_service.go @@ -4,7 +4,6 @@ import ( "context" v1 "github.com/singchia/frontier/api/controlplane/frontlas/v1" - "github.com/singchia/frontier/pkg/frontlas/apis" "github.com/singchia/frontier/pkg/frontlas/repo" "k8s.io/klog/v2" ) @@ -92,7 +91,15 @@ func (cs *ClusterService) listFrontiers(_ context.Context, req *v1.ListFrontiers }, nil } - return nil, apis.ErrIllegalRequest + frontiers, err := cs.repo.GetAllFrontiers() + if err != nil { + klog.Errorf("cluster service list frontier, get all frontiers err: %s", err) + return nil, err + } + v1frontiers := transferFrontiers(frontiers) + return &v1.ListFrontiersResponse{ + Frontiers: v1frontiers, + }, nil } func transferFrontiers(frontiers []*repo.Frontier) []*v1.Frontier { From b77d6d76f4436c0b8e872f2fe399de0933369967 Mon Sep 17 00:00:00 2001 From: singchia Date: Sat, 20 Apr 2024 23:54:05 +0800 Subject: [PATCH 2/4] mapmap: add basic bimap structure --- pkg/mapmap/bimap.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 pkg/mapmap/bimap.go diff --git a/pkg/mapmap/bimap.go b/pkg/mapmap/bimap.go new file mode 100644 index 0000000..4473b6f --- /dev/null +++ b/pkg/mapmap/bimap.go @@ -0,0 +1,24 @@ +package mapmap + +import "sync" + +type BiMap struct { + mtx sync.RWMutex + kv map[any]any + vk map[any]any +} + +func NewBiMap() *BiMap { + return &BiMap{ + kv: map[any]any{}, + vk: map[any]any{}, + } +} + +func (bm *BiMap) Set(key, value any) { + bm.mtx.Lock() + defer bm.mtx.Unlock() + + bm.kv[key] = value + bm.vk[value] = key +} From 50966b4eb3a955ccf1c089438b4bda43a202c689 Mon Sep 17 00:00:00 2001 From: singchia Date: Mon, 22 Apr 2024 23:23:39 +0800 Subject: [PATCH 3/4] frontier: add update logic --- .../v1/service/service_cluster_end.go | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/api/dataplane/v1/service/service_cluster_end.go b/api/dataplane/v1/service/service_cluster_end.go index a2c6937..95190e1 100644 --- a/api/dataplane/v1/service/service_cluster_end.go +++ b/api/dataplane/v1/service/service_cluster_end.go @@ -40,5 +40,33 @@ func (service *serviceClusterEnd) update() error { return err } + service.updating.Lock() + defer service.updating.Unlock() + + alive := []string{} + service.frontiers.Range(func(key, value interface{}) bool { + frontierID := key.(string) + frontierNservice := value.(*frontierNservice) + for _, frontier := range rsp.Frontiers { + if frontierID == frontier.FrontierId { + if !frontierEqual(frontierNservice.frontier, frontier) { + frontierNservice.frontier = frontier + // servicebound addr changed, but we will reconnect later + } + alive = append(alive, frontierID) + return true + } + } + + // out of date frontier + service.logger.Debugf("frontier: %v needs to be removed", key) + return true + }) + return nil } + +func frontierEqual(a, b *clusterv1.Frontier) bool { + return a.AdvertisedSbAddr == b.AdvertisedEbAddr && + a.FrontierId == b.FrontierId +} From d5f4a6701cd8abc42e1519fa346bd706676cd6c4 Mon Sep 17 00:00:00 2001 From: singchia Date: Wed, 24 Apr 2024 21:01:47 +0800 Subject: [PATCH 4/4] api: add update and clear logic --- .../v1/service/service_cluster_end.go | 165 +++++++++++++++++- go.mod | 1 + go.sum | 2 + 3 files changed, 159 insertions(+), 9 deletions(-) diff --git a/api/dataplane/v1/service/service_cluster_end.go b/api/dataplane/v1/service/service_cluster_end.go index 95190e1..04b464e 100644 --- a/api/dataplane/v1/service/service_cluster_end.go +++ b/api/dataplane/v1/service/service_cluster_end.go @@ -2,11 +2,19 @@ package service import ( "context" + "net" "sync" + "time" + armlog "github.com/jumboframes/armorigo/log" + + mapset "github.com/deckarep/golang-set/v2" clusterv1 "github.com/singchia/frontier/api/controlplane/frontlas/v1" "github.com/singchia/frontier/pkg/mapmap" "github.com/singchia/geminio" + "github.com/singchia/geminio/delegate" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) type frontierNservice struct { @@ -15,6 +23,7 @@ type frontierNservice struct { } type serviceClusterEnd struct { + *delegate.UnimplementedDelegate cc clusterv1.ClusterServiceClient bimap *mapmap.BiMap // bidirectional edgeID and frontierID @@ -23,7 +32,8 @@ type serviceClusterEnd struct { // options *serviceOption rpcs map[string]geminio.RPC - rpcMtx sync.RWMutex + topics mapset.Set[string] + appMtx sync.RWMutex // update updating sync.RWMutex @@ -31,6 +41,63 @@ type serviceClusterEnd struct { // fan-in channels acceptStreamCh chan geminio.Stream acceptMsgCh chan geminio.Message + + closed chan struct{} +} + +func newServiceClusterEnd(addr string, opts ...ServiceOption) (*serviceClusterEnd, error) { + conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, err + } + cc := clusterv1.NewClusterServiceClient(conn) + + serviceClusterEnd := &serviceClusterEnd{ + cc: cc, + serviceOption: &serviceOption{}, + rpcs: map[string]geminio.RPC{}, + topics: mapset.NewSet[string](), + acceptStreamCh: make(chan geminio.Stream, 128), + acceptMsgCh: make(chan geminio.Message, 128), + closed: make(chan struct{}), + } + serviceClusterEnd.serviceOption.delegate = serviceClusterEnd + + for _, opt := range opts { + opt(serviceClusterEnd.serviceOption) + } + if serviceClusterEnd.serviceOption.logger == nil { + serviceClusterEnd.serviceOption.logger = armlog.DefaultLog + } + return serviceClusterEnd, nil +} + +func (service *serviceClusterEnd) start() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + err := service.update() + if err != nil { + service.logger.Warnf("cluster update err: %s", err) + continue + } + case <-service.closed: + return + } + } +} + +func (service *serviceClusterEnd) clear(frontierID string) { + service.updating.Lock() + defer service.updating.Unlock() + + frontier, ok := service.frontiers.LoadAndDelete(frontierID) + if ok { + frontier.(*frontierNservice).service.Close() + } + // clear map for edgeID and frontierID } func (service *serviceClusterEnd) update() error { @@ -43,26 +110,58 @@ func (service *serviceClusterEnd) update() error { service.updating.Lock() defer service.updating.Unlock() - alive := []string{} + keeps := []string{} + removes := []Service{} + service.frontiers.Range(func(key, value interface{}) bool { frontierID := key.(string) frontierNservice := value.(*frontierNservice) for _, frontier := range rsp.Frontiers { - if frontierID == frontier.FrontierId { - if !frontierEqual(frontierNservice.frontier, frontier) { - frontierNservice.frontier = frontier - // servicebound addr changed, but we will reconnect later - } - alive = append(alive, frontierID) + if frontierEqual(frontierNservice.frontier, frontier) { + keeps = append(keeps, frontierID) return true } } - // out of date frontier service.logger.Debugf("frontier: %v needs to be removed", key) + service.frontiers.Delete(key) + removes = append(removes, frontierNservice.service) return true }) + news := []*clusterv1.Frontier{} +FOUND: + for _, frontier := range rsp.Frontiers { + for _, keep := range keeps { + if frontier.FrontierId == keep { + continue FOUND + } + } + // new frontier + news = append(news, frontier) + } + + // aysnc connect and close + go func() { + for _, remove := range removes { + remove.Close() + } + for _, new := range news { + serviceEnd, err := service.newServiceEnd(new.AdvertisedSbAddr) + if err != nil { + service.logger.Errorf("new service end err: %s", err) + continue + } + // new frontier + prev, ok := service.frontiers.Swap(new.FrontierId, &frontierNservice{ + frontier: new, + service: serviceEnd, + }) + if ok { + prev.(*frontierNservice).service.Close() + } + } + }() return nil } @@ -70,3 +169,51 @@ func frontierEqual(a, b *clusterv1.Frontier) bool { return a.AdvertisedSbAddr == b.AdvertisedEbAddr && a.FrontierId == b.FrontierId } + +func (service *serviceClusterEnd) newServiceEnd(addr string) (*serviceEnd, error) { + dialer := func() (net.Conn, error) { + return net.Dial("tcp", addr) + } + serviceEnd, err := newServiceEnd(dialer, + OptionServiceLog(service.serviceOption.logger), + OptionServiceDelegate(service.serviceOption.delegate), + OptionServiceName(service.serviceOption.service), + OptionServiceReceiveTopics(service.serviceOption.topics), + OptionServiceTimer(service.serviceOption.tmr)) + if err != nil { + return nil, err + } + go func() { + for { + st, err := serviceEnd.AcceptStream() + if err != nil { + return + } + service.acceptStreamCh <- st + } + }() + go func() { + for { + msg, err := serviceEnd.Receive(context.TODO()) + if err != nil { + return + } + service.acceptMsgCh <- msg + } + }() + + service.appMtx.RLock() + defer service.appMtx.RUnlock() + + // rpcs + for method, rpc := range service.rpcs { + err = serviceEnd.Register(context.TODO(), method, rpc) + if err != nil { + goto ERR + } + } + +ERR: + serviceEnd.Close() + return nil, err +} diff --git a/go.mod b/go.mod index 4ffa143..d5485cb 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ go 1.20 require ( github.com/IBM/sarama v1.43.0 github.com/alicebob/miniredis/v2 v2.32.1 + github.com/deckarep/golang-set/v2 v2.6.0 github.com/go-kratos/kratos/v2 v2.7.2 github.com/google/uuid v1.6.0 github.com/jumboframes/armorigo v0.4.0-rc.1 diff --git a/go.sum b/go.sum index 388f176..1b08e98 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= +github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30=