Skip to content

Commit

Permalink
feat: add component prefix to log line output
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Mitchell <nickm@us.ibm.com>
  • Loading branch information
starpit committed Sep 20, 2024
1 parent 8a1cd06 commit cf55c0d
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 22 deletions.
3 changes: 2 additions & 1 deletion pkg/be/ibmcloud/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package ibmcloud
import (
"fmt"

"lunchpail.io/pkg/be/streamer"
"lunchpail.io/pkg/lunchpail"
)

// Stream logs from a given Component to the given channel
func (streamer Streamer) ComponentLogs(component lunchpail.Component, tail int, follow, verbose bool) error {
func (streamer Streamer) ComponentLogs(component lunchpail.Component, opts streamer.LogOptions) error {
return fmt.Errorf("Unsupported operation: 'ComponentLogs'")
}
9 changes: 5 additions & 4 deletions pkg/be/kubernetes/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"k8s.io/client-go/kubernetes"

"lunchpail.io/pkg/be/events"
"lunchpail.io/pkg/be/streamer"
"lunchpail.io/pkg/lunchpail"
)

Expand All @@ -39,19 +40,19 @@ func (streamer Streamer) podLogs(podName string, component lunchpail.Component,
}

// TODO port this to use client-go
func (streamer Streamer) ComponentLogs(component lunchpail.Component, tail int, follow, verbose bool) error {
func (streamer Streamer) ComponentLogs(component lunchpail.Component, opts streamer.LogOptions) error {
containers := "main"
runSelector := ",app.kubernetes.io/instance=" + streamer.runname

followFlag := ""
if follow {
if opts.Follow {
followFlag = "-f"
}

selector := "app.kubernetes.io/component=" + string(component) + runSelector
cmdline := "kubectl logs -n " + streamer.backend.namespace + " -l " + selector + " --tail=" + strconv.Itoa(tail) + " " + followFlag + " -c " + containers + " --max-log-requests=99 | grep -v 'workerpool worker'"
cmdline := "kubectl logs -n " + streamer.backend.namespace + " -l " + selector + " --tail=" + strconv.Itoa(opts.Tail) + " " + followFlag + " -c " + containers + " --max-log-requests=99 | grep -v 'workerpool worker'"

if verbose {
if opts.Verbose {
fmt.Fprintf(os.Stderr, "Tracking logs of component=%s\n", component)
fmt.Fprintf(os.Stderr, "Tracking logs via cmdline=%s\n", cmdline)
}
Expand Down
28 changes: 14 additions & 14 deletions pkg/be/local/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (s localStreamer) QueueStats(c chan qstat.Model, opts qstat.Options) error
return err
}

tail, err := tailfChan(f, opts.Follow, opts.Verbose)
tail, err := tailfChan(f, streamer.LogOptions{Follow: opts.Follow, Verbose: opts.Verbose})
if err != nil {
return err
}
Expand All @@ -142,7 +142,7 @@ func (s localStreamer) QueueStats(c chan qstat.Model, opts qstat.Options) error
return streamer.QstatFromChan(s.Context, lines, c)
}

func (s localStreamer) watchForWorkerPools(logdir string, follow, verbose bool) error {
func (s localStreamer) watchForWorkerPools(logdir string, opts streamer.LogOptions) error {
watching := make(map[string]bool)
group, _ := errgroup.WithContext(s.Context)

Expand All @@ -160,7 +160,7 @@ func (s localStreamer) watchForWorkerPools(logdir string, follow, verbose bool)
if !alreadyWatching || !exists {
watching[file] = true
group.Go(func() error {
return tailf(filepath.Join(logdir, file), follow, verbose)
return tailf(filepath.Join(logdir, file), opts)
})
}
}
Expand All @@ -169,7 +169,7 @@ func (s localStreamer) watchForWorkerPools(logdir string, follow, verbose bool)
running, err := isRunning(s.runname)
if err != nil {
return err
} else if !running || !follow {
} else if !running || !opts.Follow {
break
}

Expand All @@ -185,43 +185,43 @@ func (s localStreamer) watchForWorkerPools(logdir string, follow, verbose bool)
}

// Stream logs from a given Component to os.Stdout
func (s localStreamer) ComponentLogs(c lunchpail.Component, taillines int, follow, verbose bool) error {
func (s localStreamer) ComponentLogs(c lunchpail.Component, opts streamer.LogOptions) error {
logdir, err := files.LogDir(s.runname, true)
if err != nil {
return err
}

switch c {
case lunchpail.WorkersComponent:
return s.watchForWorkerPools(logdir, follow, verbose)
return s.watchForWorkerPools(logdir, opts)

default:
// TODO allow caller to select stderr versus stdout
group, _ := errgroup.WithContext(s.Context)
group.Go(func() error { return tailf(filepath.Join(logdir, string(c)+".out"), follow, verbose) })
group.Go(func() error { return tailf(filepath.Join(logdir, string(c)+".err"), follow, verbose) })
group.Go(func() error { return tailf(filepath.Join(logdir, string(c)+".out"), opts) })
group.Go(func() error { return tailf(filepath.Join(logdir, string(c)+".err"), opts) })
return group.Wait()
}
}

func tailfChan(outfile string, follow, verbose bool) (*tail.Tail, error) {
func tailfChan(outfile string, opts streamer.LogOptions) (*tail.Tail, error) {
Logger := tail.DiscardingLogger
if verbose {
if opts.Verbose {
// this tells tailf to use its default logger
Logger = nil
}

return tail.TailFile(outfile, tail.Config{Follow: follow, ReOpen: follow, Logger: Logger})
return tail.TailFile(outfile, tail.Config{Follow: opts.Follow, ReOpen: opts.Follow, Logger: Logger})
}

func tailf(outfile string, follow, verbose bool) error {
c, err := tailfChan(outfile, follow, verbose)
func tailf(outfile string, opts streamer.LogOptions) error {
c, err := tailfChan(outfile, opts)
if err != nil {
return err
}

for line := range c.Lines {
fmt.Println(line.Text)
fmt.Printf("%s%s\n", opts.LinePrefix, line.Text)
}

return nil
Expand Down
9 changes: 8 additions & 1 deletion pkg/be/streamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ import (
"lunchpail.io/pkg/lunchpail"
)

type LogOptions struct {
Tail int
Follow bool
Verbose bool
LinePrefix string
}

type Streamer interface {
//
RunEvents() (chan events.Message, error)
Expand All @@ -21,5 +28,5 @@ type Streamer interface {
QueueStats(c chan qstat.Model, opts qstat.Options) error

// Stream logs from a given Component to os.Stdout
ComponentLogs(component lunchpail.Component, tail int, follow, verbose bool) error
ComponentLogs(component lunchpail.Component, opts LogOptions) error
}
22 changes: 20 additions & 2 deletions pkg/observe/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package observe

import (
"context"
"fmt"

"golang.org/x/sync/errgroup"

"lunchpail.io/pkg/be"
"lunchpail.io/pkg/be/runs/util"
"lunchpail.io/pkg/be/streamer"
"lunchpail.io/pkg/lunchpail"
"lunchpail.io/pkg/observe/colors"
)

type LogsOptions struct {
Expand All @@ -27,11 +30,26 @@ func Logs(ctx context.Context, runnameIn string, backend be.Backend, opts LogsOp
opts.Components = lunchpail.AllUserComponents
}

useComponentPrefix := len(opts.Components) > 1

group, _ := errgroup.WithContext(ctx)
streamer := backend.Streamer(ctx, runname)
s := backend.Streamer(ctx, runname)
for _, component := range opts.Components {
group.Go(func() error {
return streamer.ComponentLogs(component, opts.Tail, opts.Follow, opts.Verbose)
prefix := ""
if useComponentPrefix {
prefix = colors.ComponentStyle(component).Render(fmt.Sprintf("%-8s", lunchpail.ComponentShortName(component)))
}

return s.ComponentLogs(
component,
streamer.LogOptions{
Tail: opts.Tail,
Follow: opts.Follow,
Verbose: opts.Verbose,
LinePrefix: prefix,
},
)
})
}

Expand Down

0 comments on commit cf55c0d

Please sign in to comment.