Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
shimv2: Add tracing to shimv2
Browse files Browse the repository at this point in the history
Add trace calls to shimv2 that create spans for functions in service.go.
Tracing starts in New(), which is forked twice and is followed by either
StartShim() or Create().

Tracing cannot start without the value for Trace enabled from the
runtime config so load the config in New(), which results in it being
loaded every time New() is called in addition to where it is originally
loaded after Create().

Fixes #1807

Signed-off-by: Chelsea Mafrica <chelsea.e.mafrica@intel.com>
  • Loading branch information
cmaf committed Dec 2, 2020
1 parent 2a98f43 commit 0279c81
Showing 1 changed file with 92 additions and 1 deletion.
93 changes: 92 additions & 1 deletion containerd-shim-v2/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types"
"github.com/opencontainers/runtime-spec/specs-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
Expand Down Expand Up @@ -72,13 +73,29 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi
vci.SetLogger(ctx, logger)
katautils.SetLogger(ctx, logger, logger.Logger.Level)

// load runtime config so that tracing can start if enabled
_, runtimeConfig, err := katautils.LoadConfiguration("", false, true)
if err != nil {
return nil, err
}

// create tracer
_, err = katautils.CreateTracer("kata")
if err != nil {
return nil, err
}
// create span
span, ctx := trace(ctx, "New")
defer span.Finish()

ctx, cancel := context.WithCancel(ctx)

s := &service{
id: id,
pid: uint32(os.Getpid()),
ctx: ctx,
containers: make(map[string]*container),
config: &runtimeConfig,
events: make(chan interface{}, chSize),
ec: make(chan exit, bufferSize),
cancel: cancel,
Expand Down Expand Up @@ -161,6 +178,13 @@ func newCommand(ctx context.Context, containerdBinary, id, containerdAddress str
// StartShim willl start a kata shimv2 daemon which will implemented the
// ShimV2 APIs such as create/start/update etc containers.
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) {
// Stop tracing here since a new tracer will be created the next time New()
// is called again after StartShim()
defer katautils.StopTracing(s.ctx)

span, _ := trace(s.ctx, "StartShim")
defer span.Finish()

bundlePath, err := os.Getwd()
if err != nil {
return "", err
Expand Down Expand Up @@ -274,7 +298,22 @@ func getTopic(e interface{}) string {
return cdruntime.TaskUnknownTopic
}

func trace(ctx context.Context, name string) (opentracing.Span, context.Context) {
if ctx == nil {
logrus.WithField("type", "bug").Error("trace called before context set")
ctx = context.Background()
}

span, ctx := opentracing.StartSpanFromContext(ctx, name)
span.SetTag("source", "runtime")

return span, ctx
}

func (s *service) Cleanup(ctx context.Context) (_ *taskAPI.DeleteResponse, err error) {
span, _ := trace(s.ctx, "Cleanup")
defer span.Finish()

//Since the binary cleanup will return the DeleteResponse from stdout to
//containerd, thus we must make sure there is no any outputs in stdout except
//the returned response, thus here redirect the log to stderr in case there's
Expand Down Expand Up @@ -330,6 +369,9 @@ func (s *service) Cleanup(ctx context.Context) (_ *taskAPI.DeleteResponse, err e

// Create a new sandbox or container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
span, _ := trace(s.ctx, "Create")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -381,6 +423,9 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *

// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (_ *taskAPI.StartResponse, err error) {
span, _ := trace(s.ctx, "Start")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -427,6 +472,9 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (_ *taskAP

// Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (_ *taskAPI.DeleteResponse, err error) {
span, _ := trace(s.ctx, "Delete")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -474,6 +522,9 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (_ *task

// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (_ *ptypes.Empty, err error) {
span, _ := trace(s.ctx, "Exec")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -507,6 +558,9 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (_ *p

// ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (_ *ptypes.Empty, err error) {
span, _ := trace(s.ctx, "ResizePty")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -541,6 +595,9 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (_

// State returns runtime state information for a process
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (_ *taskAPI.StateResponse, err error) {
span, _ := trace(s.ctx, "State")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -584,11 +641,13 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (_ *taskAP
Terminal: execs.tty.terminal,
ExitStatus: uint32(execs.exitCode),
}, nil

}

// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (_ *ptypes.Empty, err error) {
span, _ := trace(s.ctx, "Pause")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -623,6 +682,9 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (_ *ptypes

// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (_ *ptypes.Empty, err error) {
span, _ := trace(s.ctx, "Resume")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -655,6 +717,9 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (_ *ptyp

// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (_ *ptypes.Empty, err error) {
span, _ := trace(s.ctx, "Kill")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -707,6 +772,9 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (_ *ptypes.E
// Since for kata, it cannot get the process's pid from VM,
// thus only return the Shim's pid directly.
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (_ *taskAPI.PidsResponse, err error) {
span, _ := trace(s.ctx, "Pids")
defer span.Finish()

var processes []*task.ProcessInfo

defer func() {
Expand All @@ -725,6 +793,9 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (_ *taskAPI.

// CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *ptypes.Empty, err error) {
span, _ := trace(s.ctx, "CloseIO")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -761,6 +832,9 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *pt

// Checkpoint the container
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (_ *ptypes.Empty, err error) {
span, _ := trace(s.ctx, "Checkpoint")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand All @@ -770,6 +844,9 @@ func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskReque

// Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (_ *taskAPI.ConnectResponse, err error) {
span, _ := trace(s.ctx, "Connect")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand All @@ -785,6 +862,8 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (_ *ta
}

func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *ptypes.Empty, err error) {
span, _ := trace(s.ctx, "Shutdown")

defer func() {
err = toGRPC(err)
}()
Expand All @@ -796,6 +875,9 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *
}
s.mu.Unlock()

span.Finish()
katautils.StopTracing(s.ctx)

s.cancel()

os.Exit(0)
Expand All @@ -806,6 +888,9 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *
}

func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (_ *taskAPI.StatsResponse, err error) {
span, _ := trace(s.ctx, "Stats")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand All @@ -830,6 +915,9 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (_ *taskAP

// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (_ *ptypes.Empty, err error) {
span, _ := trace(s.ctx, "Update")
defer span.Finish()

defer func() {
err = toGRPC(err)
}()
Expand Down Expand Up @@ -857,6 +945,9 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (_ *

// Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (_ *taskAPI.WaitResponse, err error) {
span, _ := trace(s.ctx, "Wait")
defer span.Finish()

var ret uint32

defer func() {
Expand Down

0 comments on commit 0279c81

Please sign in to comment.