Skip to content

Commit

Permalink
Merge pull request #620 from Unknwon/dataflow-readline-timeout
Browse files Browse the repository at this point in the history
mgr: get raw data ReadLine change to use timeout
  • Loading branch information
wonderflow authored Jul 20, 2018
2 parents 66480a8 + eb85c96 commit a3cd663
Showing 1 changed file with 32 additions and 38 deletions.
70 changes: 32 additions & 38 deletions mgr/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/json-iterator/go"
Expand Down Expand Up @@ -51,17 +52,15 @@ func RawData(readerConfig conf.MapConf) (string, error) {
os.RemoveAll(metaPath)
}()

var rawData string

// DataReader 设定超时,普通 Reader 设定重试
if dr, ok := rd.(reader.DataReader); ok {
type dataErr struct {
data string
err error
}
// Note: 添加一位缓冲保证 goroutine 在 runner 已经超时的情况下能够正常退出,避免资源泄露
readChan := make(chan dataErr, 1)
go func() {
var timeoutStatus int32
type dataErr struct {
data string
err error
}
// Note: 添加一位缓冲保证 goroutine 在 runner 已经超时的情况下能够正常退出,避免资源泄露
readChan := make(chan dataErr, 1)
go func() {
if dr, ok := rd.(reader.DataReader); ok {
data, _, err := dr.ReadData()
if err != nil && err != io.EOF {
readChan <- dataErr{"", err}
Expand All @@ -70,40 +69,35 @@ func RawData(readerConfig conf.MapConf) (string, error) {

p, err := jsoniter.MarshalIndent(data, "", " ")
readChan <- dataErr{string(p), err}
}()

timeout := time.NewTimer(time.Minute)
select {
case de := <-readChan:
rawData, err = de.data, de.err
if err != nil {
return "", fmt.Errorf("reader %q - error: %v", rd.Name(), err)
}

case <-timeout.C:
return "", fmt.Errorf("reader %q read data timeout, no data received", rd.Name())
return
}

} else {
tryCount := DefaultTryTimes
for {
if tryCount <= 0 {
return "", fmt.Errorf("reader %q read line timeout, no data received", rd.Name())
}
tryCount--

rawData, err = rd.ReadLine()
// ReadLine 是可能读到空值的,在接收器宣布超时或读取到数据之前需要不停循环读取
for atomic.LoadInt32(&timeoutStatus) == 0 {
str, err := rd.ReadLine()
if err != nil && err != io.EOF {
return "", fmt.Errorf("reader %q - error: %v", rd.Name(), err)
readChan <- dataErr{"", err}
return
}
if err == io.EOF {
return rawData, nil
if err == io.EOF || len(str) > 0 {
readChan <- dataErr{str, nil}
return
}
}
}()

if len(rawData) > 0 {
break
}
var rawData string
timeout := time.NewTimer(time.Minute)
select {
case de := <-readChan:
rawData, err = de.data, de.err
if err != nil {
return "", fmt.Errorf("reader %q - error: %v", rd.Name(), err)
}

case <-timeout.C:
atomic.StoreInt32(&timeoutStatus, 1)
return "", fmt.Errorf("reader %q read timeout, no data received", rd.Name())
}

if len(rawData) >= DefaultMaxBatchSize {
Expand Down

0 comments on commit a3cd663

Please sign in to comment.