Skip to content

Commit

Permalink
[configgrpc] Use own compressors for zstd (#10323) (#10324)
Browse files Browse the repository at this point in the history
Backport of #10323

Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>
Co-authored-by: Juraci Paixão Kröhling <juraci@kroehling.de>
  • Loading branch information
mx-psi and jpkrohling authored Jun 5, 2024
1 parent 0ab388b commit 10e89bd
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 6 deletions.
13 changes: 13 additions & 0 deletions .chloggen/jpkroehling-configgrpc-use-own-compressors-for-zstd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'bug_fix'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: Use own compressors for zstd

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Before this change, the zstd compressor we used didn't respect the max message size.

# One or more tracking issues or pull requests related to the change
issues: [10323]
4 changes: 2 additions & 2 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/mostynb/go-grpc-compression/nonclobbering/snappy"
"github.com/mostynb/go-grpc-compression/nonclobbering/zstd"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"google.golang.org/grpc"
Expand All @@ -28,6 +27,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configcompression"
grpcInternal "go.opentelemetry.io/collector/config/configgrpc/internal"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtelemetry"
Expand Down Expand Up @@ -426,7 +426,7 @@ func getGRPCCompressionName(compressionType configcompression.Type) (string, err
case configcompression.TypeSnappy:
return snappy.Name, nil
case configcompression.TypeZstd:
return zstd.Name, nil
return grpcInternal.ZstdName, nil
default:
return "", fmt.Errorf("unsupported compression type %q", compressionType)
}
Expand Down
4 changes: 2 additions & 2 deletions config/configgrpc/configgrpc_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
"testing"

"github.com/mostynb/go-grpc-compression/nonclobbering/snappy"
"github.com/mostynb/go-grpc-compression/nonclobbering/zstd"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/config/configgrpc/internal"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand All @@ -27,7 +27,7 @@ func BenchmarkCompressors(b *testing.B) {

compressors := make([]encoding.Compressor, 0)
compressors = append(compressors, encoding.GetCompressor(gzip.Name))
compressors = append(compressors, encoding.GetCompressor(zstd.Name))
compressors = append(compressors, encoding.GetCompressor(internal.ZstdName))
compressors = append(compressors, encoding.GetCompressor(snappy.Name))

for _, payload := range payloads {
Expand Down
2 changes: 1 addition & 1 deletion config/configgrpc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module go.opentelemetry.io/collector/config/configgrpc
go 1.21.0

require (
github.com/klauspost/compress v1.17.2
github.com/mostynb/go-grpc-compression v1.2.2
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector v0.102.0
Expand Down Expand Up @@ -36,7 +37,6 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.1 // indirect
Expand Down
83 changes: 83 additions & 0 deletions config/configgrpc/internal/zstd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright The OpenTelemetry Authors
// Copyright 2017 gRPC authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/config/configgrpc/internal"

import (
"errors"
"io"
"sync"

"github.com/klauspost/compress/zstd"
"google.golang.org/grpc/encoding"
)

const ZstdName = "zstd"

func init() {
encoding.RegisterCompressor(NewZstdCodec())
}

type writer struct {
*zstd.Encoder
pool *sync.Pool
}

func NewZstdCodec() encoding.Compressor {
c := &compressor{}
c.poolCompressor.New = func() any {
zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), zstd.WithWindowSize(512*1024))
return &writer{Encoder: zw, pool: &c.poolCompressor}
}
return c
}

func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
z := c.poolCompressor.Get().(*writer)
z.Encoder.Reset(w)
return z, nil
}

func (z *writer) Close() error {
defer z.pool.Put(z)
return z.Encoder.Close()
}

type reader struct {
*zstd.Decoder
pool *sync.Pool
}

func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
z, inPool := c.poolDecompressor.Get().(*reader)
if !inPool {
newZ, err := zstd.NewReader(r)
if err != nil {
return nil, err
}
return &reader{Decoder: newZ, pool: &c.poolDecompressor}, nil
}
if err := z.Reset(r); err != nil {
c.poolDecompressor.Put(z)
return nil, err
}
return z, nil
}

func (z *reader) Read(p []byte) (n int, err error) {
n, err = z.Decoder.Read(p)
if errors.Is(err, io.EOF) {
z.pool.Put(z)
}
return n, err
}

func (c *compressor) Name() string {
return ZstdName
}

type compressor struct {
poolCompressor sync.Pool
poolDecompressor sync.Pool
}
41 changes: 41 additions & 0 deletions config/configgrpc/internal/zstd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal

import (
"bytes"
"io"
"testing"

"github.com/stretchr/testify/require"
)

func Test_zstdCodec_CompressDecompress(t *testing.T) {
// prepare
msg := []byte("Hello world.")
compressed := &bytes.Buffer{}

// zstd header, for sanity checking
header := []byte{40, 181, 47, 253}

c := NewZstdCodec()
cWriter, err := c.Compress(compressed)
require.NoError(t, err)
require.NotNil(t, cWriter)

_, err = cWriter.Write(msg)
require.NoError(t, err)
cWriter.Close()

cReader, err := c.Decompress(compressed)
require.NoError(t, err)
require.NotNil(t, cReader)

uncompressed, err := io.ReadAll(cReader)
require.NoError(t, err)
require.Equal(t, msg, uncompressed)

// test header
require.Equal(t, header, compressed.Bytes()[:4])
}
3 changes: 2 additions & 1 deletion receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,8 @@ func TestGRPCMaxRecvSize(t *testing.T) {
require.NoError(t, err)

td := testdata.GenerateTraces(50000)
require.Error(t, exportTraces(cc, td))
err = exportTraces(cc, td)
require.Error(t, err)
assert.NoError(t, cc.Close())
require.NoError(t, recv.Shutdown(context.Background()))

Expand Down

0 comments on commit 10e89bd

Please sign in to comment.