Skip to content

Commit

Permalink
Integrate Owl Store into Runner v2 (#686)
Browse files Browse the repository at this point in the history
Refactor integration based on lessons learned in v1.
  • Loading branch information
sourishkrout authored Oct 21, 2024
1 parent efb7bca commit 4ff0375
Show file tree
Hide file tree
Showing 50 changed files with 2,147 additions and 2,559 deletions.
21 changes: 18 additions & 3 deletions internal/cmd/beta/run_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

"github.com/stateful/runme/v3/internal/command"
"github.com/stateful/runme/v3/internal/config/autoconfig"
rcontext "github.com/stateful/runme/v3/internal/runner/context"
"github.com/stateful/runme/v3/internal/session"
runnerv2 "github.com/stateful/runme/v3/pkg/api/gen/proto/go/runme/runner/v2"
"github.com/stateful/runme/v3/pkg/document"
"github.com/stateful/runme/v3/pkg/project"
Expand Down Expand Up @@ -65,7 +67,14 @@ Run all blocks from the "setup" and "teardown" tags:
return errors.WithStack(err)
}

session := command.NewSession()
session, err := session.New(
session.WithOwl(false),
session.WithProject(proj),
session.WithSeedEnv(nil),
)
if err != nil {
return err
}
options := getCommandOptions(cmd, session)

