Skip to content

Commit

Permalink
Add zstd decompression support to HTTPServerSettings
Browse files Browse the repository at this point in the history
This adds ability to decompress zstd-compressed HTTP requests to
all receivers that use HTTPServerSettings.

Also added missing error handling for the case when an unsupported
compression type was used in the request. Now it correctly returns
400 Bad Request. Also added a unit test to verify this case.

Once this is merged I will submit a PR in contrib repo to add end-to-end
tests that use zstd compression in the testbed.
  • Loading branch information
tigrannajaryan committed Jun 20, 2023
1 parent 25129b7 commit a80df15
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 6 deletions.
17 changes: 17 additions & 0 deletions .chloggen/add-zstd-decompression.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: HTTPServerSettings

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add zstd support to HTTPServerSettings

# One or more tracking issues or pull requests related to the change
issues: [7927]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This adds ability to decompress zstd-compressed HTTP requests to|
all receivers that use HTTPServerSettings.
16 changes: 16 additions & 0 deletions .chloggen/fix-unsupported-content-encoding-bug.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: HTTPServerSettings

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Ensure requests with unsupported Content-Encoding return HTTP 400 Bad Request

# One or more tracking issues or pull requests related to the change
issues: [7927]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
26 changes: 23 additions & 3 deletions config/confighttp/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import (
"bytes"
"compress/gzip"
"compress/zlib"
"fmt"
"io"
"net/http"

"github.com/klauspost/compress/zstd"

"go.opentelemetry.io/collector/config/configcompression"
)

Expand Down Expand Up @@ -102,7 +105,7 @@ func (d *decompressor) wrap(h http.Handler) http.Handler {
defer newBody.Close()
// "Content-Encoding" header is removed to avoid decompressing twice
// in case the next handler(s) have implemented a similar mechanism.
r.Header.Del("Content-Encoding")
r.Header.Del(headerContentEncoding)
// "Content-Length" is set to -1 as the size of the decompressed body is unknown.
r.Header.Del("Content-Length")
r.ContentLength = -1
Expand All @@ -113,7 +116,8 @@ func (d *decompressor) wrap(h http.Handler) http.Handler {
}

func newBodyReader(r *http.Request) (io.ReadCloser, error) {
switch r.Header.Get("Content-Encoding") {
encoding := r.Header.Get(headerContentEncoding)
switch encoding {
case "gzip":
gr, err := gzip.NewReader(r.Body)
if err != nil {
Expand All @@ -126,8 +130,24 @@ func newBodyReader(r *http.Request) (io.ReadCloser, error) {
return nil, err
}
return zr, nil
case "zstd":
zr, err := zstd.NewReader(
r.Body,
// Concurrency 1 disables async decoding. We don't need async decoding, it is pointless
// for our use-case (a server accepting decoding http requests).
// Disabling async improves performance (I benchmarked it previously when working
// on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257).
zstd.WithDecoderConcurrency(1),
)
if err != nil {
return nil, err
}
return io.NopCloser(zr), nil
case "":
// Not a compressed payload. Nothing to do.
return nil, nil
}
return nil, nil
return nil, fmt.Errorf("unsupported %s: %s", headerContentEncoding, encoding)
}

// defaultErrorHandler writes the error message in plain text.
Expand Down
28 changes: 27 additions & 1 deletion config/confighttp/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
reqBody: compressZlib(t, testBody),
respCode: 200,
},
{
name: "ValidZstd",
encoding: "zstd",
reqBody: compressZstd(t, testBody),
respCode: 200,
},
{
name: "InvalidGzip",
encoding: "gzip",
Expand All @@ -155,14 +161,34 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
respCode: 400,
respBody: "zlib: invalid header\n",
},
{
name: "InvalidZstd",
encoding: "zstd",
reqBody: bytes.NewBuffer(testBody),
respCode: 400,
respBody: "invalid input: magic number mismatch",
},
{
name: "UnsupportedCompression",
encoding: "nosuchcompression",
reqBody: bytes.NewBuffer(testBody),
respCode: 400,
respBody: "unsupported Content-Encoding: nosuchcompression\n",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := httptest.NewServer(httpContentDecompressor(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}

require.NoError(t, err, "failed to read request body: %v", err)
assert.EqualValues(t, testBody, string(body))
w.WriteHeader(200)
w.WriteHeader(http.StatusOK)
})))
t.Cleanup(srv.Close)

