diff --git a/cmd/containerd-shim-runc-v2/task/service.go b/cmd/containerd-shim-runc-v2/task/service.go index b6455f285942..58d525050c06 100644 --- a/cmd/containerd-shim-runc-v2/task/service.go +++ b/cmd/containerd-shim-runc-v2/task/service.go @@ -74,15 +74,18 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S } go ep.Run(ctx) s := &service{ - context: ctx, - events: make(chan interface{}, 128), - ec: reaper.Default.Subscribe(), - ep: ep, - shutdown: sd, - containers: make(map[string]*runc.Container), - running: make(map[int][]containerProcess), - pendingExecs: make(map[*runc.Container]int), - exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}), + context: ctx, + events: make(chan interface{}, 128), + ec: reaper.Default.Subscribe(), + ep: ep, + shutdown: sd, + containers: make(map[string]*runc.Container), + running: make(map[int][]containerProcess), + runningExecs: make(map[*runc.Container]uint32), + lastExecEventSent: make(map[*runc.Container]chan struct{}), + exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}), + containerInitProcess: make(map[*runc.Container]initProcess), + waitingAllExecExitSent: make(map[*runc.Container]bool), } go s.processExits() runcC.Monitor = reaper.Default @@ -115,9 +118,13 @@ type service struct { containers map[string]*runc.Container - lifecycleMu sync.Mutex - running map[int][]containerProcess // pid -> running process, guarded by lifecycleMu - pendingExecs map[*runc.Container]int // container -> num pending execs, guarded by lifecycleMu + lifecycleMu sync.Mutex + running map[int][]containerProcess // pid -> running process, guarded by lifecycleMu + execsMu sync.Mutex + runningExecs map[*runc.Container]uint32 + waitingAllExecExitSent map[*runc.Container]bool + lastExecEventSent map[*runc.Container]chan struct{} + containerInitProcess map[*runc.Container]initProcess // Subscriptions to exits for PIDs. Adding/deleting subscriptions and // dereferencing the subscription pointers must only be done while holding // lifecycleMu. @@ -126,6 +133,11 @@ type service struct { shutdown shutdown.Service } +type initProcess struct { + pid uint32 + exited bool +} + type containerProcess struct { Container *runc.Container Process process.Process @@ -173,45 +185,7 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe if p != nil { pid = p.Pid() } - - _, init := p.(*process.Init) s.lifecycleMu.Lock() - - var initExits []runcC.Exit - var initCps []containerProcess - if !init { - s.pendingExecs[c]-- - - initPid := c.Pid() - iExits, initExited := exits[initPid] - if initExited && s.pendingExecs[c] == 0 { - // c's init process has exited before handleStarted was called and - // this is the last pending exec process start - we need to process - // the exit for the init process after processing this exec, so: - // - delete c from the s.pendingExecs map - // - keep the exits for the init pid to process later (after we process - // this exec's exits) - // - get the necessary containerProcesses for the init process (that we - // need to process the exits), and remove them from s.running (which we skipped - // doing in processExits). - delete(s.pendingExecs, c) - initExits = iExits - var skipped []containerProcess - for _, initPidCp := range s.running[initPid] { - if initPidCp.Container == c { - initCps = append(initCps, initPidCp) - } else { - skipped = append(skipped, initPidCp) - } - } - if len(skipped) == 0 { - delete(s.running, initPid) - } else { - s.running[initPid] = skipped - } - } - } - ees, exited := exits[pid] delete(s.exitSubscribers, &exits) exits = nil @@ -220,11 +194,6 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe for _, ee := range ees { s.handleProcessExit(ee, c, p) } - for _, eee := range initExits { - for _, cp := range initCps { - s.handleProcessExit(eee, cp.Container, cp.Process) - } - } } else { // Process start was successful, add to `s.running`. s.running[pid] = append(s.running[pid], containerProcess{ @@ -262,7 +231,10 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * } s.containers[r.ID] = container - + s.containerInitProcess[container] = initProcess{ + pid: uint32(container.Pid()), + exited: false, + } s.send(&eventstypes.TaskCreate{ ContainerID: r.ID, Bundle: r.Bundle, @@ -304,8 +276,11 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI. s.lifecycleMu.Lock() if r.ExecID == "" { cinit = container - } else { - s.pendingExecs[container]++ + } + initPro, ok := s.containerInitProcess[container] + if ok && initPro.exited { + s.lifecycleMu.Unlock() + return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container %s init process is not running", container.ID) } handleStarted, cleanup := s.preStart(cinit) s.lifecycleMu.Unlock() @@ -347,6 +322,13 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI. Pid: uint32(p.Pid()), }) default: + s.execsMu.Lock() + s.runningExecs[container]++ + _, exist := s.lastExecEventSent[container] + if !exist { + s.lastExecEventSent[container] = make(chan struct{}) + } + s.execsMu.Unlock() s.send(&eventstypes.TaskExecStarted{ ContainerID: container.ID, ExecID: r.ExecID, @@ -674,27 +656,25 @@ func (s *service) processExits() { for subscriber := range s.exitSubscribers { (*subscriber)[e.Pid] = append((*subscriber)[e.Pid], e) } - // Handle the exit for a created/started process. If there's more than - // one, assume they've all exited. One of them will be the correct - // process. - var cps, skipped []containerProcess - for _, cp := range s.running[e.Pid] { - _, init := cp.Process.(*process.Init) - if init && s.pendingExecs[cp.Container] != 0 { - // This exit relates to a container for which we have pending execs. In - // order to ensure order between execs and the init process for a given - // container, skip processing this exit here and let the `handleStarted` - // closure for the pending exec publish it. - skipped = append(skipped, cp) - } else { - cps = append(cps, cp) + + init := initProcess{} + var container *runc.Container + for c, initPro := range s.containerInitProcess { + if initPro.pid == uint32(e.Pid) { + container = c + init.pid = uint32(e.Pid) + init.exited = true } } - if len(skipped) > 0 { - s.running[e.Pid] = skipped - } else { - delete(s.running, e.Pid) + if container != nil { + s.containerInitProcess[container] = init } + + // Handle the exit for a created/started process. If there's more than + // one, assume they've all exited. One of them will be the correct + // process. + cps := s.running[e.Pid] + delete(s.running, e.Pid) s.lifecycleMu.Unlock() for _, cp := range cps { @@ -709,7 +689,7 @@ func (s *service) send(evt interface{}) { // s.mu must be locked when calling handleProcessExit func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) { - if ip, ok := p.(*process.Init); ok { + if ip, init := p.(*process.Init); init { // Ensure all children are killed if runc.ShouldKillAllOnExit(s.context, c.Bundle) { if err := ip.KillAll(s.context); err != nil { @@ -717,16 +697,52 @@ func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.P Error("failed to kill init's children") } } + go func() { + s.execsMu.Lock() + s.waitingAllExecExitSent[c] = true + if execCounts, exist := s.runningExecs[c]; exist && execCounts > 0 { + s.execsMu.Unlock() + if ch, ok := s.lastExecEventSent[c]; ok { + <-ch + } + } else { + s.execsMu.Unlock() + } + p.SetExited(e.Status) + s.send(&eventstypes.TaskExit{ + ContainerID: c.ID, + ID: p.ID(), + Pid: uint32(e.Pid), + ExitStatus: uint32(e.Status), + ExitedAt: protobuf.ToTimestamp(p.ExitedAt()), + }) + s.execsMu.Lock() + delete(s.lastExecEventSent, c) + delete(s.containerInitProcess, c) + delete(s.waitingAllExecExitSent, c) + s.execsMu.Unlock() + }() + } else { + p.SetExited(e.Status) + s.send(&eventstypes.TaskExit{ + ContainerID: c.ID, + ID: p.ID(), + Pid: uint32(e.Pid), + ExitStatus: uint32(e.Status), + ExitedAt: protobuf.ToTimestamp(p.ExitedAt()), + }) + s.execsMu.Lock() + s.runningExecs[c]-- + if s.runningExecs[c] == 0 && s.waitingAllExecExitSent[c] { + // last exec event is sent + sent, exist := s.lastExecEventSent[c] + if exist { + sent <- struct{}{} + } + } + s.execsMu.Unlock() } - p.SetExited(e.Status) - s.send(&eventstypes.TaskExit{ - ContainerID: c.ID, - ID: p.ID(), - Pid: uint32(e.Pid), - ExitStatus: uint32(e.Status), - ExitedAt: protobuf.ToTimestamp(p.ExitedAt()), - }) } func (s *service) getContainerPids(ctx context.Context, container *runc.Container) ([]uint32, error) {