Skip to content
This repository has been archived by the owner on Aug 9, 2018. It is now read-only.

Commit

Permalink
Improve JSON and CBOR readers for multiple reads
Browse files Browse the repository at this point in the history
The reader always read at the beginning of the stream
  • Loading branch information
mildred committed Jan 22, 2016
1 parent 671af91 commit 8ca058f
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 26 deletions.
51 changes: 28 additions & 23 deletions coding/cbor_stream.go
Original file line number Diff line number Diff line change
@@ -1,161 +1,166 @@
package ipfsld

import (
"fmt"
"io"
"log"
"math/big"

reader "github.com/ipfs/go-ipld/reader"
cbor "github.com/whyrusleeping/cbor/go"
)

type CBORDecoder struct {
r io.Reader
r io.Reader
pos int64
}

type cborParser struct {
reader.BaseReader
decoder *cbor.Decoder
}

func NewCBORDecoder(r io.Reader) (*CBORDecoder, error) {
s := r.(io.Seeker)
if s == nil {
return &CBORDecoder{r, -1}, nil
} else {
offset, err := s.Seek(0, 1)
if err != nil {
return nil, err
}
return &CBORDecoder{r, offset}, nil
}
}

func (d *CBORDecoder) Read(cb reader.ReadFun) error {
if d.pos == -2 {
return ErrAlreadyRead
} else if d.pos == -1 {
d.pos = -2
} else {
newoffset, err := d.r.(io.Seeker).Seek(d.pos, 0)
if err != nil {
return err
} else if newoffset != d.pos {
return fmt.Errorf("Failed to seek to position %d", d.pos)
}
}
dec := cbor.NewDecoder(d.r)
return dec.DecodeAny(&cborParser{reader.CreateBaseReader(cb), dec})
}

func (p *cborParser) Prepare() error {
log.Printf("Prepare")
return nil
}

func (p *cborParser) SetBytes(buf []byte) error {
log.Printf("SetBytes")
err := p.ExecCallback(reader.TokenValue, buf)
p.Descope()
return err
}

func (p *cborParser) SetUint(i uint64) error {
log.Printf("SetUint")
err := p.ExecCallback(reader.TokenValue, i)
p.Descope()
return err
}

func (p *cborParser) SetInt(i int64) error {
log.Printf("setint")
err := p.ExecCallback(reader.TokenValue, i)
p.Descope()
return err
}

func (p *cborParser) SetFloat32(f float32) error {
log.Printf("setfloat32")
err := p.ExecCallback(reader.TokenValue, f)
p.Descope()
return err
}

func (p *cborParser) SetFloat64(f float64) error {
log.Printf("setfloat64")
err := p.ExecCallback(reader.TokenValue, f)
p.Descope()
return err
}

func (p *cborParser) SetBignum(i *big.Int) error {
log.Printf("setbignum")
err := p.ExecCallback(reader.TokenValue, i)
p.Descope()
return err
}

func (p *cborParser) SetNil() error {
log.Printf("nil")
err := p.ExecCallback(reader.TokenValue, nil)
p.Descope()
return err
}

func (p *cborParser) SetBool(b bool) error {
log.Printf("setbool")
err := p.ExecCallback(reader.TokenValue, b)
p.Descope()
return err
}

func (p *cborParser) SetString(s string) error {
log.Printf("setstring")
err := p.ExecCallback(reader.TokenValue, s)
p.Descope()
return err
}

func (p *cborParser) CreateMap() (cbor.DecodeValueMap, error) {
log.Printf("createmap")
return p, p.ExecCallback(reader.TokenNode, nil)
}

func (p *cborParser) CreateMapKey() (cbor.DecodeValue, error) {
log.Printf("createmapkey")
return cbor.NewMemoryValue(""), nil
}

