diff --git a/lib/install/management/create.go b/lib/install/management/create.go index 0a9b7b1099..a85866d4ff 100644 --- a/lib/install/management/create.go +++ b/lib/install/management/create.go @@ -18,7 +18,7 @@ import ( "context" "fmt" "path" - "sync" + "path/filepath" "time" "github.com/vmware/govmomi/object" @@ -26,6 +26,7 @@ import ( "github.com/vmware/vic/lib/install/data" "github.com/vmware/vic/lib/install/opsuser" "github.com/vmware/vic/lib/install/vchlog" + "github.com/vmware/vic/lib/progresslog" "github.com/vmware/vic/pkg/errors" "github.com/vmware/vic/pkg/retry" "github.com/vmware/vic/pkg/trace" @@ -120,100 +121,85 @@ func (d *Dispatcher) uploadImages(files map[string]string) error { // upload the images d.op.Info("Uploading images for container") - results := make(chan error, len(files)) - var wg sync.WaitGroup + // Build retry config + backoffConf := retry.NewBackoffConfig() + backoffConf.InitialInterval = uploadInitialInterval + backoffConf.MaxInterval = uploadMaxInterval + backoffConf.MaxElapsedTime = uploadMaxElapsedTime for key, image := range files { - - wg.Add(1) - go func(key string, image string) { - finalMessage := "" - d.op.Infof("\t%q", image) - - // upload function that is passed to retry - operationForRetry := func() error { - // attempt to delete the iso image first in case of failed upload - dc := d.session.Datacenter - fm := d.session.Datastore.NewFileManager(dc, false) - ds := d.session.Datastore - - isoTargetPath := path.Join(d.vmPathName, key) - // check iso first - d.op.Debugf("target delete path = %s", isoTargetPath) - _, err := ds.Stat(d.op, isoTargetPath) - if err != nil { - switch err.(type) { - case object.DatastoreNoSuchFileError: - d.op.Debug("File not found. Nothing to do.") - case object.DatastoreNoSuchDirectoryError: - d.op.Debug("Directory not found. Nothing to do.") - default: - // otherwise force delete - err := fm.Delete(d.op, isoTargetPath) - if err != nil { - d.op.Debugf("Failed to delete image (%s) with error (%s)", image, err.Error()) - return err - } + baseName := filepath.Base(image) + finalMessage := "" + // upload function that is passed to retry + isoTargetPath := path.Join(d.vmPathName, key) + + operationForRetry := func() error { + // attempt to delete the iso image first in case of failed upload + dc := d.session.Datacenter + fm := d.session.Datastore.NewFileManager(dc, false) + ds := d.session.Datastore + + // check iso first + d.op.Debugf("Checking if file already exists: %s", isoTargetPath) + _, err := ds.Stat(d.op, isoTargetPath) + if err != nil { + switch err.(type) { + case object.DatastoreNoSuchFileError: + d.op.Debug("File not found. Nothing to do.") + case object.DatastoreNoSuchDirectoryError: + d.op.Debug("Directory not found. Nothing to do.") + default: + d.op.Debugf("ISO file already exists, deleting: %s", isoTargetPath) + err := fm.Delete(d.op, isoTargetPath) + if err != nil { + d.op.Debugf("Failed to delete image (%s) with error (%s)", image, err.Error()) + return err } } - - return d.session.Datastore.UploadFile(d.op, image, path.Join(d.vmPathName, key), nil) } - // counter for retry decider - retryCount := uploadRetryLimit + d.op.Infof("Uploading %s as %s", baseName, key) - // decider for our retry, will retry the upload uploadRetryLimit times - uploadRetryDecider := func(err error) bool { - if err == nil { - return false - } + ul := progresslog.NewUploadLogger(d.op.Infof, baseName, time.Second*3) + // need to wait since UploadLogger is asynchronous. + defer ul.Wait() - retryCount-- - if retryCount < 0 { - d.op.Warnf("Attempted upload a total of %d times without success, Upload process failed.", uploadRetryLimit) - return false - } - d.op.Warnf("failed an attempt to upload isos with err (%s), %d retries remain", err.Error(), retryCount) - return true - } + return d.session.Datastore.UploadFile(d.op, image, path.Join(d.vmPathName, key), + progresslog.UploadParams(ul)) + } - // Build retry config - backoffConf := retry.NewBackoffConfig() - backoffConf.InitialInterval = uploadInitialInterval - backoffConf.MaxInterval = uploadMaxInterval - backoffConf.MaxElapsedTime = uploadMaxElapsedTime - - uploadErr := retry.DoWithConfig(operationForRetry, uploadRetryDecider, backoffConf) - if uploadErr != nil { - finalMessage = fmt.Sprintf("\t\tUpload failed for %q: %s\n", image, uploadErr) - if d.force { - finalMessage = fmt.Sprintf("%s\t\tContinuing despite failures (due to --force option)\n", finalMessage) - finalMessage = fmt.Sprintf("%s\t\tNote: The VCH will not function without %q...", finalMessage, image) - results <- errors.New(finalMessage) - } else { - results <- errors.New(finalMessage) - } + // counter for retry decider + retryCount := uploadRetryLimit + + // decider for our retry, will retry the upload uploadRetryLimit times + uploadRetryDecider := func(err error) bool { + if err == nil { + return false } - wg.Done() - }(key, image) - } - wg.Wait() - close(results) + retryCount-- + if retryCount < 0 { + d.op.Warnf("Attempted upload a total of %d times without success, Upload process failed.", uploadRetryLimit) + return false + } + d.op.Warnf("Failed an attempt to upload isos with err (%s), %d retries remain", err.Error(), retryCount) + return true + } - uploadFailed := false - for err := range results { - if err != nil { - d.op.Error(err) - uploadFailed = true + uploadErr := retry.DoWithConfig(operationForRetry, uploadRetryDecider, backoffConf) + if uploadErr != nil { + finalMessage = fmt.Sprintf("\t\tUpload failed for %q: %s\n", image, uploadErr) + if d.force { + finalMessage = fmt.Sprintf("%s\t\tContinuing despite failures (due to --force option)\n", finalMessage) + finalMessage = fmt.Sprintf("%s\t\tNote: The VCH will not function without %q...", finalMessage, image) + } + d.op.Error(finalMessage) + return errors.New("Failed to upload iso images.") } - } - if uploadFailed { - return errors.New("Failed to upload iso images successfully.") } return nil + } // cleanupAfterCreationFailed cleans up the dangling resource pool for the failed VCH and any bridge network if there is any. diff --git a/lib/progresslog/progresslog.go b/lib/progresslog/progresslog.go new file mode 100644 index 0000000000..1f38b7ce0e --- /dev/null +++ b/lib/progresslog/progresslog.go @@ -0,0 +1,100 @@ +// Copyright 2018 VMware, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package progresslog + +import ( + "sync" + "time" + + "github.com/vmware/govmomi/vim25/progress" + "github.com/vmware/govmomi/vim25/soap" +) + +// UploadParams uses default upload settings as initial input and set UploadLogger as a logger. +func UploadParams(ul *UploadLogger) *soap.Upload { + params := soap.DefaultUpload + params.Progress = ul + return ¶ms +} + +// UploadLogger is used to track upload progress to ESXi/VC of a specific file. +type UploadLogger struct { + wg sync.WaitGroup + filename string + interval time.Duration + logTo func(format string, args ...interface{}) +} + +// NewUploadLogger returns a new instance of UploadLogger. User can provide a logger interface +// that will be used to dump output of the current upload status. +func NewUploadLogger(logTo func(format string, args ...interface{}), + filename string, progressInterval time.Duration) *UploadLogger { + + return &UploadLogger{ + logTo: logTo, + filename: filename, + interval: progressInterval, + } +} + +// Sink returns a channel that receives current upload progress status. +// Channel has to be closed by the caller. +func (ul *UploadLogger) Sink() chan<- progress.Report { + ul.wg.Add(1) + ch := make(chan progress.Report) + fmtStr := "Uploading %s. Progress: %.2f%%" + + go func() { + var curProgress float32 + var lastProgress float32 + ul.logTo(fmtStr, ul.filename, curProgress) + + mu := sync.Mutex{} + ticker := time.NewTicker(ul.interval) + + // Print progress every ul.interval seconds. + go func() { + for range ticker.C { + mu.Lock() + lastProgress = curProgress + mu.Unlock() + ul.logTo(fmtStr, ul.filename, lastProgress) + } + }() + + for v := range ch { + mu.Lock() + curProgress = v.Percentage() + mu.Unlock() + } + + ticker.Stop() + + if lastProgress != curProgress { + ul.logTo(fmtStr, ul.filename, curProgress) + } + + if curProgress == 100.0 { + ul.logTo("Uploading of %s has been completed", ul.filename) + } + ul.wg.Done() + }() + return ch +} + +// Wait waits for Sink to complete its work, due to its async nature logging messages may be not sequential. +func (ul *UploadLogger) Wait() { + ul.wg.Wait() +} diff --git a/lib/progresslog/progresslog_test.go b/lib/progresslog/progresslog_test.go new file mode 100644 index 0000000000..2826ecf79c --- /dev/null +++ b/lib/progresslog/progresslog_test.go @@ -0,0 +1,93 @@ +// Copyright 2018 VMware, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package progresslog + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/vmware/govmomi/vim25/progress" +) + +type ProgressResults struct { + percentage float32 +} + +func (pr *ProgressResults) Percentage() float32 { + return pr.percentage +} + +func (pr *ProgressResults) Detail() string { + return "" +} + +func (pr *ProgressResults) Error() error { + return nil +} + +var _ progress.Report = &ProgressResults{} + +func TestNewUploadLoggerComplete(t *testing.T) { + var logs []string + logTo := func(format string, args ...interface{}) { + logs = append(logs, fmt.Sprintf(format, args...)) + } + pl := NewUploadLogger(logTo, "unittest", time.Millisecond*10) + progressChan := pl.Sink() + for i := 0; i <= 10; i++ { + res := &ProgressResults{percentage: float32(i * 10)} + progressChan <- res + time.Sleep(time.Duration(time.Millisecond * 5)) + } + close(progressChan) + pl.Wait() + + if assert.True(t, len(logs) > 3) { + last := len(logs) - 1 + assert.Contains(t, logs[0], "unittest") + assert.Contains(t, logs[0], "0.00%") + assert.Contains(t, logs[1], ".00%") + assert.Contains(t, logs[last-1], "100.00%") + assert.Contains(t, logs[last], "complete") + } +} + +func TestNewUploadLoggerNotComplete(t *testing.T) { + var logs []string + logTo := func(format string, args ...interface{}) { + logs = append(logs, fmt.Sprintf(format, args...)) + } + pl := NewUploadLogger(logTo, "unittest", time.Millisecond*10) + progressChan := pl.Sink() + for i := 0; i < 10; i++ { + res := &ProgressResults{percentage: float32(i * 10)} + progressChan <- res + time.Sleep(time.Duration(time.Millisecond * 5)) + } + close(progressChan) + pl.Wait() + + if assert.True(t, len(logs) > 3) { + last := len(logs) - 1 + assert.Contains(t, logs[0], "unittest") + assert.Contains(t, logs[0], "0.00%") + assert.Contains(t, logs[1], ".00%") + assert.NotContains(t, logs[last], "100.00%") + assert.NotContains(t, logs[last], "complete") + } +}