Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rbd: VolumeGroupSnapshot support #4502

Open
wants to merge 16 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/journal/voljournal.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func NewCSISnapshotJournal(suffix string) *Config {
cephSnapSourceKey: "csi.source",
namespace: "",
csiImageIDKey: "csi.imageid",
csiGroupIDKey: "csi.groupid",
encryptKMSKey: "csi.volume.encryptKMS",
encryptionType: "csi.volume.encryptionType",
ownerKey: "csi.volume.owner",
Expand Down Expand Up @@ -805,7 +806,8 @@ func (conn *Connection) StoreAttribute(ctx context.Context, pool, reservedUUID,

// StoreGroupID stores an groupID in omap.
func (conn *Connection) StoreGroupID(ctx context.Context, pool, reservedUUID, groupID string) error {
err := conn.StoreAttribute(ctx, pool, reservedUUID, conn.config.csiGroupIDKey, groupID)
err := setOMapKeys(ctx, conn, pool, conn.config.namespace, conn.config.cephUUIDDirectoryPrefix+reservedUUID,
map[string]string{conn.config.csiGroupIDKey: groupID})
Madhu-1 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("failed to store groupID %w", err)
}
Expand Down
15 changes: 15 additions & 0 deletions internal/rbd/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
csiaddons "github.com/ceph/ceph-csi/internal/csi-addons/server"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/rbd"
"github.com/ceph/ceph-csi/internal/rbd/features"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/k8s"
"github.com/ceph/ceph-csi/internal/util/log"
Expand Down Expand Up @@ -123,6 +124,19 @@ func (r *Driver) Run(conf *util.Config) {
csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER,
csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER,
})

// GroupSnapGetInfo is used within the VolumeGroupSnapshot implementation
vgsSupported, vgsErr := features.SupportsGroupSnapGetInfo()
if vgsSupported {
r.cd.AddGroupControllerServiceCapabilities([]csi.GroupControllerServiceCapability_RPC_Type{
csi.GroupControllerServiceCapability_RPC_CREATE_DELETE_GET_VOLUME_GROUP_SNAPSHOT,
})
} else {
log.DefaultLog("not enabling VolumeGroupSnapshot service capability")
}
if vgsErr != nil {
log.ErrorLogMsg("failed detecting VolumeGroupSnapshot support: %v", vgsErr)
}
}

