Merge pull request #2501 from sipsma/mergeop-inline-cache
remote cache: support arbitrary layers as results
tonistiigi authored Jan 16, 2022
2 parents 2dc3e74 + ce012ab commit cfdaaa4
Showing 5 changed files with 302 additions and 94 deletions.
70 changes: 57 additions & 13 deletions cache/remotecache/inline/inline.go
@@ -9,6 +9,7 @@ import (
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
digest "github.com/opencontainers/go-digest"
+ "github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

@@ -73,14 +74,55 @@ func (ce *exporter) ExportForLayers(ctx context.Context, layers []digest.Digest)
return nil, nil
}

- cache := map[int]int{}

// reorder layers based on the order in the image
+ blobIndexes := make(map[digest.Digest]int, len(layers))
+ for i, blob := range layers {
+ blobIndexes[blob] = i
+ }

for i, r := range cfg.Records {
for j, rr := range r.Results {
- n := getSortedLayerIndex(rr.LayerIndex, cfg.Layers, cache)
- rr.LayerIndex = n
- r.Results[j] = rr
+ resultBlobs := layerToBlobs(rr.LayerIndex, cfg.Layers)
+ // match being true means the result is in the same order as the image
+ var match bool
+ if len(resultBlobs) <= len(layers) {
+ match = true
+ for k, resultBlob := range resultBlobs {
+ layerBlob := layers[k]
+ if resultBlob != layerBlob {
+ match = false
+ break
+ }
+ }
+ }
+ if match {
+ // The layers of the result are in the same order as the image, so we can
+ // specify it just using the CacheResult struct and specifying LayerIndex
+ // as the top-most layer of the result.
+ rr.LayerIndex = len(resultBlobs) - 1
+ r.Results[j] = rr
+ } else {
+ // The layers of the result are not in the same order as the image, so we
+ // have to use ChainedResult to specify each layer of the result individually.
+ chainedResult := v1.ChainedResult{}
+ for _, resultBlob := range resultBlobs {
+ idx, ok := blobIndexes[resultBlob]
+ if !ok {
+ return nil, errors.Errorf("failed to find blob %s in layers", resultBlob)
+ }
+ chainedResult.LayerIndexes = append(chainedResult.LayerIndexes, idx)
+ }
+ r.Results[j] = v1.CacheResult{}
+ r.ChainedResults = append(r.ChainedResults, chainedResult)
+ }
+ }
+ // remove any CacheResults that had to be converted to the ChainedResult format.
+ var filteredResults []v1.CacheResult
+ for _, rr := range r.Results {
+ if rr != (v1.CacheResult{}) {
+ filteredResults = append(filteredResults, rr)
+ }
+ }
+ r.Results = filteredResults
+ cfg.Records[i] = r
}
}
@@ -94,14 +136,16 @@ func (ce *exporter) ExportForLayers(ctx context.Context, layers []digest.Digest)
return dt, nil
}

- func getSortedLayerIndex(idx int, layers []v1.CacheLayer, cache map[int]int) int {
- if idx == -1 {
- return -1
+ func layerToBlobs(idx int, layers []v1.CacheLayer) []digest.Digest {
+ var ds []digest.Digest
+ for idx != -1 {
+ layer := layers[idx]
+ ds = append(ds, layer.Blob)
+ idx = layer.ParentIndex
}
- l := layers[idx]
- if i, ok := cache[idx]; ok {
- return i
+ // reverse so they go lowest to highest
+ for i, j := 0, len(ds)-1; i < j; i, j = i+1, j-1 {
+ ds[i], ds[j] = ds[j], ds[i]
}
- cache[idx] = getSortedLayerIndex(l.ParentIndex, layers, cache) + 1
- return cache[idx]
+ return ds
}
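To make the reordering logic above concrete, here is a self-contained sketch. It is not code from this commit: a toy CacheLayer struct and plain strings stand in for v1.CacheLayer and digest.Digest. An image has layers A, B, C and a cached result's chain is A, C. That chain is not a prefix of the image's layer order, so it cannot be expressed as a single top-most LayerIndex and falls back to the chained form with indexes [0, 2].

    package main

    import "fmt"

    // CacheLayer is a simplified stand-in for v1.CacheLayer.
    type CacheLayer struct {
        Blob        string
        ParentIndex int
    }

    // layerToBlobs mirrors the helper added above: walk the parent chain from
    // the top-most layer down, then reverse so blobs run lowest to highest.
    func layerToBlobs(idx int, layers []CacheLayer) []string {
        var ds []string
        for idx != -1 {
            layer := layers[idx]
            ds = append(ds, layer.Blob)
            idx = layer.ParentIndex
        }
        for i, j := 0, len(ds)-1; i < j; i, j = i+1, j-1 {
            ds[i], ds[j] = ds[j], ds[i]
        }
        return ds
    }

    func main() {
        imageLayers := []string{"A", "B", "C"} // layer order in the image
        blobIndexes := map[string]int{}
        for i, b := range imageLayers {
            blobIndexes[b] = i
        }

        // Cache layer 0 is A (no parent); cache layer 1 is C with parent A.
        // The result rooted at index 1 therefore has the chain [A, C].
        cacheLayers := []CacheLayer{{Blob: "A", ParentIndex: -1}, {Blob: "C", ParentIndex: 0}}
        resultBlobs := layerToBlobs(1, cacheLayers)

        // Same prefix check as the exporter code above.
        match := false
        if len(resultBlobs) <= len(imageLayers) {
            match = true
            for k, b := range resultBlobs {
                if b != imageLayers[k] {
                    match = false
                    break
                }
            }
        }

        if match {
            fmt.Println("plain CacheResult, LayerIndex =", len(resultBlobs)-1)
        } else {
            // Not a prefix of the image's layers: record each index individually.
            var indexes []int
            for _, b := range resultBlobs {
                indexes = append(indexes, blobIndexes[b])
            }
            fmt.Println("ChainedResult, LayerIndexes =", indexes) // prints [0 2]
        }
    }

The prefix check is the crux: the pre-existing CacheResult format can only say "this result is the image's layers up to index N", while ChainedResult can reference an arbitrary subset of layers, which is what the MergeOp tests below exercise.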
25 changes: 25 additions & 0 deletions cache/remotecache/v1/parse.go
@@ -65,6 +65,31 @@ func parseRecord(cc CacheConfig, idx int, provider DescriptorProvider, t solver.
}
}

+ for _, res := range rec.ChainedResults {
+ remote := &solver.Remote{}
+ mp := contentutil.NewMultiProvider(nil)
+ for _, diff := range res.LayerIndexes {
+ if diff < 0 || diff >= len(cc.Layers) {
+ return nil, errors.Errorf("invalid layer index %d", diff)
+ }

+ l := cc.Layers[diff]

+ descPair, ok := provider[l.Blob]
+ if !ok {
+ remote = nil
+ break
+ }

+ remote.Descriptors = append(remote.Descriptors, descPair.Descriptor)
+ mp.Add(descPair.Descriptor.Digest, descPair.Provider)
+ }
+ if remote != nil {
+ remote.Provider = mp
+ r.AddResult(res.CreatedAt, remote)
+ }
+ }

cache[idx] = r
return r, nil
}
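For the consuming side added above, here is a sketch of the semantics with plain maps and strings standing in for the provider, descriptor, and solver types (illustrative only, not code from the commit): every index is bounds-checked against the layer list, and if any referenced blob has no content provider the whole chained result is skipped rather than partially reconstructed, matching the remote = nil; break path above.

    package main

    import "fmt"

    // Layer is a simplified stand-in for v1.CacheLayer.
    type Layer struct{ Blob string }

    // resolveChain mirrors the ChainedResults loop above. providers maps a
    // blob digest to an opaque descriptor string; a missing entry makes the
    // whole result unusable (returns nil, nil), like setting remote = nil.
    func resolveChain(indexes []int, layers []Layer, providers map[string]string) ([]string, error) {
        var descriptors []string
        for _, i := range indexes {
            if i < 0 || i >= len(layers) {
                return nil, fmt.Errorf("invalid layer index %d", i)
            }
            desc, ok := providers[layers[i].Blob]
            if !ok {
                return nil, nil // drop the result, never build a partial remote
            }
            descriptors = append(descriptors, desc)
        }
        return descriptors, nil
    }

    func main() {
        layers := []Layer{{"A"}, {"B"}, {"C"}}
        providers := map[string]string{"A": "descA", "C": "descC"}

        ds, err := resolveChain([]int{0, 2}, layers, providers)
        fmt.Println(ds, err) // [descA descC] <nil>

        ds, err = resolveChain([]int{0, 1}, layers, map[string]string{"A": "descA"})
        fmt.Println(ds, err) // [] <nil>: blob B has no provider, result dropped
    }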
12 changes: 9 additions & 3 deletions cache/remotecache/v1/spec.go
@@ -27,16 +27,22 @@ type LayerAnnotations struct {
}

type CacheRecord struct {
- Results []CacheResult `json:"layers,omitempty"`
- Digest digest.Digest `json:"digest,omitempty"`
- Inputs [][]CacheInput `json:"inputs,omitempty"`
+ Results []CacheResult `json:"layers,omitempty"`
+ ChainedResults []ChainedResult `json:"chains,omitempty"`
+ Digest digest.Digest `json:"digest,omitempty"`
+ Inputs [][]CacheInput `json:"inputs,omitempty"`
}

type CacheResult struct {
LayerIndex int `json:"layer"`
CreatedAt time.Time `json:"createdAt,omitempty"`
}

+ type ChainedResult struct {
+ LayerIndexes []int `json:"layers"`
+ CreatedAt time.Time `json:"createdAt,omitempty"`
+ }

type CacheInput struct {
Selector string `json:"selector,omitempty"`
LinkIndex int `json:"link"`
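To illustrate the wire format, a minimal program using trimmed copies of the structs above (CreatedAt and the other CacheRecord fields are omitted for brevity) shows how the new "chains" field sits next to the existing "layers" field in a serialized record:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // Trimmed copies of the spec structs above.
    type CacheResult struct {
        LayerIndex int `json:"layer"`
    }

    type ChainedResult struct {
        LayerIndexes []int `json:"layers"`
    }

    type CacheRecord struct {
        Results        []CacheResult   `json:"layers,omitempty"`
        ChainedResults []ChainedResult `json:"chains,omitempty"`
    }

    func main() {
        rec := CacheRecord{
            // layers 0..2 of the image, in image order
            Results: []CacheResult{{LayerIndex: 2}},
            // an arbitrary selection of image layers, here 0 and 2
            ChainedResults: []ChainedResult{{LayerIndexes: []int{0, 2}}},
        }
        out, _ := json.Marshal(rec)
        fmt.Println(string(out))
        // Output: {"layers":[{"layer":2}],"chains":[{"layers":[0,2]}]}
    }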
187 changes: 150 additions & 37 deletions client/client_test.go
@@ -138,7 +138,9 @@ func TestIntegration(t *testing.T) {
testBuildExportZstd,
testPullZstdImage,
testMergeOp,
- testMergeOpCache,
+ testMergeOpCacheInline,
+ testMergeOpCacheMin,
+ testMergeOpCacheMax,
testRmSymlink,
testMoveParentDir,
)
@@ -4117,7 +4119,20 @@ func testMergeOp(t *testing.T, sb integration.Sandbox) {
)
}

- func testMergeOpCache(t *testing.T, sb integration.Sandbox) {
+ func testMergeOpCacheInline(t *testing.T, sb integration.Sandbox) {
+ testMergeOpCache(t, sb, "inline")
+ }

+ func testMergeOpCacheMin(t *testing.T, sb integration.Sandbox) {
+ testMergeOpCache(t, sb, "min")
+ }

+ func testMergeOpCacheMax(t *testing.T, sb integration.Sandbox) {
+ testMergeOpCache(t, sb, "max")
+ }

+ func testMergeOpCache(t *testing.T, sb integration.Sandbox, mode string) {
+ t.Helper()
skipDockerd(t, sb)
requiresLinux(t)

@@ -4205,6 +4220,51 @@ func testMergeOpCache(t *testing.T, sb integration.Sandbox) {

target := registry + "/buildkit/testmerge:latest"
cacheTarget := registry + "/buildkit/testmergecache:latest"

+ var cacheExports []CacheOptionsEntry
+ var cacheImports []CacheOptionsEntry
+ switch mode {
+ case "inline":
+ cacheExports = []CacheOptionsEntry{{
+ Type: "inline",
+ }}
+ cacheImports = []CacheOptionsEntry{{
+ Type: "registry",
+ Attrs: map[string]string{
+ "ref": target,
+ },
+ }}
+ case "min":
+ cacheExports = []CacheOptionsEntry{{
+ Type: "registry",
+ Attrs: map[string]string{
+ "ref": cacheTarget,
+ },
+ }}
+ cacheImports = []CacheOptionsEntry{{
+ Type: "registry",
+ Attrs: map[string]string{
+ "ref": cacheTarget,
+ },
+ }}
+ case "max":
+ cacheExports = []CacheOptionsEntry{{
+ Type: "registry",
+ Attrs: map[string]string{
+ "ref": cacheTarget,
+ "mode": "max",
+ },
+ }}
+ cacheImports = []CacheOptionsEntry{{
+ Type: "registry",
+ Attrs: map[string]string{
+ "ref": cacheTarget,
+ },
+ }}
+ default:
+ require.Fail(t, "unknown cache mode: %s", mode)
+ }

_, err = c.Solve(sb.Context(), def, SolveOpt{
Exports: []ExportEntry{
{
@@ -4215,12 +4275,7 @@ func testMergeOpCache(t *testing.T, sb integration.Sandbox) {
},
},
},
- CacheExports: []CacheOptionsEntry{{
- Type: "registry",
- Attrs: map[string]string{
- "ref": cacheTarget,
- },
- }},
+ CacheExports: cacheExports,
}, nil)
require.NoError(t, err)

@@ -4273,22 +4328,12 @@ func testMergeOpCache(t *testing.T, sb integration.Sandbox) {
"push": "true",
},
}},
- CacheImports: []CacheOptionsEntry{{
- Type: "registry",
- Attrs: map[string]string{
- "ref": cacheTarget,
- },
- }},
- CacheExports: []CacheOptionsEntry{{
- Type: "registry",
- Attrs: map[string]string{
- "ref": cacheTarget,
- },
- }},
+ CacheImports: cacheImports,
+ CacheExports: cacheExports,
}, nil)
require.NoError(t, err)

- // verify everything from before stayed lazy except the middle layer for input1Copy
+ // verify everything from before stayed lazy
img, err = imageService.Get(ctx, target)
require.NoError(t, err)

@@ -4319,18 +4364,8 @@ func testMergeOpCache(t *testing.T, sb integration.Sandbox) {
"push": "true",
},
}},
- CacheImports: []CacheOptionsEntry{{
- Type: "registry",
- Attrs: map[string]string{
- "ref": cacheTarget,
- },
- }},
- CacheExports: []CacheOptionsEntry{{
- Type: "registry",
- Attrs: map[string]string{
- "ref": cacheTarget,
- },
- }},
+ CacheExports: cacheExports,
+ CacheImports: cacheImports,
}, nil)
require.NoError(t, err)

@@ -4368,19 +4403,97 @@ func testMergeOpCache(t *testing.T, sb integration.Sandbox) {
OutputDir: destDir,
},
},
- CacheImports: []CacheOptionsEntry{{
- Type: "registry",
+ CacheImports: cacheImports,
}, nil)
require.NoError(t, err)

newBar2Contents, err := ioutil.ReadFile(filepath.Join(destDir, "bar", "2"))
require.NoError(t, err)

require.Equalf(t, bar2Contents, newBar2Contents, "bar/2 contents changed")

+ // Now test the case with a layer on top of a merge.
+ err = imageService.Delete(ctx, img.Name, images.SynchronousDelete())
+ require.NoError(t, err)
+ checkAllReleasable(t, c, sb, true)

+ for _, layer := range manifest.Layers {
+ _, err = contentStore.Info(ctx, layer.Digest)
+ require.ErrorIs(t, err, ctderrdefs.ErrNotFound, "unexpected error %v", err)
+ }

+ mergePlusLayer := merge.File(llb.Mkfile("/3", 0444, nil))

+ def, err = mergePlusLayer.Marshal(sb.Context())
+ require.NoError(t, err)

+ _, err = c.Solve(sb.Context(), def, SolveOpt{
+ Exports: []ExportEntry{{
+ Type: ExporterImage,
+ Attrs: map[string]string{
+ "ref": cacheTarget,
+ "name": target,
+ "push": "true",
+ },
+ }},
+ CacheExports: cacheExports,
+ CacheImports: cacheImports,
+ }, nil)
+ require.NoError(t, err)

- newBar2Contents, err := ioutil.ReadFile(filepath.Join(destDir, "bar", "2"))
+ // check the random value at /bar/2 didn't change
+ destDir, err = ioutil.TempDir("", "buildkit")
+ require.NoError(t, err)
+ defer os.RemoveAll(destDir)

+ _, err = c.Solve(sb.Context(), def, SolveOpt{
+ Exports: []ExportEntry{
+ {
+ Type: ExporterLocal,
+ OutputDir: destDir,
+ },
+ },
+ CacheImports: cacheImports,
+ }, nil)
+ require.NoError(t, err)

+ newBar2Contents, err = ioutil.ReadFile(filepath.Join(destDir, "bar", "2"))
require.NoError(t, err)

require.Equalf(t, bar2Contents, newBar2Contents, "bar/2 contents changed")

+ // clear local state, repeat the build, verify everything stays lazy
+ err = imageService.Delete(ctx, img.Name, images.SynchronousDelete())
+ require.NoError(t, err)
+ checkAllReleasable(t, c, sb, true)

+ for _, layer := range manifest.Layers {
+ _, err = contentStore.Info(ctx, layer.Digest)
+ require.ErrorIs(t, err, ctderrdefs.ErrNotFound, "unexpected error %v", err)
+ }

+ _, err = c.Solve(sb.Context(), def, SolveOpt{
+ Exports: []ExportEntry{{
+ Type: ExporterImage,
+ Attrs: map[string]string{
+ "name": target,
+ "push": "true",
+ },
+ }},
+ CacheImports: cacheImports,
+ CacheExports: cacheExports,
+ }, nil)
+ require.NoError(t, err)

+ img, err = imageService.Get(ctx, target)
+ require.NoError(t, err)

+ manifest, err = images.Manifest(ctx, contentStore, img.Target, nil)
+ require.NoError(t, err)

+ for i, layer := range manifest.Layers {
+ _, err = contentStore.Info(ctx, layer.Digest)
+ require.ErrorIs(t, err, ctderrdefs.ErrNotFound, "unexpected error %v for index %d", err, i)
+ }
}

func requireContents(ctx context.Context, t *testing.T, c *Client, sb integration.Sandbox, state llb.State, cacheImports, cacheExports []CacheOptionsEntry, imageTarget string, files ...fstest.Applier) {
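As a usage sketch grounded in the test's "inline" branch above: inline export embeds the cache metadata in the pushed image itself, so the import side points at the same image ref. The helper below uses the public buildkit client API; the ref argument and function name are placeholders, not part of this commit.

    package cacheexample

    import (
        "context"

        "github.com/moby/buildkit/client"
        "github.com/moby/buildkit/client/llb"
    )

    // buildWithInlineCache mirrors the "inline" mode above: export cache
    // inline into the image pushed to ref, and import cache from that same
    // ref on the next build.
    func buildWithInlineCache(ctx context.Context, c *client.Client, def *llb.Definition, ref string) error {
        _, err := c.Solve(ctx, def, client.SolveOpt{
            Exports: []client.ExportEntry{{
                Type: client.ExporterImage,
                Attrs: map[string]string{
                    "name": ref,
                    "push": "true",
                },
            }},
            CacheExports: []client.CacheOptionsEntry{{Type: "inline"}},
            CacheImports: []client.CacheOptionsEntry{{
                Type:  "registry",
                Attrs: map[string]string{"ref": ref},
            }},
        }, nil)
        return err
    }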