From ee081f1cd116346b1f7f79d7d3ec87f2ea42fd73 Mon Sep 17 00:00:00 2001 From: Cuong Manh Le Date: Mon, 26 Aug 2019 22:16:55 +0700 Subject: [PATCH] lib/netext/httpext: add support decompression of stacked compressed response Close #1108 --- js/modules/k6/http/request_test.go | 11 +- lib/netext/httpext/compression.go | 203 +++++++++++++++++++++++++++++ lib/netext/httpext/request.go | 192 +-------------------------- lib/testutils/httpmultibin.go | 22 ++++ 4 files changed, 237 insertions(+), 191 deletions(-) create mode 100644 lib/netext/httpext/compression.go diff --git a/js/modules/k6/http/request_test.go b/js/modules/k6/http/request_test.go index 6d2183303b1..8687dd6a755 100644 --- a/js/modules/k6/http/request_test.go +++ b/js/modules/k6/http/request_test.go @@ -153,7 +153,7 @@ func TestRequestAndBatch(t *testing.T) { defer tb.Cleanup() sr := tb.Replacer.Replace - // Handple paths with custom logic + // Handle paths with custom logic tb.Mux.HandleFunc("/digest-auth/failure", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { time.Sleep(2 * time.Second) })) @@ -353,6 +353,15 @@ func TestRequestAndBatch(t *testing.T) { `)) assert.NoError(t, err) }) + t.Run("zstd-br", func(t *testing.T) { + _, err := common.RunString(rt, sr(` + let res = http.get("HTTPSBIN_IP_URL/zstd-br"); + if (res.json()['compression'] != 'zstd, br') { + throw new Error("unexpected compression: " + res.json()['compression']) + } + `)) + assert.NoError(t, err) + }) }) t.Run("CompressionWithAcceptEncodingHeader", func(t *testing.T) { t.Run("gzip", func(t *testing.T) { diff --git a/lib/netext/httpext/compression.go b/lib/netext/httpext/compression.go new file mode 100644 index 00000000000..c4746529fe3 --- /dev/null +++ b/lib/netext/httpext/compression.go @@ -0,0 +1,203 @@ +package httpext + +import ( + "bytes" + "compress/gzip" + "compress/zlib" + "fmt" + "io" + "io/ioutil" + "net/http" + "strings" + + "github.com/andybalholm/brotli" + "github.com/klauspost/compress/zstd" + + "github.com/loadimpact/k6/lib" +) + +// CompressionType is used to specify what compression is to be used to compress the body of a +// request +// The conversion and validation methods are auto-generated with https://github.com/alvaroloes/enumer: +//nolint: lll +//go:generate enumer -type=CompressionType -transform=snake -trimprefix CompressionType -output compression_type_gen.go +type CompressionType uint + +const ( + // CompressionTypeGzip compresses through gzip + CompressionTypeGzip CompressionType = iota + // CompressionTypeDeflate compresses through flate + CompressionTypeDeflate + // CompressionTypeZstd compresses through zstd + CompressionTypeZstd + // CompressionTypeBr compresses through brotli + CompressionTypeBr + // TODO: add compress(lzw), maybe bzip2 and others listed at + // https://en.wikipedia.org/wiki/HTTP_compression#Content-Encoding_tokens +) + +func compressBody(algos []CompressionType, body io.ReadCloser) (*bytes.Buffer, string, error) { + var contentEncoding string + var prevBuf io.Reader = body + var buf *bytes.Buffer + for _, compressionType := range algos { + if buf != nil { + prevBuf = buf + } + buf = new(bytes.Buffer) + + if contentEncoding != "" { + contentEncoding += ", " + } + contentEncoding += compressionType.String() + var w io.WriteCloser + switch compressionType { + case CompressionTypeGzip: + w = gzip.NewWriter(buf) + case CompressionTypeDeflate: + w = zlib.NewWriter(buf) + case CompressionTypeZstd: + w, _ = zstd.NewWriter(buf) + case CompressionTypeBr: + w = brotli.NewWriter(buf) + default: + return nil, "", fmt.Errorf("unknown compressionType %s", compressionType) + } + // we don't close in defer because zlib will write it's checksum again if it closes twice :( + var _, err = io.Copy(w, prevBuf) + if err != nil { + _ = w.Close() + return nil, "", err + } + + if err = w.Close(); err != nil { + return nil, "", err + } + } + + return buf, contentEncoding, body.Close() +} + +//nolint:gochecknoglobals +var decompressionErrors = [...]error{ + zlib.ErrChecksum, zlib.ErrDictionary, zlib.ErrHeader, + gzip.ErrChecksum, gzip.ErrHeader, + // TODO: handle brotli errors - currently unexported + zstd.ErrReservedBlockType, zstd.ErrCompressedSizeTooBig, zstd.ErrBlockTooSmall, zstd.ErrMagicMismatch, + zstd.ErrWindowSizeExceeded, zstd.ErrWindowSizeTooSmall, zstd.ErrDecoderSizeExceeded, zstd.ErrUnknownDictionary, + zstd.ErrFrameSizeExceeded, zstd.ErrCRCMismatch, zstd.ErrDecoderClosed, +} + +func newDecompressionError(originalErr error) K6Error { + return NewK6Error( + responseDecompressionErrorCode, + fmt.Sprintf("error decompressing response body (%s)", originalErr.Error()), + originalErr, + ) +} + +func wrapDecompressionError(err error) error { + if err == nil { + return nil + } + + // TODO: something more optimized? for example, we won't get zstd errors if + // we don't use it... maybe the code that builds the decompression readers + // could also add an appropriate error-wrapper layer? + for _, decErr := range decompressionErrors { + if err == decErr { + return newDecompressionError(err) + } + } + if strings.HasPrefix(err.Error(), "brotli: ") { // TODO: submit an upstream patch and fix... + return newDecompressionError(err) + } + return err +} + +func readResponseBody( + state *lib.State, + respType ResponseType, + resp *http.Response, + respErr error, +) (interface{}, error) { + if resp == nil || respErr != nil { + return nil, respErr + } + + if respType == ResponseTypeNone { + _, err := io.Copy(ioutil.Discard, resp.Body) + _ = resp.Body.Close() + if err != nil { + respErr = err + } + return nil, respErr + } + + rc := &readCloser{resp.Body} + // Ensure that the entire response body is read and closed, e.g. in case of decoding errors + defer func(respBody io.ReadCloser) { + _, _ = io.Copy(ioutil.Discard, respBody) + _ = respBody.Close() + }(resp.Body) + + contentEncodings := strings.Split(resp.Header.Get("Content-Encoding"), ",") + // Transparently decompress the body if it's has a content-encoding we + // support. If not, simply return it as it is. + for i := len(contentEncodings) - 1; i >= 0; i-- { + contentEncoding := strings.TrimSpace(contentEncodings[i]) + if compression, err := CompressionTypeString(contentEncoding); err == nil { + var decoder io.Reader + var err error + switch compression { + case CompressionTypeDeflate: + decoder, err = zlib.NewReader(rc) + case CompressionTypeGzip: + decoder, err = gzip.NewReader(rc) + case CompressionTypeZstd: + decoder, err = zstd.NewReader(rc) + case CompressionTypeBr: + decoder = brotli.NewReader(rc) + default: + // We have not implemented a compression ... :( + err = fmt.Errorf( + "unsupported compression type %s - this is a bug in k6, please report it", + compression, + ) + } + if err != nil { + return nil, newDecompressionError(err) + } + rc = &readCloser{decoder} + } + } + buf := state.BPool.Get() + defer state.BPool.Put(buf) + buf.Reset() + _, err := io.Copy(buf, rc.Reader) + if err != nil { + respErr = wrapDecompressionError(err) + } + + err = rc.Close() + if err != nil && respErr == nil { // Don't overwrite previous errors + respErr = wrapDecompressionError(err) + } + + var result interface{} + // Binary or string + switch respType { + case ResponseTypeText: + result = buf.String() + case ResponseTypeBinary: + // Copy the data to a new slice before we return the buffer to the pool, + // because buf.Bytes() points to the underlying buffer byte slice. + binData := make([]byte, buf.Len()) + copy(binData, buf.Bytes()) + result = binData + default: + respErr = fmt.Errorf("unknown responseType %s", respType) + } + + return result, respErr +} diff --git a/lib/netext/httpext/request.go b/lib/netext/httpext/request.go index e0e5e7f8d24..dccb2690e3e 100644 --- a/lib/netext/httpext/request.go +++ b/lib/netext/httpext/request.go @@ -22,10 +22,7 @@ package httpext import ( "bytes" - "compress/gzip" - "compress/zlib" "context" - "fmt" "io" "io/ioutil" "net" @@ -36,9 +33,7 @@ import ( "strings" "time" - ntlmssp "github.com/Azure/go-ntlmssp" - "github.com/andybalholm/brotli" - "github.com/klauspost/compress/zstd" + "github.com/Azure/go-ntlmssp" "github.com/sirupsen/logrus" null "gopkg.in/guregu/null.v3" @@ -71,26 +66,6 @@ func (u URL) GetURL() *url.URL { return u.u } -// CompressionType is used to specify what compression is to be used to compress the body of a -// request -// The conversion and validation methods are auto-generated with https://github.com/alvaroloes/enumer: -//nolint: lll -//go:generate enumer -type=CompressionType -transform=snake -trimprefix CompressionType -output compression_type_gen.go -type CompressionType uint - -const ( - // CompressionTypeGzip compresses through gzip - CompressionTypeGzip CompressionType = iota - // CompressionTypeDeflate compresses through flate - CompressionTypeDeflate - // CompressionTypeZstd compresses through zstd - CompressionTypeZstd - // CompressionTypeBr compresses through brotli - CompressionTypeBr - // TODO: add compress(lzw), maybe bzip2 and others listed at - // https://en.wikipedia.org/wiki/HTTP_compression#Content-Encoding_tokens -) - // Request represent an http request type Request struct { Method string `json:"method"` @@ -146,170 +121,7 @@ func stdCookiesToHTTPRequestCookies(cookies []*http.Cookie) map[string][]*HTTPRe return result } -func compressBody(algos []CompressionType, body io.ReadCloser) (*bytes.Buffer, string, error) { - var contentEncoding string - var prevBuf io.Reader = body - var buf *bytes.Buffer - for _, compressionType := range algos { - if buf != nil { - prevBuf = buf - } - buf = new(bytes.Buffer) - - if contentEncoding != "" { - contentEncoding += ", " - } - contentEncoding += compressionType.String() - var w io.WriteCloser - switch compressionType { - case CompressionTypeGzip: - w = gzip.NewWriter(buf) - case CompressionTypeDeflate: - w = zlib.NewWriter(buf) - case CompressionTypeZstd: - w, _ = zstd.NewWriter(buf) - case CompressionTypeBr: - w = brotli.NewWriter(buf) - default: - return nil, "", fmt.Errorf("unknown compressionType %s", compressionType) - } - // we don't close in defer because zlib will write it's checksum again if it closes twice :( - var _, err = io.Copy(w, prevBuf) - if err != nil { - _ = w.Close() - return nil, "", err - } - - if err = w.Close(); err != nil { - return nil, "", err - } - } - - return buf, contentEncoding, body.Close() -} - -//nolint:gochecknoglobals -var decompressionErrors = [...]error{ - zlib.ErrChecksum, zlib.ErrDictionary, zlib.ErrHeader, - gzip.ErrChecksum, gzip.ErrHeader, - //TODO: handle brotli errors - currently unexported - zstd.ErrReservedBlockType, zstd.ErrCompressedSizeTooBig, zstd.ErrBlockTooSmall, zstd.ErrMagicMismatch, - zstd.ErrWindowSizeExceeded, zstd.ErrWindowSizeTooSmall, zstd.ErrDecoderSizeExceeded, zstd.ErrUnknownDictionary, - zstd.ErrFrameSizeExceeded, zstd.ErrCRCMismatch, zstd.ErrDecoderClosed, -} - -func newDecompressionError(originalErr error) K6Error { - return NewK6Error( - responseDecompressionErrorCode, - fmt.Sprintf("error decompressing response body (%s)", originalErr.Error()), - originalErr, - ) -} - -func wrapDecompressionError(err error) error { - if err == nil { - return nil - } - - // TODO: something more optimized? for example, we won't get zstd errors if - // we don't use it... maybe the code that builds the decompression readers - // could also add an appropriate error-wrapper layer? - for _, decErr := range decompressionErrors { - if err == decErr { - return newDecompressionError(err) - } - } - if strings.HasPrefix(err.Error(), "brotli: ") { //TODO: submit an upstream patch and fix... - return newDecompressionError(err) - } - return err -} - -func readResponseBody( - state *lib.State, respType ResponseType, resp *http.Response, respErr error, -) (interface{}, error) { - - if resp == nil || respErr != nil { - return nil, respErr - } - - if respType == ResponseTypeNone { - _, err := io.Copy(ioutil.Discard, resp.Body) - _ = resp.Body.Close() - if err != nil { - respErr = err - } - return nil, respErr - } - - rc := &readCloser{resp.Body} - // Ensure that the entire response body is read and closed, e.g. in case of decoding errors - defer func(respBody io.ReadCloser) { - _, _ = io.Copy(ioutil.Discard, respBody) - _ = respBody.Close() - }(resp.Body) - - // Transparently decompress the body if it's has a content-encoding we - // support. If not, simply return it as it is. - contentEncoding := strings.TrimSpace(resp.Header.Get("Content-Encoding")) - //TODO: support stacked compressions, e.g. `deflate, gzip` - if compression, err := CompressionTypeString(contentEncoding); err == nil { - var decoder io.Reader - var err error - switch compression { - case CompressionTypeDeflate: - decoder, err = zlib.NewReader(resp.Body) - case CompressionTypeGzip: - decoder, err = gzip.NewReader(resp.Body) - case CompressionTypeZstd: - decoder, err = zstd.NewReader(resp.Body) - case CompressionTypeBr: - decoder = brotli.NewReader(resp.Body) - default: - // We have not implemented a compression ... :( - err = fmt.Errorf( - "unsupported compression type %s - this is a bug in k6, please report it", - compression, - ) - } - if err != nil { - return nil, newDecompressionError(err) - } - rc = &readCloser{decoder} - } - - buf := state.BPool.Get() - defer state.BPool.Put(buf) - buf.Reset() - _, err := io.Copy(buf, rc.Reader) - if err != nil { - respErr = wrapDecompressionError(err) - } - - err = rc.Close() - if err != nil && respErr == nil { // Don't overwrite previous errors - respErr = wrapDecompressionError(err) - } - - var result interface{} - // Binary or string - switch respType { - case ResponseTypeText: - result = buf.String() - case ResponseTypeBinary: - // Copy the data to a new slice before we return the buffer to the pool, - // because buf.Bytes() points to the underlying buffer byte slice. - binData := make([]byte, buf.Len()) - copy(binData, buf.Bytes()) - result = binData - default: - respErr = fmt.Errorf("unknown responseType %s", respType) - } - - return result, respErr -} - -//TODO: move as a response method? or constructor? +// TODO: move as a response method? or constructor? func updateK6Response(k6Response *Response, finishedReq *finishedRequest) { k6Response.ErrorCode = int(finishedReq.errorCode) k6Response.Error = finishedReq.errorMsg diff --git a/lib/testutils/httpmultibin.go b/lib/testutils/httpmultibin.go index b1c0bc45158..d6e6b82f6a8 100644 --- a/lib/testutils/httpmultibin.go +++ b/lib/testutils/httpmultibin.go @@ -164,6 +164,27 @@ func getEncodedHandler(t testing.TB, compressionType httpext.CompressionType) ht }) } +func getZstdBrHandler(t testing.TB) http.Handler { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + encoding := "zstd, br" + rw.Header().Set("Content-Type", "application/json") + rw.Header().Add("Content-Encoding", encoding) + data := jsonBody{ + Header: req.Header, + Compression: encoding, + } + + bw := brotli.NewWriter(rw) + zw, _ := zstd.NewWriter(bw) + defer func() { + _ = zw.Close() + _ = bw.Close() + }() + + require.NoError(t, writeJSON(zw, data)) + }) +} + // NewHTTPMultiBin returns a fully configured and running HTTPMultiBin func NewHTTPMultiBin(t testing.TB) *HTTPMultiBin { // Create a http.ServeMux and set the httpbin handler as the default @@ -172,6 +193,7 @@ func NewHTTPMultiBin(t testing.TB) *HTTPMultiBin { mux.Handle("/ws-echo", getWebsocketEchoHandler(t)) mux.Handle("/ws-close", getWebsocketCloserHandler(t)) mux.Handle("/zstd", getEncodedHandler(t, httpext.CompressionTypeZstd)) + mux.Handle("/zstd-br", getZstdBrHandler(t)) mux.Handle("/", httpbin.New().Handler()) // Initialize the HTTP server and get its details