Skip to content

Commit

Permalink
Add new DV reason: ImagePullFailed
Browse files Browse the repository at this point in the history
When the image pull fails, the DataVolume Running condition, will have
the Reason field of `ImagePullFailed`, to allow better error handling in
code taht uses it.

Signed-off-by: Nahshon Unna-Tsameret <nunnatsa@redhat.com>
  • Loading branch information
nunnatsa committed Apr 14, 2024
1 parent efa362f commit 0174c9a
Show file tree
Hide file tree
Showing 15 changed files with 126 additions and 49 deletions.
8 changes: 8 additions & 0 deletions cmd/cdi-importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/utils/ptr"

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"

"kubevirt.io/containerized-data-importer/pkg/common"
cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
"kubevirt.io/containerized-data-importer/pkg/image"
Expand Down Expand Up @@ -179,6 +180,13 @@ func handleImport(
err := processor.ProcessData()

scratchSpaceRequired := errors.Is(err, importer.ErrRequiresScratchSpace)

ipfErr := &importer.ErrImagePullFailed{}
isImagePullFailure := errors.As(err, &ipfErr)
if isImagePullFailure {
err = ipfErr
}

if err != nil && !scratchSpaceRequired {
klog.Errorf("%+v", err)
if err := util.WriteTerminationMessage(fmt.Sprintf("Unable to process data: %v", err.Error())); err != nil {
Expand Down
10 changes: 9 additions & 1 deletion pkg/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import (
"k8s.io/klog/v2"

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
"kubevirt.io/containerized-data-importer/pkg/common"

"kubevirt.io/containerized-data-importer/pkg/common"
cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
"kubevirt.io/containerized-data-importer/pkg/util"
"kubevirt.io/containerized-data-importer/pkg/util/cert"
Expand All @@ -60,6 +60,9 @@ const (
// ScratchSpaceRequiredReason is a const that defines the pod exited due to a lack of scratch space
ScratchSpaceRequiredReason = "Scratch space required"

// ImagePullFailedReason is a const that defines the pod exited due to failure when pulling image
ImagePullFailedReason = "ImagePullFailed"

// ImportCompleteMessage is a const that defines the pod completeded the import successfully
ImportCompleteMessage = "Import Complete"

Expand Down Expand Up @@ -327,6 +330,11 @@ func setAnnotationsFromPodWithPrefix(anno map[string]string, pod *v1.Pod, termMs
if strings.Contains(containerState.Terminated.Message, common.PreallocationApplied) {
anno[cc.AnnPreallocationApplied] = "true"
}

if strings.Contains(containerState.Terminated.Message, "failed to pull image") {
anno[prefix+".reason"] = ImagePullFailedReason
return
}
}
anno[prefix+".reason"] = containerState.Terminated.Reason
}
Expand Down
1 change: 1 addition & 0 deletions pkg/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "go_default_library",
srcs = [
"data-processor.go",
"errors.go",
"format-readers.go",
"gcs-datasource.go",
"http-datasource.go",
Expand Down
14 changes: 0 additions & 14 deletions pkg/importer/data-processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package importer

import (
"fmt"
"net/url"
"os"

Expand Down Expand Up @@ -62,19 +61,6 @@ const (
ProcessingPhaseMergeDelta ProcessingPhase = "MergeDelta"
)

// ValidationSizeError is an error indication size validation failure.
type ValidationSizeError struct {
err error
}

func (e ValidationSizeError) Error() string { return e.err.Error() }

// ErrRequiresScratchSpace indicates that we require scratch space.
var ErrRequiresScratchSpace = fmt.Errorf(common.ScratchSpaceRequired)

// ErrInvalidPath indicates that the path is invalid.
var ErrInvalidPath = fmt.Errorf("invalid transfer path")

// may be overridden in tests
var getAvailableSpaceBlockFunc = util.GetAvailableSpaceBlock
var getAvailableSpaceFunc = util.GetAvailableSpace
Expand Down
42 changes: 42 additions & 0 deletions pkg/importer/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package importer

import (
"fmt"

"kubevirt.io/containerized-data-importer/pkg/common"
)

// ValidationSizeError is an error indication size validation failure.
type ValidationSizeError struct {
err error
}

func (e ValidationSizeError) Error() string { return e.err.Error() }

// ErrRequiresScratchSpace indicates that we require scratch space.
var ErrRequiresScratchSpace = fmt.Errorf(common.ScratchSpaceRequired)

// ErrInvalidPath indicates that the path is invalid.
var ErrInvalidPath = fmt.Errorf("invalid transfer path")

// ErrImagePullFailed indicates that the importer failed to pull an image; This error type wraps the actual error.
type ErrImagePullFailed struct {
err error
}

// NewErrImagePullFailed creates new ErrImagePullFailed error object, with embedded error.
//
// Use the err parameter fot the actual wrapped error
func NewErrImagePullFailed(err error) *ErrImagePullFailed {
return &ErrImagePullFailed{
err: err,
}
}

func (err *ErrImagePullFailed) Error() string {
return fmt.Sprintf("%s: %s", "failed to pull image", err.err.Error())
}

func (err *ErrImagePullFailed) Unwrap() error {
return err.err
}
4 changes: 2 additions & 2 deletions pkg/importer/gcs-datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (sd *GCSDataSource) Transfer(path string) (ProcessingPhase, error) {
return ProcessingPhaseError, ErrInvalidPath
}

err := util.StreamDataToFile(sd.readers.TopReader(), file)
err := streamDataToFile(sd.readers.TopReader(), file)

if err != nil {
klog.V(3).Infoln("GCS Importer: Transfer Error: ", err)
Expand All @@ -145,7 +145,7 @@ func (sd *GCSDataSource) TransferFile(fileName string) (ProcessingPhase, error)
return ProcessingPhaseError, err
}

err := util.StreamDataToFile(sd.readers.TopReader(), fileName)
err := streamDataToFile(sd.readers.TopReader(), fileName)
if err != nil {
return ProcessingPhaseError, err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/importer/http-datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"k8s.io/klog/v2"

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"

"kubevirt.io/containerized-data-importer/pkg/common"
"kubevirt.io/containerized-data-importer/pkg/image"
"kubevirt.io/containerized-data-importer/pkg/util"
Expand Down Expand Up @@ -160,7 +161,7 @@ func (hs *HTTPDataSource) Transfer(path string) (ProcessingPhase, error) {
if err != nil || size <= 0 {
return ProcessingPhaseError, ErrInvalidPath
}
err = util.StreamDataToFile(hs.readers.TopReader(), file)
err = streamDataToFile(hs.readers.TopReader(), file)
if err != nil {
return ProcessingPhaseError, err
}
Expand All @@ -183,7 +184,7 @@ func (hs *HTTPDataSource) TransferFile(fileName string) (ProcessingPhase, error)
return ProcessingPhaseError, err
}
hs.readers.StartProgressUpdate()
err := util.StreamDataToFile(hs.readers.TopReader(), fileName)
err := streamDataToFile(hs.readers.TopReader(), fileName)
if err != nil {
return ProcessingPhaseError, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/importer/imageio-datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (is *ImageioDataSource) Transfer(path string) (ProcessingPhase, error) {
return ProcessingPhaseError, ErrInvalidPath
}
is.readers.StartProgressUpdate()
err = util.StreamDataToFile(is.readers.TopReader(), file)
err = streamDataToFile(is.readers.TopReader(), file)
if err != nil {
return ProcessingPhaseError, err
}
Expand Down Expand Up @@ -181,7 +181,7 @@ func (is *ImageioDataSource) TransferFile(fileName string) (ProcessingPhase, err
return ProcessingPhaseError, err
}
} else {
err := util.StreamDataToFile(is.readers.TopReader(), fileName)
err := streamDataToFile(is.readers.TopReader(), fileName)
if err != nil {
return ProcessingPhaseError, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/importer/s3-datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (sd *S3DataSource) Transfer(path string) (ProcessingPhase, error) {
return ProcessingPhaseError, ErrInvalidPath
}

err := util.StreamDataToFile(sd.readers.TopReader(), file)
err := streamDataToFile(sd.readers.TopReader(), file)
if err != nil {
return ProcessingPhaseError, err
}
Expand All @@ -115,7 +115,7 @@ func (sd *S3DataSource) TransferFile(fileName string) (ProcessingPhase, error) {
return ProcessingPhaseError, err
}

err := util.StreamDataToFile(sd.readers.TopReader(), fileName)
err := streamDataToFile(sd.readers.TopReader(), fileName)
if err != nil {
return ProcessingPhaseError, err
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/importer/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"k8s.io/klog/v2"

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
"kubevirt.io/containerized-data-importer/pkg/util"
)

const (
Expand Down Expand Up @@ -76,7 +75,7 @@ func readImageSource(ctx context.Context, sys *types.SystemContext, img string)
src, err := ref.NewImageSource(ctx, sys)
if err != nil {
klog.Errorf("Could not create image reference: %v", err)
return nil, errors.Wrap(err, "Could not create image reference")
return nil, NewErrImagePullFailed(err)
}

return src, nil
Expand Down Expand Up @@ -157,7 +156,7 @@ func processLayer(ctx context.Context,
return false, errors.Wrap(err, "Error creating output file's directory")
}

if err := util.StreamDataToFile(tarReader, filepath.Join(destDir, hdr.Name)); err != nil {
if err := streamDataToFile(tarReader, filepath.Join(destDir, hdr.Name)); err != nil {
klog.Errorf("Error copying file: %v", err)
return false, errors.Wrap(err, "Error copying file")
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/importer/upload-datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"k8s.io/klog/v2"

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"

"kubevirt.io/containerized-data-importer/pkg/common"
"kubevirt.io/containerized-data-importer/pkg/util"
)
Expand Down Expand Up @@ -72,7 +73,7 @@ func (ud *UploadDataSource) Transfer(path string) (ProcessingPhase, error) {
//Path provided is invalid.
return ProcessingPhaseError, ErrInvalidPath
}
err = util.StreamDataToFile(ud.readers.TopReader(), file)
err = streamDataToFile(ud.readers.TopReader(), file)
if err != nil {
return ProcessingPhaseError, err
}
Expand All @@ -94,7 +95,7 @@ func (ud *UploadDataSource) TransferFile(fileName string) (ProcessingPhase, erro
if err := CleanAll(fileName); err != nil {
return ProcessingPhaseError, err
}
err := util.StreamDataToFile(ud.readers.TopReader(), fileName)
err := streamDataToFile(ud.readers.TopReader(), fileName)
if err != nil {
return ProcessingPhaseError, err
}
Expand Down Expand Up @@ -158,7 +159,7 @@ func (aud *AsyncUploadDataSource) Transfer(path string) (ProcessingPhase, error)
//Path provided is invalid.
return ProcessingPhaseError, ErrInvalidPath
}
err = util.StreamDataToFile(aud.uploadDataSource.readers.TopReader(), file)
err = streamDataToFile(aud.uploadDataSource.readers.TopReader(), file)
if err != nil {
return ProcessingPhaseError, err
}
Expand All @@ -173,7 +174,7 @@ func (aud *AsyncUploadDataSource) TransferFile(fileName string) (ProcessingPhase
if err := CleanAll(fileName); err != nil {
return ProcessingPhaseError, err
}
err := util.StreamDataToFile(aud.uploadDataSource.readers.TopReader(), fileName)
err := streamDataToFile(aud.uploadDataSource.readers.TopReader(), fileName)
if err != nil {
return ProcessingPhaseError, err
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/importer/util.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package importer

import (
"io"
"net/url"
"os"
"os/signal"
"strings"
"syscall"

"github.com/pkg/errors"
"k8s.io/klog/v2"

"kubevirt.io/containerized-data-importer/pkg/common"
"kubevirt.io/containerized-data-importer/pkg/util"
Expand Down Expand Up @@ -82,3 +84,20 @@ func envToLabel(env string) string {

return strings.ToLower(label)
}

// streamDataToFile provides a function to stream the specified io.Reader to the specified local file
func streamDataToFile(r io.Reader, fileName string) error {
outFile, err := util.OpenFileOrBlockDevice(fileName)
if err != nil {
return err
}
defer outFile.Close()
klog.V(1).Infof("Writing data...\n")
if _, err = io.Copy(outFile, r); err != nil {
klog.Errorf("Unable to write file from dataReader: %v\n", err)
os.Remove(outFile.Name())
return NewErrImagePullFailed(err)
}
err = outFile.Sync()
return err
}
3 changes: 1 addition & 2 deletions pkg/importer/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
. "github.com/onsi/gomega"

"kubevirt.io/containerized-data-importer/pkg/common"
"kubevirt.io/containerized-data-importer/pkg/util"
)

var _ = Describe("Parse endpoints", func() {
Expand Down Expand Up @@ -69,7 +68,7 @@ var _ = Describe("Stream Data To File", func() {
if useTmpDir {
fileName = filepath.Join(tmpDir, fileName)
}
err = util.StreamDataToFile(r, fileName)
err = streamDataToFile(r, fileName)
if !wantErr {
Expect(err).NotTo(HaveOccurred())
} else {
Expand Down
18 changes: 1 addition & 17 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/klog/v2"

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"

"kubevirt.io/containerized-data-importer/pkg/common"
)

Expand Down Expand Up @@ -178,23 +179,6 @@ func OpenFileOrBlockDevice(fileName string) (*os.File, error) {
return outFile, nil
}

// StreamDataToFile provides a function to stream the specified io.Reader to the specified local file
func StreamDataToFile(r io.Reader, fileName string) error {
outFile, err := OpenFileOrBlockDevice(fileName)
if err != nil {
return err
}
defer outFile.Close()
klog.V(1).Infof("Writing data...\n")
if _, err = io.Copy(outFile, r); err != nil {
klog.Errorf("Unable to write file from dataReader: %v\n", err)
os.Remove(outFile.Name())
return errors.Wrapf(err, "unable to write to file")
}
err = outFile.Sync()
return err
}

// UnArchiveTar unarchives a tar file and streams its files
// using the specified io.Reader to the specified destination.
func UnArchiveTar(reader io.Reader, destDir string) error {
Expand Down
Loading

0 comments on commit 0174c9a

Please sign in to comment.