Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pods using the same volume share mount #68

Merged
merged 1 commit into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions deploy/helm/seaweedfs-csi-driver/templates/daemonset.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ spec:
volumeMounts:
- name: plugin-dir
mountPath: /csi
- name: plugins-dir
mountPath: /var/lib/kubelet/plugins
mountPropagation: "Bidirectional"
- name: pods-mount-dir
mountPath: /var/lib/kubelet/pods
mountPropagation: "Bidirectional"
Expand All @@ -105,6 +108,10 @@ spec:
hostPath:
path: /var/lib/kubelet/plugins/{{ .Values.driverName }}
type: DirectoryOrCreate
- name: plugins-dir
hostPath:
path: /var/lib/kubelet/plugins
type: Directory
- name: pods-mount-dir
hostPath:
path: /var/lib/kubelet/pods
Expand Down
23 changes: 1 addition & 22 deletions pkg/driver/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@ import (

"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/mount_pb"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
_ "google.golang.org/grpc/resolver/passthrough"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -175,25 +172,7 @@ func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap
}

func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
volumeID := req.GetVolumeId()
glog.V(0).Infof("Controller expand volume %s to %d bytes", volumeID, req.CapacityRange.RequiredBytes)

localSocket := GetLocalSocket(volumeID)
clientConn, err := grpc.Dial("passthrough:///unix://"+localSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
defer clientConn.Close()

client := mount_pb.NewSeaweedMountClient(clientConn)
_, err = client.Configure(context.Background(), &mount_pb.ConfigureRequest{
CollectionCapacity: req.CapacityRange.RequiredBytes,
})

return &csi.ControllerExpandVolumeResponse{
CapacityBytes: req.CapacityRange.RequiredBytes,
}, err

return nil, status.Error(codes.Unimplemented, "")
}

