Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added patch logic for services #1960

Merged
merged 2 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 63 additions & 44 deletions pkg/controllers/resources/services/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@ package services
import (
"context"
"errors"
"fmt"
"time"

"github.com/loft-sh/vcluster/pkg/controllers/syncer"
synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context"
"github.com/loft-sh/vcluster/pkg/controllers/syncer/translator"
syncertypes "github.com/loft-sh/vcluster/pkg/controllers/syncer/types"
"github.com/loft-sh/vcluster/pkg/mappings"
"github.com/loft-sh/vcluster/pkg/patcher"
"github.com/loft-sh/vcluster/pkg/specialservices"
"github.com/loft-sh/vcluster/pkg/util/translate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
kerrors "k8s.io/apimachinery/pkg/api/errors"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -54,7 +57,7 @@ func (s *serviceSyncer) SyncToHost(ctx *synccontext.SyncContext, vObj client.Obj
return s.SyncToHostCreate(ctx, vObj, s.translate(ctx, vObj.(*corev1.Service)))
}

func (s *serviceSyncer) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj client.Object) (ctrl.Result, error) {
func (s *serviceSyncer) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj client.Object) (_ ctrl.Result, retErr error) {
vService := vObj.(*corev1.Service)
pService := pObj.(*corev1.Service)

Expand All @@ -63,53 +66,69 @@ func (s *serviceSyncer) Sync(ctx *synccontext.SyncContext, pObj client.Object, v
return ctrl.Result{RequeueAfter: time.Second * 3}, nil
}

// check if backwards update is necessary
newService := s.translateUpdateBackwards(pService, vService)
if newService != nil {
if vService.Spec.ClusterIP != pService.Spec.ClusterIP {
newService.Spec.ClusterIPs = nil
ctx.Log.Infof("recreating virtual service %s/%s, because cluster ip differs %s != %s", vService.Namespace, vService.Name, pService.Spec.ClusterIP, vService.Spec.ClusterIP)

// recreate the new service with the correct cluster ip
_, err := recreateService(ctx, ctx.VirtualClient, newService)
if err != nil {
ctx.Log.Errorf("error creating virtual service: %s/%s", vService.Namespace, vService.Name)
return ctrl.Result{}, err
}
} else {
// update with correct ports
ctx.Log.Infof("update virtual service %s/%s, because spec is out of sync", vService.Namespace, vService.Name)
err := ctx.VirtualClient.Update(ctx, newService)
if err != nil {
return ctrl.Result{}, err
}
}
// check if recreating service is necessary
if vService.Spec.ClusterIP != pService.Spec.ClusterIP {
vService.Spec.ClusterIPs = nil
vService.Spec.ClusterIP = pService.Spec.ClusterIP
ctx.Log.Infof("recreating virtual service %s/%s, because cluster ip differs %s != %s", vService.Namespace, vService.Name, pService.Spec.ClusterIP, vService.Spec.ClusterIP)

// we will requeue anyways
return ctrl.Result{Requeue: true}, nil
}

// check if backwards status update is necessary
if !equality.Semantic.DeepEqual(vService.Status, pService.Status) {
newService := vService.DeepCopy()
newService.Status = pService.Status
ctx.Log.Infof("update virtual service %s/%s, because status is out of sync", vService.Namespace, vService.Name)
translator.PrintChanges(vService, newService, ctx.Log)
err := ctx.VirtualClient.Status().Update(ctx, newService)
// recreate the new service with the correct cluster ip
err := recreateService(ctx, ctx.VirtualClient, vService)
if err != nil {
ctx.Log.Errorf("error creating virtual service: %s/%s", vService.Namespace, vService.Name)
return ctrl.Result{}, err
}

return ctrl.Result{Requeue: true}, nil
return ctrl.Result{Requeue: true}, err
}

// forward update
newService = s.translateUpdate(ctx, pService, vService)
if newService != nil {
translator.PrintChanges(pService, newService, ctx.Log)
// patch the service
patch, err := patcher.NewSyncerPatcher(ctx, pService, vService)
if err != nil {
return ctrl.Result{}, fmt.Errorf("new syncer patcher: %w", err)
}
defer func() {
if err := patch.Patch(ctx, pService, vService); err != nil {
retErr = utilerrors.NewAggregate([]error{retErr, err})
}
if retErr != nil {
s.EventRecorder().Eventf(vObj, "Warning", "SyncError", "Error syncing: %v", retErr)
}
}()

// check
sourceService, targetService := synccontext.SyncSourceTarget(ctx, pService, vService)

// update spec bidirectionally
targetService.Spec.ExternalIPs = sourceService.Spec.ExternalIPs
targetService.Spec.LoadBalancerIP = sourceService.Spec.LoadBalancerIP
targetService.Spec.Ports = sourceService.Spec.Ports
targetService.Spec.PublishNotReadyAddresses = sourceService.Spec.PublishNotReadyAddresses
targetService.Spec.Type = sourceService.Spec.Type
targetService.Spec.ExternalName = sourceService.Spec.ExternalName
targetService.Spec.ExternalTrafficPolicy = sourceService.Spec.ExternalTrafficPolicy
targetService.Spec.SessionAffinity = sourceService.Spec.SessionAffinity
targetService.Spec.SessionAffinityConfig = sourceService.Spec.SessionAffinityConfig
targetService.Spec.LoadBalancerSourceRanges = sourceService.Spec.LoadBalancerSourceRanges
targetService.Spec.HealthCheckNodePort = sourceService.Spec.HealthCheckNodePort
facchettos marked this conversation as resolved.
Show resolved Hide resolved

// update status
vService.Status = pService.Status

// check annotations
_, updatedAnnotations, updatedLabels := s.TranslateMetadataUpdate(ctx, vObj, pObj)

// remove the ServiceBlockDeletion annotation if it's not needed
if vService.Spec.ClusterIP == pService.Spec.ClusterIP {
delete(updatedAnnotations, ServiceBlockDeletion)
}

pService.Annotations = updatedAnnotations
pService.Labels = updatedLabels

return s.SyncToHostUpdate(ctx, vObj, newService)
// translate selector
pService.Spec.Selector = translate.Default.TranslateLabels(vService.Spec.Selector, vService.Namespace, nil)
return ctrl.Result{}, nil
}

func isSwitchingFromExternalName(pService *corev1.Service, vService *corev1.Service) bool {
Expand Down Expand Up @@ -137,11 +156,11 @@ func (s *serviceSyncer) SyncToVirtual(ctx *synccontext.SyncContext, pObj client.
return syncer.DeleteHostObject(ctx, pObj, "virtual object was deleted")
}

func recreateService(ctx context.Context, virtualClient client.Client, vService *corev1.Service) (*corev1.Service, error) {
func recreateService(ctx context.Context, virtualClient client.Client, vService *corev1.Service) error {
// delete & create with correct ClusterIP
err := virtualClient.Delete(ctx, vService)
if err != nil && !kerrors.IsNotFound(err) {
return nil, err
return err
}

// make sure we don't set the resource version during create
Expand All @@ -155,10 +174,10 @@ func recreateService(ctx context.Context, virtualClient client.Client, vService
err = virtualClient.Create(ctx, vService)
if err != nil {
klog.Errorf("error recreating virtual service: %s/%s: %v", vService.Namespace, vService.Name, err)
return nil, err
return err
}

return vService, nil
return nil
}

var _ syncertypes.Starter = &serviceSyncer{}
Expand Down
25 changes: 15 additions & 10 deletions pkg/controllers/resources/services/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func TestSync(t *testing.T) {
Namespace: vObjectMeta.Namespace,
},
Spec: corev1.ServiceSpec{
ExternalName: "backwardExternal",
ExternalIPs: []string{"123:221:123:221"},
LoadBalancerIP: "123:213:123:213",
},
Expand Down Expand Up @@ -244,6 +245,7 @@ func TestSync(t *testing.T) {
{
Name: "test",
Port: 123,
NodePort: 567,
TargetPort: intstr.FromInt(10),
},
},
Expand Down Expand Up @@ -332,7 +334,8 @@ func TestSync(t *testing.T) {
},
Sync: func(ctx *synccontext.RegisterContext) {
syncCtx, syncer := generictesting.FakeStartSyncer(t, ctx, New)
_, err := syncer.(*serviceSyncer).Sync(syncCtx, pServicePorts1, vServicePorts1)
syncCtx.EventSource = synccontext.EventSourceHost
_, err := syncer.(*serviceSyncer).Sync(syncCtx, pServicePorts1.DeepCopy(), vServicePorts1.DeepCopy())
assert.NilError(t, err)
},
},
Expand All @@ -348,7 +351,7 @@ func TestSync(t *testing.T) {
},
Sync: func(ctx *synccontext.RegisterContext) {
syncCtx, syncer := generictesting.FakeStartSyncer(t, ctx, New)
_, err := syncer.(*serviceSyncer).Sync(syncCtx, pServicePorts2, vServicePorts1)
_, err := syncer.(*serviceSyncer).Sync(syncCtx, pServicePorts2.DeepCopy(), vServicePorts1.DeepCopy())
assert.NilError(t, err)
},
},
Expand All @@ -364,7 +367,7 @@ func TestSync(t *testing.T) {
},
Sync: func(ctx *synccontext.RegisterContext) {
syncCtx, syncer := generictesting.FakeStartSyncer(t, ctx, New)
_, err := syncer.(*serviceSyncer).Sync(syncCtx, createdByServerService, updateForwardService)
_, err := syncer.(*serviceSyncer).Sync(syncCtx, createdByServerService.DeepCopy(), updateForwardService.DeepCopy())
assert.NilError(t, err)
},
},
Expand All @@ -380,7 +383,7 @@ func TestSync(t *testing.T) {
},
Sync: func(ctx *synccontext.RegisterContext) {
syncCtx, syncer := generictesting.FakeStartSyncer(t, ctx, New)
_, err := syncer.(*serviceSyncer).Sync(syncCtx, createdService, baseService)
_, err := syncer.(*serviceSyncer).Sync(syncCtx, createdService.DeepCopy(), baseService.DeepCopy())
assert.NilError(t, err)
},
},
Expand All @@ -398,6 +401,7 @@ func TestSync(t *testing.T) {
syncCtx, syncer := generictesting.FakeStartSyncer(t, ctx, New)
baseService := baseService.DeepCopy()
updateBackwardSpecService := updateBackwardSpecService.DeepCopy()
syncCtx.EventSource = synccontext.EventSourceHost
_, err := syncer.(*serviceSyncer).Sync(syncCtx, updateBackwardSpecService, baseService)
assert.NilError(t, err)

Expand All @@ -408,7 +412,7 @@ func TestSync(t *testing.T) {
assert.NilError(t, err)

baseService.Spec.ExternalName = updateBackwardSpecService.Spec.ExternalName
_, err = syncer.(*serviceSyncer).Sync(syncCtx, updateBackwardSpecService, baseService)
_, err = syncer.(*serviceSyncer).Sync(syncCtx, updateBackwardSpecService.DeepCopy(), baseService.DeepCopy())
assert.NilError(t, err)
},
},
Expand All @@ -435,8 +439,9 @@ func TestSync(t *testing.T) {
err = ctx.PhysicalManager.GetClient().Get(ctx, types.NamespacedName{Namespace: updateBackwardSpecRecreateService.Namespace, Name: updateBackwardSpecRecreateService.Name}, updateBackwardSpecRecreateService)
assert.NilError(t, err)

syncCtx.EventSource = synccontext.EventSourceHost
baseService.Spec.ExternalName = updateBackwardSpecService.Spec.ExternalName
_, err = syncer.(*serviceSyncer).Sync(syncCtx, updateBackwardSpecRecreateService, baseService)
_, err = syncer.(*serviceSyncer).Sync(syncCtx, updateBackwardSpecRecreateService.DeepCopy(), baseService.DeepCopy())
assert.NilError(t, err)
},
},
Expand All @@ -452,7 +457,7 @@ func TestSync(t *testing.T) {
},
Sync: func(ctx *synccontext.RegisterContext) {
syncCtx, syncer := generictesting.FakeStartSyncer(t, ctx, New)
_, err := syncer.(*serviceSyncer).Sync(syncCtx, updateBackwardStatusService, baseService)
_, err := syncer.(*serviceSyncer).Sync(syncCtx, updateBackwardStatusService.DeepCopy(), baseService.DeepCopy())
assert.NilError(t, err)
},
},
Expand All @@ -468,7 +473,7 @@ func TestSync(t *testing.T) {
},
Sync: func(ctx *synccontext.RegisterContext) {
syncCtx, syncer := generictesting.FakeStartSyncer(t, ctx, New)
_, err := syncer.(*serviceSyncer).Sync(syncCtx, createdService, baseService)
_, err := syncer.(*serviceSyncer).Sync(syncCtx, createdService.DeepCopy(), baseService.DeepCopy())
assert.NilError(t, err)
},
},
Expand Down Expand Up @@ -556,7 +561,7 @@ func TestSync(t *testing.T) {
},
Sync: func(ctx *synccontext.RegisterContext) {
syncCtx, syncer := generictesting.FakeStartSyncer(t, ctx, New)
_, err := syncer.(*serviceSyncer).Sync(syncCtx, pServiceExternal, vServiceClusterIPFromExternal)
_, err := syncer.(*serviceSyncer).Sync(syncCtx, pServiceExternal.DeepCopy(), vServiceClusterIPFromExternal.DeepCopy())
assert.NilError(t, err)
},
},
Expand All @@ -572,7 +577,7 @@ func TestSync(t *testing.T) {
},
Sync: func(ctx *synccontext.RegisterContext) {
syncCtx, syncer := generictesting.FakeStartSyncer(t, ctx, New)
_, err := syncer.(*serviceSyncer).Sync(syncCtx, pServiceExternal, vServiceNodePortFromExternal)
_, err := syncer.(*serviceSyncer).Sync(syncCtx, pServiceExternal.DeepCopy(), vServiceNodePortFromExternal.DeepCopy())
assert.NilError(t, err)
},
},
Expand Down
Loading
Loading