From 2670b960c5b2d6ba45c7e2346b59eebf505804f6 Mon Sep 17 00:00:00 2001 From: tomershafir Date: Sun, 28 Apr 2024 13:25:41 +0300 Subject: [PATCH] feat(compress): add lz4hc support --- client.go | 33 +++++++---- compress/_golden/data_compressed_lz4_hc.hex | 3 + compress/_golden/data_compressed_lz4_hc.raw | Bin 0 -> 45 bytes compress/compress.go | 28 ++++++++-- compress/method_enum.go | 59 ++++++++------------ compress/reader.go | 8 +-- compress/writer.go | 39 +++++++++++-- compression_enum.go | 14 +++-- query_test.go | 1 + 9 files changed, 120 insertions(+), 65 deletions(-) create mode 100644 compress/_golden/data_compressed_lz4_hc.hex create mode 100644 compress/_golden/data_compressed_lz4_hc.raw diff --git a/client.go b/client.go index 51c7ae03..a8f0e2f7 100644 --- a/client.go +++ b/client.go @@ -291,7 +291,7 @@ func (c *Client) encode(v proto.AwareEncoder) { v.EncodeAware(c.buf, c.protocolVersion) } -//go:generate go run github.com/dmarkham/enumer -transform snake_upper -type Compression -trimprefix Compression -output compression_enum.go +//go:generate go run github.com/dmarkham/enumer -transform upper -type Compression -trimprefix Compression -output compression_enum.go // Compression setting. // @@ -307,19 +307,27 @@ const ( CompressionZSTD // CompressionNone uses no compression but data has checksums. CompressionNone + // CompressionLZ4HC enables LZ4HC compression for data. High CPU overhead. + CompressionLZ4HC ) +// CompressionLevel setting. A level == 0 is invalid and resolves to the default. +// +// Supported by: LZ4HC. +type CompressionLevel uint32 + // Options for Client. Zero value is valid. type Options struct { - Logger *zap.Logger // defaults to Nop. - Address string // 127.0.0.1:9000 - Database string // "default" - User string // "default" - Password string // blank string by default - QuotaKey string // blank string by default - Compression Compression // disabled by default - ClientName string // blank string by default - Settings []Setting // none by default + Logger *zap.Logger // defaults to Nop. + Address string // 127.0.0.1:9000 + Database string // "default" + User string // "default" + Password string // blank string by default + QuotaKey string // blank string by default + Compression Compression // disabled by default + CompressionLevel CompressionLevel // compression algorithm specific default + ClientName string // blank string by default + Settings []Setting // none by default // ReadTimeout is a timeout for reading a single packet from the server. // @@ -459,7 +467,7 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) { readTimeout: opt.ReadTimeout, - compressor: compress.NewWriter(), + compressor: compress.NewWriterWithLevel(compress.Level(opt.CompressionLevel)), version: ver, protocolVersion: opt.ProtocolVersion, @@ -479,6 +487,9 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) { case CompressionLZ4: c.compression = proto.CompressionEnabled c.compressionMethod = compress.LZ4 + case CompressionLZ4HC: + c.compression = proto.CompressionEnabled + c.compressionMethod = compress.LZ4HC case CompressionZSTD: c.compression = proto.CompressionEnabled c.compressionMethod = compress.ZSTD diff --git a/compress/_golden/data_compressed_lz4_hc.hex b/compress/_golden/data_compressed_lz4_hc.hex new file mode 100644 index 00000000..ab9cf987 --- /dev/null +++ b/compress/_golden/data_compressed_lz4_hc.hex @@ -0,0 +1,3 @@ +00000000 34 39 ad b3 8d 96 d2 87 bb 3b aa 1e 3f 4b 64 f5 |49.......;..?Kd.| +00000010 82 1d 00 00 00 af 00 00 00 8f 48 65 6c 6c 6f 21 |..........Hello!| +00000020 0a 48 07 00 8d 70 48 65 6c 6c 6f 21 0a |.H...pHello!.| diff --git a/compress/_golden/data_compressed_lz4_hc.raw b/compress/_golden/data_compressed_lz4_hc.raw new file mode 100644 index 0000000000000000000000000000000000000000..e3784427dbf5792df3a9111c42433796e4d6f01c GIT binary patch literal 45 ucmXr9T)Vk<+NJj0)~n>~y;Ht6$ucl7tOw$LkJOx;d_^t~c81;pC>sEe3J+2M literal 0 HcmV?d00001 diff --git a/compress/compress.go b/compress/compress.go index a89c6401..a020e171 100644 --- a/compress/compress.go +++ b/compress/compress.go @@ -7,18 +7,38 @@ import ( "github.com/go-faster/city" ) -//go:generate go run github.com/dmarkham/enumer -transform snake_upper -type Method -output method_enum.go +//go:generate go run github.com/dmarkham/enumer -transform upper -type Method -output method_enum.go // Method is compression codec. type Method byte // Possible compression methods. const ( - None Method = 0x02 - LZ4 Method = 0x82 - ZSTD Method = 0x90 + None Method = iota + LZ4 + LZ4HC + ZSTD ) +type methodEncoding byte + +const ( + encodedNone methodEncoding = 0x02 + encodedLZ4 methodEncoding = 0x82 + encodedLZ4HC methodEncoding = encodedLZ4 + encodedZSTD methodEncoding = 0x90 +) + +var methodTable = map[Method]methodEncoding{ + None: encodedNone, + LZ4: encodedLZ4, + LZ4HC: encodedLZ4HC, + ZSTD: encodedZSTD, +} + +// Level for supporting compression codecs. +type Level uint32 + // Constants for compression encoding. // // See https://go-faster.org/docs/clickhouse/compression for reference. diff --git a/compress/method_enum.go b/compress/method_enum.go index 8d443520..373eb288 100644 --- a/compress/method_enum.go +++ b/compress/method_enum.go @@ -1,4 +1,4 @@ -// Code generated by "enumer -transform snake_upper -type Method -output method_enum.go"; DO NOT EDIT. +// Code generated by "enumer -transform upper -type Method -output method_enum.go"; DO NOT EDIT. package compress @@ -7,58 +7,47 @@ import ( "strings" ) -const ( - _MethodName_0 = "NONE" - _MethodLowerName_0 = "none" - _MethodName_1 = "LZ4" - _MethodLowerName_1 = "lz4" - _MethodName_2 = "ZSTD" - _MethodLowerName_2 = "zstd" -) +const _MethodName = "NONELZ4LZ4HCZSTD" -var ( - _MethodIndex_0 = [...]uint8{0, 4} - _MethodIndex_1 = [...]uint8{0, 3} - _MethodIndex_2 = [...]uint8{0, 4} -) +var _MethodIndex = [...]uint8{0, 4, 7, 12, 16} + +const _MethodLowerName = "nonelz4lz4hczstd" func (i Method) String() string { - switch { - case i == 2: - return _MethodName_0 - case i == 130: - return _MethodName_1 - case i == 144: - return _MethodName_2 - default: + if i >= Method(len(_MethodIndex)-1) { return fmt.Sprintf("Method(%d)", i) } + return _MethodName[_MethodIndex[i]:_MethodIndex[i+1]] } // An "invalid array index" compiler error signifies that the constant values have changed. // Re-run the stringer command to generate them again. func _MethodNoOp() { var x [1]struct{} - _ = x[None-(2)] - _ = x[LZ4-(130)] - _ = x[ZSTD-(144)] + _ = x[None-(0)] + _ = x[LZ4-(1)] + _ = x[LZ4HC-(2)] + _ = x[ZSTD-(3)] } -var _MethodValues = []Method{None, LZ4, ZSTD} +var _MethodValues = []Method{None, LZ4, LZ4HC, ZSTD} var _MethodNameToValueMap = map[string]Method{ - _MethodName_0[0:4]: None, - _MethodLowerName_0[0:4]: None, - _MethodName_1[0:3]: LZ4, - _MethodLowerName_1[0:3]: LZ4, - _MethodName_2[0:4]: ZSTD, - _MethodLowerName_2[0:4]: ZSTD, + _MethodName[0:4]: None, + _MethodLowerName[0:4]: None, + _MethodName[4:7]: LZ4, + _MethodLowerName[4:7]: LZ4, + _MethodName[7:12]: LZ4HC, + _MethodLowerName[7:12]: LZ4HC, + _MethodName[12:16]: ZSTD, + _MethodLowerName[12:16]: ZSTD, } var _MethodNames = []string{ - _MethodName_0[0:4], - _MethodName_1[0:3], - _MethodName_2[0:4], + _MethodName[0:4], + _MethodName[4:7], + _MethodName[7:12], + _MethodName[12:16], } // MethodString retrieves an enum value from the enum constants string name. diff --git a/compress/reader.go b/compress/reader.go index 6a26f9df..fc4a0e6b 100644 --- a/compress/reader.go +++ b/compress/reader.go @@ -70,8 +70,8 @@ func (r *Reader) readBlock() error { DataSize: dataSize, }, "mismatch") } - switch m := Method(r.header[hMethod]); m { - case LZ4: + switch m := methodEncoding(r.header[hMethod]); m { + case encodedLZ4: // == encodedLZ4HC, as decompression is similar for both n, err := lz4.UncompressBlock(r.raw[headerSize:], r.data) if err != nil { return errors.Wrap(err, "uncompress") @@ -81,7 +81,7 @@ func (r *Reader) readBlock() error { n, dataSize, ) } - case ZSTD: + case encodedZSTD: if r.zstd == nil { // Lazily initializing to prevent spawning goroutines in NewReader. // See https://github.com/golang/go/issues/47056#issuecomment-997436820 @@ -104,7 +104,7 @@ func (r *Reader) readBlock() error { ) } r.data = data - case None: + case encodedNone: copy(r.data, r.raw[headerSize:]) default: return errors.Errorf("compression 0x%02x not implemented", m) diff --git a/compress/writer.go b/compress/writer.go index 6094b055..35e32387 100644 --- a/compress/writer.go +++ b/compress/writer.go @@ -2,6 +2,7 @@ package compress import ( "encoding/binary" + "math" "github.com/go-faster/city" "github.com/go-faster/errors" @@ -9,12 +10,18 @@ import ( "github.com/pierrec/lz4/v4" ) +const ( + CompressionLevelLZ4HCDefault Level = 9 + CompressionLevelLZ4HCMax Level = 12 +) + // Writer encodes compressed blocks. type Writer struct { Data []byte - lz4 *lz4.Compressor - zstd *zstd.Encoder + lz4 *lz4.Compressor + lz4hc *lz4.CompressorHC + zstd *zstd.Encoder } // Compress buf into Data. @@ -22,7 +29,7 @@ func (w *Writer) Compress(m Method, buf []byte) error { maxSize := lz4.CompressBlockBound(len(buf)) w.Data = append(w.Data[:0], make([]byte, maxSize+headerSize)...) _ = w.Data[:headerSize] - w.Data[hMethod] = byte(m) + w.Data[hMethod] = byte(methodTable[m]) var n int @@ -33,6 +40,12 @@ func (w *Writer) Compress(m Method, buf []byte) error { return errors.Wrap(err, "block") } n = compressedSize + case LZ4HC: + compressedSize, err := w.lz4hc.CompressBlock(buf, w.Data[headerSize:]) + if err != nil { + return errors.Wrap(err, "block") + } + n = compressedSize case ZSTD: w.Data = w.zstd.EncodeAll(buf, w.Data[:headerSize]) n = len(w.Data) - headerSize @@ -51,7 +64,7 @@ func (w *Writer) Compress(m Method, buf []byte) error { return nil } -func NewWriter() *Writer { +func NewWriterWithLevel(l Level) *Writer { w, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault), zstd.WithEncoderConcurrency(1), @@ -60,8 +73,22 @@ func NewWriter() *Writer { if err != nil { panic(err) } + + // handle level for LZ4HC + levelLZ4HC := l + if levelLZ4HC == 0 { + levelLZ4HC = CompressionLevelLZ4HCDefault + } else { + levelLZ4HC = Level(math.Min(float64(levelLZ4HC), float64(CompressionLevelLZ4HCMax))) + } + return &Writer{ - lz4: &lz4.Compressor{}, - zstd: w, + lz4: &lz4.Compressor{}, + lz4hc: &lz4.CompressorHC{Level: lz4.CompressionLevel(1 << (8 + levelLZ4HC))}, + zstd: w, } } + +func NewWriter() *Writer { + return NewWriterWithLevel(0) +} diff --git a/compression_enum.go b/compression_enum.go index 67a4eaf9..0b971fe5 100644 --- a/compression_enum.go +++ b/compression_enum.go @@ -1,4 +1,4 @@ -// Code generated by "enumer -transform snake_upper -type Compression -trimprefix Compression -output compression_enum.go"; DO NOT EDIT. +// Code generated by "enumer -transform upper -type Compression -trimprefix Compression -output compression_enum.go"; DO NOT EDIT. package ch @@ -7,11 +7,11 @@ import ( "strings" ) -const _CompressionName = "DISABLEDLZ4ZSTDNONE" +const _CompressionName = "DISABLEDLZ4ZSTDNONELZ4HC" -var _CompressionIndex = [...]uint8{0, 8, 11, 15, 19} +var _CompressionIndex = [...]uint8{0, 8, 11, 15, 19, 24} -const _CompressionLowerName = "disabledlz4zstdnone" +const _CompressionLowerName = "disabledlz4zstdnonelz4hc" func (i Compression) String() string { if i >= Compression(len(_CompressionIndex)-1) { @@ -28,9 +28,10 @@ func _CompressionNoOp() { _ = x[CompressionLZ4-(1)] _ = x[CompressionZSTD-(2)] _ = x[CompressionNone-(3)] + _ = x[CompressionLZ4HC-(4)] } -var _CompressionValues = []Compression{CompressionDisabled, CompressionLZ4, CompressionZSTD, CompressionNone} +var _CompressionValues = []Compression{CompressionDisabled, CompressionLZ4, CompressionZSTD, CompressionNone, CompressionLZ4HC} var _CompressionNameToValueMap = map[string]Compression{ _CompressionName[0:8]: CompressionDisabled, @@ -41,6 +42,8 @@ var _CompressionNameToValueMap = map[string]Compression{ _CompressionLowerName[11:15]: CompressionZSTD, _CompressionName[15:19]: CompressionNone, _CompressionLowerName[15:19]: CompressionNone, + _CompressionName[19:24]: CompressionLZ4HC, + _CompressionLowerName[19:24]: CompressionLZ4HC, } var _CompressionNames = []string{ @@ -48,6 +51,7 @@ var _CompressionNames = []string{ _CompressionName[8:11], _CompressionName[11:15], _CompressionName[15:19], + _CompressionName[19:24], } // CompressionString retrieves an enum value from the enum constants string name. diff --git a/query_test.go b/query_test.go index a752bc3c..a395a41f 100644 --- a/query_test.go +++ b/query_test.go @@ -1061,6 +1061,7 @@ func TestClientCompression(t *testing.T) { } } t.Run("LZ4", testCompression(CompressionLZ4)) + t.Run("LZ4HC", testCompression(CompressionLZ4HC)) t.Run("ZSTD", testCompression(CompressionZSTD)) t.Run("None", testCompression(CompressionNone)) t.Run("Disabled", testCompression(CompressionDisabled))