diff --git a/internal/cmd/local/k8s/client.go b/internal/cmd/local/k8s/client.go index c072718..b5657dd 100644 --- a/internal/cmd/local/k8s/client.go +++ b/internal/cmd/local/k8s/client.go @@ -75,6 +75,7 @@ type Client interface { EventsWatch(ctx context.Context, namespace string) (watch.Interface, error) LogsGet(ctx context.Context, namespace string, name string) (string, error) + StreamPodLogs(ctx context.Context, namespace string, podName string, since time.Time) (io.ReadCloser, error) } var _ Client = (*DefaultK8sClient)(nil) @@ -325,3 +326,11 @@ func (d *DefaultK8sClient) LogsGet(ctx context.Context, namespace string, name s } return buf.String(), nil } + +func (d *DefaultK8sClient) StreamPodLogs(ctx context.Context, namespace string, podName string, since time.Time) (io.ReadCloser, error) { + req := d.ClientSet.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{ + Follow: true, + SinceTime: &metav1.Time{Time: since}, + }) + return req.Stream(ctx) +} diff --git a/internal/cmd/local/k8s/k8stest/k8stest.go b/internal/cmd/local/k8s/k8stest/k8stest.go index b31319d..7a73ccf 100644 --- a/internal/cmd/local/k8s/k8stest/k8stest.go +++ b/internal/cmd/local/k8s/k8stest/k8stest.go @@ -2,6 +2,8 @@ package k8stest import ( "context" + "io" + "time" "github.com/airbytehq/abctl/internal/cmd/local/k8s" v1 "k8s.io/api/apps/v1" @@ -33,6 +35,7 @@ type MockClient struct { FnServiceGet func(ctx context.Context, namespace, name string) (*corev1.Service, error) FnEventsWatch func(ctx context.Context, namespace string) (watch.Interface, error) FnLogsGet func(ctx context.Context, namespace string, name string) (string, error) + FnStreamPodLogs func(ctx context.Context, namespace, podName string, since time.Time) (io.ReadCloser, error) } func (m *MockClient) DeploymentList(ctx context.Context, namespace string) (*v1.DeploymentList, error) { @@ -166,3 +169,10 @@ func (m *MockClient) LogsGet(ctx context.Context, namespace string, name string) } return m.FnLogsGet(ctx, namespace, name) } + +func (m *MockClient) StreamPodLogs(ctx context.Context, namespace string, podName string, since time.Time) (io.ReadCloser, error) { + if m.FnStreamPodLogs == nil { + panic("FnStreamPodLogs is not configured") + } + return m.FnStreamPodLogs(ctx, namespace, podName, since) +} diff --git a/internal/cmd/local/local/cmd.go b/internal/cmd/local/local/cmd.go index 002da42..507dfaf 100644 --- a/internal/cmd/local/local/cmd.go +++ b/internal/cmd/local/local/cmd.go @@ -20,6 +20,7 @@ import ( ) const ( + airbyteBootloaderPodName = "airbyte-abctl-airbyte-bootloader" airbyteChartName = "airbyte/airbyte" airbyteChartRelease = "airbyte-abctl" airbyteIngress = "ingress-abctl" diff --git a/internal/cmd/local/local/install.go b/internal/cmd/local/local/install.go index 917f5e4..5bbb076 100644 --- a/internal/cmd/local/local/install.go +++ b/internal/cmd/local/local/install.go @@ -1,11 +1,13 @@ package local import ( + "bufio" "context" "fmt" "net/http" "os" "path/filepath" + "regexp" "strconv" "strings" "time" @@ -27,8 +29,8 @@ import ( "helm.sh/helm/v3/pkg/release" "helm.sh/helm/v3/pkg/repo" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" eventsv1 "k8s.io/api/events/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( @@ -378,6 +380,63 @@ func (c *Command) watchEvents(ctx context.Context) { } } +// 2024-09-10 20:16:24 WARN i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [273.... +var javaLogRx = regexp.MustCompile(`^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \x1b\[(?:1;)?\d+m(?P[A-Z]+)\x1b\[m (?P\S+ - .*)`) + +func (c *Command) streamPodLogs(ctx context.Context, namespace, podName, prefix string, since time.Time) error { + r, err := c.k8s.StreamPodLogs(ctx, namespace, podName, since) + if err != nil { + return err + } + defer r.Close() + + level := pterm.Debug + scanner := bufio.NewScanner(r) + + for scanner.Scan() { + + // skip java stacktrace noise + if strings.HasPrefix(scanner.Text(), "\tat ") || strings.HasPrefix(scanner.Text(), "\t... ") { + continue + } + + m := javaLogRx.FindSubmatch(scanner.Bytes()) + var msg string + + if m != nil { + msg = string(m[2]) + if string(m[1]) == "ERROR" { + level = pterm.Error + } else { + level = pterm.Debug + } + } else { + msg = scanner.Text() + } + + level.Printfln("%s: %s", prefix, msg) + } + return scanner.Err() +} + +func (c *Command) watchBootloaderLogs(ctx context.Context) { + pterm.Debug.Printfln("start streaming bootloader logs") + since := time.Now() + + for { + // Wait a few seconds on the first iteration, give the bootloaders some time to start. + time.Sleep(5 * time.Second) + + err := c.streamPodLogs(ctx, airbyteNamespace, airbyteBootloaderPodName, "airbyte-bootloader", since) + if err == nil { + break + } else { + pterm.Debug.Printfln("error streaming bootloader logs. will retry: %s", err) + } + } + pterm.Debug.Printfln("done streaming bootloader logs") +} + // now is used to filter out kubernetes events that happened in the past. // Kubernetes wants us to use the ResourceVersion on the event watch request itself, but that approach // is more complicated as it requires determining which ResourceVersion to initially provide. @@ -398,6 +457,8 @@ func (c *Command) handleEvent(ctx context.Context, e *eventsv1.Event) { case strings.EqualFold(e.Type, "normal"): if strings.EqualFold(e.Reason, "backoff") { pterm.Warning.Println(e.Note) + } else if e.Reason == "Started" && e.Regarding.Name == "airbyte-abctl-airbyte-bootloader" { + go c.watchBootloaderLogs(ctx) } else { pterm.Debug.Println(e.Note) }