Skip to content
This repository has been archived by the owner on Aug 9, 2018. It is now read-only.

Commit

Permalink
Ensure only one NodeReader is active at a single time
Browse files Browse the repository at this point in the history
  • Loading branch information
mildred committed Feb 17, 2016
1 parent ec90690 commit e4f6be6
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 19 deletions.
11 changes: 8 additions & 3 deletions coding/cbor/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io"
"math/big"
"sync"

links "github.com/ipfs/go-ipld/links"
reader "github.com/ipfs/go-ipld/stream"
Expand All @@ -25,8 +26,9 @@ func init() {
}

type CBORDecoder struct {
r io.ReadSeeker
pos int64
r io.ReadSeeker
pos int64
lock sync.Mutex
}

type cborParser struct {
Expand All @@ -40,10 +42,13 @@ func NewCBORDecoder(r io.ReadSeeker) (*CBORDecoder, error) {
if err != nil {
return nil, err
}
return &CBORDecoder{r, offset}, nil
return &CBORDecoder{r, offset, sync.Mutex{}}, nil
}

func (d *CBORDecoder) Read(cb reader.ReadFun) error {
d.lock.Lock()
defer d.lock.Unlock()

if d.pos == -2 {
return ErrAlreadyRead
} else if d.pos == -1 {
Expand Down
26 changes: 13 additions & 13 deletions coding/json/stream-go1.4.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/ipfs/go-ipld/Godeps/_workspace/src/github.com/golang/go/src/encoding/json"
"fmt"
"io"
"sync"

reader "github.com/ipfs/go-ipld/stream"
mc "github.com/jbenet/go-multicodec"
Expand All @@ -24,35 +25,34 @@ func init() {
}

type JSONDecoder struct {
r io.Reader
pos int64
r io.ReadSeeker
pos int64
lock sync.Mutex
}

type jsonParser struct {
reader.BaseReader
decoder *json.Decoder
}

func NewJSONDecoder(r io.Reader) (*JSONDecoder, error) {
s := r.(io.Seeker)
if s == nil {
return &JSONDecoder{r, -1}, nil
} else {
offset, err := s.Seek(0, 1)
if err != nil {
return nil, err
}
return &JSONDecoder{r, offset}, nil
func NewJSONDecoder(r io.ReadSeeker) (*JSONDecoder, error) {
offset, err := r.Seek(0, 1)
if err != nil {
return nil, err
}
return &JSONDecoder{r, offset, sync.Mutex{}}, nil
}

func (d *JSONDecoder) Read(cb reader.ReadFun) error {
d.lock.Lock()
defer d.lock.Unlock()

if d.pos == -2 {
return ErrAlreadyRead
} else if d.pos == -1 {
d.pos = -2
} else {
newoffset, err := d.r.(io.Seeker).Seek(d.pos, 0)
newoffset, err := d.r.Seek(d.pos, 0)
if err != nil {
return err
} else if newoffset != d.pos {
Expand Down
11 changes: 8 additions & 3 deletions coding/json/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"io"
"sync"

reader "github.com/ipfs/go-ipld/stream"
mc "github.com/jbenet/go-multicodec"
Expand All @@ -24,8 +25,9 @@ func init() {
}

type JSONDecoder struct {
r io.ReadSeeker
pos int64
r io.ReadSeeker
pos int64
lock sync.Mutex
}

type jsonParser struct {
Expand All @@ -38,10 +40,13 @@ func NewJSONDecoder(r io.ReadSeeker) (*JSONDecoder, error) {
if err != nil {
return nil, err
}
return &JSONDecoder{r, offset}, nil
return &JSONDecoder{r, offset, sync.Mutex{}}, nil
}

func (d *JSONDecoder) Read(cb reader.ReadFun) error {
d.lock.Lock()
defer d.lock.Unlock()

if d.pos == -2 {
return ErrAlreadyRead
} else if d.pos == -1 {
Expand Down

0 comments on commit e4f6be6

Please sign in to comment.