Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

πŸš€ feat: implement logs streaming for pods #568

Merged
merged 34 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
b18f09c
sse endpoint impl
petar-cvit Aug 30, 2024
735d65b
fetch sse endpoint impl
petar-cvit Aug 30, 2024
40ca3b8
fix failing build
petar-cvit Aug 30, 2024
f5152f7
βš™οΈ update cyclops to v0.11.0-rc-streaming
github-actions[bot] Aug 30, 2024
ec3661c
remove client tracking
petar-cvit Sep 1, 2024
f5ec560
remove unused params
petar-cvit Sep 1, 2024
8ecde2c
Merge branch 'main' into stream-deployment-updates
petar-cvit Sep 1, 2024
a4b8456
destroy on proxy
petar-cvit Sep 2, 2024
1b85cee
extract sse subscribe to a function
petar-cvit Sep 2, 2024
57e14e4
remove logging
petar-cvit Sep 2, 2024
a84e1f8
Merge branch 'main' into stream-deployment-updates
petar-cvit Sep 2, 2024
0b24558
support daemonsets and statefulsets
petar-cvit Sep 2, 2024
95807a6
βš™οΈ update cyclops to v0.11.0-rc-streaming.2
github-actions[bot] Sep 3, 2024
274553c
streaming feature flag
petar-cvit Sep 3, 2024
f6adc39
feat[BE]: impl streamed logs for pods over SSE
naineel1209 Sep 7, 2024
f001175
feat[FE]: impl log streaming for pods
naineel1209 Sep 8, 2024
8401e11
Merge branch 'main' into feat/log-streaming-for-pods
naineel1209 Sep 11, 2024
64e9202
wip: resolved merge conflicts
naineel1209 Sep 11, 2024
a13121f
wip: resolved unclosed goroutine
naineel1209 Sep 12, 2024
17d3b67
feat: fixed View Logs button dimensions
naineel1209 Sep 12, 2024
c5d0445
fix: applies recommended changes
naineel1209 Sep 13, 2024
3ca47dc
fix: applies recommended changes
naineel1209 Sep 13, 2024
0053df9
Merge branch 'feat/log-streaming-for-pods' of https://github.com/nain…
naineel1209 Sep 13, 2024
6074d12
fix: pass consistent `logSignalController` to `logStream`
naineel1209 Sep 16, 2024
8c26c21
fix: pass consistent `logSignalController` to `logStream`
naineel1209 Sep 16, 2024
44d1539
Merge branch 'feat/log-streaming-for-pods' of https://github.com/nain…
naineel1209 Sep 16, 2024
522d969
Merge branch 'feat/log-streaming-for-pods' of https://github.com/nain…
naineel1209 Sep 16, 2024
3239b87
Merge branch 'feat/log-streaming-for-pods' of https://github.com/nain…
naineel1209 Sep 16, 2024
1491ba6
Merge branch 'main' into feat/log-streaming-for-pods
naineel1209 Sep 19, 2024
5c018d5
πŸ”€ fix: resolved the duplicated logs streamed error
naineel1209 Sep 19, 2024
2505341
Merge branch 'main' into feat/log-streaming-for-pods
naineel1209 Sep 22, 2024
a930622
fix: resolves duplicated logs error on reconnection
naineel1209 Sep 22, 2024
5ae134e
fix: formatting
naineel1209 Sep 27, 2024
9d47701
remove unused state
petar-cvit Sep 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 38 additions & 17 deletions cyclops-ctrl/internal/controller/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package controller

