From c3da6bf6e57d8d8e6cc648def60138085781b28b Mon Sep 17 00:00:00 2001
From: jonjohnsonjr
Date: Thu, 28 Jan 2021 08:51:43 -0800
Subject: [PATCH] Buffer the output of gzip.Writer to avoid stalling (#923)

Use a bufio.Writer to buffer gzipped output while we are reading from
the other end of an io.Pipe, so that gzip can keep compressing its
input.

A 64K buffer was chosen for its humor value. The default size of
bufio.Writer was too small when testing against a local registry, and
increasing beyond 64K didn't seem to have any noticeable effect. It
might make sense to make this smaller, but I don't see a reason to
worry about it ATM.
---
 pkg/v1/internal/gzip/zip.go | 24 ++++++++++++++++++++++--
 pkg/v1/stream/layer.go      | 17 ++++++++++++++++-
 2 files changed, 38 insertions(+), 3 deletions(-)

diff --git a/pkg/v1/internal/gzip/zip.go b/pkg/v1/internal/gzip/zip.go
index 71b43e35d..6f7bfd931 100644
--- a/pkg/v1/internal/gzip/zip.go
+++ b/pkg/v1/internal/gzip/zip.go
@@ -15,6 +15,7 @@
 package gzip
 
 import (
+	"bufio"
 	"bytes"
 	"compress/gzip"
 	"io"
@@ -38,11 +39,19 @@ func ReadCloser(r io.ReadCloser) io.ReadCloser {
 func ReadCloserLevel(r io.ReadCloser, level int) io.ReadCloser {
 	pr, pw := io.Pipe()
 
+	// For highly compressible layers, gzip.Writer will output a very small
+	// number of bytes per Write(). This is normally fine, but when pushing
+	// to a registry, we want to ensure that we're taking full advantage of
+	// the available bandwidth instead of sending tons of tiny writes over
+	// the wire.
+	// 64K ought to be small enough for anybody.
+	bw := bufio.NewWriterSize(pw, 1<<16)
+
 	// Returns err so we can pw.CloseWithError(err)
 	go func() error {
 		// TODO(go1.14): Just defer {pw,gw,r}.Close like you'd expect.
 		// Context: https://golang.org/issue/24283
-		gw, err := gzip.NewWriterLevel(pw, level)
+		gw, err := gzip.NewWriterLevel(bw, level)
 		if err != nil {
 			return pw.CloseWithError(err)
 		}
@@ -52,9 +61,20 @@ func ReadCloserLevel(r io.ReadCloser, level int) io.ReadCloser {
 			defer gw.Close()
 			return pw.CloseWithError(err)
 		}
+
+		// Close the gzip writer to flush it and write the gzip trailer.
+		if err := gw.Close(); err != nil {
+			return pw.CloseWithError(err)
+		}
+
+		// Flush the bufio writer to ensure we write out everything.
+		if err := bw.Flush(); err != nil {
+			return pw.CloseWithError(err)
+		}
+
+		// We don't really care if these fail.
 		defer pw.Close()
 		defer r.Close()
-		defer gw.Close()
 
 		return nil
 	}()
diff --git a/pkg/v1/stream/layer.go b/pkg/v1/stream/layer.go
index ff44f487b..e91f57ab3 100644
--- a/pkg/v1/stream/layer.go
+++ b/pkg/v1/stream/layer.go
@@ -15,6 +15,7 @@
 package stream
 
 import (
+	"bufio"
 	"compress/gzip"
 	"crypto/sha256"
 	"encoding/hex"
@@ -130,6 +131,7 @@ type compressedReader struct {
 
 	h, zh hash.Hash // collects digests of compressed and uncompressed stream.
 	pr    io.Reader
+	bw    *bufio.Writer
 	count *countWriter
 
 	l *Layer // stream.Layer to update upon Close.
@@ -144,7 +146,14 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
 	// capture compressed digest, and a countWriter to capture compressed
 	// size.
 	pr, pw := io.Pipe()
-	zw, err := gzip.NewWriterLevel(io.MultiWriter(pw, zh, count), l.compression)
+
+	// Write compressed bytes to be read by the pipe.Reader, hashed by zh, and counted by count.
+	mw := io.MultiWriter(pw, zh, count)
+
+	// Buffer the output of the gzip writer so we don't have to wait on pr to keep writing.
+	// 64K ought to be small enough for anybody.
+	bw := bufio.NewWriterSize(mw, 1<<16)
+	zw, err := gzip.NewWriterLevel(bw, l.compression)
 	if err != nil {
 		return nil, err
 	}
@@ -152,6 +161,7 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
 	cr := &compressedReader{
 		closer: newMultiCloser(zw, l.blob),
 		pr:     pr,
+		bw:     bw,
 		h:      h,
 		zh:     zh,
 		count:  count,
@@ -183,6 +193,11 @@ func (cr *compressedReader) Close() error {
 		return err
 	}
 
+	// Flush the buffer.
+	if err := cr.bw.Flush(); err != nil {
+		return err
+	}
+
 	diffID, err := v1.NewHash("sha256:" + hex.EncodeToString(cr.h.Sum(nil)))
 	if err != nil {
 		return err
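
A minimal, self-contained sketch (not part of the patch) of the pattern this
change applies: gzip output is buffered through a bufio.Writer before an
io.Pipe, with the close-then-flush ordering the patch relies on. The helper
name gzipPipe and the demo sizes are illustrative assumptions, not
go-containerregistry API.

package main

import (
	"bufio"
	"compress/gzip"
	"fmt"
	"io"
	"strings"
)

// gzipPipe returns a ReadCloser that yields the gzipped contents of r.
// Compressed bytes pass through a 64K bufio.Writer before reaching the
// pipe, so highly compressible input arrives as a few large reads on
// the other end instead of a stream of tiny ones.
func gzipPipe(r io.Reader) io.ReadCloser {
	pr, pw := io.Pipe()
	go func() {
		bw := bufio.NewWriterSize(pw, 1<<16)
		gw := gzip.NewWriter(bw)
		if _, err := io.Copy(gw, r); err != nil {
			pw.CloseWithError(err)
			return
		}
		// Order matters: close the gzip writer first (it flushes its
		// own state and writes the trailer into bw), then flush bw so
		// everything reaches the pipe before it is closed.
		if err := gw.Close(); err != nil {
			pw.CloseWithError(err)
			return
		}
		// CloseWithError(nil) closes the pipe normally; reads see EOF.
		pw.CloseWithError(bw.Flush())
	}()
	return pr
}

func main() {
	zr := gzipPipe(strings.NewReader(strings.Repeat("a", 1<<20)))
	defer zr.Close()
	n, err := io.Copy(io.Discard, zr)
	fmt.Println(n, err) // roughly 1KB of gzip output for 1MB of "a"s
}

Without bw, every small Write from gzip.Writer becomes its own blocking
handoff through the pipe, and ultimately its own tiny write on the wire;
with it, the reader sees up to 64K per read.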