func (p *cborParser) CreateMapValue(key cbor.DecodeValue) (cbor.DecodeValue, error) {
log.Printf("createmapvalue")
err := p.ExecCallback(reader.TokenKey, key.(*cbor.MemoryValue).Value)
p.Descope()
p.PushPath(key.(*cbor.MemoryValue).Value)
return p, err
}

func (p *cborParser) SetMap(key, val cbor.DecodeValue) error {
log.Printf("setmap")
p.PopPath()
return nil
}

func (p *cborParser) EndMap() error {
log.Printf("endmap")
err := p.ExecCallback(reader.TokenEndNode, nil)
p.Descope()
p.Descope()
return err
}

func (p *cborParser) CreateArray(len int) (cbor.DecodeValueArray, error) {
log.Printf("createarray")
return p, p.ExecCallback(reader.TokenArray, nil)
}

func (p *cborParser) GetArrayValue(index uint64) (cbor.DecodeValue, error) {
log.Printf("getarrvalue")
err := p.ExecCallback(reader.TokenIndex, index)
p.Descope()
p.PushPath(index)
return p, err
}

func (p *cborParser) AppendArray(val cbor.DecodeValue) error {
log.Printf("appendarray")
p.PopPath()
return nil
}

func (p *cborParser) EndArray() error {
log.Printf("endarray")
err := p.ExecCallback(reader.TokenEndArray, nil)
p.Descope()
p.Descope()
return err
}

func (p *cborParser) CreateTag(tag uint64, decoder cbor.TagDecoder) (cbor.DecodeValue, interface{}, error) {
log.Printf("createtag")
return p, nil, nil
}

func (p *cborParser) SetTag(tag uint64, decval cbor.DecodeValue, decoder cbor.TagDecoder, val interface{}) error {
log.Printf("settag")
return nil
}
9 changes: 7 additions & 2 deletions coding/coding.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,27 @@ var defaultCodec string

var muxCodec *mcmux.Multicodec

var ErrAlreadyRead error = fmt.Errorf("Stream already read: unable to read it a second time")

func init() {
HeaderPath = "/mdagv1"
Header = mc.Header([]byte(HeaderPath))

// by default, always encode things as cbor
defaultCodec = string(mc.HeaderPath(mccbor.Header))

muxCodec = mcmux.MuxMulticodec([]mc.Multicodec{
CborMulticodec(),
JsonMulticodec(),
pb.Multicodec(),
}, selectCodec)

StreamCodecs = map[string]func(io.Reader) (reader.NodeReader, error){
mcjson.HeaderPath: func(r io.Reader) (reader.NodeReader, error) {
return &JSONDecoder{r}, nil
return NewJSONDecoder(r)
},
mccbor.HeaderPath: func(r io.Reader) (reader.NodeReader, error) {
return &CBORDecoder{r}, nil
return NewCBORDecoder(r)
},
}
}
Expand Down
28 changes: 27 additions & 1 deletion coding/json_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,41 @@ import (
)

type JSONDecoder struct {
r io.Reader
r io.Reader
pos int64
}

type jsonParser struct {
reader.BaseReader
decoder *json.Decoder
}

func NewJSONDecoder(r io.Reader) (*JSONDecoder, error) {
s := r.(io.Seeker)
if s == nil {
return &JSONDecoder{r, -1}, nil
} else {
offset, err := s.Seek(0, 1)
if err != nil {
return nil, err
}
return &JSONDecoder{r, offset}, nil
}
}

func (d *JSONDecoder) Read(cb reader.ReadFun) error {
if d.pos == -2 {
return ErrAlreadyRead
} else if d.pos == -1 {
d.pos = -2
} else {
newoffset, err := d.r.(io.Seeker).Seek(d.pos, 0)
if err != nil {
return err
} else if newoffset != d.pos {
return fmt.Errorf("Failed to seek to position %d", d.pos)
}
}
jsonParser := &jsonParser{
reader.CreateBaseReader(cb),
json.NewDecoder(d.r),
Expand Down

0 comments on commit 8ca058f

Please sign in to comment.