Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added upload progress bar tracker for ISO images. #7320

Merged
merged 14 commits into from
Feb 14, 2018
144 changes: 65 additions & 79 deletions lib/install/management/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ import (
"context"
"fmt"
"path"
"sync"
"path/filepath"
"time"

"github.com/vmware/govmomi/object"
"github.com/vmware/vic/lib/config"
"github.com/vmware/vic/lib/install/data"
"github.com/vmware/vic/lib/install/opsuser"
"github.com/vmware/vic/lib/install/vchlog"
"github.com/vmware/vic/lib/progresslog"
"github.com/vmware/vic/pkg/errors"
"github.com/vmware/vic/pkg/retry"
"github.com/vmware/vic/pkg/trace"
Expand Down Expand Up @@ -120,100 +121,85 @@ func (d *Dispatcher) uploadImages(files map[string]string) error {
// upload the images
d.op.Info("Uploading images for container")

results := make(chan error, len(files))
var wg sync.WaitGroup
// Build retry config
backoffConf := retry.NewBackoffConfig()
backoffConf.InitialInterval = uploadInitialInterval
backoffConf.MaxInterval = uploadMaxInterval
backoffConf.MaxElapsedTime = uploadMaxElapsedTime

for key, image := range files {

wg.Add(1)
go func(key string, image string) {
finalMessage := ""
d.op.Infof("\t%q", image)

// upload function that is passed to retry
operationForRetry := func() error {
// attempt to delete the iso image first in case of failed upload
dc := d.session.Datacenter
fm := d.session.Datastore.NewFileManager(dc, false)
ds := d.session.Datastore

isoTargetPath := path.Join(d.vmPathName, key)
// check iso first
d.op.Debugf("target delete path = %s", isoTargetPath)
_, err := ds.Stat(d.op, isoTargetPath)
if err != nil {
switch err.(type) {
case object.DatastoreNoSuchFileError:
d.op.Debug("File not found. Nothing to do.")
case object.DatastoreNoSuchDirectoryError:
d.op.Debug("Directory not found. Nothing to do.")
default:
// otherwise force delete
err := fm.Delete(d.op, isoTargetPath)
if err != nil {
d.op.Debugf("Failed to delete image (%s) with error (%s)", image, err.Error())
return err
}
baseName := filepath.Base(image)
finalMessage := ""
// upload function that is passed to retry
isoTargetPath := path.Join(d.vmPathName, key)

operationForRetry := func() error {
// attempt to delete the iso image first in case of failed upload
dc := d.session.Datacenter
fm := d.session.Datastore.NewFileManager(dc, false)
ds := d.session.Datastore

// check iso first
d.op.Debugf("Checking if file already exists: %s", isoTargetPath)
_, err := ds.Stat(d.op, isoTargetPath)
if err != nil {
switch err.(type) {
case object.DatastoreNoSuchFileError:
d.op.Debug("File not found. Nothing to do.")
case object.DatastoreNoSuchDirectoryError:
d.op.Debug("Directory not found. Nothing to do.")
default:
d.op.Debugf("ISO file already exists, deleting: %s", isoTargetPath)
err := fm.Delete(d.op, isoTargetPath)
if err != nil {
d.op.Debugf("Failed to delete image (%s) with error (%s)", image, err.Error())
return err
}
}

return d.session.Datastore.UploadFile(d.op, image, path.Join(d.vmPathName, key), nil)
}

// counter for retry decider
retryCount := uploadRetryLimit
d.op.Infof("Uploading %s as %s", baseName, key)

// decider for our retry, will retry the upload uploadRetryLimit times
uploadRetryDecider := func(err error) bool {
if err == nil {
return false
}
ul := progresslog.NewUploadLogger(d.op.Infof, baseName, time.Second*3)
// need to wait since UploadLogger is asynchronous.
defer ul.Wait()

retryCount--
if retryCount < 0 {
d.op.Warnf("Attempted upload a total of %d times without success, Upload process failed.", uploadRetryLimit)
return false
}
d.op.Warnf("failed an attempt to upload isos with err (%s), %d retries remain", err.Error(), retryCount)
return true
}
return d.session.Datastore.UploadFile(d.op, image, path.Join(d.vmPathName, key),
progresslog.UploadParams(ul))
}

// Build retry config
backoffConf := retry.NewBackoffConfig()
backoffConf.InitialInterval = uploadInitialInterval
backoffConf.MaxInterval = uploadMaxInterval
backoffConf.MaxElapsedTime = uploadMaxElapsedTime

uploadErr := retry.DoWithConfig(operationForRetry, uploadRetryDecider, backoffConf)
if uploadErr != nil {
finalMessage = fmt.Sprintf("\t\tUpload failed for %q: %s\n", image, uploadErr)
if d.force {
finalMessage = fmt.Sprintf("%s\t\tContinuing despite failures (due to --force option)\n", finalMessage)
finalMessage = fmt.Sprintf("%s\t\tNote: The VCH will not function without %q...", finalMessage, image)
results <- errors.New(finalMessage)
} else {
results <- errors.New(finalMessage)
}
// counter for retry decider
retryCount := uploadRetryLimit

// decider for our retry, will retry the upload uploadRetryLimit times
uploadRetryDecider := func(err error) bool {
if err == nil {
return false
}
wg.Done()
}(key, image)
}

wg.Wait()
close(results)
retryCount--
if retryCount < 0 {
d.op.Warnf("Attempted upload a total of %d times without success, Upload process failed.", uploadRetryLimit)
return false
}
d.op.Warnf("Failed an attempt to upload isos with err (%s), %d retries remain", err.Error(), retryCount)
return true
}

uploadFailed := false
for err := range results {
if err != nil {
d.op.Error(err)
uploadFailed = true
uploadErr := retry.DoWithConfig(operationForRetry, uploadRetryDecider, backoffConf)
if uploadErr != nil {
finalMessage = fmt.Sprintf("\t\tUpload failed for %q: %s\n", image, uploadErr)
if d.force {
finalMessage = fmt.Sprintf("%s\t\tContinuing despite failures (due to --force option)\n", finalMessage)
finalMessage = fmt.Sprintf("%s\t\tNote: The VCH will not function without %q...", finalMessage, image)
}
d.op.Error(finalMessage)
return errors.New("Failed to upload iso images.")
}
}

if uploadFailed {
return errors.New("Failed to upload iso images successfully.")
}
return nil

}

// cleanupAfterCreationFailed cleans up the dangling resource pool for the failed VCH and any bridge network if there is any.
Expand Down
100 changes: 100 additions & 0 deletions lib/progresslog/progresslog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2018 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package progresslog

import (
"sync"
"time"

"github.com/vmware/govmomi/vim25/progress"
"github.com/vmware/govmomi/vim25/soap"
)

// UploadParams uses default upload settings as initial input and set UploadLogger as a logger.
func UploadParams(ul *UploadLogger) *soap.Upload {
params := soap.DefaultUpload
params.Progress = ul
return &params
}

// UploadLogger is used to track upload progress to ESXi/VC of a specific file.
type UploadLogger struct {
wg sync.WaitGroup
filename string
interval time.Duration
logTo func(format string, args ...interface{})
}

// NewUploadLogger returns a new instance of UploadLogger. User can provide a logger interface
// that will be used to dump output of the current upload status.
func NewUploadLogger(logTo func(format string, args ...interface{}),
filename string, progressInterval time.Duration) *UploadLogger {

return &UploadLogger{
logTo: logTo,
filename: filename,
interval: progressInterval,
}
}

// Sink returns a channel that receives current upload progress status.
// Channel has to be closed by the caller.
func (ul *UploadLogger) Sink() chan<- progress.Report {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there multiple calls to this function per download?

Copy link
Contributor

@matthewavery matthewavery Feb 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind, I missed the range ticker.C :)

ul.wg.Add(1)
ch := make(chan progress.Report)
fmtStr := "Uploading %s. Progress: %.2f%%"

go func() {
var curProgress float32
var lastProgress float32
ul.logTo(fmtStr, ul.filename, curProgress)

mu := sync.Mutex{}
ticker := time.NewTicker(ul.interval)

// Print progress every ul.interval seconds.
go func() {
for range ticker.C {
mu.Lock()
lastProgress = curProgress
mu.Unlock()
ul.logTo(fmtStr, ul.filename, lastProgress)
}
}()

for v := range ch {
mu.Lock()
curProgress = v.Percentage()
mu.Unlock()
}

ticker.Stop()

if lastProgress != curProgress {
ul.logTo(fmtStr, ul.filename, curProgress)
}

if curProgress == 100.0 {
ul.logTo("Uploading of %s has been completed", ul.filename)
}
ul.wg.Done()
}()
return ch
}

// Wait waits for Sink to complete its work, due to its async nature logging messages may be not sequential.
func (ul *UploadLogger) Wait() {
ul.wg.Wait()
}
93 changes: 93 additions & 0 deletions lib/progresslog/progresslog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2018 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package progresslog

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/vmware/govmomi/vim25/progress"
)

type ProgressResults struct {
percentage float32
}

func (pr *ProgressResults) Percentage() float32 {
return pr.percentage
}

func (pr *ProgressResults) Detail() string {
return ""
}

func (pr *ProgressResults) Error() error {
return nil
}

var _ progress.Report = &ProgressResults{}

func TestNewUploadLoggerComplete(t *testing.T) {
var logs []string
logTo := func(format string, args ...interface{}) {
logs = append(logs, fmt.Sprintf(format, args...))
}
pl := NewUploadLogger(logTo, "unittest", time.Millisecond*10)
progressChan := pl.Sink()
for i := 0; i <= 10; i++ {
res := &ProgressResults{percentage: float32(i * 10)}
progressChan <- res
time.Sleep(time.Duration(time.Millisecond * 5))
}
close(progressChan)
pl.Wait()

if assert.True(t, len(logs) > 3) {
last := len(logs) - 1
assert.Contains(t, logs[0], "unittest")
assert.Contains(t, logs[0], "0.00%")
assert.Contains(t, logs[1], ".00%")
assert.Contains(t, logs[last-1], "100.00%")
assert.Contains(t, logs[last], "complete")
}
}

func TestNewUploadLoggerNotComplete(t *testing.T) {
var logs []string
logTo := func(format string, args ...interface{}) {
logs = append(logs, fmt.Sprintf(format, args...))
}
pl := NewUploadLogger(logTo, "unittest", time.Millisecond*10)
progressChan := pl.Sink()
for i := 0; i < 10; i++ {
res := &ProgressResults{percentage: float32(i * 10)}
progressChan <- res
time.Sleep(time.Duration(time.Millisecond * 5))
}
close(progressChan)
pl.Wait()

if assert.True(t, len(logs) > 3) {
last := len(logs) - 1
assert.Contains(t, logs[0], "unittest")
assert.Contains(t, logs[0], "0.00%")
assert.Contains(t, logs[1], ".00%")
assert.NotContains(t, logs[last], "100.00%")
assert.NotContains(t, logs[last], "complete")
}
}