Skip to content

Commit

Permalink
feat(compress): add lz4hc support
Browse files Browse the repository at this point in the history
  • Loading branch information
tomershafir committed Apr 28, 2024
1 parent 7736dc1 commit bf828e4
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 63 deletions.
31 changes: 21 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,19 +307,27 @@ const (
CompressionZSTD
// CompressionNone uses no compression but data has checksums.
CompressionNone
// CompressionLZ4HC enables LZ4 HC compression for data. High CPU overhead.
CompressionLZ4HC
)

// CompressionLevel setting. A level == 0 is invalid and resolves to the default.
//
// Supported by: LZ4HC.
type CompressionLevel uint32

// Options for Client. Zero value is valid.
type Options struct {
Logger *zap.Logger // defaults to Nop.
Address string // 127.0.0.1:9000
Database string // "default"
User string // "default"
Password string // blank string by default
QuotaKey string // blank string by default
Compression Compression // disabled by default
ClientName string // blank string by default
Settings []Setting // none by default
Logger *zap.Logger // defaults to Nop.
Address string // 127.0.0.1:9000
Database string // "default"
User string // "default"
Password string // blank string by default
QuotaKey string // blank string by default
Compression Compression // disabled by default
CompressionLevel CompressionLevel // compression algorithm specific default
ClientName string // blank string by default
Settings []Setting // none by default

// ReadTimeout is a timeout for reading a single packet from the server.
//
Expand Down Expand Up @@ -459,7 +467,7 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) {

readTimeout: opt.ReadTimeout,

compressor: compress.NewWriter(),
compressor: compress.NewWriterWithLevel(compress.Level(opt.CompressionLevel)),

version: ver,
protocolVersion: opt.ProtocolVersion,
Expand All @@ -479,6 +487,9 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) {
case CompressionLZ4:
c.compression = proto.CompressionEnabled
c.compressionMethod = compress.LZ4
case CompressionLZ4HC:
c.compression = proto.CompressionEnabled
c.compressionMethod = compress.LZ4HC
case CompressionZSTD:
c.compression = proto.CompressionEnabled
c.compressionMethod = compress.ZSTD
Expand Down
3 changes: 3 additions & 0 deletions compress/_golden/data_compressed_lz4_hc.hex
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
00000000 34 39 ad b3 8d 96 d2 87 bb 3b aa 1e 3f 4b 64 f5 |49.......;..?Kd.|
00000010 82 1d 00 00 00 af 00 00 00 8f 48 65 6c 6c 6f 21 |..........Hello!|
00000020 0a 48 07 00 8d 70 48 65 6c 6c 6f 21 0a |.H...pHello!.|
Binary file added compress/_golden/data_compressed_lz4_hc.raw
Binary file not shown.
28 changes: 24 additions & 4 deletions compress/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,38 @@ import (
"github.com/go-faster/city"
)

//go:generate go run github.com/dmarkham/enumer -transform snake_upper -type Method -output method_enum.go
//go:generate go run github.com/dmarkham/enumer -transform upper -type Method -output method_enum.go

// Method is compression codec.
type Method byte

// Possible compression methods.
const (
None Method = 0x02
LZ4 Method = 0x82
ZSTD Method = 0x90
None Method = iota
LZ4
LZ4HC
ZSTD
)

type methodEncoding byte

const (
encodedNone methodEncoding = 0x02
encodedLZ4 methodEncoding = 0x82
encodedLZ4HC methodEncoding = encodedLZ4
encodedZSTD methodEncoding = 0x90
)

var methodTable = map[Method]methodEncoding{
None: encodedNone,
LZ4: encodedLZ4,
LZ4HC: encodedLZ4HC,
ZSTD: encodedZSTD,
}

// Level for supporting compression codecs.
type Level uint32

// Constants for compression encoding.
//
// See https://go-faster.org/docs/clickhouse/compression for reference.
Expand Down
59 changes: 24 additions & 35 deletions compress/method_enum.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions compress/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ func (r *Reader) readBlock() error {
DataSize: dataSize,
}, "mismatch")
}
switch m := Method(r.header[hMethod]); m {
case LZ4:
switch m := methodEncoding(r.header[hMethod]); m {
case encodedLZ4: // == encodedLZ4HC, as decompression is similar for both
n, err := lz4.UncompressBlock(r.raw[headerSize:], r.data)
if err != nil {
return errors.Wrap(err, "uncompress")
Expand All @@ -81,7 +81,7 @@ func (r *Reader) readBlock() error {
n, dataSize,
)
}
case ZSTD:
case encodedZSTD:
if r.zstd == nil {
// Lazily initializing to prevent spawning goroutines in NewReader.
// See https://github.com/golang/go/issues/47056#issuecomment-997436820
Expand All @@ -104,7 +104,7 @@ func (r *Reader) readBlock() error {
)
}
r.data = data
case None:
case encodedNone:
copy(r.data, r.raw[headerSize:])
default:
return errors.Errorf("compression 0x%02x not implemented", m)
Expand Down
39 changes: 33 additions & 6 deletions compress/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,34 @@ package compress

import (
"encoding/binary"
"math"

"github.com/go-faster/city"
"github.com/go-faster/errors"
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4/v4"
)

const (
CompressionLevelLZ4HCDefault Level = 9
CompressionLevelLZ4HCMax Level = 12
)

// Writer encodes compressed blocks.
type Writer struct {
Data []byte

lz4 *lz4.Compressor
zstd *zstd.Encoder
lz4 *lz4.Compressor
lz4hc *lz4.CompressorHC
zstd *zstd.Encoder
}

// Compress buf into Data.
func (w *Writer) Compress(m Method, buf []byte) error {
maxSize := lz4.CompressBlockBound(len(buf))
w.Data = append(w.Data[:0], make([]byte, maxSize+headerSize)...)
_ = w.Data[:headerSize]
w.Data[hMethod] = byte(m)
w.Data[hMethod] = byte(methodTable[m])

var n int

Expand All @@ -33,6 +40,12 @@ func (w *Writer) Compress(m Method, buf []byte) error {
return errors.Wrap(err, "block")
}
n = compressedSize
case LZ4HC:
compressedSize, err := w.lz4hc.CompressBlock(buf, w.Data[headerSize:])
if err != nil {
return errors.Wrap(err, "block")
}
n = compressedSize
case ZSTD:
w.Data = w.zstd.EncodeAll(buf, w.Data[:headerSize])
n = len(w.Data) - headerSize
Expand All @@ -51,7 +64,7 @@ func (w *Writer) Compress(m Method, buf []byte) error {
return nil
}

func NewWriter() *Writer {
func NewWriterWithLevel(l Level) *Writer {
w, err := zstd.NewWriter(nil,
zstd.WithEncoderLevel(zstd.SpeedDefault),
zstd.WithEncoderConcurrency(1),
Expand All @@ -60,8 +73,22 @@ func NewWriter() *Writer {
if err != nil {
panic(err)
}

// handle level for LZ4HC
levelLZ4HC := l
if levelLZ4HC == 0 {
levelLZ4HC = CompressionLevelLZ4HCDefault
} else {
levelLZ4HC = Level(math.Min(float64(levelLZ4HC), float64(CompressionLevelLZ4HCMax)))
}

return &Writer{
lz4: &lz4.Compressor{},
zstd: w,
lz4: &lz4.Compressor{},
lz4hc: &lz4.CompressorHC{Level: lz4.CompressionLevel(1 << (8 + levelLZ4HC))},
zstd: w,
}
}

func NewWriter() *Writer {
return NewWriterWithLevel(0)
}
12 changes: 8 additions & 4 deletions compression_enum.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,7 @@ func TestClientCompression(t *testing.T) {
}
}
t.Run("LZ4", testCompression(CompressionLZ4))
t.Run("LZ4HC", testCompression(CompressionLZ4HC))
t.Run("ZSTD", testCompression(CompressionZSTD))
t.Run("None", testCompression(CompressionNone))
t.Run("Disabled", testCompression(CompressionDisabled))
Expand Down

0 comments on commit bf828e4

Please sign in to comment.