Skip to content

Commit

Permalink
Add support for canonical form and Avro 64-bit CRC
Browse files Browse the repository at this point in the history
  • Loading branch information
actgardner committed Mar 9, 2020
1 parent 0cfc926 commit aac6fb6
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 0 deletions.
96 changes: 96 additions & 0 deletions schema/canonical/canonical.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package canonical

import (
"fmt"

"github.com/actgardner/gogen-avro/schema"
)

type CanonicalFields struct {
Name *string `json:"name,omitempty"`
Type interface{} `json:"type,omitempty"`
Fields []interface{} `json:"fields,omitempty"`
Symbols []string `json:"symbols,omitempty"`
Items interface{} `json:"items,omitempty"`
Values interface{} `json:"values,omitempty"`
Size *int `json:"size,omitempty"`
}

func CanonicalForm(t schema.AvroType) interface{} {
return canonicalForm(t, make(map[string]interface{}))
}

func canonicalForm(t schema.AvroType, visited map[string]interface{}) interface{} {
switch v := t.(type) {
case *schema.BoolField:
return "boolean"
case *schema.BytesField:
return "bytes"
case *schema.DoubleField:
return "double"
case *schema.FloatField:
return "float"
case *schema.IntField:
return "int"
case *schema.LongField:
return "long"
case *schema.NullField:
return "null"
case *schema.StringField:
return "string"
case *schema.UnionField:
members := make([]interface{}, 0)
for _, m := range v.AvroTypes() {
members = append(members, canonicalForm(m, visited))
}
return members
case *schema.ArrayField:
return &CanonicalFields{
Type: "array",
Items: canonicalForm(v.ItemType(), visited),
}
case *schema.MapField:
return &CanonicalFields{
Type: "map",
Values: canonicalForm(v.ItemType(), visited),
}
case *schema.Reference:
name := v.Def.AvroName().String()
if _, ok := visited[name]; ok {
return name
} else {
visited[name] = true
switch def := v.Def.(type) {
case *schema.RecordDefinition:
fields := make([]interface{}, 0)
for _, f := range def.Fields() {
fn := f.Name()
fields = append(fields, &CanonicalFields{
Name: &fn,
Type: canonicalForm(f.Type(), visited),
})
}

return &CanonicalFields{
Name: &name,
Type: "record",
Fields: fields,
}
case *schema.EnumDefinition:
return &CanonicalFields{
Name: &name,
Type: "enum",
Symbols: def.Symbols(),
}
case *schema.FixedDefinition:
size := def.SizeBytes()
return &CanonicalFields{
Name: &name,
Type: "fixed",
Size: &size,
}
}
}
}
panic(fmt.Sprintf("Unkonwn type: %T", t))
}
73 changes: 73 additions & 0 deletions schema/canonical/canonical_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package canonical

import (
"encoding/json"
"testing"

"github.com/actgardner/gogen-avro/parser"
"github.com/actgardner/gogen-avro/resolver"
"github.com/stretchr/testify/assert"
)

const inputSchema = `
{
"type": "record",
"name": "Interop",
"namespace": "org.apache.avro",
"fields": [
{"name": "intField", "type": "int"},
{"name": "longField", "type": "long"},
{"name": "stringField", "type": "string"},
{"name": "boolField", "type": "boolean"},
{"name": "floatField", "type": "float"},
{"name": "doubleField", "type": "double"},
{"name": "bytesField", "type": "bytes"},
{"name": "nullField", "type": "null"},
{"name": "arrayField", "type": {"type": "array", "items": "double"}},
{
"name": "mapField",
"type": {
"type": "map",
"values": {"name": "Foo",
"type": "record",
"fields": [{"name": "label", "type": "string"}]}
}
},
{
"name": "unionField",
"type": ["boolean", "double", {"type": "array", "items": "bytes"}]
},
{
"name": "enumField",
"type": {"type": "enum", "name": "Kind", "symbols": ["A", "B", "C"]}
},
{
"name": "fixedField",
"type": {"type": "fixed", "name": "MD5", "size": 16}
},
{
"name": "recordField",
"type": {"type": "record",
"name": "Node",
"fields": [{"name": "label", "type": "string"},
{"name": "children",
"type": {"type": "array",
"items": "Node"}}]}
}
]
}
`

