Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

siva: location registry + transactioner #8

Merged
merged 4 commits into from
Feb 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions siva/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ var (

const checkpointExtension = ".checkpoint"

// Checkpoint tracks the status of a siva file and creates checkpoints to be
// checkpoint tracks the status of a siva file and creates checkpoints to be
// able to return back to a known state of that siva file.
type Checkpoint struct {
type checkpoint struct {
offset int64
baseFs billy.Filesystem
path string
persist string
}

// NewCheckpoint builds a new Checkpoint.
func NewCheckpoint(fs billy.Filesystem, path string) (*Checkpoint, error) {
// newCheckpoint builds a new Checkpoint.
func newCheckpoint(fs billy.Filesystem, path string) (*checkpoint, error) {
persist := path + checkpointExtension

if _, err := fs.Stat(path); err != nil && os.IsNotExist(err) {
Expand All @@ -38,7 +38,7 @@ func NewCheckpoint(fs billy.Filesystem, path string) (*Checkpoint, error) {
borges.ErrLocationNotExists.New(path))
}

c := &Checkpoint{
c := &checkpoint{
baseFs: fs,
path: path,
persist: persist,
Expand All @@ -64,7 +64,7 @@ func NewCheckpoint(fs billy.Filesystem, path string) (*Checkpoint, error) {

// Apply applies if necessary the operations on the siva file to
// leave it in the last correct state the checkpoint keeps.
func (c *Checkpoint) Apply() error {
func (c *checkpoint) Apply() error {
if c.offset > 0 {
f, err := c.baseFs.Open(c.path)
if err != nil {
Expand All @@ -82,7 +82,7 @@ func (c *Checkpoint) Apply() error {
}

// Save saves the current state of the siva file.
func (c *Checkpoint) Save() error {
func (c *checkpoint) Save() error {
info, err := c.baseFs.Stat(c.path)
if err != nil {
return ErrCannotUseSivaFile.Wrap(err)
Expand All @@ -98,7 +98,7 @@ func (c *Checkpoint) Save() error {
}

// Reset resets the checkpoint.
func (c *Checkpoint) Reset() error {
func (c *checkpoint) Reset() error {
if err := cleanup(c.baseFs, c.persist); err != nil {
return ErrCannotUseCheckpointFile.Wrap(err)
}
Expand Down
3 changes: 2 additions & 1 deletion siva/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ func TestCheckpoint(t *testing.T) {
require.NoError(err)

fs := memfs.New()
lib := NewLibrary("test", fs, true)
lib, err := NewLibrary("test", fs, LibraryOptions{Transactional: true})
require.NoError(err)

var l borges.Location

Expand Down
3 changes: 3 additions & 0 deletions siva/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type repositoryIterator struct {

var _ borges.RepositoryIterator = (*repositoryIterator)(nil)

// Next implements the borges.RepositoryIterator interface.
func (i *repositoryIterator) Next() (borges.Repository, error) {
for {
if i.pos >= len(i.remotes) {
Expand All @@ -39,6 +40,7 @@ func (i *repositoryIterator) Next() (borges.Repository, error) {
}
}

// ForEach implements the borges.RepositoryIterator interface.
func (i *repositoryIterator) ForEach(f func(borges.Repository) error) error {
for {
r, err := i.Next()
Expand All @@ -56,4 +58,5 @@ func (i *repositoryIterator) ForEach(f func(borges.Repository) error) error {
}
}

// Close implements the borges.RepositoryIterator interface.
func (i *repositoryIterator) Close() {}
40 changes: 36 additions & 4 deletions siva/library.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"

borges "github.com/src-d/go-borges"

"github.com/src-d/go-borges/util"
billy "gopkg.in/src-d/go-billy.v4"
butil "gopkg.in/src-d/go-billy.v4/util"
Expand All @@ -16,17 +17,37 @@ type Library struct {
id borges.LibraryID
fs billy.Filesystem
transactional bool
locReg *locationRegistry
}

// LibraryOptions hold configuration options for the library.
type LibraryOptions struct {
// Transactional enables transactions for repository writes.
Transactional bool
// RegistryCache is the maximum number of locations in the cache. A value
// of 0 disables the cache.
RegistryCache int
}

var _ borges.Library = (*Library)(nil)

// NewLibrary creates a new siva.Library.
func NewLibrary(id string, fs billy.Filesystem, transactional bool) *Library {
func NewLibrary(
id string,
fs billy.Filesystem,
ops LibraryOptions,
) (*Library, error) {
lr, err := newLocationRegistry(ops.RegistryCache)
if err != nil {
return nil, err
}

return &Library{
id: borges.LibraryID(id),
fs: fs,
transactional: transactional,
}
transactional: ops.Transactional,
locReg: lr,
}, nil
}

// ID implements borges.Library interface.
Expand Down Expand Up @@ -114,8 +135,19 @@ func (l *Library) Repositories(mode borges.Mode) (borges.RepositoryIterator, err

// Location implements borges.Library interface.
func (l *Library) Location(id borges.LocationID) (borges.Location, error) {
if loc, ok := l.locReg.Get(id); ok {
return loc, nil
}

path := fmt.Sprintf("%s.siva", id)
return NewLocation(id, l, path)
loc, err := NewLocation(id, l, path)
if err != nil {
return nil, err
}

l.locReg.Add(loc)

return loc, nil
}

// Locations implements borges.Library interface.
Expand Down
6 changes: 5 additions & 1 deletion siva/library_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

borges "github.com/src-d/go-borges"
"github.com/src-d/go-borges/test"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"gopkg.in/src-d/go-billy.v4/osfs"
)
Expand All @@ -14,7 +15,10 @@ func TestLibrary(t *testing.T) {
fs := osfs.New("../_testdata/siva")

s.LibrarySingle = func() borges.Library {
return NewLibrary("foo", fs, false)
lib, err := NewLibrary("foo", fs, LibraryOptions{})
require.NoError(t, err)

return lib
}

suite.Run(t, s)
Expand Down
78 changes: 38 additions & 40 deletions siva/location.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,47 +9,42 @@ import (
"gopkg.in/src-d/go-git.v4/config"
)

var (
// ErrMalformedData when checkpoint data is invalid.
ErrMalformedData = errors.NewKind("malformed data")
// ErrTransactioning is returned when a second transaction wants to start
// in the same location.
ErrTransactioning = errors.NewKind("already doing a transaction")
)
// ErrMalformedData when checkpoint data is invalid.
var ErrMalformedData = errors.NewKind("malformed data")

// Location represents a siva file archiving several git repositories.
type Location struct {
id borges.LocationID
path string
// cachedFS billy.Filesystem
cachedFS sivafs.SivaFS
library *Library

// last good position
checkpoint *Checkpoint
transactioning bool
id borges.LocationID
path string
cachedFS sivafs.SivaFS
lib *Library
checkpoint *checkpoint
txer *transactioner
}

var _ borges.Location = (*Location)(nil)

func NewLocation(id borges.LocationID, l *Library, path string) (*Location, error) {
checkpoint, err := NewCheckpoint(l.fs, path)
// NewLocation creates a new Location object.
func NewLocation(id borges.LocationID, lib *Library, path string) (*Location, error) {
cp, err := newCheckpoint(lib.fs, path)
if err != nil {
return nil, err
}

location := &Location{
loc := &Location{
id: id,
path: path,
library: l,
checkpoint: checkpoint,
lib: lib,
checkpoint: cp,
}

_, err = location.FS()
_, err = loc.FS()
if err != nil {
return nil, err
}

return location, nil
loc.txer = newTransactioner(loc, lib.locReg)
return loc, nil
}

func (l *Location) newFS() (sivafs.SivaFS, error) {
Expand All @@ -75,10 +70,12 @@ func (l *Location) FS() (sivafs.SivaFS, error) {
return sfs, nil
}

// ID implements the borges.Location interface.
func (l *Location) ID() borges.LocationID {
return l.id
}

// Init implements the borges.Location interface.
func (l *Location) Init(id borges.RepositoryID) (borges.Repository, error) {
has, err := l.Has(id)
if err != nil {
Expand Down Expand Up @@ -111,6 +108,7 @@ func (l *Location) Init(id borges.RepositoryID) (borges.Repository, error) {
return repo, nil
}

// Get implements the borges.Location interface.
func (l *Location) Get(id borges.RepositoryID, mode borges.Mode) (borges.Repository, error) {
has, err := l.Has(id)
if err != nil {
Expand All @@ -124,6 +122,7 @@ func (l *Location) Get(id borges.RepositoryID, mode borges.Mode) (borges.Reposit
return l.repository(id, mode)
}

// GetOrInit implements the borges.Location interface.
func (l *Location) GetOrInit(id borges.RepositoryID) (borges.Repository, error) {
has, err := l.Has(id)
if err != nil {
Expand All @@ -137,6 +136,7 @@ func (l *Location) GetOrInit(id borges.RepositoryID) (borges.Repository, error)
return l.Init(id)
}

// Has implements the borges.Location interface.
func (l *Location) Has(name borges.RepositoryID) (bool, error) {
repo, err := l.repository("", borges.ReadOnlyMode)
if err != nil {
Expand All @@ -159,6 +159,7 @@ func (l *Location) Has(name borges.RepositoryID) (bool, error) {
return false, nil
}

// Repositories implements the borges.Location interface.
func (l *Location) Repositories(mode borges.Mode) (borges.RepositoryIterator, error) {
var remotes []*config.RemoteConfig

Expand All @@ -183,21 +184,17 @@ func (l *Location) Repositories(mode borges.Mode) (borges.RepositoryIterator, er
}, nil
}

func (l *Location) transactional() bool {
return l.library.transactional
}

func (l *Location) baseFS() billy.Filesystem {
return l.library.fs
return l.lib.fs
}

func (l *Location) setupTransaction(mode borges.Mode) (sivafs.SivaFS, error) {
if !l.transactional() || mode != borges.RWMode {
func (l *Location) getRepoFs(id borges.RepositoryID, mode borges.Mode) (sivafs.SivaFS, error) {
if !l.lib.transactional || mode != borges.RWMode {
return l.FS()
}

if l.transactioning {
return nil, ErrTransactioning.New()
if err := l.txer.Start(); err != nil {
return nil, err
}

fs, err := l.newFS()
Expand All @@ -209,34 +206,35 @@ func (l *Location) setupTransaction(mode borges.Mode) (sivafs.SivaFS, error) {
return nil, err
}

l.transactioning = true
return fs, nil
}

func (l *Location) Commit() error {
if !l.transactional() || !l.transactioning {
// Commit persists transactional or write operations performed on the repositories.
func (l *Location) Commit(mode borges.Mode) error {
if !l.lib.transactional || mode != borges.RWMode {
return nil
}

defer l.txer.Stop()
if err := l.checkpoint.Reset(); err != nil {
return err
}

l.transactioning = false
l.cachedFS = nil
return nil
}

func (l *Location) Rollback() error {
if !l.transactional() || !l.transactioning {
// Rollback discard transactional or write operations performed on the repositories.
func (l *Location) Rollback(mode borges.Mode) error {
if !l.lib.transactional || mode != borges.RWMode {
return nil
}

defer l.txer.Stop()
if err := l.checkpoint.Apply(); err != nil {
return err
}

l.transactioning = false
l.cachedFS = nil
return nil
}
Expand All @@ -245,7 +243,7 @@ func (l *Location) repository(
id borges.RepositoryID,
mode borges.Mode,
) (borges.Repository, error) {
fs, err := l.setupTransaction(mode)
fs, err := l.getRepoFs(id, mode)
if err != nil {
return nil, err
}
Expand Down
Loading