Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stream bootloader logs #111

Merged
merged 4 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions internal/cmd/local/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Client interface {
EventsWatch(ctx context.Context, namespace string) (watch.Interface, error)

LogsGet(ctx context.Context, namespace string, name string) (string, error)
StreamPodLogs(ctx context.Context, namespace string, podName string, since time.Time) (io.ReadCloser, error)
}

var _ Client = (*DefaultK8sClient)(nil)
Expand Down Expand Up @@ -325,3 +326,11 @@ func (d *DefaultK8sClient) LogsGet(ctx context.Context, namespace string, name s
}
return buf.String(), nil
}

func (d *DefaultK8sClient) StreamPodLogs(ctx context.Context, namespace string, podName string, since time.Time) (io.ReadCloser, error) {
req := d.ClientSet.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
Follow: true,
SinceTime: &metav1.Time{Time: since},
})
return req.Stream(ctx)
}
10 changes: 10 additions & 0 deletions internal/cmd/local/k8s/k8stest/k8stest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package k8stest

import (
"context"
"io"
"time"

"github.com/airbytehq/abctl/internal/cmd/local/k8s"
v1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -33,6 +35,7 @@ type MockClient struct {
FnServiceGet func(ctx context.Context, namespace, name string) (*corev1.Service, error)
FnEventsWatch func(ctx context.Context, namespace string) (watch.Interface, error)
FnLogsGet func(ctx context.Context, namespace string, name string) (string, error)
FnStreamPodLogs func(ctx context.Context, namespace, podName string, since time.Time) (io.ReadCloser, error)
}

func (m *MockClient) DeploymentList(ctx context.Context, namespace string) (*v1.DeploymentList, error) {
Expand Down Expand Up @@ -166,3 +169,10 @@ func (m *MockClient) LogsGet(ctx context.Context, namespace string, name string)
}
return m.FnLogsGet(ctx, namespace, name)
}

func (m *MockClient) StreamPodLogs(ctx context.Context, namespace string, podName string, since time.Time) (io.ReadCloser, error) {
if m.FnStreamPodLogs == nil {
panic("FnStreamPodLogs is not configured")
}
return m.FnStreamPodLogs(ctx, namespace, podName, since)
}
1 change: 1 addition & 0 deletions internal/cmd/local/local/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
)

const (
airbyteBootloaderPodName = "airbyte-abctl-airbyte-bootloader"
airbyteChartName = "airbyte/airbyte"
airbyteChartRelease = "airbyte-abctl"
airbyteIngress = "ingress-abctl"
Expand Down
63 changes: 62 additions & 1 deletion internal/cmd/local/local/install.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package local

import (
"bufio"
"context"
"fmt"
"net/http"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
Expand All @@ -27,8 +29,8 @@ import (
"helm.sh/helm/v3/pkg/release"
"helm.sh/helm/v3/pkg/repo"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
eventsv1 "k8s.io/api/events/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
Expand Down Expand Up @@ -378,6 +380,63 @@ func (c *Command) watchEvents(ctx context.Context) {
}
}

// 2024-09-10 20:16:24 WARN i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [273....
var javaLogRx = regexp.MustCompile(`^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \x1b\[(?:1;)?\d+m(?P<level>[A-Z]+)\x1b\[m (?P<msg>\S+ - .*)`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder how fragile this is. We're likely changing our logging implementation on the platform shortly, and I don't think we have any guarantees that this format won't be modified with that change.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, all this log stuff is definitely fragile, but it's better than nothing for now. We need to do some work in the bootloader to make all this stuff clean and easy and stable over time.

And, maybe it's reasonable to say that the current log format should be the standard going forward. It's a very common format. Note that there are actually multiple formats in these logs too, which is annoying.

What's best might be to not parse logs in abctl, but have some other summarized bootloader status.


func (c *Command) streamPodLogs(ctx context.Context, namespace, podName, prefix string, since time.Time) error {
r, err := c.k8s.StreamPodLogs(ctx, airbyteNamespace, airbyteBootloaderPodName, since)
if err != nil {
return err
}
defer r.Close()

level := pterm.Debug
scanner := bufio.NewScanner(r)

for scanner.Scan() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we check the state of the ctx in this loop? Break out if the context is no longer active?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that's necessary. The context is passed to the k8s.StreamPodLogs call, which returns a reader, which I would expect to close when the context is canceled.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I proved this to myself. I couldn't think of a nice way to write an actual test for it (since it depends on the real k8s implementation), but I hacked in this code (note javaLogScanner is not in this PR, but it's just a scanner with the java parsing wrapped in)

func (c *Command) streamPodLogs(ctx context.Context, namespace, podName, prefix string, since time.Time) error {

	ctx, cancel := context.WithCancel(ctx)
	go func() {
		log.Printf("sleeping\n")
		time.Sleep(30 * time.Second)
		log.Printf("canceling\n")
		cancel()
	}()

	r, err := c.k8s.StreamPodLogs(ctx, namespace, podName, since)
	if err != nil {
		return err
	}
	defer r.Close()

	s := newJavaLogScanner(r)
	for s.Scan() {
		if s.line.level == "ERROR" {
			pterm.Error.Printfln("%s: %s", prefix, s.line.msg)
		} else {
			pterm.Debug.Printfln("%s: %s", prefix, s.line.msg)
		}
	}

	log.Printf("scanner exited: %v", s.Err())
	return s.Err()
}

and you can see the scanner exists in the logs:
image


// skip java stacktrace noise
if strings.HasPrefix(scanner.Text(), "\tat ") || strings.HasPrefix(scanner.Text(), "\t... ") {
continue
}

m := javaLogRx.FindSubmatch(scanner.Bytes())
var msg string

if m != nil {
msg = string(m[2])
if string(m[1]) == "ERROR" {
level = pterm.Error
} else {
level = pterm.Debug
}
} else {
msg = scanner.Text()
}

level.Printfln("%s: %s", prefix, msg)
}
return scanner.Err()
}

func (c *Command) watchBootloaderLogs(ctx context.Context) {
pterm.Debug.Printfln("start streaming bootloader logs")
since := time.Now()

for {
// Wait a few seconds on the first iteration, give the bootloaders some time to start.
time.Sleep(5 * time.Second)

err := c.streamPodLogs(ctx, airbyteNamespace, airbyteBootloaderPodName, "airbyte-bootloader", since)
if err == nil {
break
} else {
pterm.Debug.Printfln("error streaming bootloader logs. will retry: %s", err)
}
}
pterm.Debug.Printfln("done streaming bootloader logs")
}

// now is used to filter out kubernetes events that happened in the past.
// Kubernetes wants us to use the ResourceVersion on the event watch request itself, but that approach
// is more complicated as it requires determining which ResourceVersion to initially provide.
Expand All @@ -398,6 +457,8 @@ func (c *Command) handleEvent(ctx context.Context, e *eventsv1.Event) {
case strings.EqualFold(e.Type, "normal"):
if strings.EqualFold(e.Reason, "backoff") {
pterm.Warning.Println(e.Note)
} else if e.Reason == "Started" && e.Regarding.Name == "airbyte-abctl-airbyte-bootloader" {
go c.watchBootloaderLogs(ctx)
} else {
pterm.Debug.Println(e.Note)
}
Expand Down