Skip to content

Commit

Permalink
Merge pull request #1925 from FabianKramm/mappings
Browse files Browse the repository at this point in the history
refactor resource host to virtual mappings
  • Loading branch information
FabianKramm authored Jul 18, 2024
2 parents e50bf3c + 26e62df commit c082311
Show file tree
Hide file tree
Showing 153 changed files with 2,555 additions and 1,746 deletions.
16 changes: 10 additions & 6 deletions cmd/vcluster/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/loft-sh/vcluster/pkg/scheme"
"github.com/loft-sh/vcluster/pkg/setup"
"github.com/loft-sh/vcluster/pkg/telemetry"
util "github.com/loft-sh/vcluster/pkg/util/context"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -105,6 +106,12 @@ func ExecuteStart(ctx context.Context, options *StartOptions) error {
return fmt.Errorf("start integrations: %w", err)
}

// start managers
syncers, err := setup.StartManagers(util.ToRegisterContext(controllerCtx))
if err != nil {
return fmt.Errorf("start managers: %w", err)
}

// start proxy
err = setup.StartProxy(controllerCtx)
if err != nil {
Expand All @@ -126,17 +133,14 @@ func ExecuteStart(ctx context.Context, options *StartOptions) error {
}
}

if err := pro.ConnectToPlatform(
ctx,
vConfig,
controllerCtx.VirtualManager,
); err != nil {
// connect to vCluster platform if configured
if err := pro.ConnectToPlatform(ctx, vConfig, controllerCtx.VirtualManager); err != nil {
return fmt.Errorf("connect to platform: %w", err)
}

// start leader election + controllers
err = StartLeaderElection(controllerCtx, func() error {
return setup.StartControllers(controllerCtx)
return setup.StartControllers(controllerCtx, syncers)
})
if err != nil {
return fmt.Errorf("start controllers: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions docs/pages/advanced-topics/plugins-development.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ The `SyncDown` function mentioned above is called by the vCluster SDK when a giv

```
func (s *carSyncer) SyncToHost(ctx *synccontext.SyncContext, vObj client.Object) (ctrl.Result, error) {
return s.SyncToHostCreate(ctx, vObj, s.TranslateMetadata(ctx.Context, vObj).(*examplev1.Car))
return s.SyncToHostCreate(ctx, vObj, s.TranslateMetadata(ctx, vObj).(*examplev1.Car))
}
func (s *carSyncer) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj client.Object) (ctrl.Result, error) {
return s.SyncToHostUpdate(ctx, vObj, s.translateUpdate(ctx.Context, pObj.(*examplev1.Car), vObj.(*examplev1.Car)))
return s.SyncToHostUpdate(ctx, vObj, s.translateUpdate(ctx, pObj.(*examplev1.Car), vObj.(*examplev1.Car)))
}
```
The `TranslateMetadata` function used above produces a Car object that will be created in the host cluster. It is a deep copy of the Car from vCluster, but with certain metadata modifications - the name and labels are transformed, some vCluster labels and annotations are added, many metadata fields are stripped (uid, resourceVersion, etc.).
Expand All @@ -109,7 +109,7 @@ Next, we need to implement code that will handle the updates of the Car. When a
```
func (s *carSyncer) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj client.Object) (ctrl.Result, error) {
return s.SyncToHostUpdate(ctx, vObj, s.translateUpdate(ctx.Context, pObj.(*examplev1.Car), vObj.(*examplev1.Car)))
return s.SyncToHostUpdate(ctx, vObj, s.translateUpdate(ctx, pObj.(*examplev1.Car), vObj.(*examplev1.Car)))
}
func (s *carSyncer) translateUpdate(ctx context.Context, pObj, vObj *examplev1.Car) *examplev1.Car {
Expand Down
10 changes: 5 additions & 5 deletions pkg/apiservice/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func StartAPIServiceProxy(ctx *config.ControllerContext, targetServiceName, targ
Handler: http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
// we only allow traffic to discovery paths
if !isAPIServiceProxyPathAllowed(request.Method, request.URL.Path) {
klog.FromContext(ctx.Context).Info("Denied access to api service proxy at path", "path", request.URL.Path, "method", request.Method)
klog.FromContext(ctx).Info("Denied access to api service proxy at path", "path", request.URL.Path, "method", request.Method)
responsewriters.ErrorNegotiated(
kerrors.NewForbidden(metav1.SchemeGroupVersion.WithResource("proxy").GroupResource(), "proxy", fmt.Errorf("paths other than discovery paths are not allowed")),
s,
Expand All @@ -187,7 +187,7 @@ func StartAPIServiceProxy(ctx *config.ControllerContext, targetServiceName, targ
klog.Infof("Listening apiservice proxy on localhost:%d...", hostPort)
err = server.ListenAndServeTLS(tlsCertFile, tlsKeyFile)
if err != nil {
klog.FromContext(ctx.Context).Error(err, "error listening for apiservice proxy and serve tls")
klog.FromContext(ctx).Error(err, "error listening for apiservice proxy and serve tls")
os.Exit(1)
}
}()
Expand Down Expand Up @@ -232,14 +232,14 @@ func isAPIServiceProxyPathAllowed(method, path string) bool {
}

func RegisterAPIService(ctx *config.ControllerContext, serviceName string, hostPort int, groupVersion schema.GroupVersion) error {
return applyOperation(ctx.Context, createOperation(ctx, serviceName, hostPort, groupVersion))
return applyOperation(ctx, createOperation(ctx, serviceName, hostPort, groupVersion))
}

func DeregisterAPIService(ctx *config.ControllerContext, groupVersion schema.GroupVersion) error {
// check if the api service should get created
exists := checkExistingAPIService(ctx.Context, ctx.VirtualManager.GetClient(), groupVersion)
exists := checkExistingAPIService(ctx, ctx.VirtualManager.GetClient(), groupVersion)
if exists {
return applyOperation(ctx.Context, deleteOperation(ctx, groupVersion))
return applyOperation(ctx, deleteOperation(ctx, groupVersion))
}

return nil
Expand Down
16 changes: 2 additions & 14 deletions pkg/config/controller_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ import (
"net/http"

"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/rest"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type ControllerContext struct {
Context context.Context
context.Context

LocalManager ctrl.Manager
VirtualManager ctrl.Manager
Expand All @@ -34,17 +33,6 @@ type ControllerContext struct {
AcquiredLeaderHooks []Hook
}

type Filter func(http.Handler, Clients) http.Handler
type Filter func(http.Handler, *ControllerContext) http.Handler

type Hook func(ctx *ControllerContext) error

type Clients struct {
UncachedVirtualClient client.Client
CachedVirtualClient client.Client

UncachedHostClient client.Client
CachedHostClient client.Client

HostConfig *rest.Config
VirtualConfig *rest.Config
}
4 changes: 4 additions & 0 deletions pkg/constants/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ const (
PausedReplicasAnnotation = "loft.sh/paused-replicas"
PausedDateAnnotation = "loft.sh/paused-date"

HostClusterPersistentVolumeAnnotation = "vcluster.loft.sh/host-pv"

HostClusterVSCAnnotation = "vcluster.loft.sh/host-volumesnapshotcontent"

// NodeSuffix is the dns suffix for our nodes
NodeSuffix = "nodes.vcluster.com"

Expand Down
7 changes: 7 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package constants

const (
K8sKineEndpoint = "unix:///data/kine.sock"
K3sKineEndpoint = "unix:///data/server/kine.sock"
K0sKineEndpoint = "unix:///run/k0s/kine/kine.sock:2379"
)
1 change: 0 additions & 1 deletion pkg/constants/indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ const (
IndexByPhysicalName = "IndexByPhysicalName"
IndexByVirtualName = "IndexByVirtualName"
IndexByAssigned = "IndexByAssigned"
IndexByStorageClass = "IndexByStorageClass"
IndexByIngressSecret = "IndexByIngressSecret"
IndexByPodSecret = "IndexByPodSecret"
IndexByConfigMap = "IndexByConfigMap"
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/deploy/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func RegisterInitManifestsController(controllerCtx *config.ControllerContext) er
return err
}

helmBinaryPath, err := helmdownloader.GetHelmBinaryPath(controllerCtx.Context, log.GetInstance())
helmBinaryPath, err := helmdownloader.GetHelmBinaryPath(controllerCtx, log.GetInstance())
if err != nil {
return err
}
Expand All @@ -37,7 +37,7 @@ func RegisterInitManifestsController(controllerCtx *config.ControllerContext) er

go func() {
for {
result, err := controller.Apply(controllerCtx.Context, controllerCtx.Config)
result, err := controller.Apply(controllerCtx, controllerCtx.Config)
if err != nil {
klog.Errorf("Error deploying manifests: %v", err)
time.Sleep(time.Second * 10)
Expand Down
30 changes: 18 additions & 12 deletions pkg/controllers/generic/export_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/loft-sh/vcluster/pkg/config"
"github.com/loft-sh/vcluster/pkg/mappings/generic"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -39,7 +40,7 @@ func CreateExporters(ctx *config.ControllerContext) error {

for _, exportConfig := range exporterConfig.Exports {
_, hasStatusSubresource, err := translate.EnsureCRDFromPhysicalCluster(
registerCtx.Context,
registerCtx,
registerCtx.PhysicalManager.GetConfig(),
registerCtx.VirtualManager.GetConfig(),
schema.FromAPIVersionAndKind(exportConfig.APIVersion, exportConfig.Kind))
Expand Down Expand Up @@ -98,12 +99,17 @@ func createExporterFromConfig(ctx *synccontext.RegisterContext, config *vcluster

gvk := schema.FromAPIVersionAndKind(config.APIVersion, config.Kind)
controllerID := fmt.Sprintf("%s/%s/GenericExport", strings.ToLower(gvk.Kind), strings.ToLower(gvk.Group))
mapper, err := generic.NewMapper(ctx, obj, translate.Default.PhysicalName)
if err != nil {
return nil, err
}

return &exporter{
ObjectPatcher: &exportPatcher{
config: config,
gvk: gvk,
},
NamespacedTranslator: translator.NewNamespacedTranslator(ctx, controllerID, obj),
NamespacedTranslator: translator.NewNamespacedTranslator(ctx, controllerID, obj, mapper),

patcher: NewPatcher(ctx.VirtualManager.GetClient(), ctx.PhysicalManager.GetClient(), hasStatusSubresource, log.New(controllerID)),
gvk: gvk,
Expand All @@ -123,7 +129,7 @@ func BuildCustomExporter(
replaceWhenInvalid bool,
) (syncertypes.Object, error) {
_, hasStatusSubresource, err := translate.EnsureCRDFromPhysicalCluster(
registerCtx.Context,
registerCtx,
registerCtx.PhysicalManager.GetConfig(),
registerCtx.VirtualManager.GetConfig(),
gvk)
Expand Down Expand Up @@ -165,7 +171,7 @@ func (f *exporter) SyncToHost(ctx *synccontext.SyncContext, vObj client.Object)

// apply object to physical cluster
ctx.Log.Infof("Create physical %s %s/%s, since it is missing, but virtual object exists", f.gvk.Kind, vObj.GetNamespace(), vObj.GetName())
pObj, err := f.patcher.ApplyPatches(ctx.Context, vObj, nil, f)
pObj, err := f.patcher.ApplyPatches(ctx, vObj, nil, f)
if kerrors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
}
Expand All @@ -181,7 +187,7 @@ func (f *exporter) SyncToHost(ctx *synccontext.SyncContext, vObj client.Object)
}

// wait here for vObj to be created
err = wait.PollUntilContextTimeout(ctx.Context, time.Millisecond*10, time.Second, true, func(pollContext context.Context) (done bool, err error) {
err = wait.PollUntilContextTimeout(ctx, time.Millisecond*10, time.Second, true, func(pollContext context.Context) (done bool, err error) {
err = ctx.PhysicalClient.Get(pollContext, types.NamespacedName{
Namespace: pObj.GetNamespace(),
Name: pObj.GetName(),
Expand All @@ -207,7 +213,7 @@ func (f *exporter) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj c
// check if virtual object is not matching anymore
if !f.objectMatches(vObj) {
ctx.Log.Infof("delete physical %s %s/%s, because it is not used anymore", f.gvk.Kind, pObj.GetNamespace(), pObj.GetName())
err := ctx.PhysicalClient.Delete(ctx.Context, pObj, &client.DeleteOptions{
err := ctx.PhysicalClient.Delete(ctx, pObj, &client.DeleteOptions{
GracePeriodSeconds: &[]int64{0}[0],
})
if err != nil {
Expand All @@ -222,12 +228,12 @@ func (f *exporter) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj c
if vObj.GetDeletionTimestamp() != nil || pObj.GetDeletionTimestamp() != nil {
if pObj.GetDeletionTimestamp() == nil {
ctx.Log.Infof("delete physical object %s/%s, because the virtual object is being deleted", pObj.GetNamespace(), pObj.GetName())
if err := ctx.PhysicalClient.Delete(ctx.Context, pObj); err != nil {
if err := ctx.PhysicalClient.Delete(ctx, pObj); err != nil {
return ctrl.Result{}, err
}
} else if vObj.GetDeletionTimestamp() == nil {
ctx.Log.Infof("delete virtual object %s/%s, because physical object %s/%s is being deleted", vObj.GetNamespace(), vObj.GetName(), pObj.GetNamespace(), pObj.GetName())
if err := ctx.VirtualClient.Delete(ctx.Context, vObj); err != nil {
if err := ctx.VirtualClient.Delete(ctx, vObj); err != nil {
return ctrl.Result{}, nil
}
}
Expand All @@ -236,7 +242,7 @@ func (f *exporter) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj c
}

// apply reverse patches
result, err := f.patcher.ApplyReversePatches(ctx.Context, vObj, pObj, f)
result, err := f.patcher.ApplyReversePatches(ctx, vObj, pObj, f)
if err != nil {
if kerrors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
Expand All @@ -257,14 +263,14 @@ func (f *exporter) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj c
}

// apply patches
pObj, err = f.patcher.ApplyPatches(ctx.Context, vObj, pObj, f)
pObj, err = f.patcher.ApplyPatches(ctx, vObj, pObj, f)
err = IgnoreAcceptableErrors(err)
if err != nil {
// when invalid, auto delete and recreate to recover
if kerrors.IsInvalid(err) && f.replaceWhenInvalid {
// Replace the object
ctx.Log.Infof("Replace physical object, because apply failed: %v", err)
err = ctx.PhysicalClient.Delete(ctx.Context, pObj, &client.DeleteOptions{
err = ctx.PhysicalClient.Delete(ctx, pObj, &client.DeleteOptions{
GracePeriodSeconds: &[]int64{0}[0],
})
if err != nil {
Expand All @@ -289,7 +295,7 @@ func (f *exporter) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj c
var _ syncertypes.ToVirtualSyncer = &exporter{}

func (f *exporter) SyncToVirtual(ctx *synccontext.SyncContext, pObj client.Object) (ctrl.Result, error) {
isManaged, err := f.NamespacedTranslator.IsManaged(ctx.Context, pObj)
isManaged, err := f.NamespacedTranslator.IsManaged(ctx, pObj)
if err != nil {
return ctrl.Result{}, err
} else if !isManaged {
Expand Down
Loading

0 comments on commit c082311

Please sign in to comment.