Skip to content

Commit

Permalink
remove the local mutex, as S3 should handle that functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
danquack committed Dec 16, 2024
1 parent 4a275f9 commit 8861d2c
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 44 deletions.
3 changes: 0 additions & 3 deletions physical/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,7 @@ func NewS3Backend(conf map[string]string, logger log.Logger) (physical.Backend,
config.WithHTTPClient(&http.Client{Transport: pooledTransport}),
)
if accessKey != "" && secretKey != "" {
logger.Debug("using provided creds")
cfg.Credentials = credentials.NewStaticCredentialsProvider(accessKey, secretKey, sessionToken)
} else {
logger.Debug("using other creds")
}

if err != nil {
Expand Down
43 changes: 2 additions & 41 deletions physical/s3/s3_ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"github.com/armon/go-metrics"
Expand Down Expand Up @@ -46,11 +45,9 @@ type S3Lock struct {
value string
held bool
identity string
lock sync.Mutex

stopCh chan struct{}
stopped bool
stopLock sync.Mutex
stopCh chan struct{}
stopped bool

renewInterval time.Duration
retryInterval time.Duration
Expand All @@ -62,20 +59,6 @@ type S3Lock struct {
func (l *S3Lock) Unlock() error {
defer metrics.MeasureSince(metricLockUnlock, time.Now())

l.lock.Lock()
defer l.lock.Unlock()
if !l.held {
return nil
}

// Stop any existing locking or renewal attempts
l.stopLock.Lock()
if !l.stopped {
l.stopped = true
close(l.stopCh)
}
l.stopLock.Unlock()

// First verify the lock exists and we own it
exists, _, err := l.Value()
if err != nil {
Expand Down Expand Up @@ -124,7 +107,6 @@ func (s *S3Backend) LockWith(key, value string) (physical.Lock, error) {
key: key,
value: value,
identity: identity,
stopped: true,

renewInterval: LockRenewInterval,
retryInterval: LockRetryInterval,
Expand All @@ -137,12 +119,6 @@ func (s *S3Backend) LockWith(key, value string) (physical.Lock, error) {
func (l *S3Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
defer metrics.MeasureSince(metricLockLock, time.Now())

l.lock.Lock()
defer l.lock.Unlock()
if l.held {
return nil, errors.New("lock already held")
}

// Attempt to lock - this function blocks until a lock is acquired or an error
// occurs.

Expand All @@ -158,10 +134,7 @@ func (l *S3Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
l.held = true

// Build the locks
l.stopLock.Lock()
l.stopCh = make(chan struct{})
l.stopped = false
l.stopLock.Unlock()

// Periodically renew and watch the lock
go l.watchLock()
Expand Down Expand Up @@ -288,12 +261,6 @@ func (l *S3Lock) watchLock() {
if err != nil {
var nsk *types.NoSuchKey
if errors.As(err, &nsk) {
l.stopLock.Lock()
if !l.stopped {
close(l.stopCh)
l.stopped = true
}
l.stopLock.Unlock()
return
}
continue
Expand All @@ -306,12 +273,6 @@ func (l *S3Lock) watchLock() {
result.Body.Close()

if lockInfo.Identity != l.identity {
l.stopLock.Lock()
if !l.stopped {
close(l.stopCh)
l.stopped = true
}
l.stopLock.Unlock()
return
}
case <-l.stopCh:
Expand Down

0 comments on commit 8861d2c

Please sign in to comment.