Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib/netext/httpext: add support decompression of stacked compressed response #1125

Merged
merged 1 commit into from
Aug 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion js/modules/k6/http/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestRequestAndBatch(t *testing.T) {
defer tb.Cleanup()
sr := tb.Replacer.Replace

// Handple paths with custom logic
// Handle paths with custom logic
tb.Mux.HandleFunc("/digest-auth/failure", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(2 * time.Second)
}))
Expand Down Expand Up @@ -353,6 +353,15 @@ func TestRequestAndBatch(t *testing.T) {
`))
assert.NoError(t, err)
})
t.Run("zstd-br", func(t *testing.T) {
_, err := common.RunString(rt, sr(`
let res = http.get("HTTPSBIN_IP_URL/zstd-br");
if (res.json()['compression'] != 'zstd, br') {
throw new Error("unexpected compression: " + res.json()['compression'])
}
`))
assert.NoError(t, err)
})
})
t.Run("CompressionWithAcceptEncodingHeader", func(t *testing.T) {
t.Run("gzip", func(t *testing.T) {
Expand Down
203 changes: 203 additions & 0 deletions lib/netext/httpext/compression.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package httpext

import (
"bytes"
"compress/gzip"
"compress/zlib"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"

"github.com/andybalholm/brotli"
"github.com/klauspost/compress/zstd"

"github.com/loadimpact/k6/lib"
)

// CompressionType is used to specify what compression is to be used to compress the body of a
// request
// The conversion and validation methods are auto-generated with https://github.com/alvaroloes/enumer:
//nolint: lll
//go:generate enumer -type=CompressionType -transform=snake -trimprefix CompressionType -output compression_type_gen.go
type CompressionType uint

const (
// CompressionTypeGzip compresses through gzip
CompressionTypeGzip CompressionType = iota
// CompressionTypeDeflate compresses through flate
CompressionTypeDeflate
// CompressionTypeZstd compresses through zstd
CompressionTypeZstd
// CompressionTypeBr compresses through brotli
CompressionTypeBr
// TODO: add compress(lzw), maybe bzip2 and others listed at
// https://en.wikipedia.org/wiki/HTTP_compression#Content-Encoding_tokens
)

func compressBody(algos []CompressionType, body io.ReadCloser) (*bytes.Buffer, string, error) {
var contentEncoding string
var prevBuf io.Reader = body
var buf *bytes.Buffer
for _, compressionType := range algos {
if buf != nil {
prevBuf = buf
}
buf = new(bytes.Buffer)

if contentEncoding != "" {
contentEncoding += ", "
}
contentEncoding += compressionType.String()
var w io.WriteCloser
switch compressionType {
case CompressionTypeGzip:
w = gzip.NewWriter(buf)
case CompressionTypeDeflate:
w = zlib.NewWriter(buf)
case CompressionTypeZstd:
w, _ = zstd.NewWriter(buf)
case CompressionTypeBr:
w = brotli.NewWriter(buf)
default:
return nil, "", fmt.Errorf("unknown compressionType %s", compressionType)
}
// we don't close in defer because zlib will write it's checksum again if it closes twice :(
var _, err = io.Copy(w, prevBuf)
if err != nil {
_ = w.Close()
return nil, "", err
}

if err = w.Close(); err != nil {
return nil, "", err
}
}

return buf, contentEncoding, body.Close()
}

//nolint:gochecknoglobals
var decompressionErrors = [...]error{
zlib.ErrChecksum, zlib.ErrDictionary, zlib.ErrHeader,
gzip.ErrChecksum, gzip.ErrHeader,
// TODO: handle brotli errors - currently unexported
zstd.ErrReservedBlockType, zstd.ErrCompressedSizeTooBig, zstd.ErrBlockTooSmall, zstd.ErrMagicMismatch,
zstd.ErrWindowSizeExceeded, zstd.ErrWindowSizeTooSmall, zstd.ErrDecoderSizeExceeded, zstd.ErrUnknownDictionary,
zstd.ErrFrameSizeExceeded, zstd.ErrCRCMismatch, zstd.ErrDecoderClosed,
}

func newDecompressionError(originalErr error) K6Error {
return NewK6Error(
responseDecompressionErrorCode,
fmt.Sprintf("error decompressing response body (%s)", originalErr.Error()),
originalErr,
)
}

func wrapDecompressionError(err error) error {
if err == nil {
return nil
}

// TODO: something more optimized? for example, we won't get zstd errors if
// we don't use it... maybe the code that builds the decompression readers
// could also add an appropriate error-wrapper layer?
for _, decErr := range decompressionErrors {
if err == decErr {
return newDecompressionError(err)
}
}
if strings.HasPrefix(err.Error(), "brotli: ") { // TODO: submit an upstream patch and fix...
return newDecompressionError(err)
}
return err
}

func readResponseBody(
state *lib.State,
respType ResponseType,
resp *http.Response,
respErr error,
) (interface{}, error) {
if resp == nil || respErr != nil {
return nil, respErr
}

if respType == ResponseTypeNone {
_, err := io.Copy(ioutil.Discard, resp.Body)
_ = resp.Body.Close()
if err != nil {
respErr = err
}
return nil, respErr
}

rc := &readCloser{resp.Body}
// Ensure that the entire response body is read and closed, e.g. in case of decoding errors
defer func(respBody io.ReadCloser) {
_, _ = io.Copy(ioutil.Discard, respBody)
_ = respBody.Close()
}(resp.Body)

contentEncodings := strings.Split(resp.Header.Get("Content-Encoding"), ",")
// Transparently decompress the body if it's has a content-encoding we
// support. If not, simply return it as it is.
for i := len(contentEncodings) - 1; i >= 0; i-- {
contentEncoding := strings.TrimSpace(contentEncodings[i])
if compression, err := CompressionTypeString(contentEncoding); err == nil {
var decoder io.Reader
var err error
switch compression {
case CompressionTypeDeflate:
decoder, err = zlib.NewReader(rc)
case CompressionTypeGzip:
decoder, err = gzip.NewReader(rc)
case CompressionTypeZstd:
decoder, err = zstd.NewReader(rc)
case CompressionTypeBr:
decoder = brotli.NewReader(rc)
default:
// We have not implemented a compression ... :(
err = fmt.Errorf(
"unsupported compression type %s - this is a bug in k6, please report it",
compression,
)
}
if err != nil {
return nil, newDecompressionError(err)
}
rc = &readCloser{decoder}
}
}
buf := state.BPool.Get()
defer state.BPool.Put(buf)
buf.Reset()
_, err := io.Copy(buf, rc.Reader)
if err != nil {
respErr = wrapDecompressionError(err)
}

err = rc.Close()
if err != nil && respErr == nil { // Don't overwrite previous errors
respErr = wrapDecompressionError(err)
}

var result interface{}
// Binary or string
switch respType {
case ResponseTypeText:
result = buf.String()
case ResponseTypeBinary:
// Copy the data to a new slice before we return the buffer to the pool,
// because buf.Bytes() points to the underlying buffer byte slice.
binData := make([]byte, buf.Len())
copy(binData, buf.Bytes())
result = binData
default:
respErr = fmt.Errorf("unknown responseType %s", respType)
}

return result, respErr
}
Loading