diff --git a/pkg/node/node.go b/pkg/node/node.go index 0f30f6576..eff682ca8 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -353,10 +353,15 @@ func (s *CSINodeService) NodeUnstageVolume(ctx context.Context, req *csi.NodeUns } currStatus := volumeCR.Spec.CSIStatus - if currStatus == apiV1.Created { + switch currStatus { + case apiV1.Failed: + ll.Warningf("Volume status: %s. Need to retry.", currStatus) + case apiV1.Created: ll.Info("Volume has been already unstaged") return &csi.NodeUnstageVolumeResponse{}, nil - } else if currStatus != apiV1.VolumeReady { + case apiV1.VolumeReady: + ll.Infof("Expected volume status: %s", currStatus) + default: msg := fmt.Sprintf("current volume CR status - %s, expected to be in [%s, %s]", currStatus, apiV1.Created, apiV1.VolumeReady) ll.Error(msg) @@ -468,8 +473,9 @@ func (s *CSINodeService) NodePublishVolume(ctx context.Context, req *csi.NodePub } currStatus := volumeCR.Spec.CSIStatus - // if currStatus not in [VolumeReady, Published] - if currStatus != apiV1.VolumeReady && currStatus != apiV1.Published { + if currStatus == apiV1.Failed { + ll.Warningf("Volume status: %s. Need to retry.", currStatus) + } else if currStatus != apiV1.VolumeReady && currStatus != apiV1.Published { msg := fmt.Sprintf("current volume CR status - %s, expected to be in [%s, %s]", currStatus, apiV1.VolumeReady, apiV1.Published) ll.Error(msg) @@ -488,6 +494,46 @@ func (s *CSINodeService) NodePublishVolume(ctx context.Context, req *csi.NodePub resp, errToReturn = nil, fmt.Errorf("failed to publish volume: fake attach error %s", err.Error()) } } else { + // will check whether srcPath is mounted, if not, need to redo NodeStageVolume + srcMounted, err := s.fsOps.IsMounted(srcPath) + if err != nil { + errMsg := fmt.Sprintf("execute IsMounted on %s with error: %s", srcPath, err.Error()) + ll.Error(errMsg) + return nil, fmt.Errorf("failed to publish volume: %s", errMsg) + } + if !srcMounted { + ll.Warnf("staging path %s is not mounted! need to redo NodeStageVolume!", srcPath) + nodeStageReq := &csi.NodeStageVolumeRequest{ + VolumeId: volumeID, + StagingTargetPath: req.GetStagingTargetPath(), + VolumeCapability: req.GetVolumeCapability(), + } + + // unlock volume to redo NodeStageVolume + err := s.volMu.UnlockKey(req.GetVolumeId()) + if err != nil { + errMsg := fmt.Sprintf("unlock volume %s to redo NodeStageVolume with error: %s", volumeID, err.Error()) + ll.Error(errMsg) + return nil, fmt.Errorf("failed to publish volume: %s", errMsg) + } + nodeStageResp, err := s.NodeStageVolume(ctx, nodeStageReq) + + // re-lock the volume to proceed NodePublishVolume + s.volMu.LockKey(req.GetVolumeId()) + + if nodeStageResp == nil && err != nil { + errMsg := fmt.Sprintf("redo NodeStageVolume on volume %s with error: %s", volumeID, err.Error()) + ll.Error(errMsg) + return nil, fmt.Errorf("failed to publish volume: %s", errMsg) + } + + // update the content of volume + volumeCR, err = s.crHelper.GetVolumeByID(volumeID) + if err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("unable to get updated volume %s", volumeID)) + } + } + _, isBlock := req.GetVolumeCapability().GetAccessType().(*csi.VolumeCapability_Block) if err := s.fsOps.PrepareAndPerformMount(srcPath, dstPath, isBlock, !isBlock, mountOptions...); err != nil { ll.Errorf("Unable to mount volume: %v", err) @@ -554,8 +600,9 @@ func (s *CSINodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeU } currStatus := volumeCR.Spec.CSIStatus - // if currStatus not in [VolumeReady, Published] - if currStatus != apiV1.VolumeReady && currStatus != apiV1.Published { + if currStatus == apiV1.Failed { + ll.Warningf("Volume status: %s. Need to retry.", currStatus) + } else if currStatus != apiV1.VolumeReady && currStatus != apiV1.Published { msg := fmt.Sprintf("current volume CR status - %s, expected to be in [%s, %s]", currStatus, apiV1.VolumeReady, apiV1.Published) ll.Error(msg) @@ -602,6 +649,9 @@ func (s *CSINodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeU volumeCR.Spec.Owners = owners if len(volumeCR.Spec.Owners) == 0 { volumeCR.Spec.CSIStatus = apiV1.VolumeReady + } else { + // ensure the Published status of volume in the successful processing + volumeCR.Spec.CSIStatus = apiV1.Published } if updateErr := s.k8sClient.UpdateCR(ctxWithID, volumeCR); updateErr != nil { ll.Errorf("Unable to set volume CR status to VolumeReady: %v", updateErr) diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go index ad4e6e7cb..9a06aadaf 100644 --- a/pkg/node/node_test.go +++ b/pkg/node/node_test.go @@ -87,9 +87,9 @@ var _ = Describe("CSINodeService NodePublish()", func() { req := getNodePublishRequest(testV1ID, targetPath, *testVolumeCap) req.VolumeContext[util.PodNameKey] = testPod1Name - fsOps.On("PrepareAndPerformMount", - path.Join(req.GetStagingTargetPath(), stagingFileName), req.GetTargetPath(), false, true). - Return(nil) + srcPath := path.Join(req.GetStagingTargetPath(), stagingFileName) + fsOps.On("PrepareAndPerformMount", srcPath, req.GetTargetPath(), false, true).Return(nil) + fsOps.On("IsMounted", srcPath).Return(true, nil) resp, err := node.NodePublishVolume(testCtx, req) Expect(resp).NotTo(BeNil()) @@ -101,7 +101,9 @@ var _ = Describe("CSINodeService NodePublish()", func() { Expect(err).To(BeNil()) Expect(volumeCR.Spec.Owners[0]).To(Equal(testPod1Name)) - // publish again such volume + // publish again such volume in Failed Status + volumeCR.Spec.CSIStatus = apiV1.Failed + err = node.k8sClient.UpdateCR(testCtx, volumeCR) resp, err = node.NodePublishVolume(testCtx, req) Expect(resp).NotTo(BeNil()) Expect(err).To(BeNil()) @@ -111,6 +113,29 @@ var _ = Describe("CSINodeService NodePublish()", func() { err = node.k8sClient.ReadCR(testCtx, testV1ID, "", volumeCR) Expect(err).To(BeNil()) Expect(len(volumeCR.Spec.Owners)).To(Equal(1)) + Expect(volumeCR.Spec.CSIStatus).To(Equal(apiV1.Published)) + }) + It("Should publish volume successfully with stagingPath unmounted", func() { + req := getNodePublishRequest(testV1ID, targetPath, *testVolumeCap) + req.VolumeContext[util.PodNameKey] = testPod1Name + + partitionPath := "/partition/path/for/volume1" + stagingPath := path.Join(req.GetStagingTargetPath(), stagingFileName) + prov.On("GetVolumePath", &testVolume1).Return(partitionPath, nil) + fsOps.On("PrepareAndPerformMount", partitionPath, stagingPath, true, false).Return(nil) + + fsOps.On("PrepareAndPerformMount", stagingPath, req.GetTargetPath(), false, true).Return(nil) + fsOps.On("IsMounted", stagingPath).Return(false, nil) + + resp, err := node.NodePublishVolume(testCtx, req) + Expect(resp).NotTo(BeNil()) + Expect(err).To(BeNil()) + + // check owner appearance + volumeCR := &vcrd.Volume{} + err = node.k8sClient.ReadCR(testCtx, testV1ID, "", volumeCR) + Expect(err).To(BeNil()) + Expect(volumeCR.Spec.Owners[0]).To(Equal(testPod1Name)) }) }) @@ -157,12 +182,12 @@ var _ = Describe("CSINodeService NodePublish()", func() { Expect(err).NotTo(BeNil()) Expect(err.Error()).To(ContainSubstring("Staging Path missing in request")) }) - It("Should fail, because Volume has failed status", func() { + It("Should fail, because Volume has unexpected status", func() { req := getNodePublishRequest(testV1ID, targetPath, *testVolumeCap) vol1 := &vcrd.Volume{} err := node.k8sClient.ReadCR(testCtx, testVolume1.Id, testNs, vol1) Expect(err).To(BeNil()) - vol1.Spec.CSIStatus = apiV1.Failed + vol1.Spec.CSIStatus = apiV1.Creating err = node.k8sClient.UpdateCR(testCtx, vol1) Expect(err).To(BeNil()) @@ -184,15 +209,44 @@ var _ = Describe("CSINodeService NodePublish()", func() { It("Should fail, because of PrepareAndPerformMount failed", func() { req := getNodePublishRequest(testV1ID, targetPath, *testVolumeCap) - fsOps.On("PrepareAndPerformMount", - path.Join(req.GetStagingTargetPath(), stagingFileName), req.GetTargetPath(), false, true). - Return(errors.New("error mount")) + srcPath := path.Join(req.GetStagingTargetPath(), stagingFileName) + fsOps.On("PrepareAndPerformMount", srcPath, req.GetTargetPath(), false, true).Return(errors.New("error mount")) + fsOps.On("IsMounted", srcPath).Return(true, nil) resp, err := node.NodePublishVolume(testCtx, req) Expect(resp).To(BeNil()) Expect(err).NotTo(BeNil()) Expect(err.Error()).To(ContainSubstring("mount error")) }) + It("Should fail, because the check on whether stagingPath is mounted failed", func() { + req := getNodePublishRequest(testV1ID, targetPath, *testVolumeCap) + + srcPath := path.Join(req.GetStagingTargetPath(), stagingFileName) + errMsg := fmt.Sprintf("unable to check whether %s is mounted", srcPath) + fsOps.On("IsMounted", srcPath).Return(false, errors.New(errMsg)) + + resp, err := node.NodePublishVolume(testCtx, req) + Expect(resp).To(BeNil()) + Expect(err).NotTo(BeNil()) + Expect(err.Error()).To(ContainSubstring(errMsg)) + }) + It("Should fail, because redoing NodeStageVolume when stagingPath unmounted failed", func() { + req := getNodePublishRequest(testV1ID, targetPath, *testVolumeCap) + req.VolumeContext[util.PodNameKey] = testPod1Name + + partitionPath := "/partition/path/for/volume1" + stagingPath := path.Join(req.GetStagingTargetPath(), stagingFileName) + prov.On("GetVolumePath", &testVolume1).Return(partitionPath, nil) + fsOps.On("PrepareAndPerformMount", partitionPath, stagingPath, true, false).Return(errors.New("mount error")) + fsOps.On("IsMounted", stagingPath).Return(false, nil) + + redoErr := fmt.Sprintf("redo NodeStageVolume on volume %s with error", testV1ID) + + resp, err := node.NodePublishVolume(testCtx, req) + Expect(resp).To(BeNil()) + Expect(err).NotTo(BeNil()) + Expect(err.Error()).To(ContainSubstring(redoErr)) + }) }) }) @@ -364,6 +418,19 @@ var _ = Describe("CSINodeService NodeUnPublish()", func() { Expect(err).To(BeNil()) Expect(volumeCR.Spec.CSIStatus).To(Equal(apiV1.VolumeReady)) Expect(volumeCR.Spec.Owners).To(BeNil()) + + // unpublish again on failed volume + volumeCR.Spec.CSIStatus = apiV1.Failed + err = node.k8sClient.UpdateCR(testCtx, volumeCR) + Expect(err).To(BeNil()) + + resp, err = node.NodeUnpublishVolume(testCtx, req) + Expect(resp).NotTo(BeNil()) + Expect(err).To(BeNil()) + + err = node.k8sClient.ReadCR(testCtx, testV1ID, "", volumeCR) + Expect(err).To(BeNil()) + Expect(volumeCR.Spec.CSIStatus).To(Equal(apiV1.VolumeReady)) }) It("Should unpublish volume and don't change volume CR status", func() { req := getNodeUnpublishRequest(testV1ID, targetPath1) @@ -506,6 +573,18 @@ var _ = Describe("CSINodeService NodeUnStage()", func() { err = node.k8sClient.ReadCR(testCtx, testV1ID, "", volumeCR) Expect(err).To(BeNil()) Expect(volumeCR.Spec.CSIStatus).To(Equal(apiV1.Created)) + + // retry unstage on failed volume + volumeCR.Spec.CSIStatus = apiV1.Failed + err = node.k8sClient.UpdateCR(testCtx, volumeCR) + Expect(err).To(BeNil()) + resp, err = node.NodeUnstageVolume(testCtx, req) + Expect(resp).NotTo(BeNil()) + Expect(err).To(BeNil()) + + err = node.k8sClient.ReadCR(testCtx, testV1ID, "", volumeCR) + Expect(err).To(BeNil()) + Expect(volumeCR.Spec.CSIStatus).To(Equal(apiV1.Created)) }) }) @@ -564,12 +643,12 @@ var _ = Describe("CSINodeService NodeUnStage()", func() { Expect(volumeCR.Spec.CSIStatus).To(Equal(apiV1.Failed)) }) - It("Should failed, because Volume has failed status", func() { + It("Should failed, because Volume has unexpected status", func() { req := getNodeUnstageRequest(testV1ID, targetPath) vol1 := &vcrd.Volume{} err := node.k8sClient.ReadCR(testCtx, testVolume1.Id, testNs, vol1) Expect(err).To(BeNil()) - vol1.Spec.CSIStatus = apiV1.Failed + vol1.Spec.CSIStatus = apiV1.Creating err = node.k8sClient.UpdateCR(testCtx, vol1) Expect(err).To(BeNil())