Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zapcore: Add Buffered Writer #961

Merged
merged 74 commits into from
Jun 8, 2021
Merged
Show file tree
Hide file tree
Changes from 73 commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
c1e68d7
add buffer sync
sysulq Feb 8, 2020
df42521
support config bufferSize and flushInterval, improve logic
sysulq Feb 11, 2020
7f5e097
improve
sysulq Feb 13, 2020
7eec047
update comment
sysulq Feb 13, 2020
3c8ec7b
WriterSyncer support Close method
sysulq Feb 25, 2020
046355f
improve
sysulq Feb 25, 2020
afe872f
fix spell
sysulq Feb 25, 2020
cce50b1
improve cancel logic
sysulq Mar 3, 2020
ef808a2
improve
sysulq Mar 3, 2020
3af046a
remove close
sysulq Mar 3, 2020
e0a440d
rename cancel to close and keep syncing
sysulq May 29, 2020
e571e25
fix lint
sysulq Jun 1, 2020
4a25c9f
100% test coverage
sysulq Jun 1, 2020
99f4ea8
improve comment
sysulq Jun 1, 2020
223dd97
improve comment
sysulq Jun 1, 2020
dd3698e
fix test error
sysulq Jun 1, 2020
f3079de
fix race condition in test case
sysulq Jun 1, 2020
2c09dc4
100% test coverage
sysulq Jun 1, 2020
f0f2a28
add loop and fix typo
sysulq Jun 5, 2020
fb6efc3
Update zapcore/write_syncer.go
sysulq Jun 9, 2020
0977e02
improve test
sysulq Jun 9, 2020
65775e1
validate loop logic in test case
sysulq Jun 9, 2020
470b7fa
group default config
sysulq Jun 9, 2020
160e84d
improve close logic
sysulq Jun 23, 2020
ee04403
Update zapcore/write_syncer.go
sysulq Oct 9, 2020
80433c2
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
9bda819
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
dc370da
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
2ecf1f8
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
a522baf
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
4273b8e
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
d884534
improve close logic and use goleak
sysulq Oct 13, 2020
0392f30
Drop `type CloseFunc`
abhinav Nov 2, 2020
ddace4d
doc: Rewrite Buffer documentation
abhinav Nov 2, 2020
e96c0ee
Prefix default{BufferSize, FlushInterval} with _
abhinav Nov 2, 2020
570d2ff
Buffer/close: return a bound method
abhinav Nov 2, 2020
d1e244a
Buffer/close: Close the channel instead of posting
abhinav Nov 2, 2020
83f6331
_default{BufferSize, FlushInterval}: docs
abhinav Nov 2, 2020
ba0b65e
buffer: Move loop into a method
abhinav Nov 2, 2020
b8a0b28
buffer/close: stop the ticker
abhinav Nov 2, 2020
1c5daa9
Update zapcore/write_syncer_test.go
sysulq Nov 6, 2020
f74c558
improve code style
sysulq Nov 6, 2020
6f88240
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
963142b
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
f753926
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
1db4692
add comment
sysulq Nov 6, 2020
545d164
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
c777047
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
1b0fcfd
update
sysulq Nov 6, 2020
02c9d3f
Remove Lock/Unlock from tests
prashantv Jan 5, 2021
4804188
remove double buffer check
sysulq Jan 6, 2021
4ca46e9
set timer to zero
sysulq Jan 6, 2021
b1a95ff
remove errorWriter
sysulq Jan 6, 2021
6d58a7d
add SyncBuffer
sysulq Jan 6, 2021
2312b39
Add buffered write syncer
moisesvega May 25, 2021
7c32a14
Set default Clock zapcore/buffered_write_syncer.go
moisesvega Jun 1, 2021
ffba68b
Use require instead assert zapcore/buffered_write_syncer_bench_test.go
moisesvega Jun 1, 2021
a3fbb8e
Update Close() comment, use filepath to create tmpDir and mutate Cloc…
moisesvega Jun 1, 2021
39f7316
Use ioutil for creating tmp file
moisesvega Jun 1, 2021
10b45db
Sync underlying writeSyncer
moisesvega Jun 4, 2021
0962b0f
Update default documentation and don't double lock
moisesvega Jun 4, 2021
7dedbc0
Add test case with lockedWriteSyncer
moisesvega Jun 4, 2021
f8f3a3d
Use empty assert.Empty
moisesvega Jun 8, 2021
acfd294
Use zapcore.Lock for BufferedWriteSyncer tests
moisesvega Jun 8, 2021
f32b796
Add asserts on close/remove in benchmark buffered write and add test …
moisesvega Jun 8, 2021
17cdba5
Take a pass over the BufferedWriteSyncer docs
abhinav Jun 8, 2021
97ae2a6
Move consts to the top of the file
abhinav Jun 8, 2021
b58819e
Rename Close to Stop
abhinav Jun 8, 2021
afe558a
Stop: Wait until flushLoop stops
abhinav Jun 8, 2021
49444fc
fixup! Rename Close to Stop
abhinav Jun 8, 2021
43d9b54
Stop/Sync: handle uninitialized calls
abhinav Jun 8, 2021
7c7fb4d
Rename WriteSyncer field to WS
abhinav Jun 8, 2021
f30d68b
Overwrite WS instead of maintaining to WriteSyncer fields
abhinav Jun 8, 2021
ac5b8ad
Drop Lock unpacking, check initialized, fix docs
abhinav Jun 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1 h1:VkoXIwSboBpnk99O/KFauAEILuNHv5DVFKZMBN/gUgw=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand All @@ -23,17 +25,23 @@ go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
Expand Down
187 changes: 187 additions & 0 deletions zapcore/buffered_write_syncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package zapcore

