Skip to content

Commit

Permalink
List objects instead when checking if bucket exists in Azure
Browse files Browse the repository at this point in the history
Signed-off-by: Somtochi Onyekwere <somtochionyekwere@gmail.com>
  • Loading branch information
somtochiama committed Oct 5, 2022
1 parent f4de0a4 commit 736e091
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 15 deletions.
11 changes: 9 additions & 2 deletions docs/spec/v1beta2/buckets.md
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,15 @@ The leading question mark is optional.
The query values from the `sasKey` data field in the Secrets gets merged with the ones in the `spec.endpoint` of the `Bucket`.
If the same key is present in the both of them, the value in the `sasKey` takes precedence.

Note that the Azure SAS Token has an expiry date and it should be updated before it expires so that Flux can
continue to access Azure Storage.
**Note:** SAS Token has an expiry date and it should be updated before it expires so that Flux can
continue to access Azure Storage. Also, The source-controller can use a account-level SAS token or a container-level SAS token.
The minimum permissions for an account-level SAS Token is:
- Allowed services: Blob
- Allowed resource types: Container, Object
- Allowed permission: Read, List

The minimum permissions for a container-level SAS Token is:
- Permission: Read, List

#### GCP

Expand Down
23 changes: 17 additions & 6 deletions pkg/azure/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,25 @@ func (c *BlobClient) BucketExists(ctx context.Context, bucketName string) (bool,
if err != nil {
return false, err
}
_, err = container.GetProperties(ctx, nil)
if err != nil {
var stgErr *azblob.StorageError
if errors.As(err, &stgErr) {
if stgErr.ErrorCode == azblob.StorageErrorCodeContainerNotFound {

var max int32 = 1
items := container.ListBlobsFlat(&azblob.ContainerListBlobsFlatOptions{
MaxResults: &max,
})
// We call next page only once since we just want to see if we get an error
items.NextPage(ctx)
if err := items.Err(); err != nil {
var respErr *azcore.ResponseError
if errors.As(err, &respErr) {
if respErr.ErrorCode == string(*azblob.StorageErrorCodeContainerNotFound.ToPtr()) {
return false, nil
}
err = stgErr
err = respErr

// For a container-level SASToken, we get an AuthenticationFailed when the bucket doesn't exist
if respErr.ErrorCode == string(azblob.StorageErrorCodeAuthenticationFailed) {
return false, fmt.Errorf("(Bucket might not exist) %w", err)
}
}
return false, err
}
Expand Down
86 changes: 79 additions & 7 deletions pkg/azure/blob_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,12 @@ func TestBlobClientSASKey_FGetObject(t *testing.T) {
localPath := filepath.Join(tempDir, testFile)

// use the shared key client to create a SAS key for the account
sasKey, err := client.GetSASToken(azblob.AccountSASResourceTypes{Object: true, Container: true},
sasKey, err := client.GetSASURL(azblob.AccountSASResourceTypes{Object: true, Container: true},
azblob.AccountSASPermissions{List: true, Read: true},
azblob.AccountSASServices{Blob: true},
time.Now(),
time.Now().Add(48*time.Hour))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(sasKey).ToNot(BeEmpty())

// the sdk returns the full SAS url e.g test.blob.core.windows.net/?<actual-sas-token>
sasKey = strings.TrimPrefix(sasKey, testBucket.Spec.Endpoint+"/")
testSASKeySecret := corev1.Secret{
Expand All @@ -213,9 +211,14 @@ func TestBlobClientSASKey_FGetObject(t *testing.T) {
sasKeyClient, err := NewClient(testBucket.DeepCopy(), testSASKeySecret.DeepCopy())
g.Expect(err).ToNot(HaveOccurred())

// Test if blob exists using sasKey.
// Test if bucket and blob exists using sasKey.
ctx, timeout = context.WithTimeout(context.Background(), testTimeout)
defer timeout()

ok, err := sasKeyClient.BucketExists(ctx, testContainer)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(ok).To(BeTrue())

_, err = sasKeyClient.FGetObject(ctx, testContainer, testFile, localPath)

g.Expect(err).ToNot(HaveOccurred())
Expand All @@ -224,6 +227,68 @@ func TestBlobClientSASKey_FGetObject(t *testing.T) {
g.Expect(f).To(Equal([]byte(testFileData)))
}

func TestBlobClientContainerSASKey_BucketExists(t *testing.T) {
g := NewWithT(t)

// create a client with the shared key
client, err := NewClient(testBucket.DeepCopy(), testSecret.DeepCopy())
g.Expect(err).ToNot(HaveOccurred())
g.Expect(client).ToNot(BeNil())

g.Expect(client.CanGetAccountSASToken()).To(BeTrue())

// Generate test container name.
testContainer := generateString(testContainerGenerateName)

// Create test container.
ctx, timeout := context.WithTimeout(context.Background(), testTimeout)
defer timeout()
g.Expect(createContainer(ctx, client, testContainer)).To(Succeed())
t.Cleanup(func() {
g.Expect(deleteContainer(context.Background(), client, testContainer)).To(Succeed())
})

// Create test blob.
ctx, timeout = context.WithTimeout(context.Background(), testTimeout)
defer timeout()
g.Expect(createBlob(ctx, client, testContainer, testFile, testFileData))

// use the container client to create a container-level SAS key for the account
containerClient, err := client.NewContainerClient(testContainer)
g.Expect(err).ToNot(HaveOccurred())
// sasKey
sasKey, err := containerClient.GetSASURL(azblob.ContainerSASPermissions{Read: true, List: true},
time.Now(),
time.Now().Add(48*time.Hour))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(sasKey).ToNot(BeEmpty())
// the sdk returns the full SAS url e.g test.blob.core.windows.net/<container-name>?<actual-sas-token>
sasKey = strings.TrimPrefix(sasKey, testBucket.Spec.Endpoint+"/"+testContainer)
testSASKeySecret := corev1.Secret{
Data: map[string][]byte{
sasKeyField: []byte(sasKey),
},
}

sasKeyClient, err := NewClient(testBucket.DeepCopy(), testSASKeySecret.DeepCopy())
g.Expect(err).ToNot(HaveOccurred())

ctx, timeout = context.WithTimeout(context.Background(), testTimeout)
defer timeout()

// Test if bucket and blob exists using sasKey.
ok, err := sasKeyClient.BucketExists(ctx, testContainer)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(ok).To(BeTrue())

// BucketExists returns a good error message if the bucket doesn't exist with container level SAS
// since the error code is AuthenticationFailed.
ok, err = sasKeyClient.BucketExists(ctx, "non-existent")
g.Expect(err).To(HaveOccurred())
g.Expect(err.Error()).To(ContainSubstring("Bucket might not exist"))
g.Expect(ok).To(BeFalse())
}

func TestBlobClient_FGetObject_NotFoundErr(t *testing.T) {
g := NewWithT(t)

Expand Down Expand Up @@ -340,8 +405,15 @@ func createContainer(ctx context.Context, client *BlobClient, name string) error
}

func createBlob(ctx context.Context, client *BlobClient, containerName, name, data string) error {
container := client.NewContainerClient(containerName)
blob := container.NewAppendBlobClient(name)
container, err := client.NewContainerClient(containerName)
if err != nil {
return err
}

blob, err := container.NewAppendBlobClient(name)
if err != nil {
return err
}

ctx, timeout := context.WithTimeout(context.Background(), testTimeout)
defer timeout()
Expand All @@ -350,7 +422,7 @@ func createBlob(ctx context.Context, client *BlobClient, containerName, name, da
}

hash := md5.Sum([]byte(data))
if _, err := blob.AppendBlock(ctx, streaming.NopCloser(strings.NewReader(data)), &azblob.AppendBlockOptions{
if _, err := blob.AppendBlock(ctx, streaming.NopCloser(strings.NewReader(data)), &azblob.AppendBlobAppendBlockOptions{
TransactionalContentMD5: hash[:16],
}); err != nil {
return err
Expand Down

0 comments on commit 736e091

Please sign in to comment.