Increase timeout of csi-provisioner and csi-attacher (#93)
* Increased timeout of csi-provisioner and
csi-attacher. This fixes random failures
in the k8s e2e test.

Cleaned up some linter issues caused by
using deprecated functions.

Ignore server rejection errors from kubevirt
addvolume/removevolume and retry them.

Signed-off-by: Alexander Wels <awels@redhat.com>

* Address review comments.
Aligned retry-interval-max between csi-provisioner and
csi-attacher

Signed-off-by: Alexander Wels <awels@redhat.com>

* Use exponential backoff instead of poll immediate

Signed-off-by: Alexander Wels <awels@redhat.com>

---------

Signed-off-by: Alexander Wels <awels@redhat.com>
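The last bullet of the commit message swaps a fixed-interval poll for exponential backoff when issuing hot-plug requests that the API server may transiently reject. A minimal, self-contained sketch of that retry pattern with k8s.io/apimachinery's wait package follows; attachVolume, its arguments, and the printed messages are placeholders for illustration, not the driver's actual API.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// attachVolume stands in for an operation that can be rejected transiently,
// such as a KubeVirt addvolume request (placeholder, not the driver's code).
func attachVolume(volume, vm string) error {
	fmt.Printf("attaching %s to %s\n", volume, vm)
	return nil
}

func attachWithRetry(volume, vm string) error {
	// Up to five attempts, with sleeps growing from 1s by a factor of 2 and
	// capped at 30s, matching the Backoff values introduced in
	// pkg/service/controller.go in this commit.
	return wait.ExponentialBackoff(wait.Backoff{
		Duration: time.Second,
		Factor:   2,
		Steps:    5,
		Cap:      30 * time.Second,
	}, func() (bool, error) {
		if err := attachVolume(volume, vm); err != nil {
			// Returning (false, nil) tells wait to sleep and try again;
			// a non-nil error would abort the backoff loop immediately.
			return false, nil
		}
		return true, nil
	})
}

func main() {
	if err := attachWithRetry("pvc-1234", "worker-0"); err != nil {
		fmt.Println("attach failed after retries:", err)
	}
}
```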
awels authored Sep 22, 2023
1 parent df0fbec commit 8250632
Showing 6 changed files with 128 additions and 65 deletions.
10 changes: 5 additions & 5 deletions cmd/kubevirt-csi-driver/kubevirt-csi-driver.go
@@ -22,11 +22,11 @@ import (
)

var (
endpoint = flag.String("endpoint", "unix:/csi/csi.sock", "CSI endpoint")
nodeName = flag.String("node-name", "", "The node name - the node this pods runs on")
infraClusterNamespace = flag.String("infra-cluster-namespace", "", "The infra-cluster namespace")
infraClusterKubeconfig = flag.String("infra-cluster-kubeconfig", "", "the infra-cluster kubeconfig file. If not set, defaults to in cluster config.")
infraClusterLabels = flag.String("infra-cluster-labels", "", "The infra-cluster labels to use when creating resources in infra cluster. 'name=value' fields separated by a comma")
endpoint = flag.String("endpoint", "unix:/csi/csi.sock", "CSI endpoint")
nodeName = flag.String("node-name", "", "The node name - the node this pods runs on")
infraClusterNamespace = flag.String("infra-cluster-namespace", "", "The infra-cluster namespace")
infraClusterKubeconfig = flag.String("infra-cluster-kubeconfig", "", "the infra-cluster kubeconfig file. If not set, defaults to in cluster config.")
infraClusterLabels = flag.String("infra-cluster-labels", "", "The infra-cluster labels to use when creating resources in infra cluster. 'name=value' fields separated by a comma")
// infraStorageClassEnforcement = flag.String("infra-storage-class-enforcement", "", "A string encoded yaml that represents the policy of enforcing which infra storage classes are allowed in persistentVolume of type kubevirt")
infraStorageClassEnforcement = os.Getenv("INFRA_STORAGE_CLASS_ENFORCEMENT")

26 changes: 15 additions & 11 deletions deploy/controller-infra/base/deploy.yaml
@@ -36,7 +36,7 @@ spec:
- "--tenant-cluster-kubeconfig=/var/run/secrets/tenantcluster/value"
- "--run-node-service=false"
- "--run-controller-service=true"
- --v=5
- "--v=5"
ports:
- name: healthz
containerPort: 10301
@@ -76,10 +76,12 @@ spec:
- name: csi-provisioner
image: quay.io/openshift/origin-csi-external-provisioner:latest
args:
- --csi-address=$(ADDRESS)
- --default-fstype=ext4
- --kubeconfig=/var/run/secrets/tenantcluster/value
- --v=5
- "--csi-address=$(ADDRESS)"
- "--default-fstype=ext4"
- "--kubeconfig=/var/run/secrets/tenantcluster/value"
- "--v=5"
- "--timeout=3m"
- "--retry-interval-max=1m"
env:
- name: ADDRESS
value: /var/lib/csi/sockets/pluginproxy/csi.sock
@@ -91,9 +93,11 @@ spec:
- name: csi-attacher
image: quay.io/openshift/origin-csi-external-attacher:latest
args:
- --csi-address=$(ADDRESS)
- --kubeconfig=/var/run/secrets/tenantcluster/value
- --v=5
- "--csi-address=$(ADDRESS)"
- "--kubeconfig=/var/run/secrets/tenantcluster/value"
- "--v=5"
- "--timeout=3m"
- "--retry-interval-max=1m"
env:
- name: ADDRESS
value: /var/lib/csi/sockets/pluginproxy/csi.sock
@@ -109,9 +113,9 @@ spec:
- name: csi-liveness-probe
image: quay.io/openshift/origin-csi-livenessprobe:latest
args:
- --csi-address=/csi/csi.sock
- --probe-timeout=3s
- --health-port=10301
- "--csi-address=/csi/csi.sock"
- "--probe-timeout=3s"
- "--health-port=10301"
volumeMounts:
- name: socket-dir
mountPath: /csi
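For context on the two new sidecar flags: --timeout bounds each call the provisioner and attacher make to the CSI driver, and --retry-interval-max caps how far the sidecars' own retry interval grows. The driver itself waits up to two minutes for a hot-plugged volume to become available or to be removed (see EnsureVolumeAvailable/EnsureVolumeRemoved and the comment added in pkg/service/controller.go below), so the 3m call timeout is chosen to exceed that internal wait; otherwise the sidecar could cancel and retry while the hot-plug operation is still in progress.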
13 changes: 7 additions & 6 deletions deploy/tenant/base/deploy.yaml
@@ -169,6 +169,7 @@ spec:
- "--node-name=$(KUBE_NODE_NAME)"
- "--run-node-service=true"
- "--run-controller-service=false"
- "--v=5"
env:
- name: KUBE_NODE_NAME
valueFrom:
@@ -203,9 +204,9 @@ spec:
- name: csi-node-driver-registrar
image: quay.io/openshift/origin-csi-node-driver-registrar:latest
args:
- --csi-address=$(ADDRESS)
- --kubelet-registration-path=$(DRIVER_REG_SOCK_PATH)
- --v=5
- "--csi-address=$(ADDRESS)"
- "--kubelet-registration-path=$(DRIVER_REG_SOCK_PATH)"
- "--v=5"
lifecycle:
preStop:
exec:
@@ -227,9 +228,9 @@ spec:
- name: csi-liveness-probe
image: quay.io/openshift/origin-csi-livenessprobe:latest
args:
- --csi-address=/csi/csi.sock
- --probe-timeout=3s
- --health-port=10300
- "--csi-address=/csi/csi.sock"
- "--probe-timeout=3s"
- "--health-port=10300"
volumeMounts:
- name: plugin-dir
mountPath: /csi
4 changes: 2 additions & 2 deletions pkg/kubevirt/client.go
@@ -68,7 +68,7 @@ func (c *client) RemoveVolumeFromVM(namespace string, vmName string, hotPlugRequ
// EnsureVolumeAvailable checks to make sure the volume is available in the node before returning, checks for 2 minutes
func (c *client) EnsureVolumeAvailable(namespace, vmName, volumeName string, timeout time.Duration) error {
return wait.PollImmediate(time.Second, timeout, func() (done bool, err error) {
vmi, err := c.virtClient.VirtualMachineInstance(namespace).Get(context.TODO(), vmName, &metav1.GetOptions{})
vmi, err := c.GetVirtualMachine(namespace, vmName)
if err != nil {
return false, err
}
@@ -85,7 +85,7 @@ func (c *client) EnsureVolumeAvailable(namespace, vmName, volumeName string, tim
// EnsureVolumeAvailable checks to make sure the volume is available in the node before returning, checks for 2 minutes
func (c *client) EnsureVolumeRemoved(namespace, vmName, volumeName string, timeout time.Duration) error {
return wait.PollImmediate(time.Second, timeout, func() (done bool, err error) {
vmi, err := c.virtClient.VirtualMachineInstance(namespace).Get(context.TODO(), vmName, &metav1.GetOptions{})
vmi, err := c.GetVirtualMachine(namespace, vmName)
if err != nil {
return false, err
}
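Both helpers above still use wait.PollImmediate, which newer k8s.io/apimachinery releases mark as deprecated (that particular call is unchanged by this commit). Below is a hedged sketch of the context-aware replacement, wait.PollUntilContextTimeout, for reference; it illustrates the newer API (apimachinery v0.27+) rather than code from this commit, and checkVolume is a placeholder condition.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// checkVolume is a placeholder for a condition such as "the hot-plugged
// volume is reported Ready in the VMI's VolumeStatus".
func checkVolume(ctx context.Context) (bool, error) {
	fmt.Println("checking volume status...")
	return true, nil
}

// ensureVolumeAvailable polls every second until the condition succeeds or
// the timeout expires, starting immediately (the `true` argument), which is
// the behavior the deprecated PollImmediate provided.
func ensureVolumeAvailable(timeout time.Duration) error {
	return wait.PollUntilContextTimeout(context.Background(), time.Second, timeout, true, checkVolume)
}

func main() {
	if err := ensureVolumeAvailable(2 * time.Minute); err != nil {
		fmt.Println("volume not available:", err)
	}
}
```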
113 changes: 83 additions & 30 deletions pkg/service/controller.go
@@ -7,6 +7,7 @@ import (

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/wait"
kubevirtv1 "kubevirt.io/api/core/v1"

"github.com/container-storage-interface/spec/lib/go/csi"
@@ -195,7 +196,7 @@ func (c *ControllerService) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, err
}
dvName := req.VolumeId
klog.Infof("Removing data volume with %s", dvName)
klog.V(3).Infof("Removing data volume with %s", dvName)

err := c.virtClient.DeleteDataVolume(c.infraClusterNamespace, dvName)
if err != nil {
@@ -231,7 +232,7 @@ func (c *ControllerService) ControllerPublishVolume(
}
dvName := req.GetVolumeId()

klog.Infof("Attaching DataVolume %s to Node ID %s", dvName, req.NodeId)
klog.V(3).Infof("Attaching DataVolume %s to Node ID %s", dvName, req.NodeId)

// Get VM name
vmName, err := c.getVMNameByCSINodeID(req.NodeId)
@@ -247,7 +248,7 @@
bus := req.VolumeContext[busParameter]

// hotplug DataVolume to VM
klog.Infof("Start attaching DataVolume %s to VM %s. Volume name: %s. Serial: %s. Bus: %s", dvName, vmName, dvName, serial, bus)
klog.V(3).Infof("Start attaching DataVolume %s to VM %s. Volume name: %s. Serial: %s. Bus: %s", dvName, vmName, dvName, serial, bus)

addVolumeOptions := &kubevirtv1.AddVolumeOptions{
Name: dvName,
@@ -266,32 +267,58 @@
},
}

volumeFound := false
vm, err := c.virtClient.GetVirtualMachine(c.infraClusterNamespace, vmName)
if err := wait.ExponentialBackoff(wait.Backoff{
Duration: time.Second,
Steps: 5,
Factor: 2,
Cap: time.Second * 30,
}, func() (bool, error) {
if err := c.addVolumeToVm(dvName, vmName, addVolumeOptions); err != nil {
klog.Infof("failed adding volume %s to VM %s, retrying, err: %v", dvName, vmName, err)
return false, nil
}
return true, nil
}); err != nil {
return nil, err
}

// Ensure that the csi-attacher and csi-provisioner --timeout values are > the timeout specified here so we don't get
// odd failures with detaching volumes.
err = c.virtClient.EnsureVolumeAvailable(c.infraClusterNamespace, vmName, dvName, time.Minute*2)
if err != nil {
klog.Errorf("volume %s failed to be ready in time (2m) in VM %s, %v", dvName, vmName, err)
return nil, err
}

klog.V(3).Infof("Successfully attached volume %s to VM %s", dvName, vmName)
return &csi.ControllerPublishVolumeResponse{}, nil
}

func (c *ControllerService) isVolumeAttached(dvName, vmName string) (bool, error) {
vm, err := c.virtClient.GetVirtualMachine(c.infraClusterNamespace, vmName)
if err != nil {
return false, err
}
for _, volumeStatus := range vm.Status.VolumeStatus {
if volumeStatus.Name == dvName {
volumeFound = true
break
return true, nil
}
}
return false, nil
}

func (c *ControllerService) addVolumeToVm(dvName, vmName string, addVolumeOptions *kubevirtv1.AddVolumeOptions) error {
volumeFound, err := c.isVolumeAttached(dvName, vmName)
if err != nil {
return err
}
if !volumeFound {
err = c.virtClient.AddVolumeToVM(c.infraClusterNamespace, vmName, addVolumeOptions)
if err != nil {
klog.Errorf("failed adding volume %s to VM %s, %v", dvName, vmName, err)
return nil, err
return err
}
}

err = c.virtClient.EnsureVolumeAvailable(c.infraClusterNamespace, vmName, dvName, time.Minute*2)
if err != nil {
klog.Errorf("volume %s failed to be ready in time in VM %s, %v", dvName, vmName, err)
return nil, err
}

return &csi.ControllerPublishVolumeResponse{}, nil
return nil
}

func (c *ControllerService) validateControllerUnpublishVolumeRequest(req *csi.ControllerUnpublishVolumeRequest) error {
@@ -314,19 +341,44 @@ func (c *ControllerService) ControllerUnpublishVolume(ctx context.Context, req *
return nil, err
}
dvName := req.VolumeId
klog.Infof("Detaching DataVolume %s from Node ID %s", dvName, req.NodeId)
klog.V(3).Infof("Detaching DataVolume %s from Node ID %s", dvName, req.NodeId)

// Get VM name
vmName, err := c.getVMNameByCSINodeID(req.NodeId)
if err != nil {
return nil, err
}

vm, err := c.virtClient.GetVirtualMachine(c.infraClusterNamespace, vmName)
if err := wait.ExponentialBackoff(wait.Backoff{
Duration: time.Second,
Steps: 5,
Factor: 2,
Cap: time.Second * 30,
}, func() (bool, error) {
if err := c.removeVolumeFromVm(dvName, vmName); err != nil {
klog.Infof("failed removing volume %s from VM %s, err: %v", dvName, vmName, err)
return false, nil
}
return true, nil
}); err != nil {
return nil, err
}

err = c.virtClient.EnsureVolumeRemoved(c.infraClusterNamespace, vmName, dvName, time.Minute*2)
if err != nil {
klog.Error("failed getting virtual machine " + vmName)
klog.Errorf("volume %s failed to be removed in time (2m) from VM %s, %v", dvName, vmName, err)
return nil, err
}

klog.V(3).Infof("Successfully unpublished volume %s from VM %s", dvName, vmName)
return &csi.ControllerUnpublishVolumeResponse{}, nil
}

func (c *ControllerService) removeVolumeFromVm(dvName, vmName string) error {
vm, err := c.virtClient.GetVirtualMachine(c.infraClusterNamespace, vmName)
if err != nil {
return err
}
removePossible := false
for _, volumeStatus := range vm.Status.VolumeStatus {
if volumeStatus.HotplugVolume != nil && volumeStatus.Name == dvName {
@@ -337,17 +389,10 @@ func (c *ControllerService) ControllerUnpublishVolume(ctx context.Context, req *
// Detach DataVolume from VM
err = c.virtClient.RemoveVolumeFromVM(c.infraClusterNamespace, vmName, &kubevirtv1.RemoveVolumeOptions{Name: dvName})
if err != nil {
klog.Error("failed removing volume " + dvName + " from VM " + vmName)
return nil, err
return err
}
}
err = c.virtClient.EnsureVolumeRemoved(c.infraClusterNamespace, vmName, dvName, time.Minute*2)
if err != nil {
klog.Errorf("volume %s failed to be removed in time from VM %s, %v", dvName, vmName, err)
return nil, err
}

return &csi.ControllerUnpublishVolumeResponse{}, nil
return nil
}

// ValidateVolumeCapabilities unimplemented
@@ -359,19 +404,27 @@ func (c *ControllerService) ValidateVolumeCapabilities(ctx context.Context, req
if len(req.VolumeCapabilities) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "volumeCapabilities not provided for %s", req.VolumeId)
}

klog.V(3).Info("Calling volume capabilities")
for _, cap := range req.GetVolumeCapabilities() {
if cap.GetMount() == nil {
return nil, status.Error(codes.InvalidArgument, "mount type is undefined")
}
}
dvName := req.GetVolumeId()
klog.V(3).Infof("DataVolume name %s", dvName)
if _, err := c.virtClient.GetDataVolume(c.infraClusterNamespace, dvName); errors.IsNotFound(err) {
return nil, status.Errorf(codes.NotFound, "volume %s not found", req.GetVolumeId())
} else if err != nil {
return nil, err
}

klog.V(5).Info("Returning capabilities %v", &csi.ValidateVolumeCapabilitiesResponse{
Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
VolumeContext: req.GetVolumeContext(),
VolumeCapabilities: req.GetVolumeCapabilities(),
Parameters: req.GetParameters(),
},
})
return &csi.ValidateVolumeCapabilitiesResponse{
Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
VolumeContext: req.GetVolumeContext(),