Skip to content

Commit

Permalink
Merge pull request moby#5270 from bpaquet/s3_parallel_upload
Browse files Browse the repository at this point in the history
Parallel layer upload for s3 cache
  • Loading branch information
tonistiigi authored Aug 28, 2024
2 parents b9a3e7b + 22f6b3e commit 6fdac94
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 102 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,7 @@ Other options are:
* Multiple manifest names can be specified at the same time, separated by `;`. The standard use case is to use the git sha1 as name, and the branch name as duplicate, and load both with 2 `import-cache` commands.
* `ignore-error=<false|true>`: specify if error is ignored in case cache export fails (default: `false`)
* `touch_refresh=24h`: Instead of being uploaded again when not changed, blobs files will be "touched" on s3 every `touch_refresh`, default is 24h. Due to this, an expiration policy can be set on the S3 bucket to cleanup useless files automatically. Manifests files are systematically rewritten, there is no need to touch them.
* `upload_parallelism=4`: This parameter changes the number of layers uploaded to s3 in parallel. Each individual layer is uploaded with 5 threads, using the Upload manager provided by the AWS SDK.

`--import-cache` options:
* `type=s3`
Expand Down
230 changes: 128 additions & 102 deletions cache/remotecache/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,37 +30,40 @@ import (
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

const (
attrBucket = "bucket"
attrRegion = "region"
attrPrefix = "prefix"
attrManifestsPrefix = "manifests_prefix"
attrBlobsPrefix = "blobs_prefix"
attrName = "name"
attrTouchRefresh = "touch_refresh"
attrEndpointURL = "endpoint_url"
attrAccessKeyID = "access_key_id"
attrSecretAccessKey = "secret_access_key"
attrSessionToken = "session_token"
attrUsePathStyle = "use_path_style"
maxCopyObjectSize = 5 * 1024 * 1024 * 1024
attrBucket = "bucket"
attrRegion = "region"
attrPrefix = "prefix"
attrManifestsPrefix = "manifests_prefix"
attrBlobsPrefix = "blobs_prefix"
attrName = "name"
attrTouchRefresh = "touch_refresh"
attrEndpointURL = "endpoint_url"
attrAccessKeyID = "access_key_id"
attrSecretAccessKey = "secret_access_key"
attrSessionToken = "session_token"
attrUsePathStyle = "use_path_style"
attrUploadParallelism = "upload_parallelism"
maxCopyObjectSize = 5 * 1024 * 1024 * 1024
)

type Config struct {
Bucket string
Region string
Prefix string
ManifestsPrefix string
BlobsPrefix string
Names []string
TouchRefresh time.Duration
EndpointURL string
AccessKeyID string
SecretAccessKey string
SessionToken string
UsePathStyle bool
Bucket string
Region string
Prefix string
ManifestsPrefix string
BlobsPrefix string
Names []string
TouchRefresh time.Duration
EndpointURL string
AccessKeyID string
SecretAccessKey string
SessionToken string
UsePathStyle bool
UploadParallelism int
}

func getConfig(attrs map[string]string) (Config, error) {
Expand Down Expand Up @@ -125,19 +128,33 @@ func getConfig(attrs map[string]string) (Config, error) {
}
}

uploadParallelism := 4
uploadParallelismStr, ok := attrs[attrUploadParallelism]
if ok {
uploadParallelismInt, err := strconv.Atoi(uploadParallelismStr)
if err != nil {
return Config{}, errors.Errorf("upload_parallelism must be a positive integer")
}
if uploadParallelismInt <= 0 {
return Config{}, errors.Errorf("upload_parallelism must be a positive integer")
}
uploadParallelism = uploadParallelismInt
}

return Config{
Bucket: bucket,
Region: region,
Prefix: prefix,
ManifestsPrefix: manifestsPrefix,
BlobsPrefix: blobsPrefix,
Names: names,
TouchRefresh: touchRefresh,
EndpointURL: endpointURL,
AccessKeyID: accessKeyID,
SecretAccessKey: secretAccessKey,
SessionToken: sessionToken,
UsePathStyle: usePathStyle,
Bucket: bucket,
Region: region,
Prefix: prefix,
ManifestsPrefix: manifestsPrefix,
BlobsPrefix: blobsPrefix,
Names: names,
TouchRefresh: touchRefresh,
EndpointURL: endpointURL,
AccessKeyID: accessKeyID,
SecretAccessKey: secretAccessKey,
SessionToken: sessionToken,
UsePathStyle: usePathStyle,
UploadParallelism: uploadParallelism,
}, nil
}

Expand Down Expand Up @@ -187,64 +204,84 @@ func (e *exporter) Finalize(ctx context.Context) (map[string]string, error) {
return nil, err
}

for i, l := range cacheConfig.Layers {
dgstPair, ok := descs[l.Blob]
if !ok {
return nil, errors.Errorf("missing blob %s", l.Blob)
}
if dgstPair.Descriptor.Annotations == nil {
return nil, errors.Errorf("invalid descriptor without annotations")
}
v, ok := dgstPair.Descriptor.Annotations[labels.LabelUncompressed]
if !ok {
return nil, errors.Errorf("invalid descriptor without uncompressed annotation")
}
diffID, err := digest.Parse(v)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse uncompressed annotation")
}
eg, groupCtx := errgroup.WithContext(ctx)
tasks := make(chan int, e.config.UploadParallelism)

