refactor: improve build log stream #7429

Merged · 1 commit · Jul 9, 2020
37 changes: 28 additions & 9 deletions pkg/cloud/amazon/storage/bucket_provider.go
@@ -1,11 +1,11 @@
package storage

import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"net/url"
"os"
"strings"

session2 "github.com/jenkins-x/jx/v2/pkg/cloud/amazon/session"
@@ -164,7 +164,7 @@ func (b *AmazonBucketProvider) UploadFileToBucket(reader io.Reader, outputName s
}

// DownloadFileFromBucket downloads a file from an S3 bucket and returns its contents as an io.ReadCloser
func (b *AmazonBucketProvider) DownloadFileFromBucket(bucketURL string) (*bufio.Scanner, error) {
func (b *AmazonBucketProvider) DownloadFileFromBucket(bucketURL string) (io.ReadCloser, error) {
downloader, err := b.s3ManagerDownloader()
if err != nil {
return nil, errors.Wrap(err, "there was a problem downloading from the bucket")
@@ -179,16 +179,35 @@ func (b *AmazonBucketProvider) DownloadFileFromBucket(bucketURL string) (*bufio.
Key: aws.String(u.Path),
}

buf := aws.NewWriteAtBuffer([]byte{})
_, err = downloader.Download(buf, &requestInput)
f, err := ioutil.TempFile("", ".tmp-s3-download-")
if err != nil {
return nil, err
}
defer func() {
if err != nil {
_ = f.Close()
_ = os.Remove(f.Name())
}
}()
_, err = downloader.Download(f, &requestInput)
if err != nil {
return nil, err
}

reader := bytes.NewReader(buf.Bytes())
scanner := bufio.NewScanner(reader)
scanner.Split(bufio.ScanLines)
return scanner, nil
return &tempDownloadDestination{f}, nil
}

type tempDownloadDestination struct {
*os.File
}

func (f *tempDownloadDestination) Close() error {
err1 := f.File.Close()
err2 := os.Remove(f.File.Name())
if err2 != nil && err1 == nil {
err1 = err2
}
return err1
}

// NewAmazonBucketProvider create a new provider for AWS
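For reference, a minimal caller-side sketch (not part of this diff) of how the new io.ReadCloser return value might be consumed. The package name, helper name, and import path are assumptions; closing the returned reader is what deletes the AWS provider's temporary download file via tempDownloadDestination.Close.

```go
package example

import (
	"bufio"
	"fmt"

	"github.com/jenkins-x/jx/v2/pkg/cloud/buckets" // assumed import path
)

// printBuildLog is a hypothetical helper: it streams a build log line by line
// through any buckets.Provider and always closes the reader when done.
func printBuildLog(p buckets.Provider, bucketURL string) error {
	reader, err := p.DownloadFileFromBucket(bucketURL)
	if err != nil {
		return err
	}
	// For the AWS provider this also removes the temporary file on disk.
	defer reader.Close()

	scanner := bufio.NewScanner(reader)
	for scanner.Scan() {
		fmt.Println(scanner.Text())
	}
	return scanner.Err()
}
```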
7 changes: 6 additions & 1 deletion pkg/cloud/amazon/storage/bucket_provider_test.go
@@ -3,6 +3,7 @@
package storage

import (
"bufio"
"bytes"
"fmt"
"io"
@@ -181,13 +182,17 @@ func TestAmazonBucketProvider_DownloadFileFromBucket(t *testing.T) {
},
downloader: mockedDownloader{},
}
scanner, err := p.DownloadFileFromBucket("s3://bucket/key")
reader, err := p.DownloadFileFromBucket("s3://bucket/key")
assert.NoError(t, err)

scanner := bufio.NewScanner(reader)
scanner.Split(bufio.ScanLines)
var bucketContent string
for scanner.Scan() {
bucketContent += fmt.Sprintln(scanner.Text())
}

assert.Equal(t, expectedBucketContents, bucketContent, "the returned contents should match")
err = reader.Close()
assert.NoError(t, err, "reader.Close()")
}
37 changes: 20 additions & 17 deletions pkg/cloud/buckets/buckets.go
@@ -3,7 +3,7 @@ package buckets
import (
"context"
"fmt"
"io/ioutil"
"io"
"net/http"
"net/url"
"strings"
@@ -47,7 +47,7 @@ func KubeProviderToBucketScheme(provider string) string {

// ReadURL reads the given URL from either an http/https endpoint or a bucket URL path.
// If specified, httpFn is a function which can append the user/password or token and/or add a header with the token if using a git provider
func ReadURL(urlText string, timeout time.Duration, httpFn func(urlString string) (string, func(*http.Request), error)) ([]byte, error) {
func ReadURL(urlText string, timeout time.Duration, httpFn func(urlString string) (string, func(*http.Request), error)) (io.ReadCloser, error) {
u, err := url.Parse(urlText)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse URL %s", urlText)
@@ -68,7 +68,7 @@ func ReadURL(urlText string, timeout time.Duration, httpFn func(urlString string
}

// ReadHTTPURL reads the HTTP based URL, modifying the headers as needed, and returns the response body, or an error if a 2xx status is not returned
func ReadHTTPURL(u string, headerFunc func(*http.Request), timeout time.Duration) ([]byte, error) {
func ReadHTTPURL(u string, headerFunc func(*http.Request), timeout time.Duration) (io.ReadCloser, error) {
httpClient := util.GetClientWithTimeout(timeout)

req, err := http.NewRequest("GET", u, nil)
@@ -81,30 +81,26 @@ func ReadHTTPURL(u string, headerFunc func(*http.Request), timeout time.Duration
return nil, errors.Wrapf(err, "failed to invoke GET on %s", u)
}
stream := resp.Body
defer stream.Close()

data, err := ioutil.ReadAll(stream)
if err != nil {
return nil, errors.Wrapf(err, "failed to GET data from %s", u)
}
if resp.StatusCode >= 400 {
return data, fmt.Errorf("status %s when performing GET on %s", resp.Status, u)
_ = stream.Close()
return nil, fmt.Errorf("status %s when performing GET on %s", resp.Status, u)
}
return data, err
return stream, nil
}

// ReadBucketURL reads the content of a bucket URL of the form 's3://bucketName/foo/bar/whatnot.txt?param=123'
// where any of the query arguments are applied to the underlying Bucket URL and the path is extracted and resolved
// within the bucket
func ReadBucketURL(u *url.URL, timeout time.Duration) ([]byte, error) {
func ReadBucketURL(u *url.URL, timeout time.Duration) (io.ReadCloser, error) {
bucketURL, key := SplitBucketURL(u)

ctx, _ := context.WithTimeout(context.Background(), timeout)
bucket, err := blob.Open(ctx, bucketURL)
if err != nil {
return nil, errors.Wrapf(err, "failed to open bucket %s", bucketURL)
}
data, err := bucket.ReadAll(ctx, key)
data, err := bucket.NewReader(ctx, key, nil)
if err != nil {
return data, errors.Wrapf(err, "failed to read key %s in bucket %s", key, bucketURL)
}
@@ -113,24 +109,31 @@ func ReadBucketURL(u *url.URL, timeout time.Duration) ([]byte, error) {

// WriteBucketURL writes the data to a bucket URL of the form 's3://bucketName/foo/bar/whatnot.txt?param=123'
// with the given timeout
func WriteBucketURL(u *url.URL, data []byte, timeout time.Duration) error {
func WriteBucketURL(u *url.URL, data io.Reader, timeout time.Duration) error {
bucketURL, key := SplitBucketURL(u)
return WriteBucket(bucketURL, key, data, timeout)
}

// WriteBucket writes the data to a bucket URL and key of the form 's3://bucketName' and key 'foo/bar/whatnot.txt'
// with the given timeout
func WriteBucket(bucketURL string, key string, data []byte, timeout time.Duration) error {
func WriteBucket(bucketURL string, key string, data io.Reader, timeout time.Duration) (err error) {
ctx, _ := context.WithTimeout(context.Background(), timeout)
bucket, err := blob.Open(ctx, bucketURL)
if err != nil {
return errors.Wrapf(err, "failed to open bucket %s", bucketURL)
}
err = bucket.WriteAll(ctx, key, data, nil)
w, err := bucket.NewWriter(ctx, key, nil)
if err != nil {
return errors.Wrapf(err, "failed to write key %s in bucket %s", key, bucketURL)
return errors.Wrapf(err, "failed to create key %s in bucket %s", key, bucketURL)
}
return nil
defer func() {
if e := w.Close(); e != nil && err == nil {
err = e
}
err = errors.Wrapf(err, "failed to write key %s in bucket %s", key, bucketURL)
}()
_, err = io.Copy(w, data)
return
}

// SplitBucketURL splits the full bucket URL into the URL to open the bucket and the file name to refer to
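With WriteBucket now accepting an io.Reader, a caller can upload straight from a file handle instead of buffering the whole payload as a []byte first. A minimal sketch under stated assumptions: the package name, helper name, bucket URL, key, and timeout below are all illustrative, not taken from this PR.

```go
package example

import (
	"os"
	"time"

	"github.com/jenkins-x/jx/v2/pkg/cloud/buckets" // assumed import path
)

// uploadLogFile is a hypothetical example: the opened file is streamed
// directly into the bucket writer rather than being read fully into memory.
func uploadLogFile(path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	// Bucket URL, key, and timeout are placeholders for illustration only.
	return buckets.WriteBucket("gs://my-logs-bucket", "jenkins-x/logs/build.log", f, 20*time.Second)
}
```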
3 changes: 1 addition & 2 deletions pkg/cloud/buckets/default_provider.go
@@ -1,7 +1,6 @@
package buckets

import (
"bufio"
"context"
"fmt"
"io"
@@ -38,7 +37,7 @@ func (LegacyBucketProvider) EnsureBucketIsCreated(bucketURL string) error {
}

// DownloadFileFromBucket is not supported for LegacyBucketProvider
func (LegacyBucketProvider) DownloadFileFromBucket(bucketURL string) (*bufio.Scanner, error) {
func (LegacyBucketProvider) DownloadFileFromBucket(bucketURL string) (io.ReadCloser, error) {
return nil, fmt.Errorf("DownloadFileFromBucket not implemented for LegacyBucketProvider")
}

3 changes: 1 addition & 2 deletions pkg/cloud/buckets/interface.go
@@ -1,7 +1,6 @@
package buckets

import (
"bufio"
"io"
)

@@ -12,5 +11,5 @@ type Provider interface {
CreateNewBucketForCluster(clusterName string, bucketKind string) (string, error)
EnsureBucketIsCreated(bucketURL string) error
UploadFileToBucket(r io.Reader, outputName string, bucketURL string) (string, error)
DownloadFileFromBucket(bucketURL string) (*bufio.Scanner, error)
DownloadFileFromBucket(bucketURL string) (io.ReadCloser, error)
}
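Because every Provider now hands back a plain io.ReadCloser, callers can forward the download to any destination without provider-specific buffering. A small sketch of that idea, not part of this PR; the package name, helper name, and import path are assumptions.

```go
package example

import (
	"io"

	"github.com/jenkins-x/jx/v2/pkg/cloud/buckets" // assumed import path
)

// copyBucketFile is a hypothetical helper: it pipes a downloaded bucket file
// to any io.Writer (a terminal, an HTTP response, another upload, ...) and
// always closes the source reader.
func copyBucketFile(p buckets.Provider, bucketURL string, dst io.Writer) (int64, error) {
	src, err := p.DownloadFileFromBucket(bucketURL)
	if err != nil {
		return 0, err
	}
	defer src.Close()
	return io.Copy(dst, src)
}
```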
9 changes: 4 additions & 5 deletions pkg/cloud/buckets/mocks/buckets_interface.go

Some generated files are not rendered by default.

8 changes: 3 additions & 5 deletions pkg/cloud/gke/gcloud.go
@@ -4,12 +4,12 @@ import (
"bufio"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"

"time"

osUser "os/user"
@@ -555,7 +555,7 @@ func (g *GCloud) DeleteAllObjectsInBucket(bucketName string) error {

// StreamTransferFileFromBucket will perform a stream transfer from the GCS bucket to stdout and return a reader
// with the piped result
func StreamTransferFileFromBucket(fullBucketURL string) (*bufio.Scanner, error) {
func StreamTransferFileFromBucket(fullBucketURL string) (io.ReadCloser, error) {
bucketAccessible, err := isBucketAccessible(fullBucketURL)
if !bucketAccessible || err != nil {
return nil, errors.Wrap(err, "can't access bucket")
@@ -571,9 +571,7 @@ func StreamTransferFileFromBucket(fullBucketURL string) (*bufio.Scanner, error)
if err != nil {
return nil, errors.Wrap(err, "error streaming the logs from bucket")
}
scanner := bufio.NewScanner(stdout)
scanner.Split(bufio.ScanLines)
return scanner, nil
return stdout, nil
}

func isBucketAccessible(bucketURL string) (bool, error) {
13 changes: 3 additions & 10 deletions pkg/cloud/gke/storage/bucket_provider.go
@@ -1,10 +1,8 @@
package storage

import (
"bufio"
"fmt"
"io"
"io/ioutil"
"net/url"
"strings"
"time"
@@ -80,18 +78,13 @@ func (b *GKEBucketProvider) EnsureBucketIsCreated(bucketURL string) error {

// UploadFileToBucket uploads a file to the provided GCS bucket with the provided outputName
func (b *GKEBucketProvider) UploadFileToBucket(reader io.Reader, key string, bucketURL string) (string, error) {
data, err := ioutil.ReadAll(reader)
if err != nil {
return "", err
}

log.Logger().Debugf("Uploading %d bytes to bucket %s with key %s", len(data), bucketURL, key)
err = buckets.WriteBucket(bucketURL, key, data, defaultBucketWriteTimeout)
log.Logger().Debugf("Uploading to bucket %s with key %s", bucketURL, key)
err := buckets.WriteBucket(bucketURL, key, reader, defaultBucketWriteTimeout)
return bucketURL + "/" + key, err
}

// DownloadFileFromBucket downloads a file from GCS from the given bucketURL and serves its contents as an io.ReadCloser
func (b *GKEBucketProvider) DownloadFileFromBucket(bucketURL string) (*bufio.Scanner, error) {
func (b *GKEBucketProvider) DownloadFileFromBucket(bucketURL string) (io.ReadCloser, error) {
return gke.StreamTransferFileFromBucket(bucketURL)
}
