Skip to content

Commit

Permalink
Merge pull request #43 from singchia/feat/controller
Browse files Browse the repository at this point in the history
api: basic finish service cluster api
  • Loading branch information
singchia committed Apr 26, 2024
2 parents a718b5a + c9e5b8d commit 5681483
Show file tree
Hide file tree
Showing 3 changed files with 320 additions and 71 deletions.
259 changes: 221 additions & 38 deletions api/dataplane/v1/service/service_cluster_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"io"
"net"
"sync"
"time"
Expand All @@ -17,17 +18,17 @@ import (
"google.golang.org/grpc/credentials/insecure"
)

type frontierNservice struct {
type frontierNend struct {
frontier *clusterv1.Frontier
service Service
end *serviceEnd
}

type serviceClusterEnd struct {
*delegate.UnimplementedDelegate
cc clusterv1.ClusterServiceClient

bimap *mapmap.BiMap // bidirectional edgeID and frontierID
frontiers sync.Map // key: frontierID; value: frontierNservice
edgefrontiers *mapmap.BiMap // bidirectional edgeID and frontierID
frontiers sync.Map // key: frontierID; value: frontierNservice

// options
*serviceOption
Expand All @@ -52,7 +53,7 @@ func newServiceClusterEnd(addr string, opts ...ServiceOption) (*serviceClusterEn
}
cc := clusterv1.NewClusterServiceClient(conn)

serviceClusterEnd := &serviceClusterEnd{
end := &serviceClusterEnd{
cc: cc,
serviceOption: &serviceOption{},
rpcs: map[string]geminio.RPC{},
Expand All @@ -61,71 +62,69 @@ func newServiceClusterEnd(addr string, opts ...ServiceOption) (*serviceClusterEn
acceptMsgCh: make(chan geminio.Message, 128),
closed: make(chan struct{}),
}
serviceClusterEnd.serviceOption.delegate = serviceClusterEnd
end.serviceOption.delegate = end

for _, opt := range opts {
opt(serviceClusterEnd.serviceOption)
opt(end.serviceOption)
}
if serviceClusterEnd.serviceOption.logger == nil {
serviceClusterEnd.serviceOption.logger = armlog.DefaultLog
if end.serviceOption.logger == nil {
end.serviceOption.logger = armlog.DefaultLog
}
return serviceClusterEnd, nil
return end, nil
}

func (service *serviceClusterEnd) start() {
func (end *serviceClusterEnd) start() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
err := service.update()
err := end.update()
if err != nil {
service.logger.Warnf("cluster update err: %s", err)
end.logger.Warnf("cluster update err: %s", err)
continue
}
case <-service.closed:
case <-end.closed:
return
}
}
}

func (service *serviceClusterEnd) clear(frontierID string) {
service.updating.Lock()
defer service.updating.Unlock()
func (end *serviceClusterEnd) clear(frontierID string) {
end.updating.Lock()
defer end.updating.Unlock()

frontier, ok := service.frontiers.LoadAndDelete(frontierID)
frontier, ok := end.frontiers.LoadAndDelete(frontierID)
if ok {
frontier.(*frontierNservice).service.Close()
frontier.(*frontierNend).end.Close()
}
// clear map for edgeID and frontierID
end.edgefrontiers.DelValue(frontierID)
}

func (service *serviceClusterEnd) update() error {
rsp, err := service.cc.ListFrontiers(context.TODO(), &clusterv1.ListFrontiersRequest{})
func (end *serviceClusterEnd) update() error {
rsp, err := end.cc.ListFrontiers(context.TODO(), &clusterv1.ListFrontiersRequest{})
if err != nil {
service.logger.Errorf("list frontiers err: %s", err)
end.logger.Errorf("list frontiers err: %s", err)
return err
}

service.updating.Lock()
defer service.updating.Unlock()

keeps := []string{}
removes := []Service{}
removes := []*frontierNend{}

service.frontiers.Range(func(key, value interface{}) bool {
end.frontiers.Range(func(key, value interface{}) bool {
frontierID := key.(string)
frontierNservice := value.(*frontierNservice)
frontierNend := value.(*frontierNend)
for _, frontier := range rsp.Frontiers {
if frontierEqual(frontierNservice.frontier, frontier) {
if frontierEqual(frontierNend.frontier, frontier) {
keeps = append(keeps, frontierID)
return true
}
}
// out of date frontier
service.logger.Debugf("frontier: %v needs to be removed", key)
service.frontiers.Delete(key)
removes = append(removes, frontierNservice.service)
end.logger.Debugf("frontier: %v needs to be removed", key)
end.frontiers.Delete(key)
removes = append(removes, frontierNend)
return true
})

Expand All @@ -144,27 +143,80 @@ FOUND:
// aysnc connect and close
go func() {
for _, remove := range removes {
remove.Close()
remove.end.Close()
// clear unavaiable frontier and it's edges
end.edgefrontiers.DelValue(remove.frontier.FrontierId)
}
for _, new := range news {
serviceEnd, err := service.newServiceEnd(new.AdvertisedSbAddr)
serviceEnd, err := end.newServiceEnd(new.AdvertisedSbAddr)
if err != nil {
service.logger.Errorf("new service end err: %s", err)
end.logger.Errorf("new service end err: %s", err)
continue
}
// new frontier
prev, ok := service.frontiers.Swap(new.FrontierId, &frontierNservice{
prev, ok := end.frontiers.Swap(new.FrontierId, &frontierNend{
frontier: new,
service: serviceEnd,
end: serviceEnd,
})
if ok {
prev.(*frontierNservice).service.Close()
prev.(*frontierNend).end.Close()
}
}
}()
return nil
}

func (end *serviceClusterEnd) lookup(edgeID uint64) (string, *serviceEnd, error) {
var (
frontier *clusterv1.Frontier
serviceEnd *serviceEnd
err error
)
frontierID, ok := end.edgefrontiers.GetValue(edgeID)
// get or set edgeID to frontierID map
if !ok {
rsp, err := end.cc.GetFrontierByEdge(context.TODO(), &clusterv1.GetFrontierByEdgeIDRequest{
EdgeId: edgeID,
})
if err != nil {
end.logger.Errorf("get frontier by edge: %d err: %s", edgeID, err)
return "", nil, err
}
frontier = rsp.Fontier
frontierID = frontier.FrontierId
end.edgefrontiers.Set(edgeID, frontierID)
}

fe, ok := end.frontiers.Load(frontierID)
if !ok {
serviceEnd, err = end.newServiceEnd(frontier.AdvertisedSbAddr)
if err != nil {
end.logger.Errorf("new service end err: %s while lookup", err)
return "", nil, err
}
found, ok := end.frontiers.Swap(frontierID, &frontierNend{
frontier: frontier,
end: serviceEnd,
})
if ok {
found.(*frontierNend).end.Close()
}
} else {
serviceEnd = fe.(*frontierNend).end
}
return frontierID.(string), serviceEnd, nil
}

func (end *serviceClusterEnd) pickone() *serviceEnd {
var serviceEnd *serviceEnd
end.frontiers.Range(func(_, value interface{}) bool {
// return first one
serviceEnd = value.(*frontierNend).end
return false
})
return serviceEnd
}

func frontierEqual(a, b *clusterv1.Frontier) bool {
return a.AdvertisedSbAddr == b.AdvertisedEbAddr &&
a.FrontierId == b.FrontierId
Expand Down Expand Up @@ -217,3 +269,134 @@ ERR:
serviceEnd.Close()
return nil, err
}

// multiplexer
func (end *serviceClusterEnd) AcceptStream() (geminio.Stream, error) {
st, ok := <-end.acceptStreamCh
if !ok {
return nil, io.EOF
}
return st, nil
}

func (end *serviceClusterEnd) OpenStream(ctx context.Context, edgeID uint64) (geminio.Stream, error) {
frontierID, serviceEnd, err := end.lookup(edgeID)
if err != nil {
return nil, err
}
stream, err := serviceEnd.OpenStream(ctx, edgeID)
if err != nil {
end.clear(frontierID)
return stream, err
}
return stream, nil
}

func (end *serviceClusterEnd) ListStreams() []geminio.Stream {
streams := []geminio.Stream{}
end.frontiers.Range(func(_, value interface{}) bool {
sts := value.(*frontierNend).end.ListStreams()
if sts != nil {
streams = append(streams, sts...)
}
return true
})
return streams
}

// Messager
func (end *serviceClusterEnd) NewMessage(data []byte) geminio.Message {
serviceEnd := end.pickone()
if serviceEnd == nil {
return nil
}
return serviceEnd.NewMessage(data)
}

func (end *serviceClusterEnd) Publish(ctx context.Context, edgeID uint64, msg geminio.Message) error {
fronterID, serviceEnd, err := end.lookup(edgeID)
if err != nil {
return err
}
err = serviceEnd.Publish(ctx, edgeID, msg)
if err != nil {
end.clear(fronterID)
return err
}
return nil
}

func (end *serviceClusterEnd) PublishAsync(ctx context.Context, edgeID uint64, msg geminio.Message, ch chan *geminio.Publish) (*geminio.Publish, error) {
fronterID, serviceEnd, err := end.lookup(edgeID)
if err != nil {
return nil, err
}
pub, err := serviceEnd.PublishAsync(ctx, edgeID, msg, ch)
if err != nil {
end.clear(fronterID)
return nil, err
}
return pub, err
}

func (end *serviceClusterEnd) Receive(ctx context.Context) (geminio.Message, error) {
msg, ok := <-end.acceptMsgCh
if !ok {
return nil, io.EOF
}
return msg, nil
}

// RPCer
func (end *serviceClusterEnd) NewRequest(data []byte) geminio.Request {
serviceEnd := end.pickone()
if serviceEnd == nil {
return nil
}
return serviceEnd.NewRequest(data)
}

func (end *serviceClusterEnd) Call(ctx context.Context, edgeID uint64, method string, req geminio.Request) (geminio.Response, error) {
fronterID, serviceEnd, err := end.lookup(edgeID)
if err != nil {
return nil, err
}
rsp, err := serviceEnd.Call(ctx, edgeID, method, req)
if err != nil {
end.clear(fronterID)
return nil, err
}
return rsp, nil
}

func (end *serviceClusterEnd) CallAsync(ctx context.Context, edgeID uint64, method string, req geminio.Request, ch chan *geminio.Call) (*geminio.Call, error) {
fronterID, serviceEnd, err := end.lookup(edgeID)
if err != nil {
return nil, err
}
call, err := serviceEnd.CallAsync(ctx, edgeID, method, req, ch)
if err != nil {
end.clear(fronterID)
return nil, err
}
return call, nil
}

func (end *serviceClusterEnd) Register(ctx context.Context, method string, rpc geminio.RPC) error {
end.appMtx.Lock()
end.rpcs[method] = rpc
end.appMtx.Unlock()

var (
err error
)
// TODO optimize it
end.frontiers.Range(func(key, value interface{}) bool {
err = value.(*frontierNend).end.Register(ctx, method, rpc)
if err != nil {
return false
}
return true
})
return err
}
Loading

0 comments on commit 5681483

Please sign in to comment.