diff --git a/.chloggen/jpkroehling_allow-compression-list-to-be-overridden.yaml b/.chloggen/jpkroehling_allow-compression-list-to-be-overridden.yaml new file mode 100644 index 00000000000..e9c1eef9dc3 --- /dev/null +++ b/.chloggen/jpkroehling_allow-compression-list-to-be-overridden.yaml @@ -0,0 +1,18 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: confighttp + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Allow the compression list to be overridden + +# One or more tracking issues or pull requests related to the change +issues: [10295] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Allows Collector administrators to control which compression algorithms to enable for HTTP-based receivers. \ No newline at end of file diff --git a/config/confighttp/README.md b/config/confighttp/README.md index a0227c2402b..49893a0fd63 100644 --- a/config/confighttp/README.md +++ b/config/confighttp/README.md @@ -75,6 +75,7 @@ will not be enabled. not set, browsers use a default of 5 seconds. - `endpoint`: Valid value syntax available [here](https://github.com/grpc/grpc/blob/master/doc/naming.md) - `max_request_body_size`: configures the maximum allowed body size in bytes for a single request. Default: `0` (no restriction) +- `compression_algorithms`: configures the list of compression algorithms the server can accept. 
// availableDecoders maps a Content-Encoding token to a constructor that wraps
// the raw request body in a reader yielding the decompressed payload. The
// empty-string key covers requests with no Content-Encoding header. Note that
// "deflate" is intentionally absent here; callers alias it to "zlib".
var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){
	"": func(io.ReadCloser) (io.ReadCloser, error) {
		// Not a compressed payload. Nothing to do.
		return nil, nil
	},
	"gzip": func(body io.ReadCloser) (io.ReadCloser, error) {
		gr, err := gzip.NewReader(body)
		if err != nil {
			return nil, err
		}
		return gr, nil
	},
	"zstd": func(body io.ReadCloser) (io.ReadCloser, error) {
		zr, err := zstd.NewReader(
			body,
			// Concurrency 1 disables async decoding. We don't need async decoding, it is pointless
			// for our use-case (a server accepting decoding http requests).
			// Disabling async improves performance (I benchmarked it previously when working
			// on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257).
			zstd.WithDecoderConcurrency(1),
		)
		if err != nil {
			return nil, err
		}
		return zr.IOReadCloser(), nil
	},
	"zlib": func(body io.ReadCloser) (io.ReadCloser, error) {
		zr, err := zlib.NewReader(body)
		if err != nil {
			return nil, err
		}
		return zr, nil
	},
	"snappy": func(body io.ReadCloser) (io.ReadCloser, error) {
		// Snappy is decompressed eagerly into an in-memory buffer (no lazy
		// streaming), and the original body is closed once fully drained.
		sr := snappy.NewReader(body)
		sb := new(bytes.Buffer)
		_, err := io.Copy(sb, sr)
		if err != nil {
			return nil, err
		}
		if err = body.Close(); err != nil {
			return nil, err
		}
		return io.NopCloser(sb), nil
	},
}
-func httpContentDecompressor(h http.Handler, maxRequestBodySize int64, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler { +func httpContentDecompressor(h http.Handler, maxRequestBodySize int64, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), enableDecoders []string, decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler { errHandler := defaultErrorHandler if eh != nil { errHandler = eh } + enabled := map[string]func(body io.ReadCloser) (io.ReadCloser, error){} + for _, dec := range enableDecoders { + enabled[dec] = availableDecoders[dec] + + if dec == "deflate" { + enabled["deflate"] = availableDecoders["zlib"] + } + } + d := &decompressor{ maxRequestBodySize: maxRequestBodySize, errHandler: errHandler, base: h, - decoders: map[string]func(body io.ReadCloser) (io.ReadCloser, error){ - "": func(io.ReadCloser) (io.ReadCloser, error) { - // Not a compressed payload. Nothing to do. - return nil, nil - }, - "gzip": func(body io.ReadCloser) (io.ReadCloser, error) { - gr, err := gzip.NewReader(body) - if err != nil { - return nil, err - } - return gr, nil - }, - "zstd": func(body io.ReadCloser) (io.ReadCloser, error) { - zr, err := zstd.NewReader( - body, - // Concurrency 1 disables async decoding. We don't need async decoding, it is pointless - // for our use-case (a server accepting decoding http requests). - // Disabling async improves performance (I benchmarked it previously when working - // on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257). 
- zstd.WithDecoderConcurrency(1), - ) - if err != nil { - return nil, err - } - return zr.IOReadCloser(), nil - }, - "zlib": func(body io.ReadCloser) (io.ReadCloser, error) { - zr, err := zlib.NewReader(body) - if err != nil { - return nil, err - } - return zr, nil - }, - "snappy": func(body io.ReadCloser) (io.ReadCloser, error) { - sr := snappy.NewReader(body) - sb := new(bytes.Buffer) - _, err := io.Copy(sb, sr) - if err != nil { - return nil, err - } - if err = body.Close(); err != nil { - return nil, err - } - return io.NopCloser(sb), nil - }, - }, + decoders: enabled, } - d.decoders["deflate"] = d.decoders["zlib"] for key, dec := range decoders { d.decoders[key] = dec diff --git a/config/confighttp/compression_test.go b/config/confighttp/compression_test.go index db2f7b3b3c0..a4fcb013f4f 100644 --- a/config/confighttp/compression_test.go +++ b/config/confighttp/compression_test.go @@ -134,7 +134,7 @@ func TestHTTPCustomDecompression(t *testing.T) { return io.NopCloser(strings.NewReader("decompressed body")), nil }, } - srv := httptest.NewServer(httpContentDecompressor(handler, defaultMaxRequestBodySize, defaultErrorHandler, decoders)) + srv := httptest.NewServer(httpContentDecompressor(handler, defaultMaxRequestBodySize, defaultErrorHandler, defaultCompressionAlgorithms, decoders)) t.Cleanup(srv.Close) @@ -253,7 +253,7 @@ func TestHTTPContentDecompressionHandler(t *testing.T) { require.NoError(t, err, "failed to read request body: %v", err) assert.EqualValues(t, testBody, string(body)) w.WriteHeader(http.StatusOK) - }), defaultMaxRequestBodySize, defaultErrorHandler, noDecoders)) + }), defaultMaxRequestBodySize, defaultErrorHandler, defaultCompressionAlgorithms, noDecoders)) t.Cleanup(srv.Close) req, err := http.NewRequest(http.MethodGet, srv.URL, tt.reqBody) @@ -349,6 +349,31 @@ func TestHTTPContentCompressionRequestBodyCloseError(t *testing.T) { require.Error(t, err) } +func TestOverrideCompressionList(t *testing.T) { + // prepare + configuredDecoders := 
[]string{"none", "zlib"} + + srv := httptest.NewServer(httpContentDecompressor(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + }), defaultMaxRequestBodySize, defaultErrorHandler, configuredDecoders, nil)) + t.Cleanup(srv.Close) + + req, err := http.NewRequest(http.MethodGet, srv.URL, compressSnappy(t, []byte("123decompressed body"))) + require.NoError(t, err, "failed to create request to test handler") + req.Header.Set("Content-Encoding", "snappy") + + client := http.Client{} + + // test + res, err := client.Do(req) + require.NoError(t, err) + + // verify + assert.Equal(t, http.StatusBadRequest, res.StatusCode, "test handler returned unexpected status code ") + _, err = io.ReadAll(res.Body) + require.NoError(t, res.Body.Close(), "failed to close request body: %v", err) +} + func compressGzip(t testing.TB, body []byte) *bytes.Buffer { var buf bytes.Buffer gw := gzip.NewWriter(&buf) diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index 71b2f17ee2f..ec4ffed3795 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -31,6 +31,7 @@ import ( const headerContentEncoding = "Content-Encoding" const defaultMaxRequestBodySize = 20 * 1024 * 1024 // 20MiB +var defaultCompressionAlgorithms = []string{"", "gzip", "zstd", "zlib", "snappy", "deflate"} // ClientConfig defines settings for creating an HTTP client. type ClientConfig struct { @@ -280,6 +281,9 @@ type ServerConfig struct { // Additional headers attached to each HTTP response sent to the client. // Header values are opaque since they may be sensitive. ResponseHeaders map[string]configopaque.String `mapstructure:"response_headers"` + + // CompressionAlgorithms configures the list of compression algorithms the server can accept. Default: ["", "gzip", "zstd", "zlib", "snappy", "deflate"] + CompressionAlgorithms []string `mapstructure:"compression_algorithms"` } // ToListener creates a net.Listener. 
@@ -345,7 +349,11 @@ func (hss *ServerConfig) ToServer(_ context.Context, host component.Host, settin hss.MaxRequestBodySize = defaultMaxRequestBodySize } - handler = httpContentDecompressor(handler, hss.MaxRequestBodySize, serverOpts.errHandler, serverOpts.decoders) + if hss.CompressionAlgorithms == nil { + hss.CompressionAlgorithms = defaultCompressionAlgorithms + } + + handler = httpContentDecompressor(handler, hss.MaxRequestBodySize, serverOpts.errHandler, hss.CompressionAlgorithms, serverOpts.decoders) if hss.MaxRequestBodySize > 0 { handler = maxRequestBodySizeInterceptor(handler, hss.MaxRequestBodySize)