func (cs *ControllerServer) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
Expand Down
191 changes: 133 additions & 58 deletions pkg/driver/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ import (
"context"
"os"
"strings"
"sync"

"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/mount_pb"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
_ "google.golang.org/grpc/resolver/passthrough"
"google.golang.org/grpc/status"
"k8s.io/utils/mount"
Expand All @@ -19,16 +17,19 @@ import (
type NodeServer struct {
Driver *SeaweedFsDriver
mounter mount.Interface

// information about the managed volumes
volumes sync.Map
volumeMutexes *KeyMutex
}

var _ = csi.NodeServer(&NodeServer{})

func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
volumeID := req.GetVolumeId()
// mount the fs here
targetPath := req.GetTargetPath()

glog.V(0).Infof("NodePublishVolume volume %s to %s", volumeID, targetPath)
stagingTargetPath := req.GetStagingTargetPath()
glog.V(0).Infof("node stage volume %s to %s", volumeID, stagingTargetPath)

// Check arguments
if req.GetVolumeCapability() == nil {
Expand All @@ -40,25 +41,30 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if targetPath == "" {
if stagingTargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}

// check whether it can be mounted
notMnt, err := checkMount(targetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if !notMnt {
return &csi.NodePublishVolumeResponse{}, nil
volumeMutex := ns.getVolumeMutex(volumeID)
volumeMutex.Lock()
defer volumeMutex.Unlock()

// The volume has been staged.
if _, ok := ns.volumes.Load(volumeID); ok {
glog.V(0).Infof("volume %s has been staged", volumeID)
return &csi.NodeStageVolumeResponse{}, nil
}

volContext := req.GetVolumeContext()
mounter, err := newMounter(volumeID, req.GetReadonly(), ns.Driver, volContext)
readOnly := isVolumeReadOnly(req)

mounter, err := newMounter(volumeID, readOnly, ns.Driver, volContext)
if err != nil {
return nil, err
}
if err := mounter.Mount(targetPath); err != nil {

volume := NewVolume(volumeID, mounter)
if err := volume.Stage(stagingTargetPath); err != nil {
if os.IsPermission(err) {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
Expand All @@ -68,25 +74,75 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, status.Error(codes.Internal, err.Error())
}

glog.V(0).Infof("volume %s successfully mounted to %s", volumeID, targetPath)
ns.volumes.Store(volumeID, volume)
glog.V(0).Infof("volume %s successfully staged to %s", volumeID, stagingTargetPath)

return &csi.NodeStageVolumeResponse{}, nil
}

func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()

glog.V(0).Infof("node publish volume %s to %s", volumeID, targetPath)

// Check arguments
if req.GetVolumeCapability() == nil {
return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
}
if !isValidVolumeCapabilities(ns.Driver.vcap, []*csi.VolumeCapability{req.GetVolumeCapability()}) {
// return nil, status.Error(codes.InvalidArgument, "Volume capability not supported")
}
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if targetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}

volumeMutex := ns.getVolumeMutex(volumeID)
volumeMutex.Lock()
defer volumeMutex.Unlock()

volume, ok := ns.volumes.Load(volumeID)
if !ok {
return nil, status.Error(codes.FailedPrecondition, "volume hasn't been staged yet")
}

// When pod uses a volume in read-only mode, k8s will automatically
// mount the volume as a read-only file system.
if err := volume.(*Volume).Publish(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

glog.V(0).Infof("volume %s successfully published to %s", volumeID, targetPath)
return &csi.NodePublishVolumeResponse{}, nil
}

func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {

volumeID := req.GetVolumeId()
targetPath := req.GetTargetPath()
glog.V(0).Infof("node unpublish volume %s from %s", volumeID, targetPath)

if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}

if targetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}

if err := fuseUnmount(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
volumeMutex := ns.getVolumeMutex(volumeID)
volumeMutex.Lock()
defer volumeMutex.Unlock()

volume, ok := ns.volumes.Load(volumeID)
if !ok {
glog.Warningf("volume %s hasn't been published", volumeID)
return &csi.NodeUnpublishVolumeResponse{}, nil
}

err := os.Remove(targetPath)
if err != nil && !os.IsNotExist(err) {
if err := volume.(*Volume).Unpublish(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

Expand All @@ -106,6 +162,13 @@ func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC

return &csi.NodeGetCapabilitiesResponse{
Capabilities: []*csi.NodeServiceCapability{
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
},
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Expand All @@ -124,60 +187,72 @@ func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVol
}

func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
volumeID := req.GetVolumeId()
stagingTargetPath := req.GetStagingTargetPath()
glog.V(0).Infof("node unstage volume %s from %s", volumeID, stagingTargetPath)

// Check arguments
if req.GetVolumeId() == "" {
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if req.GetStagingTargetPath() == "" {
if stagingTargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}

return &csi.NodeUnstageVolumeResponse{}, nil
}
volumeMutex := ns.getVolumeMutex(volumeID)
volumeMutex.Lock()
defer volumeMutex.Unlock()

func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
// Check arguments
if req.GetVolumeId() == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
volume, ok := ns.volumes.Load(volumeID)
if !ok {
glog.Warningf("volume %s hasn't been staged", volumeID)
return &csi.NodeUnstageVolumeResponse{}, nil
}
if req.GetStagingTargetPath() == "" {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")

if err := volume.(*Volume).Unstage(stagingTargetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
} else {
ns.volumes.Delete(volumeID)
}
return &csi.NodeStageVolumeResponse{}, nil

return &csi.NodeUnstageVolumeResponse{}, nil
}

func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
volumeID := req.GetVolumeId()
glog.V(0).Infof("Node expand volume %s to %d bytes", volumeID, req.CapacityRange.RequiredBytes)
requiredBytes := req.GetCapacityRange().GetRequiredBytes()
glog.V(0).Infof("Node expand volume %s to %d bytes", volumeID, requiredBytes)

localSocket := GetLocalSocket(volumeID)
clientConn, err := grpc.Dial("passthrough:///unix://"+localSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
volumeMutex := ns.getVolumeMutex(volumeID)
volumeMutex.Lock()
defer volumeMutex.Unlock()

if volume, ok := ns.volumes.Load(volumeID); ok {
if err := volume.(*Volume).Expand(requiredBytes); err != nil {
return nil, err
}
}
defer clientConn.Close()

client := mount_pb.NewSeaweedMountClient(clientConn)
_, err = client.Configure(context.Background(), &mount_pb.ConfigureRequest{
CollectionCapacity: req.CapacityRange.RequiredBytes,
})
return &csi.NodeExpandVolumeResponse{}, nil
}

return &csi.NodeExpandVolumeResponse{
CapacityBytes: req.CapacityRange.RequiredBytes,
}, err
func (ns *NodeServer) getVolumeMutex(volumeID string) *sync.RWMutex {
return ns.volumeMutexes.GetMutex(volumeID)
}

func checkMount(targetPath string) (bool, error) {
notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath)
if err != nil {
if os.IsNotExist(err) {
if err = os.MkdirAll(targetPath, 0750); err != nil {
return false, err
}
notMnt = true
} else {
return false, err
func isVolumeReadOnly(req *csi.NodeStageVolumeRequest) bool {
mode := req.GetVolumeCapability().GetAccessMode().Mode

readOnlyModes := []csi.VolumeCapability_AccessMode_Mode{
csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
}

for _, readOnlyMode := range readOnlyModes {
if mode == readOnlyMode {
return true
}
}
return notMnt, nil

return false
}
Loading