Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: gcs manager race #96

Merged
merged 1 commit into from
Aug 23, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 51 additions & 44 deletions filemanager/gcsmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"os"
"path"
"strings"
"sync"
"time"

"google.golang.org/api/iterator"
Expand All @@ -30,7 +31,9 @@
}

// NewGCSManager creates a new file manager for Google Cloud Storage
func NewGCSManager(config map[string]interface{}, log logger.Logger, defaultTimeout func() time.Duration) (*gcsManager, error) {
func NewGCSManager(
config map[string]interface{}, log logger.Logger, defaultTimeout func() time.Duration,
) (*gcsManager, error) {
return &gcsManager{
baseManager: &baseManager{
logger: log,
Expand All @@ -40,49 +43,49 @@
}, nil
}

func (manager *gcsManager) ListFilesWithPrefix(ctx context.Context, startAfter, prefix string, maxItems int64) ListSession {
func (m *gcsManager) ListFilesWithPrefix(ctx context.Context, startAfter, prefix string, maxItems int64) ListSession {
return &gcsListSession{
baseListSession: &baseListSession{
ctx: ctx,
startAfter: startAfter,
prefix: prefix,
maxItems: maxItems,
},
manager: manager,
manager: m,
}
}

func (manager *gcsManager) Download(ctx context.Context, output *os.File, key string) error {
client, err := manager.getClient(ctx)
func (m *gcsManager) Download(ctx context.Context, output *os.File, key string) error {
client, err := m.getClient(ctx)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(ctx, manager.getTimeout())
ctx, cancel := context.WithTimeout(ctx, m.getTimeout())
defer cancel()

rc, err := client.Bucket(manager.config.Bucket).Object(key).NewReader(ctx)
rc, err := client.Bucket(m.config.Bucket).Object(key).NewReader(ctx)
if err != nil {
return err
}
defer rc.Close()
defer func() { _ = rc.Close() }()

_, err = io.Copy(output, rc)
return err
}

func (manager *gcsManager) Upload(ctx context.Context, file *os.File, prefixes ...string) (UploadedFile, error) {
fileName := path.Join(manager.config.Prefix, path.Join(prefixes...), path.Base(file.Name()))
func (m *gcsManager) Upload(ctx context.Context, file *os.File, prefixes ...string) (UploadedFile, error) {
fileName := path.Join(m.config.Prefix, path.Join(prefixes...), path.Base(file.Name()))

client, err := manager.getClient(ctx)
client, err := m.getClient(ctx)
if err != nil {
return UploadedFile{}, err
}

ctx, cancel := context.WithTimeout(ctx, manager.getTimeout())
ctx, cancel := context.WithTimeout(ctx, m.getTimeout())
defer cancel()

obj := client.Bucket(manager.config.Bucket).Object(fileName)
obj := client.Bucket(m.config.Bucket).Object(fileName)
w := obj.NewWriter(ctx)
if _, err := io.Copy(w, file); err != nil {
err = fmt.Errorf("copying file to GCS: %v", err)
Expand All @@ -98,82 +101,86 @@
}
attrs := w.Attrs()

return UploadedFile{Location: manager.objectURL(attrs), ObjectName: fileName}, err
return UploadedFile{Location: m.objectURL(attrs), ObjectName: fileName}, err
}

func (manager *gcsManager) Delete(ctx context.Context, keys []string) (err error) {
client, err := manager.getClient(ctx)
func (m *gcsManager) Delete(ctx context.Context, keys []string) (err error) {
client, err := m.getClient(ctx)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(ctx, manager.getTimeout())
ctx, cancel := context.WithTimeout(ctx, m.getTimeout())
defer cancel()

for _, key := range keys {
if err := client.Bucket(manager.config.Bucket).Object(key).Delete(ctx); err != nil && !errors.Is(err, storage.ErrObjectNotExist) {
if err := client.Bucket(m.config.Bucket).Object(key).Delete(ctx); err != nil && !errors.Is(err, storage.ErrObjectNotExist) {
return err
}
}
return
}

func (manager *gcsManager) Prefix() string {
return manager.config.Prefix
func (m *gcsManager) Prefix() string {
return m.config.Prefix
}

func (manager *gcsManager) GetObjectNameFromLocation(location string) (string, error) {
splitStr := strings.Split(location, manager.config.Bucket)
func (m *gcsManager) GetObjectNameFromLocation(location string) (string, error) {
splitStr := strings.Split(location, m.config.Bucket)
object := strings.TrimLeft(splitStr[len(splitStr)-1], "/")
return object, nil
}

func (manager *gcsManager) GetDownloadKeyFromFileLocation(location string) string {
splitStr := strings.Split(location, manager.config.Bucket)
func (m *gcsManager) GetDownloadKeyFromFileLocation(location string) string {
splitStr := strings.Split(location, m.config.Bucket)
key := strings.TrimLeft(splitStr[len(splitStr)-1], "/")
return key
}

func (manager *gcsManager) objectURL(objAttrs *storage.ObjectAttrs) string {
if manager.config.EndPoint != nil && *manager.config.EndPoint != "" {
endpoint := strings.TrimSuffix(*manager.config.EndPoint, "/")
func (m *gcsManager) objectURL(objAttrs *storage.ObjectAttrs) string {
if m.config.EndPoint != nil && *m.config.EndPoint != "" {
endpoint := strings.TrimSuffix(*m.config.EndPoint, "/")
return fmt.Sprintf("%s/%s/%s", endpoint, objAttrs.Bucket, objAttrs.Name)
}
return fmt.Sprintf("https://storage.googleapis.com/%s/%s", objAttrs.Bucket, objAttrs.Name)
}

func (manager *gcsManager) getClient(ctx context.Context) (*storage.Client, error) {
var err error
func (m *gcsManager) getClient(ctx context.Context) (*storage.Client, error) {
m.clientMu.Lock()
defer m.clientMu.Unlock()

ctx, cancel := context.WithTimeout(ctx, manager.getTimeout())
defer cancel()
if manager.client != nil {
return manager.client, err
if m.client != nil {
return m.client, nil
}
options := []option.ClientOption{}

if manager.config.EndPoint != nil && *manager.config.EndPoint != "" {
options = append(options, option.WithEndpoint(*manager.config.EndPoint))
var options []option.ClientOption
if m.config.EndPoint != nil && *m.config.EndPoint != "" {
options = append(options, option.WithEndpoint(*m.config.EndPoint))
}
if !googleutil.ShouldSkipCredentialsInit(manager.config.Credentials) {
if err = googleutil.CompatibleGoogleCredentialsJSON([]byte(manager.config.Credentials)); err != nil {
return manager.client, err
if !googleutil.ShouldSkipCredentialsInit(m.config.Credentials) {
if err := googleutil.CompatibleGoogleCredentialsJSON([]byte(m.config.Credentials)); err != nil {
return m.client, err
}
options = append(options, option.WithCredentialsJSON([]byte(manager.config.Credentials)))
options = append(options, option.WithCredentialsJSON([]byte(m.config.Credentials)))

Check warning on line 164 in filemanager/gcsmanager.go

View check run for this annotation

Codecov / codecov/patch

filemanager/gcsmanager.go#L164

Added line #L164 was not covered by tests
}
if manager.config.JSONReads {
if m.config.JSONReads {
options = append(options, storage.WithJSONReads())
}

manager.client, err = storage.NewClient(ctx, options...)
return manager.client, err
ctx, cancel := context.WithTimeout(ctx, m.getTimeout())
defer cancel()

var err error
m.client, err = storage.NewClient(ctx, options...)
return m.client, err
}

type gcsManager struct {
*baseManager
config *GCSConfig

client *storage.Client
client *storage.Client
clientMu sync.Mutex
}

func gcsConfig(config map[string]interface{}) *GCSConfig {
Expand Down
Loading