if k8s.RunsOnKubernetes() && conf.IsNodeServer {
Expand Down Expand Up @@ -178,6 +192,7 @@ func (r *Driver) Run(conf *util.Config) {
IS: r.ids,
CS: r.cs,
NS: r.ns,
GS: r.cs,
}
s.Start(conf.Endpoint, srv, csicommon.MiddlewareServerOptionConfig{
LogSlowOpInterval: conf.LogSlowOpInterval,
Expand Down
1 change: 1 addition & 0 deletions internal/rbd/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (rv *rbdVolume) AddToGroup(ctx context.Context, vg types.VolumeGroup) error
if err != nil {
return fmt.Errorf("failed to open image %q: %w", rv, err)
}
defer image.Close()

info, err := image.GetGroup()
if err != nil {
Expand Down
244 changes: 244 additions & 0 deletions internal/rbd/group/group_snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
/*
Copyright 2024 The Ceph-CSI Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package group

import (
"context"
"fmt"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
)

// volumeGroupSnapshot handles all requests for 'rbd group snap' operations.
type volumeGroupSnapshot struct {
commonVolumeGroup

// snapshots is a list of rbd-images that are part of the group. The ID
// of each snapshot is stored in the journal.
Madhu-1 marked this conversation as resolved.
Show resolved Hide resolved
snapshots []types.Snapshot

// snapshotsToFree contains Snapshots that were resolved during
// GetVolumeGroupSnapshot.
snapshotsToFree []types.Snapshot
}

// verify that volumeGroupSnapshot implements the VolumeGroupSnapshot interface.
var _ types.VolumeGroupSnapshot = &volumeGroupSnapshot{}

// GetVolumeGroupSnapshot initializes a new VolumeGroupSnapshot object that can
// be used to inspect and delete a group of snapshots that was created by a
// VolumeGroup.
func GetVolumeGroupSnapshot(
ctx context.Context,
id string,
csiDriver string,
creds *util.Credentials,
snapshotResolver types.SnapshotResolver,
) (types.VolumeGroupSnapshot, error) {
cleanVGS := true
Madhu-1 marked this conversation as resolved.
Show resolved Hide resolved

vgs := &volumeGroupSnapshot{}
err := vgs.initCommonVolumeGroup(ctx, id, csiDriver, creds)
if err != nil {
return nil, fmt.Errorf("failed to initialize volume group snapshot with id %q: %w", id, err)
}
defer func() {
if cleanVGS {
vgs.Destroy(ctx)
}
}()

attrs, err := vgs.getVolumeGroupAttributes(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get volume attributes for id %q: %w", vgs, err)
}

var snapshots []types.Snapshot
// it is needed to free the previously allocated snapshots in case of an error
defer func() {
// snapshotsToFree is empty in case of an error, let .Destroy() handle it otherwise
if len(vgs.snapshotsToFree) > 0 {
return
}

for _, s := range snapshots {
s.Destroy(ctx)
}
}()
for snapID := range attrs.VolumeMap {
snap, err := snapshotResolver.GetSnapshotByID(ctx, snapID)
if err != nil {
// free the previously allocated snapshots
for _, s := range snapshots {
s.Destroy(ctx)
}

return nil, fmt.Errorf("failed to resolve snapshot image %q for volume group snapshot %q: %w", snapID, vgs, err)
}

log.DebugLog(ctx, "resolved snapshot id %q to snapshot %q", snapID, snap)

snapshots = append(snapshots, snap)
}

vgs.snapshots = snapshots
// all allocated snapshots need to be free'd at Destroy() time
vgs.snapshotsToFree = snapshots

cleanVGS = false
log.DebugLog(ctx, "GetVolumeGroupSnapshot(%s) returns %+v", id, *vgs)

return vgs, nil
}

// NewVolumeGroupSnapshot creates a new VolumeGroupSnapshot object with the
// given slice of Snapshots and adds the objectmapping to the journal.
func NewVolumeGroupSnapshot(
ctx context.Context,
id string,
csiDriver string,
creds *util.Credentials,
snapshots []types.Snapshot,
) (types.VolumeGroupSnapshot, error) {
cleanupVGS := true

vgs := &volumeGroupSnapshot{}
err := vgs.initCommonVolumeGroup(ctx, id, csiDriver, creds)
if err != nil {
return nil, fmt.Errorf("failed to initialize volume group snapshot with id %q: %w", id, err)
}
defer func() {
if cleanupVGS {
vgs.Destroy(ctx)
}
}()

vgs.snapshots = snapshots
vgs.snapshotsToFree = snapshots

_ /* attrs */, err = vgs.getVolumeGroupAttributes(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get volume attributes for id %q: %w", vgs, err)
}

volumeMap := make(map[string]string, len(snapshots))

// add the CSI handles of each snapshot to the journal
for _, snapshot := range snapshots {
handle, snapErr := snapshot.GetID(ctx)
if snapErr != nil {
return nil, fmt.Errorf("failed to get ID for snapshot %q of volume group snapshot %q: %w", snapshot, vgs, snapErr)
}

name, snapErr := snapshot.GetName(ctx)
if snapErr != nil {
return nil, fmt.Errorf("failed to get name for snapshot %q of volume group snapshot %q: %w", snapshot, vgs, snapErr)
}

volumeMap[handle] = name

// store the CSI ID of the group in the snapshot attributes
snapErr = snapshot.SetVolumeGroup(ctx, creds, vgs.id)
if snapErr != nil {
return nil, fmt.Errorf("failed to set volume group ID %q for snapshot %q: %w", vgs.id, name, snapErr)
}
}

j, err := vgs.getJournal(ctx)
if err != nil {
return nil, err
}

err = j.AddVolumesMapping(ctx, vgs.pool, vgs.objectUUID, volumeMap)
if err != nil {
return nil, fmt.Errorf("failed to add volume mapping for volume group snapshot %q: %w", vgs, err)
}

// all done successfully, no need to cleanup the returned vgs
cleanupVGS = false

return vgs, nil
}

// ToCSI creates a CSI type for the VolumeGroupSnapshot.
func (vgs *volumeGroupSnapshot) ToCSI(ctx context.Context) (*csi.VolumeGroupSnapshot, error) {
snapshots, err := vgs.ListSnapshots(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list snapshots for volume group %q: %w", vgs, err)
}

csiSnapshots := make([]*csi.Snapshot, len(snapshots))
for i, snap := range snapshots {
csiSnapshots[i], err = snap.ToCSI(ctx)
if err != nil {
return nil, fmt.Errorf("failed to convert snapshot %q to CSI type: %w", snap, err)
}
}

id, err := vgs.GetID(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get id for volume group snapshot %q: %w", vgs, err)
}

ct, err := vgs.GetCreationTime(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get creation time for volume group snapshot %q: %w", vgs, err)
}

return &csi.VolumeGroupSnapshot{
GroupSnapshotId: id,
Snapshots: csiSnapshots,
CreationTime: timestamppb.New(*ct),
ReadyToUse: true,
}, nil
}

// Destroy frees the resources used by the volumeGroupSnapshot.
func (vgs *volumeGroupSnapshot) Destroy(ctx context.Context) {
// free the volumes that were allocated in GetVolumeGroup()
if len(vgs.snapshotsToFree) > 0 {
for _, volume := range vgs.snapshotsToFree {
volume.Destroy(ctx)
}
vgs.snapshotsToFree = make([]types.Snapshot, 0)
Madhu-1 marked this conversation as resolved.
Show resolved Hide resolved
}

vgs.commonVolumeGroup.Destroy(ctx)
}

// Delete removes all snapshots and eventually the volume group snapshot.
func (vgs *volumeGroupSnapshot) Delete(ctx context.Context) error {
for _, snapshot := range vgs.snapshots {
log.DebugLog(ctx, "deleting snapshot image %q for volume group snapshot %q", snapshot, vgs)

err := snapshot.Delete(ctx)
if err != nil {
return fmt.Errorf("failed to delete snapshot %q as part of volume groups snapshot %q: %w", snapshot, vgs, err)
}
}

return vgs.commonVolumeGroup.Delete(ctx)
}

func (vgs *volumeGroupSnapshot) ListSnapshots(ctx context.Context) ([]types.Snapshot, error) {
return vgs.snapshots, nil
}
Loading