import (
"bufio"
"sync"
"time"

"go.uber.org/multierr"
)

const (
// _defaultBufferSize specifies the default size used by Buffer.
_defaultBufferSize = 256 * 1024 // 256 kB

// _defaultFlushInterval specifies the default flush interval for
// Buffer.
_defaultFlushInterval = 30 * time.Second
)

// A BufferedWriteSyncer is a WriteSyncer that buffers writes in-memory before
// flushing them to a wrapped WriteSyncer after reaching some limit, or at some
// fixed interval--whichever comes first.
//
// BufferedWriteSyncer is safe for concurrent use. You don't need to use
// zapcore.Lock for WriteSyncers with BufferedWriteSyncer.
type BufferedWriteSyncer struct {
// WS is the WriteSyncer around which BufferedWriteSyncer will buffer
// writes. The provided WriteSyncer must not be used directly once
// wrapped in a BufferedWriteSyncer.
//
// This field is required.
WS WriteSyncer

// Size specifies the maximum amount of data the writer will buffered
// before flushing.
//
// Defaults to 256 kB if unspecified.
Size int

// FlushInterval specifies how often the writer should flush data if
// there have been no writes.
//
// Defaults to 30 seconds if unspecified.
FlushInterval time.Duration

// Clock, if specified, provides control of the source of time for the
// writer.
//
// Defaults to the system clock.
Clock Clock

// unexported fields for state
mu sync.Mutex
initialized bool // whether initialize() has run
writer *bufio.Writer
ticker *time.Ticker
stop chan struct{} // closed when flushLoop should stop
done chan struct{} // closed when flushLoop has stopped
}

func (s *BufferedWriteSyncer) initialize() {
size := s.Size
if size == 0 {
size = _defaultBufferSize
}

flushInterval := s.FlushInterval
if flushInterval == 0 {
flushInterval = _defaultFlushInterval
}

if s.Clock == nil {
s.Clock = DefaultClock
}
s.ticker = s.Clock.NewTicker(flushInterval)

// Unpack to the underlying WriteSyncer if we have a lockedWriteSyncer
// to avoid double-locking. Note that there's a risk here if the user
// tries to use the Lock-ed WriteSyncer directly in addition to using
// it with BufferedWriteSyncer, so we declare that the wrapped
// WriteSyncer is only ours to use once wrapped.
if w, ok := s.WS.(*lockedWriteSyncer); ok {
s.WS = w.ws
}
abhinav marked this conversation as resolved.
Show resolved Hide resolved
s.writer = bufio.NewWriterSize(s.WS, size)

s.stop = make(chan struct{})
s.done = make(chan struct{})
s.initialized = true
go s.flushLoop()
}

// Write writes log data into buffer syncer directly, multiple Write calls will be batched,
// and log data will be flushed to disk when the buffer is full or periodically.
func (s *BufferedWriteSyncer) Write(bs []byte) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()

if !s.initialized {
s.initialize()
}

// To avoid partial writes from being flushed, we manually flush the existing buffer if:
// * The current write doesn't fit into the buffer fully, and
// * The buffer is not empty (since bufio will not split large writes when the buffer is empty)
if len(bs) > s.writer.Available() && s.writer.Buffered() > 0 {
if err := s.writer.Flush(); err != nil {
return 0, err
}
}

return s.writer.Write(bs)
}

// Sync flushes buffered log data into disk directly.
func (s *BufferedWriteSyncer) Sync() error {
s.mu.Lock()
defer s.mu.Unlock()

var err error
if w := s.writer; w != nil {
// w is nil if we haven't yet been initialized.
err = w.Flush()
}
abhinav marked this conversation as resolved.
Show resolved Hide resolved

return multierr.Append(err, s.WS.Sync())
}

// flushLoop flushes the buffer at the configured interval until Stop is
// called.
func (s *BufferedWriteSyncer) flushLoop() {
defer close(s.done)

for {
select {
case <-s.ticker.C:
// we just simply ignore error here
// because the underlying bufio writer stores any errors
// and we return any error from Sync() as part of the close
_ = s.Sync()
case <-s.stop:
return
}
}
}

// Stop closes the buffer, cleans up background goroutines, and flushes
// remaining unwritten data.
//
// This must be called exactly once per BufferedWriteSyncer.
abhinav marked this conversation as resolved.
Show resolved Hide resolved
func (s *BufferedWriteSyncer) Stop() error {
// Critical section.
func() {
s.mu.Lock()
defer s.mu.Unlock()

if !s.initialized {
return
}

s.ticker.Stop()
close(s.stop) // tell flushLoop to stop
<-s.done // and wait until it has
}()

return s.Sync()
}
53 changes: 53 additions & 0 deletions zapcore/buffered_write_syncer_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package zapcore

import (
"io/ioutil"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func BenchmarkBufferedWriteSyncer(b *testing.B) {
b.Run("write file with buffer", func(b *testing.B) {
file, err := ioutil.TempFile("", "log")
require.NoError(b, err)

defer func() {
assert.NoError(b, file.Close())
assert.NoError(b, os.Remove(file.Name()))
}()

w := &BufferedWriteSyncer{
WS: AddSync(file),
}
defer w.Stop()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
w.Write([]byte("foobarbazbabble"))
}
})
})
}
Loading