Skip to content

Commit

Permalink
Added a support of the azure blob storage
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanovOleg committed Nov 11, 2018
1 parent a001bff commit a71c6d9
Show file tree
Hide file tree
Showing 3 changed files with 242 additions and 1 deletion.
9 changes: 8 additions & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ import:
- aws
- aws/session
- service/s3
- package: github.com/Azure/azure-pipeline-go
version: "v0.1.8"
subpackages:
- pipeline
- package: github.com/Azure/azure-storage-blob-go
version: "0.3.0"
subpackages:
- azblob
- package: github.com/spf13/cobra
version: v0.0.3
- package: k8s.io/api
Expand All @@ -22,4 +30,3 @@ import:
- kubernetes
- rest
- tools/clientcmd

184 changes: 184 additions & 0 deletions pkg/skbn/abs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package skbn

import (
"bytes"
"context"
"fmt"
"log"
"net/url"
"os"
"path/filepath"
"strings"

"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob"
)

var err error

func validateAbsPath(pathSplit []string) error {
if len(pathSplit) >= 1 {
return nil
}
return fmt.Errorf("illegal path: %s", filepath.Join(pathSplit...))
}

func initAbsVariables(split []string) (string, string, string) {
account := split[0]
container := split[1]
path := filepath.Join(split[2:]...)

return account, container, path
}

func getNewPipeline() (pipeline.Pipeline, error) {
accountName, accountKey := os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY")

if len(accountName) == 0 || len(accountKey) == 0 {
log.Fatal("Either the AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set")
}

credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)

if err != nil {
log.Fatal("Invalid credentials with error: " + err.Error())
}
pl := azblob.NewPipeline(credential, azblob.PipelineOptions{})

return pl, err
}

func getServiceURL(pl pipeline.Pipeline, accountName string) (azblob.ServiceURL, error) {
URL, err := url.Parse(
fmt.Sprintf("https://%s.blob.core.windows.net/", accountName))

surl := azblob.NewServiceURL(*URL, pl)
return surl, err
}

func getContainerURL(pl pipeline.Pipeline, accountName string, containerName string) (azblob.ContainerURL, error) {
URL, err := url.Parse(
fmt.Sprintf("https://%s.blob.core.windows.net/%s", accountName, containerName))

curl := azblob.NewContainerURL(*URL, pl)
return curl, err
}

func getBlobURL(curl azblob.ContainerURL, blob string) (azblob.BlockBlobURL, error) {
return curl.NewBlockBlobURL(blob), err
}

func createContainer(ctx context.Context, pl pipeline.Pipeline, curl azblob.ContainerURL) (*azblob.ContainerCreateResponse, error) {
cr, err := curl.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
return cr, err
}

func listContainers(ctx context.Context, surl azblob.ServiceURL) ([]azblob.ContainerItem, error) {
lc, err := surl.ListContainersSegment(ctx, azblob.Marker{}, azblob.ListContainersSegmentOptions{})
return lc.ContainerItems, err
}

func containerExists(list []azblob.ContainerItem, containerName string) bool {
exists := false
for _, v := range list {
if containerName == v.Name {
exists = true
}
}
return exists
}

// GetClientToAbs checks the connection to azure blob storage and returns the tested client (pipeline)
func GetClientToAbs(ctx context.Context, path string) (pipeline.Pipeline, error) {
pSplit := strings.Split(path, "/")
a, c, _ := initAbsVariables(pSplit)
pl, err := getNewPipeline()
su, err := getServiceURL(pl, a)
lc, err := listContainers(ctx, su)
cu, err := getContainerURL(pl, a, c)

if !containerExists(lc, c) {
_, err := createContainer(ctx, pl, cu)

if err != nil {
return nil, err
}
}

return pl, err
}

// GetListOfFilesFromAbs gets list of files in path from azure blob storage (recursive)
func GetListOfFilesFromAbs(ctx context.Context, iClient interface{}, path string) ([]string, error) {
pSplit := strings.Split(path, "/")

if err := validateAbsPath(pSplit); err != nil {
return nil, err
}

a, c, p := initAbsVariables(pSplit)
pl := iClient.(pipeline.Pipeline)
cu, err := getContainerURL(pl, a, c)
bl := []string{}

for marker := (azblob.Marker{}); marker.NotDone(); {
listBlob, err := cu.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{})

if err != nil {
return nil, err
}

marker = listBlob.NextMarker

for _, blobInfo := range listBlob.Segment.BlobItems {
bl = append(bl, strings.Replace(blobInfo.Name, p, "", 1))
}
}

return bl, err
}

// DownloadFromAbs downloads a single file from azure blob storage
func DownloadFromAbs(ctx context.Context, iClient interface{}, path string) ([]byte, error) {
pSplit := strings.Split(path, "/")

if err := validateAbsPath(pSplit); err != nil {
return nil, err
}

a, c, p := initAbsVariables(pSplit)
pl := iClient.(pipeline.Pipeline)
cu, err := getContainerURL(pl, a, c)
bu, err := getBlobURL(cu, p)
dr, err := bu.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false)
bs := dr.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20})
dd := bytes.Buffer{}
_, err = dd.ReadFrom(bs)

