diff --git a/.chloggen/fix_followup-fix-kinesis-compression.yaml b/.chloggen/fix_followup-fix-kinesis-compression.yaml new file mode 100644 index 000000000000..0b90087591c8 --- /dev/null +++ b/.chloggen/fix_followup-fix-kinesis-compression.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awskinesisexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: fixed compressed data not generating the compression footers + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32860] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/awskinesisexporter/internal/compress/compresser.go b/exporter/awskinesisexporter/internal/compress/compresser.go index 07a221b0f489..766274e86bb9 100644 --- a/exporter/awskinesisexporter/internal/compress/compresser.go +++ b/exporter/awskinesisexporter/internal/compress/compresser.go @@ -31,7 +31,6 @@ func NewCompressor(format string) (Compressor, error) { func flateCompressor(in []byte) ([]byte, error) { var buf bytes.Buffer w, _ := flate.NewWriter(&buf, flate.BestSpeed) - defer w.Close() _, err := w.Write(in) @@ -44,13 +43,17 @@ func flateCompressor(in []byte) ([]byte, error) { return nil, err } + err = w.Close() + if err != nil { + return nil, err + } + return buf.Bytes(), nil } func gzipCompressor(in []byte) ([]byte, error) { var buf bytes.Buffer w, _ := gzip.NewWriterLevel(&buf, gzip.BestSpeed) - defer w.Close() _, err := w.Write(in) @@ -59,6 +62,12 @@ func gzipCompressor(in []byte) ([]byte, error) { } err = w.Flush() + if err != nil { + + return nil, err + } + + err = w.Close() if err != nil { return nil, err } @@ -69,7 +78,6 @@ func gzipCompressor(in []byte) ([]byte, error) { func zlibCompressor(in []byte) ([]byte, error) { var buf bytes.Buffer w, _ := zlib.NewWriterLevel(&buf, zlib.BestSpeed) - defer w.Close() _, err := w.Write(in) @@ -82,6 +90,11 @@ func zlibCompressor(in []byte) ([]byte, error) { return nil, err } + err = w.Close() + if err != nil { + return nil, err + } + return buf.Bytes(), nil } diff --git a/exporter/awskinesisexporter/internal/compress/compresser_test.go b/exporter/awskinesisexporter/internal/compress/compresser_test.go index 324a584ebe90..3ac82cf7330c 100644 --- a/exporter/awskinesisexporter/internal/compress/compresser_test.go +++ b/exporter/awskinesisexporter/internal/compress/compresser_test.go @@ -4,7 +4,12 @@ package compress_test import ( + "bytes" + "compress/flate" + "compress/gzip" + "compress/zlib" "fmt" + "io" "math/rand" "sync" "testing" @@ -29,7 +34,8 @@ func TestCompressorFormats(t *testing.T) { {format: "flate"}, } - const data = "You know nothing Jon Snow" + data := createRandomString(1024) + for _, tc := range testCases { t.Run(fmt.Sprintf("format_%s", tc.format), func(t *testing.T) { c, err := compress.NewCompressor(tc.format) @@ -39,12 +45,44 @@ func TestCompressorFormats(t *testing.T) { out, err := c([]byte(data)) assert.NoError(t, err, "Must not error when processing data") assert.NotNil(t, out, "Must have a valid record") + + // now data gets decompressed and the original string gets compared with the decompressed one + var dc []byte + var err2 error + + switch tc.format { + case "gzip": + dc, err2 = decompressGzip(out) + case "zlib": + dc, err2 = decompressZlib(out) + case "flate": + dc, err2 = decompressFlate(out) + case "noop", "none": + dc = out + default: + dc = out + } + + assert.NoError(t, err2) + assert.Equal(t, data, string(dc)) }) } _, err := compress.NewCompressor("invalid-format") assert.Error(t, err, "Must error when an invalid compression format is given") } +func createRandomString(length int) string { + // some characters for the random generation + const letterBytes = " ,.;:*-+/[]{}<>abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + + b := make([]byte, length) + for i := range b { + b[i] = letterBytes[rand.Intn(len(letterBytes))] + } + + return string(b) +} + func BenchmarkNoopCompressor_1000Bytes(b *testing.B) { benchmarkCompressor(b, "none", 1000) } @@ -178,3 +216,48 @@ func concurrentCompressFunc(t *testing.T) { t.Errorf("Error encountered on concurrent compression: %v", err) } } + +func decompressGzip(input []byte) ([]byte, error) { + r, err := gzip.NewReader(bytes.NewReader(input)) + if err != nil { + return nil, err + } + + defer r.Close() + + decompressedData, err := io.ReadAll(r) + if err != nil { + return nil, err + } + + return decompressedData, nil +} + +func decompressZlib(input []byte) ([]byte, error) { + r, err := zlib.NewReader(bytes.NewReader(input)) + if err != nil { + return nil, err + } + + defer r.Close() + + decompressedData, err := io.ReadAll(r) + if err != nil { + return nil, err + } + + return decompressedData, nil +} + +func decompressFlate(input []byte) ([]byte, error) { + + r := flate.NewReader(bytes.NewReader(input)) + defer r.Close() + + decompressedData, err := io.ReadAll(r) + if err != nil { + return nil, err + } + + return decompressedData, nil +}