Skip to content

Commit

Permalink
oci/cas: Add Engine.CAS() as a step towards decoupling CAS and refs
Browse files Browse the repository at this point in the history
This commit hard-codes the blobs/{algorithm}/{encoded} template [1],
but sets the stage for future work to relax that positioning [2].

I'm adding a PutIndex call in the tests, becase the CAS implementation
now has its own temp directory which is not known to the dirEngine.
Casengine's dir implementation does not use .umoci-* temporary
directories (it uses .casengine-* temporary directories), so it's
protected from Clean.  And the .casengine-* implementation does not
currently provide it's own Clean() implementation, although I may add
that in the future.

The "Deprecated:" syntax is discussed in [3,4,5].

Also adjust Close() to return the first error it encounters, but to
continue to optimistically attempt the remaining cleanup, logging any
subsequent errors.

Bumping go-mtree pulls in [6] and gives us a lowercase sirupsen import
that is compatible with oci-discovery and casengine.

[1]: https://github.com/opencontainers/image-spec/blob/v1.0.0/image-layout.md#blobs
[2]: https://github.com/openSUSE/umoci/pull/190
[3]: https://blog.golang.org/godoc-documenting-go-code
[4]: golang/blog@257114a
[5]: golang/go#10909
[6]: vbatts/go-mtree#144

Signed-off-by: W. Trevor King <wking@tremily.us>
  • Loading branch information
