Skip to content

Commit

Permalink
fix: stdout/stderr truncating for ibex (#1058)
Browse files Browse the repository at this point in the history
  • Loading branch information
kongfei605 authored Sep 21, 2024
1 parent e48f495 commit fe22938
Showing 1 changed file with 45 additions and 16 deletions.
61 changes: 45 additions & 16 deletions ibex/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type Task struct {
Args string
Account string
StdinStr string

outCh chan struct{}
errCh chan struct{}
}

func (t *Task) SetStatus(status string) {
Expand Down Expand Up @@ -88,7 +91,6 @@ func (t *Task) GetStdout() string {
default:
out = buf.String()
}

t.Unlock()
return out
}
Expand All @@ -112,7 +114,6 @@ func (t *Task) GetStderr() string {
default:
out = buf.String()
}

t.Unlock()
return out
}
Expand Down Expand Up @@ -160,6 +161,7 @@ func (t *Task) prepare() error {
// already prepared
return nil
}
t.pipeCreate()

IdDir := filepath.Join(config.Config.Ibex.MetaDir, fmt.Sprint(t.Id))
err := file.EnsureDir(IdDir)
Expand Down Expand Up @@ -291,7 +293,6 @@ func (t *Task) start() {
if t.GetAlive() {
return
}

err := t.prepare()
if err != nil {
return
Expand Down Expand Up @@ -370,37 +371,70 @@ func (t *Task) kill() {
go killProcess(t)
}

func (t *Task) pipeDrain() {
<-t.outCh
<-t.errCh
}

func (t *Task) pipeCreate() {
t.outCh = make(chan struct{})
t.errCh = make(chan struct{})
}

func (t *Task) stdoutFlush() {
metaDir := config.Config.Ibex.MetaDir
stdoutFile := filepath.Join(metaDir, fmt.Sprint(t.Id), "stdout")
file.WriteString(stdoutFile, t.GetStdout())
close(t.outCh)
}

func (t *Task) stderrFlush() {
metaDir := config.Config.Ibex.MetaDir
stderrFile := filepath.Join(metaDir, fmt.Sprint(t.Id), "stderr")
file.WriteString(stderrFile, t.GetStderr())
close(t.errCh)
}

func runProcessRealtime(stdout io.ReadCloser, stderr io.ReadCloser, t *Task) {
t.SetAlive(true)
defer t.SetAlive(false)

reader := bufio.NewReader(stdout)

go func() {
defer t.stdoutFlush()
for {
line, err2 := reader.ReadString('\n')
if err2 != nil || io.EOF == err2 {
if len(line) != 0 {
t.Stdout.WriteString(line)
}
if err2 != nil {
if err2 != io.EOF {
log.Println("W! read stdout fail:", err2)
}
break
}
t.Stdout.WriteString(line)

persistResult(t)
}
}()

errReader := bufio.NewReader(stderr)

go func() {
defer t.stderrFlush()
for {
line, err2 := errReader.ReadString('\n')
if err2 != nil || io.EOF == err2 {
if len(line) != 0 {
t.Stderr.WriteString(line)
}
if err2 != nil {
if err2 != io.EOF {
log.Println("W! read stdout fail:", err2)
}
break
}
t.Stderr.WriteString(line)
persistResult(t)
}
}()

t.pipeDrain()
err := t.Cmd.Wait()
if err != nil {
if strings.Contains(err.Error(), "signal: killed") {
Expand All @@ -424,12 +458,7 @@ func runProcessRealtime(stdout io.ReadCloser, stderr io.ReadCloser, t *Task) {

func persistResult(t *Task) {
metadir := config.Config.Ibex.MetaDir
stdout := filepath.Join(metadir, fmt.Sprint(t.Id), "stdout")
stderr := filepath.Join(metadir, fmt.Sprint(t.Id), "stderr")
doneFlag := filepath.Join(metadir, fmt.Sprint(t.Id), fmt.Sprintf("%d.done", t.Clock))

file.WriteString(stdout, t.GetStdout())
file.WriteString(stderr, t.GetStderr())
file.WriteString(doneFlag, t.GetStatus())
}

Expand Down

0 comments on commit fe22938

Please sign in to comment.