From b91ced2077eb1a0cd4824c0c9d2b2bc8a133c4fc Mon Sep 17 00:00:00 2001 From: singchia Date: Thu, 8 Feb 2024 10:17:40 +0800 Subject: [PATCH 1/3] servicebound: add service manager test --- api/v1/service/service.go | 3 ++- pkg/edgebound/edge_manager_test.go | 20 +++++++------- pkg/servicebound/service_manager.go | 1 + pkg/servicebound/service_manager_test.go | 33 +++++++++++++++++++++--- 4 files changed, 43 insertions(+), 14 deletions(-) diff --git a/api/v1/service/service.go b/api/v1/service/service.go index e8601ae..ed7bf0e 100644 --- a/api/v1/service/service.go +++ b/api/v1/service/service.go @@ -39,7 +39,7 @@ type Multiplexer interface { ListStreams() []geminio.Stream } -// controller functions +// Controller functions type GetEdgeID func(meta []byte) (uint64, error) type EdgeOnline func(edgeID uint64, meta []byte, addr net.Addr) error type EdgeOffline func(edgeID uint64, meta []byte, addr net.Addr) error @@ -50,6 +50,7 @@ type ControlRegister interface { RegisterEdgeOffline(ctx context.Context, edgeOffline EdgeOffline) error } +// Service type Service interface { // Service can direct Message or RPC RPCMessager diff --git a/pkg/edgebound/edge_manager_test.go b/pkg/edgebound/edge_manager_test.go index fb8ac2b..3917ee2 100644 --- a/pkg/edgebound/edge_manager_test.go +++ b/pkg/edgebound/edge_manager_test.go @@ -30,12 +30,12 @@ func TestEdgeManager(t *testing.T) { return } - h := &handler{ + inf := &informer{ wg: new(sync.WaitGroup), } - h.wg.Add(2) + inf.wg.Add(2) // edge manager - em, err := newEdgeManager(conf, dao, h, nil, timer.NewTimer()) + em, err := newEdgeManager(conf, dao, inf, nil, timer.NewTimer()) if err != nil { t.Error(err) return @@ -53,20 +53,20 @@ func TestEdgeManager(t *testing.T) { return } edge.Close() - h.wg.Wait() + inf.wg.Wait() // if the test failed, it will timeout } -type handler struct { +type informer struct { wg *sync.WaitGroup } -func (h *handler) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) { - h.wg.Done() +func (inf *informer) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) { + inf.wg.Done() } -func (h *handler) EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) { - h.wg.Done() +func (inf *informer) EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) { + inf.wg.Done() } -func (h *handler) EdgeHeartbeat(edgeID uint64, meta []byte, addr net.Addr) {} +func (inf *informer) EdgeHeartbeat(edgeID uint64, meta []byte, addr net.Addr) {} diff --git a/pkg/servicebound/service_manager.go b/pkg/servicebound/service_manager.go index 0897464..6780f3e 100644 --- a/pkg/servicebound/service_manager.go +++ b/pkg/servicebound/service_manager.go @@ -83,6 +83,7 @@ func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer Servic streams: mapmap.NewMapMap(), dao: dao, shub: synchub.NewSyncHub(synchub.OptionTimer(tmr)), + services: make(map[uint64]geminio.End), UnimplementedDelegate: &delegate.UnimplementedDelegate{}, // a simple unix timestamp incremental id factory idFactory: id.DefaultIncIDCounter, diff --git a/pkg/servicebound/service_manager_test.go b/pkg/servicebound/service_manager_test.go index 3420822..bcb2a60 100644 --- a/pkg/servicebound/service_manager_test.go +++ b/pkg/servicebound/service_manager_test.go @@ -2,14 +2,16 @@ package servicebound import ( "net" + "sync" "testing" + "github.com/singchia/frontier/api/v1/service" "github.com/singchia/frontier/pkg/config" "github.com/singchia/frontier/pkg/repo/dao" "github.com/singchia/go-timer/v2" ) -func TestServiceManagerStream(t *testing.T) { +func TestServiceManager(t *testing.T) { network := "tcp" addr := "0.0.0.0:1202" @@ -26,8 +28,12 @@ func TestServiceManagerStream(t *testing.T) { t.Error(err) return } + inf := &informer{ + wg: new(sync.WaitGroup), + } + inf.wg.Add(2) // service manager - sm, err := newServiceManager(conf, dao, nil, nil, timer.NewTimer()) + sm, err := newServiceManager(conf, dao, inf, nil, timer.NewTimer()) if err != nil { t.Error(err) return @@ -36,8 +42,29 @@ func TestServiceManagerStream(t *testing.T) { go sm.Serve() // service - _ = func() (net.Conn, error) { + dialer := func() (net.Conn, error) { return net.Dial(network, addr) } + service, err := service.NewService(dialer) + if err != nil { + t.Error(err) + return + } + service.Close() + inf.wg.Wait() + // if the test failed, it will timeout +} +type informer struct { + wg *sync.WaitGroup } + +func (inf *informer) ServiceOnline(serviceID uint64, service string, addr net.Addr) { + inf.wg.Done() +} + +func (inf *informer) ServiceOffline(serviceID uint64, service string, addr net.Addr) { + inf.wg.Done() +} + +func (inf *informer) ServiceHeartbeat(serviceID uint64, service string, addr net.Addr) {} From e29543148254cab0955d85f474f5526e81c49246 Mon Sep 17 00:00:00 2001 From: singchia Date: Thu, 8 Feb 2024 12:09:01 +0800 Subject: [PATCH 2/3] exchange: add api and defines --- api/v1/service/service_end.go | 8 ++-- pkg/api/error.go | 7 +++ pkg/api/interface.go | 46 +++++++++++++++++++ pkg/{proto => api}/proto.go | 2 +- pkg/edgebound/edge_manager.go | 29 +++--------- pkg/servicebound/service_dataplane.go | 7 +-- pkg/servicebound/service_manager.go | 65 ++++++++++++++------------- pkg/servicebound/service_onoff.go | 34 ++------------ 8 files changed, 104 insertions(+), 94 deletions(-) create mode 100644 pkg/api/error.go create mode 100644 pkg/api/interface.go rename pkg/{proto => api}/proto.go (97%) diff --git a/api/v1/service/service_end.go b/api/v1/service/service_end.go index 23abbc7..2a1b777 100644 --- a/api/v1/service/service_end.go +++ b/api/v1/service/service_end.go @@ -6,7 +6,7 @@ import ( "encoding/json" "strconv" - "github.com/singchia/frontier/pkg/proto" + "github.com/singchia/frontier/pkg/api" "github.com/singchia/geminio" "github.com/singchia/geminio/client" "github.com/singchia/geminio/options" @@ -32,7 +32,7 @@ func newServiceEnd(dialer client.Dialer, opts ...ServiceOption) (*serviceEnd, er sopts.SetLog(sopt.logger) } // meta - meta := &proto.Meta{} + meta := &api.Meta{} if sopt.topics != nil { // we deliver topics in meta meta.Topics = sopt.topics @@ -74,7 +74,7 @@ func (service *serviceEnd) RegisterGetEdgeID(ctx context.Context, getEdgeID GetE func (service *serviceEnd) RegisterEdgeOnline(ctx context.Context, edgeOnline EdgeOnline) error { return service.End.Register(ctx, "edge_online", func(ctx context.Context, req geminio.Request, rsp geminio.Response) { - on := &proto.OnEdgeOnline{} + on := &api.OnEdgeOnline{} err := json.Unmarshal(req.Data(), on) if err != nil { // shouldn't be here @@ -92,7 +92,7 @@ func (service *serviceEnd) RegisterEdgeOnline(ctx context.Context, edgeOnline Ed func (service *serviceEnd) RegisterEdgeOffline(ctx context.Context, edgeOffline EdgeOffline) error { return service.End.Register(ctx, "edge_offline", func(ctx context.Context, req geminio.Request, rsp geminio.Response) { - off := &proto.OnEdgeOffline{} + off := &api.OnEdgeOffline{} err := json.Unmarshal(req.Data(), off) if err != nil { // shouldn't be here diff --git a/pkg/api/error.go b/pkg/api/error.go new file mode 100644 index 0000000..453e780 --- /dev/null +++ b/pkg/api/error.go @@ -0,0 +1,7 @@ +package api + +import "errors" + +var ( + ErrEdgeNotOnline = errors.New("edge not online") +) diff --git a/pkg/api/interface.go b/pkg/api/interface.go new file mode 100644 index 0000000..e75cba4 --- /dev/null +++ b/pkg/api/interface.go @@ -0,0 +1,46 @@ +package api + +import ( + "net" + + "github.com/singchia/geminio" +) + +type Exchange interface { + // rpc, message and raw io to edge + ForwardToEdge(*Meta, geminio.End) + // stream to edge + StreamToEdge(geminio.Stream) + // rpc, message and raw io to service + ForwardToService(geminio.End) + // stream to service + StreamToService(geminio.Stream) +} + +// edge related +type Edgebound interface { + ListEdges() []geminio.End + // for management + GetEdgeByID(edgeID uint64) geminio.End + DelEdgeByID(edgeID uint64) error +} + +type EdgeInformer interface { + EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) + EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) + EdgeHeartbeat(edgeID uint64, meta []byte, addr net.Addr) +} + +// service related +type Servicebound interface { + ListService() []geminio.End + // for management + GetService(service string) geminio.End + DelSerivces(service string) error +} + +type ServiceInformer interface { + ServiceOnline(serviceID uint64, service string, addr net.Addr) + ServiceOffline(serviceID uint64, service string, addr net.Addr) + ServiceHeartbeat(serviceID uint64, service string, addr net.Addr) +} diff --git a/pkg/proto/proto.go b/pkg/api/proto.go similarity index 97% rename from pkg/proto/proto.go rename to pkg/api/proto.go index bc29909..e1f696f 100644 --- a/pkg/proto/proto.go +++ b/pkg/api/proto.go @@ -1,4 +1,4 @@ -package proto +package api // frontier -> service type OnEdgeOnline struct { diff --git a/pkg/edgebound/edge_manager.go b/pkg/edgebound/edge_manager.go index ae378ba..178641a 100644 --- a/pkg/edgebound/edge_manager.go +++ b/pkg/edgebound/edge_manager.go @@ -10,6 +10,7 @@ import ( "github.com/jumboframes/armorigo/rproxy" "github.com/jumboframes/armorigo/synchub" + "github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/config" "github.com/singchia/frontier/pkg/mapmap" "github.com/singchia/frontier/pkg/repo/dao" @@ -24,31 +25,11 @@ import ( "k8s.io/klog/v2" ) -type Edgebound interface { - ListEdges() []geminio.End - // for management - GetEdgeByID(edgeID uint64) geminio.End - DelEdgeByID(edgeID uint64) error -} - -type EdgeInformer interface { - EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) - EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) - EdgeHeartbeat(edgeID uint64, meta []byte, addr net.Addr) -} - -type Exchange interface { - // rpc, message and raw io to service - ForwardToService(geminio.End) - // stream to service - StreamToService(geminio.Stream) -} - type edgeManager struct { *delegate.UnimplementedDelegate - informer EdgeInformer - exchange Exchange + informer api.EdgeInformer + exchange api.Exchange conf *config.Configuration // edgeID allocator idFactory id.IDFactory @@ -74,8 +55,8 @@ type edgeManager struct { } // support for tls, mtls and tcp listening -func newEdgeManager(conf *config.Configuration, dao *dao.Dao, informer EdgeInformer, - exchange Exchange, tmr timer.Timer) (*edgeManager, error) { +func newEdgeManager(conf *config.Configuration, dao *dao.Dao, informer api.EdgeInformer, + exchange api.Exchange, tmr timer.Timer) (*edgeManager, error) { listen := &conf.Edgebound.Listen var ( ln net.Listener diff --git a/pkg/servicebound/service_dataplane.go b/pkg/servicebound/service_dataplane.go index 8b476f8..f21e052 100644 --- a/pkg/servicebound/service_dataplane.go +++ b/pkg/servicebound/service_dataplane.go @@ -1,6 +1,7 @@ package servicebound import ( + "github.com/singchia/frontier/pkg/api" "github.com/singchia/geminio" "k8s.io/klog/v2" ) @@ -30,11 +31,11 @@ func (sm *serviceManager) closedStream(stream geminio.Stream) { } // forward to exchange -func (sm *serviceManager) forward(end geminio.End) { +func (sm *serviceManager) forward(meta *api.Meta, end geminio.End) { serviceID := end.ClientID() - service := end.Meta() + service := meta.Service klog.V(5).Infof("service forward stream, serviceID: %d, service: %s", serviceID, service) if sm.exchange != nil { - sm.exchange.ForwardToService(end) + sm.exchange.ForwardToEdge(meta, end) } } diff --git a/pkg/servicebound/service_manager.go b/pkg/servicebound/service_manager.go index 6780f3e..9de7f2b 100644 --- a/pkg/servicebound/service_manager.go +++ b/pkg/servicebound/service_manager.go @@ -1,17 +1,19 @@ package servicebound import ( - "context" "crypto/tls" "crypto/x509" + "encoding/json" "net" "os" "sync" "github.com/jumboframes/armorigo/synchub" + "github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/config" "github.com/singchia/frontier/pkg/mapmap" "github.com/singchia/frontier/pkg/repo/dao" + "github.com/singchia/frontier/pkg/repo/model" "github.com/singchia/frontier/pkg/security" "github.com/singchia/frontier/pkg/utils" "github.com/singchia/geminio" @@ -22,31 +24,11 @@ import ( "k8s.io/klog/v2" ) -type Servicebound interface { - ListService() []geminio.End - // for management - GetService(service string) geminio.End - DelSerivces(service string) error -} - -type ServiceInformer interface { - ServiceOnline(serviceID uint64, service string, addr net.Addr) - ServiceOffline(serviceID uint64, service string, addr net.Addr) - ServiceHeartbeat(serviceID uint64, service string, addr net.Addr) -} - -type Exchange interface { - // rpc, message and raw io to edge - ForwardToService(geminio.End) - // stream to edge - StreamToEdge(geminio.Stream) -} - type serviceManager struct { *delegate.UnimplementedDelegate - informer ServiceInformer - exchange Exchange + informer api.ServiceInformer + exchange api.Exchange conf *config.Configuration // serviceID allocator idFactory id.IDFactory @@ -67,8 +49,8 @@ type serviceManager struct { tmr timer.Timer } -func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer ServiceInformer, - exchange Exchange, tmr timer.Timer) (*serviceManager, error) { +func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer api.ServiceInformer, + exchange api.Exchange, tmr timer.Timer) (*serviceManager, error) { listen := &conf.Servicebound.Listen var ( ln net.Listener @@ -173,19 +155,40 @@ func (sm *serviceManager) handleConn(conn net.Conn) error { klog.Errorf("service manager geminio server new end err: %s", err) return err } - - // handle online event for end - if err = sm.online(end); err != nil { + meta := &api.Meta{} + err = json.Unmarshal(end.Meta(), meta) + if err != nil { + klog.Errorf("handle conn, json unmarshal err: %s", err) return err } + // register topics claim of end + sm.remoteReceiveClaim(end.ClientID(), meta.Topics) - // register methods for service - if err = end.Register(context.TODO(), "topic_claim", sm.RemoteReceiveClaim); err != nil { + // handle online event for end + if err = sm.online(end, meta); err != nil { return err } // forward and stream up to edge - sm.forward(end) + sm.forward(meta, end) + return nil +} + +func (sm *serviceManager) remoteReceiveClaim(serviceID uint64, topics []string) error { + klog.V(5).Infof("service remote receive claim, topics: %v, serviceID: %d", topics, serviceID) + var err error + // memdb + for _, topic := range topics { + st := &model.ServiceTopic{ + Topic: topic, + ServiceID: serviceID, + } + err = sm.dao.CreateServiceTopic(st) + if err != nil { + klog.Errorf("service remote receive claim, create service topic: %s, err: %s", topic, err) + return err + } + } return nil } diff --git a/pkg/servicebound/service_onoff.go b/pkg/servicebound/service_onoff.go index e0188d2..d0f588e 100644 --- a/pkg/servicebound/service_onoff.go +++ b/pkg/servicebound/service_onoff.go @@ -1,13 +1,12 @@ package servicebound import ( - "context" - "encoding/json" "net" "strconv" "time" "github.com/jumboframes/armorigo/synchub" + "github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/repo/dao" "github.com/singchia/frontier/pkg/repo/model" "github.com/singchia/geminio" @@ -15,7 +14,7 @@ import ( "k8s.io/klog/v2" ) -func (sm *serviceManager) online(end geminio.End) error { +func (sm *serviceManager) online(end geminio.End, meta *api.Meta) error { // cache var sync synchub.Sync sm.mtx.Lock() @@ -42,7 +41,7 @@ func (sm *serviceManager) online(end geminio.End) error { // memdb service := &model.Service{ ServiceID: end.ClientID(), - Service: string(end.Meta()), + Service: meta.Service, Addr: end.RemoteAddr().String(), CreateTime: time.Now().Unix(), } @@ -157,33 +156,6 @@ func (sm *serviceManager) RemoteRegistration(rpc string, serviceID, streamID uin } } -// RemoteReceiveClaim is called by wrappered RPC -func (sm *serviceManager) RemoteReceiveClaim(ctx context.Context, req geminio.Request, rsp geminio.Response) { - // TODO return err - serviceID := req.ClientID() - - claim := &TopicClaim{} - err := json.Unmarshal(req.Data(), claim) - if err != nil { - klog.Errorf("service remote receive claim, err: %s", err) - return - } - klog.V(5).Infof("service remote receive claim, topics: %v, serviceID: %d", claim.Topics, serviceID) - - // memdb - for _, topic := range claim.Topics { - st := &model.ServiceTopic{ - Topic: topic, - ServiceID: serviceID, - } - err = sm.dao.CreateServiceTopic(st) - if err != nil { - klog.Errorf("service remote receive claim, create service topic: %s, err: %s", topic, err) - return - } - } -} - // actually the meta is service func (sm *serviceManager) GetClientID(meta []byte) (uint64, error) { // TODO From ce7e95eed3594b516f2a0e4ea6dd0f1c4a94be3a Mon Sep 17 00:00:00 2001 From: singchia Date: Thu, 8 Feb 2024 12:09:27 +0800 Subject: [PATCH 3/3] exchange: add exchange and forward edge functions --- pkg/exchange/exchange.go | 10 ++++ pkg/exchange/forward.go | 103 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+) create mode 100644 pkg/exchange/exchange.go create mode 100644 pkg/exchange/forward.go diff --git a/pkg/exchange/exchange.go b/pkg/exchange/exchange.go new file mode 100644 index 0000000..5041928 --- /dev/null +++ b/pkg/exchange/exchange.go @@ -0,0 +1,10 @@ +package exchange + +import ( + "github.com/singchia/frontier/pkg/api" +) + +type exchange struct { + Edgebound api.Edgebound + Servicebound api.Servicebound +} diff --git a/pkg/exchange/forward.go b/pkg/exchange/forward.go new file mode 100644 index 0000000..97f9c90 --- /dev/null +++ b/pkg/exchange/forward.go @@ -0,0 +1,103 @@ +package exchange + +import ( + "context" + "encoding/binary" + "io" + + "github.com/singchia/frontier/pkg/api" + "github.com/singchia/geminio" + "k8s.io/klog/v2" +) + +func (ex *exchange) ForwardToEdge(meta *api.Meta, end geminio.End) { + // raw + ex.forwardRawToEdge(end) + // message + ex.forwardMessageToEdge(end) + // rpc + ex.forwardRPCToEdge(end) +} + +func (ex *exchange) forwardRawToEdge(end geminio.End) { + go func() { + klog.V(6).Infof("exchange forward raw, discard for now") + //drop the io, actually we won't be here + io.Copy(io.Discard, end) + }() +} + +func (ex *exchange) forwardRPCToEdge(end geminio.End) { + // we hijack all rpc and forward them to edge + end.Hijack(func(ctx context.Context, method string, r1 geminio.Request, r2 geminio.Response) { + serviceID := end.ClientID() + // get target edgeID + custom := r1.Custom() + edgeID := binary.BigEndian.Uint64(custom[len(custom)-8:]) + r1.SetCustom(custom[:len(custom)-8]) + + // get edge + edge := ex.Edgebound.GetEdgeByID(edgeID) + if edge == nil { + klog.V(4).Infof("forward rpc, service: %d, call edge: %d is not online", serviceID, edgeID) + r2.SetError(api.ErrEdgeNotOnline) + return + } + // call edge + r3, err := edge.Call(ctx, method, r1) + if err != nil { + klog.V(5).Infof("forward rpc, service: %d, call edge: %d err: %s", serviceID, edgeID, err) + r2.SetError(err) + return + } + // we record the edgeID back to r2, for service + tail := make([]byte, 8) + binary.BigEndian.PutUint64(tail, edgeID) + custom = r3.Custom() + if custom == nil { + custom = tail + } else { + custom = append(custom, tail...) + } + r2.SetData(r3.Data()) + r2.SetError(r3.Error()) + r2.SetCustom(custom) + }) +} + +func (ex *exchange) forwardMessageToEdge(end geminio.End) { + serviceID := end.ClientID() + go func() { + for { + msg, err := end.Receive(context.TODO()) + if err != nil { + if err == io.EOF { + klog.V(5).Infof("forward message, service: %d receive EOF", serviceID) + return + } + klog.Errorf("forward message, service: %d receive err: %s", serviceID, err) + continue + } + // get target edgeID + custom := msg.Custom() + edgeID := binary.BigEndian.Uint64(custom[len(custom)-8:]) + msg.SetCustom(custom[:len(custom)-8]) + + // get edge + edge := ex.Edgebound.GetEdgeByID(edgeID) + if edge == nil { + klog.V(4).Infof("forward message, service: %d, the edge: %d is not online", serviceID, edgeID) + msg.Error(api.ErrEdgeNotOnline) + return + } + // publish in sync, TODO publish in async + err = edge.Publish(context.TODO(), msg) + if err != nil { + klog.V(5).Infof("forward message, service: %d, publish edge: %d err: %s", serviceID, edgeID, err) + msg.Error(err) + return + } + msg.Done() + } + }() +}