From a71c6d9bbb8ef682e91c8dc3c9c7e0e796b94a2d Mon Sep 17 00:00:00 2001 From: Oleg Ivanov Date: Sun, 11 Nov 2018 22:58:28 +0200 Subject: [PATCH] Added a support of the azure blob storage --- glide.yaml | 9 ++- pkg/skbn/abs.go | 184 +++++++++++++++++++++++++++++++++++++++++++++++ pkg/skbn/skbn.go | 50 +++++++++++++ 3 files changed, 242 insertions(+), 1 deletion(-) create mode 100644 pkg/skbn/abs.go diff --git a/glide.yaml b/glide.yaml index a34abaa..8f66d21 100644 --- a/glide.yaml +++ b/glide.yaml @@ -6,6 +6,14 @@ import: - aws - aws/session - service/s3 +- package: github.com/Azure/azure-pipeline-go + version: "v0.1.8" + subpackages: + - pipeline +- package: github.com/Azure/azure-storage-blob-go + version: "0.3.0" + subpackages: + - azblob - package: github.com/spf13/cobra version: v0.0.3 - package: k8s.io/api @@ -22,4 +30,3 @@ import: - kubernetes - rest - tools/clientcmd - diff --git a/pkg/skbn/abs.go b/pkg/skbn/abs.go new file mode 100644 index 0000000..b7371a1 --- /dev/null +++ b/pkg/skbn/abs.go @@ -0,0 +1,184 @@ +package skbn + +import ( + "bytes" + "context" + "fmt" + "log" + "net/url" + "os" + "path/filepath" + "strings" + + "github.com/Azure/azure-pipeline-go/pipeline" + "github.com/Azure/azure-storage-blob-go/azblob" +) + +var err error + +func validateAbsPath(pathSplit []string) error { + if len(pathSplit) >= 1 { + return nil + } + return fmt.Errorf("illegal path: %s", filepath.Join(pathSplit...)) +} + +func initAbsVariables(split []string) (string, string, string) { + account := split[0] + container := split[1] + path := filepath.Join(split[2:]...) + + return account, container, path +} + +func getNewPipeline() (pipeline.Pipeline, error) { + accountName, accountKey := os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY") + + if len(accountName) == 0 || len(accountKey) == 0 { + log.Fatal("Either the AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set") + } + + credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) + + if err != nil { + log.Fatal("Invalid credentials with error: " + err.Error()) + } + pl := azblob.NewPipeline(credential, azblob.PipelineOptions{}) + + return pl, err +} + +func getServiceURL(pl pipeline.Pipeline, accountName string) (azblob.ServiceURL, error) { + URL, err := url.Parse( + fmt.Sprintf("https://%s.blob.core.windows.net/", accountName)) + + surl := azblob.NewServiceURL(*URL, pl) + return surl, err +} + +func getContainerURL(pl pipeline.Pipeline, accountName string, containerName string) (azblob.ContainerURL, error) { + URL, err := url.Parse( + fmt.Sprintf("https://%s.blob.core.windows.net/%s", accountName, containerName)) + + curl := azblob.NewContainerURL(*URL, pl) + return curl, err +} + +func getBlobURL(curl azblob.ContainerURL, blob string) (azblob.BlockBlobURL, error) { + return curl.NewBlockBlobURL(blob), err +} + +func createContainer(ctx context.Context, pl pipeline.Pipeline, curl azblob.ContainerURL) (*azblob.ContainerCreateResponse, error) { + cr, err := curl.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone) + return cr, err +} + +func listContainers(ctx context.Context, surl azblob.ServiceURL) ([]azblob.ContainerItem, error) { + lc, err := surl.ListContainersSegment(ctx, azblob.Marker{}, azblob.ListContainersSegmentOptions{}) + return lc.ContainerItems, err +} + +func containerExists(list []azblob.ContainerItem, containerName string) bool { + exists := false + for _, v := range list { + if containerName == v.Name { + exists = true + } + } + return exists +} + +// GetClientToAbs checks the connection to azure blob storage and returns the tested client (pipeline) +func GetClientToAbs(ctx context.Context, path string) (pipeline.Pipeline, error) { + pSplit := strings.Split(path, "/") + a, c, _ := initAbsVariables(pSplit) + pl, err := getNewPipeline() + su, err := getServiceURL(pl, a) + lc, err := listContainers(ctx, su) + cu, err := getContainerURL(pl, a, c) + + if !containerExists(lc, c) { + _, err := createContainer(ctx, pl, cu) + + if err != nil { + return nil, err + } + } + + return pl, err +} + +// GetListOfFilesFromAbs gets list of files in path from azure blob storage (recursive) +func GetListOfFilesFromAbs(ctx context.Context, iClient interface{}, path string) ([]string, error) { + pSplit := strings.Split(path, "/") + + if err := validateAbsPath(pSplit); err != nil { + return nil, err + } + + a, c, p := initAbsVariables(pSplit) + pl := iClient.(pipeline.Pipeline) + cu, err := getContainerURL(pl, a, c) + bl := []string{} + + for marker := (azblob.Marker{}); marker.NotDone(); { + listBlob, err := cu.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{}) + + if err != nil { + return nil, err + } + + marker = listBlob.NextMarker + + for _, blobInfo := range listBlob.Segment.BlobItems { + bl = append(bl, strings.Replace(blobInfo.Name, p, "", 1)) + } + } + + return bl, err +} + +// DownloadFromAbs downloads a single file from azure blob storage +func DownloadFromAbs(ctx context.Context, iClient interface{}, path string) ([]byte, error) { + pSplit := strings.Split(path, "/") + + if err := validateAbsPath(pSplit); err != nil { + return nil, err + } + + a, c, p := initAbsVariables(pSplit) + pl := iClient.(pipeline.Pipeline) + cu, err := getContainerURL(pl, a, c) + bu, err := getBlobURL(cu, p) + dr, err := bu.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false) + bs := dr.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20}) + dd := bytes.Buffer{} + _, err = dd.ReadFrom(bs) + + return dd.Bytes(), err +} + +// UploadToAbs uploads a single file to azure blob storage +func UploadToAbs(ctx context.Context, iClient interface{}, toPath, fromPath string, buffer []byte) error { + pSplit := strings.Split(toPath, "/") + + if err := validateAbsPath(pSplit); err != nil { + return err + } + + if len(pSplit) == 1 { + _, fn := filepath.Split(fromPath) + pSplit = append(pSplit, fn) + } + + a, c, p := initAbsVariables(pSplit) + pl := iClient.(pipeline.Pipeline) + cu, err := getContainerURL(pl, a, c) + bu, err := getBlobURL(cu, p) + + _, err = azblob.UploadBufferToBlockBlob(ctx, buffer, bu, azblob.UploadToBlockBlobOptions{ + BlockSize: 4 * 1024 * 1024, + Parallelism: 16}) + + return err +} diff --git a/pkg/skbn/skbn.go b/pkg/skbn/skbn.go index 354896c..1602da9 100644 --- a/pkg/skbn/skbn.go +++ b/pkg/skbn/skbn.go @@ -1,6 +1,7 @@ package skbn import ( + "context" "fmt" "log" "math" @@ -45,6 +46,7 @@ func TestImplementationsExist(srcPrefix, dstPrefix string) error { switch srcPrefix { case "k8s": case "s3": + case "abs": default: return fmt.Errorf(srcPrefix + " not implemented") } @@ -52,6 +54,7 @@ func TestImplementationsExist(srcPrefix, dstPrefix string) error { switch dstPrefix { case "k8s": case "s3": + case "abs": default: return fmt.Errorf(dstPrefix + " not implemented") } @@ -64,8 +67,12 @@ func GetClients(srcPrefix, dstPrefix, srcPath, dstPath string) (interface{}, int var srcClient interface{} var dstClient interface{} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + k8sTested := false s3Tested := false + absTested := false switch srcPrefix { case "k8s": @@ -82,6 +89,13 @@ func GetClients(srcPrefix, dstPrefix, srcPath, dstPath string) (interface{}, int } srcClient = client s3Tested = true + case "abs": + client, err := GetClientToAbs(ctx, srcPath) + if err != nil { + return nil, nil, err + } + srcClient = client + absTested = true default: return nil, nil, fmt.Errorf(srcPrefix + " not implemented") } @@ -107,6 +121,16 @@ func GetClients(srcPrefix, dstPrefix, srcPath, dstPath string) (interface{}, int return nil, nil, err } dstClient = client + case "abs": + if absTested { + dstClient = srcClient + break + } + client, err := GetClientToAbs(ctx, dstPath) + if err != nil { + return nil, nil, err + } + dstClient = client default: return nil, nil, fmt.Errorf(srcPrefix + " not implemented") } @@ -178,6 +202,9 @@ func PerformCopy(srcClient, dstClient interface{}, srcPrefix, dstPrefix string, func GetListOfFiles(client interface{}, prefix, path string) ([]string, error) { var relativePaths []string + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + switch prefix { case "k8s": paths, err := GetListOfFilesFromK8s(client, path, "f", "*") @@ -191,6 +218,12 @@ func GetListOfFiles(client interface{}, prefix, path string) ([]string, error) { return nil, err } relativePaths = paths + case "abs": + paths, err := GetListOfFilesFromAbs(ctx, client, path) + if err != nil { + return nil, err + } + relativePaths = paths default: return nil, fmt.Errorf(prefix + " not implemented") } @@ -202,6 +235,9 @@ func GetListOfFiles(client interface{}, prefix, path string) ([]string, error) { func Download(srcClient interface{}, srcPrefix, srcPath string) ([]byte, error) { var buffer []byte + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + switch srcPrefix { case "k8s": bytes, err := DownloadFromK8s(srcClient, srcPath) @@ -215,6 +251,12 @@ func Download(srcClient interface{}, srcPrefix, srcPath string) ([]byte, error) return nil, err } buffer = bytes + case "abs": + bytes, err := DownloadFromAbs(ctx, srcClient, srcPath) + if err != nil { + return nil, err + } + buffer = bytes default: return nil, fmt.Errorf(srcPrefix + " not implemented") } @@ -224,6 +266,9 @@ func Download(srcClient interface{}, srcPrefix, srcPath string) ([]byte, error) // Upload uploads a single file provided as a byte array to path func Upload(dstClient interface{}, dstPrefix, dstPath, srcPath string, buffer []byte) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + switch dstPrefix { case "k8s": err := UploadToK8s(dstClient, dstPath, srcPath, buffer) @@ -235,6 +280,11 @@ func Upload(dstClient interface{}, dstPrefix, dstPath, srcPath string, buffer [] if err != nil { return err } + case "abs": + err := UploadToAbs(ctx, dstClient, dstPath, srcPath, buffer) + if err != nil { + return err + } default: return fmt.Errorf(dstPrefix + " not implemented") }