key := e.s3Client.blobKey(dgstPair.Descriptor.Digest)
exists, size, err := e.s3Client.exists(ctx, key)
if err != nil {
return nil, errors.Wrapf(err, "failed to check file presence in cache")
go func() {
for i := range cacheConfig.Layers {
tasks <- i
}
if exists != nil {
if time.Since(*exists) > e.config.TouchRefresh {
err = e.s3Client.touch(ctx, key, size)
close(tasks)
}()

for k := 0; k < e.config.UploadParallelism; k++ {
eg.Go(func() error {
for index := range tasks {
blob := cacheConfig.Layers[index].Blob
dgstPair, ok := descs[blob]
if !ok {
return errors.Errorf("missing blob %s", blob)
}
if dgstPair.Descriptor.Annotations == nil {
return errors.Errorf("invalid descriptor without annotations")
}
v, ok := dgstPair.Descriptor.Annotations[labels.LabelUncompressed]
if !ok {
return errors.Errorf("invalid descriptor without uncompressed annotation")
}
diffID, err := digest.Parse(v)
if err != nil {
return nil, errors.Wrapf(err, "failed to touch file")
return errors.Wrapf(err, "failed to parse uncompressed annotation")
}
}
} else {
layerDone := progress.OneOff(ctx, fmt.Sprintf("writing layer %s", l.Blob))
// TODO: once buildkit uses v2, start using
// https://github.com/containerd/containerd/pull/9657
// currently inline data should never happen.
ra, err := dgstPair.Provider.ReaderAt(ctx, dgstPair.Descriptor)
if err != nil {
return nil, layerDone(errors.Wrap(err, "error reading layer blob from provider"))
}
defer ra.Close()
if err := e.s3Client.saveMutableAt(ctx, key, &nopCloserSectionReader{io.NewSectionReader(ra, 0, ra.Size())}); err != nil {
return nil, layerDone(errors.Wrap(err, "error writing layer blob"))
}
layerDone(nil)
}

la := &v1.LayerAnnotations{
DiffID: diffID,
Size: dgstPair.Descriptor.Size,
MediaType: dgstPair.Descriptor.MediaType,
}
if v, ok := dgstPair.Descriptor.Annotations["buildkit/createdat"]; ok {
var t time.Time
if err := (&t).UnmarshalText([]byte(v)); err != nil {
return nil, err
key := e.s3Client.blobKey(dgstPair.Descriptor.Digest)
exists, size, err := e.s3Client.exists(groupCtx, key)
if err != nil {
return errors.Wrapf(err, "failed to check file presence in cache")
}
if exists != nil {
if time.Since(*exists) > e.config.TouchRefresh {
err = e.s3Client.touch(groupCtx, key, size)
if err != nil {
return errors.Wrapf(err, "failed to touch file")
}
}
} else {
layerDone := progress.OneOff(groupCtx, fmt.Sprintf("writing layer %s", blob))
// TODO: once buildkit uses v2, start using
// https://github.com/containerd/containerd/pull/9657
// currently inline data should never happen.
ra, err := dgstPair.Provider.ReaderAt(groupCtx, dgstPair.Descriptor)
if err != nil {
return layerDone(errors.Wrap(err, "error reading layer blob from provider"))
}
defer ra.Close()
if err := e.s3Client.saveMutableAt(groupCtx, key, &nopCloserSectionReader{io.NewSectionReader(ra, 0, ra.Size())}); err != nil {
return layerDone(errors.Wrap(err, "error writing layer blob"))
}
layerDone(nil)
}

la := &v1.LayerAnnotations{
DiffID: diffID,
Size: dgstPair.Descriptor.Size,
MediaType: dgstPair.Descriptor.MediaType,
}
if v, ok := dgstPair.Descriptor.Annotations["buildkit/createdat"]; ok {
var t time.Time
if err := (&t).UnmarshalText([]byte(v)); err != nil {
return err
}
la.CreatedAt = t.UTC()
}
cacheConfig.Layers[index].Annotations = la
}
la.CreatedAt = t.UTC()
}
cacheConfig.Layers[i].Annotations = la
return nil
})
}

if err := eg.Wait(); err != nil {
return nil, err
}

dt, err := json.Marshal(cacheConfig)
Expand All @@ -253,7 +290,7 @@ func (e *exporter) Finalize(ctx context.Context) (map[string]string, error) {
}

for _, name := range e.config.Names {
if err := e.s3Client.saveMutable(ctx, e.s3Client.manifestKey(name), dt); err != nil {
if err := e.s3Client.saveMutableAt(ctx, e.s3Client.manifestKey(name), bytes.NewReader(dt)); err != nil {
return nil, errors.Wrapf(err, "error writing manifest: %s", name)
}
}
Expand Down Expand Up @@ -430,18 +467,7 @@ func (s3Client *s3Client) getReader(ctx context.Context, key string) (io.ReadClo
return output.Body, nil
}

func (s3Client *s3Client) saveMutable(ctx context.Context, key string, value []byte) error {
input := &s3.PutObjectInput{
Bucket: &s3Client.bucket,
Key: &key,

Body: bytes.NewReader(value),
}
_, err := s3Client.Upload(ctx, input)
return err
}

func (s3Client *s3Client) saveMutableAt(ctx context.Context, key string, body io.ReadSeekCloser) error {
func (s3Client *s3Client) saveMutableAt(ctx context.Context, key string, body io.Reader) error {
input := &s3.PutObjectInput{
Bucket: &s3Client.bucket,
Key: &key,
Expand Down

0 comments on commit 6fdac94

Please sign in to comment.