diff --git a/filemanager/gcsmanager.go b/filemanager/gcsmanager.go index 5c3db627..9b761742 100644 --- a/filemanager/gcsmanager.go +++ b/filemanager/gcsmanager.go @@ -8,6 +8,7 @@ import ( "os" "path" "strings" + "sync" "time" "google.golang.org/api/iterator" @@ -30,7 +31,9 @@ type GCSConfig struct { } // NewGCSManager creates a new file manager for Google Cloud Storage -func NewGCSManager(config map[string]interface{}, log logger.Logger, defaultTimeout func() time.Duration) (*gcsManager, error) { +func NewGCSManager( + config map[string]interface{}, log logger.Logger, defaultTimeout func() time.Duration, +) (*gcsManager, error) { return &gcsManager{ baseManager: &baseManager{ logger: log, @@ -40,7 +43,7 @@ func NewGCSManager(config map[string]interface{}, log logger.Logger, defaultTime }, nil } -func (manager *gcsManager) ListFilesWithPrefix(ctx context.Context, startAfter, prefix string, maxItems int64) ListSession { +func (m *gcsManager) ListFilesWithPrefix(ctx context.Context, startAfter, prefix string, maxItems int64) ListSession { return &gcsListSession{ baseListSession: &baseListSession{ ctx: ctx, @@ -48,41 +51,41 @@ func (manager *gcsManager) ListFilesWithPrefix(ctx context.Context, startAfter, prefix: prefix, maxItems: maxItems, }, - manager: manager, + manager: m, } } -func (manager *gcsManager) Download(ctx context.Context, output *os.File, key string) error { - client, err := manager.getClient(ctx) +func (m *gcsManager) Download(ctx context.Context, output *os.File, key string) error { + client, err := m.getClient(ctx) if err != nil { return err } - ctx, cancel := context.WithTimeout(ctx, manager.getTimeout()) + ctx, cancel := context.WithTimeout(ctx, m.getTimeout()) defer cancel() - rc, err := client.Bucket(manager.config.Bucket).Object(key).NewReader(ctx) + rc, err := client.Bucket(m.config.Bucket).Object(key).NewReader(ctx) if err != nil { return err } - defer rc.Close() + defer func() { _ = rc.Close() }() _, err = io.Copy(output, rc) return err } -func (manager *gcsManager) Upload(ctx context.Context, file *os.File, prefixes ...string) (UploadedFile, error) { - fileName := path.Join(manager.config.Prefix, path.Join(prefixes...), path.Base(file.Name())) +func (m *gcsManager) Upload(ctx context.Context, file *os.File, prefixes ...string) (UploadedFile, error) { + fileName := path.Join(m.config.Prefix, path.Join(prefixes...), path.Base(file.Name())) - client, err := manager.getClient(ctx) + client, err := m.getClient(ctx) if err != nil { return UploadedFile{}, err } - ctx, cancel := context.WithTimeout(ctx, manager.getTimeout()) + ctx, cancel := context.WithTimeout(ctx, m.getTimeout()) defer cancel() - obj := client.Bucket(manager.config.Bucket).Object(fileName) + obj := client.Bucket(m.config.Bucket).Object(fileName) w := obj.NewWriter(ctx) if _, err := io.Copy(w, file); err != nil { err = fmt.Errorf("copying file to GCS: %v", err) @@ -98,82 +101,86 @@ func (manager *gcsManager) Upload(ctx context.Context, file *os.File, prefixes . } attrs := w.Attrs() - return UploadedFile{Location: manager.objectURL(attrs), ObjectName: fileName}, err + return UploadedFile{Location: m.objectURL(attrs), ObjectName: fileName}, err } -func (manager *gcsManager) Delete(ctx context.Context, keys []string) (err error) { - client, err := manager.getClient(ctx) +func (m *gcsManager) Delete(ctx context.Context, keys []string) (err error) { + client, err := m.getClient(ctx) if err != nil { return err } - ctx, cancel := context.WithTimeout(ctx, manager.getTimeout()) + ctx, cancel := context.WithTimeout(ctx, m.getTimeout()) defer cancel() for _, key := range keys { - if err := client.Bucket(manager.config.Bucket).Object(key).Delete(ctx); err != nil && !errors.Is(err, storage.ErrObjectNotExist) { + if err := client.Bucket(m.config.Bucket).Object(key).Delete(ctx); err != nil && !errors.Is(err, storage.ErrObjectNotExist) { return err } } return } -func (manager *gcsManager) Prefix() string { - return manager.config.Prefix +func (m *gcsManager) Prefix() string { + return m.config.Prefix } -func (manager *gcsManager) GetObjectNameFromLocation(location string) (string, error) { - splitStr := strings.Split(location, manager.config.Bucket) +func (m *gcsManager) GetObjectNameFromLocation(location string) (string, error) { + splitStr := strings.Split(location, m.config.Bucket) object := strings.TrimLeft(splitStr[len(splitStr)-1], "/") return object, nil } -func (manager *gcsManager) GetDownloadKeyFromFileLocation(location string) string { - splitStr := strings.Split(location, manager.config.Bucket) +func (m *gcsManager) GetDownloadKeyFromFileLocation(location string) string { + splitStr := strings.Split(location, m.config.Bucket) key := strings.TrimLeft(splitStr[len(splitStr)-1], "/") return key } -func (manager *gcsManager) objectURL(objAttrs *storage.ObjectAttrs) string { - if manager.config.EndPoint != nil && *manager.config.EndPoint != "" { - endpoint := strings.TrimSuffix(*manager.config.EndPoint, "/") +func (m *gcsManager) objectURL(objAttrs *storage.ObjectAttrs) string { + if m.config.EndPoint != nil && *m.config.EndPoint != "" { + endpoint := strings.TrimSuffix(*m.config.EndPoint, "/") return fmt.Sprintf("%s/%s/%s", endpoint, objAttrs.Bucket, objAttrs.Name) } return fmt.Sprintf("https://storage.googleapis.com/%s/%s", objAttrs.Bucket, objAttrs.Name) } -func (manager *gcsManager) getClient(ctx context.Context) (*storage.Client, error) { - var err error +func (m *gcsManager) getClient(ctx context.Context) (*storage.Client, error) { + m.clientMu.Lock() + defer m.clientMu.Unlock() - ctx, cancel := context.WithTimeout(ctx, manager.getTimeout()) - defer cancel() - if manager.client != nil { - return manager.client, err + if m.client != nil { + return m.client, nil } - options := []option.ClientOption{} - if manager.config.EndPoint != nil && *manager.config.EndPoint != "" { - options = append(options, option.WithEndpoint(*manager.config.EndPoint)) + var options []option.ClientOption + if m.config.EndPoint != nil && *m.config.EndPoint != "" { + options = append(options, option.WithEndpoint(*m.config.EndPoint)) } - if !googleutil.ShouldSkipCredentialsInit(manager.config.Credentials) { - if err = googleutil.CompatibleGoogleCredentialsJSON([]byte(manager.config.Credentials)); err != nil { - return manager.client, err + if !googleutil.ShouldSkipCredentialsInit(m.config.Credentials) { + if err := googleutil.CompatibleGoogleCredentialsJSON([]byte(m.config.Credentials)); err != nil { + return m.client, err } - options = append(options, option.WithCredentialsJSON([]byte(manager.config.Credentials))) + options = append(options, option.WithCredentialsJSON([]byte(m.config.Credentials))) } - if manager.config.JSONReads { + if m.config.JSONReads { options = append(options, storage.WithJSONReads()) } - manager.client, err = storage.NewClient(ctx, options...) - return manager.client, err + ctx, cancel := context.WithTimeout(ctx, m.getTimeout()) + defer cancel() + + var err error + m.client, err = storage.NewClient(ctx, options...) + return m.client, err } type gcsManager struct { *baseManager config *GCSConfig - client *storage.Client + client *storage.Client + clientMu sync.Mutex } func gcsConfig(config map[string]interface{}) *GCSConfig {