Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

servicebound: add service manager test #14

Merged
merged 3 commits into from
Feb 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion api/v1/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Multiplexer interface {
ListStreams() []geminio.Stream
}

// controller functions
// Controller functions
type GetEdgeID func(meta []byte) (uint64, error)
type EdgeOnline func(edgeID uint64, meta []byte, addr net.Addr) error
type EdgeOffline func(edgeID uint64, meta []byte, addr net.Addr) error
Expand All @@ -50,6 +50,7 @@ type ControlRegister interface {
RegisterEdgeOffline(ctx context.Context, edgeOffline EdgeOffline) error
}

// Service
type Service interface {
// Service can direct Message or RPC
RPCMessager
Expand Down
8 changes: 4 additions & 4 deletions api/v1/service/service_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"encoding/json"
"strconv"

"github.com/singchia/frontier/pkg/proto"
"github.com/singchia/frontier/pkg/api"
"github.com/singchia/geminio"
"github.com/singchia/geminio/client"
"github.com/singchia/geminio/options"
Expand All @@ -32,7 +32,7 @@ func newServiceEnd(dialer client.Dialer, opts ...ServiceOption) (*serviceEnd, er
sopts.SetLog(sopt.logger)
}
// meta
meta := &proto.Meta{}
meta := &api.Meta{}
if sopt.topics != nil {
// we deliver topics in meta
meta.Topics = sopt.topics
Expand Down Expand Up @@ -74,7 +74,7 @@ func (service *serviceEnd) RegisterGetEdgeID(ctx context.Context, getEdgeID GetE

func (service *serviceEnd) RegisterEdgeOnline(ctx context.Context, edgeOnline EdgeOnline) error {
return service.End.Register(ctx, "edge_online", func(ctx context.Context, req geminio.Request, rsp geminio.Response) {
on := &proto.OnEdgeOnline{}
on := &api.OnEdgeOnline{}
err := json.Unmarshal(req.Data(), on)
if err != nil {
// shouldn't be here
Expand All @@ -92,7 +92,7 @@ func (service *serviceEnd) RegisterEdgeOnline(ctx context.Context, edgeOnline Ed

func (service *serviceEnd) RegisterEdgeOffline(ctx context.Context, edgeOffline EdgeOffline) error {
return service.End.Register(ctx, "edge_offline", func(ctx context.Context, req geminio.Request, rsp geminio.Response) {
off := &proto.OnEdgeOffline{}
off := &api.OnEdgeOffline{}
err := json.Unmarshal(req.Data(), off)
if err != nil {
// shouldn't be here
Expand Down
7 changes: 7 additions & 0 deletions pkg/api/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package api

import "errors"

var (
ErrEdgeNotOnline = errors.New("edge not online")
)
46 changes: 46 additions & 0 deletions pkg/api/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package api

import (
"net"

"github.com/singchia/geminio"
)

type Exchange interface {
// rpc, message and raw io to edge
ForwardToEdge(*Meta, geminio.End)
// stream to edge
StreamToEdge(geminio.Stream)
// rpc, message and raw io to service
ForwardToService(geminio.End)
// stream to service
StreamToService(geminio.Stream)
}

// edge related
type Edgebound interface {
ListEdges() []geminio.End
// for management
GetEdgeByID(edgeID uint64) geminio.End
DelEdgeByID(edgeID uint64) error
}

type EdgeInformer interface {
EdgeOnline(edgeID uint64, meta []byte, addr net.Addr)
EdgeOffline(edgeID uint64, meta []byte, addr net.Addr)
EdgeHeartbeat(edgeID uint64, meta []byte, addr net.Addr)
}

// service related
type Servicebound interface {
ListService() []geminio.End
// for management
GetService(service string) geminio.End
DelSerivces(service string) error
}

type ServiceInformer interface {
ServiceOnline(serviceID uint64, service string, addr net.Addr)
ServiceOffline(serviceID uint64, service string, addr net.Addr)
ServiceHeartbeat(serviceID uint64, service string, addr net.Addr)
}
2 changes: 1 addition & 1 deletion pkg/proto/proto.go → pkg/api/proto.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package proto
package api

// frontier -> service
type OnEdgeOnline struct {
Expand Down
29 changes: 5 additions & 24 deletions pkg/edgebound/edge_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/jumboframes/armorigo/rproxy"
"github.com/jumboframes/armorigo/synchub"
"github.com/singchia/frontier/pkg/api"
"github.com/singchia/frontier/pkg/config"
"github.com/singchia/frontier/pkg/mapmap"
"github.com/singchia/frontier/pkg/repo/dao"
Expand All @@ -24,31 +25,11 @@ import (
"k8s.io/klog/v2"
)

type Edgebound interface {
ListEdges() []geminio.End
// for management
GetEdgeByID(edgeID uint64) geminio.End
DelEdgeByID(edgeID uint64) error
}

type EdgeInformer interface {
EdgeOnline(edgeID uint64, meta []byte, addr net.Addr)
EdgeOffline(edgeID uint64, meta []byte, addr net.Addr)
EdgeHeartbeat(edgeID uint64, meta []byte, addr net.Addr)
}

type Exchange interface {
// rpc, message and raw io to service
ForwardToService(geminio.End)
// stream to service
StreamToService(geminio.Stream)
}

type edgeManager struct {
*delegate.UnimplementedDelegate

informer EdgeInformer
exchange Exchange
informer api.EdgeInformer
exchange api.Exchange
conf *config.Configuration
// edgeID allocator
idFactory id.IDFactory
Expand All @@ -74,8 +55,8 @@ type edgeManager struct {
}

// support for tls, mtls and tcp listening
func newEdgeManager(conf *config.Configuration, dao *dao.Dao, informer EdgeInformer,
exchange Exchange, tmr timer.Timer) (*edgeManager, error) {
func newEdgeManager(conf *config.Configuration, dao *dao.Dao, informer api.EdgeInformer,
exchange api.Exchange, tmr timer.Timer) (*edgeManager, error) {
listen := &conf.Edgebound.Listen
var (
ln net.Listener
Expand Down
20 changes: 10 additions & 10 deletions pkg/edgebound/edge_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ func TestEdgeManager(t *testing.T) {
return
}

h := &handler{
inf := &informer{
wg: new(sync.WaitGroup),
}
h.wg.Add(2)
inf.wg.Add(2)
// edge manager
em, err := newEdgeManager(conf, dao, h, nil, timer.NewTimer())
em, err := newEdgeManager(conf, dao, inf, nil, timer.NewTimer())
if err != nil {
t.Error(err)
return
Expand All @@ -53,20 +53,20 @@ func TestEdgeManager(t *testing.T) {
return
}
edge.Close()
h.wg.Wait()
inf.wg.Wait()
// if the test failed, it will timeout
}

type handler struct {
type informer struct {
wg *sync.WaitGroup
}

func (h *handler) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) {
h.wg.Done()
func (inf *informer) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) {
inf.wg.Done()
}

func (h *handler) EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) {
h.wg.Done()
func (inf *informer) EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) {
inf.wg.Done()
}

func (h *handler) EdgeHeartbeat(edgeID uint64, meta []byte, addr net.Addr) {}
func (inf *informer) EdgeHeartbeat(edgeID uint64, meta []byte, addr net.Addr) {}
10 changes: 10 additions & 0 deletions pkg/exchange/exchange.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package exchange

import (
"github.com/singchia/frontier/pkg/api"
)

type exchange struct {
Edgebound api.Edgebound
Servicebound api.Servicebound
}
103 changes: 103 additions & 0 deletions pkg/exchange/forward.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package exchange

import (
"context"
"encoding/binary"
"io"

"github.com/singchia/frontier/pkg/api"
"github.com/singchia/geminio"
"k8s.io/klog/v2"
)

func (ex *exchange) ForwardToEdge(meta *api.Meta, end geminio.End) {
// raw
ex.forwardRawToEdge(end)
// message
ex.forwardMessageToEdge(end)
// rpc
ex.forwardRPCToEdge(end)
}

func (ex *exchange) forwardRawToEdge(end geminio.End) {
go func() {
klog.V(6).Infof("exchange forward raw, discard for now")
//drop the io, actually we won't be here
io.Copy(io.Discard, end)
}()
}

func (ex *exchange) forwardRPCToEdge(end geminio.End) {
// we hijack all rpc and forward them to edge
end.Hijack(func(ctx context.Context, method string, r1 geminio.Request, r2 geminio.Response) {
serviceID := end.ClientID()
// get target edgeID
custom := r1.Custom()
edgeID := binary.BigEndian.Uint64(custom[len(custom)-8:])
r1.SetCustom(custom[:len(custom)-8])

// get edge
edge := ex.Edgebound.GetEdgeByID(edgeID)
if edge == nil {
klog.V(4).Infof("forward rpc, service: %d, call edge: %d is not online", serviceID, edgeID)
r2.SetError(api.ErrEdgeNotOnline)
return
}
// call edge
r3, err := edge.Call(ctx, method, r1)
if err != nil {
klog.V(5).Infof("forward rpc, service: %d, call edge: %d err: %s", serviceID, edgeID, err)
r2.SetError(err)
return
}
// we record the edgeID back to r2, for service
tail := make([]byte, 8)
binary.BigEndian.PutUint64(tail, edgeID)
custom = r3.Custom()
if custom == nil {
custom = tail
} else {
custom = append(custom, tail...)
}
r2.SetData(r3.Data())
r2.SetError(r3.Error())
r2.SetCustom(custom)
})
}

func (ex *exchange) forwardMessageToEdge(end geminio.End) {
serviceID := end.ClientID()
go func() {
for {
msg, err := end.Receive(context.TODO())
if err != nil {
if err == io.EOF {
klog.V(5).Infof("forward message, service: %d receive EOF", serviceID)
return
}
klog.Errorf("forward message, service: %d receive err: %s", serviceID, err)
continue
}
// get target edgeID
custom := msg.Custom()
edgeID := binary.BigEndian.Uint64(custom[len(custom)-8:])
msg.SetCustom(custom[:len(custom)-8])

// get edge
edge := ex.Edgebound.GetEdgeByID(edgeID)
if edge == nil {
klog.V(4).Infof("forward message, service: %d, the edge: %d is not online", serviceID, edgeID)
msg.Error(api.ErrEdgeNotOnline)
return
}
// publish in sync, TODO publish in async
err = edge.Publish(context.TODO(), msg)
if err != nil {
klog.V(5).Infof("forward message, service: %d, publish edge: %d err: %s", serviceID, edgeID, err)
msg.Error(err)
return
}
msg.Done()
}
}()
}
7 changes: 4 additions & 3 deletions pkg/servicebound/service_dataplane.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package servicebound

import (
"github.com/singchia/frontier/pkg/api"
"github.com/singchia/geminio"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -30,11 +31,11 @@ func (sm *serviceManager) closedStream(stream geminio.Stream) {
}

// forward to exchange
func (sm *serviceManager) forward(end geminio.End) {
func (sm *serviceManager) forward(meta *api.Meta, end geminio.End) {
serviceID := end.ClientID()
service := end.Meta()
service := meta.Service
klog.V(5).Infof("service forward stream, serviceID: %d, service: %s", serviceID, service)
if sm.exchange != nil {
sm.exchange.ForwardToService(end)
sm.exchange.ForwardToEdge(meta, end)
}
}
Loading