Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Fix Fail when Resource Quota exceeded. #627

Merged
merged 4 commits into from
Apr 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 64 additions & 16 deletions task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"path"
"strconv"
"strings"
"time"

"github.com/golang-jwt/jwt/v4"
Expand Down Expand Up @@ -64,6 +65,36 @@ func (e *AddonNotFound) Is(err error) (matched bool) {
return
}

// QuotaExceeded is the error used to report that a request was
// rejected because a resource quota was exceeded.
type QuotaExceeded struct {
	// Reason is the message of the underlying error.
	Reason string
}

// Match reports whether err is a Forbidden error whose message
// mentions both "quota" and "exceeded".
// Whenever err is Forbidden, Reason is replaced with the message
// of err, regardless of whether the message matches.
func (e *QuotaExceeded) Match(err error) (matched bool) {
	if !k8serr.IsForbidden(err) {
		return false
	}
	e.Reason = err.Error()
	return strings.Contains(e.Reason, "quota") &&
		strings.Contains(e.Reason, "exceeded")
}

// Error implements the error interface.
// It returns the stored Reason unchanged.
func (e *QuotaExceeded) Error() (s string) {
	s = e.Reason
	return s
}

// Is reports whether err, or any error in its chain, is a
// *QuotaExceeded. The receiver's own fields are not consulted.
func (e *QuotaExceeded) Is(err error) (matched bool) {
	target := new(*QuotaExceeded)
	return errors.As(err, target)
}

// Manager provides task management.
type Manager struct {
// DB
Expand Down Expand Up @@ -142,28 +173,28 @@ func (m *Manager) startReady() {
if m.postpone(ready, list) {
ready.State = Postponed
Log.Info("Task postponed.", "id", ready.ID)
sErr := m.DB.Save(ready).Error
Log.Error(sErr, "")
err := m.DB.Save(ready).Error
Log.Error(err, "")
continue
}
if ready.Retries == 0 {
metrics.TasksInitiated.Inc()
}
rt := Task{ready}
err := rt.Run(m.Client)
started, err := rt.Run(m.Client)
if err != nil {
if errors.Is(err, &AddonNotFound{}) {
ready.Error("Error", err.Error())
ready.State = Failed
sErr := m.DB.Save(ready).Error
Log.Error(sErr, "")
}
Log.Error(err, "")
ready.Error("Error", err.Error())
ready.State = Failed
err = m.DB.Save(ready).Error
Log.Error(err, "")
continue
}
Log.Info("Task started.", "id", ready.ID)
err = m.DB.Save(ready).Error
Log.Error(err, "")
if started {
Log.Info("Task started.", "id", ready.ID)
if ready.Retries == 0 {
metrics.TasksInitiated.Inc()
}
}
default:
// Ignored.
// Other states included to support
Expand Down Expand Up @@ -256,13 +287,23 @@ type Task struct {
}

// Run the specified task.
func (r *Task) Run(client k8s.Client) (err error) {
func (r *Task) Run(client k8s.Client) (started bool, err error) {
mark := time.Now()
defer func() {
if err != nil {
if err == nil {
return
}
if errors.Is(err, &QuotaExceeded{}) {
Log.V(1).Info(err.Error())
err = nil
return
}
if errors.Is(err, &AddonNotFound{}) {
r.Error("Error", err.Error())
r.Terminated = &mark
r.State = Failed
err = nil
return
}
}()
addon, err := r.findAddon(client, r.Addon)
Expand All @@ -288,6 +329,10 @@ func (r *Task) Run(client k8s.Client) (err error) {
pod := r.pod(addon, owner, &secret)
err = client.Create(context.TODO(), &pod)
if err != nil {
qe := &QuotaExceeded{err.Error()}
if qe.Match(err) {
err = qe
}
err = liberr.Wrap(err)
return
}
Expand All @@ -309,6 +354,7 @@ func (r *Task) Run(client k8s.Client) (err error) {
err = liberr.Wrap(err)
return
}
started = true
r.Started = &mark
r.State = Pending
r.Pod = path.Join(
Expand All @@ -329,7 +375,9 @@ func (r *Task) Reflect(client k8s.Client) (err error) {
pod)
if err != nil {
if k8serr.IsNotFound(err) {
err = r.Run(client)
r.Pod = ""
r.State = Ready
err = nil
} else {
err = liberr.Wrap(err)
}
Expand Down
Loading