Buffer the output of gzip.Writer to avoid stalling (#923)
Use a bufio.Writer to buffer gzipped output while we are reading from
the other end of an io.Pipe to allow gzip to keep compressing its input.

A 64K buffer was chosen for its humor value. The default size of
bufio.Writer was too small when testing against a local registry.
Increasing beyond 64K didn't seem to have any noticeable effect. It
might make sense to make this smaller, but I don't see a reason to worry
about it ATM.
jonjohnsonjr authored Jan 28, 2021
1 parent 76199f1 commit c3da6bf
Showing 2 changed files with 38 additions and 3 deletions.
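
For context, a minimal standalone sketch of the pattern this commit sets up (illustrative, not code from this repo): gzip compresses into a bufio.Writer sitting in front of an io.Pipe, so gzip's many tiny writes are batched into larger ones before the consumer reads them from the other end.

    package main

    import (
        "bufio"
        "compress/gzip"
        "fmt"
        "io"
        "strings"
    )

    func main() {
        src := strings.NewReader(strings.Repeat("highly compressible input\n", 1000))
        pr, pw := io.Pipe()

        // The buffer sits between gzip and the pipe, batching small writes.
        bw := bufio.NewWriterSize(pw, 1<<16) // 64K

        go func() {
            gw := gzip.NewWriter(bw)
            if _, err := io.Copy(gw, src); err != nil {
                pw.CloseWithError(err)
                return
            }
            if err := gw.Close(); err != nil { // writes the gzip trailer into bw
                pw.CloseWithError(err)
                return
            }
            // Flush the buffer, then close the pipe; a nil error closes it cleanly.
            pw.CloseWithError(bw.Flush())
        }()

        // The consumer sees fewer, larger reads instead of a trickle of tiny ones.
        n, err := io.Copy(io.Discard, pr)
        fmt.Println(n, err)
    }

Without bw, every small gzip write would block until the pipe's reader consumed it; with it, bytes only cross the pipe in 64K chunks.
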
24 changes: 22 additions & 2 deletions pkg/v1/internal/gzip/zip.go
@@ -15,6 +15,7 @@
package gzip

import (
"bufio"
"bytes"
"compress/gzip"
"io"
@@ -38,11 +39,19 @@ func ReadCloser(r io.ReadCloser) io.ReadCloser {
func ReadCloserLevel(r io.ReadCloser, level int) io.ReadCloser {
pr, pw := io.Pipe()

// For highly compressible layers, gzip.Writer will output a very small
// number of bytes per Write(). This is normally fine, but when pushing
// to a registry, we want to ensure that we're taking full advantage of
// the available bandwidth instead of sending tons of tiny writes over
// the wire.
// 64K ought to be small enough for anybody.
bw := bufio.NewWriterSize(pw, 1<<16)

// Returns err so we can pw.CloseWithError(err)
go func() error {
// TODO(go1.14): Just defer {pw,gw,r}.Close like you'd expect.
// Context: https://golang.org/issue/24283
- gw, err := gzip.NewWriterLevel(pw, level)
+ gw, err := gzip.NewWriterLevel(bw, level)
if err != nil {
return pw.CloseWithError(err)
}
@@ -52,9 +61,20 @@ func ReadCloserLevel(r io.ReadCloser, level int) io.ReadCloser {
defer gw.Close()
return pw.CloseWithError(err)
}

// Close gzip writer to flush it and write the gzip trailer.
if err := gw.Close(); err != nil {
return pw.CloseWithError(err)
}

// Flush bufio writer to ensure we write out everything.
if err := bw.Flush(); err != nil {
return pw.CloseWithError(err)
}

// We don't really care if these fail.
defer pw.Close()
defer r.Close()
defer gw.Close()

return nil
}()
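For orientation, a hedged usage sketch of the API touched above: gzip.ReadCloser wraps an uncompressed stream and hands back a stream of compressed bytes, with the compression goroutine now writing through the 64K buffer. Since pkg/v1/internal/gzip is internal, the import below is illustrative only and won't compile outside this module; layer.tar is a hypothetical input.

    package main

    import (
        "io"
        "log"
        "os"

        // Internal package; this import is illustrative only.
        "github.com/google/go-containerregistry/pkg/v1/internal/gzip"
    )

    func main() {
        f, err := os.Open("layer.tar") // hypothetical uncompressed layer
        if err != nil {
            log.Fatal(err)
        }
        zr := gzip.ReadCloser(f) // compression runs in a goroutine behind an io.Pipe
        defer zr.Close()

        // Reading here drains the pipe; in the real code this feeds a registry upload.
        if _, err := io.Copy(io.Discard, zr); err != nil {
            log.Fatal(err)
        }
    }
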
17 changes: 16 additions & 1 deletion pkg/v1/stream/layer.go
@@ -15,6 +15,7 @@
package stream

import (
"bufio"
"compress/gzip"
"crypto/sha256"
"encoding/hex"
@@ -130,6 +131,7 @@ type compressedReader struct {

h, zh hash.Hash // collect digests of the uncompressed and compressed streams.
pr io.Reader
bw *bufio.Writer
count *countWriter

l *Layer // stream.Layer to update upon Close.
@@ -144,14 +146,22 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
// capture compressed digest, and a countWriter to capture compressed
// size.
pr, pw := io.Pipe()
- zw, err := gzip.NewWriterLevel(io.MultiWriter(pw, zh, count), l.compression)
+
+ // Write compressed bytes to be read by the pipe reader, hashed by zh, and counted by count.
+ mw := io.MultiWriter(pw, zh, count)
+
+ // Buffer the gzip writer's output so it doesn't block waiting for pr to be read.
+ // 64K ought to be small enough for anybody.
+ bw := bufio.NewWriterSize(mw, 1<<16)
+ zw, err := gzip.NewWriterLevel(bw, l.compression)
if err != nil {
return nil, err
}

cr := &compressedReader{
closer: newMultiCloser(zw, l.blob),
pr: pr,
bw: bw,
h: h,
zh: zh,
count: count,
@@ -183,6 +193,11 @@ func (cr *compressedReader) Close() error {
return err
}

// Flush the buffer.
if err := cr.bw.Flush(); err != nil {
return err
}

diffID, err := v1.NewHash("sha256:" + hex.EncodeToString(cr.h.Sum(nil)))
if err != nil {
return err
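The ordering in Close above is the subtle part: the gzip writer must be closed first (emitting the trailer into the buffer), and the bufio.Writer flushed afterwards, because zh and count sit behind the buffer and only see bytes that have passed through the MultiWriter. A standalone sketch of that pitfall (illustrative; errors elided for brevity):

    package main

    import (
        "bufio"
        "compress/gzip"
        "crypto/sha256"
        "encoding/hex"
        "fmt"
        "io"
        "strings"
    )

    func main() {
        zh := sha256.New()
        // The hash sits behind the buffer, so it only sees bytes that were flushed.
        bw := bufio.NewWriterSize(io.MultiWriter(io.Discard, zh), 1<<16)
        zw := gzip.NewWriter(bw)

        _, _ = io.Copy(zw, strings.NewReader(strings.Repeat("data", 1024)))
        _ = zw.Close() // emits the gzip trailer into bw, not yet into zh

        // The compressed output fits entirely in the 64K buffer, so nothing
        // has reached zh yet: this is the digest of zero bytes.
        fmt.Println("before Flush:", hex.EncodeToString(zh.Sum(nil)))
        _ = bw.Flush()
        // Only now has zh seen the full compressed stream.
        fmt.Println("after Flush: ", hex.EncodeToString(zh.Sum(nil)))
    }
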
