diff --git a/coding/Makefile b/coding/Makefile index 734bf30..3e2f44a 100644 --- a/coding/Makefile +++ b/coding/Makefile @@ -1,10 +1,22 @@ +bin/multicodec: + mkdir -p bin + go get -d github.com/jbenet/go-multicodec/multicodec + go build -o "$@" github.com/jbenet/go-multicodec/multicodec -pb/bin/multicodec: - $(MAKE) -C pb bin/multicodec +bin/json2cbor: + mkdir -p bin + go get -d github.com/whyrusleeping/cbor/go/json2cbor + go build -o "$@" github.com/whyrusleeping/cbor/go/json2cbor -json.testfile: pb/bin/multicodec Makefile +json.testfile: bin/multicodec Makefile test1.json : >$@ - pb/bin/multicodec header /multicodec >>$@ - pb/bin/multicodec header /json >>$@ - echo '{"@codec":"/json","abc":{"mlink":"QmXg9Pp2ytZ14xgmQjYEiHjVjMFXzCVVEcRTWJBmLgR39V"}}' >>$@ + bin/multicodec header /multicodec >>$@ + bin/multicodec header /json >>$@ + cat test1.json >>$@ + +cbor.testfile: bin/multicodec bin/json2cbor Makefile test1.json + : >$@ + bin/multicodec header /multicodec >>$@ + bin/multicodec header /cbor >>$@ + bin/json2cbor -i test1.json -o - >>$@ diff --git a/coding/cbor.testfile b/coding/cbor.testfile new file mode 100644 index 0000000..79e18c6 --- /dev/null +++ b/coding/cbor.testfile @@ -0,0 +1,3 @@ + /multicodec +/cbor +¢cabc¡emlinkx.QmXg9Pp2ytZ14xgmQjYEiHjVjMFXzCVVEcRTWJBmLgR39Vf@codece/json \ No newline at end of file diff --git a/coding/cbor_stream.go b/coding/cbor_stream.go new file mode 100644 index 0000000..33728e6 --- /dev/null +++ b/coding/cbor_stream.go @@ -0,0 +1,161 @@ +package ipfsld + +import ( + "io" + "log" + "math/big" + + reader "github.com/ipfs/go-ipld/reader" + cbor "github.com/whyrusleeping/cbor/go" +) + +type CBORDecoder struct { + r io.Reader +} + +type cborParser struct { + reader.BaseReader + decoder *cbor.Decoder +} + +func (d *CBORDecoder) Read(cb reader.ReadFun) error { + dec := cbor.NewDecoder(d.r) + return dec.DecodeAny(&cborParser{reader.CreateBaseReader(cb), dec}) +} + +func (p *cborParser) Prepare() error { + log.Printf("Prepare") + return nil +} + +func (p *cborParser) SetBytes(buf []byte) error { + log.Printf("SetBytes") + err := p.ExecCallback(reader.TokenValue, buf) + p.Descope() + return err +} + +func (p *cborParser) SetUint(i uint64) error { + log.Printf("SetUint") + err := p.ExecCallback(reader.TokenValue, i) + p.Descope() + return err +} + +func (p *cborParser) SetInt(i int64) error { + log.Printf("setint") + err := p.ExecCallback(reader.TokenValue, i) + p.Descope() + return err +} + +func (p *cborParser) SetFloat32(f float32) error { + log.Printf("setfloat32") + err := p.ExecCallback(reader.TokenValue, f) + p.Descope() + return err +} + +func (p *cborParser) SetFloat64(f float64) error { + log.Printf("setfloat64") + err := p.ExecCallback(reader.TokenValue, f) + p.Descope() + return err +} + +func (p *cborParser) SetBignum(i *big.Int) error { + log.Printf("setbignum") + err := p.ExecCallback(reader.TokenValue, i) + p.Descope() + return err +} + +func (p *cborParser) SetNil() error { + log.Printf("nil") + err := p.ExecCallback(reader.TokenValue, nil) + p.Descope() + return err +} + +func (p *cborParser) SetBool(b bool) error { + log.Printf("setbool") + err := p.ExecCallback(reader.TokenValue, b) + p.Descope() + return err +} + +func (p *cborParser) SetString(s string) error { + log.Printf("setstring") + err := p.ExecCallback(reader.TokenValue, s) + p.Descope() + return err +} + +func (p *cborParser) CreateMap() (cbor.DecodeValueMap, error) { + log.Printf("createmap") + return p, p.ExecCallback(reader.TokenNode, nil) +} + +func (p *cborParser) CreateMapKey() (cbor.DecodeValue, error) { + log.Printf("createmapkey") + return cbor.NewMemoryValue(""), nil +} + +func (p *cborParser) CreateMapValue(key cbor.DecodeValue) (cbor.DecodeValue, error) { + log.Printf("createmapvalue") + err := p.ExecCallback(reader.TokenKey, key.(*cbor.MemoryValue).Value) + p.Descope() + p.PushPath(key.(*cbor.MemoryValue).Value) + return p, err +} + +func (p *cborParser) SetMap(key, val cbor.DecodeValue) error { + log.Printf("setmap") + p.PopPath() + return nil +} + +func (p *cborParser) EndMap() error { + log.Printf("endmap") + err := p.ExecCallback(reader.TokenEndNode, nil) + p.Descope() + p.Descope() + return err +} + +func (p *cborParser) CreateArray(len int) (cbor.DecodeValueArray, error) { + log.Printf("createarray") + return p, p.ExecCallback(reader.TokenArray, nil) +} + +func (p *cborParser) GetArrayValue(index uint64) (cbor.DecodeValue, error) { + log.Printf("getarrvalue") + err := p.ExecCallback(reader.TokenIndex, index) + p.Descope() + p.PushPath(index) + return p, err +} + +func (p *cborParser) AppendArray(val cbor.DecodeValue) error { + log.Printf("appendarray") + p.PopPath() + return nil +} + +func (p *cborParser) EndArray() error { + log.Printf("endarray") + err := p.ExecCallback(reader.TokenEndArray, nil) + p.Descope() + p.Descope() + return err +} + +func (p *cborParser) CreateTag(tag uint64, decoder cbor.TagDecoder) (cbor.DecodeValue, interface{}, error) { + log.Printf("createtag") + return p, nil, nil +} + +func (p *cborParser) SetTag(tag uint64, decval cbor.DecodeValue, decoder cbor.TagDecoder, val interface{}) error { + log.Printf("settag") + return nil +} diff --git a/coding/coding.go b/coding/coding.go index ac4083e..0bd7632 100644 --- a/coding/coding.go +++ b/coding/coding.go @@ -1,14 +1,28 @@ package ipfsld import ( + "fmt" + "io" + mc "github.com/jbenet/go-multicodec" mccbor "github.com/jbenet/go-multicodec/cbor" + mcjson "github.com/jbenet/go-multicodec/json" mcmux "github.com/jbenet/go-multicodec/mux" ipld "github.com/ipfs/go-ipld" pb "github.com/ipfs/go-ipld/coding/pb" + reader "github.com/ipfs/go-ipld/reader" ) +var StreamCodecs map[string]func(io.Reader) (reader.NodeReader, error) = map[string]func(io.Reader) (reader.NodeReader, error){ + mcjson.HeaderPath: func(r io.Reader) (reader.NodeReader, error) { + return &JSONDecoder{r}, nil + }, + mccbor.HeaderPath: func(r io.Reader) (reader.NodeReader, error) { + return &CBORDecoder{r}, nil + }, +} + // defaultCodec is the default applied if user does not specify a codec. // Most new objects will never specify a codec. We track the codecs with // the object so that multiple people using the same object will continue @@ -26,6 +40,14 @@ func init() { jsonMulticodec(), pb.Multicodec(), }, selectCodec) + StreamCodecs = map[string]func(io.Reader) (reader.NodeReader, error){ + mcjson.HeaderPath: func(r io.Reader) (reader.NodeReader, error) { + return &JSONDecoder{r}, nil + }, + mccbor.HeaderPath: func(r io.Reader) (reader.NodeReader, error) { + return &CBORDecoder{r}, nil + }, + } } // Multicodec returns a muxing codec that marshals to @@ -73,3 +95,23 @@ func codecKey(n ipld.Node) (string, error) { return chdrs, nil } + +func Decode(r io.Reader) (reader.NodeReader, error) { + if err := mc.ConsumeHeader(r, mcmux.Header); err != nil { + return nil, err + } + + // get next header, to select codec + hdr, err := mc.ReadHeader(r) + if err != nil { + return nil, err + } + + hdrPath := string(mc.HeaderPath(hdr)) + + fun, ok := StreamCodecs[hdrPath] + if !ok { + return nil, fmt.Errorf("no codec for %s", hdr) + } + return fun(r) +} diff --git a/coding/coding_test.go b/coding/coding_test.go index a56712c..ab9ceac 100644 --- a/coding/coding_test.go +++ b/coding/coding_test.go @@ -1,18 +1,22 @@ package ipfsld import ( + "bytes" "io/ioutil" - "testing" "reflect" - "bytes" + "testing" ipld "github.com/ipfs/go-ipld" + reader "github.com/ipfs/go-ipld/reader" + readertest "github.com/ipfs/go-ipld/reader/test" mc "github.com/jbenet/go-multicodec" mctest "github.com/jbenet/go-multicodec/test" + assrt "github.com/mildred/assrt" ) var json_testfile []byte +var cbor_testfile []byte func init() { var err error @@ -20,6 +24,10 @@ func init() { if err != nil { panic("could not read json.testfile. please run: make json.testfile") } + cbor_testfile, err = ioutil.ReadFile("cbor.testfile") + if err != nil { + panic("could not read cbor.testfile. please run: make cbor.testfile") + } } type TC struct { @@ -113,7 +121,7 @@ func TestJsonDecodeEncode(t *testing.T) { } linksExpected := map[string]ipld.Link{ - "abc": ipld.Link { + "abc": ipld.Link{ "mlink": "QmXg9Pp2ytZ14xgmQjYEiHjVjMFXzCVVEcRTWJBmLgR39V", }, } @@ -140,3 +148,37 @@ func TestJsonDecodeEncode(t *testing.T) { } } +func TestStream(t *testing.T) { + a := assrt.NewAssert(t) + json, err := Decode(bytes.NewReader(json_testfile)) + a.MustNil(err) + + t.Logf("Reading json.testfile") + readertest.CheckReader(t, json, []readertest.Callback{ + readertest.Callback{[]interface{}{}, reader.TokenNode, nil}, + readertest.Callback{[]interface{}{}, reader.TokenKey, "@codec"}, + readertest.Callback{[]interface{}{"@codec"}, reader.TokenValue, "/json"}, + readertest.Callback{[]interface{}{}, reader.TokenKey, "abc"}, + readertest.Callback{[]interface{}{"abc"}, reader.TokenNode, nil}, + readertest.Callback{[]interface{}{"abc"}, reader.TokenKey, "mlink"}, + readertest.Callback{[]interface{}{"abc", "mlink"}, reader.TokenValue, "QmXg9Pp2ytZ14xgmQjYEiHjVjMFXzCVVEcRTWJBmLgR39V"}, + readertest.Callback{[]interface{}{"abc"}, reader.TokenEndNode, nil}, + readertest.Callback{[]interface{}{}, reader.TokenEndNode, nil}, + }) + + cbor, err := Decode(bytes.NewReader(cbor_testfile)) + a.MustNil(err) + + t.Logf("Reading cbor.testfile") + readertest.CheckReader(t, cbor, []readertest.Callback{ + readertest.Callback{[]interface{}{}, reader.TokenNode, nil}, + readertest.Callback{[]interface{}{}, reader.TokenKey, "abc"}, + readertest.Callback{[]interface{}{"abc"}, reader.TokenNode, nil}, + readertest.Callback{[]interface{}{"abc"}, reader.TokenKey, "mlink"}, + readertest.Callback{[]interface{}{"abc", "mlink"}, reader.TokenValue, "QmXg9Pp2ytZ14xgmQjYEiHjVjMFXzCVVEcRTWJBmLgR39V"}, + readertest.Callback{[]interface{}{"abc"}, reader.TokenEndNode, nil}, + readertest.Callback{[]interface{}{}, reader.TokenKey, "@codec"}, + readertest.Callback{[]interface{}{"@codec"}, reader.TokenValue, "/json"}, + readertest.Callback{[]interface{}{}, reader.TokenEndNode, nil}, + }) +} diff --git a/coding/json_stream.go b/coding/json_stream.go new file mode 100644 index 0000000..d7401d3 --- /dev/null +++ b/coding/json_stream.go @@ -0,0 +1,164 @@ +package ipfsld + +import ( + "encoding/json" + "fmt" + "io" + + reader "github.com/ipfs/go-ipld/reader" +) + +type JSONDecoder struct { + r io.Reader +} + +type jsonParser struct { + reader.BaseReader + decoder *json.Decoder +} + +func (d *JSONDecoder) Read(cb reader.ReadFun) error { + jsonParser := &jsonParser{ + reader.CreateBaseReader(cb), + json.NewDecoder(d.r), + } + err := jsonParser.readValue() + if err == reader.NodeReadAbort { + err = nil + } + return err +} + +func (p *jsonParser) readValue() error { + token, err := p.decoder.Token() + if err != nil { + return err + } + //log.Printf("JSON: read token value %#v %T", token, token) + if delim, ok := token.(json.Delim); ok { + switch delim { + case '{': + err = p.ExecCallback(reader.TokenNode, nil) + if err != nil { + p.Descope() + return err + } + err = p.readNode() + if err != nil { + p.Descope() + return err + } + err = p.ExecCallback(reader.TokenEndNode, nil) + p.Descope() + p.Descope() + return err + break + case '[': + err = p.ExecCallback(reader.TokenArray, nil) + if err != nil { + p.Descope() + return err + } + err = p.readArray() + if err != nil { + p.Descope() + return err + } + err = p.ExecCallback(reader.TokenEndArray, nil) + p.Descope() + p.Descope() + return err + break + default: + return fmt.Errorf("JSON: unexpected delimiter token %#v", token.(json.Delim)) + } + } else { + switch token.(type) { + + case json.Number: + intValue, err1 := token.(json.Number).Int64() + if err1 != nil { + token = intValue + } else { + token, err = token.(json.Number).Float64() + if err != nil { + return fmt.Errorf("JSON: failed to convert %v to float64: %v", token.(json.Number), err) + } + } + case float64: + if sintValue := int(token.(float64)); token.(float64) == float64(sintValue) { + token = sintValue + } else if intValue := int64(token.(float64)); token.(float64) == float64(intValue) { + token = intValue + } else if uintValue := uint64(token.(float64)); token.(float64) == float64(uintValue) { + token = uintValue + } + case string: + case bool: + case nil: + default: + return fmt.Errorf("JSON: Unexpected token %#v", token) + } + err := p.ExecCallback(reader.TokenValue, token) + p.Descope() + if err != nil { + return err + } + } + return nil +} + +func (p *jsonParser) readNode() error { + for { + token, err := p.decoder.Token() + if err != nil { + return err + } + //log.Printf("JSON: read token node %#v %T", token, token) + + if delim, ok := token.(json.Delim); ok && delim == '}' { + return nil + } + + strValue, isStr := token.(string) + if !isStr { + return fmt.Errorf("JSON: expect string for object key: got %#v", token) + } + err = p.ExecCallback(reader.TokenKey, strValue) + p.Descope() + if err != nil { + return err + } + + p.PushPath(strValue) + err = p.readValue() + p.PopPath() + if err != nil { + return err + } + } +} + +func (p *jsonParser) readArray() error { + var index int = 0 + for { + token, err := p.decoder.Token() + if err != nil { + return err + } + //log.Printf("JSON: read token array %#v %T", token, token) + + if delim, ok := token.(json.Delim); ok && delim == ']' { + return nil + } + + p.PushPath(index) + err = p.readValue() + p.PopPath() + if err != nil { + return err + } + + index++ + } +} diff --git a/coding/pb/Makefile b/coding/pb/Makefile index 9c635b2..a3a654e 100644 --- a/coding/pb/Makefile +++ b/coding/pb/Makefile @@ -10,18 +10,16 @@ clean: rm -f *.pb.go rm -f *.go -testfile: bin/multicodec bin/msgio - bin/multicodec header /mdagv1 >testfile - bin/multicodec header /protobuf/msgio >>testfile +testfile: ../bin/multicodec bin/msgio + ../bin/multicodec header /mdagv1 >testfile + ../bin/multicodec header /protobuf/msgio >>testfile hash=`ipfs add -q -r . | tail -n1` && \ ipfs object get "$$hash" --enc=protobuf | bin/msgio wrap >>testfile -bin/multicodec: - mkdir -p bin - go get -d github.com/jbenet/go-multicodec/multicodec - go build -o "$@" github.com/jbenet/go-multicodec/multicodec - bin/msgio: mkdir -p bin go get -d github.com/jbenet/go-msgio/msgio go build -o "$@" github.com/jbenet/go-msgio/msgio + +../bin/multicodec: + $(MAKE) -C .. bin/multicodec diff --git a/coding/test1.json b/coding/test1.json new file mode 100644 index 0000000..615aa1e --- /dev/null +++ b/coding/test1.json @@ -0,0 +1 @@ +{"@codec":"/json","abc":{"mlink":"QmXg9Pp2ytZ14xgmQjYEiHjVjMFXzCVVEcRTWJBmLgR39V"}} diff --git a/reader/base_reader.go b/reader/base_reader.go new file mode 100644 index 0000000..8124244 --- /dev/null +++ b/reader/base_reader.go @@ -0,0 +1,48 @@ +package reader + +type BaseReader struct { + Callback ReadFun + cbEnabled []bool + path []interface{} + tokens []ReaderToken +} + +func CreateBaseReader(cb ReadFun) BaseReader { + return BaseReader{cb, []bool{}, []interface{}{}, []ReaderToken{}} +} + +// Executes the callback and stay in scope for sub elements +func (p *BaseReader) ExecCallback(token ReaderToken, value interface{}) error { + var err error + enabled := !p.Skipping() + if enabled { + err = p.Callback(p.path, token, value) + enabled = err != NodeReadSkip + } + p.cbEnabled = append(p.cbEnabled, enabled) + return err +} + +// Return true if a parent callback wants to skip processing of its children +func (p *BaseReader) Skipping() bool { + enabled := true + if len(p.cbEnabled) > 0 { + enabled = p.cbEnabled[len(p.cbEnabled)-1] + } + return !enabled +} + +// Must be called after all sub elements below a ExecCallback are processed +func (p *BaseReader) Descope() { + p.cbEnabled = p.cbEnabled[:len(p.cbEnabled)-1] +} + +// Push a path element +func (p *BaseReader) PushPath(elem interface{}) { + p.path = append(p.path, elem) +} + +// Pop a path element +func (p *BaseReader) PopPath() { + p.path = p.path[:len(p.path)-1] +}