Expand Down
54 changes: 52 additions & 2 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"
"time"

"github.com/klauspost/compress/zstd"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
spb "google.golang.org/genproto/googleapis/rpc/status"
Expand Down Expand Up @@ -153,6 +154,11 @@ func TestJsonHttp(t *testing.T) {
encoding: "gzip",
contentType: "application/json",
},
{
name: "JSONZstdCompressed",
encoding: "zstd",
contentType: "application/json",
},
{
name: "NotGRPCError",
encoding: "",
Expand Down Expand Up @@ -374,8 +380,13 @@ func testHTTPJSONRequest(t *testing.T, url string, sink *errOrSinkConsumer, enco
case "gzip":
buf, err = compressGzip(traceJSON)
require.NoError(t, err, "Error while gzip compressing trace: %v", err)
default:
case "zstd":
buf, err = compressZstd(traceJSON)
require.NoError(t, err, "Error while zstd compressing trace: %v", err)
case "":
buf = bytes.NewBuffer(traceJSON)
default:
t.Fatalf("Unsupported compression type %v", encoding)
}
sink.SetConsumeError(expectedErr)
req, err := http.NewRequest(http.MethodPost, url, buf)
Expand Down Expand Up @@ -427,6 +438,10 @@ func TestProtoHttp(t *testing.T) {
name: "ProtoGzipCompressed",
encoding: "gzip",
},
{
name: "ProtoZstdCompressed",
encoding: "zstd",
},
{
name: "NotGRPCError",
encoding: "",
Expand Down Expand Up @@ -477,8 +492,13 @@ func createHTTPProtobufRequest(
case "gzip":
buf, err = compressGzip(traceBytes)
require.NoError(t, err, "Error while gzip compressing trace: %v", err)
default:
case "zstd":
buf, err = compressZstd(traceBytes)
require.NoError(t, err, "Error while zstd compressing trace: %v", err)
case "":
buf = bytes.NewBuffer(traceBytes)
default:
t.Fatalf("Unsupported compression type %v", encoding)
}
req, err := http.NewRequest(http.MethodPost, url, buf)
require.NoError(t, err, "Error creating trace POST request: %v", err)
Expand Down Expand Up @@ -567,6 +587,18 @@ func TestOTLPReceiverInvalidContentEncoding(t *testing.T) {
},
status: 400,
},
{
name: "ProtoZstdUncompressed",
content: "application/x-protobuf",
encoding: "zstd",
reqBodyFunc: func() (*bytes.Buffer, error) {
return bytes.NewBuffer([]byte(`{"key": "value"}`)), nil
},
resBodyFunc: func() ([]byte, error) {
return proto.Marshal(status.New(codes.InvalidArgument, "invalid input: magic number mismatch").Proto())
},
status: 400,
},
}
addr := testutil.GetAvailableLocalAddress(t)

Expand Down Expand Up @@ -969,6 +1001,24 @@ func compressGzip(body []byte) (*bytes.Buffer, error) {
return &buf, nil
}

func compressZstd(body []byte) (*bytes.Buffer, error) {
var buf bytes.Buffer

zw, err := zstd.NewWriter(&buf)
if err != nil {
return nil, err
}

defer zw.Close()

_, err = zw.Write(body)
if err != nil {
return nil, err
}

return &buf, nil
}

type senderFunc func(td ptrace.Traces)

func TestShutdown(t *testing.T) {
Expand Down

0 comments on commit a80df15

Please sign in to comment.