Skip to content

Commit

Permalink
data race: avoid unprotected access to sb.file
Browse files Browse the repository at this point in the history
79575d8 (included in klog v1.130.0) added a small data race with regards to
accessing the syncBuffer.file field when calling Sync.

We can eliminate the entire redirectBuffer interface that led to this mistake
and instead work directly with syncBuffer for flushing and syncing.

To avoid allocating a slice inside flushAll, a fixed-sized struct is
returned. A benchmark confirms that flushAll+syncAll can be called without
allocations.
  • Loading branch information
pohly committed Jun 18, 2024
1 parent 16c7d26 commit 2327d4c
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 61 deletions.
77 changes: 25 additions & 52 deletions klog.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,13 +404,6 @@ func (t *traceLocation) Set(value string) error {
return nil
}

// flushSyncWriter is the interface satisfied by logging destinations.
type flushSyncWriter interface {
Flush() error
Sync() error
io.Writer
}

var logging loggingT
var commandLine flag.FlagSet

Expand Down Expand Up @@ -486,7 +479,7 @@ type settings struct {
// Access to all of the following fields must be protected via a mutex.

// file holds writer for each of the log types.
file [severity.NumSeverity]flushSyncWriter
file [severity.NumSeverity]io.Writer
// flushInterval is the interval for periodic flushing. If zero,
// the global default will be used.
flushInterval time.Duration
Expand Down Expand Up @@ -831,32 +824,12 @@ func (l *loggingT) printS(err error, s severity.Severity, depth int, msg string,
buffer.PutBuffer(b)
}

// redirectBuffer is used to set an alternate destination for the logs
type redirectBuffer struct {
w io.Writer
}

func (rb *redirectBuffer) Sync() error {
return nil
}

func (rb *redirectBuffer) Flush() error {
return nil
}

func (rb *redirectBuffer) Write(bytes []byte) (n int, err error) {
return rb.w.Write(bytes)
}

// SetOutput sets the output destination for all severities
func SetOutput(w io.Writer) {
logging.mu.Lock()
defer logging.mu.Unlock()
for s := severity.FatalLog; s >= severity.InfoLog; s-- {
rb := &redirectBuffer{
w: w,
}
logging.file[s] = rb
logging.file[s] = w
}
}

Expand All @@ -868,10 +841,7 @@ func SetOutputBySeverity(name string, w io.Writer) {
if !ok {
panic(fmt.Sprintf("SetOutputBySeverity(%q): unrecognized severity name", name))
}
rb := &redirectBuffer{
w: w,
}
logging.file[sev] = rb
logging.file[sev] = w
}

// LogToStderr sets whether to log exclusively to stderr, bypassing outputs
Expand Down Expand Up @@ -1011,8 +981,8 @@ func (l *loggingT) exit(err error) {
logExitFunc(err)
return
}
files := l.flushAll()
l.syncAll(files)
needToSync := l.flushAll()
l.syncAll(needToSync)
OsExit(2)
}

Expand All @@ -1029,10 +999,6 @@ type syncBuffer struct {
maxbytes uint64 // The max number of bytes this syncBuffer.file can hold before cleaning up.
}

func (sb *syncBuffer) Sync() error {
return sb.file.Sync()
}

