Skip to content

Commit

Permalink
feat: Add Real-time task report function (#1001)
Browse files Browse the repository at this point in the history
* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* d

* aa

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* ad

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update cmd_nix.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update tasks.go

* s

* Update tasks.go

* a

* Update heartbeat.go

* aa

* Update tasks.go

* Update tasks.go

* Update tasks.go

* Update heartbeat.go

* Update tasks.go

* Update tasks.go

* a

* realtime report

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* error realtime output

* a

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go

* Update task.go
  • Loading branch information
NeganZhao authored Jul 11, 2024
1 parent d64b6c0 commit b643390
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 6 deletions.
1 change: 1 addition & 0 deletions ibex/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func heartbeat() {
}

var resp types.ReportResponse

err := client.GetCli().Call("Server.Report", req, &resp)

if err != nil {
Expand Down
48 changes: 43 additions & 5 deletions ibex/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
package ibex

import (
"bufio"
"bytes"
"fmt"
"io"
"log"
"os/exec"
"os/user"
Expand Down Expand Up @@ -281,28 +283,65 @@ func (t *Task) start() {
}
}

cmd.Stdout = &t.Stdout
cmd.Stderr = &t.Stderr
cmd.Stdin = t.Stdin
t.Cmd = cmd

stdout, err := t.Cmd.StdoutPipe()
if err != nil {
log.Printf("E! cannot read ouput of task[%d]: %v", t.Id, err)
}

stderr, err := t.Cmd.StderrPipe()

if err != nil {
log.Printf("E! cannot read err of task[%d]: %v", t.Id, err)
}

err = CmdStart(cmd)

if err != nil {
log.Printf("E! cannot start cmd of task[%d]: %v", t.Id, err)
return
}

go runProcess(t)
go runProcessRealtime(stdout, stderr, t)
}

func (t *Task) kill() {
go killProcess(t)
}

func runProcess(t *Task) {
func runProcessRealtime(stdout io.ReadCloser, stderr io.ReadCloser, t *Task) {
t.SetAlive(true)
defer t.SetAlive(false)

reader := bufio.NewReader(stdout)

go func() {
for {
line, err2 := reader.ReadString('\n')
if err2 != nil || io.EOF == err2 {
break
}
t.Stdout.WriteString(line)

persistResult(t)
}
}()

errReader := bufio.NewReader(stderr)

go func() {
for {
line, err2 := errReader.ReadString('\n')
if err2 != nil || io.EOF == err2 {
break
}
t.Stderr.WriteString(line)
persistResult(t)
}
}()

err := t.Cmd.Wait()
if err != nil {
if strings.Contains(err.Error(), "signal: killed") {
Expand All @@ -326,7 +365,6 @@ func runProcess(t *Task) {

func persistResult(t *Task) {
metadir := config.Config.Ibex.MetaDir

stdout := filepath.Join(metadir, fmt.Sprint(t.Id), "stdout")
stderr := filepath.Join(metadir, fmt.Sprint(t.Id), "stderr")
doneFlag := filepath.Join(metadir, fmt.Sprint(t.Id), fmt.Sprintf("%d.done", t.Clock))
Expand Down
2 changes: 1 addition & 1 deletion ibex/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (lt *LocalTasksT) ReportTasks() []types.ReportTask {
rt := types.ReportTask{Id: id, Clock: t.Clock}

rt.Status = t.GetStatus()
if rt.Status == "running" || rt.Status == "killing" {
if rt.Status == "killing" {
// intermediate state
continue
}
Expand Down

0 comments on commit b643390

Please sign in to comment.