const expected = `{"name":"org.apache.avro.Interop","type":"record","fields":[{"name":"intField","type":"int"},{"name":"longField","type":"long"},{"name":"stringField","type":"string"},{"name":"boolField","type":"boolean"},{"name":"floatField","type":"float"},{"name":"doubleField","type":"double"},{"name":"bytesField","type":"bytes"},{"name":"nullField","type":"null"},{"name":"arrayField","type":{"type":"array","items":"double"}},{"name":"mapField","type":{"type":"map","values":{"name":"org.apache.avro.Foo","type":"record","fields":[{"name":"label","type":"string"}]}}},{"name":"unionField","type":["boolean","double",{"type":"array","items":"bytes"}]},{"name":"enumField","type":{"name":"org.apache.avro.Kind","type":"enum","symbols":["A","B","C"]}},{"name":"fixedField","type":{"name":"org.apache.avro.MD5","type":"fixed","size":16}},{"name":"recordField","type":{"name":"org.apache.avro.Node","type":"record","fields":[{"name":"label","type":"string"},{"name":"children","type":{"type":"array","items":"org.apache.avro.Node"}}]}}]}`

func TestCanonicalForm(t *testing.T) {
ns := parser.NewNamespace(false)
s, err := ns.TypeForSchema([]byte(inputSchema))
assert.Nil(t, err)
for _, def := range ns.Roots {
assert.Nil(t, resolver.ResolveDefinition(def, ns.Definitions))
}
canonical, err := json.Marshal(CanonicalForm(s))
assert.Nil(t, err)
assert.Equal(t, expected, string(canonical))
}
57 changes: 57 additions & 0 deletions schema/canonical/crc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package canonical

import (
"bytes"
"encoding/binary"
"encoding/hex"
)

// IMplementation of Avro's 64-bit Rabin code for fingerprinting schemas

/*
long fingerprint64(byte[] buf) {
if (FP_TABLE == null) initFPTable();
long fp = EMPTY;
for (int i = 0; i < buf.length; i++)
fp = (fp >>> 8) ^ FP_TABLE[(int)(fp ^ buf[i]) & 0xff];
return fp;
}
static long EMPTY = 0xc15d213aa4d7a795L;
static long[] FP_TABLE = null;
void initFPTable() {
FP_TABLE = new long[256];
for (int i = 0; i < 256; i++) {
long fp = i;
for (int j = 0; j < 8; j++)
fp = (fp >>> 1) ^ (EMPTY & -(fp & 1L));
FP_TABLE[i] = fp;
}
}
*/

var FP_TABLE []uint64

const EMPTY uint64 = 0xc15d213aa4d7a795

func init() {
FP_TABLE = make([]uint64, 256)
for i := range FP_TABLE {
fp := uint64(i)
for j := 0; j < 8; j++ {
fp = (fp >> 1) ^ (EMPTY & -(fp & 1))
}
FP_TABLE[i] = fp
}
}

func AvroCRC64Fingerprint(data []byte) string {
fp := EMPTY
for _, d := range data {
fp = (fp >> 8) ^ FP_TABLE[(fp^uint64(d))&0xff]
}
output := bytes.NewBuffer(make([]byte, 0, 8))
binary.Write(output, binary.LittleEndian, fp)
return hex.EncodeToString(output.Bytes())
}
24 changes: 24 additions & 0 deletions schema/canonical/crc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package canonical

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestAvroCRC64Fingerprint(t *testing.T) {
cases := []struct {
schema string
fingerprint string
}{
{`"int"`, "8f5c393f1ad57572"},
{`"float"`, "90d7a83ecb027c4d"},
{`"long"`, "b71df49344e154d0"},
{`"double"`, "7e95ab32c035758e"},
{`"bytes"`, "651920c3da16c04f"},
}

for _, c := range cases {
assert.Equal(t, c.fingerprint, AvroCRC64Fingerprint([]byte(c.schema)))
}
}

0 comments on commit aac6fb6

Please sign in to comment.