Skip to content

Commit

Permalink
Make sure the init exit event is sent last
Browse files Browse the repository at this point in the history
Signed-off-by: ningmingxiao <ning.mingxiao@zte.com.cn>
  • Loading branch information
ningmingxiao committed Sep 4, 2024
1 parent 919beb1 commit ecb82bc
Showing 1 changed file with 101 additions and 85 deletions.
186 changes: 101 additions & 85 deletions cmd/containerd-shim-runc-v2/task/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,18 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
}
go ep.Run(ctx)
s := &service{
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
shutdown: sd,
containers: make(map[string]*runc.Container),
running: make(map[int][]containerProcess),
pendingExecs: make(map[*runc.Container]int),
exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
shutdown: sd,
containers: make(map[string]*runc.Container),
running: make(map[int][]containerProcess),
runningExecs: make(map[*runc.Container]uint32),
lastExecEventSent: make(map[*runc.Container]chan struct{}),
exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
containerInitProcess: make(map[*runc.Container]initProcess),
waitingAllExecExitSent: make(map[*runc.Container]bool),
}
go s.processExits()
runcC.Monitor = reaper.Default
Expand Down Expand Up @@ -115,9 +118,13 @@ type service struct {

containers map[string]*runc.Container

lifecycleMu sync.Mutex
running map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
pendingExecs map[*runc.Container]int // container -> num pending execs, guarded by lifecycleMu
lifecycleMu sync.Mutex
running map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
execsMu sync.Mutex
runningExecs map[*runc.Container]uint32
waitingAllExecExitSent map[*runc.Container]bool
lastExecEventSent map[*runc.Container]chan struct{}
containerInitProcess map[*runc.Container]initProcess
// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
// dereferencing the subscription pointers must only be done while holding
// lifecycleMu.
Expand All @@ -126,6 +133,11 @@ type service struct {
shutdown shutdown.Service
}

// initProcess is the per-container record the service keeps about a
// container's init process. An entry is stored when the container is created
// (with exited set to false) and is rewritten with exited set to true when an
// exit event carrying the same pid is seen.
type initProcess struct {
// pid is the init process ID, stored as uint32 (converted from the
// container's Pid() at creation time).
pid uint32
// exited is true once an exit for pid has been observed; Start refuses to
// proceed for a container whose init process is marked exited.
exited bool
}

type containerProcess struct {
Container *runc.Container
Process process.Process
Expand Down Expand Up @@ -173,45 +185,7 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
if p != nil {
pid = p.Pid()
}

_, init := p.(*process.Init)
s.lifecycleMu.Lock()

var initExits []runcC.Exit
var initCps []containerProcess
if !init {
s.pendingExecs[c]--

initPid := c.Pid()
iExits, initExited := exits[initPid]
if initExited && s.pendingExecs[c] == 0 {
// c's init process has exited before handleStarted was called and
// this is the last pending exec process start - we need to process
// the exit for the init process after processing this exec, so:
// - delete c from the s.pendingExecs map
// - keep the exits for the init pid to process later (after we process
// this exec's exits)
// - get the necessary containerProcesses for the init process (that we
// need to process the exits), and remove them from s.running (which we skipped
// doing in processExits).
delete(s.pendingExecs, c)
initExits = iExits
var skipped []containerProcess
for _, initPidCp := range s.running[initPid] {
if initPidCp.Container == c {
initCps = append(initCps, initPidCp)
} else {
skipped = append(skipped, initPidCp)
}
}
if len(skipped) == 0 {
delete(s.running, initPid)
} else {
s.running[initPid] = skipped
}
}
}

ees, exited := exits[pid]
delete(s.exitSubscribers, &exits)
exits = nil
Expand All @@ -220,11 +194,6 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
for _, ee := range ees {
s.handleProcessExit(ee, c, p)
}
for _, eee := range initExits {
for _, cp := range initCps {
s.handleProcessExit(eee, cp.Container, cp.Process)
}
}
} else {
// Process start was successful, add to `s.running`.
s.running[pid] = append(s.running[pid], containerProcess{
Expand Down Expand Up @@ -262,7 +231,10 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
}

s.containers[r.ID] = container

s.containerInitProcess[container] = initProcess{
pid: uint32(container.Pid()),
exited: false,
}
s.send(&eventstypes.TaskCreate{
ContainerID: r.ID,
Bundle: r.Bundle,
Expand Down Expand Up @@ -304,8 +276,11 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
s.lifecycleMu.Lock()
if r.ExecID == "" {
cinit = container
} else {
s.pendingExecs[container]++
}
initPro, ok := s.containerInitProcess[container]
if ok && initPro.exited {
s.lifecycleMu.Unlock()
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container %s init process is not running", container.ID)
}
handleStarted, cleanup := s.preStart(cinit)
s.lifecycleMu.Unlock()
Expand Down Expand Up @@ -347,6 +322,13 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
Pid: uint32(p.Pid()),
})
default:
s.execsMu.Lock()
s.runningExecs[container]++
_, exist := s.lastExecEventSent[container]
if !exist {
s.lastExecEventSent[container] = make(chan struct{})
}
s.execsMu.Unlock()
s.send(&eventstypes.TaskExecStarted{
ContainerID: container.ID,
ExecID: r.ExecID,
Expand Down Expand Up @@ -674,27 +656,25 @@ func (s *service) processExits() {
for subscriber := range s.exitSubscribers {
(*subscriber)[e.Pid] = append((*subscriber)[e.Pid], e)
}
// Handle the exit for a created/started process. If there's more than
// one, assume they've all exited. One of them will be the correct
// process.
var cps, skipped []containerProcess
for _, cp := range s.running[e.Pid] {
_, init := cp.Process.(*process.Init)
if init && s.pendingExecs[cp.Container] != 0 {
// This exit relates to a container for which we have pending execs. In
// order to ensure order between execs and the init process for a given
// container, skip processing this exit here and let the `handleStarted`
// closure for the pending exec publish it.
skipped = append(skipped, cp)
} else {
cps = append(cps, cp)

init := initProcess{}
var container *runc.Container
for c, initPro := range s.containerInitProcess {
if initPro.pid == uint32(e.Pid) {
container = c
init.pid = uint32(e.Pid)
init.exited = true
}
}
if len(skipped) > 0 {
s.running[e.Pid] = skipped
} else {
delete(s.running, e.Pid)
if container != nil {
s.containerInitProcess[container] = init
}

// Handle the exit for a created/started process. If there's more than
// one, assume they've all exited. One of them will be the correct
// process.
cps := s.running[e.Pid]
delete(s.running, e.Pid)
s.lifecycleMu.Unlock()

for _, cp := range cps {
Expand All @@ -709,24 +689,60 @@ func (s *service) send(evt interface{}) {

// handleProcessExit records the exit status e.Status on p and emits a
// TaskExit event for it through s.send.
//
// When p is a *process.Init, the remaining children are killed first if
// runc.ShouldKillAllOnExit reports true, and the SetExited/TaskExit step is
// handed to a goroutine. That goroutine marks the container as waiting and,
// if s.runningExecs[c] is non-zero and a channel exists in
// s.lastExecEventSent[c], blocks on that channel before sending the init
// TaskExit; afterwards it removes c from the per-container bookkeeping maps.
// For any other process the event is sent inline, s.runningExecs[c] is
// decremented, and the waiting init goroutine is signalled once the count
// reaches zero.
//
// NOTE(review): this excerpt appears to interleave the lines removed and the
// lines added by the change (no +/- markers survive), so some statements
// below occur in both their old and new form — confirm against the real file.
//
// s.mu must be locked when calling handleProcessExit
func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
// NOTE(review): the next two `if` lines look like the before/after versions
// of a single statement (only the name of the type-assertion result differs).
if ip, ok := p.(*process.Init); ok {
if ip, init := p.(*process.Init); init {
// Ensure all children are killed
if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
if err := ip.KillAll(s.context); err != nil {
// A KillAll failure is logged only; the exit is still processed below.
log.G(s.context).WithError(err).WithField("id", ip.ID()).
Error("failed to kill init's children")
}
}
// The init exit is published from a separate goroutine so that it can wait
// for outstanding exec exits without blocking the caller.
go func() {
s.execsMu.Lock()
// Tell the exec branch that an init exit is pending for c.
s.waitingAllExecExitSent[c] = true
if execCounts, exist := s.runningExecs[c]; exist && execCounts > 0 {
s.execsMu.Unlock()
// NOTE(review): this map read happens after execsMu was released, while the
// writes to lastExecEventSent elsewhere in this excerpt hold execsMu —
// confirm this cannot race.
if ch, ok := s.lastExecEventSent[c]; ok {
// Block until the exec branch signals that the count reached zero.
<-ch
}
} else {
s.execsMu.Unlock()
}
p.SetExited(e.Status)
s.send(&eventstypes.TaskExit{
ContainerID: c.ID,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
})
// Drop the per-container state now that the init exit has been sent.
// runningExecs[c] is not deleted here.
s.execsMu.Lock()
delete(s.lastExecEventSent, c)
delete(s.containerInitProcess, c)
delete(s.waitingAllExecExitSent, c)
s.execsMu.Unlock()
}()
} else {
// Non-init process: publish the exit immediately.
p.SetExited(e.Status)
s.send(&eventstypes.TaskExit{
ContainerID: c.ID,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
})
s.execsMu.Lock()
// NOTE(review): runningExecs holds uint32 values, so this decrement wraps
// around if the count is already zero — confirm every path that reaches
// here has incremented the counter first.
s.runningExecs[c]--
if s.runningExecs[c] == 0 && s.waitingAllExecExitSent[c] {
// last exec event is sent
sent, exist := s.lastExecEventSent[c]
if exist {
// NOTE(review): the channel is created unbuffered in Start, so this send
// blocks (with execsMu held) until the init goroutine receives — confirm a
// receiver is always present when this branch is taken.
sent <- struct{}{}
}
}
s.execsMu.Unlock()
}

// NOTE(review): the SetExited/send sequence below repeats what both branches
// above already do; it appears to be the pre-change form kept by the diff
// rendering rather than code that runs in addition to them.
p.SetExited(e.Status)
s.send(&eventstypes.TaskExit{
ContainerID: c.ID,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
})
}

func (s *service) getContainerPids(ctx context.Context, container *runc.Container) ([]uint32, error) {
Expand Down

0 comments on commit ecb82bc

Please sign in to comment.