Skip to content

Commit

Permalink
fix: minio manager data race (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
fracasula authored Jul 26, 2023
1 parent a5953a0 commit f55c7ae
Showing 1 changed file with 44 additions and 41 deletions.
85 changes: 44 additions & 41 deletions filemanager/miniomanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path"
"strings"
"sync"
"time"

"github.com/minio/minio-go/v7"
Expand Down Expand Up @@ -36,86 +37,86 @@ func NewMinioManager(config map[string]interface{}, log logger.Logger, defaultTi
}, nil
}

func (manager *minioManager) ListFilesWithPrefix(ctx context.Context, startAfter, prefix string, maxItems int64) ListSession {
func (m *minioManager) ListFilesWithPrefix(ctx context.Context, startAfter, prefix string, maxItems int64) ListSession {
return &minioListSession{
baseListSession: &baseListSession{
ctx: ctx,
startAfter: startAfter,
prefix: prefix,
maxItems: maxItems,
},
manager: manager,
manager: m,
isTruncated: true,
}
}

func (manager *minioManager) Download(ctx context.Context, file *os.File, key string) error {
minioClient, err := manager.getClient()
func (m *minioManager) Download(ctx context.Context, file *os.File, key string) error {
minioClient, err := m.getClient()
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(ctx, manager.getTimeout())
ctx, cancel := context.WithTimeout(ctx, m.getTimeout())
defer cancel()

err = minioClient.FGetObject(ctx, manager.config.Bucket, key, file.Name(), minio.GetObjectOptions{})
err = minioClient.FGetObject(ctx, m.config.Bucket, key, file.Name(), minio.GetObjectOptions{})
return err
}

func (manager *minioManager) Upload(ctx context.Context, file *os.File, prefixes ...string) (UploadedFile, error) {
if manager.config.Bucket == "" {
func (m *minioManager) Upload(ctx context.Context, file *os.File, prefixes ...string) (UploadedFile, error) {
if m.config.Bucket == "" {
return UploadedFile{}, errors.New("no storage bucket configured to uploader")
}

minioClient, err := manager.getClient()
minioClient, err := m.getClient()
if err != nil {
return UploadedFile{}, err
}

ctx, cancel := context.WithTimeout(ctx, manager.getTimeout())
ctx, cancel := context.WithTimeout(ctx, m.getTimeout())
defer cancel()

exists, err := minioClient.BucketExists(ctx, manager.config.Bucket)
exists, err := minioClient.BucketExists(ctx, m.config.Bucket)
if err != nil {
return UploadedFile{}, fmt.Errorf("checking bucket: %w", err)
}
if !exists {
if err = minioClient.MakeBucket(ctx, manager.config.Bucket, minio.MakeBucketOptions{Region: "us-east-1"}); err != nil {
if err = minioClient.MakeBucket(ctx, m.config.Bucket, minio.MakeBucketOptions{Region: "us-east-1"}); err != nil {
return UploadedFile{}, fmt.Errorf("creating bucket: %w", err)
}
}

fileName := path.Join(manager.config.Prefix, path.Join(prefixes...), path.Base(file.Name()))
fileName := path.Join(m.config.Prefix, path.Join(prefixes...), path.Base(file.Name()))

_, err = minioClient.FPutObject(ctx, manager.config.Bucket, fileName, file.Name(), minio.PutObjectOptions{})
_, err = minioClient.FPutObject(ctx, m.config.Bucket, fileName, file.Name(), minio.PutObjectOptions{})
if err != nil {
return UploadedFile{}, err
}

return UploadedFile{Location: manager.objectUrl(fileName), ObjectName: fileName}, nil
return UploadedFile{Location: m.objectUrl(fileName), ObjectName: fileName}, nil
}

func (manager *minioManager) Delete(ctx context.Context, keys []string) (err error) {
func (m *minioManager) Delete(ctx context.Context, keys []string) (err error) {
objectChannel := make(chan minio.ObjectInfo, len(keys))
for _, key := range keys {
objectChannel <- minio.ObjectInfo{Key: key}
}
close(objectChannel)

minioClient, err := manager.getClient()
minioClient, err := m.getClient()
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(ctx, manager.getTimeout())
ctx, cancel := context.WithTimeout(ctx, m.getTimeout())
defer cancel()

tmp := <-minioClient.RemoveObjects(ctx, manager.config.Bucket, objectChannel, minio.RemoveObjectsOptions{})
tmp := <-minioClient.RemoveObjects(ctx, m.config.Bucket, objectChannel, minio.RemoveObjectsOptions{})
return tmp.Err
}

func (manager *minioManager) Prefix() string {
return manager.config.Prefix
func (m *minioManager) Prefix() string {
return m.config.Prefix
}

/*
Expand All @@ -124,54 +125,56 @@ GetObjectNameFromLocation gets the object name/key name from the object location
https://minio-endpoint/bucket-name/key1 - >> key1
http://minio-endpoint/bucket-name/key2 - >> key2
*/
func (manager *minioManager) GetObjectNameFromLocation(location string) (string, error) {
func (m *minioManager) GetObjectNameFromLocation(location string) (string, error) {
var baseURL string
if manager.config.UseSSL {
if m.config.UseSSL {
baseURL += "https://"
} else {
baseURL += "http://"
}
baseURL += manager.config.EndPoint + "/"
baseURL += manager.config.Bucket + "/"
baseURL += m.config.EndPoint + "/"
baseURL += m.config.Bucket + "/"
return location[len(baseURL):], nil
}

func (manager *minioManager) GetDownloadKeyFromFileLocation(location string) string {
func (m *minioManager) GetDownloadKeyFromFileLocation(location string) string {
parsedUrl, err := url.Parse(location)
if err != nil {
fmt.Println("error while parsing location url: ", err)
}
trimedUrl := strings.TrimLeft(parsedUrl.Path, "/")
return strings.TrimPrefix(trimedUrl, fmt.Sprintf(`%s/`, manager.config.Bucket))
return strings.TrimPrefix(trimedUrl, fmt.Sprintf(`%s/`, m.config.Bucket))
}

func (manager *minioManager) objectUrl(objectName string) string {
func (m *minioManager) objectUrl(objectName string) string {
protocol := "http"
if manager.config.UseSSL {
if m.config.UseSSL {
protocol = "https"
}
return protocol + "://" + manager.config.EndPoint + "/" + manager.config.Bucket + "/" + objectName
return protocol + "://" + m.config.EndPoint + "/" + m.config.Bucket + "/" + objectName
}

func (manager *minioManager) getClient() (*minio.Client, error) {
var err error
if manager.client == nil {
manager.client, err = minio.New(manager.config.EndPoint, &minio.Options{
Creds: credentials.NewStaticV4(manager.config.AccessKeyID, manager.config.SecretAccessKey, ""),
Secure: manager.config.UseSSL,
func (m *minioManager) getClient() (*minio.Client, error) {
m.clientOnce.Do(func() {
m.client, m.clientErr = minio.New(m.config.EndPoint, &minio.Options{
Creds: credentials.NewStaticV4(m.config.AccessKeyID, m.config.SecretAccessKey, ""),
Secure: m.config.UseSSL,
})
if err != nil {
return &minio.Client{}, err
if m.clientErr != nil {
m.client = &minio.Client{}
}
}
return manager.client, nil
})

return m.client, m.clientErr
}

type minioManager struct {
*baseManager
config *MinioConfig

client *minio.Client
client *minio.Client
clientErr error
clientOnce sync.Once
}

func minioConfig(config map[string]interface{}) *MinioConfig {
Expand Down

0 comments on commit f55c7ae

Please sign in to comment.