for _, t := range tasks {
Expand All @@ -86,7 +95,7 @@ Run all blocks from the "setup" and "teardown" tags:

func getCommandOptions(
cmd *cobra.Command,
sess *command.Session,
sess *session.Session,
) command.CommandOptions {
return command.CommandOptions{
Session: sess,
Expand Down Expand Up @@ -119,6 +128,12 @@ func runCodeBlock(

cfg.Mode = runnerv2.CommandMode_COMMAND_MODE_CLI

execInfo := &rcontext.ExecutionInfo{
KnownName: block.Name(),
KnownID: block.ID(),
}
ctx = rcontext.ContextWithExecutionInfo(ctx, execInfo)

cmd, err := factory.Build(cfg, options)
if err != nil {
return err
Expand All @@ -127,5 +142,5 @@ func runCodeBlock(
if err != nil {
return err
}
return cmd.Wait()
return cmd.Wait(ctx)
}
27 changes: 9 additions & 18 deletions internal/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/stateful/runme/v3/internal/session"
"github.com/stateful/runme/v3/pkg/project"
)

Expand All @@ -19,7 +20,7 @@ type Command interface {
Running() bool
Start(context.Context) error
Signal(os.Signal) error
Wait() error
Wait(context.Context) error
}

type internalCommandGetters interface {
Expand All @@ -41,7 +42,7 @@ type base struct {
logger *zap.Logger
project *project.Project
runtime Runtime
session *Session
session *session.Session
stdin io.Reader
stdout io.Writer
stderr io.Writer
Expand Down Expand Up @@ -69,22 +70,12 @@ func (c *base) Signal(os.Signal) error {
return errors.New("not implemented")
}

func (c *base) Wait() error {
func (c *base) Wait(context.Context) error {
return errors.New("not implemented")
}

func (c *base) Env() []string {
env := c.runtime.Environ()

if c.project != nil {
projEnv, err := c.project.LoadEnv()
if err != nil {
c.logger.Warn("failed to load project env", zap.Error(err))
}
env = append(env, projEnv...)
}

env = append(env, c.session.GetAllEnv()...)
env := c.session.GetAllEnv()
env = append(env, c.cfg.Env...)

if err := c.limitEnviron(env); err != nil {
Expand Down Expand Up @@ -127,22 +118,22 @@ func (c *base) limitEnviron(environ []string) error {
}
}

if size <= MaxEnvironSizeInBytes {
if size <= session.MaxEnvironSizeInBytes {
return nil
}

c.logger.Warn("environment size exceeds the limit", zap.Int("size", size), zap.Int("limit", MaxEnvironSizeInBytes))
c.logger.Warn("environment size exceeds the limit", zap.Int("size", size), zap.Int("limit", session.MaxEnvironSizeInBytes))

if stdoutEnvIdx == -1 {
return errors.New("env is too large; no stdout env to trim")
}

stdoutCap := MaxEnvironSizeInBytes - size + len(environ[stdoutEnvIdx])
stdoutCap := session.MaxEnvironSizeInBytes - size + len(environ[stdoutEnvIdx])
if stdoutCap < 0 {
return errors.New("env is too large even if trimming stdout env")
}

key, value := splitEnv(environ[stdoutEnvIdx])
key, value := session.SplitEnv(environ[stdoutEnvIdx])
environ[stdoutEnvIdx] = CreateEnv(key, value[len(value)-stdoutCap:])

return nil
Expand Down
2 changes: 1 addition & 1 deletion internal/command/command_docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *dockerCommand) Signal(os.Signal) error {
return c.cmd.Signal()
}

func (c *dockerCommand) Wait() (err error) {
func (c *dockerCommand) Wait(ctx context.Context) (err error) {
c.logger.Info("waiting for the docker command to finish")
err = c.cmd.Wait()
c.logger.Info("the docker command finished", zap.Error(err))
Expand Down
8 changes: 4 additions & 4 deletions internal/command/command_docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestDockerCommand(t *testing.T) {
)
require.NoError(t, err)
require.NoError(t, cmd.Start(context.Background()))
require.NoError(t, cmd.Wait())
require.NoError(t, cmd.Wait(context.Background()))
})

t.Run("Output", func(t *testing.T) {
Expand All @@ -53,7 +53,7 @@ func TestDockerCommand(t *testing.T) {
)
require.NoError(t, err)
require.NoError(t, cmd.Start(context.Background()))
require.NoError(t, cmd.Wait())
require.NoError(t, cmd.Wait(context.Background()))
assert.Equal(t, "test", stdout.String())
})

Expand All @@ -71,7 +71,7 @@ func TestDockerCommand(t *testing.T) {
require.NoError(t, cmd.Start(context.Background()))
require.True(t, cmd.Running())
require.Greater(t, cmd.Pid(), 0)
require.NoError(t, cmd.Wait())
require.NoError(t, cmd.Wait(context.Background()))
})

t.Run("NonZeroExit", func(t *testing.T) {
Expand All @@ -87,6 +87,6 @@ func TestDockerCommand(t *testing.T) {
)
require.NoError(t, err)
require.NoError(t, cmd.Start(context.Background()))
require.Error(t, cmd.Wait(), "exit code 11 due to error \"\"")
require.Error(t, cmd.Wait(context.Background()), "exit code 11 due to error \"\"")
})
}
4 changes: 2 additions & 2 deletions internal/command/command_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ func (c *fileCommand) Start(ctx context.Context) error {
return c.internalCommand.Start(ctx)
}

func (c *fileCommand) Wait() (err error) {
func (c *fileCommand) Wait(ctx context.Context) (err error) {
defer func() {
rErr := c.removeTempDir()
if err == nil {
err = rErr
}
}()
err = c.internalCommand.Wait()
err = c.internalCommand.Wait(ctx)
return
}

Expand Down
17 changes: 8 additions & 9 deletions internal/command/command_inline_shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"

"github.com/pkg/errors"
"github.com/stateful/runme/v3/internal/session"
"go.uber.org/zap"
)

Expand All @@ -16,7 +17,7 @@ type inlineShellCommand struct {
debug bool
envCollector envCollector
logger *zap.Logger
session *Session
session *session.Session
}

func (c *inlineShellCommand) getPty() *os.File {
Expand Down Expand Up @@ -45,12 +46,12 @@ func (c *inlineShellCommand) Start(ctx context.Context) error {
return c.internalCommand.Start(ctx)
}

func (c *inlineShellCommand) Wait() error {
err := c.internalCommand.Wait()
func (c *inlineShellCommand) Wait(ctx context.Context) error {
err := c.internalCommand.Wait(ctx)

if c.envCollector != nil {
c.logger.Info("collecting the environment after the script execution")
cErr := c.collectEnv()
cErr := c.collectEnv(ctx)
c.logger.Info("collected the environment after the script execution", zap.Error(cErr))
if cErr != nil && err == nil {
err = cErr
Expand Down Expand Up @@ -97,7 +98,7 @@ func (c *inlineShellCommand) build() (string, error) {
return buf.String(), nil
}

func (c *inlineShellCommand) collectEnv() error {
func (c *inlineShellCommand) collectEnv(ctx context.Context) error {
if c.envCollector == nil {
return nil
}
Expand All @@ -107,14 +108,12 @@ func (c *inlineShellCommand) collectEnv() error {
return err
}

err = c.session.SetEnv(changed...)
err = c.session.SetEnv(ctx, changed...)
if err != nil {
return errors.WithMessage(err, "failed to set the new or updated env")
}

c.session.DeleteEnv(deleted...)

return nil
return c.session.DeleteEnv(ctx, deleted...)
}

func (c *inlineShellCommand) shellOptions() (string, error) {
Expand Down
40 changes: 25 additions & 15 deletions internal/command/command_inline_shell_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.uber.org/zap/zaptest"

"github.com/stateful/runme/v3/internal/command/testdata"
"github.com/stateful/runme/v3/internal/session"
runnerv2 "github.com/stateful/runme/v3/pkg/api/gen/proto/go/runme/runner/v2"
)

Expand All @@ -45,7 +46,8 @@ func TestInlineShellCommand_CollectEnv(t *testing.T) {
},
Mode: runnerv2.CommandMode_COMMAND_MODE_INLINE,
}
sess := NewSession()
sess, err := session.New()
require.NoError(t, err)
factory := NewFactory(WithLogger(zaptest.NewLogger(t)))

command, err := factory.Build(cfg, CommandOptions{Session: sess})
Expand All @@ -61,7 +63,7 @@ func TestInlineShellCommand_CollectEnv(t *testing.T) {
err = <-errC
require.NoError(t, err)

err = command.Wait()
err = command.Wait(context.Background())
require.EqualError(t, err, "signal: killed")

got, ok := sess.GetEnv("TEST_ENV")
Expand All @@ -78,11 +80,12 @@ func TestInlineShellCommand_CollectEnv(t *testing.T) {
func TestInlineShellCommand_MaxEnvSize(t *testing.T) {
t.Parallel()

sess := NewSession()
sess, err := session.New()
require.NoError(t, err)

envName := "TEST"
envValue := strings.Repeat("a", MaxEnvSizeInBytes-len(envName)-1) // -1 for the "=" sign
err := sess.SetEnv(createEnv(envName, envValue))
envValue := strings.Repeat("a", session.MaxEnvSizeInBytes-len(envName)-1) // -1 for the "=" sign
err = sess.SetEnv(context.Background(), createEnv(envName, envValue))
require.NoError(t, err)

factory := NewFactory(
Expand All @@ -106,7 +109,7 @@ func TestInlineShellCommand_MaxEnvSize(t *testing.T) {

err = command.Start(context.Background())
require.NoError(t, err)
err = command.Wait()
err = command.Wait(context.Background())
require.NoError(t, err)

assert.Equal(t, envValue, stdout.String())
Expand All @@ -115,20 +118,21 @@ func TestInlineShellCommand_MaxEnvSize(t *testing.T) {
func TestInlineShellCommand_MaxEnvironSizeInBytes(t *testing.T) {
t.Parallel()

sess := NewSession()
sess, err := session.New()
require.NoError(t, err)

// Set multiple environment variables of [MaxEnvSizeInBytes] length.
// [StoreStdoutEnvName] is also set but it exceeds [MaxEnvironSizeInBytes],
// however, it's allowed to be trimmed so it should not cause an error.
envCount := math.Ceil(float64(MaxEnvironSizeInBytes) / float64(MaxEnvSizeInBytes))
envValue := strings.Repeat("a", MaxEnvSizeInBytes-1) // -1 for the equal sign
envCount := math.Ceil(float64(session.MaxEnvironSizeInBytes) / float64(session.MaxEnvSizeInBytes))
envValue := strings.Repeat("a", session.MaxEnvSizeInBytes-1) // -1 for the equal sign
for i := 0; i < int(envCount); i++ {
name := "TEST" + strconv.Itoa(i)
value := envValue[:len(envValue)-len(name)]
err := sess.SetEnv(createEnv(name, value))
err := sess.SetEnv(context.Background(), createEnv(name, value))
require.NoError(t, err)
}
err := sess.SetEnv(createEnv(StoreStdoutEnvName, envValue[:len(envValue)-len(StoreStdoutEnvName)]))
err = sess.SetEnv(context.Background(), createEnv(StoreStdoutEnvName, envValue[:len(envValue)-len(StoreStdoutEnvName)]))
require.NoError(t, err)

factory := NewFactory(
Expand All @@ -151,7 +155,7 @@ func TestInlineShellCommand_MaxEnvironSizeInBytes(t *testing.T) {

err = command.Start(context.Background())
require.NoError(t, err)
err = command.Wait()
err = command.Wait(context.Background())
require.NoError(t, err)
}

Expand All @@ -175,14 +179,19 @@ func TestInlineShellCommand_LargeOutput(t *testing.T) {
},
Mode: runnerv2.CommandMode_COMMAND_MODE_INLINE,
}
sess := NewSession()
sess, err := session.New(
session.WithOwl(false),
session.WithSeedEnv(os.Environ()),
)
require.NoError(t, err)

stdout := bytes.NewBuffer(nil)
command, err := factory.Build(cfg, CommandOptions{Session: sess, Stdout: stdout})
require.NoError(t, err)

err = command.Start(context.Background())
require.NoError(t, err)
err = command.Wait()
err = command.Wait(context.Background())
require.NoError(t, err)

expected, err := os.ReadFile(fileName)
Expand All @@ -201,7 +210,8 @@ func testInlineShellCommandCollectEnv(t *testing.T) {
},
Mode: runnerv2.CommandMode_COMMAND_MODE_INLINE,
}
sess := NewSession()
sess, err := session.New(session.WithSeedEnv(os.Environ()))
require.NoError(t, err)

testExecuteCommandWithSession(t, cfg, sess, nil, "", "")

Expand Down
2 changes: 1 addition & 1 deletion internal/command/command_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (c *nativeCommand) Signal(sig os.Signal) error {
return nil
}

func (c *nativeCommand) Wait() (err error) {
func (c *nativeCommand) Wait(ctx context.Context) (err error) {
c.logger.Info("waiting for finish")

var stderr []byte
Expand Down
Loading

0 comments on commit 4ff0375

Please sign in to comment.