Skip to content

Commit

Permalink
refactor(build logs): improve build logging
Browse files Browse the repository at this point in the history
improvements:
* let build log methods return channel to let the caller
  handle streamed log lines within its own goroutine.
* stream build log to long-term storage.
* change storage provider interface to stream data.

Closes #7422

Signed-off-by: Max Goltzsche <mgoltzsche@cloudbees.com>
  • Loading branch information
mgoltzsche committed Jul 9, 2020
1 parent 70516f3 commit e13cc93
Show file tree
Hide file tree
Showing 18 changed files with 402 additions and 627 deletions.
37 changes: 28 additions & 9 deletions pkg/cloud/amazon/storage/bucket_provider.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package storage

import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"net/url"
"os"
"strings"

session2 "github.com/jenkins-x/jx/v2/pkg/cloud/amazon/session"
Expand Down Expand Up @@ -164,7 +164,7 @@ func (b *AmazonBucketProvider) UploadFileToBucket(reader io.Reader, outputName s
}

// DownloadFileFromBucket downloads a file from an S3 bucket and converts the contents to a bufio.Scanner
func (b *AmazonBucketProvider) DownloadFileFromBucket(bucketURL string) (*bufio.Scanner, error) {
func (b *AmazonBucketProvider) DownloadFileFromBucket(bucketURL string) (io.ReadCloser, error) {
downloader, err := b.s3ManagerDownloader()
if err != nil {
return nil, errors.Wrap(err, "there was a problem downloading from the bucket")
Expand All @@ -179,16 +179,35 @@ func (b *AmazonBucketProvider) DownloadFileFromBucket(bucketURL string) (*bufio.
Key: aws.String(u.Path),
}

buf := aws.NewWriteAtBuffer([]byte{})
_, err = downloader.Download(buf, &requestInput)
f, err := ioutil.TempFile("", ".tmp-s3-download-")
if err != nil {
return nil, err
}
defer func() {
if err != nil {
_ = f.Close()
_ = os.Remove(f.Name())
}
}()
_, err = downloader.Download(f, &requestInput)
if err != nil {
return nil, err
}

reader := bytes.NewReader(buf.Bytes())
scanner := bufio.NewScanner(reader)
scanner.Split(bufio.ScanLines)
return scanner, nil
return &tempDownloadDestination{f}, nil
}

type tempDownloadDestination struct {
*os.File
}

func (f *tempDownloadDestination) Close() error {
err1 := f.File.Close()
err2 := os.Remove(f.File.Name())
if err2 != nil && err1 == nil {
err1 = err2
}
return err1
}

// NewAmazonBucketProvider create a new provider for AWS
Expand Down
7 changes: 6 additions & 1 deletion pkg/cloud/amazon/storage/bucket_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package storage

import (
"bufio"
"bytes"
"fmt"
"io"
Expand Down Expand Up @@ -181,13 +182,17 @@ func TestAmazonBucketProvider_DownloadFileFromBucket(t *testing.T) {
},
downloader: mockedDownloader{},
}
scanner, err := p.DownloadFileFromBucket("s3://bucket/key")
reader, err := p.DownloadFileFromBucket("s3://bucket/key")
assert.NoError(t, err)

scanner := bufio.NewScanner(reader)
scanner.Split(bufio.ScanLines)
var bucketContent string
for scanner.Scan() {
bucketContent += fmt.Sprintln(scanner.Text())
}

assert.Equal(t, expectedBucketContents, bucketContent, "the returned contents should be match")
err = reader.Close()
assert.NoError(t, err, "reader.Close()")
}
37 changes: 20 additions & 17 deletions pkg/cloud/buckets/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package buckets
import (
"context"
"fmt"
"io/ioutil"
"io"
"net/http"
"net/url"
"strings"
Expand Down Expand Up @@ -47,7 +47,7 @@ func KubeProviderToBucketScheme(provider string) string {

// ReadURL reads the given URL from either a http/https endpoint or a bucket URL path.
// if specified the httpFn is a function which can append the user/password or token and/or add a header with the token if using a git provider
func ReadURL(urlText string, timeout time.Duration, httpFn func(urlString string) (string, func(*http.Request), error)) ([]byte, error) {
func ReadURL(urlText string, timeout time.Duration, httpFn func(urlString string) (string, func(*http.Request), error)) (io.ReadCloser, error) {
u, err := url.Parse(urlText)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse URL %s", urlText)
Expand All @@ -68,7 +68,7 @@ func ReadURL(urlText string, timeout time.Duration, httpFn func(urlString string
}

// ReadHTTPURL reads the HTTP based URL, modifying the headers as needed, and returns the data or returning an error if a 2xx status is not returned
func ReadHTTPURL(u string, headerFunc func(*http.Request), timeout time.Duration) ([]byte, error) {
func ReadHTTPURL(u string, headerFunc func(*http.Request), timeout time.Duration) (io.ReadCloser, error) {
httpClient := util.GetClientWithTimeout(timeout)

req, err := http.NewRequest("GET", u, nil)
Expand All @@ -81,30 +81,26 @@ func ReadHTTPURL(u string, headerFunc func(*http.Request), timeout time.Duration
return nil, errors.Wrapf(err, "failed to invoke GET on %s", u)
}
stream := resp.Body
defer stream.Close()

data, err := ioutil.ReadAll(stream)
if err != nil {
return nil, errors.Wrapf(err, "failed to GET data from %s", u)
}
if resp.StatusCode >= 400 {
return data, fmt.Errorf("status %s when performing GET on %s", resp.Status, u)
_ = stream.Close()
return nil, fmt.Errorf("status %s when performing GET on %s", resp.Status, u)
}
return data, err
return stream, nil
}

// ReadBucketURL reads the content of a bucket URL of the for 's3://bucketName/foo/bar/whatnot.txt?param=123'
// where any of the query arguments are applied to the underlying Bucket URL and the path is extracted and resolved
// within the bucket
func ReadBucketURL(u *url.URL, timeout time.Duration) ([]byte, error) {
func ReadBucketURL(u *url.URL, timeout time.Duration) (io.ReadCloser, error) {
bucketURL, key := SplitBucketURL(u)

ctx, _ := context.WithTimeout(context.Background(), timeout)
bucket, err := blob.Open(ctx, bucketURL)
if err != nil {
return nil, errors.Wrapf(err, "failed to open bucket %s", bucketURL)
}
data, err := bucket.ReadAll(ctx, key)
data, err := bucket.NewReader(ctx, key, nil)
if err != nil {
return data, errors.Wrapf(err, "failed to read key %s in bucket %s", key, bucketURL)
}
Expand All @@ -113,24 +109,31 @@ func ReadBucketURL(u *url.URL, timeout time.Duration) ([]byte, error) {

// WriteBucketURL writes the data to a bucket URL of the for 's3://bucketName/foo/bar/whatnot.txt?param=123'
// with the given timeout
func WriteBucketURL(u *url.URL, data []byte, timeout time.Duration) error {
func WriteBucketURL(u *url.URL, data io.Reader, timeout time.Duration) error {
bucketURL, key := SplitBucketURL(u)
return WriteBucket(bucketURL, key, data, timeout)
}

// WriteBucket writes the data to a bucket URL and key of the for 's3://bucketName' and key 'foo/bar/whatnot.txt'
// with the given timeout
func WriteBucket(bucketURL string, key string, data []byte, timeout time.Duration) error {
func WriteBucket(bucketURL string, key string, data io.Reader, timeout time.Duration) (err error) {
ctx, _ := context.WithTimeout(context.Background(), timeout)
bucket, err := blob.Open(ctx, bucketURL)
if err != nil {
return errors.Wrapf(err, "failed to open bucket %s", bucketURL)
}
err = bucket.WriteAll(ctx, key, data, nil)
w, err := bucket.NewWriter(ctx, key, nil)
if err != nil {
return errors.Wrapf(err, "failed to write key %s in bucket %s", key, bucketURL)
return errors.Wrapf(err, "failed to create key %s in bucket %s", key, bucketURL)
}
return nil
defer func() {
if e := w.Close(); e != nil && err == nil {
err = e
}
err = errors.Wrapf(err, "failed to write key %s in bucket %s", key, bucketURL)
}()
_, err = io.Copy(w, data)
return
}

// SplitBucketURL splits the full bucket URL into the URL to open the bucket and the file name to refer to
Expand Down
3 changes: 1 addition & 2 deletions pkg/cloud/buckets/default_provider.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package buckets

import (
"bufio"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -38,7 +37,7 @@ func (LegacyBucketProvider) EnsureBucketIsCreated(bucketURL string) error {
}

// DownloadFileFromBucket is not supported for LegacyBucketProvider
func (LegacyBucketProvider) DownloadFileFromBucket(bucketURL string) (*bufio.Scanner, error) {
func (LegacyBucketProvider) DownloadFileFromBucket(bucketURL string) (io.ReadCloser, error) {
return nil, fmt.Errorf("DownloadFileFromBucket not implemented for LegacyBucketProvider")
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/cloud/buckets/interface.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package buckets

import (
"bufio"
"io"
)

Expand All @@ -12,5 +11,5 @@ type Provider interface {
CreateNewBucketForCluster(clusterName string, bucketKind string) (string, error)
EnsureBucketIsCreated(bucketURL string) error
UploadFileToBucket(r io.Reader, outputName string, bucketURL string) (string, error)
DownloadFileFromBucket(bucketURL string) (*bufio.Scanner, error)
DownloadFileFromBucket(bucketURL string) (io.ReadCloser, error)
}
9 changes: 4 additions & 5 deletions pkg/cloud/buckets/mocks/buckets_interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions pkg/cloud/gke/gcloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"bufio"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"

"time"

osUser "os/user"
Expand Down Expand Up @@ -555,7 +555,7 @@ func (g *GCloud) DeleteAllObjectsInBucket(bucketName string) error {

// StreamTransferFileFromBucket will perform a stream transfer from the GCS bucket to stdout and return a scanner
// with the piped result
func StreamTransferFileFromBucket(fullBucketURL string) (*bufio.Scanner, error) {
func StreamTransferFileFromBucket(fullBucketURL string) (io.ReadCloser, error) {
bucketAccessible, err := isBucketAccessible(fullBucketURL)
if !bucketAccessible || err != nil {
return nil, errors.Wrap(err, "can't access bucket")
Expand All @@ -571,9 +571,7 @@ func StreamTransferFileFromBucket(fullBucketURL string) (*bufio.Scanner, error)
if err != nil {
return nil, errors.Wrap(err, "error streaming the logs from bucket")
}
scanner := bufio.NewScanner(stdout)
scanner.Split(bufio.ScanLines)
return scanner, nil
return stdout, nil
}

func isBucketAccessible(bucketURL string) (bool, error) {
Expand Down
13 changes: 3 additions & 10 deletions pkg/cloud/gke/storage/bucket_provider.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package storage

import (
"bufio"
"fmt"
"io"
"io/ioutil"
"net/url"
"strings"
"time"
Expand Down Expand Up @@ -80,18 +78,13 @@ func (b *GKEBucketProvider) EnsureBucketIsCreated(bucketURL string) error {

// UploadFileToBucket uploads a file to the provided GCS bucket with the provided outputName
func (b *GKEBucketProvider) UploadFileToBucket(reader io.Reader, key string, bucketURL string) (string, error) {
data, err := ioutil.ReadAll(reader)
if err != nil {
return "", err
}

log.Logger().Debugf("Uploading %d bytes to bucket %s with key %s", len(data), bucketURL, key)
err = buckets.WriteBucket(bucketURL, key, data, defaultBucketWriteTimeout)
log.Logger().Debugf("Uploading to bucket %s with key %s", bucketURL, key)
err := buckets.WriteBucket(bucketURL, key, reader, defaultBucketWriteTimeout)
return bucketURL + "/" + key, err
}

// DownloadFileFromBucket downloads a file from GCS from the given bucketURL and server its contents with a bufio.Scanner
func (b *GKEBucketProvider) DownloadFileFromBucket(bucketURL string) (*bufio.Scanner, error) {
func (b *GKEBucketProvider) DownloadFileFromBucket(bucketURL string) (io.ReadCloser, error) {
return gke.StreamTransferFileFromBucket(bucketURL)
}

Expand Down
Loading

0 comments on commit e13cc93

Please sign in to comment.