Skip to content

Commit

Permalink
fix: gcs manager race (#96)
Browse files Browse the repository at this point in the history
  • Loading branch information
fracasula committed Aug 23, 2023
1 parent d8cd51a commit 64602fd
Showing 1 changed file with 51 additions and 44 deletions.
95 changes: 51 additions & 44 deletions filemanager/gcsmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path"
"strings"
"sync"
"time"

"google.golang.org/api/iterator"
Expand All @@ -30,7 +31,9 @@ type GCSConfig struct {
}

// NewGCSManager creates a new file manager for Google Cloud Storage
func NewGCSManager(config map[string]interface{}, log logger.Logger, defaultTimeout func() time.Duration) (*gcsManager, error) {
func NewGCSManager(
config map[string]interface{}, log logger.Logger, defaultTimeout func() time.Duration,
) (*gcsManager, error) {
return &gcsManager{
baseManager: &baseManager{
logger: log,
Expand All @@ -40,49 +43,49 @@ func NewGCSManager(config map[string]interface{}, log logger.Logger, defaultTime
}, nil
}

func (manager *gcsManager) ListFilesWithPrefix(ctx context.Context, startAfter, prefix string, maxItems int64) ListSession {
func (m *gcsManager) ListFilesWithPrefix(ctx context.Context, startAfter, prefix string, maxItems int64) ListSession {
return &gcsListSession{
baseListSession: &baseListSession{
ctx: ctx,
startAfter: startAfter,
prefix: prefix,
maxItems: maxItems,
},
manager: manager,
manager: m,
}
}

func (manager *gcsManager) Download(ctx context.Context, output *os.File, key string) error {
client, err := manager.getClient(ctx)
func (m *gcsManager) Download(ctx context.Context, output *os.File, key string) error {
client, err := m.getClient(ctx)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(ctx, manager.getTimeout())
ctx, cancel := context.WithTimeout(ctx, m.getTimeout())
defer cancel()

rc, err := client.Bucket(manager.config.Bucket).Object(key).NewReader(ctx)
rc, err := client.Bucket(m.config.Bucket).Object(key).NewReader(ctx)
if err != nil {
return err
}
defer rc.Close()
defer func() { _ = rc.Close() }()

_, err = io.Copy(output, rc)
return err
}

func (manager *gcsManager) Upload(ctx context.Context, file *os.File, prefixes ...string) (UploadedFile, error) {
fileName := path.Join(manager.config.Prefix, path.Join(prefixes...), path.Base(file.Name()))
func (m *gcsManager) Upload(ctx context.Context, file *os.File, prefixes ...string) (UploadedFile, error) {
fileName := path.Join(m.config.Prefix, path.Join(prefixes...), path.Base(file.Name()))

client, err := manager.getClient(ctx)
client, err := m.getClient(ctx)
if err != nil {
return UploadedFile{}, err
}

ctx, cancel := context.WithTimeout(ctx, manager.getTimeout())
ctx, cancel := context.WithTimeout(ctx, m.getTimeout())
defer cancel()

obj := client.Bucket(manager.config.Bucket).Object(fileName)
obj := client.Bucket(m.config.Bucket).Object(fileName)
w := obj.NewWriter(ctx)
if _, err := io.Copy(w, file); err != nil {
err = fmt.Errorf("copying file to GCS: %v", err)
Expand All @@ -98,82 +101,86 @@ func (manager *gcsManager) Upload(ctx context.Context, file *os.File, prefixes .
}
attrs := w.Attrs()

return UploadedFile{Location: manager.objectURL(attrs), ObjectName: fileName}, err
return UploadedFile{Location: m.objectURL(attrs), ObjectName: fileName}, err
}

func (manager *gcsManager) Delete(ctx context.Context, keys []string) (err error) {
client, err := manager.getClient(ctx)
func (m *gcsManager) Delete(ctx context.Context, keys []string) (err error) {
client, err := m.getClient(ctx)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(ctx, manager.getTimeout())
ctx, cancel := context.WithTimeout(ctx, m.getTimeout())
defer cancel()

for _, key := range keys {
if err := client.Bucket(manager.config.Bucket).Object(key).Delete(ctx); err != nil && !errors.Is(err, storage.ErrObjectNotExist) {
if err := client.Bucket(m.config.Bucket).Object(key).Delete(ctx); err != nil && !errors.Is(err, storage.ErrObjectNotExist) {
return err
}
}
return
}

func (manager *gcsManager) Prefix() string {
return manager.config.Prefix
func (m *gcsManager) Prefix() string {
return m.config.Prefix
}

func (manager *gcsManager) GetObjectNameFromLocation(location string) (string, error) {
splitStr := strings.Split(location, manager.config.Bucket)
func (m *gcsManager) GetObjectNameFromLocation(location string) (string, error) {
splitStr := strings.Split(location, m.config.Bucket)
object := strings.TrimLeft(splitStr[len(splitStr)-1], "/")
return object, nil
}

func (manager *gcsManager) GetDownloadKeyFromFileLocation(location string) string {
splitStr := strings.Split(location, manager.config.Bucket)
func (m *gcsManager) GetDownloadKeyFromFileLocation(location string) string {
splitStr := strings.Split(location, m.config.Bucket)
key := strings.TrimLeft(splitStr[len(splitStr)-1], "/")
return key
}

func (manager *gcsManager) objectURL(objAttrs *storage.ObjectAttrs) string {
if manager.config.EndPoint != nil && *manager.config.EndPoint != "" {
endpoint := strings.TrimSuffix(*manager.config.EndPoint, "/")
func (m *gcsManager) objectURL(objAttrs *storage.ObjectAttrs) string {
if m.config.EndPoint != nil && *m.config.EndPoint != "" {
endpoint := strings.TrimSuffix(*m.config.EndPoint, "/")
return fmt.Sprintf("%s/%s/%s", endpoint, objAttrs.Bucket, objAttrs.Name)
}
return fmt.Sprintf("https://storage.googleapis.com/%s/%s", objAttrs.Bucket, objAttrs.Name)
}

func (manager *gcsManager) getClient(ctx context.Context) (*storage.Client, error) {
var err error
func (m *gcsManager) getClient(ctx context.Context) (*storage.Client, error) {
m.clientMu.Lock()
defer m.clientMu.Unlock()

ctx, cancel := context.WithTimeout(ctx, manager.getTimeout())
defer cancel()
if manager.client != nil {
return manager.client, err
if m.client != nil {
return m.client, nil
}
options := []option.ClientOption{}

if manager.config.EndPoint != nil && *manager.config.EndPoint != "" {
options = append(options, option.WithEndpoint(*manager.config.EndPoint))
var options []option.ClientOption
if m.config.EndPoint != nil && *m.config.EndPoint != "" {
options = append(options, option.WithEndpoint(*m.config.EndPoint))
}
if !googleutil.ShouldSkipCredentialsInit(manager.config.Credentials) {
if err = googleutil.CompatibleGoogleCredentialsJSON([]byte(manager.config.Credentials)); err != nil {
return manager.client, err
if !googleutil.ShouldSkipCredentialsInit(m.config.Credentials) {
if err := googleutil.CompatibleGoogleCredentialsJSON([]byte(m.config.Credentials)); err != nil {
return m.client, err
}
options = append(options, option.WithCredentialsJSON([]byte(manager.config.Credentials)))
options = append(options, option.WithCredentialsJSON([]byte(m.config.Credentials)))
}
if manager.config.JSONReads {
if m.config.JSONReads {
options = append(options, storage.WithJSONReads())
}

manager.client, err = storage.NewClient(ctx, options...)
return manager.client, err
ctx, cancel := context.WithTimeout(ctx, m.getTimeout())
defer cancel()

var err error
m.client, err = storage.NewClient(ctx, options...)
return m.client, err
}

type gcsManager struct {
*baseManager
config *GCSConfig

client *storage.Client
client *storage.Client
clientMu sync.Mutex
}

func gcsConfig(config map[string]interface{}) *GCSConfig {
Expand Down

0 comments on commit 64602fd

Please sign in to comment.