Skip to content

Commit

Permalink
executor: fix cpu usage exhaust (pingcap#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Dec 29, 2021
1 parent ad8a0d6 commit f9ea138
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 6 deletions.
25 changes: 21 additions & 4 deletions executor/runtime/benchmark/operator.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package benchmark

import (
"bytes"
"context"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"strconv"
"time"

"github.com/hanfei1991/microcosm/executor/runtime"
Expand Down Expand Up @@ -37,16 +39,31 @@ func (f *fileWriter) Prepare() error {
}

func sprintPayload(r *pb.Record) string {
str := fmt.Sprintf("tid %d, pk %d, time tracer ", r.Tid, r.Pk)
var buf bytes.Buffer
buf.WriteString("tid ")
buf.WriteString(strconv.FormatInt(int64(r.Tid), 10))
buf.WriteString(", pk")
buf.WriteString(strconv.FormatInt(int64(r.Pk), 10))
buf.WriteString(", time tracer ")
for _, ts := range r.TimeTracer {
str += fmt.Sprintf("%s ", time.Unix(0, ts))
buf.WriteString(strconv.FormatInt(ts, 10) + " ")
}
return str
return buf.String()
}

func sprintRecord(r *runtime.Record) string {
start := time.Unix(0, r.Payload.(*pb.Record).TimeTracer[0])
return fmt.Sprintf("flowID %s start %s end %s payload: %s\n", r.FlowID, start.String(), r.End.String(), sprintPayload(r.Payload.(*pb.Record)))
var buf bytes.Buffer
buf.WriteString("flowID ")
buf.WriteString(r.FlowID)
buf.WriteString(" start ")
buf.WriteString(start.String())
buf.WriteString(" end ")
buf.WriteString(r.End.String())
buf.WriteString(" payload: ")
buf.WriteString(sprintPayload(r.Payload.(*pb.Record)))
buf.WriteString("\n")
return buf.String()
}

func (f *fileWriter) writeStats(s *recordStats) error {
Expand Down
5 changes: 4 additions & 1 deletion executor/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package runtime
import (
"context"
"sync"
"time"

"github.com/hanfei1991/microcosm/model"
"github.com/hanfei1991/microcosm/test"
Expand Down Expand Up @@ -72,7 +73,9 @@ func (s *Runtime) runImpl(ctx context.Context) {
}
t := s.q.pop()
if t == nil {
// idle
// idle, sleep for sometime to avoid busy loop
// TODO: find better wake up mechanism way if needed
time.Sleep(time.Millisecond * 50)
continue
}
status := t.Poll()
Expand Down
3 changes: 2 additions & 1 deletion executor/runtime/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ type Channel struct {

func (c *Channel) readBatch(batch int) []*Record {
records := make([]*Record, 0, batch)
readLoop:
for i := 0; i < batch; i++ {
select {
case record := <-c.innerChan:
records = append(records, record)
default:
break
break readLoop
}
}
if len(records) > 0 {
Expand Down

0 comments on commit f9ea138

Please sign in to comment.