From 969f9bac8b8e562e8a8455169418b6422357203d Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Thu, 30 Jul 2020 16:57:40 +0300 Subject: [PATCH 01/24] Add support for pushing logs to loki --- cmd/root.go | 81 ++++++--- log/loki.go | 462 +++++++++++++++++++++++++++++++++++++++++++++++ log/loki_test.go | 112 ++++++++++++ 3 files changed, 631 insertions(+), 24 deletions(-) create mode 100644 log/loki.go create mode 100644 log/loki_test.go diff --git a/cmd/root.go b/cmd/root.go index 1929546fd06..9235d328c3e 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -21,11 +21,14 @@ package cmd import ( + "context" "fmt" "io" - "log" + "io/ioutil" + stdlog "log" "os" "path/filepath" + "strings" "sync" "github.com/fatih/color" @@ -36,6 +39,7 @@ import ( "github.com/spf13/pflag" "github.com/loadimpact/k6/lib/consts" + "github.com/loadimpact/k6/log" ) var BannerColor = color.New(color.FgCyan) @@ -58,13 +62,15 @@ var defaultConfigFilePath = defaultConfigFileName // Updated with the user's con //nolint:gochecknoglobals var configFilePath = os.Getenv("K6_CONFIG") // Overridden by `-c`/`--config` flag! +//nolint:gochecknoglobals var ( - //TODO: have environment variables for configuring these? hopefully after we move away from global vars though... - verbose bool - quiet bool - noColor bool - logFmt string - address string + // TODO: have environment variables for configuring these? hopefully after we move away from global vars though... + verbose bool + quiet bool + noColor bool + logOutput string + logFmt string + address string ) // RootCmd represents the base command when called without any subcommands. @@ -74,8 +80,13 @@ var RootCmd = &cobra.Command{ Long: BannerColor.Sprintf("\n%s", consts.Banner), SilenceUsage: true, SilenceErrors: true, - PersistentPreRun: func(cmd *cobra.Command, args []string) { - setupLoggers(logFmt) + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + logger := logrus.StandardLogger() // don't use the global one to begin with + err := setupLoggers(logger, logFmt, logOutput) + if err != nil { + return err + } + if noColor { // TODO: figure out something else... currently, with the wrappers // below, we're stripping any colors from the output after we've @@ -92,8 +103,9 @@ var RootCmd = &cobra.Command{ stdout.Writer = colorable.NewNonColorable(os.Stdout) stderr.Writer = colorable.NewNonColorable(os.Stderr) } - log.SetOutput(logrus.StandardLogger().Writer()) - logrus.Debugf("k6 version: v%s", consts.FullVersion()) + stdlog.SetOutput(logger.Writer()) + logger.Debugf("k6 version: v%s", consts.FullVersion()) + return nil }, } @@ -116,14 +128,16 @@ func Execute() { func rootCmdPersistentFlagSet() *pflag.FlagSet { flags := pflag.NewFlagSet("", pflag.ContinueOnError) - //TODO: figure out a better way to handle the CLI flags - global variables are not very testable... :/ + // TODO: figure out a better way to handle the CLI flags - global variables are not very testable... :/ flags.BoolVarP(&verbose, "verbose", "v", false, "enable debug logging") flags.BoolVarP(&quiet, "quiet", "q", false, "disable progress updates") flags.BoolVar(&noColor, "no-color", false, "disable colored output") + flags.StringVar(&logOutput, "log-output", "stderr", + "change output to which logs go, possible values are stderr,stdout,none,loki[=host:port]") flags.StringVar(&logFmt, "logformat", "", "log output format") flags.StringVarP(&address, "address", "a", "localhost:6565", "address for the api server") - //TODO: Fix... This default value needed, so both CLI flags and environment variables work + // TODO: Fix... This default value needed, so both CLI flags and environment variables work flags.StringVarP(&configFilePath, "config", "c", configFilePath, "JSON config file") // And we also need to explicitly set the default value for the usage message here, so things // like `K6_CONFIG="blah" k6 run -h` don't produce a weird usage message @@ -157,7 +171,7 @@ func fprintf(w io.Writer, format string, a ...interface{}) (n int) { return n } -// RawFormatter it does nothing with the message just prints it +// RawFormater it does nothing with the message just prints it type RawFormater struct{} // Format renders a single log entry @@ -165,22 +179,41 @@ func (f RawFormater) Format(entry *logrus.Entry) ([]byte, error) { return append([]byte(entry.Message), '\n'), nil } -func setupLoggers(logFmt string) { +func setupLoggers(logger *logrus.Logger, logFmt string, logOutput string) error { if verbose { - logrus.SetLevel(logrus.DebugLevel) + logger.SetLevel(logrus.DebugLevel) + } + switch logOutput { + case "stderr": + logger.SetOutput(stderr) + case "stdout": + logger.SetOutput(stdout) + case "none": + logger.SetOutput(ioutil.Discard) + default: + if !strings.HasPrefix(logOutput, "loki") { + return fmt.Errorf("unsupported log output `%s`", logOutput) + } + hook, err := log.LokiFromConfigLine(context.Background(), logOutput) // TODO use some context that we can cancel + if err != nil { + return err + } + logger.AddHook(hook) + logger.SetOutput(ioutil.Discard) // don't output to anywhere else + logFmt = "raw" + noColor = true // disable color } - logrus.SetOutput(stderr) switch logFmt { case "raw": - logrus.SetFormatter(&RawFormater{}) - logrus.Debug("Logger format: RAW") + logger.SetFormatter(&RawFormater{}) + logger.Debug("Logger format: RAW") case "json": - logrus.SetFormatter(&logrus.JSONFormatter{}) - logrus.Debug("Logger format: JSON") + logger.SetFormatter(&logrus.JSONFormatter{}) + logger.Debug("Logger format: JSON") default: - logrus.SetFormatter(&logrus.TextFormatter{ForceColors: stderrTTY, DisableColors: noColor}) - logrus.Debug("Logger format: TEXT") + logger.SetFormatter(&logrus.TextFormatter{ForceColors: stderrTTY, DisableColors: noColor}) + logger.Debug("Logger format: TEXT") } - + return nil } diff --git a/log/loki.go b/log/loki.go new file mode 100644 index 00000000000..46c63f0d9d1 --- /dev/null +++ b/log/loki.go @@ -0,0 +1,462 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2020 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package log + +import ( + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "sort" + "strconv" + "strings" + "time" + + "github.com/sirupsen/logrus" +) + +type lokiHook struct { + addr string + labels [][2]string + ch chan *logrus.Entry + limit int + levels []logrus.Level + pushPeriod time.Duration + client *http.Client + ctx context.Context + profile bool +} + +// LokiFromConfigLine returns a new logrus.Hook that pushes logrus.Entrys to loki and is configured +// through the provided line +func LokiFromConfigLine(ctx context.Context, line string) (logrus.Hook, error) { + h := &lokiHook{ + addr: "http://127.0.0.1:3100/loki/api/v1/push", + limit: 100, + levels: logrus.AllLevels, + pushPeriod: time.Second * 1, + ctx: ctx, + } + if line == "loki" { + return h, nil + } + + parts := strings.SplitN(line, "=", 2) + if parts[0] != "loki" { + return nil, fmt.Errorf("loki configuration should be in the form `loki=url-to-push` but is `%s`", line) + } + args := strings.Split(parts[1], ",") + h.addr = args[0] + // TODO use something better ... maybe + // https://godoc.org/github.com/kubernetes/helm/pkg/strvals + // atleast until https://github.com/loadimpact/k6/issues/926? + if len(args) == 1 { + return h, nil + } + + for _, arg := range args[1:] { + paramParts := strings.SplitN(arg, "=", 2) + + if len(paramParts) != 2 { + return nil, fmt.Errorf("loki arguments should be in the form `address,key1=value1,key2=value2`, got %s", arg) + } + + key := paramParts[0] + value := paramParts[1] + + var err error + switch key { + case "pushPeriod": + h.pushPeriod, err = time.ParseDuration(value) + if err != nil { + return nil, fmt.Errorf("couldn't parse the loki pushPeriod %w", err) + } + case "profile": + h.profile = true + case "limit": + h.limit, err = strconv.Atoi(value) + if err != nil { + return nil, fmt.Errorf("couldn't parse the loki limit as a number %w", err) + } + case "level": + h.levels, err = getLevels(value) + if err != nil { + return nil, err + } + default: + if strings.HasPrefix(key, "label.") { + labelKey := strings.TrimPrefix(key, "label.") + h.labels = append(h.labels, [2]string{labelKey, value}) + continue + } + + return nil, fmt.Errorf("unknown loki config key %s", key) + } + } + + h.client = &http.Client{Timeout: h.pushPeriod} + + h.start() + return h, nil +} + +func getLevels(level string) ([]logrus.Level, error) { + // TODO figure out if `tracing`,`fatal` and `panic` should be included + levels := []logrus.Level{} + switch level { + case "debug": + levels = append(levels, logrus.DebugLevel) + fallthrough + case "info": + levels = append(levels, logrus.InfoLevel) + fallthrough + case "warning": + levels = append(levels, logrus.WarnLevel) + fallthrough + case "error": + levels = append(levels, logrus.ErrorLevel) + default: + return nil, fmt.Errorf("unknown log level %s", level) + } + return levels, nil +} + +func (h *lokiHook) start() { + h.ch = make(chan *logrus.Entry, 1000) + go h.loop() +} + +// fill one of two equally sized slices with entries and then push it while filling the other one +// TODO benchmark this +//nolint:funlen +func (h *lokiHook) loop() { + var ( + msgs = make([]tmpMsg, h.limit) + msgsToPush = make([]tmpMsg, h.limit) + dropped int + count int + ticker = time.NewTicker(h.pushPeriod) + pushCh = make(chan chan time.Duration) + ) + + defer ticker.Stop() + defer close(pushCh) + + go func() { + oldLogs := make([]tmpMsg, 0, h.limit*2) + for ch := range pushCh { + msgsToPush, msgs = msgs, msgsToPush + oldCount, oldDropped := count, dropped + count, dropped = 0, 0 + bufferTime := <-ch + close(ch) // signal that more buffering can continue + + copy(oldLogs[len(oldLogs):len(oldLogs)+oldCount], msgsToPush[:oldCount]) + oldLogs = oldLogs[:len(oldLogs)+oldCount] + + t := time.Now() + cutOffIndex := sortAndSplitMsgs(oldLogs, bufferTime) + if cutOffIndex == 0 { + continue + } + t1 := time.Since(t) + + strms := h.createPushMessage(oldLogs, cutOffIndex, oldDropped) + if cutOffIndex > len(oldLogs) { + oldLogs = oldLogs[:0] + continue + } + oldLogs = oldLogs[:copy(oldLogs, oldLogs[cutOffIndex:])] + t2 := time.Since(t) - t1 + + var b bytes.Buffer + _, err := strms.WriteTo(&b) + if err != nil { + fmt.Printf("Error while marshaling logs for loki %s\n", err) + continue + } + size := b.Len() + t3 := time.Since(t) - t2 - t1 + + err = h.push(b) + if err != nil { + fmt.Printf("Error while sending logs to loki %s\n", err) + continue + } + t4 := time.Since(t) - t3 - t2 - t1 + + if h.profile { + fmt.Printf("sorting=%s, adding=%s marshalling=%s sending=%s count=%d final_size=%d\n", + t1, t2, t3, t4, cutOffIndex, size) + } + } + }() + + for { + select { + case entry := <-h.ch: + if count == h.limit { + dropped++ + continue + } + + // Arguably we can directly generate the final marshalled version of the labels right here + // through sorting the entry.Data, removing additionalparams from it and then dumping it + // as the final marshal and appending level and h.labels after it. + // If we reuse some kind of big enough `[]byte` buffer we can also possibly skip on some + // of allocation. Combined with the cutoff part and directly pushing in the final data + // type this can be really a lot faster and to use a lot less memory + labels := make(map[string]string, len(entry.Data)+1) + for k, v := range entry.Data { + labels[k] = fmt.Sprint(v) // TODO optimize ? + } + for _, params := range h.labels { + labels[params[0]] = params[1] + } + labels["level"] = entry.Level.String() + // have the cutoff here ? + // if we cutoff here we can cut somewhat on the backbuffers and optimize the inserting + // in/creating of the final Streams that we push + msgs[count] = tmpMsg{ + labels: labels, + msg: entry.Message, + t: entry.Time.UnixNano(), + } + count++ + case <-ticker.C: + ch := make(chan time.Duration) + pushCh <- ch + ch <- h.pushPeriod / 2 + <-ch + case <-h.ctx.Done(): + ch := make(chan time.Duration) + pushCh <- ch + ch <- 0 + <-ch + return + } + } +} + +func sortAndSplitMsgs(msgs []tmpMsg, bufferTime time.Duration) int { + if len(msgs) == 0 { + return 0 + } + + // TODO using time.Before was giving a lot of out of order, but even now, there are some, if the + // limit is big enough ... + sort.Slice(msgs, func(i, j int) bool { + return msgs[i].t < msgs[j].t + }) + + // We can technically cutoff during the addMsg phase, which will be even better if we sort after + // it as well ... maybe + cutOff := time.Now().Add(-bufferTime).UnixNano() // probably better to be configurable + + cutOffIndex := sort.Search(len(msgs), func(i int) bool { + return !(msgs[i].t < cutOff) + }) + return cutOffIndex +} + +func (h *lokiHook) createPushMessage(msgs []tmpMsg, cutOffIndex, dropped int) *lokiPushMessage { + strms := new(lokiPushMessage) + for _, msg := range msgs[:cutOffIndex] { + strms.add(msg) + } + if dropped != 0 { + labels := make(map[string]string, 2+len(h.labels)) + labels["level"] = logrus.WarnLevel.String() + labels["dropped"] = strconv.Itoa(dropped) + for _, params := range h.labels { + labels[params[0]] = params[1] + } + + msg := tmpMsg{ + labels: labels, + msg: fmt.Sprintf("k6 dropped some packages because they were above the limit of %d/%s", + h.limit, h.pushPeriod), + t: msgs[cutOffIndex-1].t, + } + strms.add(msg) + } + return strms +} + +func (h *lokiHook) push(b bytes.Buffer) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, "GET", h.addr, &b) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + res, err := h.client.Do(req) + + if res != nil { + _, _ = io.Copy(ioutil.Discard, res.Body) + _ = res.Body.Close() + } + return err +} + +func mapEqual(a, b map[string]string) bool { + if len(a) != len(b) { + return false + } + for k, v := range a { + if v2, ok := b[k]; !ok || v2 != v { + return false + } + } + return true +} + +func (strms *lokiPushMessage) add(entry tmpMsg) { + var foundStrm *stream + for _, strm := range strms.Streams { + if mapEqual(strm.Stream, entry.labels) { + foundStrm = strm + break + } + } + + if foundStrm == nil { + foundStrm = &stream{Stream: entry.labels} + strms.Streams = append(strms.Streams, foundStrm) + } + + foundStrm.Values = append(foundStrm.Values, logEntry{t: entry.t, msg: entry.msg}) +} + +// this is temporary message format used to not keep the logrus.Entry around too long and to make +// sorting easier +type tmpMsg struct { + labels map[string]string + t int64 + msg string +} + +func (h *lokiHook) Fire(entry *logrus.Entry) error { + h.ch <- entry + return nil +} + +func (h *lokiHook) Levels() []logrus.Level { + return h.levels +} + +/* +{ + "streams": [ + { + "stream": { + "label1": "value1" + "label2": "value2" + }, + "values": [ // the nanoseconds need to be in order + [ "", "" ], + [ "", "" ] + ] + } + ] +} +*/ +type lokiPushMessage struct { + Streams []*stream `json:"streams"` +} + +func (strms *lokiPushMessage) WriteTo(w io.Writer) (n int64, err error) { + var k int + write := func(b []byte) { + if err != nil { + return + } + k, err = w.Write(b) + n += int64(k) + } + // 10+ 9 for the amount of nanoseconds between 2001 and 2286 also it overflows in the year 2262 ;) + var nanoseconds [19]byte + write([]byte(`{"streams":[`)) + for i, str := range strms.Streams { + if i != 0 { + write([]byte(`,`)) + } + write([]byte(`{"stream":{`)) + var f bool + for k, v := range str.Stream { + if f { + write([]byte(`,`)) + } + f = true + write([]byte(`"`)) + write([]byte(k)) + write([]byte(`":"`)) + write([]byte(v)) + write([]byte(`"`)) + } + write([]byte(`},"values":[`)) + for j, v := range str.Values { + if j != 0 { + write([]byte(`,`)) + } + write([]byte(`["`)) + strconv.AppendInt(nanoseconds[:0], v.t, 10) + write(nanoseconds[:]) + write([]byte(`","`)) + write([]byte(v.msg)) + write([]byte(`"]`)) + } + write([]byte(`]}`)) + } + + write([]byte(`]}`)) + + return n, err +} + +type stream struct { + Stream map[string]string `json:"stream"` + Values []logEntry `json:"values"` +} + +type logEntry struct { + t int64 // nanoseconds + msg string // maybe intern those as they are likely to be the same for an interval +} + +// rewrite this either with easyjson or with a custom marshalling +func (l logEntry) MarshalJSON() ([]byte, error) { + // 2 for '[]', 1 for ',', 4 for '"' and 10 + 9 for the amount of nanoseconds between 2001 and + // 2286 also it overflows in the year 2262 ;) + b := make([]byte, 2, len(l.msg)+26) + b[0] = '[' + b[1] = '"' + b = strconv.AppendInt(b, l.t, 10) + b = append(b, '"', ',', '"') + b = append(b, l.msg...) + b = append(b, '"', ']') + return b, nil +} diff --git a/log/loki_test.go b/log/loki_test.go new file mode 100644 index 00000000000..8572a30f937 --- /dev/null +++ b/log/loki_test.go @@ -0,0 +1,112 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2020 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package log + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" +) + +func TestSyslogFromConfigLine(t *testing.T) { + t.Parallel() + tests := [...]struct { + line string + err bool + res lokiHook + }{ + { + line: "loki", // default settings + res: lokiHook{ + ctx: context.Background(), + addr: "http://127.0.0.1:3100/loki/api/v1/push", + limit: 100, + pushPeriod: time.Second * 1, + levels: logrus.AllLevels, + }, + }, + { + line: "loki=somewhere:1233,label.something=else,label.foo=bar,limit=32,level=debug,pushPeriod=5m32s", + res: lokiHook{ + ctx: context.Background(), + addr: "somewhere:1233", + limit: 32, + pushPeriod: time.Minute*5 + time.Second*32, + levels: []logrus.Level{logrus.DebugLevel, logrus.InfoLevel, logrus.WarnLevel, logrus.ErrorLevel}, + labels: [][2]string{{"something", "else"}, {"foo", "bar"}}, + }, + }, + { + line: "lokino", + err: true, + }, + { + line: "loki=something,limit=word", + err: true, + }, + { + line: "loki=something,level=notlevel", + err: true, + }, + { + line: "loki=something,unknownoption", + err: true, + }, + { + line: "loki=something,label=somethng", + err: true, + }, + } + + for _, test := range tests { + test := test + t.Run(test.line, func(t *testing.T) { + // no parallel because this is way too fast and parallel will only slow it down + + res, err := LokiFromConfigLine(context.Background(), test.line) + + if test.err { + require.Error(t, err) + return + } + require.NoError(t, err) + test.res.client = res.(*lokiHook).client + test.res.ch = res.(*lokiHook).ch + require.Equal(t, &test.res, res) + }) + } +} + +func TestLogEntryMarshal(t *testing.T) { + entry := logEntry{ + t: 9223372036854775807, // the actual max + msg: "something", + } + expected := []byte(`["9223372036854775807","something"]`) + s, err := json.Marshal(entry) + require.NoError(t, err) + + require.Equal(t, expected, s) +} From 0a29fc485de9fc6599b713c328162c54a41925d5 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Fri, 31 Jul 2020 11:33:28 +0300 Subject: [PATCH 02/24] Marshal the log message with encoding/json to escape it properly --- log/loki.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/log/loki.go b/log/loki.go index 46c63f0d9d1..c9ab9b44fb9 100644 --- a/log/loki.go +++ b/log/loki.go @@ -23,6 +23,7 @@ package log import ( "bytes" "context" + "encoding/json" "fmt" "io" "io/ioutil" @@ -425,9 +426,14 @@ func (strms *lokiPushMessage) WriteTo(w io.Writer) (n int64, err error) { write([]byte(`["`)) strconv.AppendInt(nanoseconds[:0], v.t, 10) write(nanoseconds[:]) - write([]byte(`","`)) - write([]byte(v.msg)) - write([]byte(`"]`)) + write([]byte(`",`)) + var b []byte + b, err = json.Marshal(v.msg) + if err != nil { + return n, err + } + write(b) + write([]byte(`]`)) } write([]byte(`]}`)) } From 08ac162c1ce07a9aa97519fc9b9581e4304d689c Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Fri, 31 Jul 2020 11:40:20 +0300 Subject: [PATCH 03/24] Support for redirecting the loki push and some more debugging info --- log/loki.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/log/loki.go b/log/loki.go index c9ab9b44fb9..8631c95cc51 100644 --- a/log/loki.go +++ b/log/loki.go @@ -308,15 +308,25 @@ func (h *lokiHook) push(b bytes.Buffer) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() + body := b.Bytes() + req, err := http.NewRequestWithContext(ctx, "GET", h.addr, &b) if err != nil { return err } + req.GetBody = func() (io.ReadCloser, error) { + return ioutil.NopCloser(bytes.NewBuffer(body)), nil + } + req.Header.Set("Content-Type", "application/json") res, err := h.client.Do(req) if res != nil { + if res.StatusCode == 400 { + r, _ := ioutil.ReadAll(res.Body) // maybe limit it to something like the first 1000 characters? + fmt.Println("Got 400 from loki: " + string(r)) + } _, _ = io.Copy(ioutil.Discard, res.Body) _ = res.Body.Close() } From 9ffee173b754ff68d9c4dfa0554ae62fb45ac008 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Fri, 31 Jul 2020 14:17:08 +0300 Subject: [PATCH 04/24] Add msgMaxSize flag to loki --- log/loki.go | 20 +++++++++++++++++++- log/loki_test.go | 4 +++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/log/loki.go b/log/loki.go index 8631c95cc51..7ef503d7f16 100644 --- a/log/loki.go +++ b/log/loki.go @@ -41,6 +41,7 @@ type lokiHook struct { labels [][2]string ch chan *logrus.Entry limit int + msgMaxSize int levels []logrus.Level pushPeriod time.Duration client *http.Client @@ -57,6 +58,7 @@ func LokiFromConfigLine(ctx context.Context, line string) (logrus.Hook, error) { levels: logrus.AllLevels, pushPeriod: time.Second * 1, ctx: ctx, + msgMaxSize: 1024 * 1024, // 1mb } if line == "loki" { return h, nil @@ -99,6 +101,11 @@ func LokiFromConfigLine(ctx context.Context, line string) (logrus.Hook, error) { if err != nil { return nil, fmt.Errorf("couldn't parse the loki limit as a number %w", err) } + case "msgMaxSize": + h.msgMaxSize, err = strconv.Atoi(value) + if err != nil { + return nil, fmt.Errorf("couldn't parse the loki msgMaxSize as a number %w", err) + } case "level": h.levels, err = getLevels(value) if err != nil { @@ -282,6 +289,7 @@ func sortAndSplitMsgs(msgs []tmpMsg, bufferTime time.Duration) int { func (h *lokiHook) createPushMessage(msgs []tmpMsg, cutOffIndex, dropped int) *lokiPushMessage { strms := new(lokiPushMessage) + strms.msgMaxSize = h.msgMaxSize for _, msg := range msgs[:cutOffIndex] { strms.add(msg) } @@ -396,7 +404,8 @@ func (h *lokiHook) Levels() []logrus.Level { } */ type lokiPushMessage struct { - Streams []*stream `json:"streams"` + Streams []*stream `json:"streams"` + msgMaxSize int } func (strms *lokiPushMessage) WriteTo(w io.Writer) (n int64, err error) { @@ -437,6 +446,15 @@ func (strms *lokiPushMessage) WriteTo(w io.Writer) (n int64, err error) { strconv.AppendInt(nanoseconds[:0], v.t, 10) write(nanoseconds[:]) write([]byte(`",`)) + if len([]rune(v.msg)) > strms.msgMaxSize { + difference := int64(len(v.msg) - strms.msgMaxSize) + omitMsg := append(strconv.AppendInt([]byte("... omitting "), difference, 10), " characters ..."...) + v.msg = strings.Join([]string{ + string([]rune(v.msg)[:strms.msgMaxSize/2]), + string([]rune(v.msg)[len([]rune(v.msg))-strms.msgMaxSize/2:]), + }, string(omitMsg)) + } + var b []byte b, err = json.Marshal(v.msg) if err != nil { diff --git a/log/loki_test.go b/log/loki_test.go index 8572a30f937..80915a9afef 100644 --- a/log/loki_test.go +++ b/log/loki_test.go @@ -45,10 +45,11 @@ func TestSyslogFromConfigLine(t *testing.T) { limit: 100, pushPeriod: time.Second * 1, levels: logrus.AllLevels, + msgMaxSize: 1024 * 1024, }, }, { - line: "loki=somewhere:1233,label.something=else,label.foo=bar,limit=32,level=debug,pushPeriod=5m32s", + line: "loki=somewhere:1233,label.something=else,label.foo=bar,limit=32,level=debug,pushPeriod=5m32s,msgMaxSize=1231", res: lokiHook{ ctx: context.Background(), addr: "somewhere:1233", @@ -56,6 +57,7 @@ func TestSyslogFromConfigLine(t *testing.T) { pushPeriod: time.Minute*5 + time.Second*32, levels: []logrus.Level{logrus.DebugLevel, logrus.InfoLevel, logrus.WarnLevel, logrus.ErrorLevel}, labels: [][2]string{{"something", "else"}, {"foo", "bar"}}, + msgMaxSize: 1231, }, }, { From d93b9f1381c001f05ff2e33eddbfce76b2b1cb2b Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Fri, 31 Jul 2020 14:34:13 +0300 Subject: [PATCH 05/24] Also escape field values as `error` for example has quotes inside --- log/loki.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/log/loki.go b/log/loki.go index 7ef503d7f16..b4838d198f5 100644 --- a/log/loki.go +++ b/log/loki.go @@ -420,6 +420,7 @@ func (strms *lokiPushMessage) WriteTo(w io.Writer) (n int64, err error) { // 10+ 9 for the amount of nanoseconds between 2001 and 2286 also it overflows in the year 2262 ;) var nanoseconds [19]byte write([]byte(`{"streams":[`)) + var b []byte for i, str := range strms.Streams { if i != 0 { write([]byte(`,`)) @@ -433,9 +434,12 @@ func (strms *lokiPushMessage) WriteTo(w io.Writer) (n int64, err error) { f = true write([]byte(`"`)) write([]byte(k)) - write([]byte(`":"`)) - write([]byte(v)) - write([]byte(`"`)) + write([]byte(`":`)) + b, err = json.Marshal(v) + if err != nil { + return n, err + } + write(b) } write([]byte(`},"values":[`)) for j, v := range str.Values { @@ -455,7 +459,6 @@ func (strms *lokiPushMessage) WriteTo(w io.Writer) (n int64, err error) { }, string(omitMsg)) } - var b []byte b, err = json.Marshal(v.msg) if err != nil { return n, err From 686ea5f3adfd64c795e5cf46697d60aa0dc29dfe Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Fri, 31 Jul 2020 15:34:14 +0300 Subject: [PATCH 06/24] s/RawFormater/RawFormatter/g --- cmd/root.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 9235d328c3e..3b5cbc18d87 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -171,11 +171,11 @@ func fprintf(w io.Writer, format string, a ...interface{}) (n int) { return n } -// RawFormater it does nothing with the message just prints it -type RawFormater struct{} +// RawFormatter it does nothing with the message just prints it +type RawFormatter struct{} // Format renders a single log entry -func (f RawFormater) Format(entry *logrus.Entry) ([]byte, error) { +func (f RawFormatter) Format(entry *logrus.Entry) ([]byte, error) { return append([]byte(entry.Message), '\n'), nil } @@ -206,7 +206,7 @@ func setupLoggers(logger *logrus.Logger, logFmt string, logOutput string) error switch logFmt { case "raw": - logger.SetFormatter(&RawFormater{}) + logger.SetFormatter(&RawFormatter{}) logger.Debug("Logger format: RAW") case "json": logger.SetFormatter(&logrus.JSONFormatter{}) From 325ce6a810fe3b2ad06a3ad1ba5e737a1dc80db4 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Fri, 31 Jul 2020 15:45:45 +0300 Subject: [PATCH 07/24] Add todo to rename --logformat --- cmd/root.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/root.go b/cmd/root.go index 3b5cbc18d87..f5d8fb02eba 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -134,7 +134,7 @@ func rootCmdPersistentFlagSet() *pflag.FlagSet { flags.BoolVar(&noColor, "no-color", false, "disable colored output") flags.StringVar(&logOutput, "log-output", "stderr", "change output to which logs go, possible values are stderr,stdout,none,loki[=host:port]") - flags.StringVar(&logFmt, "logformat", "", "log output format") + flags.StringVar(&logFmt, "logformat", "", "log output format") // TODO rename to log-format and warn on old usage flags.StringVarP(&address, "address", "a", "localhost:6565", "address for the api server") // TODO: Fix... This default value needed, so both CLI flags and environment variables work From ee774b96757e2ebf22540cd95c0e57b0d7c24fef Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Fri, 31 Jul 2020 16:03:59 +0300 Subject: [PATCH 08/24] Support all logrus log levels --- log/loki.go | 23 +++++++---------------- log/loki_test.go | 4 ++-- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/log/loki.go b/log/loki.go index b4838d198f5..a29d9bda153 100644 --- a/log/loki.go +++ b/log/loki.go @@ -130,23 +130,14 @@ func LokiFromConfigLine(ctx context.Context, line string) (logrus.Hook, error) { func getLevels(level string) ([]logrus.Level, error) { // TODO figure out if `tracing`,`fatal` and `panic` should be included - levels := []logrus.Level{} - switch level { - case "debug": - levels = append(levels, logrus.DebugLevel) - fallthrough - case "info": - levels = append(levels, logrus.InfoLevel) - fallthrough - case "warning": - levels = append(levels, logrus.WarnLevel) - fallthrough - case "error": - levels = append(levels, logrus.ErrorLevel) - default: - return nil, fmt.Errorf("unknown log level %s", level) + lvl, err := logrus.ParseLevel(level) + if err != nil { + return nil, fmt.Errorf("unknown log level %s", level) // specifically use a custom error } - return levels, nil + index := sort.Search(len(logrus.AllLevels), func(i int) bool { + return logrus.AllLevels[i] > lvl + }) + return logrus.AllLevels[:index], nil } func (h *lokiHook) start() { diff --git a/log/loki_test.go b/log/loki_test.go index 80915a9afef..09c5e7c7dc2 100644 --- a/log/loki_test.go +++ b/log/loki_test.go @@ -49,13 +49,13 @@ func TestSyslogFromConfigLine(t *testing.T) { }, }, { - line: "loki=somewhere:1233,label.something=else,label.foo=bar,limit=32,level=debug,pushPeriod=5m32s,msgMaxSize=1231", + line: "loki=somewhere:1233,label.something=else,label.foo=bar,limit=32,level=info,pushPeriod=5m32s,msgMaxSize=1231", res: lokiHook{ ctx: context.Background(), addr: "somewhere:1233", limit: 32, pushPeriod: time.Minute*5 + time.Second*32, - levels: []logrus.Level{logrus.DebugLevel, logrus.InfoLevel, logrus.WarnLevel, logrus.ErrorLevel}, + levels: logrus.AllLevels[:5], labels: [][2]string{{"something", "else"}, {"foo", "bar"}}, msgMaxSize: 1231, }, From 5f39dc2ef5c070b74c6b7d0fd7959ee323af071d Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Fri, 31 Jul 2020 16:05:13 +0300 Subject: [PATCH 09/24] drop the start method --- log/loki.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/log/loki.go b/log/loki.go index a29d9bda153..8b3839c93bf 100644 --- a/log/loki.go +++ b/log/loki.go @@ -59,6 +59,7 @@ func LokiFromConfigLine(ctx context.Context, line string) (logrus.Hook, error) { pushPeriod: time.Second * 1, ctx: ctx, msgMaxSize: 1024 * 1024, // 1mb + ch: make(chan *logrus.Entry, 1000), } if line == "loki" { return h, nil @@ -124,7 +125,7 @@ func LokiFromConfigLine(ctx context.Context, line string) (logrus.Hook, error) { h.client = &http.Client{Timeout: h.pushPeriod} - h.start() + go h.loop() return h, nil } @@ -140,11 +141,6 @@ func getLevels(level string) ([]logrus.Level, error) { return logrus.AllLevels[:index], nil } -func (h *lokiHook) start() { - h.ch = make(chan *logrus.Entry, 1000) - go h.loop() -} - // fill one of two equally sized slices with entries and then push it while filling the other one // TODO benchmark this //nolint:funlen From ad54b4063cff2e1b472e76663969086f8930aaee Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Fri, 31 Jul 2020 16:33:30 +0300 Subject: [PATCH 10/24] remove todo --- log/loki.go | 1 - 1 file changed, 1 deletion(-) diff --git a/log/loki.go b/log/loki.go index 8b3839c93bf..f5c3a99ee23 100644 --- a/log/loki.go +++ b/log/loki.go @@ -130,7 +130,6 @@ func LokiFromConfigLine(ctx context.Context, line string) (logrus.Hook, error) { } func getLevels(level string) ([]logrus.Level, error) { - // TODO figure out if `tracing`,`fatal` and `panic` should be included lvl, err := logrus.ParseLevel(level) if err != nil { return nil, fmt.Errorf("unknown log level %s", level) // specifically use a custom error From 2d0ea1952c1082884f7952f12620f24dbf6cf77a Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Fri, 31 Jul 2020 16:51:13 +0300 Subject: [PATCH 11/24] slighly better/faster cutOff calculation --- log/loki.go | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/log/loki.go b/log/loki.go index f5c3a99ee23..d62d5fcce2b 100644 --- a/log/loki.go +++ b/log/loki.go @@ -150,7 +150,7 @@ func (h *lokiHook) loop() { dropped int count int ticker = time.NewTicker(h.pushPeriod) - pushCh = make(chan chan time.Duration) + pushCh = make(chan chan int64) ) defer ticker.Stop() @@ -162,14 +162,14 @@ func (h *lokiHook) loop() { msgsToPush, msgs = msgs, msgsToPush oldCount, oldDropped := count, dropped count, dropped = 0, 0 - bufferTime := <-ch + cutOff := <-ch close(ch) // signal that more buffering can continue copy(oldLogs[len(oldLogs):len(oldLogs)+oldCount], msgsToPush[:oldCount]) oldLogs = oldLogs[:len(oldLogs)+oldCount] t := time.Now() - cutOffIndex := sortAndSplitMsgs(oldLogs, bufferTime) + cutOffIndex := sortAndSplitMsgs(oldLogs, cutOff) if cutOffIndex == 0 { continue } @@ -237,13 +237,13 @@ func (h *lokiHook) loop() { t: entry.Time.UnixNano(), } count++ - case <-ticker.C: - ch := make(chan time.Duration) + case t := <-ticker.C: + ch := make(chan int64) pushCh <- ch - ch <- h.pushPeriod / 2 + ch <- t.Add(-(h.pushPeriod / 2)).UnixNano() <-ch case <-h.ctx.Done(): - ch := make(chan time.Duration) + ch := make(chan int64) pushCh <- ch ch <- 0 <-ch @@ -252,7 +252,7 @@ func (h *lokiHook) loop() { } } -func sortAndSplitMsgs(msgs []tmpMsg, bufferTime time.Duration) int { +func sortAndSplitMsgs(msgs []tmpMsg, cutOff int64) int { if len(msgs) == 0 { return 0 } @@ -263,10 +263,6 @@ func sortAndSplitMsgs(msgs []tmpMsg, bufferTime time.Duration) int { return msgs[i].t < msgs[j].t }) - // We can technically cutoff during the addMsg phase, which will be even better if we sort after - // it as well ... maybe - cutOff := time.Now().Add(-bufferTime).UnixNano() // probably better to be configurable - cutOffIndex := sort.Search(len(msgs), func(i int) bool { return !(msgs[i].t < cutOff) }) From dd740d68941f86cfdd7f11eb4d1169db05efd436 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Fri, 31 Jul 2020 16:53:06 +0300 Subject: [PATCH 12/24] check that loki.limit > 0 --- log/loki.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/log/loki.go b/log/loki.go index d62d5fcce2b..4570c2c069a 100644 --- a/log/loki.go +++ b/log/loki.go @@ -102,6 +102,9 @@ func LokiFromConfigLine(ctx context.Context, line string) (logrus.Hook, error) { if err != nil { return nil, fmt.Errorf("couldn't parse the loki limit as a number %w", err) } + if !(h.limit > 0) { + return nil, fmt.Errorf("loki limit needs to be a posstive number, is %d", h.limit) + } case "msgMaxSize": h.msgMaxSize, err = strconv.Atoi(value) if err != nil { From 94e9a69c282be7176a6fdf2956afed7c6d80bcce Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Fri, 31 Jul 2020 16:54:19 +0300 Subject: [PATCH 13/24] check that loki.msgMaxSize > 0 --- log/loki.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/log/loki.go b/log/loki.go index 4570c2c069a..a4b2d662e12 100644 --- a/log/loki.go +++ b/log/loki.go @@ -110,6 +110,9 @@ func LokiFromConfigLine(ctx context.Context, line string) (logrus.Hook, error) { if err != nil { return nil, fmt.Errorf("couldn't parse the loki msgMaxSize as a number %w", err) } + if !(h.msgMaxSize > 0) { + return nil, fmt.Errorf("loki msgMaxSize needs to be a posstive number, is %d", h.msgMaxSize) + } case "level": h.levels, err = getLevels(value) if err != nil { From e9830352e04aff91649a7b66cfd351dbeef92b2a Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Fri, 31 Jul 2020 16:55:42 +0300 Subject: [PATCH 14/24] just context.Background() - timeout is set elsewhere --- log/loki.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/log/loki.go b/log/loki.go index a4b2d662e12..3aeda5e460d 100644 --- a/log/loki.go +++ b/log/loki.go @@ -301,12 +301,9 @@ func (h *lokiHook) createPushMessage(msgs []tmpMsg, cutOffIndex, dropped int) *l } func (h *lokiHook) push(b bytes.Buffer) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - body := b.Bytes() - req, err := http.NewRequestWithContext(ctx, "GET", h.addr, &b) + req, err := http.NewRequestWithContext(context.Background(), "GET", h.addr, &b) if err != nil { return err } From 5ff349935bf16d6cd450c959789a6066f41e410b Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Fri, 31 Jul 2020 16:57:09 +0300 Subject: [PATCH 15/24] fix dropped message --- log/loki.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/log/loki.go b/log/loki.go index 3aeda5e460d..42985c6995f 100644 --- a/log/loki.go +++ b/log/loki.go @@ -291,7 +291,7 @@ func (h *lokiHook) createPushMessage(msgs []tmpMsg, cutOffIndex, dropped int) *l msg := tmpMsg{ labels: labels, - msg: fmt.Sprintf("k6 dropped some packages because they were above the limit of %d/%s", + msg: fmt.Sprintf("k6 dropped some log messages because they were above the limit of %d/%s", h.limit, h.pushPeriod), t: msgs[cutOffIndex-1].t, } From ee51ebb639102f18ebeac20662e281dc08cab794 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Fri, 31 Jul 2020 17:15:01 +0300 Subject: [PATCH 16/24] Just return an error from push instead of fmt.Println --- log/loki.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/log/loki.go b/log/loki.go index 42985c6995f..a6b2deefda9 100644 --- a/log/loki.go +++ b/log/loki.go @@ -318,7 +318,7 @@ func (h *lokiHook) push(b bytes.Buffer) error { if res != nil { if res.StatusCode == 400 { r, _ := ioutil.ReadAll(res.Body) // maybe limit it to something like the first 1000 characters? - fmt.Println("Got 400 from loki: " + string(r)) + return fmt.Errorf("Got 400 from loki: " + string(r)) } _, _ = io.Copy(ioutil.Discard, res.Body) _ = res.Body.Close() From 9ab822ac5f52f65868c05690fd00885625da5cc9 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Fri, 31 Jul 2020 17:28:25 +0300 Subject: [PATCH 17/24] fixes to LokiFromConfigLine --- log/loki.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/log/loki.go b/log/loki.go index a6b2deefda9..666185351e4 100644 --- a/log/loki.go +++ b/log/loki.go @@ -85,8 +85,7 @@ func LokiFromConfigLine(ctx context.Context, line string) (logrus.Hook, error) { return nil, fmt.Errorf("loki arguments should be in the form `address,key1=value1,key2=value2`, got %s", arg) } - key := paramParts[0] - value := paramParts[1] + key, value := paramParts[0], paramParts[1] var err error switch key { @@ -103,7 +102,7 @@ func LokiFromConfigLine(ctx context.Context, line string) (logrus.Hook, error) { return nil, fmt.Errorf("couldn't parse the loki limit as a number %w", err) } if !(h.limit > 0) { - return nil, fmt.Errorf("loki limit needs to be a posstive number, is %d", h.limit) + return nil, fmt.Errorf("loki limit needs to be a positive number, is %d", h.limit) } case "msgMaxSize": h.msgMaxSize, err = strconv.Atoi(value) @@ -111,7 +110,7 @@ func LokiFromConfigLine(ctx context.Context, line string) (logrus.Hook, error) { return nil, fmt.Errorf("couldn't parse the loki msgMaxSize as a number %w", err) } if !(h.msgMaxSize > 0) { - return nil, fmt.Errorf("loki msgMaxSize needs to be a posstive number, is %d", h.msgMaxSize) + return nil, fmt.Errorf("loki msgMaxSize needs to be a positive number, is %d", h.msgMaxSize) } case "level": h.levels, err = getLevels(value) From 862bd0633b338d785c1366a3535b47973a338cc5 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Fri, 31 Jul 2020 17:30:42 +0300 Subject: [PATCH 18/24] Log responses for status code >=400 from loki Co-authored-by: na-- --- log/loki.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/log/loki.go b/log/loki.go index 666185351e4..e86109aa07d 100644 --- a/log/loki.go +++ b/log/loki.go @@ -315,9 +315,9 @@ func (h *lokiHook) push(b bytes.Buffer) error { res, err := h.client.Do(req) if res != nil { - if res.StatusCode == 400 { + if res.StatusCode >= 400 { r, _ := ioutil.ReadAll(res.Body) // maybe limit it to something like the first 1000 characters? - return fmt.Errorf("Got 400 from loki: " + string(r)) + return fmt.Errorf("Got %d from loki: %s", res.StatusCode, string(r)) } _, _ = io.Copy(ioutil.Discard, res.Body) _ = res.Body.Close() From abd5d93a866eb6b5c4f64e6eb114f210c0d7538c Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Tue, 11 Aug 2020 15:52:01 +0300 Subject: [PATCH 19/24] Update cmd/root.go Co-authored-by: na-- --- cmd/root.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/root.go b/cmd/root.go index f5d8fb02eba..a8f4c42543b 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -133,7 +133,7 @@ func rootCmdPersistentFlagSet() *pflag.FlagSet { flags.BoolVarP(&quiet, "quiet", "q", false, "disable progress updates") flags.BoolVar(&noColor, "no-color", false, "disable colored output") flags.StringVar(&logOutput, "log-output", "stderr", - "change output to which logs go, possible values are stderr,stdout,none,loki[=host:port]") + "change the output for k6 logs, possible values are stderr,stdout,none,loki[=host:port]") flags.StringVar(&logFmt, "logformat", "", "log output format") // TODO rename to log-format and warn on old usage flags.StringVarP(&address, "address", "a", "localhost:6565", "address for the api server") From 605b7b027cb5cd588dca2068f6554d0f22de6a50 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Tue, 11 Aug 2020 16:10:04 +0300 Subject: [PATCH 20/24] lint fixes --- cmd/root.go | 10 +++++++- log/loki.go | 64 +++++++++++++++++++++++++++++++----------------- log/loki_test.go | 2 +- 3 files changed, 52 insertions(+), 24 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index a8f4c42543b..dff29281c8b 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -191,10 +191,18 @@ func setupLoggers(logger *logrus.Logger, logFmt string, logOutput string) error case "none": logger.SetOutput(ioutil.Discard) default: + fallbackLogger := &logrus.Logger{ + Out: os.Stderr, + Formatter: new(logrus.TextFormatter), + Hooks: make(logrus.LevelHooks), + Level: logrus.InfoLevel, + } + if !strings.HasPrefix(logOutput, "loki") { return fmt.Errorf("unsupported log output `%s`", logOutput) } - hook, err := log.LokiFromConfigLine(context.Background(), logOutput) // TODO use some context that we can cancel + // TODO use some context that we can cancel + hook, err := log.LokiFromConfigLine(context.Background(), fallbackLogger, logOutput) if err != nil { return err } diff --git a/log/loki.go b/log/loki.go index e86109aa07d..993d562a26c 100644 --- a/log/loki.go +++ b/log/loki.go @@ -37,29 +37,32 @@ import ( ) type lokiHook struct { - addr string - labels [][2]string - ch chan *logrus.Entry - limit int - msgMaxSize int - levels []logrus.Level - pushPeriod time.Duration - client *http.Client - ctx context.Context - profile bool + addr string + labels [][2]string + ch chan *logrus.Entry + limit int + msgMaxSize int + levels []logrus.Level + pushPeriod time.Duration + client *http.Client + ctx context.Context + fallbackLogger logrus.FieldLogger + profile bool } // LokiFromConfigLine returns a new logrus.Hook that pushes logrus.Entrys to loki and is configured // through the provided line -func LokiFromConfigLine(ctx context.Context, line string) (logrus.Hook, error) { +//nolint:funlen +func LokiFromConfigLine(ctx context.Context, fallbackLogger logrus.FieldLogger, line string) (logrus.Hook, error) { h := &lokiHook{ - addr: "http://127.0.0.1:3100/loki/api/v1/push", - limit: 100, - levels: logrus.AllLevels, - pushPeriod: time.Second * 1, - ctx: ctx, - msgMaxSize: 1024 * 1024, // 1mb - ch: make(chan *logrus.Entry, 1000), + addr: "http://127.0.0.1:3100/loki/api/v1/push", + limit: 100, + levels: logrus.AllLevels, + pushPeriod: time.Second * 1, + ctx: ctx, + msgMaxSize: 1024 * 1024, // 1mb + ch: make(chan *logrus.Entry, 1000), + fallbackLogger: fallbackLogger, } if line == "loki" { return h, nil @@ -121,6 +124,7 @@ func LokiFromConfigLine(ctx context.Context, line string) (logrus.Hook, error) { if strings.HasPrefix(key, "label.") { labelKey := strings.TrimPrefix(key, "label.") h.labels = append(h.labels, [2]string{labelKey, value}) + continue } @@ -131,6 +135,7 @@ func LokiFromConfigLine(ctx context.Context, line string) (logrus.Hook, error) { h.client = &http.Client{Timeout: h.pushPeriod} go h.loop() + return h, nil } @@ -142,6 +147,7 @@ func getLevels(level string) ([]logrus.Level, error) { index := sort.Search(len(logrus.AllLevels), func(i int) bool { return logrus.AllLevels[i] > lvl }) + return logrus.AllLevels[:index], nil } @@ -183,6 +189,7 @@ func (h *lokiHook) loop() { strms := h.createPushMessage(oldLogs, cutOffIndex, oldDropped) if cutOffIndex > len(oldLogs) { oldLogs = oldLogs[:0] + continue } oldLogs = oldLogs[:copy(oldLogs, oldLogs[cutOffIndex:])] @@ -191,7 +198,8 @@ func (h *lokiHook) loop() { var b bytes.Buffer _, err := strms.WriteTo(&b) if err != nil { - fmt.Printf("Error while marshaling logs for loki %s\n", err) + h.fallbackLogger.WithError(err).Error("Error while marshaling logs for loki") + continue } size := b.Len() @@ -199,13 +207,15 @@ func (h *lokiHook) loop() { err = h.push(b) if err != nil { - fmt.Printf("Error while sending logs to loki %s\n", err) + h.fallbackLogger.WithError(err).Error("Error while sending logs to loki") + continue } t4 := time.Since(t) - t3 - t2 - t1 if h.profile { - fmt.Printf("sorting=%s, adding=%s marshalling=%s sending=%s count=%d final_size=%d\n", + h.fallbackLogger.Infof( + "sorting=%s, adding=%s marshalling=%s sending=%s count=%d final_size=%d\n", t1, t2, t3, t4, cutOffIndex, size) } } @@ -216,6 +226,7 @@ func (h *lokiHook) loop() { case entry := <-h.ch: if count == h.limit { dropped++ + continue } @@ -252,6 +263,7 @@ func (h *lokiHook) loop() { pushCh <- ch ch <- 0 <-ch + return } } @@ -271,6 +283,7 @@ func sortAndSplitMsgs(msgs []tmpMsg, cutOff int64) int { cutOffIndex := sort.Search(len(msgs), func(i int) bool { return !(msgs[i].t < cutOff) }) + return cutOffIndex } @@ -296,6 +309,7 @@ func (h *lokiHook) createPushMessage(msgs []tmpMsg, cutOffIndex, dropped int) *l } strms.add(msg) } + return strms } @@ -317,11 +331,13 @@ func (h *lokiHook) push(b bytes.Buffer) error { if res != nil { if res.StatusCode >= 400 { r, _ := ioutil.ReadAll(res.Body) // maybe limit it to something like the first 1000 characters? - return fmt.Errorf("Got %d from loki: %s", res.StatusCode, string(r)) + + return fmt.Errorf("got %d from loki: %s", res.StatusCode, string(r)) } _, _ = io.Copy(ioutil.Discard, res.Body) _ = res.Body.Close() } + return err } @@ -334,6 +350,7 @@ func mapEqual(a, b map[string]string) bool { return false } } + return true } @@ -342,6 +359,7 @@ func (strms *lokiPushMessage) add(entry tmpMsg) { for _, strm := range strms.Streams { if mapEqual(strm.Stream, entry.labels) { foundStrm = strm + break } } @@ -364,6 +382,7 @@ type tmpMsg struct { func (h *lokiHook) Fire(entry *logrus.Entry) error { h.ch <- entry + return nil } @@ -479,5 +498,6 @@ func (l logEntry) MarshalJSON() ([]byte, error) { b = append(b, '"', ',', '"') b = append(b, l.msg...) b = append(b, '"', ']') + return b, nil } diff --git a/log/loki_test.go b/log/loki_test.go index 09c5e7c7dc2..ff41dbf59b2 100644 --- a/log/loki_test.go +++ b/log/loki_test.go @@ -87,7 +87,7 @@ func TestSyslogFromConfigLine(t *testing.T) { t.Run(test.line, func(t *testing.T) { // no parallel because this is way too fast and parallel will only slow it down - res, err := LokiFromConfigLine(context.Background(), test.line) + res, err := LokiFromConfigLine(context.Background(), nil, test.line) if test.err { require.Error(t, err) From f93c23a53ff015b055f340ce63ba052fbc7c2db1 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Tue, 11 Aug 2020 16:14:39 +0300 Subject: [PATCH 21/24] renames --- log/loki.go | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/log/loki.go b/log/loki.go index 993d562a26c..92fcd682bd1 100644 --- a/log/loki.go +++ b/log/loki.go @@ -186,7 +186,7 @@ func (h *lokiHook) loop() { } t1 := time.Since(t) - strms := h.createPushMessage(oldLogs, cutOffIndex, oldDropped) + pushMsg := h.createPushMessage(oldLogs, cutOffIndex, oldDropped) if cutOffIndex > len(oldLogs) { oldLogs = oldLogs[:0] @@ -196,7 +196,7 @@ func (h *lokiHook) loop() { t2 := time.Since(t) - t1 var b bytes.Buffer - _, err := strms.WriteTo(&b) + _, err := pushMsg.WriteTo(&b) if err != nil { h.fallbackLogger.WithError(err).Error("Error while marshaling logs for loki") @@ -288,10 +288,10 @@ func sortAndSplitMsgs(msgs []tmpMsg, cutOff int64) int { } func (h *lokiHook) createPushMessage(msgs []tmpMsg, cutOffIndex, dropped int) *lokiPushMessage { - strms := new(lokiPushMessage) - strms.msgMaxSize = h.msgMaxSize + pushMsg := new(lokiPushMessage) + pushMsg.maxSize = h.msgMaxSize for _, msg := range msgs[:cutOffIndex] { - strms.add(msg) + pushMsg.add(msg) } if dropped != 0 { labels := make(map[string]string, 2+len(h.labels)) @@ -307,10 +307,10 @@ func (h *lokiHook) createPushMessage(msgs []tmpMsg, cutOffIndex, dropped int) *l h.limit, h.pushPeriod), t: msgs[cutOffIndex-1].t, } - strms.add(msg) + pushMsg.add(msg) } - return strms + return pushMsg } func (h *lokiHook) push(b bytes.Buffer) error { @@ -354,9 +354,9 @@ func mapEqual(a, b map[string]string) bool { return true } -func (strms *lokiPushMessage) add(entry tmpMsg) { +func (pushMsg *lokiPushMessage) add(entry tmpMsg) { var foundStrm *stream - for _, strm := range strms.Streams { + for _, strm := range pushMsg.Streams { if mapEqual(strm.Stream, entry.labels) { foundStrm = strm @@ -366,7 +366,7 @@ func (strms *lokiPushMessage) add(entry tmpMsg) { if foundStrm == nil { foundStrm = &stream{Stream: entry.labels} - strms.Streams = append(strms.Streams, foundStrm) + pushMsg.Streams = append(pushMsg.Streams, foundStrm) } foundStrm.Values = append(foundStrm.Values, logEntry{t: entry.t, msg: entry.msg}) @@ -407,11 +407,11 @@ func (h *lokiHook) Levels() []logrus.Level { } */ type lokiPushMessage struct { - Streams []*stream `json:"streams"` - msgMaxSize int + Streams []*stream `json:"streams"` + maxSize int } -func (strms *lokiPushMessage) WriteTo(w io.Writer) (n int64, err error) { +func (pushMsg *lokiPushMessage) WriteTo(w io.Writer) (n int64, err error) { var k int write := func(b []byte) { if err != nil { @@ -424,7 +424,7 @@ func (strms *lokiPushMessage) WriteTo(w io.Writer) (n int64, err error) { var nanoseconds [19]byte write([]byte(`{"streams":[`)) var b []byte - for i, str := range strms.Streams { + for i, str := range pushMsg.Streams { if i != 0 { write([]byte(`,`)) } @@ -453,12 +453,12 @@ func (strms *lokiPushMessage) WriteTo(w io.Writer) (n int64, err error) { strconv.AppendInt(nanoseconds[:0], v.t, 10) write(nanoseconds[:]) write([]byte(`",`)) - if len([]rune(v.msg)) > strms.msgMaxSize { - difference := int64(len(v.msg) - strms.msgMaxSize) + if len([]rune(v.msg)) > pushMsg.maxSize { + difference := int64(len(v.msg) - pushMsg.maxSize) omitMsg := append(strconv.AppendInt([]byte("... omitting "), difference, 10), " characters ..."...) v.msg = strings.Join([]string{ - string([]rune(v.msg)[:strms.msgMaxSize/2]), - string([]rune(v.msg)[len([]rune(v.msg))-strms.msgMaxSize/2:]), + string([]rune(v.msg)[:pushMsg.maxSize/2]), + string([]rune(v.msg)[len([]rune(v.msg))-pushMsg.maxSize/2:]), }, string(omitMsg)) } From c322ca8c555ea626aa81aed513b6938aab6215b1 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Wed, 12 Aug 2020 16:13:52 +0300 Subject: [PATCH 22/24] Support for log-output configuration through K6_LOG_OUTPUT env variable --- cmd/root.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/root.go b/cmd/root.go index dff29281c8b..ac6539088de 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -82,6 +82,9 @@ var RootCmd = &cobra.Command{ SilenceErrors: true, PersistentPreRunE: func(cmd *cobra.Command, args []string) error { logger := logrus.StandardLogger() // don't use the global one to begin with + if envLogOutput, ok := os.LookupEnv("K6_LOG_OUTPUT"); ok { + logOutput = envLogOutput + } err := setupLoggers(logger, logFmt, logOutput) if err != nil { return err From 8a302916911c0ed872f81329f6160d6fa6734e0d Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Wed, 12 Aug 2020 16:49:36 +0300 Subject: [PATCH 23/24] fixup! Support for log-output configuration through K6_LOG_OUTPUT env variable --- cmd/root.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index ac6539088de..3eab1e610d1 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -82,8 +82,11 @@ var RootCmd = &cobra.Command{ SilenceErrors: true, PersistentPreRunE: func(cmd *cobra.Command, args []string) error { logger := logrus.StandardLogger() // don't use the global one to begin with - if envLogOutput, ok := os.LookupEnv("K6_LOG_OUTPUT"); ok { - logOutput = envLogOutput + cliLogOutput := getNullString(cmd.Flags(), "log-output") + if !cliLogOutput.Valid { + if envLogOutput, ok := os.LookupEnv("K6_LOG_OUTPUT"); ok { + logOutput = envLogOutput + } } err := setupLoggers(logger, logFmt, logOutput) if err != nil { From 18ae5a0f2d2aa47acc7d1336654b176c5520d151 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Wed, 12 Aug 2020 17:57:02 +0300 Subject: [PATCH 24/24] Use cmf.Flags().Changed() instead of getNullString() --- cmd/root.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 3eab1e610d1..34e6664f307 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -82,8 +82,7 @@ var RootCmd = &cobra.Command{ SilenceErrors: true, PersistentPreRunE: func(cmd *cobra.Command, args []string) error { logger := logrus.StandardLogger() // don't use the global one to begin with - cliLogOutput := getNullString(cmd.Flags(), "log-output") - if !cliLogOutput.Valid { + if !cmd.Flags().Changed("log-output") { if envLogOutput, ok := os.LookupEnv("K6_LOG_OUTPUT"); ok { logOutput = envLogOutput }