From 6ddcbbdcc3f7f3ad4c5eac20cff98fe46b0566a3 Mon Sep 17 00:00:00 2001 From: garenchan Date: Thu, 7 Jul 2022 10:39:55 +0800 Subject: [PATCH] Pods using the same volume share mount --- .../templates/daemonset.yml | 7 + pkg/driver/controllerserver.go | 23 +-- pkg/driver/nodeserver.go | 191 ++++++++++++------ pkg/driver/utils.go | 45 ++++- pkg/driver/volume.go | 147 ++++++++++++++ 5 files changed, 331 insertions(+), 82 deletions(-) create mode 100644 pkg/driver/volume.go diff --git a/deploy/helm/seaweedfs-csi-driver/templates/daemonset.yml b/deploy/helm/seaweedfs-csi-driver/templates/daemonset.yml index 60a1338..8e768d3 100644 --- a/deploy/helm/seaweedfs-csi-driver/templates/daemonset.yml +++ b/deploy/helm/seaweedfs-csi-driver/templates/daemonset.yml @@ -87,6 +87,9 @@ spec: volumeMounts: - name: plugin-dir mountPath: /csi + - name: plugins-dir + mountPath: /var/lib/kubelet/plugins + mountPropagation: "Bidirectional" - name: pods-mount-dir mountPath: /var/lib/kubelet/pods mountPropagation: "Bidirectional" @@ -105,6 +108,10 @@ spec: hostPath: path: /var/lib/kubelet/plugins/{{ .Values.driverName }} type: DirectoryOrCreate + - name: plugins-dir + hostPath: + path: /var/lib/kubelet/plugins + type: Directory - name: pods-mount-dir hostPath: path: /var/lib/kubelet/pods diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go index 6a8585c..3ab0853 100644 --- a/pkg/driver/controllerserver.go +++ b/pkg/driver/controllerserver.go @@ -11,11 +11,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/mount_pb" "github.com/container-storage-interface/spec/lib/go/csi" - "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" _ "google.golang.org/grpc/resolver/passthrough" "google.golang.org/grpc/status" ) @@ -175,25 +172,7 @@ func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap } func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { - volumeID := req.GetVolumeId() - glog.V(0).Infof("Controller expand volume %s to %d bytes", volumeID, req.CapacityRange.RequiredBytes) - - localSocket := GetLocalSocket(volumeID) - clientConn, err := grpc.Dial("passthrough:///unix://"+localSocket, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return nil, err - } - defer clientConn.Close() - - client := mount_pb.NewSeaweedMountClient(clientConn) - _, err = client.Configure(context.Background(), &mount_pb.ConfigureRequest{ - CollectionCapacity: req.CapacityRange.RequiredBytes, - }) - - return &csi.ControllerExpandVolumeResponse{ - CapacityBytes: req.CapacityRange.RequiredBytes, - }, err - + return nil, status.Error(codes.Unimplemented, "") } func (cs *ControllerServer) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) { diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index d48875f..5a7206e 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -4,13 +4,11 @@ import ( "context" "os" "strings" + "sync" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/mount_pb" "github.com/container-storage-interface/spec/lib/go/csi" - "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" _ "google.golang.org/grpc/resolver/passthrough" "google.golang.org/grpc/status" "k8s.io/utils/mount" @@ -19,16 +17,19 @@ import ( type NodeServer struct { Driver *SeaweedFsDriver mounter mount.Interface + + // information about the managed volumes + volumes sync.Map + volumeMutexes *KeyMutex } var _ = csi.NodeServer(&NodeServer{}) -func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { +func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { volumeID := req.GetVolumeId() // mount the fs here - targetPath := req.GetTargetPath() - - glog.V(0).Infof("NodePublishVolume volume %s to %s", volumeID, targetPath) + stagingTargetPath := req.GetStagingTargetPath() + glog.V(0).Infof("node stage volume %s to %s", volumeID, stagingTargetPath) // Check arguments if req.GetVolumeCapability() == nil { @@ -40,25 +41,30 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis if volumeID == "" { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - if targetPath == "" { + if stagingTargetPath == "" { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } - // check whether it can be mounted - notMnt, err := checkMount(targetPath) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - if !notMnt { - return &csi.NodePublishVolumeResponse{}, nil + volumeMutex := ns.getVolumeMutex(volumeID) + volumeMutex.Lock() + defer volumeMutex.Unlock() + + // The volume has been staged. + if _, ok := ns.volumes.Load(volumeID); ok { + glog.V(0).Infof("volume %s has been staged", volumeID) + return &csi.NodeStageVolumeResponse{}, nil } volContext := req.GetVolumeContext() - mounter, err := newMounter(volumeID, req.GetReadonly(), ns.Driver, volContext) + readOnly := isVolumeReadOnly(req) + + mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext) if err != nil { return nil, err } - if err := mounter.Mount(targetPath); err != nil { + + volume := NewVolume(volumeID, mounter) + if err := volume.Stage(stagingTargetPath); err != nil { if os.IsPermission(err) { return nil, status.Error(codes.PermissionDenied, err.Error()) } @@ -68,25 +74,75 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis return nil, status.Error(codes.Internal, err.Error()) } - glog.V(0).Infof("volume %s successfully mounted to %s", volumeID, targetPath) + ns.volumes.Store(volumeID, volume) + glog.V(0).Infof("volume %s successfully staged to %s", volumeID, stagingTargetPath) + + return &csi.NodeStageVolumeResponse{}, nil +} + +func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { + volumeID := req.GetVolumeId() + targetPath := req.GetTargetPath() + + glog.V(0).Infof("node publish volume %s to %s", volumeID, targetPath) + + // Check arguments + if req.GetVolumeCapability() == nil { + return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request") + } + if !isValidVolumeCapabilities(ns.Driver.vcap, []*csi.VolumeCapability{req.GetVolumeCapability()}) { + // return nil, status.Error(codes.InvalidArgument, "Volume capability not supported") + } + if volumeID == "" { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + if targetPath == "" { + return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + } + + volumeMutex := ns.getVolumeMutex(volumeID) + volumeMutex.Lock() + defer volumeMutex.Unlock() + + volume, ok := ns.volumes.Load(volumeID) + if !ok { + return nil, status.Error(codes.FailedPrecondition, "volume hasn't been staged yet") + } + + // When pod uses a volume in read-only mode, k8s will automatically + // mount the volume as a read-only file system. + if err := volume.(*Volume).Publish(targetPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + glog.V(0).Infof("volume %s successfully published to %s", volumeID, targetPath) return &csi.NodePublishVolumeResponse{}, nil } func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { - + volumeID := req.GetVolumeId() targetPath := req.GetTargetPath() + glog.V(0).Infof("node unpublish volume %s from %s", volumeID, targetPath) + + if volumeID == "" { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } if targetPath == "" { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } - if err := fuseUnmount(targetPath); err != nil { - return nil, status.Error(codes.Internal, err.Error()) + volumeMutex := ns.getVolumeMutex(volumeID) + volumeMutex.Lock() + defer volumeMutex.Unlock() + + volume, ok := ns.volumes.Load(volumeID) + if !ok { + glog.Warningf("volume %s hasn't been published", volumeID) + return &csi.NodeUnpublishVolumeResponse{}, nil } - err := os.Remove(targetPath) - if err != nil && !os.IsNotExist(err) { + if err := volume.(*Volume).Unpublish(targetPath); err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -106,6 +162,13 @@ func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC return &csi.NodeGetCapabilitiesResponse{ Capabilities: []*csi.NodeServiceCapability{ + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, + }, + }, + }, { Type: &csi.NodeServiceCapability_Rpc{ Rpc: &csi.NodeServiceCapability_RPC{ @@ -124,60 +187,72 @@ func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVol } func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { + volumeID := req.GetVolumeId() + stagingTargetPath := req.GetStagingTargetPath() + glog.V(0).Infof("node unstage volume %s from %s", volumeID, stagingTargetPath) + // Check arguments - if req.GetVolumeId() == "" { + if volumeID == "" { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - if req.GetStagingTargetPath() == "" { + if stagingTargetPath == "" { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } - return &csi.NodeUnstageVolumeResponse{}, nil -} + volumeMutex := ns.getVolumeMutex(volumeID) + volumeMutex.Lock() + defer volumeMutex.Unlock() -func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { - // Check arguments - if req.GetVolumeId() == "" { - return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + volume, ok := ns.volumes.Load(volumeID) + if !ok { + glog.Warningf("volume %s hasn't been staged", volumeID) + return &csi.NodeUnstageVolumeResponse{}, nil } - if req.GetStagingTargetPath() == "" { - return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + + if err := volume.(*Volume).Unstage(stagingTargetPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } else { + ns.volumes.Delete(volumeID) } - return &csi.NodeStageVolumeResponse{}, nil + + return &csi.NodeUnstageVolumeResponse{}, nil } func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) { volumeID := req.GetVolumeId() - glog.V(0).Infof("Node expand volume %s to %d bytes", volumeID, req.CapacityRange.RequiredBytes) + requiredBytes := req.GetCapacityRange().GetRequiredBytes() + glog.V(0).Infof("Node expand volume %s to %d bytes", volumeID, requiredBytes) - localSocket := GetLocalSocket(volumeID) - clientConn, err := grpc.Dial("passthrough:///unix://"+localSocket, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return nil, err + volumeMutex := ns.getVolumeMutex(volumeID) + volumeMutex.Lock() + defer volumeMutex.Unlock() + + if volume, ok := ns.volumes.Load(volumeID); ok { + if err := volume.(*Volume).Expand(requiredBytes); err != nil { + return nil, err + } } - defer clientConn.Close() - client := mount_pb.NewSeaweedMountClient(clientConn) - _, err = client.Configure(context.Background(), &mount_pb.ConfigureRequest{ - CollectionCapacity: req.CapacityRange.RequiredBytes, - }) + return &csi.NodeExpandVolumeResponse{}, nil +} - return &csi.NodeExpandVolumeResponse{ - CapacityBytes: req.CapacityRange.RequiredBytes, - }, err +func (ns *NodeServer) getVolumeMutex(volumeID string) *sync.RWMutex { + return ns.volumeMutexes.GetMutex(volumeID) } -func checkMount(targetPath string) (bool, error) { - notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath) - if err != nil { - if os.IsNotExist(err) { - if err = os.MkdirAll(targetPath, 0750); err != nil { - return false, err - } - notMnt = true - } else { - return false, err +func isVolumeReadOnly(req *csi.NodeStageVolumeRequest) bool { + mode := req.GetVolumeCapability().GetAccessMode().Mode + + readOnlyModes := []csi.VolumeCapability_AccessMode_Mode{ + csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY, + csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY, + } + + for _, readOnlyMode := range readOnlyModes { + if mode == readOnlyMode { + return true } } - return notMnt, nil + + return false } diff --git a/pkg/driver/utils.go b/pkg/driver/utils.go index b783f99..0f45733 100644 --- a/pkg/driver/utils.go +++ b/pkg/driver/utils.go @@ -2,18 +2,23 @@ package driver import ( "fmt" + "os" "strings" + "sync" - "github.com/container-storage-interface/spec/lib/go/csi" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/container-storage-interface/spec/lib/go/csi" "golang.org/x/net/context" "google.golang.org/grpc" + "k8s.io/utils/mount" ) func NewNodeServer(n *SeaweedFsDriver) *NodeServer { return &NodeServer{ - Driver: n, + Driver: n, + volumeMutexes: NewKeyMutex(32), } } @@ -58,3 +63,39 @@ func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, h glog.V(3).Infof("GRPC %s response %+v", info.FullMethod, resp) return resp, err } + +func checkMount(targetPath string) (bool, error) { + notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath) + if err != nil { + if os.IsNotExist(err) { + if err = os.MkdirAll(targetPath, 0750); err != nil { + return false, err + } + notMnt = true + } else { + return false, err + } + } + return notMnt, nil +} + +type KeyMutex struct { + mutexes []sync.RWMutex + size int32 +} + +func NewKeyMutex(size int32) *KeyMutex { + return &KeyMutex{ + mutexes: make([]sync.RWMutex, size), + size: size, + } +} + +func (km *KeyMutex) GetMutex(key string) *sync.RWMutex { + index := util.HashToInt32([]byte(key)) + if index < 0 { + index = -index + } + + return &km.mutexes[index%km.size] +} diff --git a/pkg/driver/volume.go b/pkg/driver/volume.go new file mode 100644 index 0000000..44b4e65 --- /dev/null +++ b/pkg/driver/volume.go @@ -0,0 +1,147 @@ +package driver + +import ( + "context" + "fmt" + "os" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/mount_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type Volume struct { + VolumeId string + + // volume's real mount point + stagingTargetPath string + + // Target paths to which the volume has been published. + // These paths are symbolic links to the real mount point. + // So multiple pods using the same volume can share a mount. + targetPaths map[string]bool + + mounter Mounter + + // unix socket used to manage volume + localSocket string +} + +func NewVolume(volumeID string, mounter Mounter) *Volume { + return &Volume{ + VolumeId: volumeID, + mounter: mounter, + targetPaths: make(map[string]bool), + } +} + +func (vol *Volume) Stage(stagingTargetPath string) error { + if vol.isStaged() { + return nil + } + + // check whether it can be mounted + if notMnt, err := checkMount(stagingTargetPath); err != nil { + return err + } else if !notMnt { + // maybe already mounted? + return nil + } + + if err := vol.mounter.Mount(stagingTargetPath); err != nil { + return err + } + + vol.stagingTargetPath = stagingTargetPath + return nil +} + +func (vol *Volume) Publish(targetPath string) error { + if err := os.Remove(targetPath); err != nil && !os.IsNotExist(err) { + return err + } + + if err := os.Symlink(vol.stagingTargetPath, targetPath); err != nil { + return err + } + + vol.targetPaths[targetPath] = true + return nil +} + +func (vol *Volume) Expand(sizeByte int64) error { + if !vol.isStaged() { + return nil + } + + target := fmt.Sprintf("passthrough:///unix://%s", vol.getLocalSocket()) + dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + + clientConn, err := grpc.Dial(target, dialOption) + if err != nil { + return err + } + defer clientConn.Close() + + client := mount_pb.NewSeaweedMountClient(clientConn) + _, err = client.Configure(context.Background(), &mount_pb.ConfigureRequest{ + CollectionCapacity: sizeByte, + }) + return err +} + +func (vol *Volume) Unpublish(targetPath string) error { + // Check whether the volume is published to the target path. + if _, ok := vol.targetPaths[targetPath]; !ok { + glog.Warningf("volume %s hasn't been published to %s", vol.VolumeId, targetPath) + return nil + } + + delete(vol.targetPaths, targetPath) + + if err := os.Remove(targetPath); err != nil && !os.IsNotExist(err) { + return err + } + + return nil +} + +func (vol *Volume) Unstage(_ string) error { + if !vol.isStaged() { + return nil + } + + mountPoint := vol.stagingTargetPath + glog.V(0).Infof("unmounting volume %s from %s", vol.VolumeId, mountPoint) + + if err := fuseUnmount(mountPoint); err != nil { + return err + } + + if err := os.Remove(mountPoint); err != nil && !os.IsNotExist(err) { + return err + } + + return nil +} + +func (vol *Volume) isStaged() bool { + return vol.stagingTargetPath != "" +} + +func (vol *Volume) getLocalSocket() string { + if vol.localSocket != "" { + return vol.localSocket + } + + montDirHash := util.HashToInt32([]byte(vol.VolumeId)) + if montDirHash < 0 { + montDirHash = -montDirHash + } + + socket := fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", montDirHash) + vol.localSocket = socket + return socket +}