Commit

Merge pull request #235 from Azure/dev
Release v0.12.0
mohsha-msft authored Dec 9, 2020
2 parents 456ab47 + 559b75b commit 6df5d9a
Showing 37 changed files with 1,666 additions and 818 deletions.
3 changes: 3 additions & 0 deletions BreakingChanges.md
@@ -2,6 +2,9 @@

> See the [Change Log](ChangeLog.md) for a summary of storage library changes.
## Version 0.12.0:
- Added [`ClientProvidedKeyOptions`](https://github.com/Azure/azure-storage-blob-go/blob/dev/azblob/request_common.go#L11) to function signatures.

## Version 0.3.0:
- Removed most panics from the library. Several functions now return an error.
- Removed 2016 and 2017 service versions.
7 changes: 7 additions & 0 deletions ChangeLog.md
@@ -2,6 +2,13 @@

> See [BreakingChanges](BreakingChanges.md) for a detailed list of API breaks.
## Version 0.12.0:
- Added support for [Customer Provided Key](https://docs.microsoft.com/en-us/azure/storage/common/storage-service-encryption), which lets users encrypt their data within client applications before uploading to Azure Storage and decrypt it while downloading to the client (a usage sketch follows this list).
- Read more about [Azure Key Vault](https://docs.microsoft.com/en-us/azure/key-vault/general/overview), [encryption scopes](https://docs.microsoft.com/en-us/azure/storage/blobs/encryption-scope-manage?tabs=portal), [managing encryption scopes](https://docs.microsoft.com/en-us/azure/storage/blobs/encryption-scope-manage?tabs=portal), and how to [configure customer-managed keys](https://docs.microsoft.com/en-us/azure/data-explorer/customer-managed-keys-portal).
- Stopped using memory-mapped files and switched to the `io.ReaderAt` and `io.WriterAt` interfaces; see [this commit](https://github.com/Azure/azure-storage-blob-go/pull/223/commits/0e3e7a4e260c059c49a418a0f1501452d3e05a44) for details.
- Fixed issue [#214](https://github.com/Azure/azure-storage-blob-go/issues/214)
- Fixed issue [#230](https://github.com/Azure/azure-storage-blob-go/issues/230)
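
A minimal sketch of the new client-provided key flow. The `ClientProvidedKeyOptions` fields on `UploadToBlockBlobOptions` and `DownloadFromBlobOptions` are part of this release; the `NewClientProvidedKeyOptions` helper and the Base64 key/SHA-256 handling are assumptions based on `request_common.go`, so verify them against your version of the library, and note that CPK requests must go over HTTPS.

```go
package main

import (
	"context"
	"crypto/rand"
	"crypto/sha256"
	"encoding/base64"
	"log"
	"net/url"

	"github.com/Azure/azure-storage-blob-go/azblob"
)

func main() {
	// Generate a 256-bit customer-provided key; the service expects the key and
	// its SHA-256 hash Base64-encoded (assumed encoding, per request_common.go).
	rawKey := make([]byte, 32)
	if _, err := rand.Read(rawKey); err != nil {
		log.Fatal(err)
	}
	key := base64.StdEncoding.EncodeToString(rawKey)
	hash := sha256.Sum256(rawKey)
	keySHA256 := base64.StdEncoding.EncodeToString(hash[:])

	// Assumed helper; a nil scope means the raw key is used rather than a
	// named encryption scope.
	cpk := azblob.NewClientProvidedKeyOptions(&key, &keySHA256, nil)

	// Placeholder account/container/blob names and credential.
	u, _ := url.Parse("https://myaccount.blob.core.windows.net/mycontainer/myblob")
	p := azblob.NewPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{})
	blobURL := azblob.NewBlockBlobURL(*u, p)

	// The key is attached to every StageBlock/CommitBlockList call the helper makes.
	data := []byte("hello, CPK")
	_, err := azblob.UploadBufferToBlockBlob(context.Background(), data, blobURL,
		azblob.UploadToBlockBlobOptions{ClientProvidedKeyOptions: cpk})
	if err != nil {
		log.Fatal(err)
	}

	// Reading the blob back requires presenting the same key.
	buf := make([]byte, len(data))
	err = azblob.DownloadBlobToBuffer(context.Background(), blobURL.BlobURL, 0, 0, buf,
		azblob.DownloadFromBlobOptions{ClientProvidedKeyOptions: cpk})
	if err != nil {
		log.Fatal(err)
	}
}
```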

## Version 0.11.0:
- Added support for the service version [`2019-12-12`](https://docs.microsoft.com/en-us/rest/api/storageservices/versioning-for-the-azure-storage-services).
- Added [Get Blob Tags](https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-tags) and [Set Blob Tags](https://docs.microsoft.com/en-us/rest/api/storageservices/set-blob-tags) APIs, which allow user-defined tags to be added to a blob; these tags then act as a secondary index.
24 changes: 24 additions & 0 deletions azblob/bytes_writer.go
@@ -0,0 +1,24 @@
package azblob

import (
"errors"
)

type bytesWriter []byte

func newBytesWriter(b []byte) bytesWriter {
return b
}

func (c bytesWriter) WriteAt(b []byte, off int64) (int, error) {
if off >= int64(len(c)) || off < 0 {
return 0, errors.New("Offset value is out of range")
}

n := copy(c[int(off):], b)
if n < len(b) {
return n, errors.New("Not enough space for all bytes")
}

return n, nil
}
30 changes: 30 additions & 0 deletions azblob/bytes_writer_test.go
@@ -0,0 +1,30 @@
package azblob

import (
"bytes"

chk "gopkg.in/check.v1"
)

func (s *aztestsSuite) TestBytesWriterWriteAt(c *chk.C) {
b := make([]byte, 10)
buffer := newBytesWriter(b)

count, err := buffer.WriteAt([]byte{1, 2}, 10)
c.Assert(err, chk.ErrorMatches, "Offset value is out of range")
c.Assert(count, chk.Equals, 0)

count, err = buffer.WriteAt([]byte{1, 2}, -1)
c.Assert(err, chk.ErrorMatches, "Offset value is out of range")
c.Assert(count, chk.Equals, 0)

count, err = buffer.WriteAt([]byte{1, 2}, 9)
c.Assert(err, chk.ErrorMatches, "Not enough space for all bytes")
c.Assert(count, chk.Equals, 1)
c.Assert(bytes.Compare(b, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 1}), chk.Equals, 0)

count, err = buffer.WriteAt([]byte{1, 2}, 8)
c.Assert(err, chk.IsNil)
c.Assert(count, chk.Equals, 2)
c.Assert(bytes.Compare(b, []byte{0, 0, 0, 0, 0, 0, 0, 0, 1, 2}), chk.Equals, 0)
}
11 changes: 5 additions & 6 deletions azblob/chunkwriting.go
@@ -16,8 +16,8 @@ import (
// blockWriter provides methods to upload blocks that represent a file to a server and commit them.
// This allows us to provide a local implementation that fakes the server for hermetic testing.
type blockWriter interface {
StageBlock(context.Context, string, io.ReadSeeker, LeaseAccessConditions, []byte) (*BlockBlobStageBlockResponse, error)
CommitBlockList(context.Context, []string, BlobHTTPHeaders, Metadata, BlobAccessConditions, AccessTierType, BlobTagsMap) (*BlockBlobCommitBlockListResponse, error)
StageBlock(context.Context, string, io.ReadSeeker, LeaseAccessConditions, []byte, ClientProvidedKeyOptions) (*BlockBlobStageBlockResponse, error)
CommitBlockList(context.Context, []string, BlobHTTPHeaders, Metadata, BlobAccessConditions, AccessTierType, BlobTagsMap, ClientProvidedKeyOptions) (*BlockBlobCommitBlockListResponse, error)
}
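
The hermetic-testing point above is what `fakeBlockWriter` in `chunkwriting_test.go` (further down in this diff) relies on. As an illustration only, a minimal in-memory fake satisfying this interface might look like the sketch below; `inMemoryBlockWriter` is hypothetical, not code from this commit, and it assumes the surrounding `azblob` package plus imports of `context`, `io`, `io/ioutil`, and `sync`.

```go
// inMemoryBlockWriter is a hypothetical fake for blockWriter: StageBlock keeps
// each block in a map and CommitBlockList assembles them in the committed order.
type inMemoryBlockWriter struct {
	mu     sync.Mutex
	blocks map[string][]byte
	final  []byte
}

func newInMemoryBlockWriter() *inMemoryBlockWriter {
	return &inMemoryBlockWriter{blocks: map[string][]byte{}}
}

func (w *inMemoryBlockWriter) StageBlock(ctx context.Context, blockID string, body io.ReadSeeker,
	_ LeaseAccessConditions, _ []byte, _ ClientProvidedKeyOptions) (*BlockBlobStageBlockResponse, error) {
	b, err := ioutil.ReadAll(body)
	if err != nil {
		return nil, err
	}
	w.mu.Lock()
	w.blocks[blockID] = b
	w.mu.Unlock()
	return &BlockBlobStageBlockResponse{}, nil
}

func (w *inMemoryBlockWriter) CommitBlockList(ctx context.Context, blockIDs []string, _ BlobHTTPHeaders,
	_ Metadata, _ BlobAccessConditions, _ AccessTierType, _ BlobTagsMap,
	_ ClientProvidedKeyOptions) (*BlockBlobCommitBlockListResponse, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.final = w.final[:0]
	for _, id := range blockIDs {
		w.final = append(w.final, w.blocks[id]...)
	}
	return &BlockBlobCommitBlockListResponse{}, nil
}
```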

// copyFromReader copies a source io.Reader to blob storage using concurrent uploads.
@@ -183,8 +183,7 @@ func (c *copier) write(chunk copierChunk) error {
if err := c.ctx.Err(); err != nil {
return err
}

_, err := c.to.StageBlock(c.ctx, chunk.id, bytes.NewReader(chunk.buffer), LeaseAccessConditions{}, nil)
_, err := c.to.StageBlock(c.ctx, chunk.id, bytes.NewReader(chunk.buffer), c.o.AccessConditions.LeaseAccessConditions, nil, c.o.ClientProvidedKeyOptions)
if err != nil {
return fmt.Errorf("write error: %w", err)
}
@@ -201,11 +200,11 @@ func (c *copier) close() error {
}

var err error
c.result, err = c.to.CommitBlockList(c.ctx, c.id.issued(), c.o.BlobHTTPHeaders, c.o.Metadata, c.o.AccessConditions, c.o.BlobAccessTier, c.o.BlobTagsMap)
c.result, err = c.to.CommitBlockList(c.ctx, c.id.issued(), c.o.BlobHTTPHeaders, c.o.Metadata, c.o.AccessConditions, c.o.BlobAccessTier, c.o.BlobTagsMap, c.o.ClientProvidedKeyOptions)
return err
}

// id allows the creation of unique IDs based on UUID4 + an int32. This autoincrements.
// id allows the creation of unique IDs based on UUID4 + an int32. This auto-increments.
type id struct {
u [64]byte
num uint32
4 changes: 2 additions & 2 deletions azblob/chunkwriting_test.go
@@ -37,7 +37,7 @@ func newFakeBlockWriter() *fakeBlockWriter {
return f
}

func (f *fakeBlockWriter) StageBlock(ctx context.Context, blockID string, r io.ReadSeeker, cond LeaseAccessConditions, md5 []byte) (*BlockBlobStageBlockResponse, error) {
func (f *fakeBlockWriter) StageBlock(ctx context.Context, blockID string, r io.ReadSeeker, cond LeaseAccessConditions, md5 []byte, cpk ClientProvidedKeyOptions) (*BlockBlobStageBlockResponse, error) {
n := atomic.AddInt32(&f.block, 1)
if n == f.errOnBlock {
return nil, io.ErrNoProgress
@@ -58,7 +58,7 @@ func (f *fakeBlockWriter) StageBlock(ctx context.Context, blockID string, r io.R
return &BlockBlobStageBlockResponse{}, nil
}

func (f *fakeBlockWriter) CommitBlockList(ctx context.Context, blockIDs []string, headers BlobHTTPHeaders, meta Metadata, access BlobAccessConditions, tier AccessTierType, blobTagsMap BlobTagsMap) (*BlockBlobCommitBlockListResponse, error) {
func (f *fakeBlockWriter) CommitBlockList(ctx context.Context, blockIDs []string, headers BlobHTTPHeaders, meta Metadata, access BlobAccessConditions, tier AccessTierType, blobTagsMap BlobTagsMap, options ClientProvidedKeyOptions) (*BlockBlobCommitBlockListResponse, error) {
dst, err := os.OpenFile(filepath.Join(f.path, finalFileName), os.O_CREATE+os.O_WRONLY, 0600)
if err != nil {
return nil, err
97 changes: 50 additions & 47 deletions azblob/highlevel.go
@@ -58,59 +58,61 @@ type UploadToBlockBlobOptions struct {
// BlobAccessTier indicates the tier of blob
BlobAccessTier AccessTierType

// BlobTagsStg
// BlobTagsMap
BlobTagsMap BlobTagsMap

// ClientProvidedKeyOptions indicates the client provided key by name and/or by value to encrypt/decrypt data.
ClientProvidedKeyOptions ClientProvidedKeyOptions

// Parallelism indicates the maximum number of blocks to upload in parallel (0=default)
Parallelism uint16
}

// UploadBufferToBlockBlob uploads a buffer in blocks to a block blob.
func UploadBufferToBlockBlob(ctx context.Context, b []byte,
// uploadReaderAtToBlockBlob uploads a buffer in blocks to a block blob.
func uploadReaderAtToBlockBlob(ctx context.Context, reader io.ReaderAt, readerSize int64,
blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {
bufferSize := int64(len(b))
if o.BlockSize == 0 {
// If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error
if bufferSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks {
if readerSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks {
return nil, errors.New("buffer is too large to upload to a block blob")
}
// If bufferSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request
if bufferSize <= BlockBlobMaxUploadBlobBytes {
if readerSize <= BlockBlobMaxUploadBlobBytes {
o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified
} else {
o.BlockSize = bufferSize / BlockBlobMaxBlocks // buffer / max blocks = block size to use all 50,000 blocks
o.BlockSize = readerSize / BlockBlobMaxBlocks // buffer / max blocks = block size to use all 50,000 blocks
if o.BlockSize < BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB
o.BlockSize = BlobDefaultDownloadBlockSize
}
// StageBlock will be called with blockSize blocks and a Parallelism of (BufferSize / BlockSize).
}
}

if bufferSize <= BlockBlobMaxUploadBlobBytes {
if readerSize <= BlockBlobMaxUploadBlobBytes {
// If the size can fit in 1 Upload call, do it this way
var body io.ReadSeeker = bytes.NewReader(b)
var body io.ReadSeeker = io.NewSectionReader(reader, 0, readerSize)
if o.Progress != nil {
body = pipeline.NewRequestBodyProgress(body, o.Progress)
}
return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap)
return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions)
}

var numBlocks = uint16(((bufferSize - 1) / o.BlockSize) + 1)
var numBlocks = uint16(((readerSize - 1) / o.BlockSize) + 1)

blockIDList := make([]string, numBlocks) // Base-64 encoded block IDs
progress := int64(0)
progressLock := &sync.Mutex{}

err := DoBatchTransfer(ctx, BatchTransferOptions{
OperationName: "UploadBufferToBlockBlob",
TransferSize: bufferSize,
OperationName: "uploadReaderAtToBlockBlob",
TransferSize: readerSize,
ChunkSize: o.BlockSize,
Parallelism: o.Parallelism,
Operation: func(offset int64, count int64, ctx context.Context) error {
// This function is called once per block.
// It is passed this block's offset within the buffer and its count of bytes
// Prepare to read the proper block/section of the buffer
var body io.ReadSeeker = bytes.NewReader(b[offset : offset+count])
var body io.ReadSeeker = io.NewSectionReader(reader, offset, count)
blockNum := offset / o.BlockSize
if o.Progress != nil {
blockProgress := int64(0)
@@ -128,15 +130,21 @@ func UploadBufferToBlockBlob(ctx context.Context, b []byte,
// Block IDs are unique values to avoid issue if 2+ clients are uploading blocks
// at the same time causing PutBlockList to get a mix of blocks from all the clients.
blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes())
_, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions, nil)
_, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions, nil, o.ClientProvidedKeyOptions)
return err
},
})
if err != nil {
return nil, err
}
// All put blocks were successful, call Put Block List to finalize the blob
return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap)
return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions)
}

// UploadBufferToBlockBlob uploads a buffer in blocks to a block blob.
func UploadBufferToBlockBlob(ctx context.Context, b []byte,
blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {
return uploadReaderAtToBlockBlob(ctx, bytes.NewReader(b), int64(len(b)), blockBlobURL, o)
}

// UploadFileToBlockBlob uploads a file in blocks to a block blob.
@@ -147,15 +155,7 @@ func UploadFileToBlockBlob(ctx context.Context, file *os.File,
if err != nil {
return nil, err
}
m := mmf{} // Default to an empty slice; used for 0-size file
if stat.Size() != 0 {
m, err = newMMF(file, false, 0, int(stat.Size()))
if err != nil {
return nil, err
}
defer m.unmap()
}
return UploadBufferToBlockBlob(ctx, m, blockBlobURL, o)
return uploadReaderAtToBlockBlob(ctx, file, stat.Size(), blockBlobURL, o)
}

///////////////////////////////////////////////////////////////////////////////
@@ -173,16 +173,19 @@ type DownloadFromBlobOptions struct {
// AccessConditions indicates the access conditions used when making HTTP GET requests against the blob.
AccessConditions BlobAccessConditions

// ClientProvidedKeyOptions indicates the client provided key by name and/or by value to encrypt/decrypt data.
ClientProvidedKeyOptions ClientProvidedKeyOptions

// Parallelism indicates the maximum number of blocks to download in parallel (0=default)
Parallelism uint16

// RetryReaderOptionsPerBlock is used when downloading each block.
RetryReaderOptionsPerBlock RetryReaderOptions
}

// downloadBlobToBuffer downloads an Azure blob to a buffer with parallel.
func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, count int64,
b []byte, o DownloadFromBlobOptions, initialDownloadResponse *DownloadResponse) error {
// downloadBlobToWriterAt downloads an Azure blob into an io.WriterAt in parallel.
func downloadBlobToWriterAt(ctx context.Context, blobURL BlobURL, offset int64, count int64,
writer io.WriterAt, o DownloadFromBlobOptions, initialDownloadResponse *DownloadResponse) error {
if o.BlockSize == 0 {
o.BlockSize = BlobDefaultDownloadBlockSize
}
@@ -192,25 +195,30 @@ func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, co
count = initialDownloadResponse.ContentLength() - offset // if we have the length, use it
} else {
// If we don't have the length at all, get it
dr, err := blobURL.Download(ctx, 0, CountToEnd, o.AccessConditions, false)
dr, err := blobURL.Download(ctx, 0, CountToEnd, o.AccessConditions, false, o.ClientProvidedKeyOptions)
if err != nil {
return err
}
count = dr.ContentLength() - offset
}
}

if count <= 0 {
// The file is empty, there is nothing to download.
return nil
}

// Prepare and do parallel download.
progress := int64(0)
progressLock := &sync.Mutex{}

err := DoBatchTransfer(ctx, BatchTransferOptions{
OperationName: "downloadBlobToBuffer",
OperationName: "downloadBlobToWriterAt",
TransferSize: count,
ChunkSize: o.BlockSize,
Parallelism: o.Parallelism,
Operation: func(chunkStart int64, count int64, ctx context.Context) error {
dr, err := blobURL.Download(ctx, chunkStart+offset, count, o.AccessConditions, false)
dr, err := blobURL.Download(ctx, chunkStart+offset, count, o.AccessConditions, false, o.ClientProvidedKeyOptions)
if err != nil {
return err
}
@@ -228,7 +236,7 @@ func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, co
progressLock.Unlock()
})
}
_, err = io.ReadFull(body, b[chunkStart:chunkStart+count])
_, err = io.Copy(newSectionWriter(writer, chunkStart, count), body)
body.Close()
return err
},
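
`newSectionWriter` above is defined elsewhere in the package and not shown in this excerpt; it adapts an `io.WriterAt` so that each chunk's sequential writes land in that chunk's window of the destination. A minimal sketch of such an adapter (names and error text are assumptions, not the committed implementation; needs `errors` and `io`):

```go
// sectionWriterSketch funnels sequential Write calls into the window
// [off, off+count) of an underlying io.WriterAt, which is the shape each
// chunk of downloadBlobToWriterAt needs.
type sectionWriterSketch struct {
	w        io.WriterAt
	off      int64 // start of the window within w
	count    int64 // size of the window
	position int64 // bytes written so far
}

func newSectionWriterSketch(w io.WriterAt, off, count int64) *sectionWriterSketch {
	return &sectionWriterSketch{w: w, off: off, count: count}
}

func (s *sectionWriterSketch) Write(p []byte) (int, error) {
	remaining := s.count - s.position
	if remaining <= 0 {
		return 0, errors.New("end of section reached")
	}
	if int64(len(p)) > remaining {
		return 0, errors.New("not enough space for all bytes")
	}
	n, err := s.w.WriteAt(p, s.off+s.position)
	s.position += int64(n)
	return n, err
}
```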
@@ -243,7 +251,7 @@ func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, co
// Offset and count are optional, pass 0 for both to download the entire blob.
func DownloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, count int64,
b []byte, o DownloadFromBlobOptions) error {
return downloadBlobToBuffer(ctx, blobURL, offset, count, b, o, nil)
return downloadBlobToWriterAt(ctx, blobURL, offset, count, newBytesWriter(b), o, nil)
}

// DownloadBlobToFile downloads an Azure blob to a local file.
@@ -256,7 +264,7 @@ func DownloadBlobToFile(ctx context.Context, blobURL BlobURL, offset int64, coun

if count == CountToEnd {
// Try to get Azure blob's size
props, err := blobURL.GetProperties(ctx, o.AccessConditions)
props, err := blobURL.GetProperties(ctx, o.AccessConditions, o.ClientProvidedKeyOptions)
if err != nil {
return err
}
@@ -277,13 +285,7 @@ func DownloadBlobToFile(ctx context.Context, blobURL BlobURL, offset int64, coun
}

if size > 0 {
// 3. Set mmap and call downloadBlobToBuffer.
m, err := newMMF(file, true, 0, int(size))
if err != nil {
return err
}
defer m.unmap()
return downloadBlobToBuffer(ctx, blobURL, offset, size, m, o, nil)
return downloadBlobToWriterAt(ctx, blobURL, offset, size, file, o, nil)
} else { // if the blob's size is 0, there is no need to download it
return nil
}
@@ -365,12 +367,13 @@ type UploadStreamToBlockBlobOptions struct {
// BufferSize sizes the buffer used to read data from source. If < 1 MiB, defaults to 1 MiB.
BufferSize int
// MaxBuffers defines the number of simultaneous uploads that will be performed to upload the file.
MaxBuffers int
BlobHTTPHeaders BlobHTTPHeaders
Metadata Metadata
AccessConditions BlobAccessConditions
BlobAccessTier AccessTierType
BlobTagsMap BlobTagsMap
MaxBuffers int
BlobHTTPHeaders BlobHTTPHeaders
Metadata Metadata
AccessConditions BlobAccessConditions
BlobAccessTier AccessTierType
BlobTagsMap BlobTagsMap
ClientProvidedKeyOptions ClientProvidedKeyOptions
}

func (u *UploadStreamToBlockBlobOptions) defaults() {
6 changes: 0 additions & 6 deletions azblob/parsing_urls.go
@@ -1,7 +1,6 @@
package azblob

import (
"errors"
"net"
"net/url"
"strings"
@@ -135,11 +134,6 @@ func (up BlobURLParts) URL() url.URL {

rawQuery := up.UnparsedParams

// Check: Both snapshot and version id cannot be present in the request URL.
if up.Snapshot != "" && up.VersionID != "" {
errors.New("Snapshot and versioning cannot be enabled simultaneously")
}

//If no snapshot is initially provided, fill it in from the SAS query properties to help the user
if up.Snapshot == "" && !up.SAS.snapshotTime.IsZero() {
up.Snapshot = up.SAS.snapshotTime.Format(SnapshotTimeFormat)