From cf55c0d03d2f946ca22539d2ecee186b4184e424 Mon Sep 17 00:00:00 2001 From: Nick Mitchell Date: Fri, 20 Sep 2024 12:04:34 -0400 Subject: [PATCH] feat: add component prefix to log line output Signed-off-by: Nick Mitchell --- pkg/be/ibmcloud/logs.go | 3 ++- pkg/be/kubernetes/logs.go | 9 +++++---- pkg/be/local/streamer.go | 28 ++++++++++++++-------------- pkg/be/streamer/streamer.go | 9 ++++++++- pkg/observe/logs.go | 22 ++++++++++++++++++++-- 5 files changed, 49 insertions(+), 22 deletions(-) diff --git a/pkg/be/ibmcloud/logs.go b/pkg/be/ibmcloud/logs.go index e9e08038..1988218d 100644 --- a/pkg/be/ibmcloud/logs.go +++ b/pkg/be/ibmcloud/logs.go @@ -3,10 +3,11 @@ package ibmcloud import ( "fmt" + "lunchpail.io/pkg/be/streamer" "lunchpail.io/pkg/lunchpail" ) // Stream logs from a given Component to the given channel -func (streamer Streamer) ComponentLogs(component lunchpail.Component, tail int, follow, verbose bool) error { +func (streamer Streamer) ComponentLogs(component lunchpail.Component, opts streamer.LogOptions) error { return fmt.Errorf("Unsupported operation: 'ComponentLogs'") } diff --git a/pkg/be/kubernetes/logs.go b/pkg/be/kubernetes/logs.go index f1242e92..286d8bc7 100644 --- a/pkg/be/kubernetes/logs.go +++ b/pkg/be/kubernetes/logs.go @@ -14,6 +14,7 @@ import ( "k8s.io/client-go/kubernetes" "lunchpail.io/pkg/be/events" + "lunchpail.io/pkg/be/streamer" "lunchpail.io/pkg/lunchpail" ) @@ -39,19 +40,19 @@ func (streamer Streamer) podLogs(podName string, component lunchpail.Component, } // TODO port this to use client-go -func (streamer Streamer) ComponentLogs(component lunchpail.Component, tail int, follow, verbose bool) error { +func (streamer Streamer) ComponentLogs(component lunchpail.Component, opts streamer.LogOptions) error { containers := "main" runSelector := ",app.kubernetes.io/instance=" + streamer.runname followFlag := "" - if follow { + if opts.Follow { followFlag = "-f" } selector := "app.kubernetes.io/component=" + string(component) + runSelector - cmdline := "kubectl logs -n " + streamer.backend.namespace + " -l " + selector + " --tail=" + strconv.Itoa(tail) + " " + followFlag + " -c " + containers + " --max-log-requests=99 | grep -v 'workerpool worker'" + cmdline := "kubectl logs -n " + streamer.backend.namespace + " -l " + selector + " --tail=" + strconv.Itoa(opts.Tail) + " " + followFlag + " -c " + containers + " --max-log-requests=99 | grep -v 'workerpool worker'" - if verbose { + if opts.Verbose { fmt.Fprintf(os.Stderr, "Tracking logs of component=%s\n", component) fmt.Fprintf(os.Stderr, "Tracking logs via cmdline=%s\n", cmdline) } diff --git a/pkg/be/local/streamer.go b/pkg/be/local/streamer.go index 0ce2367d..77012a79 100644 --- a/pkg/be/local/streamer.go +++ b/pkg/be/local/streamer.go @@ -121,7 +121,7 @@ func (s localStreamer) QueueStats(c chan qstat.Model, opts qstat.Options) error return err } - tail, err := tailfChan(f, opts.Follow, opts.Verbose) + tail, err := tailfChan(f, streamer.LogOptions{Follow: opts.Follow, Verbose: opts.Verbose}) if err != nil { return err } @@ -142,7 +142,7 @@ func (s localStreamer) QueueStats(c chan qstat.Model, opts qstat.Options) error return streamer.QstatFromChan(s.Context, lines, c) } -func (s localStreamer) watchForWorkerPools(logdir string, follow, verbose bool) error { +func (s localStreamer) watchForWorkerPools(logdir string, opts streamer.LogOptions) error { watching := make(map[string]bool) group, _ := errgroup.WithContext(s.Context) @@ -160,7 +160,7 @@ func (s localStreamer) watchForWorkerPools(logdir string, follow, verbose bool) if !alreadyWatching || !exists { watching[file] = true group.Go(func() error { - return tailf(filepath.Join(logdir, file), follow, verbose) + return tailf(filepath.Join(logdir, file), opts) }) } } @@ -169,7 +169,7 @@ func (s localStreamer) watchForWorkerPools(logdir string, follow, verbose bool) running, err := isRunning(s.runname) if err != nil { return err - } else if !running || !follow { + } else if !running || !opts.Follow { break } @@ -185,7 +185,7 @@ func (s localStreamer) watchForWorkerPools(logdir string, follow, verbose bool) } // Stream logs from a given Component to os.Stdout -func (s localStreamer) ComponentLogs(c lunchpail.Component, taillines int, follow, verbose bool) error { +func (s localStreamer) ComponentLogs(c lunchpail.Component, opts streamer.LogOptions) error { logdir, err := files.LogDir(s.runname, true) if err != nil { return err @@ -193,35 +193,35 @@ func (s localStreamer) ComponentLogs(c lunchpail.Component, taillines int, follo switch c { case lunchpail.WorkersComponent: - return s.watchForWorkerPools(logdir, follow, verbose) + return s.watchForWorkerPools(logdir, opts) default: // TODO allow caller to select stderr versus stdout group, _ := errgroup.WithContext(s.Context) - group.Go(func() error { return tailf(filepath.Join(logdir, string(c)+".out"), follow, verbose) }) - group.Go(func() error { return tailf(filepath.Join(logdir, string(c)+".err"), follow, verbose) }) + group.Go(func() error { return tailf(filepath.Join(logdir, string(c)+".out"), opts) }) + group.Go(func() error { return tailf(filepath.Join(logdir, string(c)+".err"), opts) }) return group.Wait() } } -func tailfChan(outfile string, follow, verbose bool) (*tail.Tail, error) { +func tailfChan(outfile string, opts streamer.LogOptions) (*tail.Tail, error) { Logger := tail.DiscardingLogger - if verbose { + if opts.Verbose { // this tells tailf to use its default logger Logger = nil } - return tail.TailFile(outfile, tail.Config{Follow: follow, ReOpen: follow, Logger: Logger}) + return tail.TailFile(outfile, tail.Config{Follow: opts.Follow, ReOpen: opts.Follow, Logger: Logger}) } -func tailf(outfile string, follow, verbose bool) error { - c, err := tailfChan(outfile, follow, verbose) +func tailf(outfile string, opts streamer.LogOptions) error { + c, err := tailfChan(outfile, opts) if err != nil { return err } for line := range c.Lines { - fmt.Println(line.Text) + fmt.Printf("%s%s\n", opts.LinePrefix, line.Text) } return nil diff --git a/pkg/be/streamer/streamer.go b/pkg/be/streamer/streamer.go index ef11c478..5d6db8b3 100644 --- a/pkg/be/streamer/streamer.go +++ b/pkg/be/streamer/streamer.go @@ -7,6 +7,13 @@ import ( "lunchpail.io/pkg/lunchpail" ) +type LogOptions struct { + Tail int + Follow bool + Verbose bool + LinePrefix string +} + type Streamer interface { // RunEvents() (chan events.Message, error) @@ -21,5 +28,5 @@ type Streamer interface { QueueStats(c chan qstat.Model, opts qstat.Options) error // Stream logs from a given Component to os.Stdout - ComponentLogs(component lunchpail.Component, tail int, follow, verbose bool) error + ComponentLogs(component lunchpail.Component, opts LogOptions) error } diff --git a/pkg/observe/logs.go b/pkg/observe/logs.go index 6bbc26e0..5e88b8ad 100644 --- a/pkg/observe/logs.go +++ b/pkg/observe/logs.go @@ -2,12 +2,15 @@ package observe import ( "context" + "fmt" "golang.org/x/sync/errgroup" "lunchpail.io/pkg/be" "lunchpail.io/pkg/be/runs/util" + "lunchpail.io/pkg/be/streamer" "lunchpail.io/pkg/lunchpail" + "lunchpail.io/pkg/observe/colors" ) type LogsOptions struct { @@ -27,11 +30,26 @@ func Logs(ctx context.Context, runnameIn string, backend be.Backend, opts LogsOp opts.Components = lunchpail.AllUserComponents } + useComponentPrefix := len(opts.Components) > 1 + group, _ := errgroup.WithContext(ctx) - streamer := backend.Streamer(ctx, runname) + s := backend.Streamer(ctx, runname) for _, component := range opts.Components { group.Go(func() error { - return streamer.ComponentLogs(component, opts.Tail, opts.Follow, opts.Verbose) + prefix := "" + if useComponentPrefix { + prefix = colors.ComponentStyle(component).Render(fmt.Sprintf("%-8s", lunchpail.ComponentShortName(component))) + } + + return s.ComponentLogs( + component, + streamer.LogOptions{ + Tail: opts.Tail, + Follow: opts.Follow, + Verbose: opts.Verbose, + LinePrefix: prefix, + }, + ) }) }