From b18f09cbe29c590ff53d50e4acba746df5ad4128 Mon Sep 17 00:00:00 2001 From: petar-cvit Date: Fri, 30 Aug 2024 14:27:54 +0200 Subject: [PATCH 01/24] sse endpoint impl --- .../internal/cluster/k8sclient/client.go | 14 +++ .../internal/controller/sse/proxychan.go | 55 +++++++++++ .../internal/controller/sse/resources.go | 79 +++++++++++++++ .../internal/controller/sse/server.go | 95 +++++++++++++++++++ cyclops-ctrl/internal/handler/handler.go | 8 +- 5 files changed, 249 insertions(+), 2 deletions(-) create mode 100644 cyclops-ctrl/internal/controller/sse/proxychan.go create mode 100644 cyclops-ctrl/internal/controller/sse/resources.go create mode 100644 cyclops-ctrl/internal/controller/sse/server.go diff --git a/cyclops-ctrl/internal/cluster/k8sclient/client.go b/cyclops-ctrl/internal/cluster/k8sclient/client.go index d1589fdb..5f4d56c2 100644 --- a/cyclops-ctrl/internal/cluster/k8sclient/client.go +++ b/cyclops-ctrl/internal/cluster/k8sclient/client.go @@ -9,6 +9,7 @@ import ( "io" networkingv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/watch" "os" "os/exec" "sort" @@ -1077,3 +1078,16 @@ func isRole(group, version, kind string) bool { func isNetworkPolicy(group, version, kind string) bool { return group == "networking.k8s.io" && version == "v1" && kind == "NetworkPolicy" } + +func (k *KubernetesClient) WatchResource(group, version, resource, name, namespace string) (watch.Interface, error) { + gvr := schema.GroupVersionResource{ + Group: group, + Version: version, + Resource: resource, + } + + // Start the watch + return k.Dynamic.Resource(gvr).Namespace(namespace).Watch(context.Background(), metav1.ListOptions{ + FieldSelector: "metadata.name=" + name, + }) +} diff --git a/cyclops-ctrl/internal/controller/sse/proxychan.go b/cyclops-ctrl/internal/controller/sse/proxychan.go new file mode 100644 index 00000000..554bb7e0 --- /dev/null +++ b/cyclops-ctrl/internal/controller/sse/proxychan.go @@ -0,0 +1,55 @@ +package sse + +import ( + "context" + "fmt" + "k8s.io/apimachinery/pkg/watch" + "time" +) + +type ProxyChan struct { + update bool + input <-chan watch.Event + output chan any +} + +func NewProxyChan(ctx context.Context, input <-chan watch.Event, interval time.Duration) ProxyChan { + p := ProxyChan{ + input: input, + output: make(chan any), + } + + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + batcher := time.NewTicker(time.Millisecond * 500) + defer batcher.Stop() + + for { + select { + case _, ok := <-p.input: + if !ok { + return + } + p.update = true + case <-ticker.C: + p.output <- true + case <-batcher.C: + if p.update { + p.output <- true + } + p.update = false + case <-ctx.Done(): + fmt.Println("ctx.Done jesam") + return + } + } + }() + + return p +} + +func (p ProxyChan) Events() chan any { + return p.output +} diff --git a/cyclops-ctrl/internal/controller/sse/resources.go b/cyclops-ctrl/internal/controller/sse/resources.go new file mode 100644 index 00000000..e112ca47 --- /dev/null +++ b/cyclops-ctrl/internal/controller/sse/resources.go @@ -0,0 +1,79 @@ +package sse + +import ( + "fmt" + "github.com/gin-gonic/gin" + json "github.com/json-iterator/go" + "io" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "net/http" + "time" +) + +type k8sEvents chan watch.Event + +func (s *Server) Resources(ctx *gin.Context) { + type Ref struct { + Group string `json:"group"` + Version string `json:"version"` + Kind string `json:"kind"` + Name string `json:"name"` + Namespace string `json:"namespace"` + } + + var r *Ref + if err := ctx.BindJSON(&r); err != nil { + ctx.Status(http.StatusBadRequest) + return + } + + resourceName, err := s.k8sClient.GVKtoAPIResourceName( + schema.GroupVersion{ + Group: r.Group, + Version: r.Version, + }, + r.Kind, + ) + + watchResource, err := s.k8sClient.WatchResource(r.Group, r.Version, resourceName, r.Name, r.Namespace) + if err != nil { + ctx.String(http.StatusInternalServerError, err.Error()) + return + } + + p := NewProxyChan(ctx.Request.Context(), watchResource.ResultChan(), time.Second*5) + + ctx.Stream(func(w io.Writer) bool { + for { + select { + case msg, ok := <-p.Events(): + if !ok { + return false + } + + res, err := s.k8sClient.GetResource( + r.Group, + r.Version, + r.Kind, + r.Name, + r.Namespace, + ) + + if err != nil { + continue + } + + d, _ := json.Marshal(msg) + fmt.Println(string(d)) + + ctx.SSEvent("resource-update", res) + return true + case <-ctx.Request.Context().Done(): + watchResource.Stop() + return false + } + } + }) + +} diff --git a/cyclops-ctrl/internal/controller/sse/server.go b/cyclops-ctrl/internal/controller/sse/server.go new file mode 100644 index 00000000..bf7504ca --- /dev/null +++ b/cyclops-ctrl/internal/controller/sse/server.go @@ -0,0 +1,95 @@ +package sse + +import ( + "fmt" + "github.com/cyclops-ui/cyclops/cyclops-ctrl/internal/cluster/k8sclient" + "github.com/gin-gonic/gin" + "log" +) + +type Server struct { + // Events are pushed to this channel by the main events-gathering routine + Message k8sEvents + + // New client connections + NewClients chan k8sEvents + + // Closed client connections + ClosedClients chan k8sEvents + + // Total client connections + TotalClients map[k8sEvents]bool + + k8sClient *k8sclient.KubernetesClient +} + +// Initialize event and Start procnteessing requests +func NewServer(k8sClient *k8sclient.KubernetesClient) *Server { + server := &Server{ + Message: make(k8sEvents), + NewClients: make(chan k8sEvents), + ClosedClients: make(chan k8sEvents), + TotalClients: make(map[k8sEvents]bool), + k8sClient: k8sClient, + } + + go server.listen() + + return server +} + +// It Listens all incoming requests from clients. +// Handles addition and removal of clients and broadcast messages to clients. +func (s *Server) listen() { + for { + select { + // Add new available client + case client := <-s.NewClients: + s.TotalClients[client] = true + log.Printf("Client added. %d registered clients", len(s.TotalClients)) + + // Remove closed client + case client := <-s.ClosedClients: + delete(s.TotalClients, client) + close(client) + log.Printf("Removed client. %d registered clients", len(s.TotalClients)) + + // Broadcast message to client + case eventMsg := <-s.Message: + for clientMessageChan := range s.TotalClients { + clientMessageChan <- eventMsg + } + } + } +} + +func (s *Server) ServeHTTP() gin.HandlerFunc { + return func(c *gin.Context) { + // Initialize client channel + clientChan := make(k8sEvents) + + // Send new connection to event server + s.NewClients <- clientChan + + defer func() { + // Send closed connection to event server + s.ClosedClients <- clientChan + }() + + c.Set("clientChan", clientChan) + + c.Next() + + fmt.Println("gotov s next") + } +} + +func HeadersMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + c.Writer.Header().Set("Content-Type", "text/event-stream") + c.Writer.Header().Set("Cache-Control", "no-cache") + c.Writer.Header().Set("Connection", "keep-alive") + c.Writer.Header().Set("Transfer-Encoding", "chunked") + c.Next() + } +} diff --git a/cyclops-ctrl/internal/handler/handler.go b/cyclops-ctrl/internal/handler/handler.go index 391feb64..e771a1b3 100644 --- a/cyclops-ctrl/internal/handler/handler.go +++ b/cyclops-ctrl/internal/handler/handler.go @@ -1,9 +1,9 @@ package handler import ( - "net/http" - + "github.com/cyclops-ui/cyclops/cyclops-ctrl/internal/controller/sse" "github.com/gin-gonic/gin" + "net/http" "github.com/cyclops-ui/cyclops/cyclops-ctrl/internal/cluster/k8sclient" "github.com/cyclops-ui/cyclops/cyclops-ctrl/internal/controller" @@ -50,6 +50,10 @@ func (h *Handler) Start() error { h.router = gin.New() + server := sse.NewServer(h.k8sClient) + + h.router.POST("/stream/resources", sse.HeadersMiddleware(), server.ServeHTTP(), server.Resources) + h.router.GET("/ping", h.pong()) // templates From 735d65be751f7fb404333dfeea72c9d52346021d Mon Sep 17 00:00:00 2001 From: petar-cvit Date: Fri, 30 Aug 2024 14:28:25 +0200 Subject: [PATCH 02/24] fetch sse endpoint impl --- cyclops-ui/package.json | 1 + .../components/k8s-resources/Deployment.tsx | 33 ++++++++++++++--- cyclops-ui/src/components/layouts/Sidebar.tsx | 6 ++-- .../src/components/pages/Modules/Modules.tsx | 3 ++ cyclops-ui/src/setupProxy.js | 3 ++ cyclops-ui/yarn.lock | 36 +++++-------------- 6 files changed, 47 insertions(+), 35 deletions(-) diff --git a/cyclops-ui/package.json b/cyclops-ui/package.json index 8f00d5ef..fb61c632 100644 --- a/cyclops-ui/package.json +++ b/cyclops-ui/package.json @@ -7,6 +7,7 @@ "@ant-design/compatible": "^5.1.1", "@ant-design/icons": "^5.3.3", "@babel/preset-typescript": "^7.22.5", + "@microsoft/fetch-event-source": "^2.0.1", "@rmlio/matey": "^1.0.4", "@testing-library/jest-dom": "^5.16.2", "@testing-library/react": "^12.1.3", diff --git a/cyclops-ui/src/components/k8s-resources/Deployment.tsx b/cyclops-ui/src/components/k8s-resources/Deployment.tsx index e47cf962..dac69332 100644 --- a/cyclops-ui/src/components/k8s-resources/Deployment.tsx +++ b/cyclops-ui/src/components/k8s-resources/Deployment.tsx @@ -3,6 +3,7 @@ import { Col, Divider, Row, Alert } from "antd"; import axios from "axios"; import { mapResponseError } from "../../utils/api/errors"; import PodTable from "./common/PodTable/PodTable"; +import { fetchEventSource } from "@microsoft/fetch-event-source"; interface Props { name: string; @@ -19,6 +20,30 @@ const Deployment = ({ name, namespace }: Props) => { description: "", }); + useEffect(() => { + console.log("sse start"); + + fetchEventSource( + `/api/stream/resources?group=apps&version=v1&kind=Deployment&name=${name}&namespace=${namespace}`, + { + method: "POST", + body: JSON.stringify({ + group: `apps`, + version: `v1`, + kind: `Deployment`, + name: name, + namespace: namespace, + }), + onmessage(ev) { + console.log("sse", ev.data); + setDeployment(JSON.parse(ev.data)); + }, + }, + ).then((r) => { + console.log("done"); + }); + }, []); + useEffect(() => { function fetchDeployment() { axios @@ -40,10 +65,10 @@ const Deployment = ({ name, namespace }: Props) => { } fetchDeployment(); - const interval = setInterval(() => fetchDeployment(), 15000); - return () => { - clearInterval(interval); - }; + // const interval = setInterval(() => fetchDeployment(), 15000); + // return () => { + // clearInterval(interval); + // }; }, [name, namespace]); return ( diff --git a/cyclops-ui/src/components/layouts/Sidebar.tsx b/cyclops-ui/src/components/layouts/Sidebar.tsx index e4e662f6..096eec06 100644 --- a/cyclops-ui/src/components/layouts/Sidebar.tsx +++ b/cyclops-ui/src/components/layouts/Sidebar.tsx @@ -17,17 +17,17 @@ const SideNav = () => { const sidebarItems: MenuProps["items"] = [ { - label: Modules, + label: Modules, icon: , key: "modules", }, { - label: Nodes, + label: Nodes, icon: , key: "nodes", }, { - label: Templates, + label: Templates, icon: , key: "templates", }, diff --git a/cyclops-ui/src/components/pages/Modules/Modules.tsx b/cyclops-ui/src/components/pages/Modules/Modules.tsx index 791840f2..5cd933ca 100644 --- a/cyclops-ui/src/components/pages/Modules/Modules.tsx +++ b/cyclops-ui/src/components/pages/Modules/Modules.tsx @@ -14,7 +14,10 @@ import { Checkbox, } from "antd"; import { useNavigate } from "react-router"; + import axios from "axios"; +import { fetchEventSource } from "@microsoft/fetch-event-source"; + import Link from "antd/lib/typography/Link"; import styles from "./styles.module.css"; diff --git a/cyclops-ui/src/setupProxy.js b/cyclops-ui/src/setupProxy.js index 46b2d52d..ce399ac6 100644 --- a/cyclops-ui/src/setupProxy.js +++ b/cyclops-ui/src/setupProxy.js @@ -9,6 +9,9 @@ module.exports = function (app) { pathRewrite: { "^/api": "", }, + onProxyReq: (proxyRes, req, res) => { + res.on("close", () => proxyRes.destroy()); + }, }), ); }; diff --git a/cyclops-ui/yarn.lock b/cyclops-ui/yarn.lock index b6fc0332..a4899b7d 100644 --- a/cyclops-ui/yarn.lock +++ b/cyclops-ui/yarn.lock @@ -2995,6 +2995,11 @@ rw "^1.3.3" sort-object "^3.0.3" +"@microsoft/fetch-event-source@^2.0.1": + version "2.0.1" + resolved "https://registry.yarnpkg.com/@microsoft/fetch-event-source/-/fetch-event-source-2.0.1.tgz#9ceecc94b49fbaa15666e38ae8587f64acce007d" + integrity sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA== + "@nicolo-ribaudo/eslint-scope-5-internals@5.1.1-v1": version "5.1.1-v1" resolved "https://registry.yarnpkg.com/@nicolo-ribaudo/eslint-scope-5-internals/-/eslint-scope-5-internals-5.1.1-v1.tgz#dbf733a965ca47b1973177dc0bb6c889edcfb129" @@ -13401,16 +13406,7 @@ string-natural-compare@^3.0.1: resolved "https://registry.yarnpkg.com/string-natural-compare/-/string-natural-compare-3.0.1.tgz#7a42d58474454963759e8e8b7ae63d71c1e7fdf4" integrity sha512-n3sPwynL1nwKi3WJ6AIsClwBMa0zTi54fn2oLU6ndfTSIO05xaznjSf15PcBZU6FNWbmN5Q6cxT4V5hGvB4taw== -"string-width-cjs@npm:string-width@^4.2.0": - version "4.2.3" - resolved "https://registry.yarnpkg.com/string-width/-/string-width-4.2.3.tgz#269c7117d27b05ad2e536830a8ec895ef9c6d010" - integrity sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g== - dependencies: - emoji-regex "^8.0.0" - is-fullwidth-code-point "^3.0.0" - strip-ansi "^6.0.1" - -string-width@^4.1.0, string-width@^4.2.0, string-width@^4.2.3: +"string-width-cjs@npm:string-width@^4.2.0", string-width@^4.1.0, string-width@^4.2.0, string-width@^4.2.3: version "4.2.3" resolved "https://registry.yarnpkg.com/string-width/-/string-width-4.2.3.tgz#269c7117d27b05ad2e536830a8ec895ef9c6d010" integrity sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g== @@ -13502,7 +13498,7 @@ stringify-object@^3.3.0: is-obj "^1.0.1" is-regexp "^1.0.0" -"strip-ansi-cjs@npm:strip-ansi@^6.0.1": +"strip-ansi-cjs@npm:strip-ansi@^6.0.1", strip-ansi@^6.0.0, strip-ansi@^6.0.1: version "6.0.1" resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-6.0.1.tgz#9e26c63d30f53443e9489495b2105d37b67a85d9" integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A== @@ -13516,13 +13512,6 @@ strip-ansi@^3.0.0: dependencies: ansi-regex "^2.0.0" -strip-ansi@^6.0.0, strip-ansi@^6.0.1: - version "6.0.1" - resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-6.0.1.tgz#9e26c63d30f53443e9489495b2105d37b67a85d9" - integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A== - dependencies: - ansi-regex "^5.0.1" - strip-ansi@^7.0.1, strip-ansi@^7.1.0: version "7.1.0" resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-7.1.0.tgz#d5b6568ca689d8561370b0707685d22434faff45" @@ -14850,7 +14839,7 @@ workbox-window@6.6.1: "@types/trusted-types" "^2.0.2" workbox-core "6.6.1" -"wrap-ansi-cjs@npm:wrap-ansi@^7.0.0": +"wrap-ansi-cjs@npm:wrap-ansi@^7.0.0", wrap-ansi@^7.0.0: version "7.0.0" resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-7.0.0.tgz#67e145cff510a6a6984bdf1152911d69d2eb9e43" integrity sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q== @@ -14868,15 +14857,6 @@ wrap-ansi@^6.2.0: string-width "^4.1.0" strip-ansi "^6.0.0" -wrap-ansi@^7.0.0: - version "7.0.0" - resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-7.0.0.tgz#67e145cff510a6a6984bdf1152911d69d2eb9e43" - integrity sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q== - dependencies: - ansi-styles "^4.0.0" - string-width "^4.1.0" - strip-ansi "^6.0.0" - wrap-ansi@^8.1.0: version "8.1.0" resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-8.1.0.tgz#56dc22368ee570face1b49819975d9b9a5ead214" From 40ca3b8045f777c64b7299f106ac6d088a5aee9d Mon Sep 17 00:00:00 2001 From: petar-cvit Date: Fri, 30 Aug 2024 14:32:53 +0200 Subject: [PATCH 03/24] fix failing build --- cyclops-ui/src/components/k8s-resources/Deployment.tsx | 2 +- cyclops-ui/src/components/pages/Modules/Modules.tsx | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/cyclops-ui/src/components/k8s-resources/Deployment.tsx b/cyclops-ui/src/components/k8s-resources/Deployment.tsx index dac69332..bf1189bb 100644 --- a/cyclops-ui/src/components/k8s-resources/Deployment.tsx +++ b/cyclops-ui/src/components/k8s-resources/Deployment.tsx @@ -42,7 +42,7 @@ const Deployment = ({ name, namespace }: Props) => { ).then((r) => { console.log("done"); }); - }, []); + }, [name, namespace]); useEffect(() => { function fetchDeployment() { diff --git a/cyclops-ui/src/components/pages/Modules/Modules.tsx b/cyclops-ui/src/components/pages/Modules/Modules.tsx index 5cd933ca..69ca9eb8 100644 --- a/cyclops-ui/src/components/pages/Modules/Modules.tsx +++ b/cyclops-ui/src/components/pages/Modules/Modules.tsx @@ -16,7 +16,6 @@ import { import { useNavigate } from "react-router"; import axios from "axios"; -import { fetchEventSource } from "@microsoft/fetch-event-source"; import Link from "antd/lib/typography/Link"; From f5152f74222ad6fd94b82cfed4117f856b63889e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Fri, 30 Aug 2024 12:56:03 +0000 Subject: [PATCH 04/24] =?UTF-8?q?=E2=9A=99=EF=B8=8F=20update=20cyclops=20t?= =?UTF-8?q?o=20v0.11.0-rc-streaming?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- install/cyclops-install.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/install/cyclops-install.yaml b/install/cyclops-install.yaml index 400795cc..80a39fcd 100644 --- a/install/cyclops-install.yaml +++ b/install/cyclops-install.yaml @@ -334,7 +334,7 @@ spec: spec: containers: - name: cyclops-ui - image: cyclopsui/cyclops-ui:v0.10.0 + image: cyclopsui/cyclops-ui:v0.11.0-rc-streaming ports: - containerPort: 80 env: @@ -397,7 +397,7 @@ spec: serviceAccountName: cyclops-ctrl containers: - name: cyclops-ctrl - image: cyclopsui/cyclops-ctrl:v0.10.0 + image: cyclopsui/cyclops-ctrl:v0.11.0-rc-streaming ports: - containerPort: 8080 env: From ec3661cae203beb479fbde0f3dc7ea58bdef2e29 Mon Sep 17 00:00:00 2001 From: petar-cvit Date: Sun, 1 Sep 2024 12:14:34 +0200 Subject: [PATCH 05/24] remove client tracking --- .../internal/controller/sse/proxychan.go | 5 +- .../internal/controller/sse/resources.go | 17 ++--- .../internal/controller/sse/server.go | 71 +------------------ cyclops-ctrl/internal/handler/handler.go | 2 +- 4 files changed, 11 insertions(+), 84 deletions(-) diff --git a/cyclops-ctrl/internal/controller/sse/proxychan.go b/cyclops-ctrl/internal/controller/sse/proxychan.go index 554bb7e0..d6362177 100644 --- a/cyclops-ctrl/internal/controller/sse/proxychan.go +++ b/cyclops-ctrl/internal/controller/sse/proxychan.go @@ -2,9 +2,9 @@ package sse import ( "context" - "fmt" - "k8s.io/apimachinery/pkg/watch" "time" + + "k8s.io/apimachinery/pkg/watch" ) type ProxyChan struct { @@ -41,7 +41,6 @@ func NewProxyChan(ctx context.Context, input <-chan watch.Event, interval time.D } p.update = false case <-ctx.Done(): - fmt.Println("ctx.Done jesam") return } } diff --git a/cyclops-ctrl/internal/controller/sse/resources.go b/cyclops-ctrl/internal/controller/sse/resources.go index e112ca47..2fe96129 100644 --- a/cyclops-ctrl/internal/controller/sse/resources.go +++ b/cyclops-ctrl/internal/controller/sse/resources.go @@ -1,17 +1,13 @@ package sse import ( - "fmt" - "github.com/gin-gonic/gin" - json "github.com/json-iterator/go" "io" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/watch" "net/http" "time" -) -type k8sEvents chan watch.Event + "github.com/gin-gonic/gin" + "k8s.io/apimachinery/pkg/runtime/schema" +) func (s *Server) Resources(ctx *gin.Context) { type Ref struct { @@ -47,7 +43,7 @@ func (s *Server) Resources(ctx *gin.Context) { ctx.Stream(func(w io.Writer) bool { for { select { - case msg, ok := <-p.Events(): + case _, ok := <-p.Events(): if !ok { return false } @@ -59,18 +55,15 @@ func (s *Server) Resources(ctx *gin.Context) { r.Name, r.Namespace, ) - if err != nil { continue } - d, _ := json.Marshal(msg) - fmt.Println(string(d)) - ctx.SSEvent("resource-update", res) return true case <-ctx.Request.Context().Done(): watchResource.Stop() + close(p.output) return false } } diff --git a/cyclops-ctrl/internal/controller/sse/server.go b/cyclops-ctrl/internal/controller/sse/server.go index bf7504ca..f831ad96 100644 --- a/cyclops-ctrl/internal/controller/sse/server.go +++ b/cyclops-ctrl/internal/controller/sse/server.go @@ -1,89 +1,24 @@ package sse import ( - "fmt" - "github.com/cyclops-ui/cyclops/cyclops-ctrl/internal/cluster/k8sclient" "github.com/gin-gonic/gin" - "log" + + "github.com/cyclops-ui/cyclops/cyclops-ctrl/internal/cluster/k8sclient" ) type Server struct { - // Events are pushed to this channel by the main events-gathering routine - Message k8sEvents - - // New client connections - NewClients chan k8sEvents - - // Closed client connections - ClosedClients chan k8sEvents - - // Total client connections - TotalClients map[k8sEvents]bool - k8sClient *k8sclient.KubernetesClient } // Initialize event and Start procnteessing requests func NewServer(k8sClient *k8sclient.KubernetesClient) *Server { server := &Server{ - Message: make(k8sEvents), - NewClients: make(chan k8sEvents), - ClosedClients: make(chan k8sEvents), - TotalClients: make(map[k8sEvents]bool), - k8sClient: k8sClient, + k8sClient: k8sClient, } - go server.listen() - return server } -// It Listens all incoming requests from clients. -// Handles addition and removal of clients and broadcast messages to clients. -func (s *Server) listen() { - for { - select { - // Add new available client - case client := <-s.NewClients: - s.TotalClients[client] = true - log.Printf("Client added. %d registered clients", len(s.TotalClients)) - - // Remove closed client - case client := <-s.ClosedClients: - delete(s.TotalClients, client) - close(client) - log.Printf("Removed client. %d registered clients", len(s.TotalClients)) - - // Broadcast message to client - case eventMsg := <-s.Message: - for clientMessageChan := range s.TotalClients { - clientMessageChan <- eventMsg - } - } - } -} - -func (s *Server) ServeHTTP() gin.HandlerFunc { - return func(c *gin.Context) { - // Initialize client channel - clientChan := make(k8sEvents) - - // Send new connection to event server - s.NewClients <- clientChan - - defer func() { - // Send closed connection to event server - s.ClosedClients <- clientChan - }() - - c.Set("clientChan", clientChan) - - c.Next() - - fmt.Println("gotov s next") - } -} - func HeadersMiddleware() gin.HandlerFunc { return func(c *gin.Context) { c.Writer.Header().Set("Content-Type", "text/event-stream") diff --git a/cyclops-ctrl/internal/handler/handler.go b/cyclops-ctrl/internal/handler/handler.go index e771a1b3..58a3d0f1 100644 --- a/cyclops-ctrl/internal/handler/handler.go +++ b/cyclops-ctrl/internal/handler/handler.go @@ -52,7 +52,7 @@ func (h *Handler) Start() error { server := sse.NewServer(h.k8sClient) - h.router.POST("/stream/resources", sse.HeadersMiddleware(), server.ServeHTTP(), server.Resources) + h.router.POST("/stream/resources", sse.HeadersMiddleware(), server.Resources) h.router.GET("/ping", h.pong()) From f5ec5609753bf83b9d25174c3ad1be1876e8a91b Mon Sep 17 00:00:00 2001 From: petar-cvit Date: Sun, 1 Sep 2024 12:24:27 +0200 Subject: [PATCH 06/24] remove unused params --- .../components/k8s-resources/Deployment.tsx | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/cyclops-ui/src/components/k8s-resources/Deployment.tsx b/cyclops-ui/src/components/k8s-resources/Deployment.tsx index bf1189bb..e25f0aa0 100644 --- a/cyclops-ui/src/components/k8s-resources/Deployment.tsx +++ b/cyclops-ui/src/components/k8s-resources/Deployment.tsx @@ -23,23 +23,19 @@ const Deployment = ({ name, namespace }: Props) => { useEffect(() => { console.log("sse start"); - fetchEventSource( - `/api/stream/resources?group=apps&version=v1&kind=Deployment&name=${name}&namespace=${namespace}`, - { - method: "POST", - body: JSON.stringify({ - group: `apps`, - version: `v1`, - kind: `Deployment`, - name: name, - namespace: namespace, - }), - onmessage(ev) { - console.log("sse", ev.data); - setDeployment(JSON.parse(ev.data)); - }, + fetchEventSource(`/api/stream/resources`, { + method: "POST", + body: JSON.stringify({ + group: `apps`, + version: `v1`, + kind: `Deployment`, + name: name, + namespace: namespace, + }), + onmessage(ev) { + setDeployment(JSON.parse(ev.data)); }, - ).then((r) => { + }).then((r) => { console.log("done"); }); }, [name, namespace]); From a4b8456b253922e04f996384bb46de4dacadefd4 Mon Sep 17 00:00:00 2001 From: petar-cvit Date: Mon, 2 Sep 2024 11:22:07 +0200 Subject: [PATCH 07/24] destroy on proxy --- .../internal/controller/sse/resources.go | 17 +++++-- .../components/k8s-resources/Deployment.tsx | 49 +++++++++++++++++-- cyclops-ui/src/setupProxy.js | 13 +++++ 3 files changed, 70 insertions(+), 9 deletions(-) diff --git a/cyclops-ctrl/internal/controller/sse/resources.go b/cyclops-ctrl/internal/controller/sse/resources.go index 2fe96129..36b8ee4a 100644 --- a/cyclops-ctrl/internal/controller/sse/resources.go +++ b/cyclops-ctrl/internal/controller/sse/resources.go @@ -1,6 +1,7 @@ package sse import ( + "fmt" "io" "net/http" "time" @@ -11,11 +12,11 @@ import ( func (s *Server) Resources(ctx *gin.Context) { type Ref struct { - Group string `json:"group"` - Version string `json:"version"` - Kind string `json:"kind"` - Name string `json:"name"` - Namespace string `json:"namespace"` + Group string `json:"group" form:"group"` + Version string `json:"version" form:"version"` + Kind string `json:"kind" form:"kind"` + Name string `json:"name" form:"name"` + Namespace string `json:"namespace" form:"namespace"` } var r *Ref @@ -64,6 +65,12 @@ func (s *Server) Resources(ctx *gin.Context) { case <-ctx.Request.Context().Done(): watchResource.Stop() close(p.output) + fmt.Println("ctx.Request.Context().Done()") + return false + case <-ctx.Done(): + watchResource.Stop() + close(p.output) + fmt.Println("ctx.Done()") return false } } diff --git a/cyclops-ui/src/components/k8s-resources/Deployment.tsx b/cyclops-ui/src/components/k8s-resources/Deployment.tsx index e25f0aa0..f0cf0ce1 100644 --- a/cyclops-ui/src/components/k8s-resources/Deployment.tsx +++ b/cyclops-ui/src/components/k8s-resources/Deployment.tsx @@ -3,7 +3,10 @@ import { Col, Divider, Row, Alert } from "antd"; import axios from "axios"; import { mapResponseError } from "../../utils/api/errors"; import PodTable from "./common/PodTable/PodTable"; -import { fetchEventSource } from "@microsoft/fetch-event-source"; +import { + EventStreamContentType, + fetchEventSource, +} from "@microsoft/fetch-event-source"; interface Props { name: string; @@ -23,6 +26,20 @@ const Deployment = ({ name, namespace }: Props) => { useEffect(() => { console.log("sse start"); + // const eventSource = new EventSource(`/api/stream/resources?group=apps&version=v1&kind=Deployment&name=${name}&namespace=${namespace}`); + // + // eventSource.onopen = function() { + // console.log("sse otvorio onopen"); + // }; + // + // eventSource.onmessage = function(event) { + // console.log("sse Received data: ", event); + // }; + // + // eventSource.onerror = function(event) { + // console.log("sse Connection lost. Retrying..."); + // }; + fetchEventSource(`/api/stream/resources`, { method: "POST", body: JSON.stringify({ @@ -35,9 +52,33 @@ const Deployment = ({ name, namespace }: Props) => { onmessage(ev) { setDeployment(JSON.parse(ev.data)); }, - }).then((r) => { - console.log("done"); - }); + onerror: (err) => { + console.error("Error occurred:", err); + }, + async onopen(response) { + if ( + response.ok && + response.headers.get("content-type") === EventStreamContentType + ) { + console.log("sse onopen all good"); + return; // everything's good + } else if ( + response.status >= 400 && + response.status < 500 && + response.status !== 429 + ) { + // client-side errors are usually non-retriable: + console.log("sse error client-side errors are usually non-retriable"); + } else { + console.log("sse error retry"); + } + }, + onclose: () => { + console.log("sse prekinuo"); + }, + }) + .then((r) => console.log("sse THEN")) + .catch((r) => console.log("see CATCH")); }, [name, namespace]); useEffect(() => { diff --git a/cyclops-ui/src/setupProxy.js b/cyclops-ui/src/setupProxy.js index ce399ac6..38042e2a 100644 --- a/cyclops-ui/src/setupProxy.js +++ b/cyclops-ui/src/setupProxy.js @@ -12,6 +12,19 @@ module.exports = function (app) { onProxyReq: (proxyRes, req, res) => { res.on("close", () => proxyRes.destroy()); }, + onProxyRes: (proxyRes, req, res) => { + proxyRes.on("close", () => { + if (!res.writableEnded) { + res.end(); + } + }); + + proxyRes.on("end", () => { + if (!res.writableEnded) { + res.end(); + } + }); + }, }), ); }; From 1b85ceee37fd538ea6be5b95d938dbce0f95873c Mon Sep 17 00:00:00 2001 From: petar-cvit Date: Mon, 2 Sep 2024 18:13:16 +0200 Subject: [PATCH 08/24] extract sse subscribe to a function --- .../internal/controller/sse/proxychan.go | 4 ++ .../internal/controller/sse/resources.go | 2 +- .../components/k8s-resources/Deployment.tsx | 67 ++----------------- cyclops-ui/src/utils/api/sse/resources.tsx | 56 ++++++++++++++++ 4 files changed, 65 insertions(+), 64 deletions(-) create mode 100644 cyclops-ui/src/utils/api/sse/resources.tsx diff --git a/cyclops-ctrl/internal/controller/sse/proxychan.go b/cyclops-ctrl/internal/controller/sse/proxychan.go index d6362177..a812963e 100644 --- a/cyclops-ctrl/internal/controller/sse/proxychan.go +++ b/cyclops-ctrl/internal/controller/sse/proxychan.go @@ -33,13 +33,17 @@ func NewProxyChan(ctx context.Context, input <-chan watch.Event, interval time.D return } p.update = true + case <-ticker.C: p.output <- true + p.update = false + case <-batcher.C: if p.update { p.output <- true } p.update = false + case <-ctx.Done(): return } diff --git a/cyclops-ctrl/internal/controller/sse/resources.go b/cyclops-ctrl/internal/controller/sse/resources.go index 36b8ee4a..2a05a6ca 100644 --- a/cyclops-ctrl/internal/controller/sse/resources.go +++ b/cyclops-ctrl/internal/controller/sse/resources.go @@ -39,7 +39,7 @@ func (s *Server) Resources(ctx *gin.Context) { return } - p := NewProxyChan(ctx.Request.Context(), watchResource.ResultChan(), time.Second*5) + p := NewProxyChan(ctx.Request.Context(), watchResource.ResultChan(), time.Second*15) ctx.Stream(func(w io.Writer) bool { for { diff --git a/cyclops-ui/src/components/k8s-resources/Deployment.tsx b/cyclops-ui/src/components/k8s-resources/Deployment.tsx index f0cf0ce1..4829bf8e 100644 --- a/cyclops-ui/src/components/k8s-resources/Deployment.tsx +++ b/cyclops-ui/src/components/k8s-resources/Deployment.tsx @@ -3,10 +3,7 @@ import { Col, Divider, Row, Alert } from "antd"; import axios from "axios"; import { mapResponseError } from "../../utils/api/errors"; import PodTable from "./common/PodTable/PodTable"; -import { - EventStreamContentType, - fetchEventSource, -} from "@microsoft/fetch-event-source"; +import { resourceStream } from "../../utils/api/sse/resources"; interface Props { name: string; @@ -24,61 +21,9 @@ const Deployment = ({ name, namespace }: Props) => { }); useEffect(() => { - console.log("sse start"); - - // const eventSource = new EventSource(`/api/stream/resources?group=apps&version=v1&kind=Deployment&name=${name}&namespace=${namespace}`); - // - // eventSource.onopen = function() { - // console.log("sse otvorio onopen"); - // }; - // - // eventSource.onmessage = function(event) { - // console.log("sse Received data: ", event); - // }; - // - // eventSource.onerror = function(event) { - // console.log("sse Connection lost. Retrying..."); - // }; - - fetchEventSource(`/api/stream/resources`, { - method: "POST", - body: JSON.stringify({ - group: `apps`, - version: `v1`, - kind: `Deployment`, - name: name, - namespace: namespace, - }), - onmessage(ev) { - setDeployment(JSON.parse(ev.data)); - }, - onerror: (err) => { - console.error("Error occurred:", err); - }, - async onopen(response) { - if ( - response.ok && - response.headers.get("content-type") === EventStreamContentType - ) { - console.log("sse onopen all good"); - return; // everything's good - } else if ( - response.status >= 400 && - response.status < 500 && - response.status !== 429 - ) { - // client-side errors are usually non-retriable: - console.log("sse error client-side errors are usually non-retriable"); - } else { - console.log("sse error retry"); - } - }, - onclose: () => { - console.log("sse prekinuo"); - }, - }) - .then((r) => console.log("sse THEN")) - .catch((r) => console.log("see CATCH")); + resourceStream(`apps`, `v1`, `Deployment`, name, namespace, (r: any) => { + setDeployment(r); + }); }, [name, namespace]); useEffect(() => { @@ -102,10 +47,6 @@ const Deployment = ({ name, namespace }: Props) => { } fetchDeployment(); - // const interval = setInterval(() => fetchDeployment(), 15000); - // return () => { - // clearInterval(interval); - // }; }, [name, namespace]); return ( diff --git a/cyclops-ui/src/utils/api/sse/resources.tsx b/cyclops-ui/src/utils/api/sse/resources.tsx new file mode 100644 index 00000000..6927e01c --- /dev/null +++ b/cyclops-ui/src/utils/api/sse/resources.tsx @@ -0,0 +1,56 @@ +import { + EventStreamContentType, + fetchEventSource, +} from "@microsoft/fetch-event-source"; + +class RetriableError extends Error {} +class FatalError extends Error {} + +export function resourceStream( + group: string, + version: string, + kind: string, + name: string, + namespace: string, + setResource: (r: any) => void, +) { + fetchEventSource(`/api/stream/resources`, { + method: "POST", + body: JSON.stringify({ + group: group, + version: version, + kind: kind, + name: name, + namespace: namespace, + }), + onmessage(ev) { + setResource(JSON.parse(ev.data)); + }, + async onopen(response) { + if ( + response.ok && + response.headers.get("content-type") === EventStreamContentType + ) { + return; // everything's good + } else if ( + response.status >= 400 && + response.status < 500 && + response.status !== 429 + ) { + throw new FatalError(); + } else { + throw new RetriableError(); + } + }, + onclose() { + throw new RetriableError(); + }, + onerror: (err) => { + if (err instanceof FatalError) { + throw err; // rethrow to stop the operation + } + + return 5000; + }, + }).catch((r) => console.error(r)); +} From 57e14e415d5feab3eb3c51de6bfdc163d1e427ba Mon Sep 17 00:00:00 2001 From: petar-cvit Date: Mon, 2 Sep 2024 21:23:58 +0200 Subject: [PATCH 09/24] remove logging --- cyclops-ctrl/internal/cluster/k8sclient/client.go | 1 - cyclops-ctrl/internal/controller/sse/resources.go | 3 --- 2 files changed, 4 deletions(-) diff --git a/cyclops-ctrl/internal/cluster/k8sclient/client.go b/cyclops-ctrl/internal/cluster/k8sclient/client.go index 5f4d56c2..2eb360ae 100644 --- a/cyclops-ctrl/internal/cluster/k8sclient/client.go +++ b/cyclops-ctrl/internal/cluster/k8sclient/client.go @@ -1086,7 +1086,6 @@ func (k *KubernetesClient) WatchResource(group, version, resource, name, namespa Resource: resource, } - // Start the watch return k.Dynamic.Resource(gvr).Namespace(namespace).Watch(context.Background(), metav1.ListOptions{ FieldSelector: "metadata.name=" + name, }) diff --git a/cyclops-ctrl/internal/controller/sse/resources.go b/cyclops-ctrl/internal/controller/sse/resources.go index 2a05a6ca..b6ae9b4a 100644 --- a/cyclops-ctrl/internal/controller/sse/resources.go +++ b/cyclops-ctrl/internal/controller/sse/resources.go @@ -1,7 +1,6 @@ package sse import ( - "fmt" "io" "net/http" "time" @@ -65,12 +64,10 @@ func (s *Server) Resources(ctx *gin.Context) { case <-ctx.Request.Context().Done(): watchResource.Stop() close(p.output) - fmt.Println("ctx.Request.Context().Done()") return false case <-ctx.Done(): watchResource.Stop() close(p.output) - fmt.Println("ctx.Done()") return false } } From 0b24558a253be8b8fefbf56b8bd7a9f66339cc8a Mon Sep 17 00:00:00 2001 From: petar-cvit Date: Mon, 2 Sep 2024 21:35:34 +0200 Subject: [PATCH 10/24] support daemonsets and statefulsets --- cyclops-ui/src/components/k8s-resources/DaemonSet.tsx | 11 +++++++---- .../src/components/k8s-resources/StatefulSet.tsx | 11 +++++++---- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/cyclops-ui/src/components/k8s-resources/DaemonSet.tsx b/cyclops-ui/src/components/k8s-resources/DaemonSet.tsx index 2d347383..4bbf10a4 100644 --- a/cyclops-ui/src/components/k8s-resources/DaemonSet.tsx +++ b/cyclops-ui/src/components/k8s-resources/DaemonSet.tsx @@ -3,6 +3,7 @@ import { Col, Divider, Row, Alert } from "antd"; import axios from "axios"; import { mapResponseError } from "../../utils/api/errors"; import PodTable from "./common/PodTable/PodTable"; +import { resourceStream } from "../../utils/api/sse/resources"; interface Props { name: string; @@ -20,6 +21,12 @@ const DaemonSet = ({ name, namespace }: Props) => { description: "", }); + useEffect(() => { + resourceStream(`apps`, `v1`, `DaemonSet`, name, namespace, (r: any) => { + setDaemonSet(r); + }); + }, [name, namespace]); + const fetchDaemonSet = useCallback(() => { axios .get(`/api/resources`, { @@ -41,10 +48,6 @@ const DaemonSet = ({ name, namespace }: Props) => { useEffect(() => { fetchDaemonSet(); - const interval = setInterval(() => fetchDaemonSet(), 15000); - return () => { - clearInterval(interval); - }; }, [fetchDaemonSet]); return ( diff --git a/cyclops-ui/src/components/k8s-resources/StatefulSet.tsx b/cyclops-ui/src/components/k8s-resources/StatefulSet.tsx index 5aa1985f..11c9af4c 100644 --- a/cyclops-ui/src/components/k8s-resources/StatefulSet.tsx +++ b/cyclops-ui/src/components/k8s-resources/StatefulSet.tsx @@ -3,6 +3,7 @@ import { Col, Divider, Row, Alert } from "antd"; import axios from "axios"; import { mapResponseError } from "../../utils/api/errors"; import PodTable from "./common/PodTable/PodTable"; +import { resourceStream } from "../../utils/api/sse/resources"; interface Props { name: string; @@ -25,6 +26,12 @@ const StatefulSet = ({ name, namespace }: Props) => { description: "", }); + useEffect(() => { + resourceStream(`apps`, `v1`, `StatefulSet`, name, namespace, (r: any) => { + setStatefulSet(r); + }); + }, [name, namespace]); + const fetchStatefulSet = useCallback(() => { axios .get(`/api/resources`, { @@ -46,10 +53,6 @@ const StatefulSet = ({ name, namespace }: Props) => { useEffect(() => { fetchStatefulSet(); - const interval = setInterval(() => fetchStatefulSet(), 15000); - return () => { - clearInterval(interval); - }; }, [fetchStatefulSet]); return ( From 95807a69d4917c86007611d2ee92c78054e796da Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 3 Sep 2024 09:27:40 +0000 Subject: [PATCH 11/24] =?UTF-8?q?=E2=9A=99=EF=B8=8F=20update=20cyclops=20t?= =?UTF-8?q?o=20v0.11.0-rc-streaming.2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- install/cyclops-install.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/install/cyclops-install.yaml b/install/cyclops-install.yaml index 80a39fcd..4144e1fb 100644 --- a/install/cyclops-install.yaml +++ b/install/cyclops-install.yaml @@ -334,7 +334,7 @@ spec: spec: containers: - name: cyclops-ui - image: cyclopsui/cyclops-ui:v0.11.0-rc-streaming + image: cyclopsui/cyclops-ui:v0.11.0-rc-streaming.2 ports: - containerPort: 80 env: @@ -397,7 +397,7 @@ spec: serviceAccountName: cyclops-ctrl containers: - name: cyclops-ctrl - image: cyclopsui/cyclops-ctrl:v0.11.0-rc-streaming + image: cyclopsui/cyclops-ctrl:v0.11.0-rc-streaming.2 ports: - containerPort: 8080 env: From 274553cd079bd3ce08a854ae78e4d115b2cb24f0 Mon Sep 17 00:00:00 2001 From: petar-cvit Date: Tue, 3 Sep 2024 17:40:27 +0200 Subject: [PATCH 12/24] streaming feature flag --- cyclops-ui/.env | 1 + cyclops-ui/env.js | 1 + cyclops-ui/public/env-config.js | 1 + .../src/components/k8s-resources/DaemonSet.tsx | 18 +++++++++++++++--- .../components/k8s-resources/Deployment.tsx | 18 +++++++++++++++--- .../components/k8s-resources/StatefulSet.tsx | 18 +++++++++++++++--- cyclops-ui/src/utils/api/common.tsx | 3 +++ cyclops-ui/src/utils/api/sse/resources.tsx | 2 +- 8 files changed, 52 insertions(+), 10 deletions(-) create mode 100644 cyclops-ui/src/utils/api/common.tsx diff --git a/cyclops-ui/.env b/cyclops-ui/.env index 126ee2f5..4494f1b2 100644 --- a/cyclops-ui/.env +++ b/cyclops-ui/.env @@ -1,2 +1,3 @@ NODE_ENV=production REACT_APP_CYCLOPS_CTRL_HOST=http://localhost:8888 +REACT_APP_ENABLE_STREAMING=true diff --git a/cyclops-ui/env.js b/cyclops-ui/env.js index 04a85aa0..ac6f30fc 100644 --- a/cyclops-ui/env.js +++ b/cyclops-ui/env.js @@ -3,4 +3,5 @@ window.__RUNTIME_CONFIG__ = { NODE_ENV: "${NODE_ENV}", REACT_APP_CYCLOPS_CTRL_HOST: "${REACT_APP_CYCLOPS_CTRL_HOST}", REACT_APP_VERSION: "${REACT_APP_VERSION}", + REACT_APP_ENABLE_STREAMING: "${REACT_APP_ENABLE_STREAMING}", }; diff --git a/cyclops-ui/public/env-config.js b/cyclops-ui/public/env-config.js index c50c99da..39ffe808 100644 --- a/cyclops-ui/public/env-config.js +++ b/cyclops-ui/public/env-config.js @@ -2,4 +2,5 @@ window.__RUNTIME_CONFIG__ = { NODE_ENV: "development", REACT_APP_CYCLOPS_CTRL_HOST: "http://localhost:8888", REACT_APP_VERSION: "v0.0.0", + REACT_APP_ENABLE_STREAMING: "true", }; diff --git a/cyclops-ui/src/components/k8s-resources/DaemonSet.tsx b/cyclops-ui/src/components/k8s-resources/DaemonSet.tsx index 4bbf10a4..2c1f79f1 100644 --- a/cyclops-ui/src/components/k8s-resources/DaemonSet.tsx +++ b/cyclops-ui/src/components/k8s-resources/DaemonSet.tsx @@ -4,6 +4,7 @@ import axios from "axios"; import { mapResponseError } from "../../utils/api/errors"; import PodTable from "./common/PodTable/PodTable"; import { resourceStream } from "../../utils/api/sse/resources"; +import { isStreamingEnabled } from "../../utils/api/common"; interface Props { name: string; @@ -22,9 +23,11 @@ const DaemonSet = ({ name, namespace }: Props) => { }); useEffect(() => { - resourceStream(`apps`, `v1`, `DaemonSet`, name, namespace, (r: any) => { - setDaemonSet(r); - }); + if (isStreamingEnabled()) { + resourceStream(`apps`, `v1`, `DaemonSet`, name, namespace, (r: any) => { + setDaemonSet(r); + }); + } }, [name, namespace]); const fetchDaemonSet = useCallback(() => { @@ -48,6 +51,15 @@ const DaemonSet = ({ name, namespace }: Props) => { useEffect(() => { fetchDaemonSet(); + + if (isStreamingEnabled()) { + return () => {}; + } + + const interval = setInterval(() => fetchDaemonSet(), 15000); + return () => { + clearInterval(interval); + }; }, [fetchDaemonSet]); return ( diff --git a/cyclops-ui/src/components/k8s-resources/Deployment.tsx b/cyclops-ui/src/components/k8s-resources/Deployment.tsx index c71c8b39..c4897309 100644 --- a/cyclops-ui/src/components/k8s-resources/Deployment.tsx +++ b/cyclops-ui/src/components/k8s-resources/Deployment.tsx @@ -4,6 +4,7 @@ import axios from "axios"; import { mapResponseError } from "../../utils/api/errors"; import PodTable from "./common/PodTable/PodTable"; import { resourceStream } from "../../utils/api/sse/resources"; +import { isStreamingEnabled } from "../../utils/api/common"; interface Props { name: string; @@ -21,9 +22,11 @@ const Deployment = ({ name, namespace }: Props) => { }); useEffect(() => { - resourceStream(`apps`, `v1`, `Deployment`, name, namespace, (r: any) => { - setDeployment(r); - }); + if (isStreamingEnabled()) { + resourceStream(`apps`, `v1`, `Deployment`, name, namespace, (r: any) => { + setDeployment(r); + }); + } }, [name, namespace]); const fetchDeployment = useCallback(() => { @@ -47,6 +50,15 @@ const Deployment = ({ name, namespace }: Props) => { useEffect(() => { fetchDeployment(); + + if (isStreamingEnabled()) { + return () => {}; + } + + const interval = setInterval(() => fetchDeployment(), 15000); + return () => { + clearInterval(interval); + }; }, [fetchDeployment]); return ( diff --git a/cyclops-ui/src/components/k8s-resources/StatefulSet.tsx b/cyclops-ui/src/components/k8s-resources/StatefulSet.tsx index 11c9af4c..8fab853e 100644 --- a/cyclops-ui/src/components/k8s-resources/StatefulSet.tsx +++ b/cyclops-ui/src/components/k8s-resources/StatefulSet.tsx @@ -4,6 +4,7 @@ import axios from "axios"; import { mapResponseError } from "../../utils/api/errors"; import PodTable from "./common/PodTable/PodTable"; import { resourceStream } from "../../utils/api/sse/resources"; +import { isStreamingEnabled } from "../../utils/api/common"; interface Props { name: string; @@ -27,9 +28,11 @@ const StatefulSet = ({ name, namespace }: Props) => { }); useEffect(() => { - resourceStream(`apps`, `v1`, `StatefulSet`, name, namespace, (r: any) => { - setStatefulSet(r); - }); + if (isStreamingEnabled()) { + resourceStream(`apps`, `v1`, `StatefulSet`, name, namespace, (r: any) => { + setStatefulSet(r); + }); + } }, [name, namespace]); const fetchStatefulSet = useCallback(() => { @@ -53,6 +56,15 @@ const StatefulSet = ({ name, namespace }: Props) => { useEffect(() => { fetchStatefulSet(); + + if (isStreamingEnabled()) { + return () => {}; + } + + const interval = setInterval(() => fetchStatefulSet(), 15000); + return () => { + clearInterval(interval); + }; }, [fetchStatefulSet]); return ( diff --git a/cyclops-ui/src/utils/api/common.tsx b/cyclops-ui/src/utils/api/common.tsx new file mode 100644 index 00000000..d40363b8 --- /dev/null +++ b/cyclops-ui/src/utils/api/common.tsx @@ -0,0 +1,3 @@ +export function isStreamingEnabled() { + return window.__RUNTIME_CONFIG__.REACT_APP_ENABLE_STREAMING === "true"; +} diff --git a/cyclops-ui/src/utils/api/sse/resources.tsx b/cyclops-ui/src/utils/api/sse/resources.tsx index 6927e01c..5e681993 100644 --- a/cyclops-ui/src/utils/api/sse/resources.tsx +++ b/cyclops-ui/src/utils/api/sse/resources.tsx @@ -31,7 +31,7 @@ export function resourceStream( response.ok && response.headers.get("content-type") === EventStreamContentType ) { - return; // everything's good + return; } else if ( response.status >= 400 && response.status < 500 && From f6adc39bff264c301e9f57d950b7b363eb863dfa Mon Sep 17 00:00:00 2001 From: naineel1209 Date: Sun, 8 Sep 2024 00:41:23 +0530 Subject: [PATCH 13/24] feat[BE]: impl streamed logs for pods over SSE --- .../internal/cluster/k8sclient/client.go | 35 +++++++++++-- cyclops-ctrl/internal/controller/modules.go | 49 ++++++++++++++----- cyclops-ctrl/internal/handler/handler.go | 2 +- 3 files changed, 70 insertions(+), 16 deletions(-) diff --git a/cyclops-ctrl/internal/cluster/k8sclient/client.go b/cyclops-ctrl/internal/cluster/k8sclient/client.go index 2eb360ae..b6bbb3a4 100644 --- a/cyclops-ctrl/internal/cluster/k8sclient/client.go +++ b/cyclops-ctrl/internal/cluster/k8sclient/client.go @@ -7,15 +7,16 @@ import ( "errors" "fmt" "io" - networkingv1 "k8s.io/api/networking/v1" - "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/apimachinery/pkg/watch" "os" "os/exec" "sort" "strings" "time" + networkingv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/watch" + "github.com/cyclops-ui/cyclops/cyclops-ctrl/api/v1alpha1" "gopkg.in/yaml.v2" @@ -169,6 +170,34 @@ func (k *KubernetesClient) GetPods(namespace, name string) ([]apiv1.Pod, error) return podList.Items, err } +func (k *KubernetesClient) GetStreamedPodLogs(namespace, container, name string, logCount *int64, logChan chan<- string) error { + podLogOptions := apiv1.PodLogOptions{ + Container: container, + TailLines: logCount, + Timestamps: true, + Follow: true, + } + + podClient := k.clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOptions) + stream, err := podClient.Stream(context.Background()) + if err != nil { + return err + } + defer stream.Close() + + scanner := bufio.NewScanner(stream) + + for scanner.Scan() { + logChan <- scanner.Text() + } + + if err := scanner.Err(); err != nil { + return err + } + + return nil +} + func (k *KubernetesClient) GetPodLogs(namespace, container, name string, numLogs *int64) ([]string, error) { podLogOptions := apiv1.PodLogOptions{ Container: container, diff --git a/cyclops-ctrl/internal/controller/modules.go b/cyclops-ctrl/internal/controller/modules.go index bb9b8ae4..e6a09777 100644 --- a/cyclops-ctrl/internal/controller/modules.go +++ b/cyclops-ctrl/internal/controller/modules.go @@ -2,6 +2,7 @@ package controller import ( "fmt" + "io" "net/http" "os" "strings" @@ -505,19 +506,43 @@ func (m *Modules) GetLogs(ctx *gin.Context) { ctx.Header("Access-Control-Allow-Origin", "*") logCount := int64(100) - logs, err := m.kubernetesClient.GetPodLogs( - ctx.Param("namespace"), - ctx.Param("container"), - ctx.Param("name"), - &logCount, - ) - if err != nil { - fmt.Println(err) - ctx.JSON(http.StatusInternalServerError, dto.NewError("Error fetching logs", err.Error())) - return - } - ctx.JSON(http.StatusOK, logs) + // pass on a channel to the GetPodLogs function to allow for streaming + logChan := make(chan string) + + go func() { + defer close(logChan) + err := m.kubernetesClient.GetStreamedPodLogs( + ctx.Param("namespace"), + ctx.Param("container"), + ctx.Param("name"), + &logCount, + logChan, + ) + if err != nil { + ctx.JSON(http.StatusInternalServerError, dto.NewError("Error fetching logs", err.Error())) + return + } + }() + + // stream logs to the client + ctx.Stream(func(w io.Writer) bool { + for { + select { + case log, ok := <-logChan: + if !ok { + return false + } + + ctx.SSEvent("pod-log", log) + return true + case <-ctx.Request.Context().Done(): + return false + case <-ctx.Done(): + return false + } + } + }) } func (m *Modules) GetDeploymentLogs(ctx *gin.Context) { diff --git a/cyclops-ctrl/internal/handler/handler.go b/cyclops-ctrl/internal/handler/handler.go index 58a3d0f1..0dd01e17 100644 --- a/cyclops-ctrl/internal/handler/handler.go +++ b/cyclops-ctrl/internal/handler/handler.go @@ -81,7 +81,7 @@ func (h *Handler) Start() error { h.router.GET("/modules/:name/helm-template", modulesController.HelmTemplate) //h.router.POST("/modules/resources", modulesController.ModuleToResources) - h.router.GET("/resources/pods/:namespace/:name/:container/logs", modulesController.GetLogs) + h.router.GET("/resources/pods/:namespace/:name/:container/logs", sse.HeadersMiddleware(), modulesController.GetLogs) h.router.GET("/resources/pods/:namespace/:name/:container/logs/download", modulesController.DownloadLogs) h.router.GET("/manifest", modulesController.GetManifest) From f001175799bafdd9252151ea2d793609c00c0836 Mon Sep 17 00:00:00 2001 From: naineel1209 Date: Sun, 8 Sep 2024 19:18:11 +0530 Subject: [PATCH 14/24] feat[FE]: impl log streaming for pods --- cyclops-ctrl/internal/controller/modules.go | 2 +- .../common/PodTable/PodTable.tsx | 118 ++++++++++-------- cyclops-ui/src/utils/api/sse/resources.tsx | 57 +++++++++ cyclops-ui/yarn.lock | 37 +++++- 4 files changed, 153 insertions(+), 61 deletions(-) diff --git a/cyclops-ctrl/internal/controller/modules.go b/cyclops-ctrl/internal/controller/modules.go index e6a09777..3f54fbcc 100644 --- a/cyclops-ctrl/internal/controller/modules.go +++ b/cyclops-ctrl/internal/controller/modules.go @@ -507,11 +507,11 @@ func (m *Modules) GetLogs(ctx *gin.Context) { logCount := int64(100) - // pass on a channel to the GetPodLogs function to allow for streaming logChan := make(chan string) go func() { defer close(logChan) + err := m.kubernetesClient.GetStreamedPodLogs( ctx.Param("namespace"), ctx.Param("container"), diff --git a/cyclops-ui/src/components/k8s-resources/common/PodTable/PodTable.tsx b/cyclops-ui/src/components/k8s-resources/common/PodTable/PodTable.tsx index 8a816b62..c93874e2 100644 --- a/cyclops-ui/src/components/k8s-resources/common/PodTable/PodTable.tsx +++ b/cyclops-ui/src/components/k8s-resources/common/PodTable/PodTable.tsx @@ -24,6 +24,8 @@ import { ReadOutlined, DeleteOutlined, } from "@ant-design/icons"; +import { logStream } from "../../../../utils/api/sse/resources"; +import { isStreamingEnabled } from "../../../../utils/api/common"; interface Props { namespace: string; @@ -40,7 +42,7 @@ interface Pod { } const PodTable = ({ pods, namespace, updateResourceData }: Props) => { - const [logs, setLogs] = useState(""); + const [logs, setLogs] = useState([]); const [logsModal, setLogsModal] = useState({ on: false, namespace: "", @@ -48,6 +50,8 @@ const PodTable = ({ pods, namespace, updateResourceData }: Props) => { containers: [], initContainers: [], }); + const [logsSignalController, setLogsSignalController] = + useState(null); const [deletePodRef, setDeletePodRef] = useState<{ on: boolean; @@ -77,7 +81,14 @@ const PodTable = ({ pods, namespace, updateResourceData }: Props) => { containers: [], initContainers: [], }); - setLogs(""); + setLogs([]); + setLogsSignalController((prevController) => { + if (prevController) { + prevController.abort(); + } + + return null; + }); }; const handleCancelDeletePod = () => { @@ -167,7 +178,7 @@ const PodTable = ({ pods, namespace, updateResourceData }: Props) => { type="primary" // icon={} onClick={downloadLogs(container.name)} - disabled={logs === "No logs available"} + disabled={logs.length === 0} > Download @@ -175,7 +186,9 @@ const PodTable = ({ pods, namespace, updateResourceData }: Props) => { @@ -195,7 +208,7 @@ const PodTable = ({ pods, namespace, updateResourceData }: Props) => { type="primary" // icon={} onClick={downloadLogs(container.name)} - disabled={logs === "No logs available"} + disabled={logs.length === 0} > Download @@ -203,7 +216,9 @@ const PodTable = ({ pods, namespace, updateResourceData }: Props) => { @@ -216,31 +231,32 @@ const PodTable = ({ pods, namespace, updateResourceData }: Props) => { }; const onLogsTabsChange = (container: string) => { - axios - .get( - "/api/resources/pods/" + - logsModal.namespace + - "/" + - logsModal.pod + - "/" + - container + - "/logs", - ) - .then((res) => { - if (res.data) { - let log = ""; - res.data.forEach((s: string) => { - log += s; - log += "\n"; + setLogsSignalController((prevController) => { + if (prevController) { + prevController.abort(); + } + + const controller = new AbortController(); + return controller; + }); + setLogs(() => []); //this is to remove the previous pod's logs + + if (isStreamingEnabled() && logsSignalController) { + logStream( + logsModal.pod, + logsModal.namespace, + container, + (log) => { + setLogs((prevLogs) => { + return [...prevLogs, log]; }); - setLogs(log); - } else { - setLogs("No logs available"); - } - }) - .catch((error) => { - setError(mapResponseError(error)); - }); + }, + (err) => { + setError(mapResponseError(err)); + }, + logsSignalController, + ); + } }; const podActionsMenu = (pod: any) => { @@ -251,31 +267,25 @@ const PodTable = ({ pods, namespace, updateResourceData }: Props) => { @@ -58,7 +69,9 @@ const PodLogs = ({ pod }: PodLogsProps) => { @@ -78,7 +91,7 @@ const PodLogs = ({ pod }: PodLogsProps) => { type="primary" // icon={} onClick={downloadLogs(container.name)} - disabled={logs === "No logs available"} + disabled={logs.length === 0} > Download @@ -86,7 +99,9 @@ const PodLogs = ({ pod }: PodLogsProps) => { @@ -99,31 +114,32 @@ const PodLogs = ({ pod }: PodLogsProps) => { }; const onLogsTabsChange = (container: string) => { - axios - .get( - "/api/resources/pods/" + - logsModal.namespace + - "/" + - logsModal.pod + - "/" + - container + - "/logs", - ) - .then((res) => { - if (res.data) { - let log = ""; - res.data.forEach((s: string) => { - log += s; - log += "\n"; + setLogsSignalController((prevController) => { + if (prevController) { + prevController.abort(); + } + + const controller = new AbortController(); + return controller; + }); + setLogs(() => []); //this is to remove the previous pod's logs + + if (isStreamingEnabled() && logsSignalController) { + logStream( + logsModal.pod, + logsModal.namespace, + container, + (log) => { + setLogs((prevLogs) => { + return [...prevLogs, log]; }); - setLogs(log); - } else { - setLogs("No logs available"); - } - }) - .catch((error) => { - setError(mapResponseError(error)); - }); + }, + (err) => { + setError(mapResponseError(err)); + }, + logsSignalController, + ); + } }; const downloadLogs = (container: string) => { @@ -139,45 +155,40 @@ const PodLogs = ({ pod }: PodLogsProps) => { }; }; - const handleViewPodLogs = async () => { - axios - .get( - "/api/resources/pods/" + - pod.namespace + - "/" + - pod.name + - "/" + - pod.containers[0].name + - "/logs", - ) - .then((res) => { - if (res.data) { - let log = ""; - res.data.forEach((s: string) => { - log += s; - log += "\n"; - }); - setLogs(log); - } else { - setLogs("No logs available"); - } - }) - .catch((error) => { - setError(mapResponseError(error)); - }); - - setLogsModal({ - on: true, - namespace: pod.namespace, - pod: pod.name, - containers: pod.containers, - initContainers: pod.initContainers, - }); - }; - return ( <> - - - - - ), - }); - } - } - - if (logsModal.initContainers !== null) { - for (container of logsModal.initContainers) { - items.push({ - key: container.name, - label: "(init container) " + container.name, - children: ( - - - - - - ), - }); - } - } - - return items; - }; - - const onLogsTabsChange = (container: string) => { - setLogsSignalController((prevController) => { - if (prevController) { - prevController.abort(); - } - - const controller = new AbortController(); - return controller; - }); - setLogs(() => []); //this is to remove the previous pod's logs - - if (isStreamingEnabled() && logsSignalController) { - logStream( - logsModal.pod, - logsModal.namespace, - container, - (log) => { - setLogs((prevLogs) => { - return [...prevLogs, log]; - }); - }, - (err) => { - setError(mapResponseError(err)); - }, - logsSignalController, - ); - } - }; - const podActionsMenu = (pod: any) => { return (
@@ -276,63 +130,6 @@ const PodTable = ({ pods, namespace, updateResourceData }: Props) => { - -
); }; From a13121fbe0235b10b1a683894583eac042715ffe Mon Sep 17 00:00:00 2001 From: naineel1209 Date: Thu, 12 Sep 2024 07:38:01 +0530 Subject: [PATCH 16/24] wip: resolved unclosed goroutine --- cyclops-ctrl/internal/controller/modules.go | 5 +++-- cyclops-ctrl/pkg/cluster/k8sclient/client.go | 12 +++++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/cyclops-ctrl/internal/controller/modules.go b/cyclops-ctrl/internal/controller/modules.go index e565e57f..a368d962 100644 --- a/cyclops-ctrl/internal/controller/modules.go +++ b/cyclops-ctrl/internal/controller/modules.go @@ -5,10 +5,11 @@ import ( "io" "net/http" "os" - "sigs.k8s.io/yaml" "strings" "time" + "sigs.k8s.io/yaml" + "github.com/gin-gonic/gin" "github.com/cyclops-ui/cyclops/cyclops-ctrl/api/v1alpha1" @@ -542,6 +543,7 @@ func (m *Modules) GetLogs(ctx *gin.Context) { defer close(logChan) err := m.kubernetesClient.GetStreamedPodLogs( + ctx, // we will have to pass the context for the k8s podClient - so it can stop the stream when the client disconnects ctx.Param("namespace"), ctx.Param("container"), ctx.Param("name"), @@ -549,7 +551,6 @@ func (m *Modules) GetLogs(ctx *gin.Context) { logChan, ) if err != nil { - ctx.JSON(http.StatusInternalServerError, dto.NewError("Error fetching logs", err.Error())) return } }() diff --git a/cyclops-ctrl/pkg/cluster/k8sclient/client.go b/cyclops-ctrl/pkg/cluster/k8sclient/client.go index e3259d4a..247f2e80 100644 --- a/cyclops-ctrl/pkg/cluster/k8sclient/client.go +++ b/cyclops-ctrl/pkg/cluster/k8sclient/client.go @@ -7,16 +7,18 @@ import ( "errors" "fmt" "io" - networkingv1 "k8s.io/api/networking/v1" - "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/apimachinery/pkg/watch" "os" "os/exec" "sort" "strings" "time" + networkingv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/watch" + "github.com/cyclops-ui/cyclops/cyclops-ctrl/api/v1alpha1" + "github.com/gin-gonic/gin" "gopkg.in/yaml.v2" @@ -169,7 +171,7 @@ func (k *KubernetesClient) GetPods(namespace, name string) ([]apiv1.Pod, error) return podList.Items, err } -func (k *KubernetesClient) GetStreamedPodLogs(namespace, container, name string, logCount *int64, logChan chan<- string) error { +func (k *KubernetesClient) GetStreamedPodLogs(ctx *gin.Context, namespace, container, name string, logCount *int64, logChan chan<- string) error { podLogOptions := apiv1.PodLogOptions{ Container: container, TailLines: logCount, @@ -178,7 +180,7 @@ func (k *KubernetesClient) GetStreamedPodLogs(namespace, container, name string, } podClient := k.clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOptions) - stream, err := podClient.Stream(context.Background()) + stream, err := podClient.Stream(ctx.Request.Context()) if err != nil { return err } From 17d3b6751838f03c53fcd0285bf23e9e513ce9e0 Mon Sep 17 00:00:00 2001 From: naineel1209 Date: Thu, 12 Sep 2024 23:54:38 +0530 Subject: [PATCH 17/24] feat: fixed View Logs button dimensions --- .../src/components/k8s-resources/common/PodTable/PodLogs.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cyclops-ui/src/components/k8s-resources/common/PodTable/PodLogs.tsx b/cyclops-ui/src/components/k8s-resources/common/PodTable/PodLogs.tsx index a6aaaf19..7781f22f 100644 --- a/cyclops-ui/src/components/k8s-resources/common/PodTable/PodLogs.tsx +++ b/cyclops-ui/src/components/k8s-resources/common/PodTable/PodLogs.tsx @@ -158,7 +158,7 @@ const PodLogs = ({ pod }: PodLogsProps) => { return ( <>