// CalculateMaxSize returns the real max size in bytes after considering the default max size and the flag options.
func CalculateMaxSize() uint64 {
if logging.logFile != "" {
Expand Down Expand Up @@ -1224,37 +1190,44 @@ func StartFlushDaemon(interval time.Duration) {
// lockAndFlushAll is like flushAll but locks l.mu first.
func (l *loggingT) lockAndFlushAll() {
l.mu.Lock()
files := l.flushAll()
needToSync := l.flushAll()
l.mu.Unlock()
// Some environments are slow when syncing and holding the lock might cause contention.
l.syncAll(files)
l.syncAll(needToSync)
}

// flushAll flushes all the logs
// l.mu is held.
func (l *loggingT) flushAll() []flushSyncWriter {
files := make([]flushSyncWriter, 0, severity.NumSeverity)
//
// The result is the number of files which need to be synced and the pointers to them.
func (l *loggingT) flushAll() fileArray {
var needToSync fileArray

// Flush from fatal down, in case there's trouble flushing.
for s := severity.FatalLog; s >= severity.InfoLog; s-- {
file := l.file[s]
if file != nil {
_ = file.Flush() // ignore error
if sb, ok := file.(*syncBuffer); ok && sb.file != nil {
_ = sb.Flush() // ignore error
needToSync.files[needToSync.num] = sb.file
needToSync.num++
}
files = append(files, file)
}
if logging.loggerOptions.flush != nil {
logging.loggerOptions.flush()
}
return files
return needToSync
}

type fileArray struct {
num int
files [severity.NumSeverity]*os.File
}

// syncAll attempts to "sync" their data to disk.
func (l *loggingT) syncAll(files []flushSyncWriter) {
func (l *loggingT) syncAll(needToSync fileArray) {
// Flush from fatal down, in case there's trouble flushing.
for _, file := range files {
if file != nil {
_ = file.Sync() // ignore error
}
for i := 0; i < needToSync.num; i++ {
_ = needToSync.files[i].Sync() // ignore error
}
}

Expand Down
65 changes: 56 additions & 9 deletions klog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
stdLog "log"
"os"
Expand Down Expand Up @@ -73,7 +74,7 @@ func (f *flushBuffer) Sync() error {
}

// swap sets the log writers and returns the old array.
func (l *loggingT) swap(writers [severity.NumSeverity]flushSyncWriter) (old [severity.NumSeverity]flushSyncWriter) {
func (l *loggingT) swap(writers [severity.NumSeverity]io.Writer) (old [severity.NumSeverity]io.Writer) {
l.mu.Lock()
defer l.mu.Unlock()
old = l.file
Expand All @@ -82,8 +83,8 @@ func (l *loggingT) swap(writers [severity.NumSeverity]flushSyncWriter) (old [sev
}

// newBuffers sets the log writers to all new byte buffers and returns the old array.
func (l *loggingT) newBuffers() [severity.NumSeverity]flushSyncWriter {
return l.swap([severity.NumSeverity]flushSyncWriter{new(flushBuffer), new(flushBuffer), new(flushBuffer), new(flushBuffer)})
func (l *loggingT) newBuffers() [severity.NumSeverity]io.Writer {
return l.swap([severity.NumSeverity]io.Writer{new(flushBuffer), new(flushBuffer), new(flushBuffer), new(flushBuffer)})
}

// contents returns the specified log value as a string.
Expand Down Expand Up @@ -540,14 +541,17 @@ func TestOpenAppendOnStart(t *testing.T) {

// Logging creates the file
Info(x)
_, ok := logging.file[severity.InfoLog].(*syncBuffer)
sb, ok := logging.file[severity.InfoLog].(*syncBuffer)
if !ok {
t.Fatal("info wasn't created")
}

// ensure we wrote what we expected
files := logging.flushAll()
logging.syncAll(files)
needToSync := logging.flushAll()
if needToSync.num != 1 || needToSync.files[0] != sb.file {
t.Errorf("Should have received exactly the file from severity.InfoLog for syncing, got instead: %+v", needToSync)
}
logging.syncAll(needToSync)
b, err := ioutil.ReadFile(logging.logFile)
if err != nil {
t.Fatalf("unexpected error: %v", err)
Expand Down Expand Up @@ -811,15 +815,58 @@ func BenchmarkLogs(b *testing.B) {
Severity: severity.FatalLog,
}
logging.logFile = testFile.Name()
logging.swap([severity.NumSeverity]flushSyncWriter{nil, nil, nil, nil})
logging.swap([severity.NumSeverity]io.Writer{nil, nil, nil, nil})

for i := 0; i < b.N; i++ {
Error("error")
Warning("warning")
Info("info")
}
files := logging.flushAll()
logging.syncAll(files)
needToSync := logging.flushAll()
sb, ok := logging.file[severity.InfoLog].(*syncBuffer)
if !ok {
b.Fatal("info wasn't created")
}
if needToSync.num != 1 || needToSync.files[0] != sb.file {
b.Fatalf("Should have received exactly the file from severity.InfoLog for syncing, got instead: %+v", needToSync)
}
logging.syncAll(needToSync)
}

func BenchmarkFlush(b *testing.B) {
defer CaptureState().Restore()
setFlags()
defer logging.swap(logging.newBuffers())

testFile, err := ioutil.TempFile("", "test.log")
if err != nil {
b.Fatal("unable to create temporary file")
}
defer os.Remove(testFile.Name())

require.NoError(b, logging.verbosity.Set("0"))
logging.toStderr = false
logging.alsoToStderr = false
logging.stderrThreshold = severityValue{
Severity: severity.FatalLog,
}
logging.logFile = testFile.Name()
logging.swap([severity.NumSeverity]io.Writer{nil, nil, nil, nil})

// Create output file.
Info("info")
needToSync := logging.flushAll()

if needToSync.num != 1 {
b.Fatalf("expected exactly one file to sync, got: %+v", needToSync)
}

b.ResetTimer()

for i := 0; i < b.N; i++ {
needToSync := logging.flushAll()
logging.syncAll(needToSync)
}
}

// Test the logic on checking log size limitation.
Expand Down

0 comments on commit 2327d4c

Please sign in to comment.