Skip to content

Commit

Permalink
Merge pull request #42 from singchia/feat/controller
Browse files Browse the repository at this point in the history
frontier: add update logic
  • Loading branch information
singchia authored Apr 25, 2024
2 parents be251cb + d5f4a67 commit a718b5a
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 3 deletions.
218 changes: 217 additions & 1 deletion api/dataplane/v1/service/service_cluster_end.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,219 @@
package service

type serviceClusterEnd struct{}
import (
"context"
"net"
"sync"
"time"

armlog "github.com/jumboframes/armorigo/log"

mapset "github.com/deckarep/golang-set/v2"
clusterv1 "github.com/singchia/frontier/api/controlplane/frontlas/v1"
"github.com/singchia/frontier/pkg/mapmap"
"github.com/singchia/geminio"
"github.com/singchia/geminio/delegate"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type frontierNservice struct {
frontier *clusterv1.Frontier
service Service
}

type serviceClusterEnd struct {
*delegate.UnimplementedDelegate
cc clusterv1.ClusterServiceClient

bimap *mapmap.BiMap // bidirectional edgeID and frontierID
frontiers sync.Map // key: frontierID; value: frontierNservice

// options
*serviceOption
rpcs map[string]geminio.RPC
topics mapset.Set[string]
appMtx sync.RWMutex

// update
updating sync.RWMutex

// fan-in channels
acceptStreamCh chan geminio.Stream
acceptMsgCh chan geminio.Message

closed chan struct{}
}

func newServiceClusterEnd(addr string, opts ...ServiceOption) (*serviceClusterEnd, error) {
conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
cc := clusterv1.NewClusterServiceClient(conn)

serviceClusterEnd := &serviceClusterEnd{
cc: cc,
serviceOption: &serviceOption{},
rpcs: map[string]geminio.RPC{},
topics: mapset.NewSet[string](),
acceptStreamCh: make(chan geminio.Stream, 128),
acceptMsgCh: make(chan geminio.Message, 128),
closed: make(chan struct{}),
}
serviceClusterEnd.serviceOption.delegate = serviceClusterEnd

for _, opt := range opts {
opt(serviceClusterEnd.serviceOption)
}
if serviceClusterEnd.serviceOption.logger == nil {
serviceClusterEnd.serviceOption.logger = armlog.DefaultLog
}
return serviceClusterEnd, nil
}

func (service *serviceClusterEnd) start() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
err := service.update()
if err != nil {
service.logger.Warnf("cluster update err: %s", err)
continue
}
case <-service.closed:
return
}
}
}

func (service *serviceClusterEnd) clear(frontierID string) {
service.updating.Lock()
defer service.updating.Unlock()

frontier, ok := service.frontiers.LoadAndDelete(frontierID)
if ok {
frontier.(*frontierNservice).service.Close()
}
// clear map for edgeID and frontierID
}

func (service *serviceClusterEnd) update() error {
rsp, err := service.cc.ListFrontiers(context.TODO(), &clusterv1.ListFrontiersRequest{})
if err != nil {
service.logger.Errorf("list frontiers err: %s", err)
return err
}

service.updating.Lock()
defer service.updating.Unlock()

keeps := []string{}
removes := []Service{}

service.frontiers.Range(func(key, value interface{}) bool {
frontierID := key.(string)
frontierNservice := value.(*frontierNservice)
for _, frontier := range rsp.Frontiers {
if frontierEqual(frontierNservice.frontier, frontier) {
keeps = append(keeps, frontierID)
return true
}
}
// out of date frontier
service.logger.Debugf("frontier: %v needs to be removed", key)
service.frontiers.Delete(key)
removes = append(removes, frontierNservice.service)
return true
})

news := []*clusterv1.Frontier{}
FOUND:
for _, frontier := range rsp.Frontiers {
for _, keep := range keeps {
if frontier.FrontierId == keep {
continue FOUND
}
}
// new frontier
news = append(news, frontier)
}

// aysnc connect and close
go func() {
for _, remove := range removes {
remove.Close()
}
for _, new := range news {
serviceEnd, err := service.newServiceEnd(new.AdvertisedSbAddr)
if err != nil {
service.logger.Errorf("new service end err: %s", err)
continue
}
// new frontier
prev, ok := service.frontiers.Swap(new.FrontierId, &frontierNservice{
frontier: new,
service: serviceEnd,
})
if ok {
prev.(*frontierNservice).service.Close()
}
}
}()
return nil
}

func frontierEqual(a, b *clusterv1.Frontier) bool {
return a.AdvertisedSbAddr == b.AdvertisedEbAddr &&
a.FrontierId == b.FrontierId
}

func (service *serviceClusterEnd) newServiceEnd(addr string) (*serviceEnd, error) {
dialer := func() (net.Conn, error) {
return net.Dial("tcp", addr)
}
serviceEnd, err := newServiceEnd(dialer,
OptionServiceLog(service.serviceOption.logger),
OptionServiceDelegate(service.serviceOption.delegate),
OptionServiceName(service.serviceOption.service),
OptionServiceReceiveTopics(service.serviceOption.topics),
OptionServiceTimer(service.serviceOption.tmr))
if err != nil {
return nil, err
}
go func() {
for {
st, err := serviceEnd.AcceptStream()
if err != nil {
return
}
service.acceptStreamCh <- st
}
}()
go func() {
for {
msg, err := serviceEnd.Receive(context.TODO())
if err != nil {
return
}
service.acceptMsgCh <- msg
}
}()

service.appMtx.RLock()
defer service.appMtx.RUnlock()

// rpcs
for method, rpc := range service.rpcs {
err = serviceEnd.Register(context.TODO(), method, rpc)
if err != nil {
goto ERR
}
}

ERR:
serviceEnd.Close()
return nil, err
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go 1.20
require (
github.com/IBM/sarama v1.43.0
github.com/alicebob/miniredis/v2 v2.32.1
github.com/deckarep/golang-set/v2 v2.6.0
github.com/go-kratos/kratos/v2 v2.7.2
github.com/google/uuid v1.6.0
github.com/jumboframes/armorigo v0.4.0-rc.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM=
github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30=
Expand Down
11 changes: 9 additions & 2 deletions pkg/frontlas/cluster/service/frontier_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

v1 "github.com/singchia/frontier/api/controlplane/frontlas/v1"
"github.com/singchia/frontier/pkg/frontlas/apis"
"github.com/singchia/frontier/pkg/frontlas/repo"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -92,7 +91,15 @@ func (cs *ClusterService) listFrontiers(_ context.Context, req *v1.ListFrontiers
}, nil
}

return nil, apis.ErrIllegalRequest
frontiers, err := cs.repo.GetAllFrontiers()
if err != nil {
klog.Errorf("cluster service list frontier, get all frontiers err: %s", err)
return nil, err
}
v1frontiers := transferFrontiers(frontiers)
return &v1.ListFrontiersResponse{
Frontiers: v1frontiers,
}, nil
}

func transferFrontiers(frontiers []*repo.Frontier) []*v1.Frontier {
Expand Down
24 changes: 24 additions & 0 deletions pkg/mapmap/bimap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package mapmap

import "sync"

type BiMap struct {
mtx sync.RWMutex
kv map[any]any
vk map[any]any
}

func NewBiMap() *BiMap {
return &BiMap{
kv: map[any]any{},
vk: map[any]any{},
}
}

func (bm *BiMap) Set(key, value any) {
bm.mtx.Lock()
defer bm.mtx.Unlock()

bm.kv[key] = value
bm.vk[value] = key
}

0 comments on commit a718b5a

Please sign in to comment.