diff --git a/siva/checkpoint.go b/siva/checkpoint.go index 2ba9276..ac957ed 100644 --- a/siva/checkpoint.go +++ b/siva/checkpoint.go @@ -19,17 +19,17 @@ var ( const checkpointExtension = ".checkpoint" -// Checkpoint tracks the status of a siva file and creates checkpoints to be +// checkpoint tracks the status of a siva file and creates checkpoints to be // able to return back to a known state of that siva file. -type Checkpoint struct { +type checkpoint struct { offset int64 baseFs billy.Filesystem path string persist string } -// NewCheckpoint builds a new Checkpoint. -func NewCheckpoint(fs billy.Filesystem, path string) (*Checkpoint, error) { +// newCheckpoint builds a new Checkpoint. +func newCheckpoint(fs billy.Filesystem, path string) (*checkpoint, error) { persist := path + checkpointExtension if _, err := fs.Stat(path); err != nil && os.IsNotExist(err) { @@ -38,7 +38,7 @@ func NewCheckpoint(fs billy.Filesystem, path string) (*Checkpoint, error) { borges.ErrLocationNotExists.New(path)) } - c := &Checkpoint{ + c := &checkpoint{ baseFs: fs, path: path, persist: persist, @@ -64,7 +64,7 @@ func NewCheckpoint(fs billy.Filesystem, path string) (*Checkpoint, error) { // Apply applies if necessary the operations on the siva file to // leave it in the last correct state the checkpoint keeps. -func (c *Checkpoint) Apply() error { +func (c *checkpoint) Apply() error { if c.offset > 0 { f, err := c.baseFs.Open(c.path) if err != nil { @@ -82,7 +82,7 @@ func (c *Checkpoint) Apply() error { } // Save saves the current state of the siva file. -func (c *Checkpoint) Save() error { +func (c *checkpoint) Save() error { info, err := c.baseFs.Stat(c.path) if err != nil { return ErrCannotUseSivaFile.Wrap(err) @@ -98,7 +98,7 @@ func (c *Checkpoint) Save() error { } // Reset resets the checkpoint. -func (c *Checkpoint) Reset() error { +func (c *checkpoint) Reset() error { if err := cleanup(c.baseFs, c.persist); err != nil { return ErrCannotUseCheckpointFile.Wrap(err) } diff --git a/siva/checkpoint_test.go b/siva/checkpoint_test.go index de95fde..8d0ab4d 100644 --- a/siva/checkpoint_test.go +++ b/siva/checkpoint_test.go @@ -19,7 +19,8 @@ func TestCheckpoint(t *testing.T) { require.NoError(err) fs := memfs.New() - lib := NewLibrary("test", fs, true) + lib, err := NewLibrary("test", fs, LibraryOptions{Transactional: true}) + require.NoError(err) var l borges.Location diff --git a/siva/iterator.go b/siva/iterator.go index b393464..b09bf3c 100644 --- a/siva/iterator.go +++ b/siva/iterator.go @@ -16,6 +16,7 @@ type repositoryIterator struct { var _ borges.RepositoryIterator = (*repositoryIterator)(nil) +// Next implements the borges.RepositoryIterator interface. func (i *repositoryIterator) Next() (borges.Repository, error) { for { if i.pos >= len(i.remotes) { @@ -39,6 +40,7 @@ func (i *repositoryIterator) Next() (borges.Repository, error) { } } +// ForEach implements the borges.RepositoryIterator interface. func (i *repositoryIterator) ForEach(f func(borges.Repository) error) error { for { r, err := i.Next() @@ -56,4 +58,5 @@ func (i *repositoryIterator) ForEach(f func(borges.Repository) error) error { } } +// Close implements the borges.RepositoryIterator interface. func (i *repositoryIterator) Close() {} diff --git a/siva/library.go b/siva/library.go index 44f7f91..81ad520 100644 --- a/siva/library.go +++ b/siva/library.go @@ -6,6 +6,7 @@ import ( "strings" borges "github.com/src-d/go-borges" + "github.com/src-d/go-borges/util" billy "gopkg.in/src-d/go-billy.v4" butil "gopkg.in/src-d/go-billy.v4/util" @@ -16,17 +17,37 @@ type Library struct { id borges.LibraryID fs billy.Filesystem transactional bool + locReg *locationRegistry +} + +// LibraryOptions hold configuration options for the library. +type LibraryOptions struct { + // Transactional enables transactions for repository writes. + Transactional bool + // RegistryCache is the maximum number of locations in the cache. A value + // of 0 disables the cache. + RegistryCache int } var _ borges.Library = (*Library)(nil) // NewLibrary creates a new siva.Library. -func NewLibrary(id string, fs billy.Filesystem, transactional bool) *Library { +func NewLibrary( + id string, + fs billy.Filesystem, + ops LibraryOptions, +) (*Library, error) { + lr, err := newLocationRegistry(ops.RegistryCache) + if err != nil { + return nil, err + } + return &Library{ id: borges.LibraryID(id), fs: fs, - transactional: transactional, - } + transactional: ops.Transactional, + locReg: lr, + }, nil } // ID implements borges.Library interface. @@ -114,8 +135,19 @@ func (l *Library) Repositories(mode borges.Mode) (borges.RepositoryIterator, err // Location implements borges.Library interface. func (l *Library) Location(id borges.LocationID) (borges.Location, error) { + if loc, ok := l.locReg.Get(id); ok { + return loc, nil + } + path := fmt.Sprintf("%s.siva", id) - return NewLocation(id, l, path) + loc, err := NewLocation(id, l, path) + if err != nil { + return nil, err + } + + l.locReg.Add(loc) + + return loc, nil } // Locations implements borges.Library interface. diff --git a/siva/library_test.go b/siva/library_test.go index 157e766..f4ce566 100644 --- a/siva/library_test.go +++ b/siva/library_test.go @@ -5,6 +5,7 @@ import ( borges "github.com/src-d/go-borges" "github.com/src-d/go-borges/test" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "gopkg.in/src-d/go-billy.v4/osfs" ) @@ -14,7 +15,10 @@ func TestLibrary(t *testing.T) { fs := osfs.New("../_testdata/siva") s.LibrarySingle = func() borges.Library { - return NewLibrary("foo", fs, false) + lib, err := NewLibrary("foo", fs, LibraryOptions{}) + require.NoError(t, err) + + return lib } suite.Run(t, s) diff --git a/siva/location.go b/siva/location.go index 975e07b..eb08703 100644 --- a/siva/location.go +++ b/siva/location.go @@ -9,47 +9,42 @@ import ( "gopkg.in/src-d/go-git.v4/config" ) -var ( - // ErrMalformedData when checkpoint data is invalid. - ErrMalformedData = errors.NewKind("malformed data") - // ErrTransactioning is returned when a second transaction wants to start - // in the same location. - ErrTransactioning = errors.NewKind("already doing a transaction") -) +// ErrMalformedData when checkpoint data is invalid. +var ErrMalformedData = errors.NewKind("malformed data") +// Location represents a siva file archiving several git repositories. type Location struct { - id borges.LocationID - path string - // cachedFS billy.Filesystem - cachedFS sivafs.SivaFS - library *Library - - // last good position - checkpoint *Checkpoint - transactioning bool + id borges.LocationID + path string + cachedFS sivafs.SivaFS + lib *Library + checkpoint *checkpoint + txer *transactioner } var _ borges.Location = (*Location)(nil) -func NewLocation(id borges.LocationID, l *Library, path string) (*Location, error) { - checkpoint, err := NewCheckpoint(l.fs, path) +// NewLocation creates a new Location object. +func NewLocation(id borges.LocationID, lib *Library, path string) (*Location, error) { + cp, err := newCheckpoint(lib.fs, path) if err != nil { return nil, err } - location := &Location{ + loc := &Location{ id: id, path: path, - library: l, - checkpoint: checkpoint, + lib: lib, + checkpoint: cp, } - _, err = location.FS() + _, err = loc.FS() if err != nil { return nil, err } - return location, nil + loc.txer = newTransactioner(loc, lib.locReg) + return loc, nil } func (l *Location) newFS() (sivafs.SivaFS, error) { @@ -75,10 +70,12 @@ func (l *Location) FS() (sivafs.SivaFS, error) { return sfs, nil } +// ID implements the borges.Location interface. func (l *Location) ID() borges.LocationID { return l.id } +// Init implements the borges.Location interface. func (l *Location) Init(id borges.RepositoryID) (borges.Repository, error) { has, err := l.Has(id) if err != nil { @@ -111,6 +108,7 @@ func (l *Location) Init(id borges.RepositoryID) (borges.Repository, error) { return repo, nil } +// Get implements the borges.Location interface. func (l *Location) Get(id borges.RepositoryID, mode borges.Mode) (borges.Repository, error) { has, err := l.Has(id) if err != nil { @@ -124,6 +122,7 @@ func (l *Location) Get(id borges.RepositoryID, mode borges.Mode) (borges.Reposit return l.repository(id, mode) } +// GetOrInit implements the borges.Location interface. func (l *Location) GetOrInit(id borges.RepositoryID) (borges.Repository, error) { has, err := l.Has(id) if err != nil { @@ -137,6 +136,7 @@ func (l *Location) GetOrInit(id borges.RepositoryID) (borges.Repository, error) return l.Init(id) } +// Has implements the borges.Location interface. func (l *Location) Has(name borges.RepositoryID) (bool, error) { repo, err := l.repository("", borges.ReadOnlyMode) if err != nil { @@ -159,6 +159,7 @@ func (l *Location) Has(name borges.RepositoryID) (bool, error) { return false, nil } +// Repositories implements the borges.Location interface. func (l *Location) Repositories(mode borges.Mode) (borges.RepositoryIterator, error) { var remotes []*config.RemoteConfig @@ -183,21 +184,17 @@ func (l *Location) Repositories(mode borges.Mode) (borges.RepositoryIterator, er }, nil } -func (l *Location) transactional() bool { - return l.library.transactional -} - func (l *Location) baseFS() billy.Filesystem { - return l.library.fs + return l.lib.fs } -func (l *Location) setupTransaction(mode borges.Mode) (sivafs.SivaFS, error) { - if !l.transactional() || mode != borges.RWMode { +func (l *Location) getRepoFs(id borges.RepositoryID, mode borges.Mode) (sivafs.SivaFS, error) { + if !l.lib.transactional || mode != borges.RWMode { return l.FS() } - if l.transactioning { - return nil, ErrTransactioning.New() + if err := l.txer.Start(); err != nil { + return nil, err } fs, err := l.newFS() @@ -209,34 +206,35 @@ func (l *Location) setupTransaction(mode borges.Mode) (sivafs.SivaFS, error) { return nil, err } - l.transactioning = true return fs, nil } -func (l *Location) Commit() error { - if !l.transactional() || !l.transactioning { +// Commit persists transactional or write operations performed on the repositories. +func (l *Location) Commit(mode borges.Mode) error { + if !l.lib.transactional || mode != borges.RWMode { return nil } + defer l.txer.Stop() if err := l.checkpoint.Reset(); err != nil { return err } - l.transactioning = false l.cachedFS = nil return nil } -func (l *Location) Rollback() error { - if !l.transactional() || !l.transactioning { +// Rollback discard transactional or write operations performed on the repositories. +func (l *Location) Rollback(mode borges.Mode) error { + if !l.lib.transactional || mode != borges.RWMode { return nil } + defer l.txer.Stop() if err := l.checkpoint.Apply(); err != nil { return err } - l.transactioning = false l.cachedFS = nil return nil } @@ -245,7 +243,7 @@ func (l *Location) repository( id borges.RepositoryID, mode borges.Mode, ) (borges.Repository, error) { - fs, err := l.setupTransaction(mode) + fs, err := l.getRepoFs(id, mode) if err != nil { return nil, err } diff --git a/siva/location_registry.go b/siva/location_registry.go new file mode 100644 index 0000000..b86e4c7 --- /dev/null +++ b/siva/location_registry.go @@ -0,0 +1,89 @@ +package siva + +import ( + "sync" + + borges "github.com/src-d/go-borges" + + lru "github.com/hashicorp/golang-lru" +) + +// locationRegistry holds a list of locations that have a transaction under way +// and recently used. +type locationRegistry struct { + used map[borges.LocationID]*Location + cache *lru.Cache + + m sync.RWMutex +} + +func newLocationRegistry(cacheSize int) (*locationRegistry, error) { + var c *lru.Cache + var err error + + if cacheSize > 0 { + c, err = lru.New(cacheSize) + if err != nil { + return nil, err + } + } + + return &locationRegistry{ + used: make(map[borges.LocationID]*Location), + cache: c, + }, nil +} + +// Get retrieves a location from the registry. +func (r *locationRegistry) Get(id borges.LocationID) (*Location, bool) { + r.m.RLock() + defer r.m.RUnlock() + + if l, ok := r.used[id]; ok { + return l, true + } + + if r.cache == nil { + return nil, false + } + + if l, ok := r.cache.Get(id); ok { + return l.(*Location), true + } + + return nil, false +} + +// Add stores a location in the registry. +func (r *locationRegistry) Add(l *Location) { + if r.cache == nil { + return + } + + r.m.RLock() + defer r.m.RUnlock() + + r.cache.Add(l.ID(), l) +} + +// StartTransaction marks a location as being used so it does not get evicted. +func (r *locationRegistry) StartTransaction(l *Location) { + r.m.Lock() + defer r.m.Unlock() + + r.used[l.ID()] = l + if r.cache != nil { + r.cache.Remove(l.ID()) + } +} + +// EndTransaction moves a location back to normal cache. +func (r *locationRegistry) EndTransaction(l *Location) { + r.m.Lock() + defer r.m.Unlock() + + delete(r.used, l.ID()) + if r.cache != nil { + r.cache.Add(l.ID(), l) + } +} diff --git a/siva/location_registry_test.go b/siva/location_registry_test.go new file mode 100644 index 0000000..c7272d2 --- /dev/null +++ b/siva/location_registry_test.go @@ -0,0 +1,108 @@ +package siva + +import ( + "fmt" + "testing" + + borges "github.com/src-d/go-borges" + "github.com/stretchr/testify/require" +) + +func point(p interface{}) string { + return fmt.Sprintf("%p", p) +} + +func TestRegistryNoCache(t *testing.T) { + require := require.New(t) + + fs := setupFS(t) + + ops := LibraryOptions{ + Transactional: true, + RegistryCache: 0, + } + lib, err := NewLibrary("test", fs, ops) + require.NoError(err) + + // locations are recreated when no transaction is being made + + loc1, err := lib.Location("foo-bar") + require.NoError(err) + loc2, err := lib.Location("foo-bar") + require.NoError(err) + + require.NotEqual(point(loc1), point(loc2)) + + // when there is a transaction it reuses the location + + loc1, err = lib.Location("foo-bar") + require.NoError(err) + + r, err := loc1.Get("github.com/foo/bar", borges.RWMode) + require.NoError(err) + + loc2, err = lib.Location("foo-bar") + require.NoError(err) + + require.Equal(point(loc1), point(loc2)) + + // after finishing the transaction locations are regenerated again + + err = r.Close() + require.NoError(err) + + loc2, err = lib.Location("foo-bar") + require.NoError(err) + + require.NotEqual(point(loc1), point(loc2)) + + // same case but with commit + + r, err = loc1.Get("github.com/foo/bar", borges.RWMode) + require.NoError(err) + + loc2, err = lib.Location("foo-bar") + require.NoError(err) + + require.Equal(point(loc1), point(loc2)) + + err = r.Commit() + require.NoError(err) + + loc2, err = lib.Location("foo-bar") + require.NoError(err) + + require.NotEqual(point(loc1), point(loc2)) +} + +func TestRegistryCache(t *testing.T) { + require := require.New(t) + + fs := setupFS(t) + + ops := LibraryOptions{ + Transactional: true, + RegistryCache: 1, + } + lib, err := NewLibrary("test", fs, ops) + require.NoError(err) + + // as the capacity is 1 getting the same location twice returns the same + // object + loc1, err := lib.Location("foo-bar") + require.NoError(err) + loc2, err := lib.Location("foo-bar") + require.NoError(err) + + require.Equal(point(loc1), point(loc2)) + + // getting another location should swipe the first location from cache + + _, err = lib.Location("foo-qux") + require.NoError(err) + + loc2, err = lib.Location("foo-bar") + require.NoError(err) + + require.NotEqual(point(loc1), point(loc2)) +} diff --git a/siva/repository.go b/siva/repository.go index 8e3e267..df9bf58 100644 --- a/siva/repository.go +++ b/siva/repository.go @@ -9,6 +9,8 @@ import ( "gopkg.in/src-d/go-git.v4/storage/filesystem" ) +// Repository is an implementation for siva files of borges.Repository +// interface. type Repository struct { id borges.RepositoryID repo *git.Repository @@ -20,6 +22,7 @@ type Repository struct { var _ borges.Repository = (*Repository)(nil) +// NewRepository creates a new siva backed Repository. func NewRepository( id borges.RepositoryID, fs sivafs.SivaFS, @@ -41,36 +44,50 @@ func NewRepository( }, nil } +// ID implements borges.Repository interface. func (r *Repository) ID() borges.RepositoryID { return r.id } +// LocationID implements borges.Repository interface. func (r *Repository) LocationID() borges.LocationID { return r.location.ID() } +// Mode implements borges.Repository interface. func (r *Repository) Mode() borges.Mode { return r.mode } +// Commit implements borges.Repository interface. func (r *Repository) Commit() error { + if r.mode != borges.RWMode { + return nil + } + err := r.fs.Sync() if err != nil { return err } - return r.location.Commit() + return r.location.Commit(r.mode) } +// Close implements borges.Repository interface. func (r *Repository) Close() error { + if r.mode != borges.RWMode { + return nil + } + err := r.fs.Sync() if err != nil { return err } - return r.location.Rollback() + return r.location.Rollback(r.mode) } +// R implements borges.Repository interface. func (r *Repository) R() *git.Repository { return r.repo } diff --git a/siva/transaction_test.go b/siva/transaction_test.go index ef04f3a..7522eed 100644 --- a/siva/transaction_test.go +++ b/siva/transaction_test.go @@ -2,29 +2,52 @@ package siva import ( "io/ioutil" + "path" "testing" borges "github.com/src-d/go-borges" "github.com/stretchr/testify/require" + billy "gopkg.in/src-d/go-billy.v4" "gopkg.in/src-d/go-billy.v4/memfs" "gopkg.in/src-d/go-billy.v4/util" git "gopkg.in/src-d/go-git.v4" ) +func setupFS(t *testing.T) billy.Filesystem { + t.Helper() + require := require.New(t) + + fs := memfs.New() + + sivaFiles := []string{ + "foo-bar.siva", + "foo-qux.siva", + } + + for _, f := range sivaFiles { + path := path.Join("..", "_testdata", "siva", f) + sivaData, err := ioutil.ReadFile(path) + require.NoError(err) + err = util.WriteFile(fs, f, sivaData, 0666) + require.NoError(err) + } + + return fs +} + func setupTranstaction( t *testing.T, ) (borges.Location, borges.Repository, borges.Repository) { t.Helper() require := require.New(t) - sivaData, err := ioutil.ReadFile("../_testdata/siva/foo-bar.siva") - require.NoError(err) - - fs := memfs.New() - lib := NewLibrary("test", fs, true) + fs := setupFS(t) - err = util.WriteFile(fs, "foo-bar.siva", sivaData, 0666) + lib, err := NewLibrary("test", fs, LibraryOptions{ + Transactional: true, + }) require.NoError(err) + l, err := lib.Location("foo-bar") require.NoError(err) diff --git a/siva/transactioner.go b/siva/transactioner.go new file mode 100644 index 0000000..9a1b472 --- /dev/null +++ b/siva/transactioner.go @@ -0,0 +1,54 @@ +package siva + +import ( + "time" + + errors "gopkg.in/src-d/go-errors.v1" +) + +// ErrTransactionTimeout is returned when a repository can't be retrieved in +// transactional mode because of a timeout. +var ErrTransactionTimeout = errors.NewKind("timeout exceeded: unable to " + + "retrieve repository from location %s in transactional mode.") + +const txTimeout = 60 * time.Second + +// transactioner manages synchronization to allow transactions on a Location. +type transactioner struct { + notification chan struct{} + timeout time.Duration + loc *Location + locReg *locationRegistry +} + +func newTransactioner( + loc *Location, + locReg *locationRegistry, +) *transactioner { + n := make(chan struct{}, 1) + n <- struct{}{} + return &transactioner{ + notification: n, + timeout: txTimeout, + loc: loc, + locReg: locReg, + } +} + +// Start requests permission for a new transaction. If it can't get it after a +// certain amount of time it will fail with an ErrTransactionTimeout error. +func (t *transactioner) Start() error { + select { + case <-t.notification: + t.locReg.StartTransaction(t.loc) + return nil + case <-time.After(t.timeout): + return ErrTransactionTimeout.New(t.loc.ID()) + } +} + +// Stop signals the transaction is finished. +func (t *transactioner) Stop() { + t.locReg.EndTransaction(t.loc) + t.notification <- struct{}{} +}