diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index 0cc4ce58a1df..79631d59b26f 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -24,6 +24,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" corerbd "github.com/ceph/ceph-csi/internal/rbd" @@ -40,6 +41,12 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) +// imageResyncMap is used to store the volumeID and image creation time for resync. +// The volumeID gets added to the map when the DemoteVolume RPC is called and +// removed from the map when the PromoteVolume RPC is called or +// DisableVolumeReplication RPC is called. +var imageResyncMap = sync.Map{} + // imageMirroringMode is used to indicate the mirroring mode for an RBD image. type imageMirroringMode string @@ -337,11 +344,15 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, if err != nil { return nil, getGRPCError(err) } + imageResyncMap.Delete(volumeID) return &replication.DisableVolumeReplicationResponse{}, nil default: return nil, status.Errorf(codes.InvalidArgument, "image is in %s Mode", mirroringInfo.State) } + // remove the volume from the resync map as the image mirroring is disabled and + // during next Demote operation cephcsi will re-add the image to the sync map + imageResyncMap.Delete(volumeID) return &replication.DisableVolumeReplicationResponse{}, nil } @@ -439,6 +450,13 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, rbdVol) } + // remove the volume from the resync map as the image is promoted and + // during next Demote operation cephcsi will re-add the image to the sync + // map. + // This is required if someone does not do DisableVolumeReplication but tries + // to toggle between PromoteVolume and DemoteVolume. 
+ imageResyncMap.Delete(volumeID) + return &replication.PromoteVolumeResponse{}, nil } @@ -480,6 +498,18 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, return nil, err } + + err = rbdVol.GetImageInfo() + if err != nil { + log.ErrorLog(ctx, err.Error()) + + return nil, status.Error(codes.Internal, err.Error()) + } + // store the image creation time for resync + if _, ok := imageResyncMap.Load(volumeID); !ok { + imageResyncMap.Store(volumeID, rbdVol.CreatedAt.AsTime()) + } + mirroringInfo, err := rbdVol.GetImageMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) @@ -538,6 +568,8 @@ func checkRemoteSiteStatus(ctx context.Context, mirrorStatus *librbd.GlobalMirro // ResyncVolume extracts the RBD volume information from the volumeID, If the // image is present, mirroring is enabled and the image is in demoted state. // If yes it will resync the image to correct the split-brain. +// +//nolint:gocyclo,cyclop // TODO: reduce complexity func (rs *ReplicationServer) ResyncVolume(ctx context.Context, req *replication.ResyncVolumeRequest, ) (*replication.ResyncVolumeResponse, error) { @@ -572,22 +604,11 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, return nil, err } - mirroringInfo, err := rbdVol.GetImageMirroringInfo() + err = rbdVol.CheckImageIsPrimary() if err != nil { - // in case of Resync the image will get deleted and gets recreated and - // it takes time for this operation. 
log.ErrorLog(ctx, err.Error()) - return nil, status.Error(codes.Aborted, err.Error()) - } - - if mirroringInfo.State != librbd.MirrorImageEnabled { - return nil, status.Error(codes.InvalidArgument, "image mirroring is not enabled") - } - - // return error if the image is still primary - if mirroringInfo.Primary { - return nil, status.Error(codes.InvalidArgument, "image is in primary state") + return nil, getGRPCError(err) } mirrorStatus, err := rbdVol.GetImageMirroringStatus() @@ -637,14 +658,28 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, ready = checkRemoteSiteStatus(ctx, mirrorStatus) } - err = rbdVol.ResyncVol(localStatus, req.Force) + err = rbdVol.GetImageInfo() if err != nil { - return nil, getGRPCError(err) + return nil, status.Errorf(codes.Internal, "failed to get image info: %s", err.Error()) } - err = checkVolumeResyncStatus(localStatus) + ok, err := resyncVolume(ctx, volumeID, rbdVol.CreatedAt) if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + return nil, status.Errorf(codes.Internal, "failed to check image needs resync: %s", err.Error()) + } + + if req.Force && ok { + err = rbdVol.ResyncVol(localStatus) + if err != nil { + return nil, getGRPCError(err) + } + } + + if !ready { + err = checkVolumeResyncStatus(localStatus) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } } err = rbdVol.RepairResyncedImageID(ctx, ready) @@ -659,6 +694,29 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, return resp, nil } +func resyncVolume( + ctx context.Context, + volumeID string, + currentImageTime *timestamppb.Timestamp, +) (bool, error) { + savedImageTime, ok := imageResyncMap.Load(volumeID) + if !ok { + return false, errors.New("failed to get image creation time") + } + + log.UsefulLog(ctx, "savedImageTime=%v, currentImageTime=%v", savedImageTime, currentImageTime) + st, ok := savedImageTime.(time.Time) + if !ok { + return false, errors.New("failed to convert image creation 
time") + } + + if st.Equal(currentImageTime.AsTime()) { + return true, nil + } + + return false, nil +} + func getGRPCError(err error) error { if err == nil { return status.Error(codes.OK, codes.OK.String()) @@ -854,20 +912,17 @@ func getLastSyncInfo(description string) (*replication.GetVolumeReplicationInfoR } func checkVolumeResyncStatus(localStatus librbd.SiteMirrorImageStatus) error { - // we are considering 2 states to check resync started and resync completed - // as below. all other states will be considered as an error state so that - // cephCSI can return error message and volume replication operator can - // mark the VolumeReplication status as not resyncing for the volume. - - // If the state is Replaying means the resync is going on. - // Once the volume on remote cluster is demoted and resync - // is completed the image state will be moved to UNKNOWN. - // RBD mirror daemon should be always running on the primary cluster. - if !localStatus.Up || (localStatus.State != librbd.MirrorImageStatusStateReplaying && - localStatus.State != librbd.MirrorImageStatusStateUnknown) { - return fmt.Errorf( - "not resyncing. Local status: daemon up=%t image is in %q state", - localStatus.Up, localStatus.State) + // we are considering local snapshot timestamp to check if the resync is + // started or not, if we don't see local_snapshot_timestamp in the + // description of localStatus, we are returning error. if we see the local + // snapshot timestamp in the description we return resyncing started. 
+ description := localStatus.Description + resp, err := getLastSyncInfo(description) + if err != nil { + return fmt.Errorf("failed to get last sync info: %w", err) + } + if resp.LastSyncTime == nil { + return errors.New("last sync time is nil") } return nil diff --git a/internal/csi-addons/rbd/replication_test.go b/internal/csi-addons/rbd/replication_test.go index 8a7ea8806ff7..967d1acd1e29 100644 --- a/internal/csi-addons/rbd/replication_test.go +++ b/internal/csi-addons/rbd/replication_test.go @@ -225,74 +225,23 @@ func TestCheckVolumeResyncStatus(t *testing.T) { wantErr bool }{ { - name: "test when rbd mirror daemon is not running", + name: "test when local_snapshot_timestamp is non zero", args: librbd.SiteMirrorImageStatus{ - State: librbd.MirrorImageStatusStateUnknown, - Up: false, - }, - wantErr: true, - }, - { - name: "test for unknown state", - args: librbd.SiteMirrorImageStatus{ - State: librbd.MirrorImageStatusStateUnknown, - Up: true, - }, - wantErr: false, - }, - { - name: "test for error state", - args: librbd.SiteMirrorImageStatus{ - State: librbd.MirrorImageStatusStateError, - Up: true, - }, - wantErr: true, - }, - { - name: "test for syncing state", - args: librbd.SiteMirrorImageStatus{ - State: librbd.MirrorImageStatusStateSyncing, - Up: true, - }, - wantErr: true, - }, - { - name: "test for starting_replay state", - args: librbd.SiteMirrorImageStatus{ - State: librbd.MirrorImageStatusStateStartingReplay, - Up: true, - }, - wantErr: true, - }, - { - name: "test for replaying state", - args: librbd.SiteMirrorImageStatus{ - State: librbd.MirrorImageStatusStateReplaying, - Up: true, + Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, }, wantErr: false, }, { - name: "test for stopping_replay state", - args: librbd.SiteMirrorImageStatus{ - State: 
librbd.MirrorImageStatusStateStoppingReplay, - Up: true, - }, - wantErr: true, - }, - { - name: "test for stopped state", + name: "test when local_snapshot_timestamp is zero", args: librbd.SiteMirrorImageStatus{ - State: librbd.MirrorImageStatusStateStopped, - Up: true, + Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":0,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, }, wantErr: true, }, { - name: "test for invalid state", + name: "test when local_snapshot_timestamp is not present", args: librbd.SiteMirrorImageStatus{ - State: librbd.MirrorImageStatusState(100), - Up: true, + Description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, }, wantErr: true, }, diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index 25f51b35c125..9ddb649ef7c6 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -227,7 +227,7 @@ func (ns *NodeServer) populateRbdVol( } }() // get the image details from the ceph cluster. 
- err = rv.getImageInfo() + err = rv.GetImageInfo() if err != nil { log.ErrorLog(ctx, "failed to get image details %s: %v", rv, err) diff --git a/internal/rbd/rbd_journal.go b/internal/rbd/rbd_journal.go index 55c6a033ebfd..a746d6dfa721 100644 --- a/internal/rbd/rbd_journal.go +++ b/internal/rbd/rbd_journal.go @@ -166,7 +166,7 @@ func checkSnapCloneExists( } vol.ReservedID = snapUUID // Fetch on-disk image attributes - err = vol.getImageInfo() + err = vol.GetImageInfo() if err != nil { if errors.Is(err, ErrImageNotFound) { err = parentVol.deleteSnapshot(ctx, rbdSnap) @@ -292,7 +292,7 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er // save it for size checks before fetching image data requestSize := rv.VolSize //nolint:ifshort // FIXME: rename and split function into helpers // Fetch on-disk image attributes and compare against request - err = rv.getImageInfo() + err = rv.GetImageInfo() if err != nil { if errors.Is(err, ErrImageNotFound) { // Need to check cloned info here not on createvolume, diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index 64d786d734da..09068595818b 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -173,7 +173,7 @@ type rbdVolume struct { // Network namespace file path to execute nsenter command NetNamespaceFilePath string // RequestedVolSize has the size of the volume requested by the user and - // this value will not be updated when doing getImageInfo() on rbdVolume. + // this value will not be updated when doing GetImageInfo() on rbdVolume. RequestedVolSize int64 DisableInUseChecks bool readOnly bool @@ -716,7 +716,7 @@ func (ri *rbdImage) getCloneDepth(ctx context.Context) (uint, error) { return depth, err } - err = vol.getImageInfo() + err = vol.GetImageInfo() // FIXME: create and destroy the vol inside the loop. 
// see https://github.com/ceph/ceph-csi/pull/1838#discussion_r598530807 vol.ioctx.Destroy() @@ -922,13 +922,13 @@ func (ri *rbdImage) checkImageChainHasFeature(ctx context.Context, feature uint6 return false, err } - err = rbdImg.getImageInfo() + err = rbdImg.GetImageInfo() // FIXME: create and destroy the vol inside the loop. // see https://github.com/ceph/ceph-csi/pull/1838#discussion_r598530807 rbdImg.ioctx.Destroy() rbdImg.ioctx = nil if err != nil { - // call to getImageInfo returns the parent name even if the parent + // call to GetImageInfo returns the parent name even if the parent // is in the trash, when we try to open the parent image to get its // information it fails because it is already in trash. We should // treat error as nil if the parent is not found. @@ -1057,7 +1057,7 @@ func updateSnapshotDetails(rbdSnap *rbdSnapshot) error { } defer vol.Destroy() - err = vol.getImageInfo() + err = vol.GetImageInfo() if err != nil { return err } @@ -1153,7 +1153,7 @@ func generateVolumeFromVolumeID( return rbdVol, err } } - err = rbdVol.getImageInfo() + err = rbdVol.GetImageInfo() return rbdVol, err } @@ -1501,7 +1501,7 @@ func (rv *rbdVolume) cloneRbdImageFromSnapshot( }() // get image latest information - err = rv.getImageInfo() + err = rv.GetImageInfo() if err != nil { return fmt.Errorf("failed to get image info of %s: %w", rv, err) } @@ -1558,9 +1558,9 @@ func (rv *rbdVolume) setImageOptions(ctx context.Context, options *librbd.ImageO return nil } -// getImageInfo queries rbd about the given image and returns its metadata, and returns +// GetImageInfo queries rbd about the given image and returns its metadata, and returns // ErrImageNotFound if provided image is not found. -func (ri *rbdImage) getImageInfo() error { +func (ri *rbdImage) GetImageInfo() error { image, err := ri.open() if err != nil { return err @@ -1607,7 +1607,7 @@ func (ri *rbdImage) getImageInfo() error { // getParent returns parent image if it exists. 
func (ri *rbdImage) getParent() (*rbdImage, error) { - err := ri.getImageInfo() + err := ri.GetImageInfo() if err != nil { return nil, err } @@ -1623,7 +1623,7 @@ func (ri *rbdImage) getParent() (*rbdImage, error) { parentImage.RadosNamespace = ri.RadosNamespace parentImage.RbdImageName = ri.ParentName - err = parentImage.getImageInfo() + err = parentImage.GetImageInfo() if err != nil { return nil, err } diff --git a/internal/rbd/replication.go b/internal/rbd/replication.go index 29a3ff12c72a..2b750626f809 100644 --- a/internal/rbd/replication.go +++ b/internal/rbd/replication.go @@ -19,31 +19,19 @@ package rbd import ( "context" "fmt" - "strings" librbd "github.com/ceph/go-ceph/rbd" ) -func (rv *rbdVolume) ResyncVol(localStatus librbd.SiteMirrorImageStatus, force bool) error { - if resyncRequired(localStatus) { - // If the force option is not set return the error message to retry - // with Force option. - if !force { - return fmt.Errorf("%w: image is in %q state, description (%s). Force resync to recover volume", - ErrFailedPrecondition, localStatus.State, localStatus.Description) - } - err := rv.resyncImage() - if err != nil { - return fmt.Errorf("%w: failed to resync image: %w", ErrResyncImageFailed, err) - } - - // If we issued a resync, return a non-final error as image needs to be recreated - // locally. Caller retries till RBD syncs an initial version of the image to - // report its status in the resync request. - return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrUnavailable) +func (rv *rbdVolume) ResyncVol(localStatus librbd.SiteMirrorImageStatus) error { + if err := rv.resyncImage(); err != nil { + return fmt.Errorf("%w: failed to resync image: %w", ErrResyncImageFailed, err) } - return nil + // If we issued a resync, return a non-final error as image needs to be recreated + // locally. Caller retries till RBD syncs an initial version of the image to + // report its status in the resync request. 
+ return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrUnavailable) } // repairResyncedImageID updates the existing image ID with new one. @@ -66,22 +54,6 @@ func (rv *rbdVolume) RepairResyncedImageID(ctx context.Context, ready bool) erro return rv.repairImageID(ctx, j, true) } -// resyncRequired returns true if local image is in split-brain state and image -// needs resync. -func resyncRequired(localStatus librbd.SiteMirrorImageStatus) bool { - // resync is required if the image is in error state or the description - // contains split-brain message. - // In some corner cases like `re-player shutdown` the local image will not - // be in an error state. It would be also worth considering the `description` - // field to make sure about split-brain. - if localStatus.State == librbd.MirrorImageStatusStateError || - strings.Contains(localStatus.Description, "split-brain") { - return true - } - - return false -} - func (rv *rbdVolume) DisableVolumeReplication( mirroringInfo *librbd.MirrorImageInfo, force bool, @@ -126,3 +98,23 @@ func (rv *rbdVolume) DisableVolumeReplication( return nil } + +func (rv *rbdVolume) CheckImageIsPrimary() error { + mirroringInfo, err := rv.GetImageMirroringInfo() + if err != nil { + // in case of Resync the image will get deleted and gets recreated and + // it takes time for this operation. + return fmt.Errorf("%w: %w", ErrAborted, err) + } + + if mirroringInfo.State != librbd.MirrorImageEnabled { + return fmt.Errorf("%w: image mirroring is not enabled", ErrInvalidArgument) + } + + // return error if the image is still primary + if mirroringInfo.Primary { + return fmt.Errorf("%w: image is in primary state", ErrInvalidArgument) + } + + return nil +}