Skip to content

Commit

Permalink
add support for additional decoders
Browse files Browse the repository at this point in the history
This provides the ability to add support for compression types which are
not supported in core, to be supported by individual components.

Signed-off-by: Alex Boten <aboten@lightstep.com>
  • Loading branch information
Alex Boten committed Jun 27, 2023
1 parent 8278824 commit 57dd104
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 36 deletions.
85 changes: 51 additions & 34 deletions config/confighttp/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,25 +68,68 @@ func (r *compressRoundTripper) RoundTrip(req *http.Request) (*http.Response, err
type decompressor struct {
errHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int)
base http.Handler
decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)
}

// httpContentDecompressor offloads the task of handling compressed HTTP requests
// by identifying the compression format in the "Content-Encoding" header and re-writing
// request body so that the handlers further in the chain can work on decompressed data.
// It supports gzip and deflate/zlib compression.
func httpContentDecompressor(h http.Handler, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int)) http.Handler {
func httpContentDecompressor(h http.Handler, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler {
errHandler := defaultErrorHandler
if eh != nil {
errHandler = eh
}
return &decompressor{

d := &decompressor{
errHandler: errHandler,
base: h,
decoders: map[string]func(body io.ReadCloser) (io.ReadCloser, error){
"": func(body io.ReadCloser) (io.ReadCloser, error) {
// Not a compressed payload. Nothing to do.
return nil, nil
},
"gzip": func(body io.ReadCloser) (io.ReadCloser, error) {
gr, err := gzip.NewReader(body)
if err != nil {
return nil, err
}
return gr, nil
},
"zstd": func(body io.ReadCloser) (io.ReadCloser, error) {
zr, err := zstd.NewReader(
body,
// Concurrency 1 disables async decoding. We don't need async decoding, it is pointless
// for our use-case (a server accepting decoding http requests).
// Disabling async improves performance (I benchmarked it previously when working
// on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257).
zstd.WithDecoderConcurrency(1),
)
if err != nil {
return nil, err
}
return zr.IOReadCloser(), nil
},
"zlib": func(body io.ReadCloser) (io.ReadCloser, error) {
zr, err := zlib.NewReader(body)
if err != nil {
return nil, err
}
return zr, nil
},
},
}
d.decoders["deflate"] = d.decoders["zlib"]

for key, dec := range decoders {
d.decoders[key] = dec
}

return d
}

func (d *decompressor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
newBody, err := newBodyReader(r)
newBody, err := d.newBodyReader(r)
if err != nil {
d.errHandler(w, r, err.Error(), http.StatusBadRequest)
return
Expand All @@ -104,39 +147,13 @@ func (d *decompressor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
d.base.ServeHTTP(w, r)
}

func newBodyReader(r *http.Request) (io.ReadCloser, error) {
func (d *decompressor) newBodyReader(r *http.Request) (io.ReadCloser, error) {
encoding := r.Header.Get(headerContentEncoding)
switch encoding {
case string(configcompression.Gzip):
gr, err := gzip.NewReader(r.Body)
if err != nil {
return nil, err
}
return gr, nil
case string(configcompression.Deflate), string(configcompression.Zlib):
zr, err := zlib.NewReader(r.Body)
if err != nil {
return nil, err
}
return zr, nil
case "zstd":
zr, err := zstd.NewReader(
r.Body,
// Concurrency 1 disables async decoding. We don't need async decoding, it is pointless
// for our use-case (a server accepting decoding http requests).
// Disabling async improves performance (I benchmarked it previously when working
// on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257).
zstd.WithDecoderConcurrency(1),
)
if err != nil {
return nil, err
}
return zr.IOReadCloser(), nil
case "":
// Not a compressed payload. Nothing to do.
return nil, nil
decoder, ok := d.decoders[encoding]
if !ok {
return nil, fmt.Errorf("unsupported %s: %s", headerContentEncoding, encoding)
}
return nil, fmt.Errorf("unsupported %s: %s", headerContentEncoding, encoding)
return decoder(r.Body)
}

// defaultErrorHandler writes the error message in plain text.
Expand Down
52 changes: 51 additions & 1 deletion config/confighttp/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/golang/snappy"
Expand Down Expand Up @@ -114,8 +115,44 @@ func TestHTTPClientCompression(t *testing.T) {
}
}

func TestHTTPCustomDecompression(t *testing.T) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}

require.NoError(t, err, "failed to read request body: %v", err)
assert.EqualValues(t, "decompressed body", string(body))
w.WriteHeader(http.StatusOK)
})
decoders := map[string]func(body io.ReadCloser) (io.ReadCloser, error){
"custom-encoding": func(body io.ReadCloser) (io.ReadCloser, error) {
return io.NopCloser(strings.NewReader("decompressed body")), nil
},
}
srv := httptest.NewServer(httpContentDecompressor(handler, defaultErrorHandler, decoders))

t.Cleanup(srv.Close)

req, err := http.NewRequest(http.MethodGet, srv.URL, bytes.NewBuffer([]byte("123deompressed body")))
require.NoError(t, err, "failed to create request to test handler")
req.Header.Set("Content-Encoding", "custom-encoding")

client := http.Client{}
res, err := client.Do(req)
require.NoError(t, err)

assert.Equal(t, http.StatusOK, res.StatusCode, "test handler returned unexpected status code ")
_, err = io.ReadAll(res.Body)
require.NoError(t, res.Body.Close(), "failed to close request body: %v", err)
}

func TestHTTPContentDecompressionHandler(t *testing.T) {
testBody := []byte("uncompressed_text")
noDecoders := map[string]func(io.ReadCloser) (io.ReadCloser, error){}
tests := []struct {
name string
encoding string
Expand All @@ -129,6 +166,12 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
reqBody: bytes.NewBuffer(testBody),
respCode: http.StatusOK,
},
{
name: "ValidDeflate",
encoding: "deflate",
reqBody: compressZlib(t, testBody),
respCode: 200,
},
{
name: "ValidGzip",
encoding: "gzip",
Expand All @@ -147,6 +190,13 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
reqBody: compressZstd(t, testBody),
respCode: http.StatusOK,
},
{
name: "InvalidDeflate",
encoding: "deflate",
reqBody: bytes.NewBuffer(testBody),
respCode: 400,
respBody: "zlib: invalid header\n",
},
{
name: "InvalidGzip",
encoding: "gzip",
Expand Down Expand Up @@ -189,7 +239,7 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
require.NoError(t, err, "failed to read request body: %v", err)
assert.EqualValues(t, testBody, string(body))
w.WriteHeader(http.StatusOK)
}), defaultErrorHandler))
}), defaultErrorHandler, noDecoders))
t.Cleanup(srv.Close)

