Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Volume resize #104

Merged
merged 8 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Microsoft/hcsshim v0.11.4 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/ceph/go-ceph v0.25.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b h1:otBG+dV+YK+Soembj
github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0BsqsP2LwDJ9aOkm/6J86V6lyAXCoQWGw3K50=
github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0 h1:nvj0OLI3YqYXer/kZD8Ri1aaunCxIEsOst1BVJswV0o=
github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE=
github.com/ceph/go-ceph v0.25.0 h1:sorUSVkm0F7tYKrv8afd1eJ8UXwxkYIio9o0xLTX59E=
github.com/ceph/go-ceph v0.25.0/go.mod h1:HoEJSH32bMcGzmsqJNmSVeYrrcetSxnMfVnGIXBE59Q=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
Expand Down
1 change: 1 addition & 0 deletions pkg/api/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type VolumeStatus struct {
Name string `json:"name,omitempty"`
Handle string `json:"handle,omitempty"`
State VolumeState `json:"state,omitempty"`
Size int64 `json:"size,omitempty"`
lukasfrank marked this conversation as resolved.
Show resolved Hide resolved
}

type EmptyDiskSpec struct {
Expand Down
113 changes: 85 additions & 28 deletions pkg/controllers/machine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"slices"
"sync"
"time"

"github.com/digitalocean/go-libvirt"
"github.com/go-logr/logr"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/ironcore-dev/libvirt-provider/pkg/store"
"github.com/ironcore-dev/libvirt-provider/pkg/utils"
machinev1alpha1 "github.com/ironcore-dev/libvirt-provider/provider/api/v1alpha1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
"libvirt.org/go/libvirtxml"
Expand Down Expand Up @@ -57,13 +59,21 @@ var (
)

type MachineReconcilerOptions struct {
GuestCapabilities guest.Capabilities
TCMallocLibPath string
ImageCache providerimage.Cache
Raw raw.Raw
Host providerhost.Host
VolumePluginManager *providervolume.PluginManager
NetworkInterfacePlugin providernetworkinterface.Plugin
GuestCapabilities guest.Capabilities
TCMallocLibPath string
ImageCache providerimage.Cache
Raw raw.Raw
Host providerhost.Host
VolumePluginManager *providervolume.PluginManager
NetworkInterfacePlugin providernetworkinterface.Plugin
VolumeEvents event.Source[*api.Machine]
ResyncDurationVolumeSize time.Duration
}

func setMachineReconcilerOptionsDefaults(o *MachineReconcilerOptions) {
if o.ResyncDurationVolumeSize == 0 {
o.ResyncDurationVolumeSize = time.Minute
lukasfrank marked this conversation as resolved.
Show resolved Hide resolved
}
}

func NewMachineReconciler(
Expand All @@ -73,6 +83,8 @@ func NewMachineReconciler(
machineEvents event.Source[*api.Machine],
opts MachineReconcilerOptions,
) (*MachineReconciler, error) {
setMachineReconcilerOptionsDefaults(&opts)

if libvirt == nil {
return nil, fmt.Errorf("must specify libvirt client")
}
Expand All @@ -86,18 +98,19 @@ func NewMachineReconciler(
}

return &MachineReconciler{
log: log,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
libvirt: libvirt,
machines: machines,
machineEvents: machineEvents,
guestCapabilities: opts.GuestCapabilities,
tcMallocLibPath: opts.TCMallocLibPath,
host: opts.Host,
imageCache: opts.ImageCache,
raw: opts.Raw,
volumePluginManager: opts.VolumePluginManager,
networkInterfacePlugin: opts.NetworkInterfacePlugin,
log: log,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
libvirt: libvirt,
machines: machines,
machineEvents: machineEvents,
guestCapabilities: opts.GuestCapabilities,
tcMallocLibPath: opts.TCMallocLibPath,
host: opts.Host,
imageCache: opts.ImageCache,
raw: opts.Raw,
volumePluginManager: opts.VolumePluginManager,
networkInterfacePlugin: opts.NetworkInterfacePlugin,
resyncDurationVolumeSize: opts.ResyncDurationVolumeSize,
}, nil
}

Expand All @@ -117,6 +130,8 @@ type MachineReconciler struct {

machines store.Store[*api.Machine]
machineEvents event.Source[*api.Machine]

resyncDurationVolumeSize time.Duration
}

func (r *MachineReconciler) Start(ctx context.Context) error {
Expand Down Expand Up @@ -154,12 +169,54 @@ func (r *MachineReconciler) Start(ctx context.Context) error {
}
}()

var wg sync.WaitGroup
wg.Add(1)
go func() {
sizeLogger := r.log.WithName("size")
lukas016 marked this conversation as resolved.
Show resolved Hide resolved
defer wg.Done()

wait.UntilWithContext(ctx, func(innerCtx context.Context) {
machines, err := r.machines.List(innerCtx)
if err != nil {
sizeLogger.Error(err, "failed to list machines")
return
}

for _, machine := range machines {
var shouldEnqueue bool

for _, volume := range machine.Spec.Volumes {
plugin, err := r.volumePluginManager.FindPluginBySpec(volume)
if err != nil {
sizeLogger.Error(err, "failed to get volume plugin", "machineID", machine, "volume", volume.Name)
continue
}

volumeSize, err := plugin.GetSize(innerCtx, volume)
if err != nil {
sizeLogger.Error(err, "failed to get volume size", "machineID", machine, "volume", volume.Name)
continue
}

if lastVolumeSize := getLastVolumeSize(machine, volume.Name); volumeSize != ptr.Deref(lastVolumeSize, 0) {
sizeLogger.V(1).Info("Enqueue machine: Volume Size changed", "machineID", machine.ID, "lastSize", lastVolumeSize, "volumeSize", volumeSize)
lukasfrank marked this conversation as resolved.
Show resolved Hide resolved
shouldEnqueue = true
break
}
}

if shouldEnqueue {
r.queue.AddRateLimited(machine.ID)
}
}
}, r.resyncDurationVolumeSize)
}()

go func() {
<-ctx.Done()
r.queue.ShutDown()
}()

var wg sync.WaitGroup
for i := 0; i < workerSize; i++ {
wg.Add(1)
go func() {
Expand Down Expand Up @@ -520,15 +577,15 @@ func (r *MachineReconciler) domainFor(
return nil, nil, nil, err
}

if err := r.setDomainResources(ctx, log, machine, domainDesc); err != nil {
if err := r.setDomainResources(machine, domainDesc); err != nil {
return nil, nil, nil, err
}

if err := r.setDomainPCIControllers(machine, domainDesc); err != nil {
if err := r.setDomainPCIControllers(domainDesc); err != nil {
return nil, nil, nil, err
}

if err := r.setTCMallocPath(machine, domainDesc); err != nil {
if err := r.setTCMallocPath(domainDesc); err != nil {
return nil, nil, nil, err
}

Expand All @@ -539,7 +596,7 @@ func (r *MachineReconciler) domainFor(
}

if ignitionSpec := machine.Spec.Ignition; ignitionSpec != nil {
if err := r.setDomainIgnition(ctx, machine, domainDesc); err != nil {
if err := r.setDomainIgnition(machine, domainDesc); err != nil {
return nil, nil, nil, err
}
}
Expand Down Expand Up @@ -590,7 +647,7 @@ func (r *MachineReconciler) setDomainMetadata(log logr.Logger, machine *api.Mach
return nil
}

func (r *MachineReconciler) setDomainResources(ctx context.Context, log logr.Logger, machine *api.Machine, domain *libvirtxml.Domain) error {
func (r *MachineReconciler) setDomainResources(machine *api.Machine, domain *libvirtxml.Domain) error {
lukasfrank marked this conversation as resolved.
Show resolved Hide resolved
// TODO: check if there is better or check possible while conversion to uint
domain.Memory = &libvirtxml.DomainMemory{
Value: uint(machine.Spec.MemoryBytes),
Expand All @@ -606,7 +663,7 @@ func (r *MachineReconciler) setDomainResources(ctx context.Context, log logr.Log

// TODO: Investigate hotplugging the pcie-root-port controllers with disks.
// Ref: https://libvirt.org/pci-hotplug.html#x86_64-q35
func (r *MachineReconciler) setDomainPCIControllers(machine *api.Machine, domain *libvirtxml.Domain) error {
func (r *MachineReconciler) setDomainPCIControllers(domain *libvirtxml.Domain) error {
domain.Devices.Controllers = append(domain.Devices.Controllers, libvirtxml.DomainController{
Type: "pci",
Model: "pcie-root",
Expand All @@ -622,7 +679,7 @@ func (r *MachineReconciler) setDomainPCIControllers(machine *api.Machine, domain
}

// setTCMallocPath enables support for the tcmalloc for the VMs.
func (r *MachineReconciler) setTCMallocPath(machine *api.Machine, domain *libvirtxml.Domain) error {
func (r *MachineReconciler) setTCMallocPath(domain *libvirtxml.Domain) error {
if r.tcMallocLibPath == "" {
return nil
}
Expand Down Expand Up @@ -692,7 +749,7 @@ func (r *MachineReconciler) setDomainImage(
return nil
}

func (r *MachineReconciler) setDomainIgnition(ctx context.Context, machine *api.Machine, domain *libvirtxml.Domain) error {
func (r *MachineReconciler) setDomainIgnition(machine *api.Machine, domain *libvirtxml.Domain) error {
if machine.Spec.Ignition == nil {
return fmt.Errorf("no IgnitionData found in the Machine %s", machine.GetID())
}
Expand Down
47 changes: 35 additions & 12 deletions pkg/controllers/machine_controller_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
providervolume "github.com/ironcore-dev/libvirt-provider/pkg/plugins/volume"
providerhost "github.com/ironcore-dev/libvirt-provider/pkg/providerhost"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/ptr"
utilstrings "k8s.io/utils/strings"
"libvirt.org/go/libvirtxml"
)
Expand Down Expand Up @@ -94,6 +95,22 @@ func (r *MachineReconciler) machineVolumeMounter(machine *api.Machine) VolumeMou
}
}

func getVolumeStatus(machine *api.Machine, volumeID string) *api.VolumeStatus {
for _, volumeStatus := range machine.Status.VolumeStatus {
if volumeID == volumeStatus.Handle {
return &volumeStatus
}
}
return nil
}

func getLastVolumeSize(machine *api.Machine, volumeID string) *int64 {
lukasfrank marked this conversation as resolved.
Show resolved Hide resolved
if status := getVolumeStatus(machine, volumeID); status != nil && status.Size != 0 {
return ptr.To(status.Size)
}
return nil
}

func (r *MachineReconciler) attachDetachVolumes(ctx context.Context, log logr.Logger, machine *api.Machine, attacher VolumeAttacher) ([]api.VolumeStatus, error) {
mounter := r.machineVolumeMounter(machine)
specVolumes := r.listDesiredVolumes(machine)
Expand Down Expand Up @@ -130,17 +147,23 @@ func (r *MachineReconciler) attachDetachVolumes(ctx context.Context, log logr.Lo
var volumeStates []api.VolumeStatus
for _, volume := range specVolumes {
log.V(1).Info("Reconciling volume", "VolumeName", volume.Name)
volumeID, err := r.applyVolume(ctx, log, volume, mounter, attacher)
volumeID, volumeSize, err := r.applyVolume(ctx, log, volume, mounter, attacher)
if err != nil {
errs = append(errs, fmt.Errorf("[volume %s] error reconciling: %w", volume.Name, err))
} else {
log.V(1).Info("Successfully reconciled volume", "VolumeName", volume.Name, "VolumeID", volumeID)
volumeStates = append(volumeStates, api.VolumeStatus{
Name: volume.Name,
Handle: volumeID,
State: api.VolumeStateAttached,
})
continue
}

if lastVolumeSize := getLastVolumeSize(machine, volumeID); lastVolumeSize != nil && volumeSize != ptr.Deref(lastVolumeSize, 0) {
log.V(1).Info("Resizing volume", "lastSize", lastVolumeSize, "volumeSize", volumeSize)
lukasfrank marked this conversation as resolved.
Show resolved Hide resolved
}

log.V(1).Info("Successfully reconciled volume", "VolumeName", volume.Name, "VolumeID", volumeID)
volumeStates = append(volumeStates, api.VolumeStatus{
Name: volume.Name,
Handle: volumeID,
State: api.VolumeStateAttached,
Size: volumeSize,
})
}

if len(errs) > 0 {
Expand Down Expand Up @@ -617,7 +640,7 @@ func (r *MachineReconciler) applyVolume(
desiredVolume *api.VolumeSpec,
mountedVolumes VolumeMounter,
attacher VolumeAttacher,
) (string, error) {
) (string, int64, error) {
log.V(1).Info("Getting volume spec")

log.V(1).Info("Applying volume")
Expand All @@ -629,7 +652,7 @@ func (r *MachineReconciler) applyVolume(
return nil
})
if err != nil {
return "", fmt.Errorf("error applying volume mount: %w", err)
return "", 0, fmt.Errorf("error applying volume mount: %w", err)
}

log.V(1).Info("Ensuring volume is attached")
Expand All @@ -638,9 +661,9 @@ func (r *MachineReconciler) applyVolume(
Device: desiredVolume.Device,
Spec: *providerVolume,
}); err != nil && !errors.Is(err, ErrAttachedVolumeAlreadyExists) {
return "", fmt.Errorf("error ensuring volume is attached: %w", err)
return "", 0, fmt.Errorf("error ensuring volume is attached: %w", err)
}
return volumeID, nil
return volumeID, providerVolume.Size, nil
}

func (r *MachineReconciler) listDesiredVolumes(machine *api.Machine) map[string]*api.VolumeSpec {
Expand Down
3 changes: 0 additions & 3 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ func (s *ListWatchSource[E]) Start(ctx context.Context) error {
}
}()

ticker := time.NewTicker(s.resyncDuration)
defer ticker.Stop()

wg.Add(1)
go func() {
defer wg.Done()
Expand Down
8 changes: 7 additions & 1 deletion pkg/plugins/volume/ceph/ceph.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func readVolumeAttributes(attrs map[string]string) (monitors []volume.CephMonito
func (p *plugin) Apply(ctx context.Context, spec *api.VolumeSpec, machine *api.Machine) (*volume.Volume, error) {
volumeData, err := p.getVolumeData(spec)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get volume data: %w", err)
}

var cephEncryption *volume.CephEncryption
Expand All @@ -138,6 +138,11 @@ func (p *plugin) Apply(ctx context.Context, spec *api.VolumeSpec, machine *api.M
}
}

volumeSize, err := p.GetSize(ctx, spec)
if err != nil {
return nil, fmt.Errorf("failed to get volume size: %w", err)
}

return &volume.Volume{
QCow2File: "",
RawFile: "",
Expand All @@ -151,6 +156,7 @@ func (p *plugin) Apply(ctx context.Context, spec *api.VolumeSpec, machine *api.M
Encryption: cephEncryption,
},
Handle: volumeData.handle,
Size: volumeSize,
}, nil
}

Expand Down
Loading
Loading