import (
"fmt"
"io"
"net/http"
"os"
"sigs.k8s.io/yaml"
"strings"
"time"

"sigs.k8s.io/yaml"

"github.com/gin-gonic/gin"

"github.com/cyclops-ui/cyclops/cyclops-ctrl/api/v1alpha1"
Expand Down Expand Up @@ -541,24 +543,43 @@ func (m *Modules) GetLogs(ctx *gin.Context) {
ctx.Header("Access-Control-Allow-Origin", "*")

logCount := int64(100)
rawLogs, err := m.kubernetesClient.GetPodLogs(
ctx.Param("namespace"),
ctx.Param("container"),
ctx.Param("name"),
&logCount,
)
if err != nil {
fmt.Println(err)
ctx.JSON(http.StatusInternalServerError, dto.NewError("Error fetching logs", err.Error()))
return
}

logs := make([]string, 0, len(rawLogs))
for _, log := range rawLogs {
logs = append(logs, trimLogLine(log))
}
logChan := make(chan string)

ctx.JSON(http.StatusOK, logs)
go func() {
defer close(logChan)

err := m.kubernetesClient.GetStreamedPodLogs(
ctx.Request.Context(), // we will have to pass the context for the k8s podClient - so it can stop the stream when the client disconnects
ctx.Param("namespace"),
ctx.Param("container"),
ctx.Param("name"),
&logCount,
logChan,
)
if err != nil {
return
}
}()

// stream logs to the client
ctx.Stream(func(w io.Writer) bool {
for {
select {
case log, ok := <-logChan:
if !ok {
return false
}

ctx.SSEvent("pod-log", log)
return true
case <-ctx.Request.Context().Done():
return false
case <-ctx.Done():
return false
}
}
})
}

func (m *Modules) GetDeploymentLogs(ctx *gin.Context) {
Expand Down
2 changes: 1 addition & 1 deletion cyclops-ctrl/internal/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (h *Handler) Start() error {
h.router.GET("/modules/:name/helm-template", modulesController.HelmTemplate)
//h.router.POST("/modules/resources", modulesController.ModuleToResources)

h.router.GET("/resources/pods/:namespace/:name/:container/logs", modulesController.GetLogs)
h.router.GET("/resources/pods/:namespace/:name/:container/logs", sse.HeadersMiddleware(), modulesController.GetLogs)
h.router.GET("/resources/pods/:namespace/:name/:container/logs/download", modulesController.DownloadLogs)

h.router.GET("/manifest", modulesController.GetManifest)
Expand Down
30 changes: 30 additions & 0 deletions cyclops-ctrl/pkg/cluster/k8sclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"errors"
"fmt"
"io"

networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"

"os"
"os/exec"
"sort"
Expand Down Expand Up @@ -171,6 +173,34 @@ func (k *KubernetesClient) GetPods(namespace, name string) ([]apiv1.Pod, error)
return podList.Items, err
}

func (k *KubernetesClient) GetStreamedPodLogs(ctx context.Context, namespace, container, name string, logCount *int64, logChan chan<- string) error {
podLogOptions := apiv1.PodLogOptions{
Container: container,
TailLines: logCount,
Timestamps: true,
Follow: true,
}

podClient := k.clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOptions)
stream, err := podClient.Stream(ctx)
if err != nil {
return err
}
defer stream.Close()

scanner := bufio.NewScanner(stream)

for scanner.Scan() {
logChan <- scanner.Text()
}

if err := scanner.Err(); err != nil {
return err
}

return nil
}

func (k *KubernetesClient) GetPodLogs(namespace, container, name string, numLogs *int64) ([]string, error) {
podLogOptions := apiv1.PodLogOptions{
Container: container,
Expand Down
153 changes: 84 additions & 69 deletions cyclops-ui/src/components/k8s-resources/common/PodTable/PodLogs.tsx
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
import { ReadOutlined } from "@ant-design/icons";
import { Alert, Button, Col, Divider, Modal, Tabs, TabsProps } from "antd";
import axios from "axios";
import { useState } from "react";
import ReactAce from "react-ace/lib/ace";
import { mapResponseError } from "../../../../utils/api/errors";
import { isStreamingEnabled } from "../../../../utils/api/common";
import { logStream } from "../../../../utils/api/sse/logs";

interface PodLogsProps {
pod: any;
}

const PodLogs = ({ pod }: PodLogsProps) => {
const [logs, setLogs] = useState("");
const [logs, setLogs] = useState<string[]>([]);
const [logsModal, setLogsModal] = useState({
on: false,
namespace: "",
pod: "",
containers: [],
initContainers: [],
});

const [error, setError] = useState({
message: "",
description: "",
Expand All @@ -31,7 +33,7 @@ const PodLogs = ({ pod }: PodLogsProps) => {
containers: [],
initContainers: [],
});
setLogs("");
setLogs([]);
};

const getTabItems = () => {
Expand All @@ -50,15 +52,17 @@ const PodLogs = ({ pod }: PodLogsProps) => {
type="primary"
// icon={<DownloadOutlined />}
onClick={downloadLogs(container.name)}
disabled={logs === "No logs available"}
disabled={logs.length === 0}
>
Download
</Button>
<Divider style={{ marginTop: "16px", marginBottom: "16px" }} />
<ReactAce
style={{ width: "100%" }}
mode={"sass"}
value={logs}
value={
logs.length === 0 ? "No logs available" : logs.join("\n")
}
readOnly={true}
/>
</Col>
Expand All @@ -78,15 +82,17 @@ const PodLogs = ({ pod }: PodLogsProps) => {
type="primary"
// icon={<DownloadOutlined />}
onClick={downloadLogs(container.name)}
disabled={logs === "No logs available"}
disabled={logs.length === 0}
>
Download
</Button>
<Divider style={{ marginTop: "16px", marginBottom: "16px" }} />
<ReactAce
style={{ width: "100%" }}
mode={"sass"}
value={logs}
value={
logs.length === 0 ? "No logs available" : logs.join("\n")
}
readOnly={true}
/>
</Col>
Expand All @@ -99,31 +105,36 @@ const PodLogs = ({ pod }: PodLogsProps) => {
};

const onLogsTabsChange = (container: string) => {
axios
.get(
"/api/resources/pods/" +
logsModal.namespace +
"/" +
logsModal.pod +
"/" +
container +
"/logs",
)
.then((res) => {
if (res.data) {
let log = "";
res.data.forEach((s: string) => {
log += s;
log += "\n";
});
setLogs(log);
} else {
setLogs("No logs available");
}
})
.catch((error) => {
setError(mapResponseError(error));
});
const controller = new AbortController();
setLogs(() => []);

if (isStreamingEnabled()) {
logStream(
logsModal.pod,
logsModal.namespace,
container,
(log, isReset = false) => {
if (isReset) {
setLogs(() => []);
} else {
setLogs((prevLogs) => {
return [...prevLogs, log];
});
}
},
(err, isReset = false) => {
if (isReset) {
setError({
message: "",
description: "",
});
} else {
setError(mapResponseError(err));
}
},
controller,
);
}
};

const downloadLogs = (container: string) => {
Expand All @@ -139,45 +150,49 @@ const PodLogs = ({ pod }: PodLogsProps) => {
};
};

const handleViewPodLogs = async () => {
axios
.get(
"/api/resources/pods/" +
pod.namespace +
"/" +
pod.name +
"/" +
pod.containers[0].name +
"/logs",
)
.then((res) => {
if (res.data) {
let log = "";
res.data.forEach((s: string) => {
log += s;
log += "\n";
});
setLogs(log);
} else {
setLogs("No logs available");
}
})
.catch((error) => {
setError(mapResponseError(error));
});

setLogsModal({
on: true,
namespace: pod.namespace,
pod: pod.name,
containers: pod.containers,
initContainers: pod.initContainers,
});
};

return (
<>
<Button style={{ width: "100%" }} onClick={handleViewPodLogs}>
<Button
style={{ width: "100%" }}
onClick={function () {
if (isStreamingEnabled()) {
const controller = new AbortController();
logStream(
pod.name,
pod.namespace,
pod.containers[0].name,
(log, isReset = false) => {
if (isReset) {
setLogs(() => []);
} else {
setLogs((prevLogs) => {
return [...prevLogs, log];
});
}
},
(err, isReset = false) => {
if (isReset) {
setError({
message: "",
description: "",
});
} else {
setError(mapResponseError(err));
}
},
controller,
);
}

setLogsModal({
on: true,
namespace: pod.namespace,
pod: pod.name,
containers: pod.containers,
initContainers: pod.initContainers,
});
}}
>
<h4>
<ReadOutlined style={{ paddingRight: "5px" }} />
View Logs
Expand Down
Loading
Loading