From f02759796045c9509266d4686f983ba125c43f76 Mon Sep 17 00:00:00 2001 From: singchia Date: Tue, 20 Feb 2024 20:17:24 +0800 Subject: [PATCH 1/2] add example of interactive command-line messaging --- api/v1/service/service_end.go | 6 +- cmd/frontier/main.go | 16 +- examples/Makefile | 9 + examples/iclm/Makefile | 17 + examples/iclm/README.md | 2 + examples/iclm/edge/edge.go | 333 ++++++++++++++++++ examples/iclm/service/service.go | 469 ++++++++++++++++++++++++++ go.mod | 4 + pkg/api/error.go | 18 +- pkg/api/interface.go | 18 +- pkg/api/proto.go | 13 +- pkg/config/config.go | 50 +-- pkg/config/config.yaml | 8 +- pkg/edgebound/edge_dataplane.go | 2 +- pkg/edgebound/edge_manager.go | 8 +- pkg/edgebound/edge_onoff.go | 39 ++- pkg/exchange/forward.go | 2 + pkg/exchange/oob.go | 96 ++++++ pkg/servicebound/service_dataplane.go | 2 +- pkg/servicebound/service_manager.go | 6 +- pkg/servicebound/service_onoff.go | 2 +- 21 files changed, 1057 insertions(+), 63 deletions(-) create mode 100644 examples/Makefile create mode 100644 examples/iclm/Makefile create mode 100644 examples/iclm/README.md create mode 100644 examples/iclm/edge/edge.go create mode 100644 examples/iclm/service/service.go create mode 100644 pkg/exchange/oob.go diff --git a/api/v1/service/service_end.go b/api/v1/service/service_end.go index 2a1b777..8bf67ce 100644 --- a/api/v1/service/service_end.go +++ b/api/v1/service/service_end.go @@ -59,7 +59,7 @@ func newServiceEnd(dialer client.Dialer, opts ...ServiceOption) (*serviceEnd, er // Control Register func (service *serviceEnd) RegisterGetEdgeID(ctx context.Context, getEdgeID GetEdgeID) error { - return service.End.Register(ctx, "get_edge_id", func(ctx context.Context, req geminio.Request, rsp geminio.Response) { + return service.End.Register(ctx, api.RPCGetEdgeID, func(ctx context.Context, req geminio.Request, rsp geminio.Response) { id, err := getEdgeID(req.Data()) if err != nil { // we just deliver the err back @@ -73,7 +73,7 @@ func (service *serviceEnd) RegisterGetEdgeID(ctx context.Context, getEdgeID GetE } func (service *serviceEnd) RegisterEdgeOnline(ctx context.Context, edgeOnline EdgeOnline) error { - return service.End.Register(ctx, "edge_online", func(ctx context.Context, req geminio.Request, rsp geminio.Response) { + return service.End.Register(ctx, api.RPCEdgeOnline, func(ctx context.Context, req geminio.Request, rsp geminio.Response) { on := &api.OnEdgeOnline{} err := json.Unmarshal(req.Data(), on) if err != nil { @@ -91,7 +91,7 @@ func (service *serviceEnd) RegisterEdgeOnline(ctx context.Context, edgeOnline Ed } func (service *serviceEnd) RegisterEdgeOffline(ctx context.Context, edgeOffline EdgeOffline) error { - return service.End.Register(ctx, "edge_offline", func(ctx context.Context, req geminio.Request, rsp geminio.Response) { + return service.End.Register(ctx, api.RPCEdgeOffline, func(ctx context.Context, req geminio.Request, rsp geminio.Response) { off := &api.OnEdgeOffline{} err := json.Unmarshal(req.Data(), off) if err != nil { diff --git a/cmd/frontier/main.go b/cmd/frontier/main.go index 2a66d33..36f59ba 100644 --- a/cmd/frontier/main.go +++ b/cmd/frontier/main.go @@ -20,24 +20,33 @@ func main() { klog.Errorf("parse flags err: %s", err) return } + klog.Infof("frontier starts") + defer func() { + klog.Infof("frontier ends") + klog.Flush() + }() + // dao dao, err := dao.NewDao(conf) if err != nil { klog.Errorf("new dao err: %s", err) return } + klog.V(5).Infof("new dao succeed") // mqm mqm, err := mq.NewMQM(conf) if err != nil { klog.Errorf("new mq manager err: %s", err) return } + klog.V(5).Infof("new mq manager succeed") // exchange exchange, err := exchange.NewExchange(conf) if err != nil { klog.Errorf("new exchange err: %s", err) return } + klog.V(5).Infof("new exchange succeed") tmr := timer.NewTimer() // servicebound @@ -46,7 +55,8 @@ func main() { klog.Errorf("new servicebound err: %s", err) return } - servicebound.Serve() + go servicebound.Serve() + klog.V(5).Infof("new servicebound succeed") // edgebound edgebound, err := edgebound.NewEdgebound(conf, dao, nil, exchange, tmr) @@ -54,10 +64,12 @@ func main() { klog.Errorf("new edgebound err: %s", err) return } - edgebound.Serve() + go edgebound.Serve() + klog.V(5).Infof("new edgebound succeed") sig := sigaction.NewSignal() sig.Wait(context.TODO()) + edgebound.Close() servicebound.Close() } diff --git a/examples/Makefile b/examples/Makefile new file mode 100644 index 0000000..87cd62b --- /dev/null +++ b/examples/Makefile @@ -0,0 +1,9 @@ +GOHOSTOS?=$(shell go env GOHOSTOS) +GOARCH?=$(shell go env GOARCH) + +.PHONY: all +all: iclm + +.PHONY: iclm +iclm: + make -C iclm diff --git a/examples/iclm/Makefile b/examples/iclm/Makefile new file mode 100644 index 0000000..0ad7001 --- /dev/null +++ b/examples/iclm/Makefile @@ -0,0 +1,17 @@ +GOHOSTOS?=$(shell go env GOHOSTOS) +GOARCH?=$(shell go env GOARCH) + +.PHONY: all +all: iclm_service iclm_edge + +.PHONY: clean +clean: + rm iclm_service iclm_edge + +iclm_service: service/*.go + CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \ + go build -trimpath -ldflags "-s -w" -o iclm_service service/*.go + +iclm_edge: edge/*.go + CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \ + go build -trimpath -ldflags "-s -w" -o iclm_edge edge/*.go \ No newline at end of file diff --git a/examples/iclm/README.md b/examples/iclm/README.md new file mode 100644 index 0000000..66005eb --- /dev/null +++ b/examples/iclm/README.md @@ -0,0 +1,2 @@ + +## Iteractive command-line messaging \ No newline at end of file diff --git a/examples/iclm/edge/edge.go b/examples/iclm/edge/edge.go new file mode 100644 index 0000000..fa62101 --- /dev/null +++ b/examples/iclm/edge/edge.go @@ -0,0 +1,333 @@ +package main + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + "net" + "os" + "strconv" + "strings" + "sync" + "time" + + "github.com/singchia/frontier/api/v1/edge" + "github.com/spf13/pflag" + + armlog "github.com/jumboframes/armorigo/log" + "github.com/singchia/geminio" +) + +var ( + sns sync.Map + methodSlice []string +) + +type LabelData struct { + Label string `json:"label"` + Data []byte `json:"data"` +} + +func main() { + methodSlice = []string{} + network := pflag.String("network", "tcp", "network to dial") + address := pflag.String("address", "127.0.0.1:4004", "address to dial") + loglevel := pflag.String("loglevel", "info", "log level, trace debug info warn error") + meta := pflag.String("meta", "test", "meta to set on connection") + methods := pflag.String("methods", "", "method name, support echo, calculate") + label := pflag.String("label", "label-01", "label to message or rpc") + + pflag.Parse() + dialer := func() (net.Conn, error) { + return net.Dial(*network, *address) + } + // log + level, err := armlog.ParseLevel(*loglevel) + if err != nil { + fmt.Println("parse log level err:", err) + return + } + armlog.SetLevel(level) + armlog.SetOutput(os.Stdout) + + // get edge + cli, err := edge.NewEdge(dialer, + edge.OptionEdgeLog(armlog.DefaultLog), edge.OptionEdgeMeta([]byte(*meta))) + if err != nil { + armlog.Info("new edge err:", err) + return + } + //sms := cli.ListStreams() + //sns.Store("1", sms[0]) + if *methods != "" { + methodSlice = strings.Split(*methods, ",") + } + + // receive on edge + go func() { + for { + msg, err := cli.Receive(context.TODO()) + if err == io.EOF { + return + } + if err != nil { + fmt.Println("> receive err:", err) + fmt.Println(">>> ") + continue + } + msg.Done() + fmt.Println("> receive msg:", msg.ClientID(), msg.StreamID(), string(msg.Data())) + fmt.Print(">>> ") + } + }() + + go func() { + for { + st, err := cli.AcceptStream() + if err == io.EOF { + return + } else if err != nil { + fmt.Println("> accept stream err:", err) + fmt.Print(">>> ") + continue + } + fmt.Println("> accept stream", st.StreamID()) + fmt.Print(">>> ") + sns.Store(strconv.FormatUint(st.StreamID(), 10), st) + go handleStream(st) + } + }() + + go func() { + time.Sleep(200 * time.Millisecond) + // register + for _, method := range methodSlice { + switch method { + case "echo": + err = cli.Register(context.TODO(), "echo", echo) + if err != nil { + armlog.Info("> register echo err:", err) + return + } + } + } + }() + + cursor := "1" + fmt.Print(">>> ") + + // the command-line protocol + // 1. close + // 2. quit + // 3. switch {streamID} + // 4. open {service} + // 5. close {streamID} + // 6. publish {msg} #note to switch to stream first + // 7. publish {topic} {msg} + // 8. call {method} {req} + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + text := scanner.Text() + parts := strings.Split(text, " ") + switch len(parts) { + case 1: + // 1. close + // 2. quit + if parts[0] == "help" { + fmt.Println(`the cli protocol + 1. close + 2. quit + 3. open {service} + 4. close {streamID} + 5. switch {streamID} + 6. publish {msg} #note to switch to stream first + 7. publish {topic} {msg} + 8. call {method} {req}`) + goto NEXT + } + if parts[0] == "quit" || parts[0] == "close" { + cli.Close() + goto END + } + case 2: + // 1. open {service} + // 2. close {streamID} + // 3. switch {streamID} + // 4. publish {msg} + if parts[0] == "open" { + service := parts[1] + st, err := cli.OpenStream(service) + if err != nil { + fmt.Println("> open stream err:", err) + goto NEXT + } + fmt.Println("> open stream success:", st.StreamID()) + sns.Store(strconv.FormatUint(st.StreamID(), 10), st) + go handleStream(st) + goto NEXT + } + if parts[0] == "close" { + // close sessionID + session := parts[1] + sn, ok := sns.LoadAndDelete(session) + if !ok { + fmt.Printf("> stream id: %s not found\n", session) + goto NEXT + } + sn.(geminio.Stream).Close() + fmt.Println("> close stream success:", session) + goto NEXT + } + if parts[0] == "switch" { + session := parts[1] + if session == "1" { + cursor = session + fmt.Println("> swith stream success:", session) + goto NEXT + } + _, ok := sns.Load(session) + if !ok { + fmt.Println("> swith stream failed, not found:", session) + goto NEXT + } + cursor = session + fmt.Println("> swith stream success:", session) + goto NEXT + } + if cursor != "1" && (parts[0] == "publish") { + sn, ok := sns.Load(cursor) + if !ok { + fmt.Printf("> stream: %s not found\n", cursor) + goto NEXT + } + + if parts[0] == "publish" { + ld := &LabelData{ + Label: *label, + Data: []byte(parts[1]), + } + data, _ := json.Marshal(ld) + msg := cli.NewMessage(data) + err := sn.(geminio.Stream).Publish(context.TODO(), msg) + if err != nil { + fmt.Println("> publish err:", err) + goto NEXT + } + fmt.Println("> publish success") + goto NEXT + } + } + case 3: + // 1. publish {topic} {msg} + // 2. call {method} {req} + if cursor != "1" { + // in stream + sn, ok := sns.Load(cursor) + if !ok { + fmt.Printf("> stream: %s not found\n", cursor) + goto NEXT + } + if parts[0] == "call" { + req := cli.NewRequest([]byte(parts[2])) + rsp, err := sn.(geminio.Stream).Call(context.TODO(), string(parts[1]), req) + if err != nil { + fmt.Println("> call err:", err) + goto NEXT + } + fmt.Println("> call success, ret:", string(rsp.Data())) + goto NEXT + } + } + if parts[0] == "publish" { + ld := &LabelData{ + Label: *label, + Data: []byte(parts[2]), + } + data, _ := json.Marshal(ld) + msg := cli.NewMessage(data) + err := cli.Publish(context.TODO(), string(parts[1]), msg) + if err != nil { + fmt.Println("> publish err:", err) + goto NEXT + } + fmt.Println("> publish success") + goto NEXT + } + if parts[0] == "call" { + ld := &LabelData{ + Label: *label, + Data: []byte(parts[2]), + } + data, _ := json.Marshal(ld) + req := cli.NewRequest(data) + rsp, err := cli.Call(context.TODO(), string(parts[1]), req) + if err != nil { + fmt.Println("> call err:", err) + goto NEXT + } + fmt.Println("> call success, ret:", string(rsp.Data())) + goto NEXT + } + } + fmt.Println("> illegal operation") + NEXT: + if cursor != "1" { + fmt.Printf("[%20s] >>> ", cursor) + } else { + fmt.Print(">>> ") + } + } +END: + time.Sleep(time.Second) +} + +func handleStream(stream geminio.Stream) { + go func() { + for { + msg, err := stream.Receive(context.TODO()) + if err != nil { + fmt.Println("> receive err:", err) + fmt.Print(">>> ") + return + } + msg.Done() + fmt.Println("> receive msg:", msg.ClientID(), msg.StreamID(), string(msg.Data())) + fmt.Print(">>> ") + } + }() + go func() { + for { + data := make([]byte, 1024) + _, err := stream.Read(data) + if err != nil { + fmt.Println("> read err:", err) + fmt.Print(">>> ") + return + } + fmt.Println("> read data:", stream.ClientID(), + string(data)) + fmt.Print(">>> ") + } + }() + go func() { + time.Sleep(200 * time.Millisecond) + for _, method := range methodSlice { + switch method { + case "echo": + err := stream.Register(context.TODO(), "echo", echo) + if err != nil { + armlog.Info("> register echo err:", err) + return + } + } + } + }() +} + +func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) { + fmt.Println("\ncall > ", req.ClientID(), req.StreamID(), string(req.Data())) + fmt.Print(">>> ") + rsp.SetData(req.Data()) +} diff --git a/examples/iclm/service/service.go b/examples/iclm/service/service.go new file mode 100644 index 0000000..eec2ce3 --- /dev/null +++ b/examples/iclm/service/service.go @@ -0,0 +1,469 @@ +package main + +import ( + "bufio" + "context" + "encoding/binary" + "encoding/json" + "fmt" + "io" + "log" + "net" + "net/http" + _ "net/http/pprof" + "os" + "runtime" + "strconv" + "strings" + "sync" + "time" + + armlog "github.com/jumboframes/armorigo/log" + "github.com/singchia/frontier/api/v1/service" + "github.com/singchia/geminio" + "github.com/spf13/pflag" +) + +var ( + edgeID uint64 + edgeIDs sync.Map + edges sync.Map + sns sync.Map + + methodSlice []string + topicSlice []string + printmessage *bool + srv service.Service + + labels map[string]int64 = map[string]int64{} + labelsMtx sync.RWMutex +) + +func addLabel(label string, delta int64) { + labelsMtx.Lock() + counter, ok := labels[label] + if ok { + counter += delta + labels[label] = counter + } else { + labels[label] = delta + } + labelsMtx.Unlock() +} + +func printLabel() { + labelsMtx.RLock() + defer labelsMtx.RUnlock() + + for label, counter := range labels { + fmt.Printf("label: %s, counter: %d\n", label, counter) + } +} + +type LabelData struct { + Label string `json:"label"` + Data []byte `json:"data"` +} + +func main() { + methodSlice = []string{} + runtime.SetCPUProfileRate(10000) + go func() { + http.ListenAndServe("0.0.0.0:6062", nil) + }() + network := pflag.String("network", "tcp", "network to dial") + address := pflag.String("address", "127.0.0.1:4003", "address to dial") + loglevel := pflag.String("loglevel", "info", "log level, trace debug info warn error") + serviceName := pflag.String("service", "foo", "service name") + topics := pflag.String("topics", "", "topics to receive message, empty means without consuming") + methods := pflag.String("methods", "", "method name, support echo, calculate") + printmessage = pflag.Bool("printmessage", true, "whether print message out") + stats := pflag.Bool("stats", false, "print statistics or not") + + pflag.Parse() + dialer := func() (net.Conn, error) { + return net.Dial(*network, *address) + } + // log + level, err := armlog.ParseLevel(*loglevel) + if err != nil { + fmt.Println("parse log level err:", err) + return + } + armlog.SetLevel(level) + armlog.SetOutput(os.Stdout) + + // get service + opt := []service.ServiceOption{service.OptionServiceLog(armlog.DefaultLog), service.OptionServiceName(*serviceName)} + if *topics != "" { + topicSlice = strings.Split(*topics, ",") + opt = append(opt, service.OptionServiceReceiveTopics(topicSlice)) + } + srv, err = service.NewService(dialer, opt...) + if err != nil { + log.Println("new end err:", err) + return + } + // pre register methods + if *methods != "" { + methodSlice = strings.Split(*methods, ",") + for _, method := range methodSlice { + switch method { + case "echo": + err = srv.Register(context.TODO(), "echo", echo) + if err != nil { + log.Println("> register echo err:", err) + return + } + } + } + } + // pre register functions for edges events + err = srv.RegisterGetEdgeID(context.TODO(), getID) + if err != nil { + log.Println("> end register getID err:", err) + return + } + err = srv.RegisterEdgeOnline(context.TODO(), online) + if err != nil { + log.Println("> end register online err:", err) + return + } + err = srv.RegisterEdgeOffline(context.TODO(), offline) + if err != nil { + log.Println("> end register offline err:", err) + return + } + + // label counter + if *stats { + go func() { + ticker := time.NewTicker(time.Second) + for { + <-ticker.C + printLabel() + } + }() + } + + // service receive + go func() { + for { + msg, err := srv.Receive(context.TODO()) + if err == io.EOF { + return + } + if err != nil { + fmt.Println("> receive err:", err) + fmt.Print(">>> ") + continue + } + msg.Done() + value := msg.Data() + ld := &LabelData{} + err = json.Unmarshal(value, ld) + if err == nil { + addLabel(string(ld.Label), 1) + value = ld.Data + } + if *printmessage { + fmt.Printf("> receive msg, edgeID: %d streamID: %d data: %s\n", msg.ClientID(), msg.StreamID(), string(value)) + fmt.Print(">>> ") + } + } + }() + + // service accept stream + go func() { + for { + st, err := srv.AcceptStream() + if err == io.EOF { + return + } else if err != nil { + fmt.Println("> accept stream err:", err) + continue + } + fmt.Println("> accept stream", st.ClientID(), st.StreamID()) + sns.Store(strconv.FormatUint(st.StreamID(), 10), st) + go handleStream(st) + } + }() + + cursor := "1" + fmt.Print(">>> ") + + // the command-line protocol + // 1. close + // 2. quit + // 3. open {edgeID} + // 4. close {streamID} + // 5. switch {streamID} + // 6. publish {msg} #note to switch to stream first + // 7. publish {edgeID} {msg} + // 8. call {method} {req} #note to switch to stream first + // 9. call {edgeID} {method} {req} + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + text := scanner.Text() + parts := strings.Split(text, " ") + switch len(parts) { + case 1: + if parts[0] == "help" { + fmt.Println(`the command-line protocol + 1. close + 2. quit + 3. open {clientID} + 4. close {streamID} + 5. switch {streamID} + 6. publish {msg} #note to switch to stream first + 7. publish {clientId} {msg} + 8. call {method} {req} #note to switch to stream first + 9. call {clientId} {method} {req}`) + goto NEXT + } + // 1. close + if parts[0] == "close" || parts[0] == "quit" { + srv.Close() + goto END + } + if parts[0] == "count" { + count := 0 + edges.Range(func(key, value interface{}) bool { + count++ + return true + }) + fmt.Println("> count:", count) + goto NEXT + } + case 2: + // 1. open {edgeID} + // 2. close {streamID} + // 3. switch {streamID} + // 4. publish {msg} + if parts[0] == "open" { + edgeID, err := strconv.ParseUint(parts[1], 10, 64) + if err != nil { + fmt.Println("> illegal edgeID", err, parts[1]) + goto NEXT + } + // 1. open edgeID + st, err := srv.OpenStream(context.TODO(), edgeID) + if err != nil { + fmt.Println("> open stream err", err) + goto NEXT + } + fmt.Println("> open stream success:", edgeID, st.StreamID()) + sns.Store(strconv.FormatUint(st.StreamID(), 10), st) + go handleStream(st) + goto NEXT + } + if parts[0] == "close" { + stream := parts[1] + sn, ok := sns.LoadAndDelete(stream) + if !ok { + fmt.Printf("> stream id: %s not found\n", stream) + goto NEXT + } + sn.(geminio.Stream).Close() + fmt.Println("> close stream success:", stream) + goto NEXT + } + if parts[0] == "switch" { + session := parts[1] + if session == "1" { + cursor = session + fmt.Println("> swith stream success:", session) + goto NEXT + } + _, ok := sns.Load(session) + if !ok { + fmt.Println("> swith stream failed, not found:", session) + goto NEXT + } + cursor = session + fmt.Println("> swith stream success:", session) + goto NEXT + } + if cursor != "1" && (parts[0] == "publish") { + sn, ok := sns.Load(cursor) + if !ok { + fmt.Printf("> stream: %s not found\n", cursor) + goto NEXT + } + + if parts[0] == "publish" { + msg := srv.NewMessage([]byte(parts[1])) + err := sn.(geminio.Stream).Publish(context.TODO(), msg) + if err != nil { + fmt.Println("> publish err:", err) + goto NEXT + } + fmt.Println("> publish success") + goto NEXT + } + } + case 3: + // 1. publish {edgeID} {msg} + // 2. call {method} {req} if switch to stream + if cursor != "1" { + // in stream + sn, ok := sns.Load(cursor) + if !ok { + fmt.Printf("> stream: %s not found\n", cursor) + goto NEXT + } + if parts[0] == "call" { + req := srv.NewRequest([]byte(parts[2])) + rsp, err := sn.(geminio.Stream).Call(context.TODO(), string(parts[1]), req) + if err != nil { + fmt.Println("> call err:", err) + goto NEXT + } + fmt.Println("> call success, ret:", string(rsp.Data())) + goto NEXT + } + } + if parts[0] == "publish" { + edgeID, err := strconv.ParseUint(parts[1], 10, 64) + if err != nil { + log.Println("> illegal edge id", err, parts[1]) + goto NEXT + } + msg := srv.NewMessage([]byte(parts[2])) + err = srv.Publish(context.TODO(), edgeID, msg) + if err != nil { + log.Println("> publish err:", err) + goto NEXT + } + fmt.Println("> publish success") + goto NEXT + } + case 4: + // call {edgeID} {method} {req} + if parts[0] == "call" { + edgeID, err := strconv.ParseUint(parts[1], 10, 64) + if err != nil { + log.Println("> illegal edge id", err, parts[1]) + goto NEXT + } + req := srv.NewRequest([]byte(parts[3])) + rsp, err := srv.Call(context.TODO(), edgeID, parts[2], req) + if err != nil { + log.Println("> call err:", err) + goto NEXT + } + log.Println("> call success, ret:", string(rsp.Data())) + goto NEXT + } + } + log.Println("illegal operation") + NEXT: + if cursor != "1" { + fmt.Printf("[%20s] >>> ", cursor) + } else { + fmt.Print(">>> ") + } + } + +END: + time.Sleep(10 * time.Second) +} + +func handleStream(stream geminio.Stream) { + go func() { + for { + msg, err := stream.Receive(context.TODO()) + if err != nil { + log.Println("> receive err:", err) + return + } + msg.Done() + value := msg.Data() + ld := &LabelData{} + err = json.Unmarshal(value, ld) + if err == nil { + addLabel(string(ld.Label), 1) + value = ld.Data + } + if *printmessage { + edgeID := binary.BigEndian.Uint64(msg.Custom()) + fmt.Println("> receive msg:", edgeID, msg.StreamID(), string(value)) + fmt.Print(">>> ") + } + } + }() + go func() { + for { + data := make([]byte, 1024) + _, err := stream.Read(data) + if err != nil { + log.Println("> read err:", err) + return + } + fmt.Println("> read data:", stream.ClientID(), + string(data)) + fmt.Print(">>> ") + } + }() + go func() { + time.Sleep(200 * time.Millisecond) + for _, method := range methodSlice { + switch method { + case "echo": + err := stream.Register(context.TODO(), "echo", echo) + if err != nil { + log.Println("> register echo err:", err) + return + } + } + } + }() +} + +func snID(edgeID uint64, streamID uint64) string { + return strconv.FormatUint(edgeID, 10) + "-" + strconv.FormatUint(streamID, 10) +} + +func pickedge() uint64 { + var edgeID uint64 + edges.Range(func(key, value interface{}) bool { + // TODO 先返回第一个 + edgeID = key.(uint64) + return false + }) + return edgeID +} + +func getID(meta []byte) (uint64, error) { + id := uint64(time.Now().UnixMicro()) + //id := atomic.AddUint64(&edgeID, 1) + edgeIDs.Store(string(meta), id) + return id, nil +} + +func online(edgeID uint64, meta []byte, addr net.Addr) error { + fmt.Printf("\n> online, edgeID: %d, addr: %s\n", edgeID, addr.String()) + fmt.Print(">>> ") + edges.Store(edgeID, struct{}{}) + return nil +} + +func offline(edgeID uint64, meta []byte, addr net.Addr) error { + fmt.Printf("\n> offline, edgeID: %d, addr: %s\n", edgeID, addr.String()) + fmt.Print(">>> ") + edges.Delete(edgeID) + return nil +} + +func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) { + value := req.Data() + ld := &LabelData{} + err := json.Unmarshal(value, ld) + if err == nil { + addLabel(string(ld.Label), 1) + value = ld.Data + } + fmt.Println("\ncall > ", req.ClientID(), req.StreamID(), string(value)) + fmt.Print(">>> ") + rsp.SetData(value) +} diff --git a/go.mod b/go.mod index 35f6cd3..eb2e311 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,10 @@ module github.com/singchia/frontier go 1.20 +replace ( + github.com/singchia/geminio => ../../moresec/singchia/geminio +) + require ( github.com/jumboframes/armorigo v0.3.0 github.com/singchia/geminio v1.1.2 diff --git a/pkg/api/error.go b/pkg/api/error.go index ae6db3b..9d2b032 100644 --- a/pkg/api/error.go +++ b/pkg/api/error.go @@ -1,8 +1,20 @@ package api -import "errors" +import ( + "errors" + + "gorm.io/gorm" +) + +var ( + ErrEdgeNotOnline = errors.New("edge not online") + ErrServiceNotOnline = errors.New("service not online") + ErrRPCNotOnline = errors.New("rpc not online") + ErrTopicNotOnline = errors.New("topic not online") + ErrIllegalEdgeID = errors.New("illegal edgeID") + ErrRecordNotFound = gorm.ErrRecordNotFound +) var ( - ErrEdgeNotOnline = errors.New("edge not online") - ErrTopicNotOnline = errors.New("topic not online") + ErrStrUseOfClosedConnection = "use of closed network connection" ) diff --git a/pkg/api/interface.go b/pkg/api/interface.go index 43797d1..8ceca21 100644 --- a/pkg/api/interface.go +++ b/pkg/api/interface.go @@ -7,10 +7,16 @@ import ( ) type Exchange interface { + // For Service // rpc, message and raw io to edge ForwardToEdge(*Meta, geminio.End) // stream to edge // TODO StreamToEdge(geminio.Stream) + + // For Edge + GetEdgeID(meta []byte) (uint64, error) // get EdgeID for edge + EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) error + EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) error // rpc, message and raw io to service ForwardToService(geminio.End) // stream to service @@ -33,12 +39,6 @@ type Edgebound interface { Close() error } -type EdgeInformer interface { - EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) - EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) - EdgeHeartbeat(edgeID uint64, meta []byte, addr net.Addr) -} - // service related type Servicebound interface { ListService() []geminio.End @@ -52,6 +52,12 @@ type Servicebound interface { Close() error } +// informer +type EdgeInformer interface { + EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) + EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) + EdgeHeartbeat(edgeID uint64, meta []byte, addr net.Addr) +} type ServiceInformer interface { ServiceOnline(serviceID uint64, service string, addr net.Addr) ServiceOffline(serviceID uint64, service string, addr net.Addr) diff --git a/pkg/api/proto.go b/pkg/api/proto.go index e1f696f..0182bec 100644 --- a/pkg/api/proto.go +++ b/pkg/api/proto.go @@ -1,6 +1,13 @@ package api // frontier -> service +// global rpcs +var ( + RPCGetEdgeID = "get_edge_id" + RPCEdgeOnline = "edge_online" + RPCEdgeOffline = "edge_offline" +) + type OnEdgeOnline struct { EdgeID uint64 Meta []byte @@ -33,11 +40,7 @@ func (offline *OnEdgeOffline) String() string { } // service -> frontier -type ReceiveClaim struct { - Topics []string -} - -// service -> frontier +// meta carried when service inited type Meta struct { Service string Topics []string diff --git a/pkg/config/config.go b/pkg/config/config.go index 694cc85..fed0efa 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -2,11 +2,12 @@ package config import ( "flag" + "fmt" "io" "os" - "strconv" armio "github.com/jumboframes/armorigo/io" + "github.com/jumboframes/armorigo/log" "github.com/spf13/pflag" "gopkg.in/yaml.v2" "k8s.io/klog/v2" @@ -102,38 +103,16 @@ type Configuration struct { func Parse() (*Configuration, error) { var ( argConfigFile = pflag.String("config", "", "config file, default not configured") + argArmorigoLogLevel = pflag.String("loglevel", "info", "log level for armorigo log") argDaemonRLimitNofile = pflag.Int("daemon-rlimit-nofile", -1, "SetRLimit for number of file of this daemon, default: -1 means ignore") config *Configuration ) pflag.Lookup("daemon-rlimit-nofile").NoOptDefVal = "1048576" - // config file - if *argConfigFile != "" { - data, err := os.ReadFile(*argConfigFile) - if err != nil { - return nil, err - } - config = &Configuration{} - if err = yaml.Unmarshal(data, config); err != nil { - return nil, err - } - } - // set klog klogFlags := flag.NewFlagSet("klog", flag.ExitOnError) klog.InitFlags(klogFlags) - klogFlags.Set("log_dir", config.Log.LogDir) - klogFlags.Set("log_file", config.Log.LogFile) - klogFlags.Set("log_file_max_file", strconv.FormatUint(config.Log.LogFileMaxSizeMB, 10)) - klogFlags.Set("logtostderr", strconv.FormatBool(config.Log.ToStderr)) - klogFlags.Set("alsologtostderr", strconv.FormatBool(config.Log.AlsoToStderr)) - klogFlags.Set("verbosity", strconv.FormatInt(int64(config.Log.Verbosity), 10)) - klogFlags.Set("add_dir_header", strconv.FormatBool(config.Log.AddDirHeader)) - klogFlags.Set("skip_headers", strconv.FormatBool(config.Log.SkipHeaders)) - klogFlags.Set("one_output", strconv.FormatBool(config.Log.OneOutput)) - klogFlags.Set("skip_log_headers", strconv.FormatBool(config.Log.SkipLogHeaders)) - klogFlags.Set("stderrthreshold", strconv.FormatInt(int64(config.Log.StderrThreshold), 10)) // sync the glog and klog flags. pflag.CommandLine.VisitAll(func(f1 *pflag.Flag) { @@ -151,11 +130,32 @@ func Parse() (*Configuration, error) { pflag.CommandLine.AddGoFlagSet(flag.CommandLine) pflag.Parse() + // armorigo log + level, err := log.ParseLevel(*argArmorigoLogLevel) + if err != nil { + fmt.Println("parse log level err:", err) + return nil, err + } + log.SetLevel(level) + log.SetOutput(os.Stdout) + + // config file + if *argConfigFile != "" { + // TODO the command-line is prior to config file + data, err := os.ReadFile(*argConfigFile) + if err != nil { + return nil, err + } + config = &Configuration{} + if err = yaml.Unmarshal(data, config); err != nil { + return nil, err + } + } + if config == nil { config = &Configuration{} } config.Daemon.RLimit.NumFile = *argDaemonRLimitNofile - return config, nil } diff --git a/pkg/config/config.yaml b/pkg/config/config.yaml index 40e73fb..f29dfd0 100644 --- a/pkg/config/config.yaml +++ b/pkg/config/config.yaml @@ -8,7 +8,7 @@ edgebound: network: tcp addr: 0.0.0.0:2432 tls: - enable: true + enable: false mtls: true ca_certs: - ca1.cert @@ -17,11 +17,11 @@ edgebound: - cert: edgebound.cert key: edgebound.key bypass: - enable: true + enable: false network: tcp addr: 192.168.1.10:8443 tls: - enable: true + enable: false mtls: true ca_certs: - ca1.cert @@ -34,7 +34,7 @@ servicebound: network: tcp addr: 0.0.0.0:2431 tls: - enable: true + enable: false mtls: true ca_certs: - ca1.cert diff --git a/pkg/edgebound/edge_dataplane.go b/pkg/edgebound/edge_dataplane.go index 6579d51..3a6159d 100644 --- a/pkg/edgebound/edge_dataplane.go +++ b/pkg/edgebound/edge_dataplane.go @@ -33,7 +33,7 @@ func (em *edgeManager) closedStream(stream geminio.Stream) { func (em *edgeManager) forward(end geminio.End) { edgeID := end.ClientID() meta := end.Meta() - klog.V(5).Infof("edge forward stream, edgeID: %d, meta: %s", edgeID, meta) + klog.V(5).Infof("edge forward raw message and rpc, edgeID: %d, meta: %s", edgeID, meta) if em.exchange != nil { em.exchange.ForwardToService(end) } diff --git a/pkg/edgebound/edge_manager.go b/pkg/edgebound/edge_manager.go index 5dbaa4b..c8aff19 100644 --- a/pkg/edgebound/edge_manager.go +++ b/pkg/edgebound/edge_manager.go @@ -6,6 +6,7 @@ import ( "crypto/x509" "net" "os" + "strings" "sync" "github.com/jumboframes/armorigo/rproxy" @@ -77,12 +78,14 @@ func newEdgeManager(conf *config.Configuration, dao *dao.Dao, informer api.EdgeI streams: mapmap.NewMapMap(), dao: dao, shub: synchub.NewSyncHub(synchub.OptionTimer(tmr)), + edges: make(map[uint64]geminio.End), UnimplementedDelegate: &delegate.UnimplementedDelegate{}, // a simple unix timestamp incemental id factory idFactory: id.DefaultIncIDCounter, informer: informer, exchange: exchange, } + exchange.AddEdgebound(em) if !listen.TLS.Enable { if ln, err = net.Listen(network, addr); err != nil { @@ -240,7 +243,9 @@ func (em *edgeManager) Serve() { for { conn, err := em.geminioLn.Accept() if err != nil { - klog.V(4).Infof("edge manager listener accept err: %s", err) + if !strings.Contains(err.Error(), api.ErrStrUseOfClosedConnection) { + klog.V(4).Infof("edge manager listener accept err: %s", err) + } return } go em.handleConn(conn) @@ -270,6 +275,7 @@ func (em *edgeManager) handleConn(conn net.Conn) error { return nil } +// management apis func (em *edgeManager) GetEdgeByID(edgeID uint64) geminio.End { em.mtx.RLock() defer em.mtx.RUnlock() diff --git a/pkg/edgebound/edge_onoff.go b/pkg/edgebound/edge_onoff.go index f188e02..a27965d 100644 --- a/pkg/edgebound/edge_onoff.go +++ b/pkg/edgebound/edge_onoff.go @@ -1,12 +1,12 @@ package edgebound import ( - "errors" "net" "strconv" "time" "github.com/jumboframes/armorigo/synchub" + "github.com/singchia/frontier/pkg/api" "github.com/singchia/frontier/pkg/repo/dao" "github.com/singchia/frontier/pkg/repo/model" "github.com/singchia/geminio" @@ -95,20 +95,26 @@ func (em *edgeManager) offline(edgeID uint64, addr net.Addr) error { // delegations for all ends from edgebound, called by geminio func (em *edgeManager) ConnOnline(d delegate.ConnDescriber) error { edgeID := d.ClientID() - meta := string(d.Meta()) + meta := d.Meta() addr := d.RemoteAddr() - klog.V(4).Infof("edge online, edgeID: %d, meta: %s, addr: %s", edgeID, meta, addr) - // notification for others + + klog.V(4).Infof("edge online, edgeID: %d, meta: %s, addr: %s", edgeID, string(meta), addr) + // inform others if em.informer != nil { em.informer.EdgeOnline(edgeID, d.Meta(), addr) } + // exchange to service + if em.exchange != nil { + return em.exchange.EdgeOnline(edgeID, meta, addr) + } return nil } func (em *edgeManager) ConnOffline(d delegate.ConnDescriber) error { edgeID := d.ClientID() - meta := string(d.Meta()) + meta := d.Meta() addr := d.RemoteAddr() + klog.V(4).Infof("edge offline, edgeID: %d, meta: %s, addr: %s", edgeID, string(meta), addr) // offline the cache err := em.offline(edgeID, addr) @@ -117,10 +123,14 @@ func (em *edgeManager) ConnOffline(d delegate.ConnDescriber) error { err, edgeID, string(meta), addr) return err } + // inform others if em.informer != nil { em.informer.EdgeOffline(edgeID, d.Meta(), addr) } - // notification for others + // exchange to service + if em.exchange != nil { + return em.exchange.EdgeOffline(edgeID, meta, addr) + } return nil } @@ -128,7 +138,7 @@ func (em *edgeManager) Heartbeat(d delegate.ConnDescriber) error { edgeID := d.ClientID() meta := string(d.Meta()) addr := d.RemoteAddr() - klog.V(5).Infof("edge heartbeat, edgeID: %d, meta: %s, addr: %s", edgeID, string(meta), addr) + klog.V(6).Infof("edge heartbeat, edgeID: %d, meta: %s, addr: %s", edgeID, string(meta), addr) if em.informer != nil { em.informer.EdgeHeartbeat(edgeID, d.Meta(), addr) } @@ -151,9 +161,18 @@ func (em *edgeManager) RemoteRegistration(rpc string, edgeID, streamID uint64) { } func (em *edgeManager) GetClientID(meta []byte) (uint64, error) { - // TODO - if em.conf.Edgebound.EdgeIDAllocWhenNoIDServiceOn { + var ( + edgeID uint64 + err error + ) + if em.exchange != nil { + edgeID, err = em.exchange.GetEdgeID(meta) + if err == nil { + return edgeID, err + } + } + if err == api.ErrRecordNotFound && em.conf.Edgebound.EdgeIDAllocWhenNoIDServiceOn { return em.idFactory.GetID(), nil } - return 0, errors.New("unable to get an edgeID") + return 0, err } diff --git a/pkg/exchange/forward.go b/pkg/exchange/forward.go index 902872d..0d6bf21 100644 --- a/pkg/exchange/forward.go +++ b/pkg/exchange/forward.go @@ -79,6 +79,7 @@ func (ex *exchange) forwardMessageToEdge(end geminio.End) { klog.Errorf("forward message, serviceID: %d, receive err: %s", serviceID, err) continue } + klog.V(7).Infof("forward message, receive msg: %s from: %d", string(msg.Data()), end.ClientID()) // get target edgeID custom := msg.Custom() edgeID := binary.BigEndian.Uint64(custom[len(custom)-8:]) @@ -92,6 +93,7 @@ func (ex *exchange) forwardMessageToEdge(end geminio.End) { return } // publish in sync, TODO publish in async + msg.SetClientID(edgeID) err = edge.Publish(context.TODO(), msg) if err != nil { klog.V(5).Infof("forward message, serviceID: %d, publish edge: %d err: %s", serviceID, edgeID, err) diff --git a/pkg/exchange/oob.go b/pkg/exchange/oob.go new file mode 100644 index 0000000..d5494a5 --- /dev/null +++ b/pkg/exchange/oob.go @@ -0,0 +1,96 @@ +package exchange + +import ( + "context" + "encoding/binary" + "encoding/json" + "net" + + "github.com/singchia/frontier/pkg/api" + "k8s.io/klog/v2" +) + +func (ex *exchange) GetEdgeID(meta []byte) (uint64, error) { + svc, err := ex.Servicebound.GetServiceByRPC(api.RPCGetEdgeID) + if err != nil { + klog.Errorf("exchange get edgeID, get service err: %s, meta: %s", err, string(meta)) + if err == api.ErrRecordNotFound { + return 0, api.ErrServiceNotOnline + } + return 0, err + } + // call service + req := svc.NewRequest(meta) + rsp, err := svc.Call(context.TODO(), api.RPCGetEdgeID, req) + if err != nil { + klog.V(5).Infof("exchange call service: %d, get edgeID err: %s, meta: %s", svc.ClientID(), err, meta) + return 0, err + } + data := rsp.Data() + if data == nil || len(data) != 8 { + return 0, api.ErrIllegalEdgeID + } + return binary.BigEndian.Uint64(data), nil +} + +func (ex *exchange) EdgeOnline(edgeID uint64, meta []byte, addr net.Addr) error { + svc, err := ex.Servicebound.GetServiceByRPC(api.RPCEdgeOnline) + if err != nil { + klog.Errorf("exchange edge online, get service err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr) + if err == api.ErrRecordNotFound { + return api.ErrServiceNotOnline + } + return err + } + // call service the edge online event + event := &api.OnEdgeOnline{ + EdgeID: edgeID, + Meta: meta, + Net: addr.Network(), + Str: addr.String(), + } + data, err := json.Marshal(event) + if err != nil { + klog.Errorf("exchange edge online, json marshal err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr) + return err + } + // call service + req := svc.NewRequest(data) + _, err = svc.Call(context.TODO(), api.RPCEdgeOnline, req) + if err != nil { + klog.V(5).Infof("exchange call service: %d, edge online err: %s, meta: %s, addr: %s", svc.ClientID(), err, meta, addr) + return err + } + return nil +} + +func (ex *exchange) EdgeOffline(edgeID uint64, meta []byte, addr net.Addr) error { + svc, err := ex.Servicebound.GetServiceByRPC(api.RPCEdgeOffline) + if err != nil { + klog.Errorf("exchange edge offline, get service err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr) + if err == api.ErrRecordNotFound { + return api.ErrServiceNotOnline + } + return err + } + // call service the edge offline event + event := &api.OnEdgeOffline{ + EdgeID: edgeID, + Meta: meta, + Net: addr.Network(), + Str: addr.String(), + } + data, err := json.Marshal(event) + if err != nil { + klog.Errorf("exchange edge offline, json marshal err: %s, edgeID: %d, meta: %s, addr: %s", err, edgeID, string(meta), addr) + return err + } + // call service + req := svc.NewRequest(data) + _, err = svc.Call(context.TODO(), api.RPCEdgeOffline, req) + if err != nil { + klog.V(5).Infof("exchange call service: %d, edge offline err: %s, meta: %s, addr: %s", svc.ClientID(), err, meta, addr) + return err + } + return nil +} diff --git a/pkg/servicebound/service_dataplane.go b/pkg/servicebound/service_dataplane.go index 467a58d..8698969 100644 --- a/pkg/servicebound/service_dataplane.go +++ b/pkg/servicebound/service_dataplane.go @@ -34,7 +34,7 @@ func (sm *serviceManager) closedStream(stream geminio.Stream) { func (sm *serviceManager) forward(meta *api.Meta, end geminio.End) { serviceID := end.ClientID() service := meta.Service - klog.V(5).Infof("service forward stream, serviceID: %d, service: %s", serviceID, service) + klog.V(5).Infof("service forward raw message and rpc, serviceID: %d, service: %s", serviceID, service) if sm.exchange != nil { sm.exchange.ForwardToEdge(meta, end) } diff --git a/pkg/servicebound/service_manager.go b/pkg/servicebound/service_manager.go index 6c04a15..90c7a19 100644 --- a/pkg/servicebound/service_manager.go +++ b/pkg/servicebound/service_manager.go @@ -6,6 +6,7 @@ import ( "encoding/json" "net" "os" + "strings" "sync" "github.com/jumboframes/armorigo/synchub" @@ -77,6 +78,7 @@ func newServiceManager(conf *config.Configuration, dao *dao.Dao, informer api.Se // a simple unix timestamp incremental id factory idFactory: id.DefaultIncIDCounter, informer: informer, + exchange: exchange, } exchange.AddServicebound(sm) @@ -143,7 +145,9 @@ func (sm *serviceManager) Serve() { for { conn, err := sm.ln.Accept() if err != nil { - klog.V(4).Infof("service manager listener accept err: %s", err) + if !strings.Contains(err.Error(), api.ErrStrUseOfClosedConnection) { + klog.V(4).Infof("service manager listener accept err: %s", err) + } return } go sm.handleConn(conn) diff --git a/pkg/servicebound/service_onoff.go b/pkg/servicebound/service_onoff.go index d0f588e..bb25e8f 100644 --- a/pkg/servicebound/service_onoff.go +++ b/pkg/servicebound/service_onoff.go @@ -132,7 +132,7 @@ func (sm *serviceManager) Heartbeat(d delegate.ConnDescriber) error { serviceID := d.ClientID() meta := string(d.Meta()) addr := d.RemoteAddr() - klog.V(5).Infof("service heartbeat, serviceID: %d, meta: %s, addr: %s", serviceID, string(meta), addr) + klog.V(6).Infof("service heartbeat, serviceID: %d, meta: %s, addr: %s", serviceID, string(meta), addr) if sm.informer != nil { sm.informer.ServiceHeartbeat(serviceID, meta, addr) } From a80a8057706d67f747aeb4664299308efbd44a6f Mon Sep 17 00:00:00 2001 From: singchia Date: Tue, 20 Feb 2024 20:17:47 +0800 Subject: [PATCH 2/2] add top layer makefile --- Makefile | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 Makefile diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..7b17552 --- /dev/null +++ b/Makefile @@ -0,0 +1,28 @@ +all: frontier examples + +.PHONY: frontier +frontier: + go build -trimpath -ldflags "-s -w" -o ./frontier cmd/frontier/main.go + +.PHONY: frontier-linux +frontier-linux: + CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -trimpath -ldflags "-s -w" -o ./frontier cmd/frontier/main.go + +#docker: linux +# docker build -t harbor.moresec.cn/moresec/ms_gw:1.4.0 . +# docker push harbor.moresec.cn/moresec/ms_gw:1.4.0 + +.PHONY: examples +examples: + make -C examples + +.PHONY: clean +clean: + rm ./frontier + rm ./examples/iclm/iclm_edge + rm ./examples/iclm/iclm_service + +.PHONY: output +output: build + +