From 50cbb93061879aacc5bfc99bdabcd3e87678c381 Mon Sep 17 00:00:00 2001 From: Stefan Prodan Date: Wed, 21 Nov 2018 10:39:46 +0000 Subject: [PATCH 1/4] Abort image tags fetching on HTTP 429 errors - log the rate limit error once per image repository --- registry/cache/warming.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/registry/cache/warming.go b/registry/cache/warming.go index 78fa68dda..c1238c98c 100644 --- a/registry/cache/warming.go +++ b/registry/cache/warming.go @@ -273,6 +273,10 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id awaitFetchers := &sync.WaitGroup{} awaitFetchers.Add(len(toUpdate)) + ctxc, cancel := context.WithCancel(context.Background()) + var once sync.Once + defer cancel() + updates: for _, up := range toUpdate { select { @@ -284,6 +288,12 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id go func(update update) { defer func() { awaitFetchers.Done(); <-fetchers }() + select { + case <-ctxc.Done(): + return // terminate on error + default: // avoid blocking + } + imageID := update.ref if w.Trace { @@ -297,7 +307,16 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id // This was due to a context timeout, don't bother logging return } - errorLogger.Log("err", err, "ref", imageID) + + // abort the image tags fetching if we've been rate limited + if strings.Contains(err.Error(), "429") { + once.Do(func() { + errorLogger.Log("warn", "aborting image tag fetching due to rate limiting, will try again later") + }) + cancel() + } else { + errorLogger.Log("err", err, "ref", imageID) + } return } From 2ffbbcbdb895c39f92717a48d139af309168cc8c Mon Sep 17 00:00:00 2001 From: Stefan Prodan Date: Wed, 21 Nov 2018 14:05:22 +0000 Subject: [PATCH 2/4] Create the cancelable ctx from the context passed into the warm func --- registry/cache/warming.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/registry/cache/warming.go b/registry/cache/warming.go index c1238c98c..ba48a9067 100644 --- a/registry/cache/warming.go +++ b/registry/cache/warming.go @@ -273,14 +273,14 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id awaitFetchers := &sync.WaitGroup{} awaitFetchers.Add(len(toUpdate)) - ctxc, cancel := context.WithCancel(context.Background()) + ctxc, cancel := context.WithCancel(ctx) var once sync.Once defer cancel() updates: for _, up := range toUpdate { select { - case <-ctx.Done(): + case <-ctxc.Done(): break updates case fetchers <- struct{}{}: } @@ -288,12 +288,6 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id go func(update update) { defer func() { awaitFetchers.Done(); <-fetchers }() - select { - case <-ctxc.Done(): - return // terminate on error - default: // avoid blocking - } - imageID := update.ref if w.Trace { @@ -301,7 +295,7 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id } // Get the image from the remote - entry, err := client.Manifest(ctx, imageID.Tag) + entry, err := client.Manifest(ctxc, imageID.Tag) if err != nil { if err, ok := errors.Cause(err).(net.Error); ok && err.Timeout() { // This was due to a context timeout, don't bother logging From 181bc1b4539bbdce53226a7af11d476c748f3cef Mon Sep 17 00:00:00 2001 From: Stefan Prodan Date: Wed, 21 Nov 2018 16:00:07 +0000 Subject: [PATCH 3/4] Do not block on cancelable context and cancel context only once --- registry/cache/warming.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/registry/cache/warming.go b/registry/cache/warming.go index ba48a9067..cb20f18ac 100644 --- a/registry/cache/warming.go +++ b/registry/cache/warming.go @@ -280,7 +280,7 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id updates: for _, up := range toUpdate { select { - case <-ctxc.Done(): + case <-ctx.Done(): break updates case fetchers <- struct{}{}: } @@ -306,8 +306,8 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id if strings.Contains(err.Error(), "429") { once.Do(func() { errorLogger.Log("warn", "aborting image tag fetching due to rate limiting, will try again later") + cancel() }) - cancel() } else { errorLogger.Log("err", err, "ref", imageID) } From 4feee5f4671a91151ddc0089c4c95769bd652426 Mon Sep 17 00:00:00 2001 From: Stefan Prodan Date: Wed, 21 Nov 2018 16:21:03 +0000 Subject: [PATCH 4/4] Break update on context cancel and set wait group increment after the select --- registry/cache/warming.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/registry/cache/warming.go b/registry/cache/warming.go index cb20f18ac..5ef1b0f1c 100644 --- a/registry/cache/warming.go +++ b/registry/cache/warming.go @@ -271,7 +271,6 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id // w.Burst, so limit the number of fetching goroutines to that. fetchers := make(chan struct{}, w.burst) awaitFetchers := &sync.WaitGroup{} - awaitFetchers.Add(len(toUpdate)) ctxc, cancel := context.WithCancel(ctx) var once sync.Once @@ -280,11 +279,13 @@ func (w *Warmer) warm(ctx context.Context, now time.Time, logger log.Logger, id updates: for _, up := range toUpdate { select { - case <-ctx.Done(): + case <-ctxc.Done(): break updates case fetchers <- struct{}{}: } + awaitFetchers.Add(1) + go func(update update) { defer func() { awaitFetchers.Done(); <-fetchers }()