From f55c7aed4a8876426c24cf4992f14e694cc2e409 Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Wed, 26 Jul 2023 11:23:48 +0200 Subject: [PATCH] fix: minio manager data race (#82) --- filemanager/miniomanager.go | 85 +++++++++++++++++++------------------ 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/filemanager/miniomanager.go b/filemanager/miniomanager.go index bf7d7549..4a277f5d 100644 --- a/filemanager/miniomanager.go +++ b/filemanager/miniomanager.go @@ -8,6 +8,7 @@ import ( "os" "path" "strings" + "sync" "time" "github.com/minio/minio-go/v7" @@ -36,7 +37,7 @@ func NewMinioManager(config map[string]interface{}, log logger.Logger, defaultTi }, nil } -func (manager *minioManager) ListFilesWithPrefix(ctx context.Context, startAfter, prefix string, maxItems int64) ListSession { +func (m *minioManager) ListFilesWithPrefix(ctx context.Context, startAfter, prefix string, maxItems int64) ListSession { return &minioListSession{ baseListSession: &baseListSession{ ctx: ctx, @@ -44,78 +45,78 @@ func (manager *minioManager) ListFilesWithPrefix(ctx context.Context, startAfter prefix: prefix, maxItems: maxItems, }, - manager: manager, + manager: m, isTruncated: true, } } -func (manager *minioManager) Download(ctx context.Context, file *os.File, key string) error { - minioClient, err := manager.getClient() +func (m *minioManager) Download(ctx context.Context, file *os.File, key string) error { + minioClient, err := m.getClient() if err != nil { return err } - ctx, cancel := context.WithTimeout(ctx, manager.getTimeout()) + ctx, cancel := context.WithTimeout(ctx, m.getTimeout()) defer cancel() - err = minioClient.FGetObject(ctx, manager.config.Bucket, key, file.Name(), minio.GetObjectOptions{}) + err = minioClient.FGetObject(ctx, m.config.Bucket, key, file.Name(), minio.GetObjectOptions{}) return err } -func (manager *minioManager) Upload(ctx context.Context, file *os.File, prefixes ...string) (UploadedFile, error) { - if manager.config.Bucket == "" { +func (m *minioManager) Upload(ctx context.Context, file *os.File, prefixes ...string) (UploadedFile, error) { + if m.config.Bucket == "" { return UploadedFile{}, errors.New("no storage bucket configured to uploader") } - minioClient, err := manager.getClient() + minioClient, err := m.getClient() if err != nil { return UploadedFile{}, err } - ctx, cancel := context.WithTimeout(ctx, manager.getTimeout()) + ctx, cancel := context.WithTimeout(ctx, m.getTimeout()) defer cancel() - exists, err := minioClient.BucketExists(ctx, manager.config.Bucket) + exists, err := minioClient.BucketExists(ctx, m.config.Bucket) if err != nil { return UploadedFile{}, fmt.Errorf("checking bucket: %w", err) } if !exists { - if err = minioClient.MakeBucket(ctx, manager.config.Bucket, minio.MakeBucketOptions{Region: "us-east-1"}); err != nil { + if err = minioClient.MakeBucket(ctx, m.config.Bucket, minio.MakeBucketOptions{Region: "us-east-1"}); err != nil { return UploadedFile{}, fmt.Errorf("creating bucket: %w", err) } } - fileName := path.Join(manager.config.Prefix, path.Join(prefixes...), path.Base(file.Name())) + fileName := path.Join(m.config.Prefix, path.Join(prefixes...), path.Base(file.Name())) - _, err = minioClient.FPutObject(ctx, manager.config.Bucket, fileName, file.Name(), minio.PutObjectOptions{}) + _, err = minioClient.FPutObject(ctx, m.config.Bucket, fileName, file.Name(), minio.PutObjectOptions{}) if err != nil { return UploadedFile{}, err } - return UploadedFile{Location: manager.objectUrl(fileName), ObjectName: fileName}, nil + return UploadedFile{Location: m.objectUrl(fileName), ObjectName: fileName}, nil } -func (manager *minioManager) Delete(ctx context.Context, keys []string) (err error) { +func (m *minioManager) Delete(ctx context.Context, keys []string) (err error) { objectChannel := make(chan minio.ObjectInfo, len(keys)) for _, key := range keys { objectChannel <- minio.ObjectInfo{Key: key} } close(objectChannel) - minioClient, err := manager.getClient() + minioClient, err := m.getClient() if err != nil { return err } - ctx, cancel := context.WithTimeout(ctx, manager.getTimeout()) + ctx, cancel := context.WithTimeout(ctx, m.getTimeout()) defer cancel() - tmp := <-minioClient.RemoveObjects(ctx, manager.config.Bucket, objectChannel, minio.RemoveObjectsOptions{}) + tmp := <-minioClient.RemoveObjects(ctx, m.config.Bucket, objectChannel, minio.RemoveObjectsOptions{}) return tmp.Err } -func (manager *minioManager) Prefix() string { - return manager.config.Prefix +func (m *minioManager) Prefix() string { + return m.config.Prefix } /* @@ -124,54 +125,56 @@ GetObjectNameFromLocation gets the object name/key name from the object location https://minio-endpoint/bucket-name/key1 - >> key1 http://minio-endpoint/bucket-name/key2 - >> key2 */ -func (manager *minioManager) GetObjectNameFromLocation(location string) (string, error) { +func (m *minioManager) GetObjectNameFromLocation(location string) (string, error) { var baseURL string - if manager.config.UseSSL { + if m.config.UseSSL { baseURL += "https://" } else { baseURL += "http://" } - baseURL += manager.config.EndPoint + "/" - baseURL += manager.config.Bucket + "/" + baseURL += m.config.EndPoint + "/" + baseURL += m.config.Bucket + "/" return location[len(baseURL):], nil } -func (manager *minioManager) GetDownloadKeyFromFileLocation(location string) string { +func (m *minioManager) GetDownloadKeyFromFileLocation(location string) string { parsedUrl, err := url.Parse(location) if err != nil { fmt.Println("error while parsing location url: ", err) } trimedUrl := strings.TrimLeft(parsedUrl.Path, "/") - return strings.TrimPrefix(trimedUrl, fmt.Sprintf(`%s/`, manager.config.Bucket)) + return strings.TrimPrefix(trimedUrl, fmt.Sprintf(`%s/`, m.config.Bucket)) } -func (manager *minioManager) objectUrl(objectName string) string { +func (m *minioManager) objectUrl(objectName string) string { protocol := "http" - if manager.config.UseSSL { + if m.config.UseSSL { protocol = "https" } - return protocol + "://" + manager.config.EndPoint + "/" + manager.config.Bucket + "/" + objectName + return protocol + "://" + m.config.EndPoint + "/" + m.config.Bucket + "/" + objectName } -func (manager *minioManager) getClient() (*minio.Client, error) { - var err error - if manager.client == nil { - manager.client, err = minio.New(manager.config.EndPoint, &minio.Options{ - Creds: credentials.NewStaticV4(manager.config.AccessKeyID, manager.config.SecretAccessKey, ""), - Secure: manager.config.UseSSL, +func (m *minioManager) getClient() (*minio.Client, error) { + m.clientOnce.Do(func() { + m.client, m.clientErr = minio.New(m.config.EndPoint, &minio.Options{ + Creds: credentials.NewStaticV4(m.config.AccessKeyID, m.config.SecretAccessKey, ""), + Secure: m.config.UseSSL, }) - if err != nil { - return &minio.Client{}, err + if m.clientErr != nil { + m.client = &minio.Client{} } - } - return manager.client, nil + }) + + return m.client, m.clientErr } type minioManager struct { *baseManager config *MinioConfig - client *minio.Client + client *minio.Client + clientErr error + clientOnce sync.Once } func minioConfig(config map[string]interface{}) *MinioConfig {