Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove detected goroutine leaks #2833

Merged
merged 4 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 74 additions & 53 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"errors"
"fmt"
"io/ioutil"
"io"
stdlog "log"
"strconv"
"strings"
"sync"
"time"

"github.com/sirupsen/logrus"
Expand All @@ -20,20 +21,22 @@ import (
"go.k6.io/k6/log"
)

const waitRemoteLoggerTimeout = time.Second * 5
const waitLoggerCloseTimeout = time.Second * 5

// This is to keep all fields needed for the main/root k6 command
type rootCommand struct {
globalState *state.GlobalState

cmd *cobra.Command
loggerStopped <-chan struct{}
stopLoggersCh chan struct{}
loggersWg sync.WaitGroup
loggerIsRemote bool
}

func newRootCommand(gs *state.GlobalState) *rootCommand {
c := &rootCommand{
globalState: gs,
globalState: gs,
stopLoggersCh: make(chan struct{}),
}
// the base command when called without any subcommands.
rootCmd := &cobra.Command{
Expand Down Expand Up @@ -66,37 +69,31 @@ func newRootCommand(gs *state.GlobalState) *rootCommand {
}

func (c *rootCommand) persistentPreRunE(cmd *cobra.Command, args []string) error {
var err error

c.loggerStopped, err = c.setupLoggers()
err := c.setupLoggers(c.stopLoggersCh)
if err != nil {
return err
}
select {
case <-c.loggerStopped:
default:
c.loggerIsRemote = true
}

stdlog.SetOutput(c.globalState.Logger.Writer())
c.globalState.Logger.Debugf("k6 version: v%s", consts.FullVersion())
return nil
}

func (c *rootCommand) execute() {
ctx, cancel := context.WithCancel(c.globalState.Ctx)
defer cancel()
c.globalState.Ctx = ctx

exitCode := -1
defer func() {
cancel()
c.stopLoggers()
c.globalState.OSExit(exitCode)
}()

err := c.cmd.Execute()
if err == nil {
cancel()
c.waitRemoteLogger()
// TODO: explicitly call c.globalState.osExit(0), for simpler tests and clarity?
exitCode = 0
return
}

exitCode := -1
var ecerr errext.HasExitCode
if errors.As(err, &ecerr) {
exitCode = int(ecerr.ExitCode())
Expand All @@ -117,11 +114,7 @@ func (c *rootCommand) execute() {
c.globalState.Logger.WithFields(fields).Error(errText)
if c.loggerIsRemote {
c.globalState.FallbackLogger.WithFields(fields).Error(errText)
cancel()
c.waitRemoteLogger()
}

c.globalState.OSExit(exitCode)
}

// Execute adds all child commands to the root command sets flags appropriately.
Expand All @@ -138,13 +131,17 @@ func ExecuteWithGlobalState(gs *state.GlobalState) {
newRootCommand(gs).execute()
}

func (c *rootCommand) waitRemoteLogger() {
if c.loggerIsRemote {
select {
case <-c.loggerStopped:
case <-time.After(waitRemoteLoggerTimeout):
c.globalState.FallbackLogger.Errorf("Remote logger didn't stop in %s", waitRemoteLoggerTimeout)
}
func (c *rootCommand) stopLoggers() {
done := make(chan struct{})
go func() {
c.loggersWg.Wait()
close(done)
}()
close(c.stopLoggersCh)
select {
case <-done:
case <-time.After(waitLoggerCloseTimeout):
c.globalState.FallbackLogger.Errorf("The logger didn't stop in %s", waitLoggerCloseTimeout)
}
}

Expand Down Expand Up @@ -201,14 +198,16 @@ func (f RawFormatter) Format(entry *logrus.Entry) ([]byte, error) {
// The returned channel will be closed when the logger has finished flushing and pushing logs after
// the provided context is closed. It is closed if the logger isn't buffering and sending messages
// Asynchronously
func (c *rootCommand) setupLoggers() (<-chan struct{}, error) {
ch := make(chan struct{})
close(ch)

func (c *rootCommand) setupLoggers(stop <-chan struct{}) error {
if c.globalState.Flags.Verbose {
c.globalState.Logger.SetLevel(logrus.DebugLevel)
}

var (
hook log.AsyncHook
err error
)

loggerForceColors := false // disable color by default
switch line := c.globalState.Flags.LogOutput; {
case line == "stderr":
Expand All @@ -218,33 +217,24 @@ func (c *rootCommand) setupLoggers() (<-chan struct{}, error) {
loggerForceColors = !c.globalState.Flags.NoColor && c.globalState.Stdout.IsTTY
c.globalState.Logger.SetOutput(c.globalState.Stdout)
case line == "none":
c.globalState.Logger.SetOutput(ioutil.Discard)

c.globalState.Logger.SetOutput(io.Discard)
case strings.HasPrefix(line, "loki"):
ch = make(chan struct{}) // TODO: refactor, get it from the constructor
hook, err := log.LokiFromConfigLine(c.globalState.Ctx, c.globalState.FallbackLogger, line, ch)
c.loggerIsRemote = true
hook, err = log.LokiFromConfigLine(c.globalState.FallbackLogger, line)
if err != nil {
return nil, err
return err
}
c.globalState.Logger.AddHook(hook)
c.globalState.Logger.SetOutput(ioutil.Discard) // don't output to anywhere else
c.globalState.Flags.LogFormat = "raw"

case strings.HasPrefix(line, "file"):
ch = make(chan struct{}) // TODO: refactor, get it from the constructor
hook, err := log.FileHookFromConfigLine(
c.globalState.Ctx, c.globalState.FS, c.globalState.Getwd,
c.globalState.FallbackLogger, line, ch,
hook, err = log.FileHookFromConfigLine(
c.globalState.FS, c.globalState.Getwd,
c.globalState.FallbackLogger, line,
)
if err != nil {
return nil, err
return err
}

c.globalState.Logger.AddHook(hook)
c.globalState.Logger.SetOutput(ioutil.Discard)

default:
return nil, fmt.Errorf("unsupported log output '%s'", line)
return fmt.Errorf("unsupported log output '%s'", line)
}

switch c.globalState.Flags.LogFormat {
Expand All @@ -260,5 +250,36 @@ func (c *rootCommand) setupLoggers() (<-chan struct{}, error) {
})
c.globalState.Logger.Debug("Logger format: TEXT")
}
return ch, nil

cancel := func() {} // noop as default
if hook != nil {
ctx := context.Background()
ctx, cancel = context.WithCancel(ctx)
c.setLoggerHook(ctx, hook)
}

// Sometimes the Go runtime uses the standard log output to
// log some messages directly.
// It does when an invalid char is found in a Cookie.
// Check for details https://github.com/grafana/k6/issues/711#issue-341414887
w := c.globalState.Logger.Writer()
stdlog.SetOutput(w)
c.loggersWg.Add(1)
go func() {
<-stop
cancel()
_ = w.Close()
c.loggersWg.Done()
}()
return nil
}

func (c *rootCommand) setLoggerHook(ctx context.Context, h log.AsyncHook) {
c.loggersWg.Add(1)
go func() {
h.Listen(ctx)
c.loggersWg.Done()
}()
c.globalState.Logger.AddHook(h)
c.globalState.Logger.SetOutput(io.Discard) // don't output to anywhere else
}
45 changes: 45 additions & 0 deletions cmd/stdlog_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package cmd

import (
"bytes"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.k6.io/k6/cmd/tests"
"go.k6.io/k6/lib/testutils/httpmultibin"
)

// SetOutput sets the global log so it is racy with other tests
na-- marked this conversation as resolved.
Show resolved Hide resolved
//
//nolint:paralleltest
func TestStdLogOutputIsSet(t *testing.T) {
tb := httpmultibin.NewHTTPMultiBin(t)
ts := tests.NewGlobalTestState(t)
// Sometimes the Go runtime uses the standard log output to
// log some messages directly.
// It does when an invalid char is found in a Cookie.
// Check for details https://github.com/grafana/k6/issues/711#issue-341414887
ts.Stdin = bytes.NewReader([]byte(tb.Replacer.Replace(`
import http from 'k6/http';
codebien marked this conversation as resolved.
Show resolved Hide resolved
export const options = {
hosts: {
"HTTPSBIN_DOMAIN": "HTTPSBIN_IP",
},
insecureSkipTLSVerify: true,
}
export default function() {
http.get("HTTPSBIN_URL/get", {
"cookies": {
"test": "\""
},
})
}`)))

ts.CmdArgs = []string{"k6", "run", "-i", "1", "-"}
newRootCommand(ts.GlobalState).execute()

entries := ts.LoggerHook.Drain()
require.Len(t, entries, 1)
assert.Contains(t, entries[0].Message, "Cookie.Value; dropping invalid bytes")
}
29 changes: 25 additions & 4 deletions cmd/tests/cmd_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1716,11 +1716,9 @@ func TestPrometheusRemoteWriteOutput(t *testing.T) {
t.Parallel()

ts := NewGlobalTestState(t)
ts.Env["K6_PROMETHEUS_RW_SERVER_URL"] = "http://a-fake-url-for-fail"
ts.CmdArgs = []string{"k6", "run", "--out", "experimental-prometheus-rw", "-"}
ts.Stdin = bytes.NewBufferString(`
import exec from 'k6/execution';
export default function () {};
`)
ts.Stdin = bytes.NewBufferString(`export default function () {};`)

cmd.ExecuteWithGlobalState(ts.GlobalState)
ts.OutMutex.Lock()
Expand Down Expand Up @@ -1881,3 +1879,26 @@ func TestRunStaticArchives(t *testing.T) {
})
}
}

func TestBadLogOutput(t *testing.T) {
t.Parallel()

cases := map[string]string{
"NotExist": "badout",
"FileBadConfig": "file=,levels=bad",
"LokiBadConfig": "loki=,levels=bad",
}

for name, tc := range cases {
name := name
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
ts := NewGlobalTestState(t)
ts.CmdArgs = []string{"k6", "run", "--log-output", tc, "-"}
ts.Stdin = bytes.NewBufferString(`export default function () {};`)
ts.ExpectedExitCode = -1
cmd.ExecuteWithGlobalState(ts.GlobalState)
})
}
}
5 changes: 1 addition & 4 deletions cmd/tests/tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,7 @@ func Main(m *testing.M) {
}()

defer func() {
// TODO: figure out why logrus' `Entry.WriterLevel` goroutine sticks
// around and remove this exception.
opt := goleak.IgnoreTopFunction("io.(*pipe).read")
if err := goleak.Find(opt); err != nil {
if err := goleak.Find(); err != nil {
fmt.Println(err)
exitCode = 3
}
Expand Down
25 changes: 25 additions & 0 deletions cmd/tests/tests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,33 @@ package tests

import (
"testing"

"github.com/stretchr/testify/assert"
"go.k6.io/k6/cmd"
)

func TestMain(m *testing.M) {
Main(m)
}

func TestRootCommand(t *testing.T) {
t.Parallel()

cases := map[string][]string{
"Just root": {"k6"},
"Help flag": {"k6", "--help"},
}

helptxt := "Usage:\n k6 [command]\n\nAvailable Commands"
for name, args := range cases {
name, args := name, args
t.Run(name, func(t *testing.T) {
t.Parallel()
ts := NewGlobalTestState(t)
ts.CmdArgs = args
cmd.ExecuteWithGlobalState(ts.GlobalState)
assert.Len(t, ts.LoggerHook.Drain(), 0)
assert.Contains(t, ts.Stdout.String(), helptxt)
})
}
}
Loading