Skip to content

Commit

Permalink
Added upload progress bar tracker for ISO images. (#7320)
Browse files Browse the repository at this point in the history
* Added upload progress bar tracker for ISO images.

Removed concurrent upload since it doesn't make any significant performance imapact.
When I tried to measure performance differene with and without concurrent uppload,
the results were fluctuating in a wide range so no good measurement was possible.
  • Loading branch information
vburenin authored Feb 14, 2018
1 parent a6a4ff0 commit 50e1761
Show file tree
Hide file tree
Showing 3 changed files with 258 additions and 79 deletions.
144 changes: 65 additions & 79 deletions lib/install/management/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ import (
"context"
"fmt"
"path"
"sync"
"path/filepath"
"time"

"github.com/vmware/govmomi/object"
"github.com/vmware/vic/lib/config"
"github.com/vmware/vic/lib/install/data"
"github.com/vmware/vic/lib/install/opsuser"
"github.com/vmware/vic/lib/install/vchlog"
"github.com/vmware/vic/lib/progresslog"
"github.com/vmware/vic/pkg/errors"
"github.com/vmware/vic/pkg/retry"
"github.com/vmware/vic/pkg/trace"
Expand Down Expand Up @@ -120,100 +121,85 @@ func (d *Dispatcher) uploadImages(files map[string]string) error {
// upload the images
d.op.Info("Uploading images for container")

results := make(chan error, len(files))
var wg sync.WaitGroup
// Build retry config
backoffConf := retry.NewBackoffConfig()
backoffConf.InitialInterval = uploadInitialInterval
backoffConf.MaxInterval = uploadMaxInterval
backoffConf.MaxElapsedTime = uploadMaxElapsedTime

for key, image := range files {

wg.Add(1)
go func(key string, image string) {
finalMessage := ""
d.op.Infof("\t%q", image)

// upload function that is passed to retry
operationForRetry := func() error {
// attempt to delete the iso image first in case of failed upload
dc := d.session.Datacenter
fm := d.session.Datastore.NewFileManager(dc, false)
ds := d.session.Datastore

isoTargetPath := path.Join(d.vmPathName, key)
// check iso first
d.op.Debugf("target delete path = %s", isoTargetPath)
_, err := ds.Stat(d.op, isoTargetPath)
if err != nil {
switch err.(type) {
case object.DatastoreNoSuchFileError:
d.op.Debug("File not found. Nothing to do.")
case object.DatastoreNoSuchDirectoryError:
d.op.Debug("Directory not found. Nothing to do.")
default:
// otherwise force delete
err := fm.Delete(d.op, isoTargetPath)
if err != nil {
d.op.Debugf("Failed to delete image (%s) with error (%s)", image, err.Error())
return err
}
baseName := filepath.Base(image)
finalMessage := ""
// upload function that is passed to retry
isoTargetPath := path.Join(d.vmPathName, key)

operationForRetry := func() error {
// attempt to delete the iso image first in case of failed upload
dc := d.session.Datacenter
fm := d.session.Datastore.NewFileManager(dc, false)
ds := d.session.Datastore

// check iso first
d.op.Debugf("Checking if file already exists: %s", isoTargetPath)
_, err := ds.Stat(d.op, isoTargetPath)
if err != nil {
switch err.(type) {
case object.DatastoreNoSuchFileError:
d.op.Debug("File not found. Nothing to do.")
case object.DatastoreNoSuchDirectoryError:
d.op.Debug("Directory not found. Nothing to do.")
default:
d.op.Debugf("ISO file already exists, deleting: %s", isoTargetPath)
err := fm.Delete(d.op, isoTargetPath)
if err != nil {
d.op.Debugf("Failed to delete image (%s) with error (%s)", image, err.Error())
return err
}
}

return d.session.Datastore.UploadFile(d.op, image, path.Join(d.vmPathName, key), nil)
}

// counter for retry decider
retryCount := uploadRetryLimit
d.op.Infof("Uploading %s as %s", baseName, key)

// decider for our retry, will retry the upload uploadRetryLimit times
uploadRetryDecider := func(err error) bool {
if err == nil {
return false
}
ul := progresslog.NewUploadLogger(d.op.Infof, baseName, time.Second*3)
// need to wait since UploadLogger is asynchronous.
defer ul.Wait()

retryCount--
if retryCount < 0 {
d.op.Warnf("Attempted upload a total of %d times without success, Upload process failed.", uploadRetryLimit)
return false
}
d.op.Warnf("failed an attempt to upload isos with err (%s), %d retries remain", err.Error(), retryCount)
return true
}
return d.session.Datastore.UploadFile(d.op, image, path.Join(d.vmPathName, key),
progresslog.UploadParams(ul))
}

// Build retry config
backoffConf := retry.NewBackoffConfig()
backoffConf.InitialInterval = uploadInitialInterval
backoffConf.MaxInterval = uploadMaxInterval
backoffConf.MaxElapsedTime = uploadMaxElapsedTime

uploadErr := retry.DoWithConfig(operationForRetry, uploadRetryDecider, backoffConf)
if uploadErr != nil {
finalMessage = fmt.Sprintf("\t\tUpload failed for %q: %s\n", image, uploadErr)
if d.force {
finalMessage = fmt.Sprintf("%s\t\tContinuing despite failures (due to --force option)\n", finalMessage)
finalMessage = fmt.Sprintf("%s\t\tNote: The VCH will not function without %q...", finalMessage, image)
results <- errors.New(finalMessage)
} else {
results <- errors.New(finalMessage)
}
// counter for retry decider
retryCount := uploadRetryLimit

// decider for our retry, will retry the upload uploadRetryLimit times
uploadRetryDecider := func(err error) bool {
if err == nil {
return false
}
wg.Done()
}(key, image)
}

wg.Wait()
close(results)
retryCount--
if retryCount < 0 {
d.op.Warnf("Attempted upload a total of %d times without success, Upload process failed.", uploadRetryLimit)
return false
}
d.op.Warnf("Failed an attempt to upload isos with err (%s), %d retries remain", err.Error(), retryCount)
return true
}

uploadFailed := false
for err := range results {
if err != nil {
d.op.Error(err)
uploadFailed = true
uploadErr := retry.DoWithConfig(operationForRetry, uploadRetryDecider, backoffConf)
if uploadErr != nil {
finalMessage = fmt.Sprintf("\t\tUpload failed for %q: %s\n", image, uploadErr)
if d.force {
finalMessage = fmt.Sprintf("%s\t\tContinuing despite failures (due to --force option)\n", finalMessage)
finalMessage = fmt.Sprintf("%s\t\tNote: The VCH will not function without %q...", finalMessage, image)
}
d.op.Error(finalMessage)
return errors.New("Failed to upload iso images.")
}
}

if uploadFailed {
return errors.New("Failed to upload iso images successfully.")
}
return nil

}

// cleanupAfterCreationFailed cleans up the dangling resource pool for the failed VCH and any bridge network if there is any.
Expand Down
100 changes: 100 additions & 0 deletions lib/progresslog/progresslog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2018 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package progresslog

import (
"sync"
"time"

"github.com/vmware/govmomi/vim25/progress"
"github.com/vmware/govmomi/vim25/soap"
)

// UploadParams uses default upload settings as initial input and set UploadLogger as a logger.
func UploadParams(ul *UploadLogger) *soap.Upload {
params := soap.DefaultUpload
params.Progress = ul
return &params
}

// UploadLogger is used to track upload progress to ESXi/VC of a specific file.
type UploadLogger struct {
wg sync.WaitGroup
filename string
interval time.Duration
logTo func(format string, args ...interface{})
}

// NewUploadLogger returns a new instance of UploadLogger. User can provide a logger interface
// that will be used to dump output of the current upload status.
func NewUploadLogger(logTo func(format string, args ...interface{}),
filename string, progressInterval time.Duration) *UploadLogger {

return &UploadLogger{
logTo: logTo,
filename: filename,
interval: progressInterval,
}
}

// Sink returns a channel that receives current upload progress status.
// Channel has to be closed by the caller.
func (ul *UploadLogger) Sink() chan<- progress.Report {
ul.wg.Add(1)
ch := make(chan progress.Report)
fmtStr := "Uploading %s. Progress: %.2f%%"

go func() {
var curProgress float32
var lastProgress float32
ul.logTo(fmtStr, ul.filename, curProgress)

mu := sync.Mutex{}
ticker := time.NewTicker(ul.interval)

// Print progress every ul.interval seconds.
go func() {
for range ticker.C {
mu.Lock()
lastProgress = curProgress
mu.Unlock()
ul.logTo(fmtStr, ul.filename, lastProgress)
}
}()

for v := range ch {
mu.Lock()
curProgress = v.Percentage()
mu.Unlock()
}

ticker.Stop()

if lastProgress != curProgress {
ul.logTo(fmtStr, ul.filename, curProgress)
}

if curProgress == 100.0 {
ul.logTo("Uploading of %s has been completed", ul.filename)
}
ul.wg.Done()
}()
return ch
}

// Wait waits for Sink to complete its work, due to its async nature logging messages may be not sequential.
func (ul *UploadLogger) Wait() {
ul.wg.Wait()
}
93 changes: 93 additions & 0 deletions lib/progresslog/progresslog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2018 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package progresslog

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/vmware/govmomi/vim25/progress"
)

type ProgressResults struct {
percentage float32
}

func (pr *ProgressResults) Percentage() float32 {
return pr.percentage
}

func (pr *ProgressResults) Detail() string {
return ""
}

func (pr *ProgressResults) Error() error {
return nil
}

var _ progress.Report = &ProgressResults{}

func TestNewUploadLoggerComplete(t *testing.T) {
var logs []string
logTo := func(format string, args ...interface{}) {
logs = append(logs, fmt.Sprintf(format, args...))
}
pl := NewUploadLogger(logTo, "unittest", time.Millisecond*10)
progressChan := pl.Sink()
for i := 0; i <= 10; i++ {
res := &ProgressResults{percentage: float32(i * 10)}
progressChan <- res
time.Sleep(time.Duration(time.Millisecond * 5))
}
close(progressChan)
pl.Wait()

if assert.True(t, len(logs) > 3) {
last := len(logs) - 1
assert.Contains(t, logs[0], "unittest")
assert.Contains(t, logs[0], "0.00%")
assert.Contains(t, logs[1], ".00%")
assert.Contains(t, logs[last-1], "100.00%")
assert.Contains(t, logs[last], "complete")
}
}

func TestNewUploadLoggerNotComplete(t *testing.T) {
var logs []string
logTo := func(format string, args ...interface{}) {
logs = append(logs, fmt.Sprintf(format, args...))
}
pl := NewUploadLogger(logTo, "unittest", time.Millisecond*10)
progressChan := pl.Sink()
for i := 0; i < 10; i++ {
res := &ProgressResults{percentage: float32(i * 10)}
progressChan <- res
time.Sleep(time.Duration(time.Millisecond * 5))
}
close(progressChan)
pl.Wait()

if assert.True(t, len(logs) > 3) {
last := len(logs) - 1
assert.Contains(t, logs[0], "unittest")
assert.Contains(t, logs[0], "0.00%")
assert.Contains(t, logs[1], ".00%")
assert.NotContains(t, logs[last], "100.00%")
assert.NotContains(t, logs[last], "complete")
}
}

0 comments on commit 50e1761

Please sign in to comment.