Merge pull request #147 from Azure/dev
Release 0.8.0
zezha-msft authored Aug 17, 2019
2 parents 33c102d + 3e58b0e commit fc70003
Showing 10 changed files with 184 additions and 67 deletions.
3 changes: 3 additions & 0 deletions ChangeLog.md
@@ -2,6 +2,9 @@
 
 > See [BreakingChanges](BreakingChanges.md) for a detailed list of API breaks.
+## Version 0.8.0:
+- Fixed error handling in the high-level function DoBatchTransfer, and made it public for easier customization (see the usage sketch below)
+
 ## Version 0.7.0:
 - Added the ability to obtain User Delegation Keys (UDK)
 - Added the ability to create User Delegation SAS tokens from UDKs
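With this release, DoBatchTransfer and BatchTransferOptions are exported (see the azblob/highlevel.go diff below), so callers can reuse the SDK's chunking and parallel scheduling for their own batch work. A minimal sketch of such a custom use, assuming a plain in-memory copy as the per-chunk operation; the sizes, names, and the copy itself are illustrative assumptions, not SDK code:

package main

import (
    "context"
    "fmt"

    "github.com/Azure/azure-storage-blob-go/azblob"
)

func main() {
    src := make([]byte, 10*1024*1024) // 10 MiB of data to process, purely for illustration
    dst := make([]byte, len(src))

    err := azblob.DoBatchTransfer(context.Background(), azblob.BatchTransferOptions{
        OperationName: "customCopy",
        TransferSize:  int64(len(src)),
        ChunkSize:     1024 * 1024, // 1 MiB chunks -> 10 operations
        Parallelism:   4,
        Operation: func(offset int64, chunkSize int64, ctx context.Context) error {
            // Called once per chunk; a real Operation would make an HTTP request here.
            // Check ctx so that cancellation caused by a failed sibling chunk is honored.
            if err := ctx.Err(); err != nil {
                return err
            }
            copy(dst[offset:offset+chunkSize], src[offset:offset+chunkSize])
            return nil
        },
    })
    if err != nil {
        fmt.Println("batch transfer failed:", err)
    }
}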
81 changes: 43 additions & 38 deletions azblob/highlevel.go
@@ -66,7 +66,7 @@ func UploadBufferToBlockBlob(ctx context.Context, b []byte,
     if o.BlockSize == 0 {
         // If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error
         if bufferSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks {
-            return nil, errors.New("Buffer is too large to upload to a block blob")
+            return nil, errors.New("buffer is too large to upload to a block blob")
         }
         // If bufferSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request
         if bufferSize <= BlockBlobMaxUploadBlobBytes {
@@ -76,7 +76,7 @@ func UploadBufferToBlockBlob(ctx context.Context, b []byte,
             if o.BlockSize < BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB
                 o.BlockSize = BlobDefaultDownloadBlockSize
             }
-            // StageBlock will be called with blockSize blocks and a parallelism of (BufferSize / BlockSize).
+            // StageBlock will be called with blockSize blocks and a Parallelism of (BufferSize / BlockSize).
         }
     }

@@ -95,12 +95,12 @@ func UploadBufferToBlockBlob(ctx context.Context, b []byte,
     progress := int64(0)
     progressLock := &sync.Mutex{}
 
-    err := doBatchTransfer(ctx, batchTransferOptions{
-        operationName: "UploadBufferToBlockBlob",
-        transferSize:  bufferSize,
-        chunkSize:     o.BlockSize,
-        parallelism:   o.Parallelism,
-        operation: func(offset int64, count int64) error {
+    err := DoBatchTransfer(ctx, BatchTransferOptions{
+        OperationName: "UploadBufferToBlockBlob",
+        TransferSize:  bufferSize,
+        ChunkSize:     o.BlockSize,
+        Parallelism:   o.Parallelism,
+        Operation: func(offset int64, count int64, ctx context.Context) error {
             // This function is called once per block.
             // It is passed this block's offset within the buffer and its count of bytes
             // Prepare to read the proper block/section of the buffer
@@ -198,12 +198,12 @@ func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, co
     progress := int64(0)
     progressLock := &sync.Mutex{}
 
-    err := doBatchTransfer(ctx, batchTransferOptions{
-        operationName: "downloadBlobToBuffer",
-        transferSize:  count,
-        chunkSize:     o.BlockSize,
-        parallelism:   o.Parallelism,
-        operation: func(chunkStart int64, count int64) error {
+    err := DoBatchTransfer(ctx, BatchTransferOptions{
+        OperationName: "downloadBlobToBuffer",
+        TransferSize:  count,
+        ChunkSize:     o.BlockSize,
+        Parallelism:   o.Parallelism,
+        Operation: func(chunkStart int64, count int64, ctx context.Context) error {
             dr, err := blobURL.Download(ctx, chunkStart+offset, count, o.AccessConditions, false)
             if err != nil {
                 return err
@@ -285,64 +285,69 @@ func DownloadBlobToFile(ctx context.Context, blobURL BlobURL, offset int64, coun
 
 ///////////////////////////////////////////////////////////////////////////////
 
-// BatchTransferOptions identifies options used by doBatchTransfer.
-type batchTransferOptions struct {
-    transferSize  int64
-    chunkSize     int64
-    parallelism   uint16
-    operation     func(offset int64, chunkSize int64) error
-    operationName string
+// BatchTransferOptions identifies options used by DoBatchTransfer.
+type BatchTransferOptions struct {
+    TransferSize  int64
+    ChunkSize     int64
+    Parallelism   uint16
+    Operation     func(offset int64, chunkSize int64, ctx context.Context) error
+    OperationName string
 }
 
-// doBatchTransfer helps to execute operations in a batch manner.
-func doBatchTransfer(ctx context.Context, o batchTransferOptions) error {
+// DoBatchTransfer helps to execute operations in a batch manner.
+// It can be used to customize batch work (for other scenarios that the SDK does not provide).
+func DoBatchTransfer(ctx context.Context, o BatchTransferOptions) error {
+    if o.ChunkSize == 0 {
+        return errors.New("ChunkSize cannot be 0")
+    }
+
     // Prepare and do parallel operations.
-    numChunks := uint16(((o.transferSize - 1) / o.chunkSize) + 1)
-    operationChannel := make(chan func() error, o.parallelism) // Create the channel that releases 'parallelism' goroutines concurrently
+    numChunks := uint16(((o.TransferSize - 1) / o.ChunkSize) + 1)
+    operationChannel := make(chan func() error, o.Parallelism) // Create the channel that releases 'Parallelism' goroutines concurrently
     operationResponseChannel := make(chan error, numChunks) // Holds each response
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()
 
     // Create the goroutines that process each operation (in parallel).
-    if o.parallelism == 0 {
-        o.parallelism = 5 // default parallelism
+    if o.Parallelism == 0 {
+        o.Parallelism = 5 // default Parallelism
     }
-    for g := uint16(0); g < o.parallelism; g++ {
+    for g := uint16(0); g < o.Parallelism; g++ {
         //grIndex := g
         go func() {
             for f := range operationChannel {
-                //fmt.Printf("[%s] gr-%d start action\n", o.operationName, grIndex)
                 err := f()
                 operationResponseChannel <- err
-                //fmt.Printf("[%s] gr-%d end action\n", o.operationName, grIndex)
             }
         }()
     }
 
     // Add each chunk's operation to the channel.
     for chunkNum := uint16(0); chunkNum < numChunks; chunkNum++ {
-        curChunkSize := o.chunkSize
+        curChunkSize := o.ChunkSize
 
         if chunkNum == numChunks-1 { // Last chunk
-            curChunkSize = o.transferSize - (int64(chunkNum) * o.chunkSize) // Remove size of all transferred chunks from total
+            curChunkSize = o.TransferSize - (int64(chunkNum) * o.ChunkSize) // Remove size of all transferred chunks from total
         }
-        offset := int64(chunkNum) * o.chunkSize
+        offset := int64(chunkNum) * o.ChunkSize
 
         operationChannel <- func() error {
-            return o.operation(offset, curChunkSize)
+            return o.Operation(offset, curChunkSize, ctx)
         }
     }
     close(operationChannel)
 
     // Wait for the operations to complete.
+    var firstErr error = nil
     for chunkNum := uint16(0); chunkNum < numChunks; chunkNum++ {
         responseError := <-operationResponseChannel
-        if responseError != nil {
-            cancel()             // As soon as any operation fails, cancel all remaining operation calls
-            return responseError // No need to process anymore responses
+        // record the first error (the original error, which should cause the other chunks to fail with a canceled context)
+        if responseError != nil && firstErr == nil {
+            cancel() // As soon as any operation fails, cancel all remaining operation calls
+            firstErr = responseError
         }
     }
-    return nil
+    return firstErr
 }

////////////////////////////////////////////////////////////////////////////////////////////////
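The reworked wait loop above is the substance of this release's DoBatchTransfer error-handling fix: every chunk's response is now drained instead of returning on the first failure, the shared context is canceled as soon as any chunk fails so the remaining operations bail out quickly, and only the first root-cause error is surfaced rather than the context.Canceled errors it triggers in later chunks (the post-return panic this prevents is what issue #139 reported, per the test further below). A sketch of what a caller can now expect; the failing offset and error message are made up for illustration:

package main

import (
    "context"
    "errors"
    "fmt"

    "github.com/Azure/azure-storage-blob-go/azblob"
)

func main() {
    rootCause := errors.New("chunk at offset 2 hit a broken connection") // illustrative

    err := azblob.DoBatchTransfer(context.Background(), azblob.BatchTransferOptions{
        OperationName: "firstErrorDemo",
        TransferSize:  8,
        ChunkSize:     1,
        Parallelism:   2,
        Operation: func(offset int64, chunkSize int64, ctx context.Context) error {
            if offset == 2 {
                return rootCause // the first real failure
            }
            // Chunks running after the failure see a canceled context; their
            // context.Canceled results are swallowed rather than returned.
            return ctx.Err()
        },
    })

    fmt.Println(err == rootCause) // true: the root cause comes back, not context.Canceled
}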
2 changes: 1 addition & 1 deletion azblob/zt_examples_test.go
@@ -179,7 +179,7 @@ func ExampleNewPipeline() {
             // Send the request over the network
             resp, err := client.Do(request.WithContext(ctx))
 
-            return &httpResponse{response: resp}, err
+            return pipeline.NewHTTPResponse(resp), err
         }
     }),
 }
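A brief note on this fix: httpResponse is an unexported type, so readers copying the example could not construct it themselves; pipeline.NewHTTPResponse is, presumably for that reason, the exported way to wrap an *http.Response into a pipeline.Response.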
104 changes: 104 additions & 0 deletions azblob/zt_highlevel_test.go
@@ -2,8 +2,11 @@ package azblob_test
 
 import (
     "context"
+    "errors"
     "io/ioutil"
     "os"
+    "sync/atomic"
+    "time"
 
     "github.com/Azure/azure-storage-blob-go/azblob"
     chk "gopkg.in/check.v1"
@@ -329,3 +332,104 @@ func (s *aztestsSuite) TestDownloadBufferWithNonZeroOffsetAndCount(c *chk.C) {
     downloadCount := 6 * 1024
     performUploadAndDownloadBufferTest(c, blobSize, blockSize, parallelism, downloadOffset, downloadCount)
 }
+
+func (s *aztestsSuite) TestBasicDoBatchTransfer(c *chk.C) {
+    // test the basic multi-routine processing
+    type testInstance struct {
+        transferSize int64
+        chunkSize    int64
+        parallelism  uint16
+        expectError  bool
+    }
+
+    testMatrix := []testInstance{
+        {transferSize: 100, chunkSize: 10, parallelism: 5, expectError: false},
+        {transferSize: 100, chunkSize: 9, parallelism: 4, expectError: false},
+        {transferSize: 100, chunkSize: 8, parallelism: 15, expectError: false},
+        {transferSize: 100, chunkSize: 1, parallelism: 3, expectError: false},
+        {transferSize: 0, chunkSize: 100, parallelism: 5, expectError: false}, // empty file works
+        {transferSize: 100, chunkSize: 0, parallelism: 5, expectError: true},  // 0 chunk size on the other hand must fail
+        {transferSize: 0, chunkSize: 0, parallelism: 5, expectError: true},
+    }
+
+    for _, test := range testMatrix {
+        ctx := context.Background()
+        // maintain some counts to make sure the right number of chunks were queued, and the total size is correct
+        totalSizeCount := int64(0)
+        runCount := int64(0)
+
+        err := azblob.DoBatchTransfer(ctx, azblob.BatchTransferOptions{
+            TransferSize: test.transferSize,
+            ChunkSize:    test.chunkSize,
+            Parallelism:  test.parallelism,
+            Operation: func(offset int64, chunkSize int64, ctx context.Context) error {
+                atomic.AddInt64(&totalSizeCount, chunkSize)
+                atomic.AddInt64(&runCount, 1)
+                return nil
+            },
+            OperationName: "TestHappyPath",
+        })
+
+        if test.expectError {
+            c.Assert(err, chk.NotNil)
+        } else {
+            c.Assert(err, chk.IsNil)
+            c.Assert(totalSizeCount, chk.Equals, test.transferSize)
+            c.Assert(runCount, chk.Equals, ((test.transferSize-1)/test.chunkSize)+1)
+        }
+    }
+}
+
+// mock a memory-mapped file (low-quality mock, meant to simulate the scenario only)
+type mockMMF struct {
+    isClosed   bool
+    failHandle *chk.C
+}
+
+// accept input
+func (m *mockMMF) write(input string) {
+    if m.isClosed {
+        // simulate panic
+        m.failHandle.Fail()
+    }
+}
+
+func (s *aztestsSuite) TestDoBatchTransferWithError(c *chk.C) {
+    ctx := context.Background()
+    mmf := mockMMF{failHandle: c}
+    expectedFirstError := errors.New("#3 means trouble")
+
+    err := azblob.DoBatchTransfer(ctx, azblob.BatchTransferOptions{
+        TransferSize: 5,
+        ChunkSize:    1,
+        Parallelism:  5,
+        Operation: func(offset int64, chunkSize int64, ctx context.Context) error {
+            // simulate doing some work (an HTTP call in real scenarios)
+            // later chunks take longer to finish
+            time.Sleep(time.Second * time.Duration(offset))
+            // simulate having gotten data, and write it to the memory-mapped file
+            mmf.write("input")
+
+            // with one of the chunks, pretend an error occurred (like the network connection breaking)
+            if offset == 3 {
+                return expectedFirstError
+            } else if offset > 3 {
+                // anything after offset=3 is canceled
+                // so verify that the context indeed got canceled
+                ctxErr := ctx.Err()
+                c.Assert(ctxErr, chk.Equals, context.Canceled)
+                return ctxErr
+            }
+
+            // anything before offset=3 should complete without problems
+            return nil
+        },
+        OperationName: "TestErrorPath",
+    })
+
+    c.Assert(err, chk.Equals, expectedFirstError)
+
+    // simulate closing the mmf and make sure no panic occurs (as reported in #139)
+    mmf.isClosed = true
+    time.Sleep(time.Second * 5)
+}
21 changes: 10 additions & 11 deletions azblob/zt_retry_reader_test.go
@@ -27,25 +27,25 @@ type perByteReader struct {
     injectedError error
 
     // sleepDuration and closeChannel are only used in "forced cancellation" tests
-    sleepDuration time.Duration
-    closeChannel chan struct{}
+    sleepDuration time.Duration
+    closeChannel  chan struct{}
 }

 func newPerByteReader(byteCount int) *perByteReader {
     perByteReader := perByteReader{
-        byteCount: byteCount,
+        byteCount:    byteCount,
         closeChannel: nil,
     }
 
     perByteReader.RandomBytes = make([]byte, byteCount)
-    _,_ = rand.Read(perByteReader.RandomBytes)
+    _, _ = rand.Read(perByteReader.RandomBytes)
 
     return &perByteReader
 }

 func newSingleUsePerByteReader(contents []byte) *perByteReader {
     perByteReader := perByteReader{
-        byteCount: len(contents),
+        byteCount:    len(contents),
         closeChannel: make(chan struct{}, 10),
     }

@@ -86,7 +86,7 @@ func (r *perByteReader) Close() error {
 
 // Test that a normal retry succeeds; note the initial response is not provided.
 // Tests both with and without notification of failures
-func (r *aztestsSuite) TestRetryReaderReadWithRetry(c *chk.C) {
+func (s *aztestsSuite) TestRetryReaderReadWithRetry(c *chk.C) {
     // Test twice, the second time using the optional "logging"/notification callback for failed tries
     // We must test both with and without the callback, since by testing without
     // we are testing that it is, indeed, optional to provide the callback
@@ -155,7 +155,7 @@ func (r *aztestsSuite) TestRetryReaderReadWithRetry(c *chk.C) {
 }
 
 // Test that a normal retry fails when the retry count is not enough.
-func (r *aztestsSuite) TestRetryReaderReadNegativeNormalFail(c *chk.C) {
+func (s *aztestsSuite) TestRetryReaderReadNegativeNormalFail(c *chk.C) {
     // Extra setup for testing notification of failures (i.e. of unsuccessful tries)
     failureMethodNumCalls := 0
     failureWillRetryCount := 0
@@ -210,7 +210,7 @@ func (r *aztestsSuite) TestRetryReaderReadNegativeNormalFail(c *chk.C) {
 }
 
 // Test the boundary case when Count equals 0 and the read fails.
-func (r *aztestsSuite) TestRetryReaderReadCount0(c *chk.C) {
+func (s *aztestsSuite) TestRetryReaderReadCount0(c *chk.C) {
     byteCount := 1
     body := newPerByteReader(byteCount)
     body.doInjectError = true
@@ -243,7 +243,7 @@ func (r *aztestsSuite) TestRetryReaderReadCount0(c *chk.C) {
     c.Assert(err, chk.Equals, io.EOF)
 }
 
-func (r *aztestsSuite) TestRetryReaderReadNegativeNonRetriableError(c *chk.C) {
+func (s *aztestsSuite) TestRetryReaderReadNegativeNonRetriableError(c *chk.C) {
     byteCount := 1
     body := newPerByteReader(byteCount)
     body.doInjectError = true
@@ -274,7 +274,7 @@ func (r *aztestsSuite) TestRetryReaderReadNegativeNonRetriableError(c *chk.C) {
 // purposes of unit testing, here we are testing the cancellation mechanism that is exposed to
 // consumers of the API, to allow programmatic forcing of retries (e.g. if the consumer deems
 // the read to be taking too long, they may force a retry in the hope of better performance next time).
-func (r *aztestsSuite) TestRetryReaderReadWithForcedRetry(c *chk.C) {
+func (s *aztestsSuite) TestRetryReaderReadWithForcedRetry(c *chk.C) {
 
     for _, enableRetryOnEarlyClose := range []bool{false, true} {

@@ -327,5 +327,4 @@ func (r *aztestsSuite) TestRetryReaderReadWithForcedRetry(c *chk.C) {
     }
 }
 
-
 // End of tests for RetryReader