wking committed Nov 4, 2017
1 parent 62bb4bf commit 0611ae1
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 91 deletions.
6 changes: 4 additions & 2 deletions hack/vendor.sh
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,10 @@ clone github.com/pkg/errors v0.8.0
clone github.com/apex/log afb2e76037a5f36542c77e88ef8aef9f469b09f8
clone github.com/urfave/cli v1.20.0
clone github.com/cyphar/filepath-securejoin v0.2.1
clone github.com/vbatts/go-mtree v0.4.1
clone github.com/Sirupsen/logrus v1.0.3
clone github.com/jtacoma/uritemplates v1.0.0
clone github.com/vbatts/go-mtree 005af4d18f8ab74174ce23565be732a3101cf316
clone github.com/sirupsen/logrus v1.0.3
clone github.com/wking/casengine 3ed08888a9365a2753ab8b809b7efb286566fe8d
clone golang.org/x/net 45e771701b814666a7eb299e6c7a57d0b1799e91 https://github.com/golang/net
# Used purely for testing.
clone github.com/mohae/deepcopy 491d3605edfb866af34a48075bd4355ac1bf46ca
Expand Down
12 changes: 12 additions & 0 deletions oci/cas/cas.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/opencontainers/go-digest"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/wking/casengine"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -63,13 +64,20 @@ var (
// Engine is an interface that provides methods for accessing and modifying an
// OCI image, namely allowing access to reference descriptors and blobs.
type Engine interface {
// CAS returns the casengine.Engine backing this engine.
CAS() (casEngine casengine.Engine)

// PutBlob adds a new blob to the image. This is idempotent; a nil error
// means that "the content is stored at DIGEST" without implying "because
// of this PutBlob() call".
//
// Deprecated: Use CAS().Put instead.
PutBlob(ctx context.Context, reader io.Reader) (digest digest.Digest, size int64, err error)

// GetBlob returns a reader for retrieving a blob from the image, which the
// caller must Close(). Returns ErrNotExist if the digest is not found.
//
// Deprecated: Use CAS().Get instead.
GetBlob(ctx context.Context, digest digest.Digest) (reader io.ReadCloser, err error)

// PutIndex sets the index of the OCI image to the given index, replacing
Expand All @@ -92,9 +100,13 @@ type Engine interface {
// DeleteBlob removes a blob from the image. This is idempotent; a nil
// error means "the content is not in the store" without implying "because
// of this DeleteBlob() call".
//
// Deprecated: Use CAS().Delete instead.
DeleteBlob(ctx context.Context, digest digest.Digest) (err error)

// ListBlobs returns the set of blob digests stored in the image.
//
// Deprecated: Use CAS().Digests instead.
ListBlobs(ctx context.Context) (digests []digest.Digest, err error)

// Clean executes a garbage collection of any non-blob garbage in the store
Expand Down
174 changes: 86 additions & 88 deletions oci/cas/dir/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,23 @@ package dir

import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"regexp"
"strings"

"github.com/apex/log"
"github.com/openSUSE/umoci/oci/cas"
"github.com/opencontainers/go-digest"
imeta "github.com/opencontainers/image-spec/specs-go"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/wking/casengine"
"github.com/wking/casengine/counter"
"github.com/wking/casengine/dir"
"golang.org/x/net/context"
"golang.org/x/sys/unix"
)
Expand All @@ -42,6 +48,10 @@ const (
ImageLayoutVersion = "1.0.0"

// blobDirectory is the directory inside an OCI image that contains blobs.
//
// FIXME: if the URI Template currently hard-coded Open() changes,
// then this variable will no longer be meaningful, and its consumers
// will have to be updated to use other logic.
blobDirectory = "blobs"

// indexFile is the file inside an OCI image that contains the top-level
Expand All @@ -53,24 +63,8 @@ const (
layoutFile = "oci-layout"
)

// blobPath returns the path to a blob given its digest, relative to the root
// of the OCI image. The digest must be of the form algorithm:hex.
func blobPath(digest digest.Digest) (string, error) {
if err := digest.Validate(); err != nil {
return "", errors.Wrapf(err, "invalid digest: %q", digest)
}

algo := digest.Algorithm()
hash := digest.Hex()

if algo != cas.BlobAlgorithm {
return "", errors.Errorf("unsupported algorithm: %q", algo)
}

return filepath.Join(blobDirectory, algo.String(), hash), nil
}

type dirEngine struct {
cas casengine.DigestListerEngine
path string
temp string
tempFile *os.File
Expand Down Expand Up @@ -146,56 +140,29 @@ func (e *dirEngine) validate() error {
return nil
}

// CAS returns the casengine.Engine backing this engine.
func (e *dirEngine) CAS() (casEngine casengine.Engine) {
return e.cas
}

// PutBlob adds a new blob to the image. This is idempotent; a nil error
// means that "the content is stored at DIGEST" without implying "because
// of this PutBlob() call".
//
// Deprecated: Use CAS().Put instead.
func (e *dirEngine) PutBlob(ctx context.Context, reader io.Reader) (digest.Digest, int64, error) {
if err := e.ensureTempDir(); err != nil {
return "", -1, errors.Wrap(err, "ensure tempdir")
}

digester := cas.BlobAlgorithm.Digester()

// We copy this into a temporary file because we need to get the blob hash,
// but also to avoid half-writing an invalid blob.
fh, err := ioutil.TempFile(e.temp, "blob-")
if err != nil {
return "", -1, errors.Wrap(err, "create temporary blob")
}
tempPath := fh.Name()
defer fh.Close()

writer := io.MultiWriter(fh, digester.Hash())
size, err := io.Copy(writer, reader)
if err != nil {
return "", -1, errors.Wrap(err, "copy to temporary blob")
}
fh.Close()

// Get the digest.
path, err := blobPath(digester.Digest())
if err != nil {
return "", -1, errors.Wrap(err, "compute blob name")
}

// Move the blob to its correct path.
path = filepath.Join(e.path, path)
if err := os.Rename(tempPath, path); err != nil {
return "", -1, errors.Wrap(err, "rename temporary blob")
}

return digester.Digest(), int64(size), nil
counter := &counter.Counter{}
countedReader := io.TeeReader(reader, counter)
dig, err := e.cas.Put(ctx, cas.BlobAlgorithm, countedReader)
return dig, int64(counter.Count()), err
}

// GetBlob returns a reader for retrieving a blob from the image, which the
// caller must Close(). Returns os.ErrNotExist if the digest is not found.
//
// Deprecated: Use CAS().Get instead.
func (e *dirEngine) GetBlob(ctx context.Context, digest digest.Digest) (io.ReadCloser, error) {
path, err := blobPath(digest)
if err != nil {
return nil, errors.Wrap(err, "compute blob path")
}
fh, err := os.Open(filepath.Join(e.path, path))
return fh, errors.Wrap(err, "open blob")
return e.cas.Get(ctx, digest)
}

// PutIndex sets the index of the OCI image to the given index, replacing the
Expand Down Expand Up @@ -259,36 +226,23 @@ func (e *dirEngine) GetIndex(ctx context.Context) (ispec.Index, error) {
// DeleteBlob removes a blob from the image. This is idempotent; a nil
// error means "the content is not in the store" without implying "because
// of this DeleteBlob() call".
//
// Deprecated: Use CAS().Delete instead.
func (e *dirEngine) DeleteBlob(ctx context.Context, digest digest.Digest) error {
path, err := blobPath(digest)
if err != nil {
return errors.Wrap(err, "compute blob path")
}

err = os.Remove(filepath.Join(e.path, path))
if err != nil && !os.IsNotExist(err) {
return errors.Wrap(err, "remove blob")
}
return nil
return e.cas.Delete(ctx, digest)
}

// ListBlobs returns the set of blob digests stored in the image.
//
// Deprecated: Use CAS().Digests instead.
func (e *dirEngine) ListBlobs(ctx context.Context) ([]digest.Digest, error) {
digests := []digest.Digest{}
blobDir := filepath.Join(e.path, blobDirectory, cas.BlobAlgorithm.String())

if err := filepath.Walk(blobDir, func(path string, _ os.FileInfo, _ error) error {
// Skip the actual directory.
if path == blobDir {
return nil
}

// XXX: Do we need to handle multiple-directory-deep cases?
digest := digest.NewDigestFromHex(cas.BlobAlgorithm.String(), filepath.Base(path))
err := e.cas.Digests(ctx, "", "", -1, 0, func(ctx context.Context, digest digest.Digest) (err error) {
digests = append(digests, digest)
return nil
}); err != nil {
return nil, errors.Wrap(err, "walk blobdir")
})
if err != nil {
return nil, err
}

return digests, nil
Expand Down Expand Up @@ -340,25 +294,69 @@ func (e *dirEngine) cleanPath(ctx context.Context, path string) error {

// Close releases all references held by the e. Subsequent operations may
// fail.
func (e *dirEngine) Close() error {
func (e *dirEngine) Close() (err error) {
ctx := context.Background()
var err2 error
if e.cas != nil {
if err2 = e.cas.Close(ctx); err2 != nil {
err = errors.Wrap(err, "close CAS")
}
}

if e.temp != "" {
if err := unix.Flock(int(e.tempFile.Fd()), unix.LOCK_UN); err != nil {
return errors.Wrap(err, "unlock tempdir")
if err2 := unix.Flock(int(e.tempFile.Fd()), unix.LOCK_UN); err2 != nil {
err2 = errors.Wrap(err2, "unlock tempdir")
if err == nil {
err = err2
}
}
if err := e.tempFile.Close(); err != nil {
return errors.Wrap(err, "close tempdir")
if err2 := e.tempFile.Close(); err2 != nil {
err2 = errors.Wrap(err2, "close tempdir")
if err == nil {
err = err2
}
}
if err := os.RemoveAll(e.temp); err != nil {
return errors.Wrap(err, "remove tempdir")
if err2 := os.RemoveAll(e.temp); err != nil {
err2 = errors.Wrap(err2, "remove tempdir")
if err == nil {
err = err2
}
}
}
return nil
return err
}

// Open opens a new reference to the directory-backed OCI image referenced by
// the provided path.
func Open(path string) (cas.Engine, error) {
ctx := context.Background()
uri := "blobs/{algorithm}/{encoded}"

pattern := `^blobs/(?P<algorithm>[a-z0-9+._-]+)/(?P<encoded>[a-zA-Z0-9=_-]{1,})$`
if filepath.Separator != '/' {
if filepath.Separator == '\\' {
pattern = strings.Replace(pattern, "/", `\\`, -1)
} else {
return nil, fmt.Errorf("unknown path separator %q", string(filepath.Separator))
}
}

getDigestRegexp, err := regexp.Compile(pattern)
if err != nil {
return nil, errors.Wrap(err, "get-digest regexp")
}

getDigest := &dir.RegexpGetDigest{
Regexp: getDigestRegexp,
}

casEngine, err := dir.NewDigestListerEngine(ctx, path, uri, getDigest.GetDigest)
if err != nil {
return nil, errors.Wrap(err, "initialize CAS engine")
}

engine := &dirEngine{
cas: casEngine,
path: path,
temp: "",
}
Expand Down
10 changes: 9 additions & 1 deletion oci/cas/dir/dir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"testing"

"github.com/openSUSE/umoci/oci/cas"
imeta "github.com/opencontainers/image-spec/specs-go"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/net/context"
"golang.org/x/sys/unix"
Expand Down Expand Up @@ -232,8 +234,14 @@ func TestEngineGCLocking(t *testing.T) {
t.Errorf("PutBlob: length doesn't match: expected=%d got=%d", len(content), size)
}

err = engine.PutIndex(ctx, ispec.Index{
Versioned: imeta.Versioned{
SchemaVersion: 2, // FIXME: This is hardcoded at the moment.
},
})

if engine.(*dirEngine).temp == "" {
t.Errorf("engine doesn't have a tempdir after putting a blob!")
t.Errorf("engine doesn't have a tempdir after putting an index!")
}

// Create umoci and other directories and files to make sure things work.
Expand Down

0 comments on commit 0611ae1

Please sign in to comment.