return dd.Bytes(), err
}

// UploadToAbs uploads a single file to azure blob storage
func UploadToAbs(ctx context.Context, iClient interface{}, toPath, fromPath string, buffer []byte) error {
pSplit := strings.Split(toPath, "/")

if err := validateAbsPath(pSplit); err != nil {
return err
}

if len(pSplit) == 1 {
_, fn := filepath.Split(fromPath)
pSplit = append(pSplit, fn)
}

a, c, p := initAbsVariables(pSplit)
pl := iClient.(pipeline.Pipeline)
cu, err := getContainerURL(pl, a, c)
bu, err := getBlobURL(cu, p)

_, err = azblob.UploadBufferToBlockBlob(ctx, buffer, bu, azblob.UploadToBlockBlobOptions{
BlockSize: 4 * 1024 * 1024,
Parallelism: 16})

return err
}
50 changes: 50 additions & 0 deletions pkg/skbn/skbn.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package skbn

import (
"context"
"fmt"
"log"
"math"
Expand Down Expand Up @@ -45,13 +46,15 @@ func TestImplementationsExist(srcPrefix, dstPrefix string) error {
switch srcPrefix {
case "k8s":
case "s3":
case "abs":
default:
return fmt.Errorf(srcPrefix + " not implemented")
}

switch dstPrefix {
case "k8s":
case "s3":
case "abs":
default:
return fmt.Errorf(dstPrefix + " not implemented")
}
Expand All @@ -64,8 +67,12 @@ func GetClients(srcPrefix, dstPrefix, srcPath, dstPath string) (interface{}, int
var srcClient interface{}
var dstClient interface{}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

k8sTested := false
s3Tested := false
absTested := false

switch srcPrefix {
case "k8s":
Expand All @@ -82,6 +89,13 @@ func GetClients(srcPrefix, dstPrefix, srcPath, dstPath string) (interface{}, int
}
srcClient = client
s3Tested = true
case "abs":
client, err := GetClientToAbs(ctx, srcPath)
if err != nil {
return nil, nil, err
}
srcClient = client
absTested = true
default:
return nil, nil, fmt.Errorf(srcPrefix + " not implemented")
}
Expand All @@ -107,6 +121,16 @@ func GetClients(srcPrefix, dstPrefix, srcPath, dstPath string) (interface{}, int
return nil, nil, err
}
dstClient = client
case "abs":
if absTested {
dstClient = srcClient
break
}
client, err := GetClientToAbs(ctx, dstPath)
if err != nil {
return nil, nil, err
}
dstClient = client
default:
return nil, nil, fmt.Errorf(srcPrefix + " not implemented")
}
Expand Down Expand Up @@ -178,6 +202,9 @@ func PerformCopy(srcClient, dstClient interface{}, srcPrefix, dstPrefix string,
func GetListOfFiles(client interface{}, prefix, path string) ([]string, error) {
var relativePaths []string

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

switch prefix {
case "k8s":
paths, err := GetListOfFilesFromK8s(client, path, "f", "*")
Expand All @@ -191,6 +218,12 @@ func GetListOfFiles(client interface{}, prefix, path string) ([]string, error) {
return nil, err
}
relativePaths = paths
case "abs":
paths, err := GetListOfFilesFromAbs(ctx, client, path)
if err != nil {
return nil, err
}
relativePaths = paths
default:
return nil, fmt.Errorf(prefix + " not implemented")
}
Expand All @@ -202,6 +235,9 @@ func GetListOfFiles(client interface{}, prefix, path string) ([]string, error) {
func Download(srcClient interface{}, srcPrefix, srcPath string) ([]byte, error) {
var buffer []byte

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

switch srcPrefix {
case "k8s":
bytes, err := DownloadFromK8s(srcClient, srcPath)
Expand All @@ -215,6 +251,12 @@ func Download(srcClient interface{}, srcPrefix, srcPath string) ([]byte, error)
return nil, err
}
buffer = bytes
case "abs":
bytes, err := DownloadFromAbs(ctx, srcClient, srcPath)
if err != nil {
return nil, err
}
buffer = bytes
default:
return nil, fmt.Errorf(srcPrefix + " not implemented")
}
Expand All @@ -224,6 +266,9 @@ func Download(srcClient interface{}, srcPrefix, srcPath string) ([]byte, error)

// Upload uploads a single file provided as a byte array to path
func Upload(dstClient interface{}, dstPrefix, dstPath, srcPath string, buffer []byte) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

switch dstPrefix {
case "k8s":
err := UploadToK8s(dstClient, dstPath, srcPath, buffer)
Expand All @@ -235,6 +280,11 @@ func Upload(dstClient interface{}, dstPrefix, dstPath, srcPath string, buffer []
if err != nil {
return err
}
case "abs":
err := UploadToAbs(ctx, dstClient, dstPath, srcPath, buffer)
if err != nil {
return err
}
default:
return fmt.Errorf(dstPrefix + " not implemented")
}
Expand Down

0 comments on commit a71c6d9

Please sign in to comment.