req, err := http.NewRequest(http.MethodGet, srv.URL, tt.reqBody)
Expand Down
12 changes: 11 additions & 1 deletion config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package confighttp // import "go.opentelemetry.io/collector/config/confighttp"
import (
"crypto/tls"
"errors"
"io"
"net"
"net/http"
"time"
Expand Down Expand Up @@ -248,6 +249,7 @@ func (hss *HTTPServerSettings) ToListener() (net.Listener, error) {
// returned by HTTPServerSettings.ToServer().
type toServerOptions struct {
errHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int)
decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)
}

// ToServerOption is an option to change the behavior of the HTTP server
Expand All @@ -262,6 +264,14 @@ func WithErrorHandler(e func(w http.ResponseWriter, r *http.Request, errorMsg st
}
}

// WithDecoder provides support for additional decoders to be configured
// by the caller.
func WithDecoder(key string, dec func(body io.ReadCloser) (io.ReadCloser, error)) ToServerOption {
return func(opts *toServerOptions) {
opts.decoders[key] = dec
}
}

// ToServer creates an http.Server from settings object.
func (hss *HTTPServerSettings) ToServer(host component.Host, settings component.TelemetrySettings, handler http.Handler, opts ...ToServerOption) (*http.Server, error) {
internal.WarnOnUnspecifiedHost(settings.Logger, hss.Endpoint)
Expand All @@ -271,7 +281,7 @@ func (hss *HTTPServerSettings) ToServer(host component.Host, settings component.
o(serverOpts)
}

handler = httpContentDecompressor(handler, serverOpts.errHandler)
handler = httpContentDecompressor(handler, serverOpts.errHandler, serverOpts.decoders)

if hss.MaxRequestBodySize > 0 {
handler = maxRequestBodySizeInterceptor(handler, hss.MaxRequestBodySize)
Expand Down

0 comments on commit 57dd104

Please sign in to comment.