From dd1bf3d423384caa642791bc83049b50aa693683 Mon Sep 17 00:00:00 2001 From: A Gardner <3100188+actgardner@users.noreply.github.com> Date: Wed, 4 Nov 2020 09:47:06 -0500 Subject: [PATCH] Add generic support for deserializing binary data --- v7/compiler/compile.go | 6 +- v7/generic/array.go | 42 ++++++++++++ v7/generic/datum.go | 39 +++++++++++ v7/generic/enum.go | 37 ++++++++++ v7/generic/map.go | 45 +++++++++++++ v7/generic/primitive.go | 45 +++++++++++++ v7/generic/record.go | 49 ++++++++++++++ v7/generic/record_test.go | 137 ++++++++++++++++++++++++++++++++++++++ v7/generic/serde.go | 49 ++++++++++++++ v7/generic/union.go | 32 +++++++++ v7/go.mod | 6 +- v7/go.sum | 6 ++ v7/vm/eval.go | 2 +- 13 files changed, 488 insertions(+), 7 deletions(-) create mode 100644 v7/generic/array.go create mode 100644 v7/generic/datum.go create mode 100644 v7/generic/enum.go create mode 100644 v7/generic/map.go create mode 100644 v7/generic/primitive.go create mode 100644 v7/generic/record.go create mode 100644 v7/generic/record_test.go create mode 100644 v7/generic/serde.go create mode 100644 v7/generic/union.go diff --git a/v7/compiler/compile.go b/v7/compiler/compile.go index 9e424446..397880ea 100644 --- a/v7/compiler/compile.go +++ b/v7/compiler/compile.go @@ -13,12 +13,12 @@ import ( // If you're reading records from an OCF you can use the NewReader() // method that's generated for you, which will parse the schemas automatically. func CompileSchemaBytes(writer, reader []byte, opts ...Option) (*vm.Program, error) { - readerType, err := parseSchema(reader) + readerType, err := ParseSchema(reader) if err != nil { return nil, err } - writerType, err := parseSchema(writer) + writerType, err := ParseSchema(writer) if err != nil { return nil, err } @@ -26,7 +26,7 @@ func CompileSchemaBytes(writer, reader []byte, opts ...Option) (*vm.Program, err return Compile(writerType, readerType, opts...) } -func parseSchema(s []byte) (schema.AvroType, error) { +func ParseSchema(s []byte) (schema.AvroType, error) { ns := parser.NewNamespace(false) sType, err := ns.TypeForSchema(s) if err != nil { diff --git a/v7/generic/array.go b/v7/generic/array.go new file mode 100644 index 00000000..a03ab551 --- /dev/null +++ b/v7/generic/array.go @@ -0,0 +1,42 @@ +package generic + +import ( + "github.com/actgardner/gogen-avro/v7/schema" + "github.com/actgardner/gogen-avro/v7/vm/types" +) + +type arrayDatum struct { + itemType schema.AvroType + items []Datum +} + +func (r *arrayDatum) Datum() interface{} { + v := make([]interface{}, len(r.items)) + for i, item := range r.items { + v[i] = item.Datum() + } + return v +} + +func (r *arrayDatum) SetBoolean(v bool) {} + +func (r *arrayDatum) SetInt(v int32) {} +func (r *arrayDatum) SetLong(v int64) {} +func (r *arrayDatum) SetFloat(v float32) {} +func (r *arrayDatum) SetDouble(v float64) {} +func (r *arrayDatum) SetBytes(v []byte) {} +func (r *arrayDatum) SetString(v string) {} + +func (r *arrayDatum) Get(i int) types.Field { panic("") } +func (r *arrayDatum) SetDefault(i int) {} + +func (r *arrayDatum) AppendMap(key string) types.Field { panic("") } + +func (r *arrayDatum) AppendArray() types.Field { + d := DatumForType(r.itemType) + r.items = append(r.items, d) + return d +} + +func (r *arrayDatum) NullField(t int) {} +func (r *arrayDatum) Finalize() {} diff --git a/v7/generic/datum.go b/v7/generic/datum.go new file mode 100644 index 00000000..ddae8794 --- /dev/null +++ b/v7/generic/datum.go @@ -0,0 +1,39 @@ +package generic + +import ( + "github.com/actgardner/gogen-avro/v7/schema" + "github.com/actgardner/gogen-avro/v7/vm/types" +) + +type Datum interface { + types.Field + Datum() interface{} +} + +func DatumForType(t schema.AvroType) Datum { + switch st := t.(type) { + case *schema.BoolField, *schema.BytesField, *schema.FloatField, *schema.DoubleField, *schema.IntField, *schema.LongField, *schema.StringField, *schema.NullField: + return &primitiveDatum{} + case *schema.MapField: + return &mapDatum{itemType: st.ItemType()} + case *schema.ArrayField: + return &arrayDatum{itemType: st.ItemType()} + case *schema.Reference: + return datumForReference(st) + case *schema.UnionField: + return &unionDatum{itemTypes: st.ItemTypes()} + } + panic("") +} + +func datumForReference(ref *schema.Reference) Datum { + switch d := ref.Def.(type) { + case *schema.RecordDefinition: + return newRecordDatum(d) + case *schema.EnumDefinition: + return newEnumDatum(d) + case *schema.FixedDefinition: + return &primitiveDatum{} + } + panic("") +} diff --git a/v7/generic/enum.go b/v7/generic/enum.go new file mode 100644 index 00000000..67018ad4 --- /dev/null +++ b/v7/generic/enum.go @@ -0,0 +1,37 @@ +package generic + +import ( + "github.com/actgardner/gogen-avro/v7/schema" + "github.com/actgardner/gogen-avro/v7/vm/types" +) + +type enumDatum struct { + symbols []string + value string +} + +func newEnumDatum(def *schema.EnumDefinition) *enumDatum { + return &enumDatum{ + symbols: def.Symbols(), + } +} + +func (r *enumDatum) Datum() interface{} { + return r.value +} + +func (r *enumDatum) SetBoolean(v bool) { panic("") } +func (r *enumDatum) SetInt(v int32) { panic("") } +func (r *enumDatum) SetLong(v int64) { + r.value = r.symbols[v] +} +func (r *enumDatum) SetFloat(v float32) { panic("") } +func (r *enumDatum) SetDouble(v float64) { panic("") } +func (r *enumDatum) SetBytes(v []byte) { panic("") } +func (r *enumDatum) SetString(v string) { panic("") } +func (r *enumDatum) Get(i int) types.Field { panic("") } +func (r *enumDatum) SetDefault(i int) {} +func (r *enumDatum) AppendMap(key string) types.Field { panic("") } +func (r *enumDatum) AppendArray() types.Field { panic("") } +func (r *enumDatum) NullField(t int) { panic("") } +func (r *enumDatum) Finalize() { panic("") } diff --git a/v7/generic/map.go b/v7/generic/map.go new file mode 100644 index 00000000..e2c15ec8 --- /dev/null +++ b/v7/generic/map.go @@ -0,0 +1,45 @@ +package generic + +import ( + "github.com/actgardner/gogen-avro/v7/schema" + "github.com/actgardner/gogen-avro/v7/vm/types" +) + +type mapDatum struct { + itemType schema.AvroType + items map[string]Datum +} + +func (r *mapDatum) Datum() interface{} { + v := make(map[string]interface{}) + for k, item := range r.items { + v[k] = item.Datum() + } + return v +} + +func (r *mapDatum) SetBoolean(v bool) {} + +func (r *mapDatum) SetInt(v int32) {} +func (r *mapDatum) SetLong(v int64) {} +func (r *mapDatum) SetFloat(v float32) {} +func (r *mapDatum) SetDouble(v float64) {} +func (r *mapDatum) SetBytes(v []byte) {} +func (r *mapDatum) SetString(v string) {} + +func (r *mapDatum) Get(i int) types.Field { panic("") } +func (r *mapDatum) SetDefault(i int) {} + +func (r *mapDatum) AppendMap(key string) types.Field { + if r.items == nil { + r.items = make(map[string]Datum) + } + d := DatumForType(r.itemType) + r.items[key] = d + return d +} + +func (r *mapDatum) AppendArray() types.Field { panic("") } + +func (r *mapDatum) NullField(t int) {} +func (r *mapDatum) Finalize() {} diff --git a/v7/generic/primitive.go b/v7/generic/primitive.go new file mode 100644 index 00000000..6cf72ff4 --- /dev/null +++ b/v7/generic/primitive.go @@ -0,0 +1,45 @@ +package generic + +import ( + "github.com/actgardner/gogen-avro/v7/vm/types" +) + +type primitiveDatum struct { + value interface{} +} + +func (r *primitiveDatum) Datum() interface{} { + return r.value +} + +func (r *primitiveDatum) SetBoolean(v bool) { + r.value = v +} + +func (r *primitiveDatum) SetInt(v int32) { + r.value = v +} +func (r *primitiveDatum) SetLong(v int64) { + r.value = v +} +func (r *primitiveDatum) SetFloat(v float32) { + r.value = v +} +func (r *primitiveDatum) SetDouble(v float64) { + r.value = v +} +func (r *primitiveDatum) SetBytes(v []byte) { + r.value = v +} +func (r *primitiveDatum) SetString(v string) { + r.value = v +} +func (r *primitiveDatum) Get(i int) types.Field { panic("") } +func (r *primitiveDatum) SetDefault(i int) {} + +func (r *primitiveDatum) AppendMap(key string) types.Field { panic("") } + +func (r *primitiveDatum) AppendArray() types.Field { panic("") } + +func (r *primitiveDatum) NullField(t int) {} +func (r *primitiveDatum) Finalize() {} diff --git a/v7/generic/record.go b/v7/generic/record.go new file mode 100644 index 00000000..cf4db82f --- /dev/null +++ b/v7/generic/record.go @@ -0,0 +1,49 @@ +package generic + +import ( + "github.com/actgardner/gogen-avro/v7/schema" + "github.com/actgardner/gogen-avro/v7/vm/types" +) + +type recordDatum struct { + def *schema.RecordDefinition + fields []Datum +} + +func newRecordDatum(def *schema.RecordDefinition) *recordDatum { + return &recordDatum{ + def: def, + fields: make([]Datum, len(def.Fields())), + } +} + +func (r *recordDatum) Datum() interface{} { + m := make(map[string]interface{}) + for i, f := range r.def.Fields() { + m[f.Name()] = r.fields[i].Datum() + } + return m +} + +func (r *recordDatum) SetBoolean(v bool) { panic("") } +func (r *recordDatum) SetInt(v int32) { panic("") } +func (r *recordDatum) SetLong(v int64) { panic("") } +func (r *recordDatum) SetFloat(v float32) { panic("") } +func (r *recordDatum) SetDouble(v float64) { panic("") } +func (r *recordDatum) SetBytes(v []byte) { panic("") } +func (r *recordDatum) SetString(v string) { panic("") } +func (r *recordDatum) Get(i int) types.Field { + field := r.def.Fields()[i] + r.fields[i] = DatumForType(field.Type()) + return r.fields[i] +} +func (r *recordDatum) SetDefault(i int) { + field := r.def.Fields()[i] + r.fields[i] = &primitiveDatum{field.Default()} +} +func (r *recordDatum) AppendMap(key string) types.Field { panic("") } +func (r *recordDatum) AppendArray() types.Field { panic("") } +func (r *recordDatum) NullField(t int) { + r.fields[t] = &primitiveDatum{nil} +} +func (r *recordDatum) Finalize() {} diff --git a/v7/generic/record_test.go b/v7/generic/record_test.go new file mode 100644 index 00000000..af1a2295 --- /dev/null +++ b/v7/generic/record_test.go @@ -0,0 +1,137 @@ +package generic + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPrimitive(t *testing.T) { + s := []byte(` +{ + "type": "record", + "name": "primitive", + "fields": [ + { + "name": "intfield", + "type": "int", + "default": 1 + }, + { + "name": "bytesfield", + "type": "bytes", + "default": "\u00fe" + } + ] +} +`) + + r := bytes.NewBuffer([]byte{2, 4, byte('h'), byte('i')}) + + codec, err := NewCodecFromSchema(s, s) + assert.NoError(t, err) + + datum, err := codec.Deserialize(r) + assert.NoError(t, err) + + assert.Equal(t, map[string]interface{}{"intfield": int32(1), "bytesfield": []byte{'h', 'i'}}, datum) +} + +func TestComplex(t *testing.T) { + s := []byte(` +{ + "type": "record", + "name": "complex", + "fields": [ + { + "name": "mapfield", + "type": { + "type": "map", + "values": "int" + } + }, + { + "name": "arrayfield", + "type": { + "type": "array", + "items": "string" + } + } + ] +} +`) + r := bytes.NewBuffer([]byte{2, 6, 'k', 'e', 'y', 7, 0, 4, 2, 'a', 2, 'b', 0}) + codec, err := NewCodecFromSchema(s, s) + assert.NoError(t, err) + + datum, err := codec.Deserialize(r) + assert.NoError(t, err) + + assert.Equal(t, map[string]interface{}{"mapfield": map[string]interface{}{"key": int32(-4)}, "arrayfield": []interface{}{"a", "b"}}, datum) +} + +func TestUnion(t *testing.T) { + s := []byte(` +[ + "int", + "long", + "string" +] +`) + codec, err := NewCodecFromSchema(s, s) + assert.NoError(t, err) + + for _, f := range []struct { + data []byte + expected interface{} + }{ + {data: []byte{0, 2}, expected: int32(1)}, + {data: []byte{2, 4}, expected: int64(2)}, + {data: []byte{4, 4, 'h', 'i'}, expected: string("hi")}, + } { + r := bytes.NewBuffer(f.data) + datum, err := codec.Deserialize(r) + assert.NoError(t, err) + + assert.Equal(t, f.expected, datum) + } +} + +func TestEnum(t *testing.T) { +} + +func TestLinkedList(t *testing.T) { + s := []byte(` +{ + "type": "record", + "name": "elem", + "fields": [ + { + "name": "next", + "type": ["null", "elem"] + }, + { + "name": "val", + "type": "int" + } + ] +} +`) + codec, err := NewCodecFromSchema(s, s) + assert.NoError(t, err) + + for _, f := range []struct { + data []byte + expected interface{} + }{ + {data: []byte{0, 1}, expected: map[string]interface{}{"next": nil, "val": int32(-1)}}, + {data: []byte{2, 0, 4, 2}, expected: map[string]interface{}{"next": map[string]interface{}{"next": nil, "val": int32(2)}, "val": int32(1)}}, + } { + r := bytes.NewBuffer(f.data) + datum, err := codec.Deserialize(r) + assert.NoError(t, err) + + assert.Equal(t, f.expected, datum) + } +} diff --git a/v7/generic/serde.go b/v7/generic/serde.go new file mode 100644 index 00000000..5cb416fe --- /dev/null +++ b/v7/generic/serde.go @@ -0,0 +1,49 @@ +package generic + +import ( + "io" + + "github.com/actgardner/gogen-avro/v7/compiler" + "github.com/actgardner/gogen-avro/v7/schema" + "github.com/actgardner/gogen-avro/v7/vm" +) + +type Codec struct { + t schema.AvroType + deser *vm.Program +} + +func NewCodecFromSchema(writer, reader []byte) (*Codec, error) { + readerType, err := compiler.ParseSchema(reader) + if err != nil { + return nil, err + } + + writerType, err := compiler.ParseSchema(writer) + if err != nil { + return nil, err + } + + return NewCodec(writerType, readerType) +} + +func NewCodec(writer, reader schema.AvroType) (*Codec, error) { + prog, err := compiler.Compile(writer, reader) + if err != nil { + return nil, err + } + + return &Codec{ + t: reader, + deser: prog, + }, nil +} + +func (c *Codec) Deserialize(r io.Reader) (interface{}, error) { + datum := DatumForType(c.t) + err := vm.Eval(r, c.deser, datum) + if err != nil { + return nil, err + } + return datum.Datum(), nil +} diff --git a/v7/generic/union.go b/v7/generic/union.go new file mode 100644 index 00000000..3065421f --- /dev/null +++ b/v7/generic/union.go @@ -0,0 +1,32 @@ +package generic + +import ( + "github.com/actgardner/gogen-avro/v7/schema" + "github.com/actgardner/gogen-avro/v7/vm/types" +) + +type unionDatum struct { + itemTypes []schema.AvroType + datum Datum +} + +func (r *unionDatum) Datum() interface{} { + return r.datum.Datum() +} + +func (r *unionDatum) SetBoolean(v bool) { panic("") } +func (r *unionDatum) SetInt(v int32) { panic("") } +func (r *unionDatum) SetLong(v int64) {} +func (r *unionDatum) SetFloat(v float32) { panic("") } +func (r *unionDatum) SetDouble(v float64) { panic("") } +func (r *unionDatum) SetBytes(v []byte) { panic("") } +func (r *unionDatum) SetString(v string) { panic("") } +func (r *unionDatum) Get(i int) types.Field { + r.datum = DatumForType(r.itemTypes[i]) + return r.datum +} +func (r *unionDatum) SetDefault(i int) {} +func (r *unionDatum) AppendMap(key string) types.Field { panic("") } +func (r *unionDatum) AppendArray() types.Field { panic("") } +func (r *unionDatum) NullField(t int) { panic("") } +func (r *unionDatum) Finalize() {} diff --git a/v7/go.mod b/v7/go.mod index c3e3ae1b..8ff72c98 100644 --- a/v7/go.mod +++ b/v7/go.mod @@ -4,10 +4,10 @@ go 1.14 require ( github.com/davecgh/go-spew v1.1.1 - github.com/golang/snappy v0.0.1 + github.com/golang/snappy v0.0.2 github.com/linkedin/goavro v2.1.0+incompatible - github.com/linkedin/goavro/v2 v2.9.7 + github.com/linkedin/goavro/v2 v2.9.8 github.com/pmezard/go-difflib v1.0.0 github.com/stretchr/testify v1.6.1 - gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c // indirect + gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect ) diff --git a/v7/go.sum b/v7/go.sum index eb4b6912..371bcdae 100644 --- a/v7/go.sum +++ b/v7/go.sum @@ -5,11 +5,15 @@ github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pO github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.2 h1:aeE13tS0IiQgFjYdoL8qN3K1N2bXXtI6Vi51/y7BpMw= +github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/linkedin/goavro v1.0.5 h1:6ds0AI8upkEoafDk0a5r9q1p/xRtMq47jCilZYEqbmg= github.com/linkedin/goavro v2.1.0+incompatible h1:DV2aUlj2xZiuxQyvag8Dy7zjY69ENjS66bWkSfdpddY= github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM= github.com/linkedin/goavro/v2 v2.9.7 h1:Vd++Rb/RKcmNJjM0HP/JJFMEWa21eUBVKPYlKehOGrM= github.com/linkedin/goavro/v2 v2.9.7/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= +github.com/linkedin/goavro/v2 v2.9.8 h1:jN50elxBsGBDGVDEKqUlDuU1cFwJ11K/yrJCBMe/7Wg= +github.com/linkedin/goavro/v2 v2.9.8/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -22,3 +26,5 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c h1:grhR+C34yXImVGp7EzNk+DTIk+323eIUWOmEevy6bDo= gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/v7/vm/eval.go b/v7/vm/eval.go index 32be12f5..a8132949 100644 --- a/v7/vm/eval.go +++ b/v7/vm/eval.go @@ -23,7 +23,7 @@ func Eval(r io.Reader, program *Program, target types.Field) (err error) { var pc int defer func() { if r := recover(); r != nil { - err = fmt.Errorf("Panic at pc %v - %v", pc, r) + err = fmt.Errorf("Panic at pc %v - %v (%v)", pc, r, program.Instructions[pc]) } }()