From c8c6226e24d0ac096e54d62d95ae0436e5c7f725 Mon Sep 17 00:00:00 2001 From: Manuel Carmona Date: Fri, 15 Feb 2019 15:43:59 +0000 Subject: [PATCH 1/4] siva: synchronize transactions through a channel Signed-off-by: Manuel Carmona --- siva/location.go | 64 +++++++++++++++++++++++++++++++--------------- siva/repository.go | 4 +-- 2 files changed, 45 insertions(+), 23 deletions(-) diff --git a/siva/location.go b/siva/location.go index 975e07b..2ccfd4b 100644 --- a/siva/location.go +++ b/siva/location.go @@ -1,6 +1,8 @@ package siva import ( + "time" + borges "github.com/src-d/go-borges" sivafs "gopkg.in/src-d/go-billy-siva.v4" billy "gopkg.in/src-d/go-billy.v4" @@ -12,9 +14,9 @@ import ( var ( // ErrMalformedData when checkpoint data is invalid. ErrMalformedData = errors.NewKind("malformed data") - // ErrTransactioning is returned when a second transaction wants to start - // in the same location. - ErrTransactioning = errors.NewKind("already doing a transaction") + // ErrTransactionTimeout is returned when a repository can't + // be retrieved in transactional mode because of a timeout. + ErrTransactionTimeout = errors.NewKind("timeout exceeded: unable toretrieve repository %s in transactional mode.") ) type Location struct { @@ -25,12 +27,36 @@ type Location struct { library *Library // last good position - checkpoint *Checkpoint - transactioning bool + checkpoint *Checkpoint + tx *repoTxer } var _ borges.Location = (*Location)(nil) +const txTimeout = 60 * time.Second + +type repoTxer struct { + notification chan struct{} + timeout time.Duration +} + +func newRepoTxer() *repoTxer { + n := make(chan struct{}, 1) + n <- struct{}{} + return &repoTxer{n, txTimeout} +} + +func (rtx *repoTxer) Start(id borges.RepositoryID) error { + select { + case <-rtx.notification: + return nil + case <-time.After(rtx.timeout): + return ErrTransactionTimeout.New(id) + } +} + +func (rtx *repoTxer) Stop() { rtx.notification <- struct{}{} } + func NewLocation(id borges.LocationID, l *Library, path string) (*Location, error) { checkpoint, err := NewCheckpoint(l.fs, path) if err != nil { @@ -49,6 +75,7 @@ func NewLocation(id borges.LocationID, l *Library, path string) (*Location, erro return nil, err } + location.tx = newRepoTxer() return location, nil } @@ -183,21 +210,17 @@ func (l *Location) Repositories(mode borges.Mode) (borges.RepositoryIterator, er }, nil } -func (l *Location) transactional() bool { - return l.library.transactional -} - func (l *Location) baseFS() billy.Filesystem { return l.library.fs } -func (l *Location) setupTransaction(mode borges.Mode) (sivafs.SivaFS, error) { - if !l.transactional() || mode != borges.RWMode { +func (l *Location) getRepoFs(id borges.RepositoryID, mode borges.Mode) (sivafs.SivaFS, error) { + if !l.library.transactional || mode != borges.RWMode { return l.FS() } - if l.transactioning { - return nil, ErrTransactioning.New() + if err := l.tx.Start(id); err != nil { + return nil, err } fs, err := l.newFS() @@ -209,34 +232,33 @@ func (l *Location) setupTransaction(mode borges.Mode) (sivafs.SivaFS, error) { return nil, err } - l.transactioning = true return fs, nil } -func (l *Location) Commit() error { - if !l.transactional() || !l.transactioning { +func (l *Location) Commit(mode borges.Mode) error { + if !l.library.transactional || mode != borges.RWMode { return nil } + defer l.tx.Stop() if err := l.checkpoint.Reset(); err != nil { return err } - l.transactioning = false l.cachedFS = nil return nil } -func (l *Location) Rollback() error { - if !l.transactional() || !l.transactioning { +func (l *Location) Rollback(mode borges.Mode) error { + if !l.library.transactional || mode != borges.RWMode { return nil } + defer l.tx.Stop() if err := l.checkpoint.Apply(); err != nil { return err } - l.transactioning = false l.cachedFS = nil return nil } @@ -245,7 +267,7 @@ func (l *Location) repository( id borges.RepositoryID, mode borges.Mode, ) (borges.Repository, error) { - fs, err := l.setupTransaction(mode) + fs, err := l.getRepoFs(id, mode) if err != nil { return nil, err } diff --git a/siva/repository.go b/siva/repository.go index 8e3e267..2b58e8c 100644 --- a/siva/repository.go +++ b/siva/repository.go @@ -59,7 +59,7 @@ func (r *Repository) Commit() error { return err } - return r.location.Commit() + return r.location.Commit(r.mode) } func (r *Repository) Close() error { @@ -68,7 +68,7 @@ func (r *Repository) Close() error { return err } - return r.location.Rollback() + return r.location.Rollback(r.mode) } func (r *Repository) R() *git.Repository { From ab470fcbc28b1f44889fb123c451623d14979947 Mon Sep 17 00:00:00 2001 From: Javi Fontan Date: Fri, 15 Feb 2019 15:10:48 +0100 Subject: [PATCH 2/4] add location registry to library It's cache size is hardcoded for now until a library options is implemented. Transactions should call startTransaction and endTransaction to make sure that subsequent Location calls return the correct location. Signed-off-by: Javi Fontan --- siva/checkpoint_test.go | 3 +- siva/library.go | 40 ++++++++++++++++++++-- siva/library_test.go | 6 +++- siva/location.go | 5 ++- siva/locationregistry.go | 72 ++++++++++++++++++++++++++++++++++++++++ siva/repository.go | 17 ++++++++++ siva/transaction_test.go | 3 +- 7 files changed, 137 insertions(+), 9 deletions(-) create mode 100644 siva/locationregistry.go diff --git a/siva/checkpoint_test.go b/siva/checkpoint_test.go index de95fde..33d6448 100644 --- a/siva/checkpoint_test.go +++ b/siva/checkpoint_test.go @@ -19,7 +19,8 @@ func TestCheckpoint(t *testing.T) { require.NoError(err) fs := memfs.New() - lib := NewLibrary("test", fs, true) + lib, err := NewLibrary("test", fs, true) + require.NoError(err) var l borges.Location diff --git a/siva/library.go b/siva/library.go index 44f7f91..bc86fb7 100644 --- a/siva/library.go +++ b/siva/library.go @@ -6,27 +6,42 @@ import ( "strings" borges "github.com/src-d/go-borges" + "github.com/src-d/go-borges/util" billy "gopkg.in/src-d/go-billy.v4" butil "gopkg.in/src-d/go-billy.v4/util" ) +// LocationRegistrySize is the number of locations cached in the registry. +const LocationRegistrySize = 1024 + // Library represents a borges.Library implementation based on siva files. type Library struct { id borges.LibraryID fs billy.Filesystem transactional bool + locReg *locationRegistry } var _ borges.Library = (*Library)(nil) // NewLibrary creates a new siva.Library. -func NewLibrary(id string, fs billy.Filesystem, transactional bool) *Library { +func NewLibrary( + id string, + fs billy.Filesystem, + transactional bool, +) (*Library, error) { + l, err := newLocationRegistry(LocationRegistrySize) + if err != nil { + return nil, err + } + return &Library{ id: borges.LibraryID(id), fs: fs, transactional: transactional, - } + locReg: l, + }, nil } // ID implements borges.Library interface. @@ -114,8 +129,19 @@ func (l *Library) Repositories(mode borges.Mode) (borges.RepositoryIterator, err // Location implements borges.Library interface. func (l *Library) Location(id borges.LocationID) (borges.Location, error) { + if loc, ok := l.locReg.Get(id); ok { + return loc, nil + } + path := fmt.Sprintf("%s.siva", id) - return NewLocation(id, l, path) + loc, err := NewLocation(id, l, path) + if err != nil { + return nil, err + } + + l.locReg.Add(loc) + + return loc, nil } // Locations implements borges.Library interface. @@ -162,3 +188,11 @@ func (l *Library) Libraries() (borges.LibraryIterator, error) { libs := []borges.Library{l} return util.NewLibraryIterator(libs), nil } + +func (l *Library) startTransaction(loc *Location) { + l.locReg.StartTransaction(loc) +} + +func (l *Library) endTransaction(loc *Location) { + l.locReg.EndTransaction(loc) +} diff --git a/siva/library_test.go b/siva/library_test.go index 157e766..1240e4f 100644 --- a/siva/library_test.go +++ b/siva/library_test.go @@ -5,6 +5,7 @@ import ( borges "github.com/src-d/go-borges" "github.com/src-d/go-borges/test" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "gopkg.in/src-d/go-billy.v4/osfs" ) @@ -14,7 +15,10 @@ func TestLibrary(t *testing.T) { fs := osfs.New("../_testdata/siva") s.LibrarySingle = func() borges.Library { - return NewLibrary("foo", fs, false) + lib, err := NewLibrary("foo", fs, false) + require.NoError(t, err) + + return lib } suite.Run(t, s) diff --git a/siva/location.go b/siva/location.go index 2ccfd4b..2840a5c 100644 --- a/siva/location.go +++ b/siva/location.go @@ -20,9 +20,8 @@ var ( ) type Location struct { - id borges.LocationID - path string - // cachedFS billy.Filesystem + id borges.LocationID + path string cachedFS sivafs.SivaFS library *Library diff --git a/siva/locationregistry.go b/siva/locationregistry.go new file mode 100644 index 0000000..fef22c1 --- /dev/null +++ b/siva/locationregistry.go @@ -0,0 +1,72 @@ +package siva + +import ( + "sync" + + borges "github.com/src-d/go-borges" + + lru "github.com/hashicorp/golang-lru" +) + +func newLocationRegistry(cacheSize int) (*locationRegistry, error) { + c, err := lru.New(cacheSize) + if err != nil { + return nil, err + } + + return &locationRegistry{ + used: make(map[borges.LocationID]*Location), + cache: c, + }, nil +} + +// locationRegistry holds a list of locations that have a transaction under way +// and recently used. +type locationRegistry struct { + used map[borges.LocationID]*Location + cache *lru.Cache + + m sync.RWMutex +} + +// Get retrieves a location from the registry. +func (r *locationRegistry) Get(id borges.LocationID) (*Location, bool) { + r.m.RLock() + defer r.m.RUnlock() + + if l, ok := r.used[id]; ok { + return l, true + } + + if l, ok := r.cache.Get(id); ok { + return l.(*Location), true + } + + return nil, false +} + +// Add stores a location in the registry. +func (r *locationRegistry) Add(l *Location) { + r.m.RLock() + defer r.m.RUnlock() + + r.cache.Add(l.ID(), l) +} + +// StartTransaction marks a location as being used so it does not get evicted. +func (r *locationRegistry) StartTransaction(l *Location) { + r.m.Lock() + defer r.m.Unlock() + + r.cache.Remove(l.ID()) + r.used[l.ID()] = l +} + +// EndTransaction moves a location back to normal cache. +func (r *locationRegistry) EndTransaction(l *Location) { + r.m.Lock() + defer r.m.Unlock() + + r.cache.Add(l.ID(), l) + delete(r.used, l.ID()) +} diff --git a/siva/repository.go b/siva/repository.go index 2b58e8c..df9bf58 100644 --- a/siva/repository.go +++ b/siva/repository.go @@ -9,6 +9,8 @@ import ( "gopkg.in/src-d/go-git.v4/storage/filesystem" ) +// Repository is an implementation for siva files of borges.Repository +// interface. type Repository struct { id borges.RepositoryID repo *git.Repository @@ -20,6 +22,7 @@ type Repository struct { var _ borges.Repository = (*Repository)(nil) +// NewRepository creates a new siva backed Repository. func NewRepository( id borges.RepositoryID, fs sivafs.SivaFS, @@ -41,19 +44,27 @@ func NewRepository( }, nil } +// ID implements borges.Repository interface. func (r *Repository) ID() borges.RepositoryID { return r.id } +// LocationID implements borges.Repository interface. func (r *Repository) LocationID() borges.LocationID { return r.location.ID() } +// Mode implements borges.Repository interface. func (r *Repository) Mode() borges.Mode { return r.mode } +// Commit implements borges.Repository interface. func (r *Repository) Commit() error { + if r.mode != borges.RWMode { + return nil + } + err := r.fs.Sync() if err != nil { return err @@ -62,7 +73,12 @@ func (r *Repository) Commit() error { return r.location.Commit(r.mode) } +// Close implements borges.Repository interface. func (r *Repository) Close() error { + if r.mode != borges.RWMode { + return nil + } + err := r.fs.Sync() if err != nil { return err @@ -71,6 +87,7 @@ func (r *Repository) Close() error { return r.location.Rollback(r.mode) } +// R implements borges.Repository interface. func (r *Repository) R() *git.Repository { return r.repo } diff --git a/siva/transaction_test.go b/siva/transaction_test.go index ef04f3a..80d1ea0 100644 --- a/siva/transaction_test.go +++ b/siva/transaction_test.go @@ -21,7 +21,8 @@ func setupTranstaction( require.NoError(err) fs := memfs.New() - lib := NewLibrary("test", fs, true) + lib, err := NewLibrary("test", fs, true) + require.NoError(err) err = util.WriteFile(fs, "foo-bar.siva", sivaData, 0666) require.NoError(err) From 469ab32a50c3b6ad886bd5004c7a1d5d1d270d8e Mon Sep 17 00:00:00 2001 From: Javi Fontan Date: Fri, 15 Feb 2019 20:11:43 +0100 Subject: [PATCH 3/4] add library options Now transactions and registry cache size can be configured with LibraryOptions struct. Also added tests for location registry. Signed-off-by: Javi Fontan --- siva/checkpoint_test.go | 2 +- siva/library.go | 18 ++++-- siva/library_test.go | 2 +- siva/locationregistry.go | 27 +++++++-- siva/locationregistry_test.go | 108 ++++++++++++++++++++++++++++++++++ siva/transaction_test.go | 34 +++++++++-- 6 files changed, 172 insertions(+), 19 deletions(-) create mode 100644 siva/locationregistry_test.go diff --git a/siva/checkpoint_test.go b/siva/checkpoint_test.go index 33d6448..8d0ab4d 100644 --- a/siva/checkpoint_test.go +++ b/siva/checkpoint_test.go @@ -19,7 +19,7 @@ func TestCheckpoint(t *testing.T) { require.NoError(err) fs := memfs.New() - lib, err := NewLibrary("test", fs, true) + lib, err := NewLibrary("test", fs, LibraryOptions{Transactional: true}) require.NoError(err) var l borges.Location diff --git a/siva/library.go b/siva/library.go index bc86fb7..287a373 100644 --- a/siva/library.go +++ b/siva/library.go @@ -12,9 +12,6 @@ import ( butil "gopkg.in/src-d/go-billy.v4/util" ) -// LocationRegistrySize is the number of locations cached in the registry. -const LocationRegistrySize = 1024 - // Library represents a borges.Library implementation based on siva files. type Library struct { id borges.LibraryID @@ -23,15 +20,24 @@ type Library struct { locReg *locationRegistry } +// LibraryOptions hold configuration options for the library. +type LibraryOptions struct { + // Transactional enables transactions for repository writes. + Transactional bool + // RegistryCache is the maximum number of locations in the cache. A value + // of 0 disables the cache. + RegistryCache int +} + var _ borges.Library = (*Library)(nil) // NewLibrary creates a new siva.Library. func NewLibrary( id string, fs billy.Filesystem, - transactional bool, + ops LibraryOptions, ) (*Library, error) { - l, err := newLocationRegistry(LocationRegistrySize) + l, err := newLocationRegistry(ops.RegistryCache) if err != nil { return nil, err } @@ -39,7 +45,7 @@ func NewLibrary( return &Library{ id: borges.LibraryID(id), fs: fs, - transactional: transactional, + transactional: ops.Transactional, locReg: l, }, nil } diff --git a/siva/library_test.go b/siva/library_test.go index 1240e4f..f4ce566 100644 --- a/siva/library_test.go +++ b/siva/library_test.go @@ -15,7 +15,7 @@ func TestLibrary(t *testing.T) { fs := osfs.New("../_testdata/siva") s.LibrarySingle = func() borges.Library { - lib, err := NewLibrary("foo", fs, false) + lib, err := NewLibrary("foo", fs, LibraryOptions{}) require.NoError(t, err) return lib diff --git a/siva/locationregistry.go b/siva/locationregistry.go index fef22c1..4b35d74 100644 --- a/siva/locationregistry.go +++ b/siva/locationregistry.go @@ -9,9 +9,14 @@ import ( ) func newLocationRegistry(cacheSize int) (*locationRegistry, error) { - c, err := lru.New(cacheSize) - if err != nil { - return nil, err + var c *lru.Cache + var err error + + if cacheSize > 0 { + c, err = lru.New(cacheSize) + if err != nil { + return nil, err + } } return &locationRegistry{ @@ -38,6 +43,10 @@ func (r *locationRegistry) Get(id borges.LocationID) (*Location, bool) { return l, true } + if r.cache == nil { + return nil, false + } + if l, ok := r.cache.Get(id); ok { return l.(*Location), true } @@ -47,6 +56,10 @@ func (r *locationRegistry) Get(id borges.LocationID) (*Location, bool) { // Add stores a location in the registry. func (r *locationRegistry) Add(l *Location) { + if r.cache == nil { + return + } + r.m.RLock() defer r.m.RUnlock() @@ -58,8 +71,10 @@ func (r *locationRegistry) StartTransaction(l *Location) { r.m.Lock() defer r.m.Unlock() - r.cache.Remove(l.ID()) r.used[l.ID()] = l + if r.cache != nil { + r.cache.Remove(l.ID()) + } } // EndTransaction moves a location back to normal cache. @@ -67,6 +82,8 @@ func (r *locationRegistry) EndTransaction(l *Location) { r.m.Lock() defer r.m.Unlock() - r.cache.Add(l.ID(), l) delete(r.used, l.ID()) + if r.cache != nil { + r.cache.Add(l.ID(), l) + } } diff --git a/siva/locationregistry_test.go b/siva/locationregistry_test.go new file mode 100644 index 0000000..c7272d2 --- /dev/null +++ b/siva/locationregistry_test.go @@ -0,0 +1,108 @@ +package siva + +import ( + "fmt" + "testing" + + borges "github.com/src-d/go-borges" + "github.com/stretchr/testify/require" +) + +func point(p interface{}) string { + return fmt.Sprintf("%p", p) +} + +func TestRegistryNoCache(t *testing.T) { + require := require.New(t) + + fs := setupFS(t) + + ops := LibraryOptions{ + Transactional: true, + RegistryCache: 0, + } + lib, err := NewLibrary("test", fs, ops) + require.NoError(err) + + // locations are recreated when no transaction is being made + + loc1, err := lib.Location("foo-bar") + require.NoError(err) + loc2, err := lib.Location("foo-bar") + require.NoError(err) + + require.NotEqual(point(loc1), point(loc2)) + + // when there is a transaction it reuses the location + + loc1, err = lib.Location("foo-bar") + require.NoError(err) + + r, err := loc1.Get("github.com/foo/bar", borges.RWMode) + require.NoError(err) + + loc2, err = lib.Location("foo-bar") + require.NoError(err) + + require.Equal(point(loc1), point(loc2)) + + // after finishing the transaction locations are regenerated again + + err = r.Close() + require.NoError(err) + + loc2, err = lib.Location("foo-bar") + require.NoError(err) + + require.NotEqual(point(loc1), point(loc2)) + + // same case but with commit + + r, err = loc1.Get("github.com/foo/bar", borges.RWMode) + require.NoError(err) + + loc2, err = lib.Location("foo-bar") + require.NoError(err) + + require.Equal(point(loc1), point(loc2)) + + err = r.Commit() + require.NoError(err) + + loc2, err = lib.Location("foo-bar") + require.NoError(err) + + require.NotEqual(point(loc1), point(loc2)) +} + +func TestRegistryCache(t *testing.T) { + require := require.New(t) + + fs := setupFS(t) + + ops := LibraryOptions{ + Transactional: true, + RegistryCache: 1, + } + lib, err := NewLibrary("test", fs, ops) + require.NoError(err) + + // as the capacity is 1 getting the same location twice returns the same + // object + loc1, err := lib.Location("foo-bar") + require.NoError(err) + loc2, err := lib.Location("foo-bar") + require.NoError(err) + + require.Equal(point(loc1), point(loc2)) + + // getting another location should swipe the first location from cache + + _, err = lib.Location("foo-qux") + require.NoError(err) + + loc2, err = lib.Location("foo-bar") + require.NoError(err) + + require.NotEqual(point(loc1), point(loc2)) +} diff --git a/siva/transaction_test.go b/siva/transaction_test.go index 80d1ea0..7522eed 100644 --- a/siva/transaction_test.go +++ b/siva/transaction_test.go @@ -2,30 +2,52 @@ package siva import ( "io/ioutil" + "path" "testing" borges "github.com/src-d/go-borges" "github.com/stretchr/testify/require" + billy "gopkg.in/src-d/go-billy.v4" "gopkg.in/src-d/go-billy.v4/memfs" "gopkg.in/src-d/go-billy.v4/util" git "gopkg.in/src-d/go-git.v4" ) +func setupFS(t *testing.T) billy.Filesystem { + t.Helper() + require := require.New(t) + + fs := memfs.New() + + sivaFiles := []string{ + "foo-bar.siva", + "foo-qux.siva", + } + + for _, f := range sivaFiles { + path := path.Join("..", "_testdata", "siva", f) + sivaData, err := ioutil.ReadFile(path) + require.NoError(err) + err = util.WriteFile(fs, f, sivaData, 0666) + require.NoError(err) + } + + return fs +} + func setupTranstaction( t *testing.T, ) (borges.Location, borges.Repository, borges.Repository) { t.Helper() require := require.New(t) - sivaData, err := ioutil.ReadFile("../_testdata/siva/foo-bar.siva") - require.NoError(err) + fs := setupFS(t) - fs := memfs.New() - lib, err := NewLibrary("test", fs, true) + lib, err := NewLibrary("test", fs, LibraryOptions{ + Transactional: true, + }) require.NoError(err) - err = util.WriteFile(fs, "foo-bar.siva", sivaData, 0666) - require.NoError(err) l, err := lib.Location("foo-bar") require.NoError(err) From 704672dadc20b1c9e55b5f1b4dcfc9cc6fd254b8 Mon Sep 17 00:00:00 2001 From: Manuel Carmona Date: Tue, 19 Feb 2019 15:23:02 +0000 Subject: [PATCH 4/4] siva: add a transactioner using the location registry to coordinate transactional and write operations. Signed-off-by: Manuel Carmona --- siva/checkpoint.go | 16 ++-- siva/iterator.go | 3 + siva/library.go | 12 +-- siva/location.go | 89 +++++++------------ ...cationregistry.go => location_registry.go} | 18 ++-- ...stry_test.go => location_registry_test.go} | 0 siva/transactioner.go | 54 +++++++++++ 7 files changed, 109 insertions(+), 83 deletions(-) rename siva/{locationregistry.go => location_registry.go} (100%) rename siva/{locationregistry_test.go => location_registry_test.go} (100%) create mode 100644 siva/transactioner.go diff --git a/siva/checkpoint.go b/siva/checkpoint.go index 2ba9276..ac957ed 100644 --- a/siva/checkpoint.go +++ b/siva/checkpoint.go @@ -19,17 +19,17 @@ var ( const checkpointExtension = ".checkpoint" -// Checkpoint tracks the status of a siva file and creates checkpoints to be +// checkpoint tracks the status of a siva file and creates checkpoints to be // able to return back to a known state of that siva file. -type Checkpoint struct { +type checkpoint struct { offset int64 baseFs billy.Filesystem path string persist string } -// NewCheckpoint builds a new Checkpoint. -func NewCheckpoint(fs billy.Filesystem, path string) (*Checkpoint, error) { +// newCheckpoint builds a new Checkpoint. +func newCheckpoint(fs billy.Filesystem, path string) (*checkpoint, error) { persist := path + checkpointExtension if _, err := fs.Stat(path); err != nil && os.IsNotExist(err) { @@ -38,7 +38,7 @@ func NewCheckpoint(fs billy.Filesystem, path string) (*Checkpoint, error) { borges.ErrLocationNotExists.New(path)) } - c := &Checkpoint{ + c := &checkpoint{ baseFs: fs, path: path, persist: persist, @@ -64,7 +64,7 @@ func NewCheckpoint(fs billy.Filesystem, path string) (*Checkpoint, error) { // Apply applies if necessary the operations on the siva file to // leave it in the last correct state the checkpoint keeps. -func (c *Checkpoint) Apply() error { +func (c *checkpoint) Apply() error { if c.offset > 0 { f, err := c.baseFs.Open(c.path) if err != nil { @@ -82,7 +82,7 @@ func (c *Checkpoint) Apply() error { } // Save saves the current state of the siva file. -func (c *Checkpoint) Save() error { +func (c *checkpoint) Save() error { info, err := c.baseFs.Stat(c.path) if err != nil { return ErrCannotUseSivaFile.Wrap(err) @@ -98,7 +98,7 @@ func (c *Checkpoint) Save() error { } // Reset resets the checkpoint. -func (c *Checkpoint) Reset() error { +func (c *checkpoint) Reset() error { if err := cleanup(c.baseFs, c.persist); err != nil { return ErrCannotUseCheckpointFile.Wrap(err) } diff --git a/siva/iterator.go b/siva/iterator.go index b393464..b09bf3c 100644 --- a/siva/iterator.go +++ b/siva/iterator.go @@ -16,6 +16,7 @@ type repositoryIterator struct { var _ borges.RepositoryIterator = (*repositoryIterator)(nil) +// Next implements the borges.RepositoryIterator interface. func (i *repositoryIterator) Next() (borges.Repository, error) { for { if i.pos >= len(i.remotes) { @@ -39,6 +40,7 @@ func (i *repositoryIterator) Next() (borges.Repository, error) { } } +// ForEach implements the borges.RepositoryIterator interface. func (i *repositoryIterator) ForEach(f func(borges.Repository) error) error { for { r, err := i.Next() @@ -56,4 +58,5 @@ func (i *repositoryIterator) ForEach(f func(borges.Repository) error) error { } } +// Close implements the borges.RepositoryIterator interface. func (i *repositoryIterator) Close() {} diff --git a/siva/library.go b/siva/library.go index 287a373..81ad520 100644 --- a/siva/library.go +++ b/siva/library.go @@ -37,7 +37,7 @@ func NewLibrary( fs billy.Filesystem, ops LibraryOptions, ) (*Library, error) { - l, err := newLocationRegistry(ops.RegistryCache) + lr, err := newLocationRegistry(ops.RegistryCache) if err != nil { return nil, err } @@ -46,7 +46,7 @@ func NewLibrary( id: borges.LibraryID(id), fs: fs, transactional: ops.Transactional, - locReg: l, + locReg: lr, }, nil } @@ -194,11 +194,3 @@ func (l *Library) Libraries() (borges.LibraryIterator, error) { libs := []borges.Library{l} return util.NewLibraryIterator(libs), nil } - -func (l *Library) startTransaction(loc *Location) { - l.locReg.StartTransaction(loc) -} - -func (l *Library) endTransaction(loc *Location) { - l.locReg.EndTransaction(loc) -} diff --git a/siva/location.go b/siva/location.go index 2840a5c..eb08703 100644 --- a/siva/location.go +++ b/siva/location.go @@ -1,8 +1,6 @@ package siva import ( - "time" - borges "github.com/src-d/go-borges" sivafs "gopkg.in/src-d/go-billy-siva.v4" billy "gopkg.in/src-d/go-billy.v4" @@ -11,71 +9,42 @@ import ( "gopkg.in/src-d/go-git.v4/config" ) -var ( - // ErrMalformedData when checkpoint data is invalid. - ErrMalformedData = errors.NewKind("malformed data") - // ErrTransactionTimeout is returned when a repository can't - // be retrieved in transactional mode because of a timeout. - ErrTransactionTimeout = errors.NewKind("timeout exceeded: unable toretrieve repository %s in transactional mode.") -) +// ErrMalformedData when checkpoint data is invalid. +var ErrMalformedData = errors.NewKind("malformed data") +// Location represents a siva file archiving several git repositories. type Location struct { - id borges.LocationID - path string - cachedFS sivafs.SivaFS - library *Library - - // last good position - checkpoint *Checkpoint - tx *repoTxer + id borges.LocationID + path string + cachedFS sivafs.SivaFS + lib *Library + checkpoint *checkpoint + txer *transactioner } var _ borges.Location = (*Location)(nil) -const txTimeout = 60 * time.Second - -type repoTxer struct { - notification chan struct{} - timeout time.Duration -} - -func newRepoTxer() *repoTxer { - n := make(chan struct{}, 1) - n <- struct{}{} - return &repoTxer{n, txTimeout} -} - -func (rtx *repoTxer) Start(id borges.RepositoryID) error { - select { - case <-rtx.notification: - return nil - case <-time.After(rtx.timeout): - return ErrTransactionTimeout.New(id) - } -} - -func (rtx *repoTxer) Stop() { rtx.notification <- struct{}{} } - -func NewLocation(id borges.LocationID, l *Library, path string) (*Location, error) { - checkpoint, err := NewCheckpoint(l.fs, path) +// NewLocation creates a new Location object. +func NewLocation(id borges.LocationID, lib *Library, path string) (*Location, error) { + cp, err := newCheckpoint(lib.fs, path) if err != nil { return nil, err } - location := &Location{ + loc := &Location{ id: id, path: path, - library: l, - checkpoint: checkpoint, + lib: lib, + checkpoint: cp, } - _, err = location.FS() + _, err = loc.FS() if err != nil { return nil, err } - location.tx = newRepoTxer() - return location, nil + loc.txer = newTransactioner(loc, lib.locReg) + return loc, nil } func (l *Location) newFS() (sivafs.SivaFS, error) { @@ -101,10 +70,12 @@ func (l *Location) FS() (sivafs.SivaFS, error) { return sfs, nil } +// ID implements the borges.Location interface. func (l *Location) ID() borges.LocationID { return l.id } +// Init implements the borges.Location interface. func (l *Location) Init(id borges.RepositoryID) (borges.Repository, error) { has, err := l.Has(id) if err != nil { @@ -137,6 +108,7 @@ func (l *Location) Init(id borges.RepositoryID) (borges.Repository, error) { return repo, nil } +// Get implements the borges.Location interface. func (l *Location) Get(id borges.RepositoryID, mode borges.Mode) (borges.Repository, error) { has, err := l.Has(id) if err != nil { @@ -150,6 +122,7 @@ func (l *Location) Get(id borges.RepositoryID, mode borges.Mode) (borges.Reposit return l.repository(id, mode) } +// GetOrInit implements the borges.Location interface. func (l *Location) GetOrInit(id borges.RepositoryID) (borges.Repository, error) { has, err := l.Has(id) if err != nil { @@ -163,6 +136,7 @@ func (l *Location) GetOrInit(id borges.RepositoryID) (borges.Repository, error) return l.Init(id) } +// Has implements the borges.Location interface. func (l *Location) Has(name borges.RepositoryID) (bool, error) { repo, err := l.repository("", borges.ReadOnlyMode) if err != nil { @@ -185,6 +159,7 @@ func (l *Location) Has(name borges.RepositoryID) (bool, error) { return false, nil } +// Repositories implements the borges.Location interface. func (l *Location) Repositories(mode borges.Mode) (borges.RepositoryIterator, error) { var remotes []*config.RemoteConfig @@ -210,15 +185,15 @@ func (l *Location) Repositories(mode borges.Mode) (borges.RepositoryIterator, er } func (l *Location) baseFS() billy.Filesystem { - return l.library.fs + return l.lib.fs } func (l *Location) getRepoFs(id borges.RepositoryID, mode borges.Mode) (sivafs.SivaFS, error) { - if !l.library.transactional || mode != borges.RWMode { + if !l.lib.transactional || mode != borges.RWMode { return l.FS() } - if err := l.tx.Start(id); err != nil { + if err := l.txer.Start(); err != nil { return nil, err } @@ -234,12 +209,13 @@ func (l *Location) getRepoFs(id borges.RepositoryID, mode borges.Mode) (sivafs.S return fs, nil } +// Commit persists transactional or write operations performed on the repositories. func (l *Location) Commit(mode borges.Mode) error { - if !l.library.transactional || mode != borges.RWMode { + if !l.lib.transactional || mode != borges.RWMode { return nil } - defer l.tx.Stop() + defer l.txer.Stop() if err := l.checkpoint.Reset(); err != nil { return err } @@ -248,12 +224,13 @@ func (l *Location) Commit(mode borges.Mode) error { return nil } +// Rollback discard transactional or write operations performed on the repositories. func (l *Location) Rollback(mode borges.Mode) error { - if !l.library.transactional || mode != borges.RWMode { + if !l.lib.transactional || mode != borges.RWMode { return nil } - defer l.tx.Stop() + defer l.txer.Stop() if err := l.checkpoint.Apply(); err != nil { return err } diff --git a/siva/locationregistry.go b/siva/location_registry.go similarity index 100% rename from siva/locationregistry.go rename to siva/location_registry.go index 4b35d74..b86e4c7 100644 --- a/siva/locationregistry.go +++ b/siva/location_registry.go @@ -8,6 +8,15 @@ import ( lru "github.com/hashicorp/golang-lru" ) +// locationRegistry holds a list of locations that have a transaction under way +// and recently used. +type locationRegistry struct { + used map[borges.LocationID]*Location + cache *lru.Cache + + m sync.RWMutex +} + func newLocationRegistry(cacheSize int) (*locationRegistry, error) { var c *lru.Cache var err error @@ -25,15 +34,6 @@ func newLocationRegistry(cacheSize int) (*locationRegistry, error) { }, nil } -// locationRegistry holds a list of locations that have a transaction under way -// and recently used. -type locationRegistry struct { - used map[borges.LocationID]*Location - cache *lru.Cache - - m sync.RWMutex -} - // Get retrieves a location from the registry. func (r *locationRegistry) Get(id borges.LocationID) (*Location, bool) { r.m.RLock() diff --git a/siva/locationregistry_test.go b/siva/location_registry_test.go similarity index 100% rename from siva/locationregistry_test.go rename to siva/location_registry_test.go diff --git a/siva/transactioner.go b/siva/transactioner.go new file mode 100644 index 0000000..9a1b472 --- /dev/null +++ b/siva/transactioner.go @@ -0,0 +1,54 @@ +package siva + +import ( + "time" + + errors "gopkg.in/src-d/go-errors.v1" +) + +// ErrTransactionTimeout is returned when a repository can't be retrieved in +// transactional mode because of a timeout. +var ErrTransactionTimeout = errors.NewKind("timeout exceeded: unable to " + + "retrieve repository from location %s in transactional mode.") + +const txTimeout = 60 * time.Second + +// transactioner manages synchronization to allow transactions on a Location. +type transactioner struct { + notification chan struct{} + timeout time.Duration + loc *Location + locReg *locationRegistry +} + +func newTransactioner( + loc *Location, + locReg *locationRegistry, +) *transactioner { + n := make(chan struct{}, 1) + n <- struct{}{} + return &transactioner{ + notification: n, + timeout: txTimeout, + loc: loc, + locReg: locReg, + } +} + +// Start requests permission for a new transaction. If it can't get it after a +// certain amount of time it will fail with an ErrTransactionTimeout error. +func (t *transactioner) Start() error { + select { + case <-t.notification: + t.locReg.StartTransaction(t.loc) + return nil + case <-time.After(t.timeout): + return ErrTransactionTimeout.New(t.loc.ID()) + } +} + +// Stop signals the transaction is finished. +func (t *transactioner) Stop() { + t.locReg.EndTransaction(t.loc) + t.notification <- struct{}{} +}