Skip to content
This repository has been archived by the owner on Aug 9, 2018. It is now read-only.

Commit

Permalink
Correct race in iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
mildred committed Feb 10, 2016
1 parent 1dfacbd commit a741c35
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions stream/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package stream
import (
"fmt"
"math/big"
"sync/atomic"
)

type NodeIterator struct {
*ReadItem
LastError error
needFeedback bool
needFeedback atomic.Value
items chan *ReadItem
feedback chan error
feedbackClosed bool
Expand Down Expand Up @@ -156,16 +157,17 @@ func (s *ReadItem) ToFloat() (res float64, ok bool) {
func Iterate(r NodeReader, res_error *error) *NodeIterator {
it := make(chan *ReadItem)
feedback := make(chan error)
res := &NodeIterator{&ReadItem{}, nil, false, it, feedback, false}
res := &NodeIterator{&ReadItem{}, nil, atomic.Value{}, it, feedback, false}
res.needFeedback.Store(false)

go func() {
err := r.Read(func(path []interface{}, tokenType ReaderToken, value interface{}) error {
res.needFeedback = true
res.needFeedback.Store(true)
item := &ReadItem{path, tokenType, value}
it <- item
return <-feedback
})
res.needFeedback = false
res.needFeedback.Store(false)
close(it)
if err != nil && res_error != nil {
res.LastError = err
Expand All @@ -176,7 +178,7 @@ func Iterate(r NodeReader, res_error *error) *NodeIterator {
}

func (s *NodeIterator) Iter() bool {
if s.needFeedback {
if s.needFeedback.Load().(bool) {
s.feedback <- nil
}

Expand All @@ -202,16 +204,16 @@ func (s *NodeIterator) Skip() {
}

func (s *NodeIterator) StopError(e error) {
if !s.needFeedback {
if !s.needFeedback.Load().(bool) {
panic("Already stopped")
}

s.feedback <- e
s.needFeedback = false
s.needFeedback.Store(false)
}

func (s *NodeIterator) Close() error {
if s.needFeedback {
if s.needFeedback.Load().(bool) {
s.Abort()
}
for s.Iter() {
Expand Down

0 comments on commit a741c35

Please sign in to comment.