diff --git a/message.go b/message.go index fd0d1d90b..c6f35a3f5 100644 --- a/message.go +++ b/message.go @@ -42,6 +42,28 @@ func (cc CompressionCodec) String() string { }[int(cc)] } +// UnmarshalText returns a CompressionCodec from its string representation. +func (cc *CompressionCodec) UnmarshalText(text []byte) error { + codecs := map[string]CompressionCodec{ + "none": CompressionNone, + "gzip": CompressionGZIP, + "snappy": CompressionSnappy, + "lz4": CompressionLZ4, + "zstd": CompressionZSTD, + } + codec, ok := codecs[string(text)] + if !ok { + return fmt.Errorf("cannot parse %q as a compression codec", string(text)) + } + *cc = codec + return nil +} + +// MarshalText transforms a CompressionCodec into its string representation. +func (cc CompressionCodec) MarshalText() ([]byte, error) { + return []byte(cc.String()), nil +} + // Message is a kafka message type type Message struct { Codec CompressionCodec // codec used to compress the message contents diff --git a/message_test.go b/message_test.go index a6c7cff2a..d7bd430d3 100644 --- a/message_test.go +++ b/message_test.go @@ -244,3 +244,32 @@ func TestMessageDecodingUnknownVersions(t *testing.T) { t.Error("Decoding an unknown magic byte produced an unknown error ", err) } } + +func TestCompressionCodecUnmarshal(t *testing.T) { + cases := []struct { + Input string + Expected CompressionCodec + ExpectedError bool + }{ + {"none", CompressionNone, false}, + {"zstd", CompressionZSTD, false}, + {"gzip", CompressionGZIP, false}, + {"unknown", CompressionNone, true}, + } + for _, c := range cases { + var cc CompressionCodec + err := cc.UnmarshalText([]byte(c.Input)) + if err != nil && !c.ExpectedError { + t.Errorf("UnmarshalText(%q) error:\n%+v", c.Input, err) + continue + } + if err == nil && c.ExpectedError { + t.Errorf("UnmarshalText(%q) got %v but expected error", c.Input, cc) + continue + } + if cc != c.Expected { + t.Errorf("UnmarshalText(%q) got %v but expected %v", c.Input, cc, c.Expected) + continue + } + } +}