Skip to content

Commit

Permalink
internal/runner: clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
adambabik committed Feb 13, 2023
1 parent 130e09c commit 54734ef
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 26 deletions.
25 changes: 16 additions & 9 deletions internal/api/runme/runner/v1/runner.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,21 @@ option go_package = "github.com/stateful/runme/internal/gen/proto/go/runme/runne
message Session {
string id = 1;

// envs keeps track of session environment variables.
// They can be modified by executing programs which
// alter them through "export" and "unset" commands.
repeated string envs = 2;

// metadata is a map of client specific metadata.
map<string, string> metadata = 3;
}

message CreateSessionRequest {
// metadata is a map of client specific metadata.
map<string, string> metadata = 1;

// envs field provides an initial set of environment variables
// for a newly created session.
repeated string envs = 2;
}

Expand Down Expand Up @@ -76,18 +83,10 @@ message ExecuteRequest {
// tty when true allocates a pseudo-TTY.
bool tty = 7;

// background, if true, will not accept any input.
bool background = 8;

// chunk_interval specifies how often partial output
// should be streamed to the client.
// For example: "0.5s".
google.protobuf.Duration chunk_interval = 9;

// input_data is a byte array that will be send as input
// to the program.
// It is allowed in the consecutive calls only.
bytes input_data = 10;
bytes input_data = 8;

// session_id indicates in which Session the program should execute.
// Executing in a Session might provide additional context like
Expand All @@ -111,5 +110,13 @@ service RunnerService {
rpc GetSession(GetSessionRequest) returns (GetSessionResponse) {};
rpc ListSessions(ListSessionsRequest) returns (ListSessionsResponse) {};
rpc DeleteSession(DeleteSessionRequest) returns (DeleteSessionResponse) {};

// Execute executes a program. Examine "ExecuteRequest" to explore
// configuration options.
//
// It's a bidirectional stream RPC method. It expects the first
// "ExecuteRequest" to contain details of a program to execute.
// Subsequent "ExecuteRequest" should only contain "input_data" as
// other fields will be ignored.
rpc Execute(stream ExecuteRequest) returns (stream ExecuteResponse) {};
}
51 changes: 36 additions & 15 deletions internal/runner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,19 +219,19 @@ func executeCmd(
ctx, cancel := context.WithCancel(ctx)
defer cancel()
g := new(errgroup.Group)
outs := make(chan output)
datac := make(chan output)

g.Go(func() error {
err := readLoop(ctx, time.Second, cmd.Stdout, cmd.Stderr, outs)
close(outs)
err := readLoop(ctx, time.Second, cmd.Stdout, cmd.Stderr, datac)
close(datac)
if errors.Is(err, io.EOF) {
err = nil
}
return err
})

g.Go(func() error {
for data := range outs {
for data := range datac {
if err := processData(data); err != nil {
return err
}
Expand Down Expand Up @@ -271,40 +271,61 @@ func (o output) Clone() (result output) {
return
}

var bufPool = sync.Pool{
New: func() any {
return make([]byte, 4*1024) // 4KB
},
}

// readLoop uses two sets of buffers in order to avoid allocating
// new memory over and over and putting more presure on GC.
// When the first set is read, it is sent to a channel called `results`.
// `results` should be an unbuffered channel. When a consumer consumes
// from the channel, the loop is unblocked and it moves on to read
// into the second set of buffers and blocks. During this time,
// the consumer has a chance to do something with the data stored
// in the first set of buffers.
func readLoop(
ctx context.Context,
timeout time.Duration,
stdout io.Reader,
stderr io.Reader,
results chan<- output,
) error {
out1, err1 := make([]byte, 1024), make([]byte, 1024)
out2, err2 := make([]byte, 1024), make([]byte, 1024)
idx := 0
if cap(results) > 0 {
panic("readLoop requires unbuffered channel")
}

const size = 4 * 1024

out1, err1 := make([]byte, size), make([]byte, size)
out2, err2 := make([]byte, size), make([]byte, size)

pairIdx := 0

read := func() error {
outb, errb := out1, err1
idx++
if idx%2 == 0 {
outb, errb = out2, err2
outBuf, errBuf := out1, err1
pairIdx = (pairIdx + 1) % 2
if pairIdx == 0 {
outBuf, errBuf = out2, err2
}

var result output

n, err := stdout.Read(outb)
n, err := stdout.Read(outBuf)
if err != nil && !errors.Is(err, io.EOF) {
return err
}
if n > 0 {
result.Stdout = outb[:n]
result.Stdout = outBuf[:n]
}

n, err = stderr.Read(errb)
n, err = stderr.Read(errBuf)
if err != nil && !errors.Is(err, io.EOF) {
return err
}
if n > 0 {
result.Stderr = errb[:n]
result.Stderr = errBuf[:n]
}

if len(result.Stdout) > 0 || len(result.Stderr) > 0 {
Expand Down
4 changes: 2 additions & 2 deletions internal/runner/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func Test_runnerService_Execute(t *testing.T) {
},
})
require.NoError(t, err)
// assert.NoError(t, stream.CloseSend())
assert.NoError(t, stream.CloseSend())

result := <-execResult

Expand All @@ -229,7 +229,7 @@ func Test_runnerService_Execute(t *testing.T) {
},
})
require.NoError(t, err)
// assert.NoError(t, stream.CloseSend())
assert.NoError(t, stream.CloseSend())

result := <-execResult

Expand Down

0 comments on commit 54734ef

Please sign in to comment.