Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: basic finish service cluster api #43

Merged
merged 1 commit into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 221 additions & 38 deletions api/dataplane/v1/service/service_cluster_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"io"
"net"
"sync"
"time"
Expand All @@ -17,17 +18,17 @@ import (
"google.golang.org/grpc/credentials/insecure"
)

type frontierNservice struct {
type frontierNend struct {
frontier *clusterv1.Frontier
service Service
end *serviceEnd
}

type serviceClusterEnd struct {
*delegate.UnimplementedDelegate
cc clusterv1.ClusterServiceClient

bimap *mapmap.BiMap // bidirectional edgeID and frontierID
frontiers sync.Map // key: frontierID; value: frontierNservice
edgefrontiers *mapmap.BiMap // bidirectional edgeID and frontierID
frontiers sync.Map // key: frontierID; value: frontierNservice

// options
*serviceOption
Expand All @@ -52,7 +53,7 @@ func newServiceClusterEnd(addr string, opts ...ServiceOption) (*serviceClusterEn
}
cc := clusterv1.NewClusterServiceClient(conn)

serviceClusterEnd := &serviceClusterEnd{
end := &serviceClusterEnd{
cc: cc,
serviceOption: &serviceOption{},
rpcs: map[string]geminio.RPC{},
Expand All @@ -61,71 +62,69 @@ func newServiceClusterEnd(addr string, opts ...ServiceOption) (*serviceClusterEn
acceptMsgCh: make(chan geminio.Message, 128),
closed: make(chan struct{}),
}
serviceClusterEnd.serviceOption.delegate = serviceClusterEnd
end.serviceOption.delegate = end

for _, opt := range opts {
opt(serviceClusterEnd.serviceOption)
opt(end.serviceOption)
}
if serviceClusterEnd.serviceOption.logger == nil {
serviceClusterEnd.serviceOption.logger = armlog.DefaultLog
if end.serviceOption.logger == nil {
end.serviceOption.logger = armlog.DefaultLog
}
return serviceClusterEnd, nil
return end, nil
}

func (service *serviceClusterEnd) start() {
func (end *serviceClusterEnd) start() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
err := service.update()
err := end.update()
if err != nil {
service.logger.Warnf("cluster update err: %s", err)
end.logger.Warnf("cluster update err: %s", err)
continue
}
case <-service.closed:
case <-end.closed:
return
}
}
}

func (service *serviceClusterEnd) clear(frontierID string) {
service.updating.Lock()
defer service.updating.Unlock()
func (end *serviceClusterEnd) clear(frontierID string) {
end.updating.Lock()
defer end.updating.Unlock()

frontier, ok := service.frontiers.LoadAndDelete(frontierID)
frontier, ok := end.frontiers.LoadAndDelete(frontierID)
if ok {
frontier.(*frontierNservice).service.Close()
frontier.(*frontierNend).end.Close()
}
// clear map for edgeID and frontierID
end.edgefrontiers.DelValue(frontierID)
}

func (service *serviceClusterEnd) update() error {
rsp, err := service.cc.ListFrontiers(context.TODO(), &clusterv1.ListFrontiersRequest{})
func (end *serviceClusterEnd) update() error {
rsp, err := end.cc.ListFrontiers(context.TODO(), &clusterv1.ListFrontiersRequest{})
if err != nil {
service.logger.Errorf("list frontiers err: %s", err)
end.logger.Errorf("list frontiers err: %s", err)
return err
}

service.updating.Lock()
defer service.updating.Unlock()

keeps := []string{}
removes := []Service{}
removes := []*frontierNend{}

service.frontiers.Range(func(key, value interface{}) bool {
end.frontiers.Range(func(key, value interface{}) bool {
frontierID := key.(string)
frontierNservice := value.(*frontierNservice)
frontierNend := value.(*frontierNend)
for _, frontier := range rsp.Frontiers {
if frontierEqual(frontierNservice.frontier, frontier) {
if frontierEqual(frontierNend.frontier, frontier) {
keeps = append(keeps, frontierID)
return true
}
}
// out of date frontier
service.logger.Debugf("frontier: %v needs to be removed", key)
service.frontiers.Delete(key)
removes = append(removes, frontierNservice.service)
end.logger.Debugf("frontier: %v needs to be removed", key)
end.frontiers.Delete(key)
removes = append(removes, frontierNend)
return true
})

Expand All @@ -144,27 +143,80 @@ FOUND:
// aysnc connect and close
go func() {
for _, remove := range removes {
remove.Close()
remove.end.Close()
// clear unavaiable frontier and it's edges
end.edgefrontiers.DelValue(remove.frontier.FrontierId)
}
for _, new := range news {
serviceEnd, err := service.newServiceEnd(new.AdvertisedSbAddr)
serviceEnd, err := end.newServiceEnd(new.AdvertisedSbAddr)
if err != nil {
service.logger.Errorf("new service end err: %s", err)
end.logger.Errorf("new service end err: %s", err)
continue
}
// new frontier
prev, ok := service.frontiers.Swap(new.FrontierId, &frontierNservice{
prev, ok := end.frontiers.Swap(new.FrontierId, &frontierNend{
frontier: new,
service: serviceEnd,
end: serviceEnd,
})
if ok {
prev.(*frontierNservice).service.Close()
prev.(*frontierNend).end.Close()
}
}
}()
return nil
}

func (end *serviceClusterEnd) lookup(edgeID uint64) (string, *serviceEnd, error) {
var (
frontier *clusterv1.Frontier
serviceEnd *serviceEnd
err error
)
frontierID, ok := end.edgefrontiers.GetValue(edgeID)
// get or set edgeID to frontierID map
if !ok {
rsp, err := end.cc.GetFrontierByEdge(context.TODO(), &clusterv1.GetFrontierByEdgeIDRequest{
EdgeId: edgeID,
})
if err != nil {
end.logger.Errorf("get frontier by edge: %d err: %s", edgeID, err)
return "", nil, err
}
frontier = rsp.Fontier
frontierID = frontier.FrontierId
end.edgefrontiers.Set(edgeID, frontierID)
}

fe, ok := end.frontiers.Load(frontierID)
if !ok {
serviceEnd, err = end.newServiceEnd(frontier.AdvertisedSbAddr)
if err != nil {
end.logger.Errorf("new service end err: %s while lookup", err)
return "", nil, err
}
found, ok := end.frontiers.Swap(frontierID, &frontierNend{
frontier: frontier,
end: serviceEnd,
})
if ok {
found.(*frontierNend).end.Close()
}
} else {
serviceEnd = fe.(*frontierNend).end
}
return frontierID.(string), serviceEnd, nil
}

func (end *serviceClusterEnd) pickone() *serviceEnd {
var serviceEnd *serviceEnd
end.frontiers.Range(func(_, value interface{}) bool {
// return first one
serviceEnd = value.(*frontierNend).end
return false
})
return serviceEnd
}

func frontierEqual(a, b *clusterv1.Frontier) bool {
return a.AdvertisedSbAddr == b.AdvertisedEbAddr &&
a.FrontierId == b.FrontierId
Expand Down Expand Up @@ -217,3 +269,134 @@ ERR:
serviceEnd.Close()
return nil, err
}

// multiplexer
func (end *serviceClusterEnd) AcceptStream() (geminio.Stream, error) {
st, ok := <-end.acceptStreamCh
if !ok {
return nil, io.EOF
}
return st, nil
}

func (end *serviceClusterEnd) OpenStream(ctx context.Context, edgeID uint64) (geminio.Stream, error) {
frontierID, serviceEnd, err := end.lookup(edgeID)
if err != nil {
return nil, err
}
stream, err := serviceEnd.OpenStream(ctx, edgeID)
if err != nil {
end.clear(frontierID)
return stream, err
}
return stream, nil
}

func (end *serviceClusterEnd) ListStreams() []geminio.Stream {
streams := []geminio.Stream{}
end.frontiers.Range(func(_, value interface{}) bool {
sts := value.(*frontierNend).end.ListStreams()
if sts != nil {
streams = append(streams, sts...)
}
return true
})
return streams
}

// Messager
func (end *serviceClusterEnd) NewMessage(data []byte) geminio.Message {
serviceEnd := end.pickone()
if serviceEnd == nil {
return nil
}
return serviceEnd.NewMessage(data)
}

func (end *serviceClusterEnd) Publish(ctx context.Context, edgeID uint64, msg geminio.Message) error {
fronterID, serviceEnd, err := end.lookup(edgeID)
if err != nil {
return err
}
err = serviceEnd.Publish(ctx, edgeID, msg)
if err != nil {
end.clear(fronterID)
return err
}
return nil
}

func (end *serviceClusterEnd) PublishAsync(ctx context.Context, edgeID uint64, msg geminio.Message, ch chan *geminio.Publish) (*geminio.Publish, error) {
fronterID, serviceEnd, err := end.lookup(edgeID)
if err != nil {
return nil, err
}
pub, err := serviceEnd.PublishAsync(ctx, edgeID, msg, ch)
if err != nil {
end.clear(fronterID)
return nil, err
}
return pub, err
}

func (end *serviceClusterEnd) Receive(ctx context.Context) (geminio.Message, error) {
msg, ok := <-end.acceptMsgCh
if !ok {
return nil, io.EOF
}
return msg, nil
}

// RPCer
func (end *serviceClusterEnd) NewRequest(data []byte) geminio.Request {
serviceEnd := end.pickone()
if serviceEnd == nil {
return nil
}
return serviceEnd.NewRequest(data)
}

func (end *serviceClusterEnd) Call(ctx context.Context, edgeID uint64, method string, req geminio.Request) (geminio.Response, error) {
fronterID, serviceEnd, err := end.lookup(edgeID)
if err != nil {
return nil, err
}
rsp, err := serviceEnd.Call(ctx, edgeID, method, req)
if err != nil {
end.clear(fronterID)
return nil, err
}
return rsp, nil
}

func (end *serviceClusterEnd) CallAsync(ctx context.Context, edgeID uint64, method string, req geminio.Request, ch chan *geminio.Call) (*geminio.Call, error) {
fronterID, serviceEnd, err := end.lookup(edgeID)
if err != nil {
return nil, err
}
call, err := serviceEnd.CallAsync(ctx, edgeID, method, req, ch)
if err != nil {
end.clear(fronterID)
return nil, err
}
return call, nil
}

func (end *serviceClusterEnd) Register(ctx context.Context, method string, rpc geminio.RPC) error {
end.appMtx.Lock()
end.rpcs[method] = rpc
end.appMtx.Unlock()

var (
err error
)
// TODO optimize it
end.frontiers.Range(func(key, value interface{}) bool {
err = value.(*frontierNend).end.Register(ctx, method, rpc)
if err != nil {
return false
}
return true
})
return err
}
Loading