From d3b53a5163f7b8444b937a9fe05d68b83a855fc0 Mon Sep 17 00:00:00 2001 From: Javi Fontan Date: Thu, 14 Feb 2019 19:38:54 +0100 Subject: [PATCH 1/3] add transactions to repositories Now library has a flag to set it transactional and is used with all locations and repositories below it. Transaction is managed from the location with Commit and Rollback methods. While a transaction is being made no other transaction can start. When a transaction starts a new siva filesystem is created for it and the previous cachedFS kept. This way previously created repositories from that location should work the same as before. It should also be able to create new repositories. Signed-off-by: Javi Fontan --- siva/checkpoint_test.go | 2 +- siva/library.go | 12 ++-- siva/library_test.go | 2 +- siva/location.go | 132 +++++++++++++++++++++++++++++++++++++--- siva/repository.go | 21 +++++-- 5 files changed, 148 insertions(+), 21 deletions(-) diff --git a/siva/checkpoint_test.go b/siva/checkpoint_test.go index 9671965..de95fde 100644 --- a/siva/checkpoint_test.go +++ b/siva/checkpoint_test.go @@ -19,7 +19,7 @@ func TestCheckpoint(t *testing.T) { require.NoError(err) fs := memfs.New() - lib := NewLibrary("test", fs) + lib := NewLibrary("test", fs, true) var l borges.Location diff --git a/siva/library.go b/siva/library.go index 7352406..44f7f91 100644 --- a/siva/library.go +++ b/siva/library.go @@ -13,17 +13,19 @@ import ( // Library represents a borges.Library implementation based on siva files. type Library struct { - id borges.LibraryID - fs billy.Filesystem + id borges.LibraryID + fs billy.Filesystem + transactional bool } var _ borges.Library = (*Library)(nil) // NewLibrary creates a new siva.Library. -func NewLibrary(id string, fs billy.Filesystem) *Library { +func NewLibrary(id string, fs billy.Filesystem, transactional bool) *Library { return &Library{ - id: borges.LibraryID(id), - fs: fs, + id: borges.LibraryID(id), + fs: fs, + transactional: transactional, } } diff --git a/siva/library_test.go b/siva/library_test.go index f9a4f0c..157e766 100644 --- a/siva/library_test.go +++ b/siva/library_test.go @@ -14,7 +14,7 @@ func TestLibrary(t *testing.T) { fs := osfs.New("../_testdata/siva") s.LibrarySingle = func() borges.Library { - return NewLibrary("foo", fs) + return NewLibrary("foo", fs, false) } suite.Run(t, s) diff --git a/siva/location.go b/siva/location.go index 6e8c973..d2bc3cc 100644 --- a/siva/location.go +++ b/siva/location.go @@ -9,6 +9,7 @@ import ( sivafs "gopkg.in/src-d/go-billy-siva.v4" billy "gopkg.in/src-d/go-billy.v4" "gopkg.in/src-d/go-billy.v4/memfs" + "gopkg.in/src-d/go-billy.v4/util" "gopkg.in/src-d/go-errors.v1" "gopkg.in/src-d/go-git.v4/config" ) @@ -20,14 +21,21 @@ var ( ErrCannotUseSivaFile = errors.NewKind("cannot use siva file: %s") // ErrMalformedData when checkpoint data is invalid. ErrMalformedData = errors.NewKind("malformed data") + // ErrTransactioning is returned when a second transaction wants to start + // in the same location. + ErrTransactioning = errors.NewKind("already doing a transaction") ) type Location struct { - id borges.LocationID - path string - cachedFS billy.Filesystem - transactional bool - library *Library + id borges.LocationID + path string + // cachedFS billy.Filesystem + cachedFS sivafs.SivaFS + library *Library + + // last good position + checkpoint int64 + transactioning bool } var _ borges.Location = (*Location)(nil) @@ -134,18 +142,22 @@ func fixSiva(fs billy.Filesystem, path string) error { return nil } +func (l *Location) newFS() (sivafs.SivaFS, error) { + return sivafs.NewFilesystem(l.baseFS(), l.path, memfs.New()) +} + // FS returns a filesystem for the location's siva file. -func (l *Location) FS() (billy.Filesystem, error) { +func (l *Location) FS() (sivafs.SivaFS, error) { if l.cachedFS != nil { return l.cachedFS, nil } - err := fixSiva(l.library.fs, l.path) + err := fixSiva(l.baseFS(), l.path) if err != nil { return nil, err } - sfs, err := sivafs.NewFilesystem(l.library.fs, l.path, memfs.New()) + sfs, err := l.newFS() if err != nil { return nil, err } @@ -262,11 +274,113 @@ func (l *Location) Repositories(mode borges.Mode) (borges.RepositoryIterator, er }, nil } +func (l *Location) transactional() bool { + return l.library.transactional +} + +func (l *Location) baseFS() billy.Filesystem { + return l.library.fs +} + +func (l *Location) setupTransaction(mode borges.Mode) (sivafs.SivaFS, error) { + if !l.transactional() || mode != borges.RWMode { + return l.FS() + } + + if l.transactioning { + return nil, ErrTransactioning.New() + } + + fs, err := l.newFS() + if err != nil { + return nil, err + } + + size, err := l.writeCheckpoint() + if err != nil { + return nil, err + } + + l.checkpoint = size + l.transactioning = true + + return fs, nil +} + +func (l *Location) checkpointPath() string { + return fmt.Sprintf("%s.checkpoint", l.path) +} + +func (l *Location) deleteCheckpoint() error { + return l.baseFS().Remove(l.checkpointPath()) +} + +func (l *Location) writeCheckpoint() (int64, error) { + var size int64 + s, err := l.baseFS().Stat(l.path) + if err != nil { + if !os.IsNotExist(err) { + return 0, err + } + } else { + size = s.Size() + } + + str := strconv.FormatInt(size, 64) + err = util.WriteFile(l.baseFS(), l.checkpointPath(), []byte(str), 0664) + if err != nil { + return 0, err + } + + return size, nil +} + +func (l *Location) Commit() error { + if !l.transactional() || !l.transactioning { + return nil + } + + l.checkpoint = 0 + l.transactioning = false + l.cachedFS = nil + + return l.deleteCheckpoint() +} + +func (l *Location) Rollback() error { + if !l.transactional() || !l.transactioning { + return nil + } + + if l.checkpoint > 0 { + f, err := l.baseFS().Open(l.path) + if err != nil { + return err + } + defer f.Close() + err = f.Truncate(l.checkpoint) + if err != nil { + return err + } + } else { + err := l.baseFS().Remove(l.path) + if err != nil { + return err + } + } + + l.checkpoint = 0 + l.transactioning = false + l.cachedFS = nil + + return l.deleteCheckpoint() +} + func (l *Location) repository( id borges.RepositoryID, mode borges.Mode, ) (borges.Repository, error) { - fs, err := l.FS() + fs, err := l.setupTransaction(mode) if err != nil { return nil, err } diff --git a/siva/repository.go b/siva/repository.go index 5a2eb17..8e3e267 100644 --- a/siva/repository.go +++ b/siva/repository.go @@ -2,7 +2,8 @@ package siva import ( borges "github.com/src-d/go-borges" - billy "gopkg.in/src-d/go-billy.v4" + + sivafs "gopkg.in/src-d/go-billy-siva.v4" git "gopkg.in/src-d/go-git.v4" "gopkg.in/src-d/go-git.v4/plumbing/cache" "gopkg.in/src-d/go-git.v4/storage/filesystem" @@ -11,7 +12,7 @@ import ( type Repository struct { id borges.RepositoryID repo *git.Repository - fs billy.Filesystem + fs sivafs.SivaFS mode borges.Mode location *Location @@ -21,7 +22,7 @@ var _ borges.Repository = (*Repository)(nil) func NewRepository( id borges.RepositoryID, - fs billy.Filesystem, + fs sivafs.SivaFS, m borges.Mode, l *Location, ) (*Repository, error) { @@ -53,11 +54,21 @@ func (r *Repository) Mode() borges.Mode { } func (r *Repository) Commit() error { - return borges.ErrNotImplemented.New() + err := r.fs.Sync() + if err != nil { + return err + } + + return r.location.Commit() } func (r *Repository) Close() error { - return borges.ErrNotImplemented.New() + err := r.fs.Sync() + if err != nil { + return err + } + + return r.location.Rollback() } func (r *Repository) R() *git.Repository { From 1838ccc3d22f8ee9270b620128542fdf050fa63e Mon Sep 17 00:00:00 2001 From: Javi Fontan Date: Fri, 15 Feb 2019 12:12:33 +0100 Subject: [PATCH 2/3] Fix bug writing checkpoint size Signed-off-by: Javi Fontan --- siva/location.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/siva/location.go b/siva/location.go index d2bc3cc..2e6b7fc 100644 --- a/siva/location.go +++ b/siva/location.go @@ -326,7 +326,7 @@ func (l *Location) writeCheckpoint() (int64, error) { size = s.Size() } - str := strconv.FormatInt(size, 64) + str := strconv.FormatInt(size, 10) err = util.WriteFile(l.baseFS(), l.checkpointPath(), []byte(str), 0664) if err != nil { return 0, err From d5cb7094947c67a1e5aead181418b4e9953ecff0 Mon Sep 17 00:00:00 2001 From: Javi Fontan Date: Fri, 15 Feb 2019 12:14:48 +0100 Subject: [PATCH 3/3] add tests for transaction commit and rollback Signed-off-by: Javi Fontan --- siva/transaction_test.go | 106 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 siva/transaction_test.go diff --git a/siva/transaction_test.go b/siva/transaction_test.go new file mode 100644 index 0000000..ef04f3a --- /dev/null +++ b/siva/transaction_test.go @@ -0,0 +1,106 @@ +package siva + +import ( + "io/ioutil" + "testing" + + borges "github.com/src-d/go-borges" + "github.com/stretchr/testify/require" + "gopkg.in/src-d/go-billy.v4/memfs" + "gopkg.in/src-d/go-billy.v4/util" + git "gopkg.in/src-d/go-git.v4" +) + +func setupTranstaction( + t *testing.T, +) (borges.Location, borges.Repository, borges.Repository) { + t.Helper() + require := require.New(t) + + sivaData, err := ioutil.ReadFile("../_testdata/siva/foo-bar.siva") + require.NoError(err) + + fs := memfs.New() + lib := NewLibrary("test", fs, true) + + err = util.WriteFile(fs, "foo-bar.siva", sivaData, 0666) + require.NoError(err) + l, err := lib.Location("foo-bar") + require.NoError(err) + + // open two repositories, the write one is in transaction mode + r, err := l.Get("github.com/foo/bar", borges.ReadOnlyMode) + require.NoError(err) + w, err := l.Get("github.com/foo/bar", borges.RWMode) + require.NoError(err) + + return l, r, w +} + +func TestCommit(t *testing.T) { + require := require.New(t) + l, r, w := setupTranstaction(t) + + read := r.R() + write := w.R() + + head, err := read.Head() + require.NoError(err) + + // a tag created in the write repo should not be seen in the read one + + _, err = write.CreateTag("new_tag", head.Hash(), nil) + require.NoError(err) + + _, err = read.Tag("new_tag") + require.Equal(git.ErrTagNotFound, err) + + tag, err := write.Tag("new_tag") + require.NoError(err) + require.Equal(head.Hash(), tag.Hash()) + + // newly repositories opened before commit should see the previous state + + r, err = l.Get("github.com/foo/bar", borges.ReadOnlyMode) + require.NoError(err) + + _, err = r.R().Tag("new_tag") + require.Equal(git.ErrTagNotFound, err) + + err = w.Commit() + require.NoError(err) + + // after commit the tag should still not be seen in the read repo + + _, err = read.Tag("new_tag") + require.Equal(git.ErrTagNotFound, err) + + // open the repo again and check that the tag is there + + r, err = l.Get("github.com/foo/bar", borges.ReadOnlyMode) + require.NoError(err) + + _, err = r.R().Tag("new_tag") + require.NoError(err) +} + +func TestRollback(t *testing.T) { + require := require.New(t) + l, _, w := setupTranstaction(t) + + write := w.R() + head, err := write.Head() + require.NoError(err) + + _, err = write.CreateTag("new_tag", head.Hash(), nil) + require.NoError(err) + + err = w.Close() + require.NoError(err) + + r, err := l.Get("github.com/foo/bar", borges.ReadOnlyMode) + require.NoError(err) + + _, err = r.R().Tag("new_tag") + require.Equal(git.ErrTagNotFound, err) +}