Skip to content

Commit

Permalink
sorter/file_backend: do not hold bytes buffer in writer
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed Aug 24, 2021
1 parent 7ab7a31 commit f3c987f
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions cdc/puller/sorter/file_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,9 @@ func (r *fileBackEndReader) resetAndClose() error {
}

type fileBackEndWriter struct {
backEnd *fileBackEnd
f *os.File
writer *bufio.Writer
rawBytesBuf []byte
backEnd *fileBackEnd
f *os.File
writer *bufio.Writer

bytesWritten int64
eventsWritten int64
Expand All @@ -345,12 +344,14 @@ func (w *fileBackEndWriter) writeFileHeader() error {

func (w *fileBackEndWriter) writeNext(event *model.PolymorphicEvent) error {
var err error
w.rawBytesBuf, err = w.backEnd.serde.marshal(event, w.rawBytesBuf)
// Note, do not hold the buffer in writer to avoid hogging memory.
var rawBytesBuf []byte
rawBytesBuf, err = w.backEnd.serde.marshal(event, rawBytesBuf)
if err != nil {
return errors.Trace(wrapIOError(err))
}

size := len(w.rawBytesBuf)
size := len(rawBytesBuf)
if size == 0 {
log.Panic("fileSorterBackEnd: serialized to empty byte array. Bug?")
}
Expand All @@ -368,7 +369,7 @@ func (w *fileBackEndWriter) writeNext(event *model.PolymorphicEvent) error {
// short writes are possible with bufio
offset := 0
for offset < size {
n, err := w.writer.Write(w.rawBytesBuf[offset:])
n, err := w.writer.Write(rawBytesBuf[offset:])
if err != nil {
return errors.Trace(wrapIOError(err))
}
Expand Down

0 comments on commit f3c987f

Please sign in to comment.