Skip to content

Commit

Permalink
feat: track long running generic commands in the oplog (#516)
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge authored Oct 12, 2024
1 parent d7704cf commit 28c3172
Show file tree
Hide file tree
Showing 23 changed files with 641 additions and 444 deletions.
2 changes: 1 addition & 1 deletion gen/go/v1/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

371 changes: 233 additions & 138 deletions gen/go/v1/operations.pb.go

Large diffs are not rendered by default.

40 changes: 20 additions & 20 deletions gen/go/v1/service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 27 additions & 54 deletions gen/go/v1/service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions gen/go/v1/v1connect/service.connect.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 17 additions & 62 deletions internal/api/backresthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,73 +429,28 @@ func (s *BackrestHandler) Restore(ctx context.Context, req *connect.Request[v1.R
return connect.NewResponse(&emptypb.Empty{}), nil
}

func (s *BackrestHandler) RunCommand(ctx context.Context, req *connect.Request[v1.RunCommandRequest], resp *connect.ServerStream[types.BytesValue]) error {
repo, err := s.orchestrator.GetRepoOrchestrator(req.Msg.RepoId)
if err != nil {
return fmt.Errorf("failed to get repo %q: %w", req.Msg.RepoId, err)
}

ctx, cancel := context.WithCancel(ctx)

outputs := make(chan []byte, 100)
errChan := make(chan error, 1)
go func() {
start := time.Now()
zap.S().Infof("running command for webui: %v", req.Msg.Command)
if err := repo.RunCommand(ctx, req.Msg.Command, func(output []byte) {
outputs <- bytes.Clone(output)
}); err != nil && ctx.Err() == nil {
zap.S().Errorf("error running command for webui: %v", err)
errChan <- err
} else {
zap.S().Infof("command completed for webui: %v", time.Since(start))
}
outputs <- []byte("took " + time.Since(start).String())
cancel()
}()

ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

bufSize := 32 * 1024
buf := make([]byte, 0, bufSize)

flush := func() error {
if len(buf) > 0 {
if err := resp.Send(&types.BytesValue{Value: buf}); err != nil {
return fmt.Errorf("failed to write output: %w", err)
}
buf = buf[:0]
func (s *BackrestHandler) RunCommand(ctx context.Context, req *connect.Request[v1.RunCommandRequest]) (*connect.Response[types.Int64Value], error) {
// group commands within the last 24 hours (or 256 operations) into the same flow ID
var flowID int64
if s.oplog.Query(oplog.Query{RepoID: req.Msg.RepoId, Limit: 256, Reversed: true}, func(op *v1.Operation) error {
if op.GetOperationRunCommand() != nil && time.Since(time.UnixMilli(op.UnixTimeStartMs)) < 30*time.Minute {
flowID = op.FlowId
}
return nil
}) != nil {
return nil, fmt.Errorf("failed to query operations")
}

for {
select {
case err := <-errChan:
if err := flush(); err != nil {
return err
}
return err
case <-ctx.Done():
return flush()
case output := <-outputs:
if len(output)+len(buf) > bufSize {
flush()
}
if len(output) > bufSize {
if err := resp.Send(&types.BytesValue{Value: output}); err != nil {
return fmt.Errorf("failed to write output: %w", err)
}
continue
}
buf = append(buf, output...)
case <-ticker.C:
if len(buf) > 0 {
flush()
}
}
task := tasks.NewOneoffRunCommandTask(req.Msg.RepoId, tasks.PlanForSystemTasks, flowID, time.Now(), req.Msg.Command)
st, err := s.orchestrator.CreateUnscheduledTask(task, tasks.TaskPriorityInteractive, time.Now())
if err != nil {
return nil, fmt.Errorf("failed to create task: %w", err)
}
if err := s.orchestrator.RunTask(context.Background(), st); err != nil {
return nil, fmt.Errorf("failed to run command: %w", err)
}

return connect.NewResponse(&types.Int64Value{Value: st.Op.GetId()}), nil
}

func (s *BackrestHandler) Cancel(ctx context.Context, req *connect.Request[types.Int64Value]) (*connect.Response[emptypb.Empty], error) {
Expand Down
Loading

0 comments on commit 28c3172

Please sign in to comment.