Skip to content

Commit

Permalink
feat: better reporting of airbyte chart errors (#112)
Browse files Browse the repository at this point in the history
  • Loading branch information
abuchanan-airbyte authored Sep 12, 2024
1 parent 8154b67 commit 1927aa3
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 35 deletions.
13 changes: 9 additions & 4 deletions internal/cmd/local/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strings"
"time"

v1 "k8s.io/api/apps/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -28,7 +28,7 @@ var DefaultPersistentVolumeSize = resource.MustParse("500Mi")
// Client primarily for testing purposes
type Client interface {
// DeploymentList returns a list of all the services within the namespace
DeploymentList(ctx context.Context, namespace string) (*v1.DeploymentList, error)
DeploymentList(ctx context.Context, namespace string) (*appsv1.DeploymentList, error)
// DeploymentRestart will force a restart of the deployment name in the provided namespace.
// This is a blocking call, it should only return once the deployment has completed.
DeploymentRestart(ctx context.Context, namespace, name string) error
Expand Down Expand Up @@ -76,6 +76,7 @@ type Client interface {

LogsGet(ctx context.Context, namespace string, name string) (string, error)
StreamPodLogs(ctx context.Context, namespace string, podName string, since time.Time) (io.ReadCloser, error)
PodList(ctx context.Context, namespace string) (*corev1.PodList, error)
}

var _ Client = (*DefaultK8sClient)(nil)
Expand All @@ -85,7 +86,7 @@ type DefaultK8sClient struct {
ClientSet kubernetes.Interface
}

func (d *DefaultK8sClient) DeploymentList(ctx context.Context, namespace string) (*v1.DeploymentList, error) {
func (d *DefaultK8sClient) DeploymentList(ctx context.Context, namespace string) (*appsv1.DeploymentList, error) {
return d.ClientSet.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{})
}

Expand Down Expand Up @@ -329,8 +330,12 @@ func (d *DefaultK8sClient) LogsGet(ctx context.Context, namespace string, name s

func (d *DefaultK8sClient) StreamPodLogs(ctx context.Context, namespace string, podName string, since time.Time) (io.ReadCloser, error) {
req := d.ClientSet.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
Follow: true,
Follow: true,
SinceTime: &metav1.Time{Time: since},
})
return req.Stream(ctx)
}

// PodList returns the pods found in the provided namespace, using default
// (empty) list options.
func (d *DefaultK8sClient) PodList(ctx context.Context, namespace string) (*corev1.PodList, error) {
	pods := d.ClientSet.CoreV1().Pods(namespace)
	return pods.List(ctx, metav1.ListOptions{})
}
8 changes: 8 additions & 0 deletions internal/cmd/local/k8s/k8stest/k8stest.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type MockClient struct {
FnEventsWatch func(ctx context.Context, namespace string) (watch.Interface, error)
FnLogsGet func(ctx context.Context, namespace string, name string) (string, error)
FnStreamPodLogs func(ctx context.Context, namespace, podName string, since time.Time) (io.ReadCloser, error)
FnPodList func(ctx context.Context, namespace string) (*corev1.PodList, error)
}

func (m *MockClient) DeploymentList(ctx context.Context, namespace string) (*v1.DeploymentList, error) {
Expand Down Expand Up @@ -176,3 +177,10 @@ func (m *MockClient) StreamPodLogs(ctx context.Context, namespace string, podNam
}
return m.FnStreamPodLogs(ctx, namespace, podName, since)
}

// PodList delegates to FnPodList when it is set; otherwise it returns a nil
// list and a nil error.
func (m *MockClient) PodList(ctx context.Context, namespace string) (*corev1.PodList, error) {
	if fn := m.FnPodList; fn != nil {
		return fn(ctx, namespace)
	}
	return nil, nil
}
68 changes: 37 additions & 31 deletions internal/cmd/local/local/install.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package local

import (
"bufio"
"context"
"fmt"
"net/http"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -162,7 +160,6 @@ func (c *Command) Install(ctx context.Context, opts InstallOpts) error {

if opts.Migrate {
c.spinner.UpdateText("Migrating airbyte data")
//if err := c.tel.Wrap(ctx, telemetry.Migrate, func() error { return opts.Docker.MigrateComposeDB(ctx, "airbyte_db") }); err != nil {
if err := c.tel.Wrap(ctx, telemetry.Migrate, func() error { return migrate.FromDockerVolume(ctx, opts.Docker.Client, "airbyte_db") }); err != nil {
pterm.Error.Println("Failed to migrate data from previous Airbyte installation")
return fmt.Errorf("unable to migrate data from previous airbyte installation: %w", err)
Expand Down Expand Up @@ -275,7 +272,7 @@ func (c *Command) Install(ctx context.Context, opts InstallOpts) error {
namespace: airbyteNamespace,
valuesYAML: valuesYAML,
}); err != nil {
return fmt.Errorf("unable to install airbyte chart: %w", err)
return c.diagnoseAirbyteChartFailure(ctx, err)
}

if err := c.handleChart(ctx, chartRequest{
Expand Down Expand Up @@ -331,6 +328,35 @@ func (c *Command) Install(ctx context.Context, opts InstallOpts) error {
return nil
}

// diagnoseAirbyteChartFailure builds the error returned when installing the
// airbyte chart fails.
//
// It lists the pods in airbyteNamespace and, for every pod in the Failed
// phase, reports the last ERROR-level message found in that pod's logs (or an
// "unknown" placeholder when the logs cannot be fetched or contain no error).
//
// When the pods cannot be listed, the list is nil, or no pod is in the Failed
// phase, there is nothing more specific to report, so chartErr is wrapped and
// returned instead of an error with an empty detail section.
func (c *Command) diagnoseAirbyteChartFailure(ctx context.Context, chartErr error) error {
	podList, err := c.k8s.PodList(ctx, airbyteNamespace)
	if err != nil || podList == nil {
		return fmt.Errorf("unable to install airbyte chart: %w", chartErr)
	}

	var podErrs []string
	for _, pod := range podList.Items {
		if pod.Status.Phase != corev1.PodFailed {
			continue
		}

		msg := "unknown"
		logs, err := c.k8s.LogsGet(ctx, airbyteNamespace, pod.Name)
		if err != nil {
			// Without logs there is nothing to parse; report the fetch failure.
			msg = "unknown: failed to get pod logs."
		} else if m, err := getLastLogError(strings.NewReader(logs)); err != nil {
			msg = "unknown: failed to find error log."
		} else if m != "" {
			msg = m
		}

		podErrs = append(podErrs, fmt.Sprintf("pod %s: %s", pod.Name, msg))
	}

	if len(podErrs) == 0 {
		// No failed pod explains the failure; keep the original chart error.
		return fmt.Errorf("unable to install airbyte chart: %w", chartErr)
	}
	return fmt.Errorf("unable to install airbyte chart:\n%s", strings.Join(podErrs, "\n"))
}

func (c *Command) handleIngress(ctx context.Context, hosts []string) error {
c.spinner.UpdateText("Checking for existing Ingress")

Expand Down Expand Up @@ -380,43 +406,23 @@ func (c *Command) watchEvents(ctx context.Context) {
}
}

// 2024-09-10 20:16:24 WARN i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [273....
var javaLogRx = regexp.MustCompile(`^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \x1b\[(?:1;)?\d+m(?P<level>[A-Z]+)\x1b\[m (?P<msg>\S+ - .*)`)

func (c *Command) streamPodLogs(ctx context.Context, namespace, podName, prefix string, since time.Time) error {
r, err := c.k8s.StreamPodLogs(ctx, namespace, podName, since)
if err != nil {
return err
}
defer r.Close()

level := pterm.Debug
scanner := bufio.NewScanner(r)

for scanner.Scan() {

// skip java stacktrace noise
if strings.HasPrefix(scanner.Text(), "\tat ") || strings.HasPrefix(scanner.Text(), "\t... ") {
continue
}

m := javaLogRx.FindSubmatch(scanner.Bytes())
var msg string

if m != nil {
msg = string(m[2])
if string(m[1]) == "ERROR" {
level = pterm.Error
} else {
level = pterm.Debug
}
s := newLogScanner(r)
for s.Scan() {
if s.line.level == "ERROR" {
pterm.Error.Printfln("%s: %s", prefix, s.line.msg)
} else {
msg = scanner.Text()
pterm.Debug.Printfln("%s: %s", prefix, s.line.msg)
}

level.Printfln("%s: %s", prefix, msg)
}
return scanner.Err()

return s.Err()
}

func (c *Command) watchBootloaderLogs(ctx context.Context) {
Expand Down
79 changes: 79 additions & 0 deletions internal/cmd/local/local/log_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package local

import (
"bufio"
"io"
"regexp"
"strings"
)

// logRx matches a single log line that starts with a "YYYY-MM-DD hh:mm:ss"
// timestamp followed by a level and a message, for example:
//
// 2024-09-10 20:16:24 WARN i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [273....
//
// The pattern requires the level to be wrapped in ANSI escape sequences
// (ESC "[" digits "m" before it, ESC "[m" after it). Capture group 1 ("level")
// is the upper-case level name; capture group 2 ("msg") is the rest of the
// line, which must begin with a non-space token followed by " - ".
var logRx = regexp.MustCompile(`^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \x1b\[(?:1;)?\d+m(?P<level>[A-Z]+)\x1b\[m (?P<msg>\S+ - .*)`)

// logLine is one parsed log entry produced by logScanner.
type logLine struct {
	// msg is the message part of a line matched by logRx, or the full raw line
	// when the line did not match.
	msg string
	// level is the level captured by logRx (e.g. "ERROR"); for lines that do
	// not match, the scanner leaves the previous value in place.
	level string
}

// logScanner reads log output line by line and exposes the most recently
// parsed entry in line.
type logScanner struct {
	// scanner is the underlying line reader.
	scanner *bufio.Scanner
	// line holds the entry produced by the last successful call to Scan.
	line logLine
}

// newLogScanner returns a logScanner reading lines from r. The initial level
// is "DEBUG", which is what unparsable lines report until the first line with
// a recognizable level has been seen.
//
// The scanner's maximum line length is raised above the bufio default
// (bufio.MaxScanTokenSize, 64 KiB) so that a single very long log line does
// not abort scanning with bufio.ErrTooLong.
func newLogScanner(r io.Reader) *logScanner {
	// Upper bound for a single log line.
	const maxLineBytes = 1024 * 1024

	sc := bufio.NewScanner(r)
	sc.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineBytes)

	return &logScanner{
		scanner: sc,
		line:    logLine{level: "DEBUG"},
	}
}

// Scan advances to the next reportable log line and stores it in j.line.
// It returns false once the underlying scanner stops (end of input or error).
//
// Java stack-trace frames (lines starting with "\tat " or "\t... ") are
// skipped. Lines matching logRx update both level and msg. Any other line
// (e.g. output from non-java components, a different log format, or the
// continuation of a multi-line entry) is reported in full and keeps the level
// of the previous line.
func (j *logScanner) Scan() bool {
	for j.scanner.Scan() {
		text := j.scanner.Text()

		// skip java stacktrace noise
		if strings.HasPrefix(text, "\tat ") || strings.HasPrefix(text, "\t... ") {
			continue
		}

		if groups := logRx.FindStringSubmatch(text); groups != nil {
			j.line.level = groups[1]
			j.line.msg = groups[2]
		} else {
			// Unrecognized format: use the whole line, reuse the previous level.
			j.line.msg = text
		}
		return true
	}
	return false
}

// Err returns the error, if any, reported by the underlying bufio.Scanner.
func (j *logScanner) Err() error {
	return j.scanner.Err()
}

func getLastLogError(r io.Reader) (string, error) {
lines := []logLine{}
s := newLogScanner(r)
for s.Scan() {
lines = append(lines, s.line)
}
if s.Err() != nil {
return "", s.Err()
}

for i := len(lines) - 1; i >= 0; i-- {
if lines[i].level == "ERROR" {
return lines[i].msg, nil
}
}
return "", nil
}
62 changes: 62 additions & 0 deletions internal/cmd/local/local/log_utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package local

import (
"strings"
"testing"
)

// testLogs is the sample log output shared by the tests in this file. It
// contains timestamped INFO, WARN and ERROR lines, an exception with
// stack-trace frames and a "Caused by" line following the ERROR entry, and a
// final line that uses a different timestamp layout.
var testLogs = strings.TrimSpace(`
2024-09-12 15:56:25 INFO i.a.d.c.DatabaseAvailabilityCheck(check):49 - Database is not ready yet. Please wait a moment, it might still be initializing...
2024-09-12 15:56:30 WARN i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [54bd6014, L:/127.0.0.1:52991 - R:localhost/127.0.0.1:8125] An exception has been observed post termination, use DEBUG level to see the full stack: java.net.PortUnreachableException: recvAddress(..) failed: Connection refused
2024-09-12 15:56:31 ERROR i.a.b.Application(main):25 - Unable to bootstrap Airbyte environment.
io.airbyte.db.init.DatabaseInitializationException: Database availability check failed.
at io.airbyte.db.init.DatabaseInitializer.initialize(DatabaseInitializer.java:54) ~[io.airbyte.airbyte-db-db-lib-0.64.3.jar:?]
at io.airbyte.bootloader.Bootloader.initializeDatabases(Bootloader.java:229) ~[io.airbyte-airbyte-bootloader-0.64.3.jar:?]
at io.airbyte.bootloader.Bootloader.load(Bootloader.java:104) ~[io.airbyte-airbyte-bootloader-0.64.3.jar:?]
at io.airbyte.bootloader.Application.main(Application.java:22) [io.airbyte-airbyte-bootloader-0.64.3.jar:?]
Caused by: io.airbyte.db.check.DatabaseCheckException: Unable to connect to the database.
at io.airbyte.db.check.DatabaseAvailabilityCheck.check(DatabaseAvailabilityCheck.java:40) ~[io.airbyte.airbyte-db-db-lib-0.64.3.jar:?]
at io.airbyte.db.init.DatabaseInitializer.initialize(DatabaseInitializer.java:45) ~[io.airbyte.airbyte-db-db-lib-0.64.3.jar:?]
... 3 more
2024-09-12 15:56:31 INFO i.m.r.Micronaut(lambda$start$0):118 - Embedded Application shutting down
2024-09-12T15:56:33.125352208Z Thread-4 INFO Loading mask data from '/seed/specs_secrets_mask.yaml
`)

// TestJavaLogScanner feeds testLogs through a logScanner and checks the level
// and message reported for each line, including that stack-trace frames are
// skipped and that lines without a level inherit the previous one.
func TestJavaLogScanner(t *testing.T) {
	s := newLogScanner(strings.NewReader(testLogs))

	// expectLogLine advances the scanner by one entry and compares it with the
	// expected level and message.
	expectLogLine := func(level, msg string) {
		t.Helper()

		// A scanner that stops early would otherwise be compared against the
		// previous (stale) line.
		if !s.Scan() {
			t.Fatalf("expected a %q log line but the scanner stopped (err: %v)", level, s.Err())
		}

		if s.line.level != level {
			t.Errorf("expected level %q but got %q", level, s.line.level)
		}
		if s.line.msg != msg {
			t.Errorf("expected msg %q but got %q", msg, s.line.msg)
		}
		if s.Err() != nil {
			t.Errorf("unexpected error %v", s.Err())
		}
	}

	expectLogLine("INFO", "i.a.d.c.DatabaseAvailabilityCheck(check):49 - Database is not ready yet. Please wait a moment, it might still be initializing...")
	expectLogLine("WARN", "i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [54bd6014, L:/127.0.0.1:52991 - R:localhost/127.0.0.1:8125] An exception has been observed post termination, use DEBUG level to see the full stack: java.net.PortUnreachableException: recvAddress(..) failed: Connection refused")
	expectLogLine("ERROR", "i.a.b.Application(main):25 - Unable to bootstrap Airbyte environment.")
	expectLogLine("ERROR", "io.airbyte.db.init.DatabaseInitializationException: Database availability check failed.")
	expectLogLine("ERROR", "Caused by: io.airbyte.db.check.DatabaseCheckException: Unable to connect to the database.")
	expectLogLine("INFO", "i.m.r.Micronaut(lambda$start$0):118 - Embedded Application shutting down")
	expectLogLine("INFO", "2024-09-12T15:56:33.125352208Z Thread-4 INFO Loading mask data from '/seed/specs_secrets_mask.yaml")

	// Every line of the fixture has been accounted for above.
	if s.Scan() {
		t.Errorf("expected no more log lines but got %q", s.line.msg)
	}
}

// TestLastErrorLog checks that getLastLogError reports the final ERROR-level
// message found in testLogs.
func TestLastErrorLog(t *testing.T) {
	const want = "Caused by: io.airbyte.db.check.DatabaseCheckException: Unable to connect to the database."

	got, err := getLastLogError(strings.NewReader(testLogs))
	if err != nil {
		t.Errorf("unexpected error %s", err)
	}
	if got != want {
		t.Errorf("expected %q but got %q", want, got)
	}
}


0 comments on commit 1927aa3

Please sign in to comment.