Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add transactions to repositories #4

Merged
merged 3 commits into from
Feb 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion siva/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestCheckpoint(t *testing.T) {
require.NoError(err)

fs := memfs.New()
lib := NewLibrary("test", fs)
lib := NewLibrary("test", fs, true)

var l borges.Location

Expand Down
12 changes: 7 additions & 5 deletions siva/library.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@ import (

// Library represents a borges.Library implementation based on siva files.
type Library struct {
id borges.LibraryID
fs billy.Filesystem
id borges.LibraryID
fs billy.Filesystem
transactional bool
}

var _ borges.Library = (*Library)(nil)

// NewLibrary creates a new siva.Library.
func NewLibrary(id string, fs billy.Filesystem) *Library {
func NewLibrary(id string, fs billy.Filesystem, transactional bool) *Library {
return &Library{
id: borges.LibraryID(id),
fs: fs,
id: borges.LibraryID(id),
fs: fs,
transactional: transactional,
}
}

Expand Down
2 changes: 1 addition & 1 deletion siva/library_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestLibrary(t *testing.T) {
fs := osfs.New("../_testdata/siva")

s.LibrarySingle = func() borges.Library {
return NewLibrary("foo", fs)
return NewLibrary("foo", fs, false)
}

suite.Run(t, s)
Expand Down
132 changes: 123 additions & 9 deletions siva/location.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
sivafs "gopkg.in/src-d/go-billy-siva.v4"
billy "gopkg.in/src-d/go-billy.v4"
"gopkg.in/src-d/go-billy.v4/memfs"
"gopkg.in/src-d/go-billy.v4/util"
"gopkg.in/src-d/go-errors.v1"
"gopkg.in/src-d/go-git.v4/config"
)
Expand All @@ -20,14 +21,21 @@ var (
ErrCannotUseSivaFile = errors.NewKind("cannot use siva file: %s")
// ErrMalformedData when checkpoint data is invalid.
ErrMalformedData = errors.NewKind("malformed data")
// ErrTransactioning is returned when a second transaction wants to start
// in the same location.
ErrTransactioning = errors.NewKind("already doing a transaction")
)

type Location struct {
id borges.LocationID
path string
cachedFS billy.Filesystem
transactional bool
library *Library
id borges.LocationID
path string
// cachedFS billy.Filesystem
cachedFS sivafs.SivaFS
library *Library

// last good position
checkpoint int64
transactioning bool
}

var _ borges.Location = (*Location)(nil)
Expand Down Expand Up @@ -134,18 +142,22 @@ func fixSiva(fs billy.Filesystem, path string) error {
return nil
}

func (l *Location) newFS() (sivafs.SivaFS, error) {
return sivafs.NewFilesystem(l.baseFS(), l.path, memfs.New())
}

// FS returns a filesystem for the location's siva file.
func (l *Location) FS() (billy.Filesystem, error) {
func (l *Location) FS() (sivafs.SivaFS, error) {
if l.cachedFS != nil {
return l.cachedFS, nil
}

err := fixSiva(l.library.fs, l.path)
err := fixSiva(l.baseFS(), l.path)
if err != nil {
return nil, err
}

sfs, err := sivafs.NewFilesystem(l.library.fs, l.path, memfs.New())
sfs, err := l.newFS()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -262,11 +274,113 @@ func (l *Location) Repositories(mode borges.Mode) (borges.RepositoryIterator, er
}, nil
}

func (l *Location) transactional() bool {
return l.library.transactional
}

func (l *Location) baseFS() billy.Filesystem {
return l.library.fs
}

func (l *Location) setupTransaction(mode borges.Mode) (sivafs.SivaFS, error) {
if !l.transactional() || mode != borges.RWMode {
return l.FS()
}

if l.transactioning {
return nil, ErrTransactioning.New()
}

fs, err := l.newFS()
if err != nil {
return nil, err
}

size, err := l.writeCheckpoint()
if err != nil {
return nil, err
}

l.checkpoint = size
l.transactioning = true

return fs, nil
}

func (l *Location) checkpointPath() string {
return fmt.Sprintf("%s.checkpoint", l.path)
}

func (l *Location) deleteCheckpoint() error {
return l.baseFS().Remove(l.checkpointPath())
}

func (l *Location) writeCheckpoint() (int64, error) {
var size int64
s, err := l.baseFS().Stat(l.path)
if err != nil {
if !os.IsNotExist(err) {
return 0, err
}
} else {
size = s.Size()
}

str := strconv.FormatInt(size, 10)
err = util.WriteFile(l.baseFS(), l.checkpointPath(), []byte(str), 0664)
if err != nil {
return 0, err
}

return size, nil
}

func (l *Location) Commit() error {
if !l.transactional() || !l.transactioning {
return nil
}

l.checkpoint = 0
l.transactioning = false
l.cachedFS = nil

return l.deleteCheckpoint()
}

func (l *Location) Rollback() error {
if !l.transactional() || !l.transactioning {
return nil
}

if l.checkpoint > 0 {
f, err := l.baseFS().Open(l.path)
if err != nil {
return err
}
defer f.Close()
err = f.Truncate(l.checkpoint)
if err != nil {
return err
}
} else {
err := l.baseFS().Remove(l.path)
if err != nil {
return err
}
}

l.checkpoint = 0
l.transactioning = false
l.cachedFS = nil

return l.deleteCheckpoint()
}

func (l *Location) repository(
id borges.RepositoryID,
mode borges.Mode,
) (borges.Repository, error) {
fs, err := l.FS()
fs, err := l.setupTransaction(mode)
if err != nil {
return nil, err
}
Expand Down
21 changes: 16 additions & 5 deletions siva/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package siva

import (
borges "github.com/src-d/go-borges"
billy "gopkg.in/src-d/go-billy.v4"

sivafs "gopkg.in/src-d/go-billy-siva.v4"
git "gopkg.in/src-d/go-git.v4"
"gopkg.in/src-d/go-git.v4/plumbing/cache"
"gopkg.in/src-d/go-git.v4/storage/filesystem"
Expand All @@ -11,7 +12,7 @@ import (
type Repository struct {
id borges.RepositoryID
repo *git.Repository
fs billy.Filesystem
fs sivafs.SivaFS
mode borges.Mode

location *Location
Expand All @@ -21,7 +22,7 @@ var _ borges.Repository = (*Repository)(nil)

func NewRepository(
id borges.RepositoryID,
fs billy.Filesystem,
fs sivafs.SivaFS,
m borges.Mode,
l *Location,
) (*Repository, error) {
Expand Down Expand Up @@ -53,11 +54,21 @@ func (r *Repository) Mode() borges.Mode {
}

func (r *Repository) Commit() error {
return borges.ErrNotImplemented.New()
err := r.fs.Sync()
if err != nil {
return err
}

return r.location.Commit()
}

func (r *Repository) Close() error {
return borges.ErrNotImplemented.New()
err := r.fs.Sync()
if err != nil {
return err
}

return r.location.Rollback()
}

func (r *Repository) R() *git.Repository {
Expand Down
106 changes: 106 additions & 0 deletions siva/transaction_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package siva

import (
"io/ioutil"
"testing"

borges "github.com/src-d/go-borges"
"github.com/stretchr/testify/require"
"gopkg.in/src-d/go-billy.v4/memfs"
"gopkg.in/src-d/go-billy.v4/util"
git "gopkg.in/src-d/go-git.v4"
)

func setupTranstaction(
t *testing.T,
) (borges.Location, borges.Repository, borges.Repository) {
t.Helper()
require := require.New(t)

sivaData, err := ioutil.ReadFile("../_testdata/siva/foo-bar.siva")
require.NoError(err)

fs := memfs.New()
lib := NewLibrary("test", fs, true)

err = util.WriteFile(fs, "foo-bar.siva", sivaData, 0666)
require.NoError(err)
l, err := lib.Location("foo-bar")
require.NoError(err)

// open two repositories, the write one is in transaction mode
r, err := l.Get("github.com/foo/bar", borges.ReadOnlyMode)
require.NoError(err)
w, err := l.Get("github.com/foo/bar", borges.RWMode)
require.NoError(err)

return l, r, w
}

func TestCommit(t *testing.T) {
require := require.New(t)
l, r, w := setupTranstaction(t)

read := r.R()
write := w.R()

head, err := read.Head()
require.NoError(err)

// a tag created in the write repo should not be seen in the read one

_, err = write.CreateTag("new_tag", head.Hash(), nil)
require.NoError(err)

_, err = read.Tag("new_tag")
require.Equal(git.ErrTagNotFound, err)

tag, err := write.Tag("new_tag")
require.NoError(err)
require.Equal(head.Hash(), tag.Hash())

// newly repositories opened before commit should see the previous state

r, err = l.Get("github.com/foo/bar", borges.ReadOnlyMode)
require.NoError(err)

_, err = r.R().Tag("new_tag")
require.Equal(git.ErrTagNotFound, err)

err = w.Commit()
require.NoError(err)

// after commit the tag should still not be seen in the read repo

_, err = read.Tag("new_tag")
require.Equal(git.ErrTagNotFound, err)

// open the repo again and check that the tag is there

r, err = l.Get("github.com/foo/bar", borges.ReadOnlyMode)
require.NoError(err)

_, err = r.R().Tag("new_tag")
require.NoError(err)
}

func TestRollback(t *testing.T) {
require := require.New(t)
l, _, w := setupTranstaction(t)

write := w.R()
head, err := write.Head()
require.NoError(err)

_, err = write.CreateTag("new_tag", head.Hash(), nil)
require.NoError(err)

err = w.Close()
require.NoError(err)

r, err := l.Get("github.com/foo/bar", borges.ReadOnlyMode)
require.NoError(err)

_, err = r.R().Tag("new_tag")
require.Equal(git.ErrTagNotFound, err)
}